auth_credentials_subscribe.go 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package s3api
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/filer"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  8. "github.com/seaweedfs/seaweedfs/weed/util"
  9. )
  10. func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, lastTsNs int64) {
  11. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  12. message := resp.EventNotification
  13. if message.NewEntry == nil {
  14. return nil
  15. }
  16. dir := resp.Directory
  17. if message.NewParentPath != "" {
  18. dir = message.NewParentPath
  19. }
  20. fileName := message.NewEntry.Name
  21. content := message.NewEntry.Content
  22. _ = s3a.onIamConfigUpdate(dir, fileName, content)
  23. _ = s3a.onCircuitBreakerConfigUpdate(dir, fileName, content)
  24. return nil
  25. }
  26. var clientEpoch int32
  27. util.RetryForever("followIamChanges", func() error {
  28. clientEpoch++
  29. return pb.WithFilerClientFollowMetadata(s3a, clientName, s3a.randomClientId, clientEpoch, prefix, nil, &lastTsNs, 0, 0, processEventFn, pb.FatalOnError)
  30. }, func(err error) bool {
  31. glog.V(0).Infof("iam follow metadata changes: %v", err)
  32. return true
  33. })
  34. }
  35. // reload iam config
  36. func (s3a *S3ApiServer) onIamConfigUpdate(dir, filename string, content []byte) error {
  37. if dir == filer.IamConfigDirectory && filename == filer.IamIdentityFile {
  38. if err := s3a.iam.LoadS3ApiConfigurationFromBytes(content); err != nil {
  39. return err
  40. }
  41. glog.V(0).Infof("updated %s/%s", dir, filename)
  42. }
  43. return nil
  44. }
  45. // reload circuit breaker config
  46. func (s3a *S3ApiServer) onCircuitBreakerConfigUpdate(dir, filename string, content []byte) error {
  47. if dir == s3_constants.CircuitBreakerConfigDir && filename == s3_constants.CircuitBreakerConfigFile {
  48. if err := s3a.cb.LoadS3ApiConfigurationFromBytes(content); err != nil {
  49. return err
  50. }
  51. glog.V(0).Infof("updated %s/%s", dir, filename)
  52. }
  53. return nil
  54. }