meta_aggregator.go

package filer

import (
	"context"
	"fmt"
	"io"
	"sync"
	"time"

	"github.com/golang/protobuf/proto"
	"google.golang.org/grpc"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/chrislusf/seaweedfs/weed/util/log_buffer"
)
type MetaAggregator struct {
	filers         []string
	grpcDialOption grpc.DialOption
	MetaLogBuffer  *log_buffer.LogBuffer
	// notifying clients
	ListenersLock sync.Mutex
	ListenersCond *sync.Cond
}
// MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk.
// The old data comes from what each LocalMetadata persisted on disk.
func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAggregator {
	t := &MetaAggregator{
		filers:         filers,
		grpcDialOption: grpcDialOption,
	}
	t.ListenersCond = sync.NewCond(&t.ListenersLock)
	t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() {
		t.ListenersCond.Broadcast()
	})
	return t
}

func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string) {
	for _, filer := range ma.filers {
		go ma.subscribeToOneFiler(f, self, filer)
	}
}
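
// A minimal wiring sketch (hypothetical addresses; the real call site lives in
// the filer server startup, which is not part of this file):
//
//	ma := NewMetaAggregator([]string{"filer1:8888", "filer2:8888"}, grpcDialOption)
//	ma.StartLoopSubscribe(f, "filer1:8888") // f is this filer's *Filer; self is its own address
//
// Each peer gets its own goroutine, and aggregated events accumulate in ma.MetaLogBuffer.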
func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string) {

	/*
		Each filer reads "filer.store.id", the store's signature generated when the filer starts.

		When reading local meta changes from other filers:
		  * if the received change does not carry this filer's own signature, apply it to the current filer store.

		Upon connecting to other filers, remember their signatures and the offsets already consumed from them.
	*/
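
	// The flow below has three phases: (1) poll the peer until its store
	// signature can be read, (2) if the peer has its own store, set up a hook
	// that replays its changes locally and checkpoints the offset roughly once
	// a minute, (3) subscribe forever, feeding every event into MetaLogBuffer.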
	var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse)
	lastPersistTime := time.Now()
	lastTsNs := time.Now().Add(-LogFlushInterval).UnixNano()

	peerSignature, err := ma.readFilerStoreSignature(peer)
	for err != nil {
		glog.V(0).Infof("connecting to peer filer %s: %v", peer, err)
		time.Sleep(1357 * time.Millisecond)
		peerSignature, err = ma.readFilerStoreSignature(peer)
	}
	// a signature mismatch means the filer store is not shared by multiple filers,
	// so this peer's changes need to be replayed into the local store
	if peerSignature != f.Signature {
		lastTsNs = 0
		if prevTsNs, err := ma.readOffset(f, peer, peerSignature); err == nil {
			lastTsNs = prevTsNs
		}

		glog.V(0).Infof("follow peer: %v, last %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
		var counter int64
		var synced bool
		maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) {
			if err := Replay(f.Store, event); err != nil {
				glog.Errorf("failed to replay metadata change from %v: %v", peer, err)
				return
			}
			counter++
			if lastPersistTime.Add(time.Minute).Before(time.Now()) {
				if err := ma.updateOffset(f, peer, peerSignature, event.TsNs); err == nil {
					if event.TsNs < time.Now().Add(-2*time.Minute).UnixNano() {
						glog.V(0).Infof("sync with %s progressed to: %v %0.2f/sec", peer, time.Unix(0, event.TsNs), float64(counter)/60.0)
					} else if !synced {
						synced = true
						glog.V(0).Infof("synced with %s", peer)
					}
					lastPersistTime = time.Now()
					counter = 0
				} else {
					glog.V(0).Infof("failed to update offset for %v: %v", peer, err)
				}
			}
		}
	}
	processEventFn := func(event *filer_pb.SubscribeMetadataResponse) error {
		data, err := proto.Marshal(event)
		if err != nil {
			glog.Errorf("failed to marshal subscribed filer_pb.SubscribeMetadataResponse %+v: %v", event, err)
			return err
		}
		dir := event.Directory
		// println("received meta change", dir, "size", len(data))
		ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, 0)
		if maybeReplicateMetadataChange != nil {
			maybeReplicateMetadataChange(event)
		}
		return nil
	}
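
	// Subscribe to the peer's local metadata stream, resuming from lastTsNs;
	// on any error, wait briefly and re-subscribe from the last seen timestamp.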
	for {
		glog.V(4).Infof("subscribing remote %s meta change: %v", peer, time.Unix(0, lastTsNs))
		err := pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()
			stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
				ClientName: "filer:" + self,
				PathPrefix: "/",
				SinceNs:    lastTsNs,
			})
			if err != nil {
				return fmt.Errorf("subscribe: %v", err)
			}

			for {
				resp, listenErr := stream.Recv()
				if listenErr == io.EOF {
					return nil
				}
				if listenErr != nil {
					return listenErr
				}

				if err := processEventFn(resp); err != nil {
					return fmt.Errorf("process %v: %v", resp, err)
				}
				lastTsNs = resp.TsNs
				f.onMetadataChangeEvent(resp)
			}
		})
		if err != nil {
			glog.V(0).Infof("subscribing remote %s meta change: %v", peer, err)
			time.Sleep(1733 * time.Millisecond)
		}
	}
}
func (ma *MetaAggregator) readFilerStoreSignature(peer string) (sig int32, err error) {
	err = pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
		if err != nil {
			return err
		}
		sig = resp.Signature
		return nil
	})
	return
}
const (
	MetaOffsetPrefix = "Meta"
)
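
// The per-peer offset is stored under an 8-byte key: the 4-byte ASCII prefix
// "Meta" followed by the peer's uint32 store signature (the "xxxx" placeholder
// bytes below are overwritten by util.Uint32toBytes). A sketch of the layout:
//
//	key = 'M' 'e' 't' 'a' | sig[0] sig[1] sig[2] sig[3]
//
// Keying by signature rather than by peer address presumably lets a peer be
// reached at a different address without losing its replication offset.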
func (ma *MetaAggregator) readOffset(f *Filer, peer string, peerSignature int32) (lastTsNs int64, err error) {

	key := []byte(MetaOffsetPrefix + "xxxx")
	util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature))

	value, err := f.Store.KvGet(context.Background(), key)

	if err == ErrKvNotFound {
		glog.Warningf("readOffset %s not found", peer)
		return 0, nil
	}

	if err != nil {
		return 0, fmt.Errorf("readOffset %s : %v", peer, err)
	}

	lastTsNs = int64(util.BytesToUint64(value))

	glog.V(0).Infof("readOffset %s : %d", peer, lastTsNs)

	return
}
func (ma *MetaAggregator) updateOffset(f *Filer, peer string, peerSignature int32, lastTsNs int64) (err error) {

	key := []byte(MetaOffsetPrefix + "xxxx")
	util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature))

	value := make([]byte, 8)
	util.Uint64toBytes(value, uint64(lastTsNs))

	err = f.Store.KvPut(context.Background(), key, value)
	if err != nil {
		return fmt.Errorf("updateOffset %s : %v", peer, err)
	}

	glog.V(4).Infof("updateOffset %s : %d", peer, lastTsNs)

	return
}
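
// Offset round-trip sketch (hypothetical peer and timestamp):
//
//	sig, _ := ma.readFilerStoreSignature("filer2:8888")
//	_ = ma.updateOffset(f, "filer2:8888", sig, time.Now().UnixNano())
//	tsNs, _ := ma.readOffset(f, "filer2:8888", sig) // 0 if never persisted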