filer_notify_append.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. package filer
  2. import (
  3. "context"
  4. "fmt"
  5. "os"
  6. "time"
  7. "github.com/chrislusf/seaweedfs/weed/operation"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. )
  11. func (f *Filer) appendToFile(targetFile string, data []byte) error {
  12. assignResult, uploadResult, err2 := f.assignAndUpload(targetFile, data)
  13. if err2 != nil {
  14. return err2
  15. }
  16. // find out existing entry
  17. fullpath := util.FullPath(targetFile)
  18. entry, err := f.FindEntry(context.Background(), fullpath)
  19. var offset int64 = 0
  20. if err == filer_pb.ErrNotFound {
  21. entry = &Entry{
  22. FullPath: fullpath,
  23. Attr: Attr{
  24. Crtime: time.Now(),
  25. Mtime: time.Now(),
  26. Mode: os.FileMode(0644),
  27. Uid: OS_UID,
  28. Gid: OS_GID,
  29. },
  30. }
  31. } else {
  32. offset = int64(TotalSize(entry.Chunks))
  33. }
  34. // append to existing chunks
  35. entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(assignResult.Fid, offset))
  36. // update the entry
  37. err = f.CreateEntry(context.Background(), entry, false, false, nil)
  38. return err
  39. }
  40. func (f *Filer) assignAndUpload(targetFile string, data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
  41. // assign a volume location
  42. rule := f.FilerConf.MatchStorageRule(targetFile)
  43. assignRequest := &operation.VolumeAssignRequest{
  44. Count: 1,
  45. Collection: util.Nvl(f.metaLogCollection, rule.Collection),
  46. Replication: util.Nvl(f.metaLogReplication, rule.Replication),
  47. WritableVolumeCount: rule.VolumeGrowthCount,
  48. }
  49. assignResult, err := operation.Assign(f.GetMaster, f.GrpcDialOption, assignRequest)
  50. if err != nil {
  51. return nil, nil, fmt.Errorf("AssignVolume: %v", err)
  52. }
  53. if assignResult.Error != "" {
  54. return nil, nil, fmt.Errorf("AssignVolume error: %v", assignResult.Error)
  55. }
  56. // upload data
  57. targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
  58. uploadResult, err := operation.UploadData(targetUrl, "", f.Cipher, data, false, "", nil, assignResult.Auth)
  59. if err != nil {
  60. return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
  61. }
  62. // println("uploaded to", targetUrl)
  63. return assignResult, uploadResult, nil
  64. }