filer_client.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. package filer_pb
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "math"
  8. "os"
  9. "strings"
  10. "time"
  11. "github.com/chrislusf/seaweedfs/weed/util/log"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. )
  14. var (
  15. OS_UID = uint32(os.Getuid())
  16. OS_GID = uint32(os.Getgid())
  17. )
  18. type FilerClient interface {
  19. WithFilerClient(fn func(SeaweedFilerClient) error) error
  20. AdjustedUrl(location *Location) string
  21. }
  22. func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry, err error) {
  23. dir, name := fullFilePath.DirAndName()
  24. err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  25. request := &LookupDirectoryEntryRequest{
  26. Directory: dir,
  27. Name: name,
  28. }
  29. // log.Tracef("read %s request: %v", fullFilePath, request)
  30. resp, err := LookupEntry(client, request)
  31. if err != nil {
  32. if err == ErrNotFound {
  33. return nil
  34. }
  35. log.Tracef("read %s %v: %v", fullFilePath, resp, err)
  36. return err
  37. }
  38. if resp.Entry == nil {
  39. // log.Tracef("read %s entry: %v", fullFilePath, entry)
  40. return nil
  41. }
  42. entry = resp.Entry
  43. return nil
  44. })
  45. return
  46. }
  47. type EachEntryFunciton func(entry *Entry, isLast bool) error
  48. func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton) (err error) {
  49. return doList(filerClient, fullDirPath, prefix, fn, "", false, math.MaxUint32)
  50. }
  51. func List(filerClient FilerClient, parentDirectoryPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
  52. return doList(filerClient, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit)
  53. }
  54. func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
  55. err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  56. request := &ListEntriesRequest{
  57. Directory: string(fullDirPath),
  58. Prefix: prefix,
  59. StartFromFileName: startFrom,
  60. Limit: limit,
  61. InclusiveStartFrom: inclusive,
  62. }
  63. log.Tracef("read directory: %v", request)
  64. ctx, cancel := context.WithCancel(context.Background())
  65. defer cancel()
  66. stream, err := client.ListEntries(ctx, request)
  67. if err != nil {
  68. return fmt.Errorf("list %s: %v", fullDirPath, err)
  69. }
  70. var prevEntry *Entry
  71. for {
  72. resp, recvErr := stream.Recv()
  73. if recvErr != nil {
  74. if recvErr == io.EOF {
  75. if prevEntry != nil {
  76. if err := fn(prevEntry, true); err != nil {
  77. return err
  78. }
  79. }
  80. break
  81. } else {
  82. return recvErr
  83. }
  84. }
  85. if prevEntry != nil {
  86. if err := fn(prevEntry, false); err != nil {
  87. return err
  88. }
  89. }
  90. prevEntry = resp.Entry
  91. }
  92. return nil
  93. })
  94. return
  95. }
  96. func Exists(filerClient FilerClient, parentDirectoryPath string, entryName string, isDirectory bool) (exists bool, err error) {
  97. err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  98. request := &LookupDirectoryEntryRequest{
  99. Directory: parentDirectoryPath,
  100. Name: entryName,
  101. }
  102. log.Tracef("exists entry %v/%v: %v", parentDirectoryPath, entryName, request)
  103. resp, err := LookupEntry(client, request)
  104. if err != nil {
  105. if err == ErrNotFound {
  106. exists = false
  107. return nil
  108. }
  109. log.Infof("exists entry %v: %v", request, err)
  110. return fmt.Errorf("exists entry %s/%s: %v", parentDirectoryPath, entryName, err)
  111. }
  112. exists = resp.Entry.IsDirectory == isDirectory
  113. return nil
  114. })
  115. return
  116. }
  117. func Mkdir(filerClient FilerClient, parentDirectoryPath string, dirName string, fn func(entry *Entry)) error {
  118. return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  119. entry := &Entry{
  120. Name: dirName,
  121. IsDirectory: true,
  122. Attributes: &FuseAttributes{
  123. Mtime: time.Now().Unix(),
  124. Crtime: time.Now().Unix(),
  125. FileMode: uint32(0777 | os.ModeDir),
  126. Uid: OS_UID,
  127. Gid: OS_GID,
  128. },
  129. }
  130. if fn != nil {
  131. fn(entry)
  132. }
  133. request := &CreateEntryRequest{
  134. Directory: parentDirectoryPath,
  135. Entry: entry,
  136. }
  137. log.Debugf("mkdir: %v", request)
  138. if err := CreateEntry(client, request); err != nil {
  139. log.Infof("mkdir %v: %v", request, err)
  140. return fmt.Errorf("mkdir %s/%s: %v", parentDirectoryPath, dirName, err)
  141. }
  142. return nil
  143. })
  144. }
  145. func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string, chunks []*FileChunk) error {
  146. return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  147. entry := &Entry{
  148. Name: fileName,
  149. IsDirectory: false,
  150. Attributes: &FuseAttributes{
  151. Mtime: time.Now().Unix(),
  152. Crtime: time.Now().Unix(),
  153. FileMode: uint32(0770),
  154. Uid: OS_UID,
  155. Gid: OS_GID,
  156. },
  157. Chunks: chunks,
  158. }
  159. request := &CreateEntryRequest{
  160. Directory: parentDirectoryPath,
  161. Entry: entry,
  162. }
  163. log.Debugf("create file: %s/%s", parentDirectoryPath, fileName)
  164. if err := CreateEntry(client, request); err != nil {
  165. log.Infof("create file %v:%v", request, err)
  166. return fmt.Errorf("create file %s/%s: %v", parentDirectoryPath, fileName, err)
  167. }
  168. return nil
  169. })
  170. }
  171. func Remove(filerClient FilerClient, parentDirectoryPath, name string, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster bool, signatures []int32) error {
  172. return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  173. deleteEntryRequest := &DeleteEntryRequest{
  174. Directory: parentDirectoryPath,
  175. Name: name,
  176. IsDeleteData: isDeleteData,
  177. IsRecursive: isRecursive,
  178. IgnoreRecursiveError: ignoreRecursiveErr,
  179. IsFromOtherCluster: isFromOtherCluster,
  180. Signatures: signatures,
  181. }
  182. if resp, err := client.DeleteEntry(context.Background(), deleteEntryRequest); err != nil {
  183. if strings.Contains(err.Error(), ErrNotFound.Error()) {
  184. return nil
  185. }
  186. return err
  187. } else {
  188. if resp.Error != "" {
  189. if strings.Contains(resp.Error, ErrNotFound.Error()) {
  190. return nil
  191. }
  192. return errors.New(resp.Error)
  193. }
  194. }
  195. return nil
  196. })
  197. }