filer_meta_tail.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/pb"
  6. "github.com/golang/protobuf/jsonpb"
  7. jsoniter "github.com/json-iterator/go"
  8. elastic "github.com/olivere/elastic/v7"
  9. "os"
  10. "path/filepath"
  11. "strings"
  12. "time"
  13. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  14. "github.com/chrislusf/seaweedfs/weed/security"
  15. "github.com/chrislusf/seaweedfs/weed/util"
  16. )
  17. func init() {
  18. cmdFilerMetaTail.Run = runFilerMetaTail // break init cycle
  19. }
  20. var cmdFilerMetaTail = &Command{
  21. UsageLine: "filer.meta.tail [-filer=localhost:8888] [-pathPrefix=/]",
  22. Short: "see continuous changes on a filer",
  23. Long: `See continuous changes on a filer.
  24. weed filer.meta.tail -timeAgo=30h | grep truncate
  25. weed filer.meta.tail -timeAgo=30h | jq .
  26. weed filer.meta.tail -timeAgo=30h | jq .eventNotification.newEntry.name
  27. weed filer.meta.tail -timeAgo=30h -es=http://<elasticSearchServerHost>:<port> -es.index=seaweedfs
  28. `,
  29. }
  30. var (
  31. tailFiler = cmdFilerMetaTail.Flag.String("filer", "localhost:8888", "filer hostname:port")
  32. tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or common prefix for the folders or files on filer")
  33. tailStart = cmdFilerMetaTail.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
  34. tailPattern = cmdFilerMetaTail.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ")
  35. esServers = cmdFilerMetaTail.Flag.String("es", "", "comma-separated elastic servers http://<host:port>")
  36. esIndex = cmdFilerMetaTail.Flag.String("es.index", "seaweedfs", "ES index name")
  37. )
  38. func runFilerMetaTail(cmd *Command, args []string) bool {
  39. util.LoadConfiguration("security", false)
  40. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  41. clientId := util.RandomInt32()
  42. var filterFunc func(dir, fname string) bool
  43. if *tailPattern != "" {
  44. if strings.Contains(*tailPattern, "/") {
  45. println("watch path pattern", *tailPattern)
  46. filterFunc = func(dir, fname string) bool {
  47. matched, err := filepath.Match(*tailPattern, dir+"/"+fname)
  48. if err != nil {
  49. fmt.Printf("error: %v", err)
  50. }
  51. return matched
  52. }
  53. } else {
  54. println("watch file pattern", *tailPattern)
  55. filterFunc = func(dir, fname string) bool {
  56. matched, err := filepath.Match(*tailPattern, fname)
  57. if err != nil {
  58. fmt.Printf("error: %v", err)
  59. }
  60. return matched
  61. }
  62. }
  63. }
  64. shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool {
  65. if resp.EventNotification.OldEntry == nil && resp.EventNotification.NewEntry == nil {
  66. return false
  67. }
  68. if filterFunc == nil {
  69. return true
  70. }
  71. if resp.EventNotification.OldEntry != nil && filterFunc(resp.Directory, resp.EventNotification.OldEntry.Name) {
  72. return true
  73. }
  74. if resp.EventNotification.NewEntry != nil && filterFunc(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry.Name) {
  75. return true
  76. }
  77. return false
  78. }
  79. jsonpbMarshaler := jsonpb.Marshaler{
  80. EmitDefaults: false,
  81. }
  82. eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
  83. jsonpbMarshaler.Marshal(os.Stdout, resp)
  84. fmt.Fprintln(os.Stdout)
  85. return nil
  86. }
  87. if *esServers != "" {
  88. var err error
  89. eachEntryFunc, err = sendToElasticSearchFunc(*esServers, *esIndex)
  90. if err != nil {
  91. fmt.Printf("create elastic search client to %s: %+v\n", *esServers, err)
  92. return false
  93. }
  94. }
  95. tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpcDialOption, "tail", clientId,
  96. *tailTarget, nil, time.Now().Add(-*tailStart).UnixNano(), 0,
  97. func(resp *filer_pb.SubscribeMetadataResponse) error {
  98. if !shouldPrint(resp) {
  99. return nil
  100. }
  101. if err := eachEntryFunc(resp); err != nil {
  102. return err
  103. }
  104. return nil
  105. }, false)
  106. if tailErr != nil {
  107. fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
  108. }
  109. return true
  110. }
  111. type EsDocument struct {
  112. Dir string `json:"dir,omitempty"`
  113. Name string `json:"name,omitempty"`
  114. IsDirectory bool `json:"isDir,omitempty"`
  115. Size uint64 `json:"size,omitempty"`
  116. Uid uint32 `json:"uid,omitempty"`
  117. Gid uint32 `json:"gid,omitempty"`
  118. UserName string `json:"userName,omitempty"`
  119. Collection string `json:"collection,omitempty"`
  120. Crtime int64 `json:"crtime,omitempty"`
  121. Mtime int64 `json:"mtime,omitempty"`
  122. Mime string `json:"mime,omitempty"`
  123. }
  124. func toEsEntry(event *filer_pb.EventNotification) (*EsDocument, string) {
  125. entry := event.NewEntry
  126. dir, name := event.NewParentPath, entry.Name
  127. id := util.Md5String([]byte(util.NewFullPath(dir, name)))
  128. esEntry := &EsDocument{
  129. Dir: dir,
  130. Name: name,
  131. IsDirectory: entry.IsDirectory,
  132. Size: entry.Attributes.FileSize,
  133. Uid: entry.Attributes.Uid,
  134. Gid: entry.Attributes.Gid,
  135. UserName: entry.Attributes.UserName,
  136. Collection: entry.Attributes.Collection,
  137. Crtime: entry.Attributes.Crtime,
  138. Mtime: entry.Attributes.Mtime,
  139. Mime: entry.Attributes.Mime,
  140. }
  141. return esEntry, id
  142. }
  143. func sendToElasticSearchFunc(servers string, esIndex string) (func(resp *filer_pb.SubscribeMetadataResponse) error, error) {
  144. options := []elastic.ClientOptionFunc{}
  145. options = append(options, elastic.SetURL(strings.Split(servers, ",")...))
  146. options = append(options, elastic.SetSniff(false))
  147. options = append(options, elastic.SetHealthcheck(false))
  148. client, err := elastic.NewClient(options...)
  149. if err != nil {
  150. return nil, err
  151. }
  152. return func(resp *filer_pb.SubscribeMetadataResponse) error {
  153. event := resp.EventNotification
  154. if event.OldEntry != nil &&
  155. (event.NewEntry == nil || resp.Directory != event.NewParentPath || event.OldEntry.Name != event.NewEntry.Name) {
  156. // delete or not update the same file
  157. dir, name := resp.Directory, event.OldEntry.Name
  158. id := util.Md5String([]byte(util.NewFullPath(dir, name)))
  159. println("delete", id)
  160. _, err := client.Delete().Index(esIndex).Id(id).Do(context.Background())
  161. return err
  162. }
  163. if event.NewEntry != nil {
  164. // add a new file or update the same file
  165. esEntry, id := toEsEntry(event)
  166. value, err := jsoniter.Marshal(esEntry)
  167. if err != nil {
  168. return err
  169. }
  170. println(string(value))
  171. _, err = client.Index().Index(esIndex).Id(id).BodyJson(string(value)).Do(context.Background())
  172. return err
  173. }
  174. return nil
  175. }, nil
  176. }