auth_credentials_subscribe.go 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package s3api
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/filer"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  8. "github.com/seaweedfs/seaweedfs/weed/util"
  9. )
  10. func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, lastTsNs int64, prefix string, directoriesToWatch []string) {
  11. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  12. message := resp.EventNotification
  13. if message.NewEntry == nil {
  14. return nil
  15. }
  16. dir := resp.Directory
  17. if message.NewParentPath != "" {
  18. dir = message.NewParentPath
  19. }
  20. fileName := message.NewEntry.Name
  21. content := message.NewEntry.Content
  22. _ = s3a.onIamConfigUpdate(dir, fileName, content)
  23. _ = s3a.onCircuitBreakerConfigUpdate(dir, fileName, content)
  24. _ = s3a.onBucketMetadataChange(dir, message.OldEntry, message.NewEntry)
  25. return nil
  26. }
  27. var clientEpoch int32
  28. metadataFollowOption := &pb.MetadataFollowOption{
  29. ClientName: clientName,
  30. ClientId: s3a.randomClientId,
  31. ClientEpoch: clientEpoch,
  32. SelfSignature: 0,
  33. PathPrefix: prefix,
  34. AdditionalPathPrefixes: nil,
  35. DirectoriesToWatch: directoriesToWatch,
  36. StartTsNs: lastTsNs,
  37. StopTsNs: 0,
  38. EventErrorType: pb.FatalOnError,
  39. }
  40. util.RetryForever("followIamChanges", func() error {
  41. clientEpoch++
  42. return pb.WithFilerClientFollowMetadata(s3a, metadataFollowOption, processEventFn)
  43. }, func(err error) bool {
  44. glog.V(0).Infof("iam follow metadata changes: %v", err)
  45. return true
  46. })
  47. }
  48. // reload iam config
  49. func (s3a *S3ApiServer) onIamConfigUpdate(dir, filename string, content []byte) error {
  50. if dir == filer.IamConfigDirectory && filename == filer.IamIdentityFile {
  51. if err := s3a.iam.LoadS3ApiConfigurationFromBytes(content); err != nil {
  52. return err
  53. }
  54. glog.V(0).Infof("updated %s/%s", dir, filename)
  55. }
  56. return nil
  57. }
  58. // reload circuit breaker config
  59. func (s3a *S3ApiServer) onCircuitBreakerConfigUpdate(dir, filename string, content []byte) error {
  60. if dir == s3_constants.CircuitBreakerConfigDir && filename == s3_constants.CircuitBreakerConfigFile {
  61. if err := s3a.cb.LoadS3ApiConfigurationFromBytes(content); err != nil {
  62. return err
  63. }
  64. glog.V(0).Infof("updated %s/%s", dir, filename)
  65. }
  66. return nil
  67. }
  68. // reload bucket metadata
  69. func (s3a *S3ApiServer) onBucketMetadataChange(dir string, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) error {
  70. if dir == s3a.option.BucketsPath {
  71. if newEntry != nil {
  72. s3a.bucketRegistry.LoadBucketMetadata(newEntry)
  73. glog.V(0).Infof("updated bucketMetadata %s/%s", dir, newEntry)
  74. } else {
  75. s3a.bucketRegistry.RemoveBucketMetadata(oldEntry)
  76. glog.V(0).Infof("remove bucketMetadata %s/%s", dir, newEntry)
  77. }
  78. }
  79. return nil
  80. }