123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211 |
- package command
- import (
- "context"
- "fmt"
- "github.com/golang/protobuf/jsonpb"
- jsoniter "github.com/json-iterator/go"
- "github.com/olivere/elastic/v7"
- "io"
- "os"
- "path/filepath"
- "strings"
- "time"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/util"
- )
- func init() {
- cmdFilerMetaTail.Run = runFilerMetaTail // break init cycle
- }
- var cmdFilerMetaTail = &Command{
- UsageLine: "filer.meta.tail [-filer=localhost:8888] [-pathPrefix=/]",
- Short: "see continuous changes on a filer",
- Long: `See continuous changes on a filer.
- weed filer.meta.tail -timeAgo=30h | grep truncate
- weed filer.meta.tail -timeAgo=30h | jq .
- weed filer.meta.tail -timeAgo=30h | jq .eventNotification.newEntry.name
- `,
- }
- var (
- tailFiler = cmdFilerMetaTail.Flag.String("filer", "localhost:8888", "filer hostname:port")
- tailTarget = cmdFilerMetaTail.Flag.String("pathPrefix", "/", "path to a folder or common prefix for the folders or files on filer")
- tailStart = cmdFilerMetaTail.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
- tailPattern = cmdFilerMetaTail.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ")
- esServers = cmdFilerMetaTail.Flag.String("es", "", "comma-separated elastic servers http://<host:port>")
- esIndex = cmdFilerMetaTail.Flag.String("es.index", "seaweedfs", "ES index name")
- )
- func runFilerMetaTail(cmd *Command, args []string) bool {
- grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
- var filterFunc func(dir, fname string) bool
- if *tailPattern != "" {
- if strings.Contains(*tailPattern, "/") {
- println("watch path pattern", *tailPattern)
- filterFunc = func(dir, fname string) bool {
- matched, err := filepath.Match(*tailPattern, dir+"/"+fname)
- if err != nil {
- fmt.Printf("error: %v", err)
- }
- return matched
- }
- } else {
- println("watch file pattern", *tailPattern)
- filterFunc = func(dir, fname string) bool {
- matched, err := filepath.Match(*tailPattern, fname)
- if err != nil {
- fmt.Printf("error: %v", err)
- }
- return matched
- }
- }
- }
- shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool {
- if filterFunc == nil {
- return true
- }
- if resp.EventNotification.OldEntry == nil && resp.EventNotification.NewEntry == nil {
- return false
- }
- if resp.EventNotification.OldEntry != nil && filterFunc(resp.Directory, resp.EventNotification.OldEntry.Name) {
- return true
- }
- if resp.EventNotification.NewEntry != nil && filterFunc(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry.Name) {
- return true
- }
- return false
- }
- jsonpbMarshaler := jsonpb.Marshaler{
- EmitDefaults: false,
- }
- eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
- jsonpbMarshaler.Marshal(os.Stdout, resp)
- fmt.Fprintln(os.Stdout)
- return nil
- }
- if *esServers != "" {
- var err error
- eachEntryFunc, err = sendToElasticSearchFunc(*esServers, *esIndex)
- if err != nil {
- fmt.Printf("create elastic search client to %s: %+v\n", *esServers, err)
- return false
- }
- }
- tailErr := pb.WithFilerClient(*tailFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "tail",
- PathPrefix: *tailTarget,
- SinceNs: time.Now().Add(-*tailStart).UnixNano(),
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
- if !shouldPrint(resp) {
- continue
- }
- if err = eachEntryFunc(resp); err != nil {
- return err
- }
- }
- })
- if tailErr != nil {
- fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
- }
- return true
- }
- type EsDocument struct {
- Dir string `json:"dir,omitempty"`
- Name string `json:"name,omitempty"`
- IsDirectory bool `json:"isDir,omitempty"`
- Size uint64 `json:"size,omitempty"`
- Uid uint32 `json:"uid,omitempty"`
- Gid uint32 `json:"gid,omitempty"`
- UserName string `json:"userName,omitempty"`
- Collection string `json:"collection,omitempty"`
- Crtime int64 `json:"crtime,omitempty"`
- Mtime int64 `json:"mtime,omitempty"`
- Mime string `json:"mime,omitempty"`
- }
- func toEsEntry(event *filer_pb.EventNotification) (*EsDocument, string) {
- entry := event.NewEntry
- dir, name := event.NewParentPath, entry.Name
- id := util.Md5String([]byte(util.NewFullPath(dir, name)))
- esEntry := &EsDocument{
- Dir: dir,
- Name: name,
- IsDirectory: entry.IsDirectory,
- Size: entry.Attributes.FileSize,
- Uid: entry.Attributes.Uid,
- Gid: entry.Attributes.Gid,
- UserName: entry.Attributes.UserName,
- Collection: entry.Attributes.Collection,
- Crtime: entry.Attributes.Crtime,
- Mtime: entry.Attributes.Mtime,
- Mime: entry.Attributes.Mime,
- }
- return esEntry, id
- }
- func sendToElasticSearchFunc(servers string, esIndex string) (func(resp *filer_pb.SubscribeMetadataResponse) error, error) {
- options := []elastic.ClientOptionFunc{}
- options = append(options, elastic.SetURL(strings.Split(servers, ",")...))
- options = append(options, elastic.SetSniff(false))
- options = append(options, elastic.SetHealthcheck(false))
- client, err := elastic.NewClient(options...)
- if err != nil {
- return nil, err
- }
- return func(resp *filer_pb.SubscribeMetadataResponse) error {
- event := resp.EventNotification
- if event.OldEntry != nil &&
- (event.NewEntry == nil || resp.Directory != event.NewParentPath || event.OldEntry.Name != event.NewEntry.Name) {
- // delete or not update the same file
- dir, name := resp.Directory, event.OldEntry.Name
- id := util.Md5String([]byte(util.NewFullPath(dir, name)))
- println("delete", id)
- _, err := client.Delete().Index(esIndex).Id(id).Do(context.Background())
- return err
- }
- if event.NewEntry != nil {
- // add a new file or update the same file
- esEntry, id := toEsEntry(event)
- value, err := jsoniter.Marshal(esEntry)
- if err != nil {
- return err
- }
- println(string(value))
- _, err = client.Index().Index(esIndex).Id(id).BodyJson(string(value)).Do(context.Background())
- return err
- }
- return nil
- }, nil
- }
|