filer_meta_tail_elastic.go 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. //go:build elastic
  2. // +build elastic
  3. package command
  4. import (
  5. "context"
  6. jsoniter "github.com/json-iterator/go"
  7. elastic "github.com/olivere/elastic/v7"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "strings"
  11. )
  12. type EsDocument struct {
  13. Dir string `json:"dir,omitempty"`
  14. Name string `json:"name,omitempty"`
  15. IsDirectory bool `json:"isDir,omitempty"`
  16. Size uint64 `json:"size,omitempty"`
  17. Uid uint32 `json:"uid,omitempty"`
  18. Gid uint32 `json:"gid,omitempty"`
  19. UserName string `json:"userName,omitempty"`
  20. Crtime int64 `json:"crtime,omitempty"`
  21. Mtime int64 `json:"mtime,omitempty"`
  22. Mime string `json:"mime,omitempty"`
  23. }
  24. func toEsEntry(event *filer_pb.EventNotification) (*EsDocument, string) {
  25. entry := event.NewEntry
  26. dir, name := event.NewParentPath, entry.Name
  27. id := util.Md5String([]byte(util.NewFullPath(dir, name)))
  28. esEntry := &EsDocument{
  29. Dir: dir,
  30. Name: name,
  31. IsDirectory: entry.IsDirectory,
  32. Size: entry.Attributes.FileSize,
  33. Uid: entry.Attributes.Uid,
  34. Gid: entry.Attributes.Gid,
  35. UserName: entry.Attributes.UserName,
  36. Crtime: entry.Attributes.Crtime,
  37. Mtime: entry.Attributes.Mtime,
  38. Mime: entry.Attributes.Mime,
  39. }
  40. return esEntry, id
  41. }
  42. func sendToElasticSearchFunc(servers string, esIndex string) (func(resp *filer_pb.SubscribeMetadataResponse) error, error) {
  43. options := []elastic.ClientOptionFunc{}
  44. options = append(options, elastic.SetURL(strings.Split(servers, ",")...))
  45. options = append(options, elastic.SetSniff(false))
  46. options = append(options, elastic.SetHealthcheck(false))
  47. client, err := elastic.NewClient(options...)
  48. if err != nil {
  49. return nil, err
  50. }
  51. return func(resp *filer_pb.SubscribeMetadataResponse) error {
  52. event := resp.EventNotification
  53. if event.OldEntry != nil &&
  54. (event.NewEntry == nil || resp.Directory != event.NewParentPath || event.OldEntry.Name != event.NewEntry.Name) {
  55. // delete or not update the same file
  56. dir, name := resp.Directory, event.OldEntry.Name
  57. id := util.Md5String([]byte(util.NewFullPath(dir, name)))
  58. println("delete", id)
  59. _, err := client.Delete().Index(esIndex).Id(id).Do(context.Background())
  60. return err
  61. }
  62. if event.NewEntry != nil {
  63. // add a new file or update the same file
  64. esEntry, id := toEsEntry(event)
  65. value, err := jsoniter.Marshal(esEntry)
  66. if err != nil {
  67. return err
  68. }
  69. println(string(value))
  70. _, err = client.Index().Index(esIndex).Id(id).BodyJson(string(value)).Do(context.Background())
  71. return err
  72. }
  73. return nil
  74. }, nil
  75. }