filer_notify_append.go 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package filer
  2. import (
  3. "context"
  4. "fmt"
  5. "os"
  6. "time"
  7. "github.com/seaweedfs/seaweedfs/weed/operation"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. )
  11. func (f *Filer) appendToFile(targetFile string, data []byte) error {
  12. assignResult, uploadResult, err2 := f.assignAndUpload(targetFile, data)
  13. if err2 != nil {
  14. return err2
  15. }
  16. // find out existing entry
  17. fullpath := util.FullPath(targetFile)
  18. entry, err := f.FindEntry(context.Background(), fullpath)
  19. var offset int64 = 0
  20. if err == filer_pb.ErrNotFound {
  21. entry = &Entry{
  22. FullPath: fullpath,
  23. Attr: Attr{
  24. Crtime: time.Now(),
  25. Mtime: time.Now(),
  26. Mode: os.FileMode(0644),
  27. Uid: OS_UID,
  28. Gid: OS_GID,
  29. },
  30. }
  31. } else if err != nil {
  32. return fmt.Errorf("find %s: %v", fullpath, err)
  33. } else {
  34. offset = int64(TotalSize(entry.GetChunks()))
  35. }
  36. // append to existing chunks
  37. entry.Chunks = append(entry.GetChunks(), uploadResult.ToPbFileChunk(assignResult.Fid, offset, time.Now().UnixNano()))
  38. // update the entry
  39. err = f.CreateEntry(context.Background(), entry, false, false, nil, false, f.MaxFilenameLength)
  40. return err
  41. }
  42. func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
  43. // assign a volume location
  44. rule := f.FilerConf.MatchStorageRule(targetFile)
  45. assignRequest := &operation.VolumeAssignRequest{
  46. Count: 1,
  47. Collection: util.Nvl(f.metaLogCollection, rule.Collection),
  48. Replication: util.Nvl(f.metaLogReplication, rule.Replication),
  49. WritableVolumeCount: rule.VolumeGrowthCount,
  50. }
  51. assignResult, err := operation.Assign(f.GetMaster, f.GrpcDialOption, assignRequest)
  52. if err != nil {
  53. return nil, nil, fmt.Errorf("AssignVolume: %v", err)
  54. }
  55. if assignResult.Error != "" {
  56. return nil, nil, fmt.Errorf("AssignVolume error: %v", assignResult.Error)
  57. }
  58. // upload data
  59. targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
  60. uploadOption := &operation.UploadOption{
  61. UploadUrl: targetUrl,
  62. Filename: "",
  63. Cipher: f.Cipher,
  64. IsInputCompressed: false,
  65. MimeType: "",
  66. PairMap: nil,
  67. Jwt: assignResult.Auth,
  68. }
  69. uploader, err := operation.NewUploader()
  70. if err != nil {
  71. return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
  72. }
  73. uploadResult, err := uploader.UploadData(data, uploadOption)
  74. if err != nil {
  75. return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
  76. }
  77. // println("uploaded to", targetUrl)
  78. return assignResult, uploadResult, nil
  79. }