ec_volume.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. package erasure_coding
  2. import (
  3. "errors"
  4. "fmt"
  5. "math"
  6. "os"
  7. "sort"
  8. "sync"
  9. "time"
  10. "github.com/chrislusf/seaweedfs/weed/pb"
  11. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  13. "github.com/chrislusf/seaweedfs/weed/storage/idx"
  14. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  15. "github.com/chrislusf/seaweedfs/weed/storage/types"
  16. )
  // NotFoundError is the sentinel error returned by SearchNeedleFromSortedIndex
  // when the requested needle id is not present in the sorted index file.
  var NotFoundError = errors.New("needle not found")
  20. type EcVolume struct {
  21. VolumeId needle.VolumeId
  22. Collection string
  23. dir string
  24. ecxFile *os.File
  25. ecxFileSize int64
  26. ecxCreatedAt time.Time
  27. Shards []*EcVolumeShard
  28. ShardLocations map[ShardId][]string
  29. ShardLocationsRefreshTime time.Time
  30. ShardLocationsLock sync.RWMutex
  31. Version needle.Version
  32. ecjFile *os.File
  33. ecjFileAccessLock sync.Mutex
  34. }
  35. func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
  36. ev = &EcVolume{dir: dir, Collection: collection, VolumeId: vid}
  37. baseFileName := EcShardFileName(collection, dir, int(vid))
  38. // open ecx file
  39. if ev.ecxFile, err = os.OpenFile(baseFileName+".ecx", os.O_RDWR, 0644); err != nil {
  40. return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", baseFileName, err)
  41. }
  42. ecxFi, statErr := ev.ecxFile.Stat()
  43. if statErr != nil {
  44. return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", baseFileName, statErr)
  45. }
  46. ev.ecxFileSize = ecxFi.Size()
  47. ev.ecxCreatedAt = ecxFi.ModTime()
  48. // open ecj file
  49. if ev.ecjFile, err = os.OpenFile(baseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil {
  50. return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", baseFileName, err)
  51. }
  52. // read volume info
  53. ev.Version = needle.Version3
  54. if volumeInfo, found, _ := pb.MaybeLoadVolumeInfo(baseFileName + ".vif"); found {
  55. ev.Version = needle.Version(volumeInfo.Version)
  56. } else {
  57. pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
  58. }
  59. ev.ShardLocations = make(map[ShardId][]string)
  60. return
  61. }
  62. func (ev *EcVolume) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) bool {
  63. for _, s := range ev.Shards {
  64. if s.ShardId == ecVolumeShard.ShardId {
  65. return false
  66. }
  67. }
  68. ev.Shards = append(ev.Shards, ecVolumeShard)
  69. sort.Slice(ev.Shards, func(i, j int) bool {
  70. return ev.Shards[i].VolumeId < ev.Shards[j].VolumeId ||
  71. ev.Shards[i].VolumeId == ev.Shards[j].VolumeId && ev.Shards[i].ShardId < ev.Shards[j].ShardId
  72. })
  73. return true
  74. }
  75. func (ev *EcVolume) DeleteEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, deleted bool) {
  76. foundPosition := -1
  77. for i, s := range ev.Shards {
  78. if s.ShardId == shardId {
  79. foundPosition = i
  80. }
  81. }
  82. if foundPosition < 0 {
  83. return nil, false
  84. }
  85. ecVolumeShard = ev.Shards[foundPosition]
  86. ev.Shards = append(ev.Shards[:foundPosition], ev.Shards[foundPosition+1:]...)
  87. return ecVolumeShard, true
  88. }
  89. func (ev *EcVolume) FindEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, found bool) {
  90. for _, s := range ev.Shards {
  91. if s.ShardId == shardId {
  92. return s, true
  93. }
  94. }
  95. return nil, false
  96. }
  97. func (ev *EcVolume) Close() {
  98. for _, s := range ev.Shards {
  99. s.Close()
  100. }
  101. if ev.ecjFile != nil {
  102. ev.ecjFileAccessLock.Lock()
  103. _ = ev.ecjFile.Close()
  104. ev.ecjFile = nil
  105. ev.ecjFileAccessLock.Unlock()
  106. }
  107. if ev.ecxFile != nil {
  108. _ = ev.ecxFile.Close()
  109. ev.ecxFile = nil
  110. }
  111. }
  112. func (ev *EcVolume) Destroy() {
  113. ev.Close()
  114. for _, s := range ev.Shards {
  115. s.Destroy()
  116. }
  117. os.Remove(ev.FileName() + ".ecx")
  118. os.Remove(ev.FileName() + ".ecj")
  119. os.Remove(ev.FileName() + ".vif")
  120. }
  121. func (ev *EcVolume) FileName() string {
  122. return EcShardFileName(ev.Collection, ev.dir, int(ev.VolumeId))
  123. }
  124. func (ev *EcVolume) ShardSize() int64 {
  125. if len(ev.Shards) > 0 {
  126. return ev.Shards[0].Size()
  127. }
  128. return 0
  129. }
  130. func (ev *EcVolume) CreatedAt() time.Time {
  131. return ev.ecxCreatedAt
  132. }
  133. func (ev *EcVolume) ShardIdList() (shardIds []ShardId) {
  134. for _, s := range ev.Shards {
  135. shardIds = append(shardIds, s.ShardId)
  136. }
  137. return
  138. }
  139. func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.VolumeEcShardInformationMessage) {
  140. prevVolumeId := needle.VolumeId(math.MaxUint32)
  141. var m *master_pb.VolumeEcShardInformationMessage
  142. for _, s := range ev.Shards {
  143. if s.VolumeId != prevVolumeId {
  144. m = &master_pb.VolumeEcShardInformationMessage{
  145. Id: uint32(s.VolumeId),
  146. Collection: s.Collection,
  147. }
  148. messages = append(messages, m)
  149. }
  150. prevVolumeId = s.VolumeId
  151. m.EcIndexBits = uint32(ShardBits(m.EcIndexBits).AddShardId(s.ShardId))
  152. }
  153. return
  154. }
  155. func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.Version) (offset types.Offset, size uint32, intervals []Interval, err error) {
  156. // find the needle from ecx file
  157. offset, size, err = ev.FindNeedleFromEcx(needleId)
  158. if err != nil {
  159. return types.Offset{}, 0, nil, fmt.Errorf("FindNeedleFromEcx: %v", err)
  160. }
  161. shard := ev.Shards[0]
  162. // calculate the locations in the ec shards
  163. intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToAcutalOffset(), uint32(needle.GetActualSize(size, version)))
  164. return
  165. }
  166. func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
  167. return SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil)
  168. }
  169. func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size uint32, err error) {
  170. var key types.NeedleId
  171. buf := make([]byte, types.NeedleMapEntrySize)
  172. l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize
  173. for l < h {
  174. m := (l + h) / 2
  175. if _, err := ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
  176. return types.Offset{}, types.TombstoneFileSize, fmt.Errorf("ecx file %d read at %d: %v", ecxFileSize, m*types.NeedleMapEntrySize, err)
  177. }
  178. key, offset, size = idx.IdxFileEntry(buf)
  179. if key == needleId {
  180. if processNeedleFn != nil {
  181. err = processNeedleFn(ecxFile, m*types.NeedleHeaderSize)
  182. }
  183. return
  184. }
  185. if key < needleId {
  186. l = m + 1
  187. } else {
  188. h = m
  189. }
  190. }
  191. err = NotFoundError
  192. return
  193. }