volume_grpc_erasure_coding.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "math"
  8. "os"
  9. "path"
  10. "strings"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/operation"
  13. "github.com/chrislusf/seaweedfs/weed/pb"
  14. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  15. "github.com/chrislusf/seaweedfs/weed/storage"
  16. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  17. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  18. "github.com/chrislusf/seaweedfs/weed/storage/types"
  19. "github.com/chrislusf/seaweedfs/weed/util"
  20. )
  21. /*
  22. Steps to apply erasure coding to .dat .idx files
  23. 0. ensure the volume is readonly
  24. 1. client call VolumeEcShardsGenerate to generate the .ecx and .ec00 ~ .ec13 files
  25. 2. client ask master for possible servers to hold the ec files
  26. 3. client call VolumeEcShardsCopy on above target servers to copy ec files from the source server
  27. 4. target servers report the new ec files to the master
  28. 5. master stores vid -> [14]*DataNode
  29. 6. client checks master. If all 14 slices are ready, delete the original .idx, .idx files
  30. */
  31. // VolumeEcShardsGenerate generates the .ecx and .ec00 ~ .ec13 files
  32. func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) {
  33. glog.V(0).Infof("VolumeEcShardsGenerate: %v", req)
  34. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  35. if v == nil {
  36. return nil, fmt.Errorf("volume %d not found", req.VolumeId)
  37. }
  38. baseFileName := v.DataFileName()
  39. if v.Collection != req.Collection {
  40. return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
  41. }
  42. // write .ec00 ~ .ec13 files
  43. if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
  44. return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
  45. }
  46. // write .ecx file
  47. if err := erasure_coding.WriteSortedFileFromIdx(v.IndexFileName(), ".ecx"); err != nil {
  48. return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", v.IndexFileName(), err)
  49. }
  50. // write .vif files
  51. if err := pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}); err != nil {
  52. return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
  53. }
  54. return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil
  55. }
  56. // VolumeEcShardsRebuild generates the any of the missing .ec00 ~ .ec13 files
  57. func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) {
  58. glog.V(0).Infof("VolumeEcShardsRebuild: %v", req)
  59. baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
  60. var rebuiltShardIds []uint32
  61. for _, location := range vs.store.Locations {
  62. if util.FileExists(path.Join(location.IdxDirectory, baseFileName+".ecx")) {
  63. // write .ec00 ~ .ec13 files
  64. dataBaseFileName := path.Join(location.Directory, baseFileName)
  65. if generatedShardIds, err := erasure_coding.RebuildEcFiles(dataBaseFileName); err != nil {
  66. return nil, fmt.Errorf("RebuildEcFiles %s: %v", dataBaseFileName, err)
  67. } else {
  68. rebuiltShardIds = generatedShardIds
  69. }
  70. indexBaseFileName := path.Join(location.IdxDirectory, baseFileName)
  71. if err := erasure_coding.RebuildEcxFile(indexBaseFileName); err != nil {
  72. return nil, fmt.Errorf("RebuildEcxFile %s: %v", dataBaseFileName, err)
  73. }
  74. break
  75. }
  76. }
  77. return &volume_server_pb.VolumeEcShardsRebuildResponse{
  78. RebuiltShardIds: rebuiltShardIds,
  79. }, nil
  80. }
  81. // VolumeEcShardsCopy copy the .ecx and some ec data slices
  82. func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_server_pb.VolumeEcShardsCopyRequest) (*volume_server_pb.VolumeEcShardsCopyResponse, error) {
  83. glog.V(0).Infof("VolumeEcShardsCopy: %v", req)
  84. location := vs.store.FindFreeLocation()
  85. if location == nil {
  86. return nil, fmt.Errorf("no space left")
  87. }
  88. dataBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
  89. indexBaseFileName := storage.VolumeFileName(location.IdxDirectory, req.Collection, int(req.VolumeId))
  90. err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  91. // copy ec data slices
  92. for _, shardId := range req.ShardIds {
  93. if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil {
  94. return err
  95. }
  96. }
  97. if req.CopyEcxFile {
  98. // copy ecx file
  99. if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecx", false, false); err != nil {
  100. return err
  101. }
  102. return nil
  103. }
  104. if req.CopyEcjFile {
  105. // copy ecj file
  106. if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecj", true, true); err != nil {
  107. return err
  108. }
  109. }
  110. if req.CopyVifFile {
  111. // copy vif file
  112. if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, ".vif", false, true); err != nil {
  113. return err
  114. }
  115. }
  116. return nil
  117. })
  118. if err != nil {
  119. return nil, fmt.Errorf("VolumeEcShardsCopy volume %d: %v", req.VolumeId, err)
  120. }
  121. return &volume_server_pb.VolumeEcShardsCopyResponse{}, nil
  122. }
  123. // VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed
  124. // the shard should not be mounted before calling this.
  125. func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) {
  126. bName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
  127. glog.V(0).Infof("ec volume %d shard delete %v", req.VolumeId, req.ShardIds)
  128. found := false
  129. var indexBaseFilename, dataBaseFilename string
  130. for _, location := range vs.store.Locations {
  131. if util.FileExists(path.Join(location.IdxDirectory, bName+".ecx")) {
  132. found = true
  133. indexBaseFilename = path.Join(location.IdxDirectory, bName)
  134. dataBaseFilename = path.Join(location.Directory, bName)
  135. for _, shardId := range req.ShardIds {
  136. os.Remove(dataBaseFilename + erasure_coding.ToExt(int(shardId)))
  137. }
  138. break
  139. }
  140. }
  141. if !found {
  142. return nil, nil
  143. }
  144. // check whether to delete the .ecx and .ecj file also
  145. hasEcxFile := false
  146. hasIdxFile := false
  147. existingShardCount := 0
  148. for _, location := range vs.store.Locations {
  149. fileInfos, err := ioutil.ReadDir(location.Directory)
  150. if err != nil {
  151. continue
  152. }
  153. if location.IdxDirectory != location.Directory {
  154. idxFileInfos, err := ioutil.ReadDir(location.IdxDirectory)
  155. if err != nil {
  156. continue
  157. }
  158. fileInfos = append(fileInfos, idxFileInfos...)
  159. }
  160. for _, fileInfo := range fileInfos {
  161. if fileInfo.Name() == bName+".ecx" || fileInfo.Name() == bName+".ecj" {
  162. hasEcxFile = true
  163. continue
  164. }
  165. if fileInfo.Name() == bName+".idx" {
  166. hasIdxFile = true
  167. continue
  168. }
  169. if strings.HasPrefix(fileInfo.Name(), bName+".ec") {
  170. existingShardCount++
  171. }
  172. }
  173. }
  174. if hasEcxFile && existingShardCount == 0 {
  175. if err := os.Remove(indexBaseFilename + ".ecx"); err != nil {
  176. return nil, err
  177. }
  178. os.Remove(indexBaseFilename + ".ecj")
  179. }
  180. if !hasIdxFile {
  181. // .vif is used for ec volumes and normal volumes
  182. os.Remove(dataBaseFilename + ".vif")
  183. }
  184. return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
  185. }
  186. func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) {
  187. glog.V(0).Infof("VolumeEcShardsMount: %v", req)
  188. for _, shardId := range req.ShardIds {
  189. err := vs.store.MountEcShards(req.Collection, needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
  190. if err != nil {
  191. glog.Errorf("ec shard mount %v: %v", req, err)
  192. } else {
  193. glog.V(2).Infof("ec shard mount %v", req)
  194. }
  195. if err != nil {
  196. return nil, fmt.Errorf("mount %d.%d: %v", req.VolumeId, shardId, err)
  197. }
  198. }
  199. return &volume_server_pb.VolumeEcShardsMountResponse{}, nil
  200. }
  201. func (vs *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_server_pb.VolumeEcShardsUnmountRequest) (*volume_server_pb.VolumeEcShardsUnmountResponse, error) {
  202. glog.V(0).Infof("VolumeEcShardsUnmount: %v", req)
  203. for _, shardId := range req.ShardIds {
  204. err := vs.store.UnmountEcShards(needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
  205. if err != nil {
  206. glog.Errorf("ec shard unmount %v: %v", req, err)
  207. } else {
  208. glog.V(2).Infof("ec shard unmount %v", req)
  209. }
  210. if err != nil {
  211. return nil, fmt.Errorf("unmount %d.%d: %v", req.VolumeId, shardId, err)
  212. }
  213. }
  214. return &volume_server_pb.VolumeEcShardsUnmountResponse{}, nil
  215. }
  216. func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardReadRequest, stream volume_server_pb.VolumeServer_VolumeEcShardReadServer) error {
  217. ecVolume, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
  218. if !found {
  219. return fmt.Errorf("VolumeEcShardRead not found ec volume id %d", req.VolumeId)
  220. }
  221. ecShard, found := ecVolume.FindEcVolumeShard(erasure_coding.ShardId(req.ShardId))
  222. if !found {
  223. return fmt.Errorf("not found ec shard %d.%d", req.VolumeId, req.ShardId)
  224. }
  225. if req.FileKey != 0 {
  226. _, size, _ := ecVolume.FindNeedleFromEcx(types.Uint64ToNeedleId(req.FileKey))
  227. if size.IsDeleted() {
  228. return stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
  229. IsDeleted: true,
  230. })
  231. }
  232. }
  233. bufSize := req.Size
  234. if bufSize > BufferSizeLimit {
  235. bufSize = BufferSizeLimit
  236. }
  237. buffer := make([]byte, bufSize)
  238. startOffset, bytesToRead := req.Offset, req.Size
  239. for bytesToRead > 0 {
  240. // min of bytesToRead and bufSize
  241. bufferSize := bufSize
  242. if bufferSize > bytesToRead {
  243. bufferSize = bytesToRead
  244. }
  245. bytesread, err := ecShard.ReadAt(buffer[0:bufferSize], startOffset)
  246. // println("read", ecShard.FileName(), "startOffset", startOffset, bytesread, "bytes, with target", bufferSize)
  247. if bytesread > 0 {
  248. if int64(bytesread) > bytesToRead {
  249. bytesread = int(bytesToRead)
  250. }
  251. err = stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
  252. Data: buffer[:bytesread],
  253. })
  254. if err != nil {
  255. // println("sending", bytesread, "bytes err", err.Error())
  256. return err
  257. }
  258. startOffset += int64(bytesread)
  259. bytesToRead -= int64(bytesread)
  260. }
  261. if err != nil {
  262. if err != io.EOF {
  263. return err
  264. }
  265. return nil
  266. }
  267. }
  268. return nil
  269. }
  270. func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_server_pb.VolumeEcBlobDeleteRequest) (*volume_server_pb.VolumeEcBlobDeleteResponse, error) {
  271. glog.V(0).Infof("VolumeEcBlobDelete: %v", req)
  272. resp := &volume_server_pb.VolumeEcBlobDeleteResponse{}
  273. for _, location := range vs.store.Locations {
  274. if localEcVolume, found := location.FindEcVolume(needle.VolumeId(req.VolumeId)); found {
  275. _, size, _, err := localEcVolume.LocateEcShardNeedle(types.NeedleId(req.FileKey), needle.Version(req.Version))
  276. if err != nil {
  277. return nil, fmt.Errorf("locate in local ec volume: %v", err)
  278. }
  279. if size.IsDeleted() {
  280. return resp, nil
  281. }
  282. err = localEcVolume.DeleteNeedleFromEcx(types.NeedleId(req.FileKey))
  283. if err != nil {
  284. return nil, err
  285. }
  286. break
  287. }
  288. }
  289. return resp, nil
  290. }
  291. // VolumeEcShardsToVolume generates the .idx, .dat files from .ecx, .ecj and .ec01 ~ .ec14 files
  292. func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_server_pb.VolumeEcShardsToVolumeRequest) (*volume_server_pb.VolumeEcShardsToVolumeResponse, error) {
  293. glog.V(0).Infof("VolumeEcShardsToVolume: %v", req)
  294. v, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
  295. if !found {
  296. return nil, fmt.Errorf("ec volume %d not found", req.VolumeId)
  297. }
  298. if v.Collection != req.Collection {
  299. return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
  300. }
  301. dataBaseFileName, indexBaseFileName := v.DataBaseFileName(), v.IndexBaseFileName()
  302. // calculate .dat file size
  303. datFileSize, err := erasure_coding.FindDatFileSize(dataBaseFileName, indexBaseFileName)
  304. if err != nil {
  305. return nil, fmt.Errorf("FindDatFileSize %s: %v", dataBaseFileName, err)
  306. }
  307. // write .dat file from .ec00 ~ .ec09 files
  308. if err := erasure_coding.WriteDatFile(dataBaseFileName, datFileSize); err != nil {
  309. return nil, fmt.Errorf("WriteEcFiles %s: %v", dataBaseFileName, err)
  310. }
  311. // write .idx file from .ecx and .ecj files
  312. if err := erasure_coding.WriteIdxFileFromEcIndex(indexBaseFileName); err != nil {
  313. return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", v.IndexBaseFileName(), err)
  314. }
  315. return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil
  316. }