filer_meta_tail.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. jsoniter "github.com/json-iterator/go"
  6. "github.com/olivere/elastic/v7"
  7. "io"
  8. "path/filepath"
  9. "strings"
  10. "time"
  11. "github.com/chrislusf/seaweedfs/weed/pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  13. "github.com/chrislusf/seaweedfs/weed/security"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. )
  16. func init() {
  17. cmdFilerMetaTail.Run = runFilerMetaTail // break init cycle
  18. }
  19. var cmdFilerMetaTail = &Command{
  20. UsageLine: "filer.meta.tail [-filer=localhost:8888] [-target=/]",
  21. Short: "see recent changes on a filer",
  22. Long: `See recent changes on a filer.
  23. `,
  24. }
  25. var (
  26. tailFiler = cmdFilerMetaTail.Flag.String("filer", "localhost:8888", "filer hostname:port")
  27. tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer")
  28. tailStart = cmdFilerMetaTail.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
  29. tailPattern = cmdFilerMetaTail.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ")
  30. esServers = cmdFilerMetaTail.Flag.String("es", "", "comma-separated elastic servers http://<host:port>")
  31. esIndex = cmdFilerMetaTail.Flag.String("es.index", "seaweedfs", "ES index name")
  32. )
  33. func runFilerMetaTail(cmd *Command, args []string) bool {
  34. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  35. var filterFunc func(dir, fname string) bool
  36. if *tailPattern != "" {
  37. if strings.Contains(*tailPattern, "/") {
  38. println("watch path pattern", *tailPattern)
  39. filterFunc = func(dir, fname string) bool {
  40. matched, err := filepath.Match(*tailPattern, dir+"/"+fname)
  41. if err != nil {
  42. fmt.Printf("error: %v", err)
  43. }
  44. return matched
  45. }
  46. } else {
  47. println("watch file pattern", *tailPattern)
  48. filterFunc = func(dir, fname string) bool {
  49. matched, err := filepath.Match(*tailPattern, fname)
  50. if err != nil {
  51. fmt.Printf("error: %v", err)
  52. }
  53. return matched
  54. }
  55. }
  56. }
  57. shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool {
  58. if filterFunc == nil {
  59. return true
  60. }
  61. if resp.EventNotification.OldEntry == nil && resp.EventNotification.NewEntry == nil {
  62. return false
  63. }
  64. if resp.EventNotification.OldEntry != nil && filterFunc(resp.Directory, resp.EventNotification.OldEntry.Name) {
  65. return true
  66. }
  67. if resp.EventNotification.NewEntry != nil && filterFunc(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry.Name) {
  68. return true
  69. }
  70. return false
  71. }
  72. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  73. fmt.Printf("dir:%s %+v\n", resp.Directory, resp.EventNotification)
  74. return nil
  75. }
  76. if *esServers != "" {
  77. var err error
  78. eachEntryFunc, err = sendToElasticSearchFunc(*esServers, *esIndex)
  79. if err != nil {
  80. fmt.Printf("create elastic search client to %s: %+v\n", *esServers, err)
  81. return false
  82. }
  83. }
  84. tailErr := pb.WithFilerClient(*tailFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  85. ctx, cancel := context.WithCancel(context.Background())
  86. defer cancel()
  87. stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
  88. ClientName: "tail",
  89. PathPrefix: *tailTarget,
  90. SinceNs: time.Now().Add(-*tailStart).UnixNano(),
  91. })
  92. if err != nil {
  93. return fmt.Errorf("listen: %v", err)
  94. }
  95. for {
  96. resp, listenErr := stream.Recv()
  97. if listenErr == io.EOF {
  98. return nil
  99. }
  100. if listenErr != nil {
  101. return listenErr
  102. }
  103. if !shouldPrint(resp) {
  104. continue
  105. }
  106. if err = eachEntryFunc(resp); err != nil {
  107. return err
  108. }
  109. }
  110. })
  111. if tailErr != nil {
  112. fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
  113. }
  114. return true
  115. }
  116. type EsDocument struct {
  117. Dir string `json:"dir,omitempty"`
  118. Name string `json:"name,omitempty"`
  119. IsDirectory bool `json:"isDir,omitempty"`
  120. Size uint64 `json:"size,omitempty"`
  121. Uid uint32 `json:"uid,omitempty"`
  122. Gid uint32 `json:"gid,omitempty"`
  123. UserName string `json:"userName,omitempty"`
  124. Collection string `json:"collection,omitempty"`
  125. Crtime int64 `json:"crtime,omitempty"`
  126. Mtime int64 `json:"mtime,omitempty"`
  127. Mime string `json:"mime,omitempty"`
  128. }
  129. func toEsEntry(event *filer_pb.EventNotification) (*EsDocument, string) {
  130. entry := event.NewEntry
  131. dir, name := event.NewParentPath, entry.Name
  132. id := util.Md5String([]byte(util.NewFullPath(dir, name)))
  133. esEntry := &EsDocument{
  134. Dir: dir,
  135. Name: name,
  136. IsDirectory: entry.IsDirectory,
  137. Size: entry.Attributes.FileSize,
  138. Uid: entry.Attributes.Uid,
  139. Gid: entry.Attributes.Gid,
  140. UserName: entry.Attributes.UserName,
  141. Collection: entry.Attributes.Collection,
  142. Crtime: entry.Attributes.Crtime,
  143. Mtime: entry.Attributes.Mtime,
  144. Mime: entry.Attributes.Mime,
  145. }
  146. return esEntry, id
  147. }
  148. func sendToElasticSearchFunc(servers string, esIndex string) (func(resp *filer_pb.SubscribeMetadataResponse) error, error) {
  149. options := []elastic.ClientOptionFunc{}
  150. options = append(options, elastic.SetURL(strings.Split(servers, ",")...))
  151. options = append(options, elastic.SetSniff(false))
  152. options = append(options, elastic.SetHealthcheck(false))
  153. client, err := elastic.NewClient(options...)
  154. if err != nil {
  155. return nil, err
  156. }
  157. return func(resp *filer_pb.SubscribeMetadataResponse) error {
  158. event := resp.EventNotification
  159. if event.OldEntry != nil &&
  160. (event.NewEntry == nil || resp.Directory != event.NewParentPath || event.OldEntry.Name != event.NewEntry.Name) {
  161. // delete or not update the same file
  162. dir, name := resp.Directory, event.OldEntry.Name
  163. id := util.Md5String([]byte(util.NewFullPath(dir, name)))
  164. println("delete", id)
  165. _, err := client.Delete().Index(esIndex).Id(id).Do(context.Background())
  166. return err
  167. }
  168. if event.NewEntry != nil {
  169. // add a new file or update the same file
  170. esEntry, id := toEsEntry(event)
  171. value, err := jsoniter.Marshal(esEntry)
  172. if err != nil {
  173. return err
  174. }
  175. println(string(value))
  176. _, err = client.Index().Index(esIndex).Id(id).BodyJson(string(value)).Do(context.Background())
  177. return err
  178. }
  179. return nil
  180. }, nil
  181. }