volume_grpc_copy.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "math"
  7. "os"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/operation"
  11. "github.com/chrislusf/seaweedfs/weed/pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  13. "github.com/chrislusf/seaweedfs/weed/storage"
  14. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  15. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  16. "github.com/chrislusf/seaweedfs/weed/storage/types"
  17. "github.com/chrislusf/seaweedfs/weed/util"
  18. )
  19. const BufferSizeLimit = 1024 * 1024 * 2
  20. // VolumeCopy copy the .idx .dat .vif files, and mount the volume
  21. func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stream volume_server_pb.VolumeServer_VolumeCopyServer) error {
  22. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  23. if v != nil {
  24. glog.V(0).Infof("volume %d already exists. deleted before copying...", req.VolumeId)
  25. err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId))
  26. if err != nil {
  27. return fmt.Errorf("failed to delete existing volume %d: %v", req.VolumeId, err)
  28. }
  29. glog.V(0).Infof("deleted existing volume %d before copying.", req.VolumeId)
  30. }
  31. // the master will not start compaction for read-only volumes, so it is safe to just copy files directly
  32. // copy .dat and .idx files
  33. // read .idx .dat file size and timestamp
  34. // send .idx file
  35. // send .dat file
  36. // confirm size and timestamp
  37. var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse
  38. var dataBaseFileName, indexBaseFileName, idxFileName, datFileName string
  39. err := operation.WithVolumeServerClient(true, pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  40. var err error
  41. volFileInfoResp, err = client.ReadVolumeFileStatus(context.Background(),
  42. &volume_server_pb.ReadVolumeFileStatusRequest{
  43. VolumeId: req.VolumeId,
  44. })
  45. if nil != err {
  46. return fmt.Errorf("read volume file status failed, %v", err)
  47. }
  48. diskType := volFileInfoResp.DiskType
  49. if req.DiskType != "" {
  50. diskType = req.DiskType
  51. }
  52. location := vs.store.FindFreeLocation(types.ToDiskType(diskType))
  53. if location == nil {
  54. return fmt.Errorf("no space left for disk type %s", types.ToDiskType(diskType).ReadableString())
  55. }
  56. dataBaseFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId))
  57. indexBaseFileName = storage.VolumeFileName(location.IdxDirectory, volFileInfoResp.Collection, int(req.VolumeId))
  58. os.WriteFile(dataBaseFileName+".note", []byte(fmt.Sprintf("copying from %s", req.SourceDataNode)), 0755)
  59. defer func() {
  60. if err != nil {
  61. os.Remove(dataBaseFileName + ".dat")
  62. os.Remove(indexBaseFileName + ".idx")
  63. os.Remove(dataBaseFileName + ".vif")
  64. os.Remove(dataBaseFileName + ".note")
  65. }
  66. }()
  67. // println("source:", volFileInfoResp.String())
  68. copyResponse := &volume_server_pb.VolumeCopyResponse{}
  69. reportInterval := int64(1024 * 1024 * 128)
  70. nextReportTarget := reportInterval
  71. var modifiedTsNs int64
  72. var sendErr error
  73. if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true, func(processed int64) bool {
  74. if processed > nextReportTarget {
  75. copyResponse.ProcessedBytes = processed
  76. if sendErr = stream.Send(copyResponse); sendErr != nil {
  77. return false
  78. }
  79. nextReportTarget = processed + reportInterval
  80. }
  81. return true
  82. }); err != nil {
  83. return err
  84. }
  85. if sendErr != nil {
  86. return sendErr
  87. }
  88. if modifiedTsNs > 0 {
  89. os.Chtimes(dataBaseFileName+".dat", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
  90. }
  91. if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false, nil); err != nil {
  92. return err
  93. }
  94. if modifiedTsNs > 0 {
  95. os.Chtimes(indexBaseFileName+".idx", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
  96. }
  97. if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true, nil); err != nil {
  98. return err
  99. }
  100. if modifiedTsNs > 0 {
  101. os.Chtimes(dataBaseFileName+".vif", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
  102. }
  103. os.Remove(dataBaseFileName + ".note")
  104. return nil
  105. })
  106. if err != nil {
  107. return err
  108. }
  109. if dataBaseFileName == "" {
  110. return fmt.Errorf("not found volume %d file", req.VolumeId)
  111. }
  112. idxFileName = indexBaseFileName + ".idx"
  113. datFileName = dataBaseFileName + ".dat"
  114. defer func() {
  115. if err != nil && dataBaseFileName != "" {
  116. os.Remove(idxFileName)
  117. os.Remove(datFileName)
  118. os.Remove(dataBaseFileName + ".vif")
  119. }
  120. }()
  121. if err = checkCopyFiles(volFileInfoResp, idxFileName, datFileName); err != nil { // added by panyc16
  122. return err
  123. }
  124. // mount the volume
  125. err = vs.store.MountVolume(needle.VolumeId(req.VolumeId))
  126. if err != nil {
  127. return fmt.Errorf("failed to mount volume %d: %v", req.VolumeId, err)
  128. }
  129. if err = stream.Send(&volume_server_pb.VolumeCopyResponse{
  130. LastAppendAtNs: volFileInfoResp.DatFileTimestampSeconds * uint64(time.Second),
  131. }); err != nil {
  132. glog.Errorf("send response: %v", err)
  133. }
  134. return err
  135. }
  136. func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) {
  137. copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
  138. VolumeId: vid,
  139. Ext: ext,
  140. CompactionRevision: compactRevision,
  141. StopOffset: stopOffset,
  142. Collection: collection,
  143. IsEcVolume: isEcVolume,
  144. IgnoreSourceFileNotFound: ignoreSourceFileNotFound,
  145. })
  146. if err != nil {
  147. return modifiedTsNs, fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
  148. }
  149. modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend, progressFn)
  150. if err != nil {
  151. return modifiedTsNs, fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err)
  152. }
  153. return modifiedTsNs, nil
  154. }
  155. /**
  156. only check the the differ of the file size
  157. todo: maybe should check the received count and deleted count of the volume
  158. */
  159. func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse, idxFileName, datFileName string) error {
  160. stat, err := os.Stat(idxFileName)
  161. if err != nil {
  162. return fmt.Errorf("stat idx file %s failed: %v", idxFileName, err)
  163. }
  164. if originFileInf.IdxFileSize != uint64(stat.Size()) {
  165. return fmt.Errorf("idx file %s size [%v] is not same as origin file size [%v]",
  166. idxFileName, stat.Size(), originFileInf.IdxFileSize)
  167. }
  168. stat, err = os.Stat(datFileName)
  169. if err != nil {
  170. return fmt.Errorf("get dat file info failed, %v", err)
  171. }
  172. if originFileInf.DatFileSize != uint64(stat.Size()) {
  173. return fmt.Errorf("the dat file size [%v] is not same as origin file size [%v]",
  174. stat.Size(), originFileInf.DatFileSize)
  175. }
  176. return nil
  177. }
  178. func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) {
  179. glog.V(4).Infof("writing to %s", fileName)
  180. flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
  181. if isAppend {
  182. flags = os.O_WRONLY | os.O_CREATE
  183. }
  184. dst, err := os.OpenFile(fileName, flags, 0644)
  185. if err != nil {
  186. return modifiedTsNs, nil
  187. }
  188. defer dst.Close()
  189. var progressedBytes int64
  190. for {
  191. resp, receiveErr := client.Recv()
  192. if receiveErr == io.EOF {
  193. break
  194. }
  195. if resp != nil && resp.ModifiedTsNs != 0 {
  196. modifiedTsNs = resp.ModifiedTsNs
  197. }
  198. if receiveErr != nil {
  199. return modifiedTsNs, fmt.Errorf("receiving %s: %v", fileName, receiveErr)
  200. }
  201. dst.Write(resp.FileContent)
  202. progressedBytes += int64(len(resp.FileContent))
  203. if progressFn != nil {
  204. if !progressFn(progressedBytes) {
  205. return modifiedTsNs, fmt.Errorf("interrupted copy operation")
  206. }
  207. }
  208. wt.MaybeSlowdown(int64(len(resp.FileContent)))
  209. }
  210. return modifiedTsNs, nil
  211. }
  212. func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) {
  213. resp := &volume_server_pb.ReadVolumeFileStatusResponse{}
  214. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  215. if v == nil {
  216. return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
  217. }
  218. resp.VolumeId = req.VolumeId
  219. datSize, idxSize, modTime := v.FileStat()
  220. resp.DatFileSize = datSize
  221. resp.IdxFileSize = idxSize
  222. resp.DatFileTimestampSeconds = uint64(modTime.Unix())
  223. resp.IdxFileTimestampSeconds = uint64(modTime.Unix())
  224. resp.FileCount = v.FileCount()
  225. resp.CompactionRevision = uint32(v.CompactionRevision)
  226. resp.Collection = v.Collection
  227. resp.DiskType = string(v.DiskType())
  228. return resp, nil
  229. }
  230. // CopyFile client pulls the volume related file from the source server.
  231. // if req.CompactionRevision != math.MaxUint32, it ensures the compact revision is as expected
  232. // The copying still stop at req.StopOffset, but you can set it to math.MaxUint64 in order to read all data.
  233. func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream volume_server_pb.VolumeServer_CopyFileServer) error {
  234. var fileName string
  235. if !req.IsEcVolume {
  236. v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
  237. if v == nil {
  238. return fmt.Errorf("not found volume id %d", req.VolumeId)
  239. }
  240. if uint32(v.CompactionRevision) != req.CompactionRevision && req.CompactionRevision != math.MaxUint32 {
  241. return fmt.Errorf("volume %d is compacted", req.VolumeId)
  242. }
  243. fileName = v.FileName(req.Ext)
  244. } else {
  245. baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + req.Ext
  246. for _, location := range vs.store.Locations {
  247. tName := util.Join(location.Directory, baseFileName)
  248. if util.FileExists(tName) {
  249. fileName = tName
  250. }
  251. tName = util.Join(location.IdxDirectory, baseFileName)
  252. if util.FileExists(tName) {
  253. fileName = tName
  254. }
  255. }
  256. if fileName == "" {
  257. if req.IgnoreSourceFileNotFound {
  258. return nil
  259. }
  260. return fmt.Errorf("CopyFile not found ec volume id %d", req.VolumeId)
  261. }
  262. }
  263. bytesToRead := int64(req.StopOffset)
  264. file, err := os.Open(fileName)
  265. if err != nil {
  266. if req.IgnoreSourceFileNotFound && err == os.ErrNotExist {
  267. return nil
  268. }
  269. return err
  270. }
  271. defer file.Close()
  272. fileInfo, err := file.Stat()
  273. if err != nil {
  274. return err
  275. }
  276. fileModTsNs := fileInfo.ModTime().UnixNano()
  277. buffer := make([]byte, BufferSizeLimit)
  278. for bytesToRead > 0 {
  279. bytesread, err := file.Read(buffer)
  280. // println(fileName, "read", bytesread, "bytes, with target", bytesToRead)
  281. if err != nil {
  282. if err != io.EOF {
  283. return err
  284. }
  285. // println(fileName, "read", bytesread, "bytes, with target", bytesToRead, "err", err.Error())
  286. break
  287. }
  288. if int64(bytesread) > bytesToRead {
  289. bytesread = int(bytesToRead)
  290. }
  291. err = stream.Send(&volume_server_pb.CopyFileResponse{
  292. FileContent: buffer[:bytesread],
  293. ModifiedTsNs: fileModTsNs,
  294. })
  295. if err != nil {
  296. // println("sending", bytesread, "bytes err", err.Error())
  297. return err
  298. }
  299. fileModTsNs = 0 // only send once
  300. bytesToRead -= int64(bytesread)
  301. }
  302. return nil
  303. }