command_fs_meta_notify.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package shell
  2. import (
  3. "fmt"
  4. "io"
  5. "github.com/seaweedfs/seaweedfs/weed/notification"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/util"
  8. )
  9. func init() {
  10. Commands = append(Commands, &commandFsMetaNotify{})
  11. }
  12. type commandFsMetaNotify struct {
  13. }
  14. func (c *commandFsMetaNotify) Name() string {
  15. return "fs.meta.notify"
  16. }
  17. func (c *commandFsMetaNotify) Help() string {
  18. return `recursively send directory and file meta data to notification message queue
  19. fs.meta.notify # send meta data from current directory to notification message queue
  20. The message queue will use it to trigger replication from this filer.
  21. `
  22. }
  23. func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  24. path, err := commandEnv.parseUrl(findInputDirectory(args))
  25. if err != nil {
  26. return err
  27. }
  28. util.LoadConfiguration("notification", true)
  29. v := util.GetViper()
  30. notification.LoadConfiguration(v, "notification.")
  31. var dirCount, fileCount uint64
  32. err = filer_pb.TraverseBfs(commandEnv, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) {
  33. if entry.IsDirectory {
  34. dirCount++
  35. } else {
  36. fileCount++
  37. }
  38. notifyErr := notification.Queue.SendMessage(
  39. string(parentPath.Child(entry.Name)),
  40. &filer_pb.EventNotification{
  41. NewEntry: entry,
  42. },
  43. )
  44. if notifyErr != nil {
  45. fmt.Fprintf(writer, "fail to notify new entry event for %s: %v\n", parentPath.Child(entry.Name), notifyErr)
  46. }
  47. })
  48. if err == nil {
  49. fmt.Fprintf(writer, "\ntotal notified %d directories, %d files\n", dirCount, fileCount)
  50. }
  51. return err
  52. }