broker_write.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. package broker
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/filer"
  5. "github.com/seaweedfs/seaweedfs/weed/operation"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/util"
  8. "os"
  9. "time"
  10. )
  11. func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error {
  12. fileId, uploadResult, err2 := b.assignAndUpload(targetFile, data)
  13. if err2 != nil {
  14. return err2
  15. }
  16. // find out existing entry
  17. fullpath := util.FullPath(targetFile)
  18. dir, name := fullpath.DirAndName()
  19. entry, err := filer_pb.GetEntry(b, fullpath)
  20. var offset int64 = 0
  21. if err == filer_pb.ErrNotFound {
  22. entry = &filer_pb.Entry{
  23. Name: name,
  24. IsDirectory: false,
  25. Attributes: &filer_pb.FuseAttributes{
  26. Crtime: time.Now().Unix(),
  27. Mtime: time.Now().Unix(),
  28. FileMode: uint32(os.FileMode(0644)),
  29. Uid: uint32(os.Getuid()),
  30. Gid: uint32(os.Getgid()),
  31. },
  32. }
  33. } else if err != nil {
  34. return fmt.Errorf("find %s: %v", fullpath, err)
  35. } else {
  36. offset = int64(filer.TotalSize(entry.GetChunks()))
  37. }
  38. // append to existing chunks
  39. entry.Chunks = append(entry.GetChunks(), uploadResult.ToPbFileChunk(fileId, offset, time.Now().UnixNano()))
  40. // update the entry
  41. return b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  42. return filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  43. Directory: dir,
  44. Entry: entry,
  45. })
  46. })
  47. }
  48. func (b *MessageQueueBroker) assignAndUpload(targetFile string, data []byte) (fileId string, uploadResult *operation.UploadResult, err error) {
  49. reader := util.NewBytesReader(data)
  50. uploader, err := operation.NewUploader()
  51. if err != nil {
  52. return
  53. }
  54. fileId, uploadResult, err, _ = uploader.UploadWithRetry(
  55. b,
  56. &filer_pb.AssignVolumeRequest{
  57. Count: 1,
  58. Replication: b.option.DefaultReplication,
  59. Collection: "topics",
  60. // TtlSec: wfs.option.TtlSec,
  61. // DiskType: string(wfs.option.DiskType),
  62. DataCenter: b.option.DataCenter,
  63. Path: targetFile,
  64. },
  65. &operation.UploadOption{
  66. Cipher: b.option.Cipher,
  67. },
  68. func(host, fileId string) string {
  69. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  70. if b.option.VolumeServerAccess == "filerProxy" {
  71. fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", b.currentFiler, fileId)
  72. }
  73. return fileUrl
  74. },
  75. reader,
  76. )
  77. return
  78. }