auth_credentials_subscribe.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. package s3api
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "io"
  9. "time"
  10. )
  11. func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, lastTsNs int64) error {
  12. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  13. message := resp.EventNotification
  14. if message.NewEntry == nil {
  15. return nil
  16. }
  17. dir := resp.Directory
  18. if message.NewParentPath != "" {
  19. dir = message.NewParentPath
  20. }
  21. if dir == filer.IamConfigDirecotry && message.NewEntry.Name == filer.IamIdentityFile {
  22. if err := s3a.iam.loadS3ApiConfigurationFromBytes(message.NewEntry.Content); err != nil {
  23. return err
  24. }
  25. glog.V(0).Infof("updated %s/%s", filer.IamConfigDirecotry, filer.IamIdentityFile)
  26. }
  27. return nil
  28. }
  29. for {
  30. err := s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  31. ctx, cancel := context.WithCancel(context.Background())
  32. defer cancel()
  33. stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
  34. ClientName: clientName,
  35. PathPrefix: prefix,
  36. SinceNs: lastTsNs,
  37. })
  38. if err != nil {
  39. return fmt.Errorf("subscribe: %v", err)
  40. }
  41. for {
  42. resp, listenErr := stream.Recv()
  43. if listenErr == io.EOF {
  44. return nil
  45. }
  46. if listenErr != nil {
  47. return listenErr
  48. }
  49. if err := processEventFn(resp); err != nil {
  50. glog.Fatalf("process %v: %v", resp, err)
  51. }
  52. lastTsNs = resp.TsNs
  53. }
  54. })
  55. if err != nil {
  56. glog.Errorf("subscribing filer meta change: %v", err)
  57. }
  58. time.Sleep(time.Second)
  59. }
  60. }