filer_on_meta_event.go 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. package filer
  2. import (
  3. "bytes"
  4. "math"
  5. "github.com/chrislusf/seaweedfs/weed/util/log"
  6. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  7. "github.com/chrislusf/seaweedfs/weed/util"
  8. )
  9. // onMetadataChangeEvent is triggered after filer processed change events from local or remote filers
  10. func (f *Filer) onMetadataChangeEvent(event *filer_pb.SubscribeMetadataResponse) {
  11. if DirectoryEtc != event.Directory {
  12. if DirectoryEtc != event.EventNotification.NewParentPath {
  13. return
  14. }
  15. }
  16. entry := event.EventNotification.NewEntry
  17. if entry == nil {
  18. return
  19. }
  20. log.Infof("procesing %v", event)
  21. if entry.Name == FilerConfName {
  22. f.reloadFilerConfiguration(entry)
  23. }
  24. }
  25. func (f *Filer) readEntry(chunks []*filer_pb.FileChunk) ([]byte, error) {
  26. var buf bytes.Buffer
  27. err := StreamContent(f.MasterClient, &buf, chunks, 0, math.MaxInt64)
  28. if err != nil {
  29. return nil, err
  30. }
  31. return buf.Bytes(), nil
  32. }
  33. func (f *Filer) reloadFilerConfiguration(entry *filer_pb.Entry) {
  34. fc := NewFilerConf()
  35. err := fc.loadFromChunks(f, entry.Chunks)
  36. if err != nil {
  37. log.Errorf("read filer conf chunks: %v", err)
  38. return
  39. }
  40. f.FilerConf = fc
  41. }
  42. func (f *Filer) LoadFilerConf() {
  43. fc := NewFilerConf()
  44. err := util.Retry("loadFilerConf", func() error {
  45. return fc.loadFromFiler(f)
  46. })
  47. if err != nil {
  48. log.Errorf("read filer conf: %v", err)
  49. return
  50. }
  51. f.FilerConf = fc
  52. }