command_fs_meta_notify.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package shell
  2. import (
  3. "fmt"
  4. "io"
  5. "github.com/chrislusf/seaweedfs/weed/filer2"
  6. "github.com/chrislusf/seaweedfs/weed/notification"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/util"
  9. )
  10. func init() {
  11. Commands = append(Commands, &commandFsMetaNotify{})
  12. }
  13. type commandFsMetaNotify struct {
  14. }
  15. func (c *commandFsMetaNotify) Name() string {
  16. return "fs.meta.notify"
  17. }
  18. func (c *commandFsMetaNotify) Help() string {
  19. return `recursively send directory and file meta data to notifiction message queue
  20. fs.meta.notify # send meta data from current directory to notification message queue
  21. The message queue will use it to trigger replication from this filer.
  22. `
  23. }
  24. func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  25. filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(args))
  26. if err != nil {
  27. return err
  28. }
  29. util.LoadConfiguration("notification", true)
  30. v := util.GetViper()
  31. notification.LoadConfiguration(v, "notification.")
  32. var dirCount, fileCount uint64
  33. err = doTraverseBFS(writer, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) {
  34. if entry.IsDirectory {
  35. dirCount++
  36. } else {
  37. fileCount++
  38. }
  39. notifyErr := notification.Queue.SendMessage(
  40. string(parentPath.Child(entry.Name)),
  41. &filer_pb.EventNotification{
  42. NewEntry: entry,
  43. },
  44. )
  45. if notifyErr != nil {
  46. fmt.Fprintf(writer, "fail to notify new entry event for %s: %v\n", parentPath.Child(entry.Name), notifyErr)
  47. }
  48. })
  49. if err == nil {
  50. fmt.Fprintf(writer, "\ntotal notified %d directories, %d files\n", dirCount, fileCount)
  51. }
  52. return err
  53. }