ec_volume.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. package erasure_coding
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
  7. "golang.org/x/exp/slices"
  8. "math"
  9. "os"
  10. "sync"
  11. "time"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  14. "github.com/seaweedfs/seaweedfs/weed/storage/idx"
  15. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  16. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  17. )
  // NotFoundError is the sentinel returned by SearchNeedleFromSortedIndex when
  // the requested needle id is not present in the sorted index.
  var NotFoundError = errors.New("needle not found")
  21. type EcVolume struct {
  22. VolumeId needle.VolumeId
  23. Collection string
  24. dir string
  25. dirIdx string
  26. ecxFile *os.File
  27. ecxFileSize int64
  28. ecxCreatedAt time.Time
  29. Shards []*EcVolumeShard
  30. ShardLocations map[ShardId][]pb.ServerAddress
  31. ShardLocationsRefreshTime time.Time
  32. ShardLocationsLock sync.RWMutex
  33. Version needle.Version
  34. ecjFile *os.File
  35. ecjFileAccessLock sync.Mutex
  36. diskType types.DiskType
  37. }
  38. func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
  39. ev = &EcVolume{dir: dir, dirIdx: dirIdx, Collection: collection, VolumeId: vid, diskType: diskType}
  40. dataBaseFileName := EcShardFileName(collection, dir, int(vid))
  41. indexBaseFileName := EcShardFileName(collection, dirIdx, int(vid))
  42. // open ecx file
  43. if ev.ecxFile, err = os.OpenFile(indexBaseFileName+".ecx", os.O_RDWR, 0644); err != nil {
  44. return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", indexBaseFileName, err)
  45. }
  46. ecxFi, statErr := ev.ecxFile.Stat()
  47. if statErr != nil {
  48. return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", indexBaseFileName, statErr)
  49. }
  50. ev.ecxFileSize = ecxFi.Size()
  51. ev.ecxCreatedAt = ecxFi.ModTime()
  52. // open ecj file
  53. if ev.ecjFile, err = os.OpenFile(indexBaseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil {
  54. return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", indexBaseFileName, err)
  55. }
  56. // read volume info
  57. ev.Version = needle.Version3
  58. if volumeInfo, _, found, _ := volume_info.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found {
  59. ev.Version = needle.Version(volumeInfo.Version)
  60. } else {
  61. volume_info.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
  62. }
  63. ev.ShardLocations = make(map[ShardId][]pb.ServerAddress)
  64. return
  65. }
  66. func (ev *EcVolume) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) bool {
  67. for _, s := range ev.Shards {
  68. if s.ShardId == ecVolumeShard.ShardId {
  69. return false
  70. }
  71. }
  72. ev.Shards = append(ev.Shards, ecVolumeShard)
  73. slices.SortFunc(ev.Shards, func(a, b *EcVolumeShard) bool {
  74. return a.VolumeId < b.VolumeId || a.VolumeId == b.VolumeId && a.ShardId < b.ShardId
  75. })
  76. return true
  77. }
  78. func (ev *EcVolume) DeleteEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, deleted bool) {
  79. foundPosition := -1
  80. for i, s := range ev.Shards {
  81. if s.ShardId == shardId {
  82. foundPosition = i
  83. }
  84. }
  85. if foundPosition < 0 {
  86. return nil, false
  87. }
  88. ecVolumeShard = ev.Shards[foundPosition]
  89. ev.Shards = append(ev.Shards[:foundPosition], ev.Shards[foundPosition+1:]...)
  90. return ecVolumeShard, true
  91. }
  92. func (ev *EcVolume) FindEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, found bool) {
  93. for _, s := range ev.Shards {
  94. if s.ShardId == shardId {
  95. return s, true
  96. }
  97. }
  98. return nil, false
  99. }
  100. func (ev *EcVolume) Close() {
  101. for _, s := range ev.Shards {
  102. s.Close()
  103. }
  104. if ev.ecjFile != nil {
  105. ev.ecjFileAccessLock.Lock()
  106. _ = ev.ecjFile.Close()
  107. ev.ecjFile = nil
  108. ev.ecjFileAccessLock.Unlock()
  109. }
  110. if ev.ecxFile != nil {
  111. _ = ev.ecxFile.Close()
  112. ev.ecxFile = nil
  113. }
  114. }
  115. func (ev *EcVolume) Destroy() {
  116. ev.Close()
  117. for _, s := range ev.Shards {
  118. s.Destroy()
  119. }
  120. os.Remove(ev.FileName(".ecx"))
  121. os.Remove(ev.FileName(".ecj"))
  122. os.Remove(ev.FileName(".vif"))
  123. }
  124. func (ev *EcVolume) FileName(ext string) string {
  125. switch ext {
  126. case ".ecx", ".ecj":
  127. return ev.IndexBaseFileName() + ext
  128. }
  129. // .vif
  130. return ev.DataBaseFileName() + ext
  131. }
  132. func (ev *EcVolume) DataBaseFileName() string {
  133. return EcShardFileName(ev.Collection, ev.dir, int(ev.VolumeId))
  134. }
  135. func (ev *EcVolume) IndexBaseFileName() string {
  136. return EcShardFileName(ev.Collection, ev.dirIdx, int(ev.VolumeId))
  137. }
  138. func (ev *EcVolume) ShardSize() uint64 {
  139. if len(ev.Shards) > 0 {
  140. return uint64(ev.Shards[0].Size())
  141. }
  142. return 0
  143. }
  144. func (ev *EcVolume) Size() (size int64) {
  145. for _, shard := range ev.Shards {
  146. size += shard.Size()
  147. }
  148. return
  149. }
  150. func (ev *EcVolume) CreatedAt() time.Time {
  151. return ev.ecxCreatedAt
  152. }
  153. func (ev *EcVolume) ShardIdList() (shardIds []ShardId) {
  154. for _, s := range ev.Shards {
  155. shardIds = append(shardIds, s.ShardId)
  156. }
  157. return
  158. }
  159. func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.VolumeEcShardInformationMessage) {
  160. prevVolumeId := needle.VolumeId(math.MaxUint32)
  161. var m *master_pb.VolumeEcShardInformationMessage
  162. for _, s := range ev.Shards {
  163. if s.VolumeId != prevVolumeId {
  164. m = &master_pb.VolumeEcShardInformationMessage{
  165. Id: uint32(s.VolumeId),
  166. Collection: s.Collection,
  167. DiskType: string(ev.diskType),
  168. }
  169. messages = append(messages, m)
  170. }
  171. prevVolumeId = s.VolumeId
  172. m.EcIndexBits = uint32(ShardBits(m.EcIndexBits).AddShardId(s.ShardId))
  173. }
  174. return
  175. }
  176. func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.Version) (offset types.Offset, size types.Size, intervals []Interval, err error) {
  177. // find the needle from ecx file
  178. offset, size, err = ev.FindNeedleFromEcx(needleId)
  179. if err != nil {
  180. return types.Offset{}, 0, nil, fmt.Errorf("FindNeedleFromEcx: %v", err)
  181. }
  182. intervals = ev.LocateEcShardNeedleInterval(version, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, version)))
  183. return
  184. }
  185. func (ev *EcVolume) LocateEcShardNeedleInterval(version needle.Version, offset int64, size types.Size) (intervals []Interval) {
  186. shard := ev.Shards[0]
  187. // calculate the locations in the ec shards
  188. intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset, types.Size(needle.GetActualSize(size, version)))
  189. return
  190. }
  191. func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size types.Size, err error) {
  192. return SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil)
  193. }
  194. func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size types.Size, err error) {
  195. var key types.NeedleId
  196. buf := make([]byte, types.NeedleMapEntrySize)
  197. l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize
  198. for l < h {
  199. m := (l + h) / 2
  200. if _, err := ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
  201. return types.Offset{}, types.TombstoneFileSize, fmt.Errorf("ecx file %d read at %d: %v", ecxFileSize, m*types.NeedleMapEntrySize, err)
  202. }
  203. key, offset, size = idx.IdxFileEntry(buf)
  204. if key == needleId {
  205. if processNeedleFn != nil {
  206. err = processNeedleFn(ecxFile, m*types.NeedleHeaderSize)
  207. }
  208. return
  209. }
  210. if key < needleId {
  211. l = m + 1
  212. } else {
  213. h = m
  214. }
  215. }
  216. err = NotFoundError
  217. return
  218. }