ec_encoder.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. package erasure_coding
  2. import (
  3. "fmt"
  4. "io"
  5. "os"
  6. "github.com/klauspost/reedsolomon"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/storage/idx"
  9. "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
  10. "github.com/chrislusf/seaweedfs/weed/storage/types"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. )
  13. const (
  14. DataShardsCount = 10
  15. ParityShardsCount = 4
  16. TotalShardsCount = DataShardsCount + ParityShardsCount
  17. ErasureCodingLargeBlockSize = 1024 * 1024 * 1024 // 1GB
  18. ErasureCodingSmallBlockSize = 1024 * 1024 // 1MB
  19. )
  20. // WriteSortedFileFromIdx generates .ecx file from existing .idx file
  21. // all keys are sorted in ascending order
  22. func WriteSortedFileFromIdx(baseFileName string, ext string) (e error) {
  23. nm, err := readNeedleMap(baseFileName)
  24. if nm != nil {
  25. defer nm.Close()
  26. }
  27. if err != nil {
  28. return fmt.Errorf("readNeedleMap: %v", err)
  29. }
  30. ecxFile, err := os.OpenFile(baseFileName+ext, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
  31. if err != nil {
  32. return fmt.Errorf("failed to open ecx file: %v", err)
  33. }
  34. defer ecxFile.Close()
  35. err = nm.AscendingVisit(func(value needle_map.NeedleValue) error {
  36. bytes := value.ToBytes()
  37. _, writeErr := ecxFile.Write(bytes)
  38. return writeErr
  39. })
  40. if err != nil {
  41. return fmt.Errorf("failed to visit idx file: %v", err)
  42. }
  43. return nil
  44. }
  45. // WriteEcFiles generates .ec00 ~ .ec13 files
  46. func WriteEcFiles(baseFileName string) error {
  47. return generateEcFiles(baseFileName, 256*1024, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize)
  48. }
  49. func RebuildEcFiles(baseFileName string) ([]uint32, error) {
  50. return generateMissingEcFiles(baseFileName, 256*1024, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize)
  51. }
  52. func ToExt(ecIndex int) string {
  53. return fmt.Sprintf(".ec%02d", ecIndex)
  54. }
  55. func generateEcFiles(baseFileName string, bufferSize int, largeBlockSize int64, smallBlockSize int64) error {
  56. file, err := os.OpenFile(baseFileName+".dat", os.O_RDONLY, 0)
  57. if err != nil {
  58. return fmt.Errorf("failed to open dat file: %v", err)
  59. }
  60. defer file.Close()
  61. fi, err := file.Stat()
  62. if err != nil {
  63. return fmt.Errorf("failed to stat dat file: %v", err)
  64. }
  65. glog.V(0).Infof("encodeDatFile %s.dat size:%d", baseFileName, fi.Size())
  66. err = encodeDatFile(fi.Size(), err, baseFileName, bufferSize, largeBlockSize, file, smallBlockSize)
  67. if err != nil {
  68. return fmt.Errorf("encodeDatFile: %v", err)
  69. }
  70. return nil
  71. }
  72. func generateMissingEcFiles(baseFileName string, bufferSize int, largeBlockSize int64, smallBlockSize int64) (generatedShardIds []uint32, err error) {
  73. shardHasData := make([]bool, TotalShardsCount)
  74. inputFiles := make([]*os.File, TotalShardsCount)
  75. outputFiles := make([]*os.File, TotalShardsCount)
  76. for shardId := 0; shardId < TotalShardsCount; shardId++ {
  77. shardFileName := baseFileName + ToExt(shardId)
  78. if util.FileExists(shardFileName) {
  79. shardHasData[shardId] = true
  80. inputFiles[shardId], err = os.OpenFile(shardFileName, os.O_RDONLY, 0)
  81. if err != nil {
  82. return nil, err
  83. }
  84. defer inputFiles[shardId].Close()
  85. } else {
  86. outputFiles[shardId], err = os.OpenFile(shardFileName, os.O_TRUNC|os.O_WRONLY|os.O_CREATE, 0644)
  87. if err != nil {
  88. return nil, err
  89. }
  90. defer outputFiles[shardId].Close()
  91. generatedShardIds = append(generatedShardIds, uint32(shardId))
  92. }
  93. }
  94. err = rebuildEcFiles(shardHasData, inputFiles, outputFiles)
  95. if err != nil {
  96. return nil, fmt.Errorf("rebuildEcFiles: %v", err)
  97. }
  98. return
  99. }
  100. func encodeData(file *os.File, enc reedsolomon.Encoder, startOffset, blockSize int64, buffers [][]byte, outputs []*os.File) error {
  101. bufferSize := int64(len(buffers[0]))
  102. batchCount := blockSize / bufferSize
  103. if blockSize%bufferSize != 0 {
  104. glog.Fatalf("unexpected block size %d buffer size %d", blockSize, bufferSize)
  105. }
  106. for b := int64(0); b < batchCount; b++ {
  107. err := encodeDataOneBatch(file, enc, startOffset+b*bufferSize, blockSize, buffers, outputs)
  108. if err != nil {
  109. return err
  110. }
  111. }
  112. return nil
  113. }
  114. func openEcFiles(baseFileName string, forRead bool) (files []*os.File, err error) {
  115. for i := 0; i < TotalShardsCount; i++ {
  116. fname := baseFileName + ToExt(i)
  117. openOption := os.O_TRUNC | os.O_CREATE | os.O_WRONLY
  118. if forRead {
  119. openOption = os.O_RDONLY
  120. }
  121. f, err := os.OpenFile(fname, openOption, 0644)
  122. if err != nil {
  123. return files, fmt.Errorf("failed to open file %s: %v", fname, err)
  124. }
  125. files = append(files, f)
  126. }
  127. return
  128. }
  129. func closeEcFiles(files []*os.File) {
  130. for _, f := range files {
  131. if f != nil {
  132. f.Close()
  133. }
  134. }
  135. }
  136. func encodeDataOneBatch(file *os.File, enc reedsolomon.Encoder, startOffset, blockSize int64, buffers [][]byte, outputs []*os.File) error {
  137. // read data into buffers
  138. for i := 0; i < DataShardsCount; i++ {
  139. n, err := file.ReadAt(buffers[i], startOffset+blockSize*int64(i))
  140. if err != nil {
  141. if err != io.EOF {
  142. return err
  143. }
  144. }
  145. if n < len(buffers[i]) {
  146. for t := len(buffers[i]) - 1; t >= n; t-- {
  147. buffers[i][t] = 0
  148. }
  149. }
  150. }
  151. err := enc.Encode(buffers)
  152. if err != nil {
  153. return err
  154. }
  155. for i := 0; i < TotalShardsCount; i++ {
  156. _, err := outputs[i].Write(buffers[i])
  157. if err != nil {
  158. return err
  159. }
  160. }
  161. return nil
  162. }
  163. func encodeDatFile(remainingSize int64, err error, baseFileName string, bufferSize int, largeBlockSize int64, file *os.File, smallBlockSize int64) error {
  164. var processedSize int64
  165. enc, err := reedsolomon.New(DataShardsCount, ParityShardsCount)
  166. if err != nil {
  167. return fmt.Errorf("failed to create encoder: %v", err)
  168. }
  169. buffers := make([][]byte, TotalShardsCount)
  170. for i := range buffers {
  171. buffers[i] = make([]byte, bufferSize)
  172. }
  173. outputs, err := openEcFiles(baseFileName, false)
  174. defer closeEcFiles(outputs)
  175. if err != nil {
  176. return fmt.Errorf("failed to open ec files %s: %v", baseFileName, err)
  177. }
  178. for remainingSize > largeBlockSize*DataShardsCount {
  179. err = encodeData(file, enc, processedSize, largeBlockSize, buffers, outputs)
  180. if err != nil {
  181. return fmt.Errorf("failed to encode large chunk data: %v", err)
  182. }
  183. remainingSize -= largeBlockSize * DataShardsCount
  184. processedSize += largeBlockSize * DataShardsCount
  185. }
  186. for remainingSize > 0 {
  187. encodeData(file, enc, processedSize, smallBlockSize, buffers, outputs)
  188. if err != nil {
  189. return fmt.Errorf("failed to encode small chunk data: %v", err)
  190. }
  191. remainingSize -= smallBlockSize * DataShardsCount
  192. processedSize += smallBlockSize * DataShardsCount
  193. }
  194. return nil
  195. }
  196. func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*os.File) error {
  197. enc, err := reedsolomon.New(DataShardsCount, ParityShardsCount)
  198. if err != nil {
  199. return fmt.Errorf("failed to create encoder: %v", err)
  200. }
  201. buffers := make([][]byte, TotalShardsCount)
  202. for i := range buffers {
  203. if shardHasData[i] {
  204. buffers[i] = make([]byte, ErasureCodingSmallBlockSize)
  205. }
  206. }
  207. var startOffset int64
  208. var inputBufferDataSize int
  209. for {
  210. // read the input data from files
  211. for i := 0; i < TotalShardsCount; i++ {
  212. if shardHasData[i] {
  213. n, _ := inputFiles[i].ReadAt(buffers[i], startOffset)
  214. if n == 0 {
  215. return nil
  216. }
  217. if inputBufferDataSize == 0 {
  218. inputBufferDataSize = n
  219. }
  220. if inputBufferDataSize != n {
  221. return fmt.Errorf("ec shard size expected %d actual %d", inputBufferDataSize, n)
  222. }
  223. } else {
  224. buffers[i] = nil
  225. }
  226. }
  227. // encode the data
  228. err = enc.Reconstruct(buffers)
  229. if err != nil {
  230. return fmt.Errorf("reconstruct: %v", err)
  231. }
  232. // write the data to output files
  233. for i := 0; i < TotalShardsCount; i++ {
  234. if !shardHasData[i] {
  235. n, _ := outputFiles[i].WriteAt(buffers[i][:inputBufferDataSize], startOffset)
  236. if inputBufferDataSize != n {
  237. return fmt.Errorf("fail to write to %s", outputFiles[i].Name())
  238. }
  239. }
  240. }
  241. startOffset += int64(inputBufferDataSize)
  242. }
  243. }
  244. func readNeedleMap(baseFileName string) (*needle_map.MemDb, error) {
  245. indexFile, err := os.OpenFile(baseFileName+".idx", os.O_RDONLY, 0644)
  246. if err != nil {
  247. return nil, fmt.Errorf("cannot read Volume Index %s.idx: %v", baseFileName, err)
  248. }
  249. defer indexFile.Close()
  250. cm := needle_map.NewMemDb()
  251. err = idx.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size uint32) error {
  252. if !offset.IsZero() && size != types.TombstoneFileSize {
  253. cm.Set(key, offset, size)
  254. } else {
  255. cm.Delete(key)
  256. }
  257. return nil
  258. })
  259. return cm, err
  260. }