filer_client_util.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package filer2
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "math"
  7. "strings"
  8. "sync"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. )
  // VolumeId returns the portion of fileId that precedes its last comma.
  // When fileId has no comma, or its last comma is the very first character,
  // fileId is returned unchanged.
  func VolumeId(fileId string) string {
  	sep := strings.LastIndexByte(fileId, ',')
  	if sep <= 0 {
  		// No separator, or nothing in front of it: hand back the whole id.
  		return fileId
  	}
  	return fileId[:sep]
  }
  20. type FilerClient interface {
  21. WithFilerClient(ctx context.Context, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error
  22. }
  23. func ReadIntoBuffer(ctx context.Context, filerClient FilerClient, fullFilePath FullPath, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) {
  24. var vids []string
  25. for _, chunkView := range chunkViews {
  26. vids = append(vids, VolumeId(chunkView.FileId))
  27. }
  28. vid2Locations := make(map[string]*filer_pb.Locations)
  29. err = filerClient.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
  30. glog.V(4).Infof("read fh lookup volume id locations: %v", vids)
  31. resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
  32. VolumeIds: vids,
  33. })
  34. if err != nil {
  35. return err
  36. }
  37. vid2Locations = resp.LocationsMap
  38. return nil
  39. })
  40. if err != nil {
  41. return 0, fmt.Errorf("failed to lookup volume ids %v: %v", vids, err)
  42. }
  43. var wg sync.WaitGroup
  44. for _, chunkView := range chunkViews {
  45. wg.Add(1)
  46. go func(chunkView *ChunkView) {
  47. defer wg.Done()
  48. glog.V(4).Infof("read fh reading chunk: %+v", chunkView)
  49. locations := vid2Locations[VolumeId(chunkView.FileId)]
  50. if locations == nil || len(locations.Locations) == 0 {
  51. glog.V(0).Infof("failed to locate %s", chunkView.FileId)
  52. err = fmt.Errorf("failed to locate %s", chunkView.FileId)
  53. return
  54. }
  55. var n int64
  56. n, err = util.ReadUrl(
  57. fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId),
  58. chunkView.Offset,
  59. int(chunkView.Size),
  60. buff[chunkView.LogicOffset-baseOffset:chunkView.LogicOffset-baseOffset+int64(chunkView.Size)],
  61. !chunkView.IsFullChunk)
  62. if err != nil {
  63. glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, locations.Locations[0].Url, chunkView.FileId, n, err)
  64. err = fmt.Errorf("failed to read http://%s/%s: %v",
  65. locations.Locations[0].Url, chunkView.FileId, err)
  66. return
  67. }
  68. glog.V(4).Infof("read fh read %d bytes: %+v", n, chunkView)
  69. totalRead += n
  70. }(chunkView)
  71. }
  72. wg.Wait()
  73. return
  74. }
  75. func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath FullPath) (entry *filer_pb.Entry, err error) {
  76. dir, name := fullFilePath.DirAndName()
  77. err = filerClient.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
  78. request := &filer_pb.LookupDirectoryEntryRequest{
  79. Directory: dir,
  80. Name: name,
  81. }
  82. // glog.V(3).Infof("read %s request: %v", fullFilePath, request)
  83. resp, err := client.LookupDirectoryEntry(ctx, request)
  84. if err != nil {
  85. if err == ErrNotFound || strings.Contains(err.Error(), ErrNotFound.Error()) {
  86. return nil
  87. }
  88. glog.V(3).Infof("read %s %v: %v", fullFilePath, resp, err)
  89. return err
  90. }
  91. if resp.Entry == nil {
  92. // glog.V(3).Infof("read %s entry: %v", fullFilePath, entry)
  93. return nil
  94. }
  95. entry = resp.Entry
  96. return nil
  97. })
  98. return
  99. }
  100. func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath FullPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) {
  101. err = filerClient.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
  102. lastEntryName := ""
  103. request := &filer_pb.ListEntriesRequest{
  104. Directory: string(fullDirPath),
  105. Prefix: prefix,
  106. StartFromFileName: lastEntryName,
  107. Limit: math.MaxUint32,
  108. }
  109. glog.V(3).Infof("read directory: %v", request)
  110. stream, err := client.ListEntries(ctx, request)
  111. if err != nil {
  112. return fmt.Errorf("list %s: %v", fullDirPath, err)
  113. }
  114. var prevEntry *filer_pb.Entry
  115. for {
  116. resp, recvErr := stream.Recv()
  117. if recvErr != nil {
  118. if recvErr == io.EOF {
  119. if prevEntry != nil {
  120. fn(prevEntry, true)
  121. }
  122. break
  123. } else {
  124. return recvErr
  125. }
  126. }
  127. if prevEntry != nil {
  128. fn(prevEntry, false)
  129. }
  130. prevEntry = resp.Entry
  131. }
  132. return nil
  133. })
  134. return
  135. }