volume_grpc_erasure_coding.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "math"
  7. "os"
  8. "path"
  9. "strings"
  10. "time"
  11. "github.com/seaweedfs/seaweedfs/weed/glog"
  12. "github.com/seaweedfs/seaweedfs/weed/operation"
  13. "github.com/seaweedfs/seaweedfs/weed/pb"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  15. "github.com/seaweedfs/seaweedfs/weed/storage"
  16. "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
  17. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  18. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  19. "github.com/seaweedfs/seaweedfs/weed/storage/volume_info"
  20. "github.com/seaweedfs/seaweedfs/weed/util"
  21. )
  22. /*
  23. Steps to apply erasure coding to .dat .idx files
  24. 0. ensure the volume is readonly
  25. 1. client call VolumeEcShardsGenerate to generate the .ecx and .ec00 ~ .ec13 files
  26. 2. client ask master for possible servers to hold the ec files
  27. 3. client call VolumeEcShardsCopy on above target servers to copy ec files from the source server
  28. 4. target servers report the new ec files to the master
  29. 5. master stores vid -> [14]*DataNode
30. 6. client checks master. If all 14 slices are ready, delete the original .dat, .idx files
  31. */
  32. // VolumeEcShardsGenerate generates the .ecx and .ec00 ~ .ec13 files
  33. func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) {
  34. glog.V(0).Infof("VolumeEcShardsGenerate: %v", req)
  35. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  36. if v == nil {
  37. return nil, fmt.Errorf("volume %d not found", req.VolumeId)
  38. }
  39. baseFileName := v.DataFileName()
  40. if v.Collection != req.Collection {
  41. return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
  42. }
  43. shouldCleanup := true
  44. defer func() {
  45. if !shouldCleanup {
  46. return
  47. }
  48. for i := 0; i < erasure_coding.TotalShardsCount; i++ {
  49. os.Remove(fmt.Sprintf("%s.ec%2d", baseFileName, i))
  50. }
  51. os.Remove(v.IndexFileName() + ".ecx")
  52. }()
  53. // write .ec00 ~ .ec13 files
  54. if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
  55. return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
  56. }
  57. // write .ecx file
  58. if err := erasure_coding.WriteSortedFileFromIdx(v.IndexFileName(), ".ecx"); err != nil {
  59. return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", v.IndexFileName(), err)
  60. }
  61. // write .vif files
  62. var destroyTime uint64
  63. if v.Ttl != nil {
  64. ttlMills := v.Ttl.ToSeconds()
  65. if ttlMills > 0 {
  66. destroyTime = uint64(time.Now().Unix()) + v.Ttl.ToSeconds() //calculated destroy time from the ec volume was created
  67. }
  68. }
  69. volumeInfo := &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}
  70. if destroyTime == 0 {
  71. glog.Warningf("gen ec volume,cal ec volume destory time fail,set time to 0,ttl:%v", v.Ttl)
  72. } else {
  73. volumeInfo.DestroyTime = destroyTime
  74. }
  75. datSize, _, _ := v.FileStat()
  76. volumeInfo.DatFileSize = int64(datSize)
  77. if err := volume_info.SaveVolumeInfo(baseFileName+".vif", volumeInfo); err != nil {
  78. return nil, fmt.Errorf("SaveVolumeInfo %s: %v", baseFileName, err)
  79. }
  80. shouldCleanup = false
  81. return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil
  82. }
  83. // VolumeEcShardsRebuild generates the any of the missing .ec00 ~ .ec13 files
  84. func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) {
  85. glog.V(0).Infof("VolumeEcShardsRebuild: %v", req)
  86. baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
  87. var rebuiltShardIds []uint32
  88. for _, location := range vs.store.Locations {
  89. _, _, existingShardCount, err := checkEcVolumeStatus(baseFileName, location)
  90. if err != nil {
  91. return nil, err
  92. }
  93. if existingShardCount == 0 {
  94. continue
  95. }
  96. if util.FileExists(path.Join(location.IdxDirectory, baseFileName+".ecx")) {
  97. // write .ec00 ~ .ec13 files
  98. dataBaseFileName := path.Join(location.Directory, baseFileName)
  99. if generatedShardIds, err := erasure_coding.RebuildEcFiles(dataBaseFileName); err != nil {
  100. return nil, fmt.Errorf("RebuildEcFiles %s: %v", dataBaseFileName, err)
  101. } else {
  102. rebuiltShardIds = generatedShardIds
  103. }
  104. indexBaseFileName := path.Join(location.IdxDirectory, baseFileName)
  105. if err := erasure_coding.RebuildEcxFile(indexBaseFileName); err != nil {
  106. return nil, fmt.Errorf("RebuildEcxFile %s: %v", dataBaseFileName, err)
  107. }
  108. break
  109. }
  110. }
  111. return &volume_server_pb.VolumeEcShardsRebuildResponse{
  112. RebuiltShardIds: rebuiltShardIds,
  113. }, nil
  114. }
  115. // VolumeEcShardsCopy copy the .ecx and some ec data slices
  116. func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_server_pb.VolumeEcShardsCopyRequest) (*volume_server_pb.VolumeEcShardsCopyResponse, error) {
  117. glog.V(0).Infof("VolumeEcShardsCopy: %v", req)
  118. var location *storage.DiskLocation
  119. if req.CopyEcxFile {
  120. location = vs.store.FindFreeLocation(func(location *storage.DiskLocation) bool {
  121. return location.DiskType == types.HardDriveType
  122. })
  123. } else {
  124. location = vs.store.FindFreeLocation(func(location *storage.DiskLocation) bool {
  125. //(location.FindEcVolume) This method is error, will cause location is nil, redundant judgment
  126. // _, found := location.FindEcVolume(needle.VolumeId(req.VolumeId))
  127. // return found
  128. return true
  129. })
  130. }
  131. if location == nil {
  132. return nil, fmt.Errorf("no space left")
  133. }
  134. dataBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
  135. indexBaseFileName := storage.VolumeFileName(location.IdxDirectory, req.Collection, int(req.VolumeId))
  136. err := operation.WithVolumeServerClient(true, pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  137. // copy ec data slices
  138. for _, shardId := range req.ShardIds {
  139. if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, erasure_coding.ToExt(int(shardId)), false, false, nil); err != nil {
  140. return err
  141. }
  142. }
  143. if req.CopyEcxFile {
  144. // copy ecx file
  145. if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecx", false, false, nil); err != nil {
  146. return err
  147. }
  148. }
  149. if req.CopyEcjFile {
  150. // copy ecj file
  151. if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecj", true, true, nil); err != nil {
  152. return err
  153. }
  154. }
  155. if req.CopyVifFile {
  156. // copy vif file
  157. if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, ".vif", false, true, nil); err != nil {
  158. return err
  159. }
  160. }
  161. return nil
  162. })
  163. if err != nil {
  164. return nil, fmt.Errorf("VolumeEcShardsCopy volume %d: %v", req.VolumeId, err)
  165. }
  166. return &volume_server_pb.VolumeEcShardsCopyResponse{}, nil
  167. }
  168. // VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed
  169. // the shard should not be mounted before calling this.
  170. func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) {
  171. bName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
  172. glog.V(0).Infof("ec volume %s shard delete %v", bName, req.ShardIds)
  173. for _, location := range vs.store.Locations {
  174. if err := deleteEcShardIdsForEachLocation(bName, location, req.ShardIds); err != nil {
  175. glog.Errorf("deleteEcShards from %s %s.%v: %v", location.Directory, bName, req.ShardIds, err)
  176. return nil, err
  177. }
  178. }
  179. return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
  180. }
  181. func deleteEcShardIdsForEachLocation(bName string, location *storage.DiskLocation, shardIds []uint32) error {
  182. found := false
  183. indexBaseFilename := path.Join(location.IdxDirectory, bName)
  184. dataBaseFilename := path.Join(location.Directory, bName)
  185. if util.FileExists(path.Join(location.IdxDirectory, bName+".ecx")) {
  186. for _, shardId := range shardIds {
  187. shardFileName := dataBaseFilename + erasure_coding.ToExt(int(shardId))
  188. if util.FileExists(shardFileName) {
  189. found = true
  190. os.Remove(shardFileName)
  191. }
  192. }
  193. }
  194. if !found {
  195. return nil
  196. }
  197. hasEcxFile, hasIdxFile, existingShardCount, err := checkEcVolumeStatus(bName, location)
  198. if err != nil {
  199. return err
  200. }
  201. if hasEcxFile && existingShardCount == 0 {
  202. if err := os.Remove(indexBaseFilename + ".ecx"); err != nil {
  203. return err
  204. }
  205. os.Remove(indexBaseFilename + ".ecj")
  206. if !hasIdxFile {
  207. // .vif is used for ec volumes and normal volumes
  208. os.Remove(dataBaseFilename + ".vif")
  209. }
  210. }
  211. return nil
  212. }
  213. func checkEcVolumeStatus(bName string, location *storage.DiskLocation) (hasEcxFile bool, hasIdxFile bool, existingShardCount int, err error) {
  214. // check whether to delete the .ecx and .ecj file also
  215. fileInfos, err := os.ReadDir(location.Directory)
  216. if err != nil {
  217. return false, false, 0, err
  218. }
  219. if location.IdxDirectory != location.Directory {
  220. idxFileInfos, err := os.ReadDir(location.IdxDirectory)
  221. if err != nil {
  222. return false, false, 0, err
  223. }
  224. fileInfos = append(fileInfos, idxFileInfos...)
  225. }
  226. for _, fileInfo := range fileInfos {
  227. if fileInfo.Name() == bName+".ecx" || fileInfo.Name() == bName+".ecj" {
  228. hasEcxFile = true
  229. continue
  230. }
  231. if fileInfo.Name() == bName+".idx" {
  232. hasIdxFile = true
  233. continue
  234. }
  235. if strings.HasPrefix(fileInfo.Name(), bName+".ec") {
  236. existingShardCount++
  237. }
  238. }
  239. return hasEcxFile, hasIdxFile, existingShardCount, nil
  240. }
  241. func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) {
  242. glog.V(0).Infof("VolumeEcShardsMount: %v", req)
  243. for _, shardId := range req.ShardIds {
  244. err := vs.store.MountEcShards(req.Collection, needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
  245. if err != nil {
  246. glog.Errorf("ec shard mount %v: %v", req, err)
  247. } else {
  248. glog.V(2).Infof("ec shard mount %v", req)
  249. }
  250. if err != nil {
  251. return nil, fmt.Errorf("mount %d.%d: %v", req.VolumeId, shardId, err)
  252. }
  253. }
  254. return &volume_server_pb.VolumeEcShardsMountResponse{}, nil
  255. }
  256. func (vs *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_server_pb.VolumeEcShardsUnmountRequest) (*volume_server_pb.VolumeEcShardsUnmountResponse, error) {
  257. glog.V(0).Infof("VolumeEcShardsUnmount: %v", req)
  258. for _, shardId := range req.ShardIds {
  259. err := vs.store.UnmountEcShards(needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
  260. if err != nil {
  261. glog.Errorf("ec shard unmount %v: %v", req, err)
  262. } else {
  263. glog.V(2).Infof("ec shard unmount %v", req)
  264. }
  265. if err != nil {
  266. return nil, fmt.Errorf("unmount %d.%d: %v", req.VolumeId, shardId, err)
  267. }
  268. }
  269. return &volume_server_pb.VolumeEcShardsUnmountResponse{}, nil
  270. }
// VolumeEcShardRead streams a byte range [req.Offset, req.Offset+req.Size)
// of one locally mounted ec shard back to the caller in chunks of at most
// BufferSizeLimit bytes. If req.FileKey is set and that needle is marked
// deleted in the .ecx index, a single IsDeleted response is sent instead.
func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardReadRequest, stream volume_server_pb.VolumeServer_VolumeEcShardReadServer) error {

	ecVolume, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
	if !found {
		return fmt.Errorf("VolumeEcShardRead not found ec volume id %d", req.VolumeId)
	}
	ecShard, found := ecVolume.FindEcVolumeShard(erasure_coding.ShardId(req.ShardId))
	if !found {
		return fmt.Errorf("not found ec shard %d.%d", req.VolumeId, req.ShardId)
	}

	// optional tombstone check: short-circuit with IsDeleted when the needle
	// is known to be deleted (lookup error is deliberately ignored here —
	// the read below would surface any real problem)
	if req.FileKey != 0 {
		_, size, _ := ecVolume.FindNeedleFromEcx(types.Uint64ToNeedleId(req.FileKey))
		if size.IsDeleted() {
			return stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
				IsDeleted: true,
			})
		}
	}

	// one reusable read buffer, capped at BufferSizeLimit
	bufSize := req.Size
	if bufSize > BufferSizeLimit {
		bufSize = BufferSizeLimit
	}
	buffer := make([]byte, bufSize)

	startOffset, bytesToRead := req.Offset, req.Size

	for bytesToRead > 0 {
		// min of bytesToRead and bufSize
		bufferSize := bufSize
		if bufferSize > bytesToRead {
			bufferSize = bytesToRead
		}
		bytesread, err := ecShard.ReadAt(buffer[0:bufferSize], startOffset)

		// println("read", ecShard.FileName(), "startOffset", startOffset, bytesread, "bytes, with target", bufferSize)
		// NOTE: the partial read is sent BEFORE inspecting err — ReadAt may
		// return data together with io.EOF, and that data must be delivered.
		if bytesread > 0 {

			if int64(bytesread) > bytesToRead {
				bytesread = int(bytesToRead)
			}
			err = stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
				Data: buffer[:bytesread],
			})
			if err != nil {
				// println("sending", bytesread, "bytes err", err.Error())
				return err
			}

			startOffset += int64(bytesread)
			bytesToRead -= int64(bytesread)

		}

		// io.EOF ends the stream normally; any other error is propagated
		if err != nil {
			if err != io.EOF {
				return err
			}
			return nil
		}

	}

	return nil

}
  325. func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_server_pb.VolumeEcBlobDeleteRequest) (*volume_server_pb.VolumeEcBlobDeleteResponse, error) {
  326. glog.V(0).Infof("VolumeEcBlobDelete: %v", req)
  327. resp := &volume_server_pb.VolumeEcBlobDeleteResponse{}
  328. for _, location := range vs.store.Locations {
  329. if localEcVolume, found := location.FindEcVolume(needle.VolumeId(req.VolumeId)); found {
  330. _, size, _, err := localEcVolume.LocateEcShardNeedle(types.NeedleId(req.FileKey), needle.Version(req.Version))
  331. if err != nil {
  332. return nil, fmt.Errorf("locate in local ec volume: %v", err)
  333. }
  334. if size.IsDeleted() {
  335. return resp, nil
  336. }
  337. err = localEcVolume.DeleteNeedleFromEcx(types.NeedleId(req.FileKey))
  338. if err != nil {
  339. return nil, err
  340. }
  341. break
  342. }
  343. }
  344. return resp, nil
  345. }
  346. // VolumeEcShardsToVolume generates the .idx, .dat files from .ecx, .ecj and .ec01 ~ .ec14 files
  347. func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_server_pb.VolumeEcShardsToVolumeRequest) (*volume_server_pb.VolumeEcShardsToVolumeResponse, error) {
  348. glog.V(0).Infof("VolumeEcShardsToVolume: %v", req)
  349. // collect .ec00 ~ .ec09 files
  350. shardFileNames := make([]string, erasure_coding.DataShardsCount)
  351. v, found := vs.store.CollectEcShards(needle.VolumeId(req.VolumeId), shardFileNames)
  352. if !found {
  353. return nil, fmt.Errorf("ec volume %d not found", req.VolumeId)
  354. }
  355. if v.Collection != req.Collection {
  356. return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
  357. }
  358. for shardId := 0; shardId < erasure_coding.DataShardsCount; shardId++ {
  359. if shardFileNames[shardId] == "" {
  360. return nil, fmt.Errorf("ec volume %d missing shard %d", req.VolumeId, shardId)
  361. }
  362. }
  363. dataBaseFileName, indexBaseFileName := v.DataBaseFileName(), v.IndexBaseFileName()
  364. // calculate .dat file size
  365. datFileSize, err := erasure_coding.FindDatFileSize(dataBaseFileName, indexBaseFileName)
  366. if err != nil {
  367. return nil, fmt.Errorf("FindDatFileSize %s: %v", dataBaseFileName, err)
  368. }
  369. // write .dat file from .ec00 ~ .ec09 files
  370. if err := erasure_coding.WriteDatFile(dataBaseFileName, datFileSize, shardFileNames); err != nil {
  371. return nil, fmt.Errorf("WriteDatFile %s: %v", dataBaseFileName, err)
  372. }
  373. // write .idx file from .ecx and .ecj files
  374. if err := erasure_coding.WriteIdxFileFromEcIndex(indexBaseFileName); err != nil {
  375. return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", v.IndexBaseFileName(), err)
  376. }
  377. return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil
  378. }