filer_client.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. package filer_pb
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "os"
  8. "strings"
  9. "time"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. )
  13. var (
  14. OS_UID = uint32(os.Getuid())
  15. OS_GID = uint32(os.Getgid())
  16. )
  17. type FilerClient interface {
  18. WithFilerClient(fn func(SeaweedFilerClient) error) error
  19. AdjustedUrl(location *Location) string
  20. }
  21. func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry, err error) {
  22. dir, name := fullFilePath.DirAndName()
  23. err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  24. request := &LookupDirectoryEntryRequest{
  25. Directory: dir,
  26. Name: name,
  27. }
  28. // glog.V(3).Infof("read %s request: %v", fullFilePath, request)
  29. resp, err := LookupEntry(client, request)
  30. if err != nil {
  31. if err == ErrNotFound {
  32. return nil
  33. }
  34. glog.V(3).Infof("read %s %v: %v", fullFilePath, resp, err)
  35. return err
  36. }
  37. if resp.Entry == nil {
  38. // glog.V(3).Infof("read %s entry: %v", fullFilePath, entry)
  39. return nil
  40. }
  41. entry = resp.Entry
  42. return nil
  43. })
  44. return
  45. }
  46. type EachEntryFunciton func(entry *Entry, isLast bool) error
  47. func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton) (err error) {
  48. var counter uint32
  49. var startFrom string
  50. var counterFunc = func(entry *Entry, isLast bool) error {
  51. counter++
  52. startFrom = entry.Name
  53. return fn(entry, isLast)
  54. }
  55. var paginationLimit uint32 = 10000
  56. if err = doList(filerClient, fullDirPath, prefix, counterFunc, "", false, paginationLimit); err != nil {
  57. return err
  58. }
  59. for counter == paginationLimit {
  60. counter = 0
  61. if err = doList(filerClient, fullDirPath, prefix, counterFunc, startFrom, false, paginationLimit); err != nil {
  62. return err
  63. }
  64. }
  65. return nil
  66. }
  67. func List(filerClient FilerClient, parentDirectoryPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
  68. return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  69. return doSeaweedList(client, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit)
  70. })
  71. }
  72. func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
  73. return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  74. return doSeaweedList(client, fullDirPath, prefix, fn, startFrom, inclusive, limit)
  75. })
  76. }
  77. func SeaweedList(client SeaweedFilerClient, parentDirectoryPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
  78. return doSeaweedList(client, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit)
  79. }
  80. func doSeaweedList(client SeaweedFilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
  81. request := &ListEntriesRequest{
  82. Directory: string(fullDirPath),
  83. Prefix: prefix,
  84. StartFromFileName: startFrom,
  85. Limit: limit,
  86. InclusiveStartFrom: inclusive,
  87. }
  88. glog.V(4).Infof("read directory: %v", request)
  89. ctx, cancel := context.WithCancel(context.Background())
  90. defer cancel()
  91. stream, err := client.ListEntries(ctx, request)
  92. if err != nil {
  93. return fmt.Errorf("list %s: %v", fullDirPath, err)
  94. }
  95. var prevEntry *Entry
  96. for {
  97. resp, recvErr := stream.Recv()
  98. if recvErr != nil {
  99. if recvErr == io.EOF {
  100. if prevEntry != nil {
  101. if err := fn(prevEntry, true); err != nil {
  102. return err
  103. }
  104. }
  105. break
  106. } else {
  107. return recvErr
  108. }
  109. }
  110. if prevEntry != nil {
  111. if err := fn(prevEntry, false); err != nil {
  112. return err
  113. }
  114. }
  115. prevEntry = resp.Entry
  116. }
  117. return nil
  118. }
  119. func Exists(filerClient FilerClient, parentDirectoryPath string, entryName string, isDirectory bool) (exists bool, err error) {
  120. err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  121. request := &LookupDirectoryEntryRequest{
  122. Directory: parentDirectoryPath,
  123. Name: entryName,
  124. }
  125. glog.V(4).Infof("exists entry %v/%v: %v", parentDirectoryPath, entryName, request)
  126. resp, err := LookupEntry(client, request)
  127. if err != nil {
  128. if err == ErrNotFound {
  129. exists = false
  130. return nil
  131. }
  132. glog.V(0).Infof("exists entry %v: %v", request, err)
  133. return fmt.Errorf("exists entry %s/%s: %v", parentDirectoryPath, entryName, err)
  134. }
  135. exists = resp.Entry.IsDirectory == isDirectory
  136. return nil
  137. })
  138. return
  139. }
  140. func Mkdir(filerClient FilerClient, parentDirectoryPath string, dirName string, fn func(entry *Entry)) error {
  141. return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  142. entry := &Entry{
  143. Name: dirName,
  144. IsDirectory: true,
  145. Attributes: &FuseAttributes{
  146. Mtime: time.Now().Unix(),
  147. Crtime: time.Now().Unix(),
  148. FileMode: uint32(0777 | os.ModeDir),
  149. Uid: OS_UID,
  150. Gid: OS_GID,
  151. },
  152. }
  153. if fn != nil {
  154. fn(entry)
  155. }
  156. request := &CreateEntryRequest{
  157. Directory: parentDirectoryPath,
  158. Entry: entry,
  159. }
  160. glog.V(1).Infof("mkdir: %v", request)
  161. if err := CreateEntry(client, request); err != nil {
  162. glog.V(0).Infof("mkdir %v: %v", request, err)
  163. return fmt.Errorf("mkdir %s/%s: %v", parentDirectoryPath, dirName, err)
  164. }
  165. return nil
  166. })
  167. }
  168. func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string, chunks []*FileChunk) error {
  169. return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  170. entry := &Entry{
  171. Name: fileName,
  172. IsDirectory: false,
  173. Attributes: &FuseAttributes{
  174. Mtime: time.Now().Unix(),
  175. Crtime: time.Now().Unix(),
  176. FileMode: uint32(0770),
  177. Uid: OS_UID,
  178. Gid: OS_GID,
  179. },
  180. Chunks: chunks,
  181. }
  182. request := &CreateEntryRequest{
  183. Directory: parentDirectoryPath,
  184. Entry: entry,
  185. }
  186. glog.V(1).Infof("create file: %s/%s", parentDirectoryPath, fileName)
  187. if err := CreateEntry(client, request); err != nil {
  188. glog.V(0).Infof("create file %v:%v", request, err)
  189. return fmt.Errorf("create file %s/%s: %v", parentDirectoryPath, fileName, err)
  190. }
  191. return nil
  192. })
  193. }
  194. func Remove(filerClient FilerClient, parentDirectoryPath, name string, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster bool, signatures []int32) error {
  195. return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
  196. deleteEntryRequest := &DeleteEntryRequest{
  197. Directory: parentDirectoryPath,
  198. Name: name,
  199. IsDeleteData: isDeleteData,
  200. IsRecursive: isRecursive,
  201. IgnoreRecursiveError: ignoreRecursiveErr,
  202. IsFromOtherCluster: isFromOtherCluster,
  203. Signatures: signatures,
  204. }
  205. if resp, err := client.DeleteEntry(context.Background(), deleteEntryRequest); err != nil {
  206. if strings.Contains(err.Error(), ErrNotFound.Error()) {
  207. return nil
  208. }
  209. return err
  210. } else {
  211. if resp.Error != "" {
  212. if strings.Contains(resp.Error, ErrNotFound.Error()) {
  213. return nil
  214. }
  215. return errors.New(resp.Error)
  216. }
  217. }
  218. return nil
  219. })
  220. }