broker_write.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. package broker
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/filer"
  5. "github.com/seaweedfs/seaweedfs/weed/operation"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/util"
  8. "os"
  9. "time"
  10. )
  11. func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error {
  12. fileId, uploadResult, err2 := b.assignAndUpload(targetFile, data)
  13. if err2 != nil {
  14. return err2
  15. }
  16. // find out existing entry
  17. fullpath := util.FullPath(targetFile)
  18. dir, name := fullpath.DirAndName()
  19. entry, err := filer_pb.GetEntry(b, fullpath)
  20. var offset int64 = 0
  21. if err == filer_pb.ErrNotFound {
  22. entry = &filer_pb.Entry{
  23. Name: name,
  24. IsDirectory: false,
  25. Attributes: &filer_pb.FuseAttributes{
  26. Crtime: time.Now().Unix(),
  27. Mtime: time.Now().Unix(),
  28. FileMode: uint32(os.FileMode(0644)),
  29. Uid: uint32(os.Getuid()),
  30. Gid: uint32(os.Getgid()),
  31. },
  32. }
  33. } else if err != nil {
  34. return fmt.Errorf("find %s: %v", fullpath, err)
  35. } else {
  36. offset = int64(filer.TotalSize(entry.GetChunks()))
  37. }
  38. // append to existing chunks
  39. entry.Chunks = append(entry.GetChunks(), uploadResult.ToPbFileChunk(fileId, offset, time.Now().UnixNano()))
  40. // update the entry
  41. return b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  42. return filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  43. Directory: dir,
  44. Entry: entry,
  45. })
  46. })
  47. }
  48. func (b *MessageQueueBroker) assignAndUpload(targetFile string, data []byte) (fileId string, uploadResult *operation.UploadResult, err error) {
  49. reader := util.NewBytesReader(data)
  50. fileId, uploadResult, err, _ = operation.UploadWithRetry(
  51. b,
  52. &filer_pb.AssignVolumeRequest{
  53. Count: 1,
  54. Replication: b.option.DefaultReplication,
  55. Collection: "topics",
  56. // TtlSec: wfs.option.TtlSec,
  57. // DiskType: string(wfs.option.DiskType),
  58. DataCenter: b.option.DataCenter,
  59. Path: targetFile,
  60. },
  61. &operation.UploadOption{
  62. Cipher: b.option.Cipher,
  63. },
  64. func(host, fileId string) string {
  65. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  66. if b.option.VolumeServerAccess == "filerProxy" {
  67. fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", b.currentFiler, fileId)
  68. }
  69. return fileUrl
  70. },
  71. reader,
  72. )
  73. return
  74. }