filer_client_util.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package filer2
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "math"
  7. "strings"
  8. "sync"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. )
  13. func VolumeId(fileId string) string {
  14. lastCommaIndex := strings.LastIndex(fileId, ",")
  15. if lastCommaIndex > 0 {
  16. return fileId[:lastCommaIndex]
  17. }
  18. return fileId
  19. }
  20. type FilerClient interface {
  21. WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error
  22. AdjustedUrl(hostAndPort string) string
  23. }
  24. func ReadIntoBuffer(filerClient FilerClient, fullFilePath FullPath, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) {
  25. var vids []string
  26. for _, chunkView := range chunkViews {
  27. vids = append(vids, VolumeId(chunkView.FileId))
  28. }
  29. vid2Locations := make(map[string]*filer_pb.Locations)
  30. err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  31. glog.V(4).Infof("read fh lookup volume id locations: %v", vids)
  32. resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
  33. VolumeIds: vids,
  34. })
  35. if err != nil {
  36. return err
  37. }
  38. vid2Locations = resp.LocationsMap
  39. return nil
  40. })
  41. if err != nil {
  42. return 0, fmt.Errorf("failed to lookup volume ids %v: %v", vids, err)
  43. }
  44. var wg sync.WaitGroup
  45. for _, chunkView := range chunkViews {
  46. wg.Add(1)
  47. go func(chunkView *ChunkView) {
  48. defer wg.Done()
  49. glog.V(4).Infof("read fh reading chunk: %+v", chunkView)
  50. locations := vid2Locations[VolumeId(chunkView.FileId)]
  51. if locations == nil || len(locations.Locations) == 0 {
  52. glog.V(0).Infof("failed to locate %s", chunkView.FileId)
  53. err = fmt.Errorf("failed to locate %s", chunkView.FileId)
  54. return
  55. }
  56. volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
  57. var n int64
  58. n, err = util.ReadUrl(fmt.Sprintf("http://%s/%s", volumeServerAddress, chunkView.FileId), chunkView.CipherKey, chunkView.isGzipped, chunkView.IsFullChunk, chunkView.Offset, int(chunkView.Size), buff[chunkView.LogicOffset-baseOffset:chunkView.LogicOffset-baseOffset+int64(chunkView.Size)])
  59. if err != nil {
  60. glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, volumeServerAddress, chunkView.FileId, n, err)
  61. err = fmt.Errorf("failed to read http://%s/%s: %v",
  62. volumeServerAddress, chunkView.FileId, err)
  63. return
  64. }
  65. glog.V(4).Infof("read fh read %d bytes: %+v", n, chunkView)
  66. totalRead += n
  67. }(chunkView)
  68. }
  69. wg.Wait()
  70. return
  71. }
  72. func GetEntry(filerClient FilerClient, fullFilePath FullPath) (entry *filer_pb.Entry, err error) {
  73. dir, name := fullFilePath.DirAndName()
  74. err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  75. request := &filer_pb.LookupDirectoryEntryRequest{
  76. Directory: dir,
  77. Name: name,
  78. }
  79. // glog.V(3).Infof("read %s request: %v", fullFilePath, request)
  80. resp, err := filer_pb.LookupEntry(client, request)
  81. if err != nil {
  82. if err == filer_pb.ErrNotFound {
  83. return nil
  84. }
  85. glog.V(3).Infof("read %s %v: %v", fullFilePath, resp, err)
  86. return err
  87. }
  88. if resp.Entry == nil {
  89. // glog.V(3).Infof("read %s entry: %v", fullFilePath, entry)
  90. return nil
  91. }
  92. entry = resp.Entry
  93. return nil
  94. })
  95. return
  96. }
  97. func ReadDirAllEntries(filerClient FilerClient, fullDirPath FullPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) {
  98. err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  99. lastEntryName := ""
  100. request := &filer_pb.ListEntriesRequest{
  101. Directory: string(fullDirPath),
  102. Prefix: prefix,
  103. StartFromFileName: lastEntryName,
  104. Limit: math.MaxUint32,
  105. }
  106. glog.V(3).Infof("read directory: %v", request)
  107. stream, err := client.ListEntries(context.Background(), request)
  108. if err != nil {
  109. return fmt.Errorf("list %s: %v", fullDirPath, err)
  110. }
  111. var prevEntry *filer_pb.Entry
  112. for {
  113. resp, recvErr := stream.Recv()
  114. if recvErr != nil {
  115. if recvErr == io.EOF {
  116. if prevEntry != nil {
  117. fn(prevEntry, true)
  118. }
  119. break
  120. } else {
  121. return recvErr
  122. }
  123. }
  124. if prevEntry != nil {
  125. fn(prevEntry, false)
  126. }
  127. prevEntry = resp.Entry
  128. }
  129. return nil
  130. })
  131. return
  132. }