meta_aggregator.go

package filer

import (
	"context"
	"fmt"
	"io"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/protobuf/proto"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)
type MetaAggregator struct {
	filer          *Filer
	self           pb.ServerAddress
	isLeader       bool
	grpcDialOption grpc.DialOption
	MetaLogBuffer  *log_buffer.LogBuffer
	peerChans      map[pb.ServerAddress]chan struct{}
	peerChansLock  sync.Mutex

	// notifying clients
	ListenersLock sync.Mutex
	ListenersCond *sync.Cond
}
// MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk.
// The old data comes from what each LocalMetadata persisted on disk.
func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc.DialOption) *MetaAggregator {
	t := &MetaAggregator{
		filer:          filer,
		self:           self,
		grpcDialOption: grpcDialOption,
		peerChans:      make(map[pb.ServerAddress]chan struct{}),
	}
	t.ListenersCond = sync.NewCond(&t.ListenersLock)
	t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, nil, func() {
		t.ListenersCond.Broadcast()
	})
	return t
}
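// Illustrative wiring sketch (not part of this file; the server-side hook shown here is an
// assumption for illustration only): a filer typically constructs one MetaAggregator at startup
// and forwards cluster-membership callbacks from the master into OnPeerUpdate, which starts or
// stops one subscription goroutine per peer filer.
//
//	ma := NewMetaAggregator(f, selfAddress, grpcDialOption)
//	onClusterNodeUpdate := func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
//		ma.OnPeerUpdate(update, startFrom)
//	}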
// OnPeerUpdate reacts to cluster membership changes: it starts a subscription goroutine
// when a peer filer is added and stops the existing one when the peer is removed.
func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
	ma.peerChansLock.Lock()
	defer ma.peerChansLock.Unlock()

	address := pb.ServerAddress(update.Address)
	if update.IsAdd {
		// cancel previous subscription if any
		if prevChan, found := ma.peerChans[address]; found {
			close(prevChan)
		}
		stopChan := make(chan struct{})
		ma.peerChans[address] = stopChan
		go ma.loopSubscribeToOneFiler(ma.filer, ma.self, address, startFrom, stopChan)
	} else {
		if prevChan, found := ma.peerChans[address]; found {
			close(prevChan)
			delete(ma.peerChans, address)
		}
	}
}
func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom time.Time, stopChan chan struct{}) {
	lastTsNs := startFrom.UnixNano()
	for {
		glog.V(0).Infof("loopSubscribeToOneFiler read %s start from %v %d", peer, time.Unix(0, lastTsNs), lastTsNs)
		nextLastTsNs, err := ma.doSubscribeToOneFiler(f, self, peer, lastTsNs)

		// check stopChan to see if we should stop
		select {
		case <-stopChan:
			glog.V(0).Infof("stop subscribing peer %s meta change", peer)
			return
		default:
		}

		if err != nil {
			errLvl := glog.Level(0)
			if strings.Contains(err.Error(), "duplicated local subscription detected") {
				errLvl = glog.Level(4)
			}
			glog.V(errLvl).Infof("subscribing remote %s meta change: %v", peer, err)
		}
		if lastTsNs < nextLastTsNs {
			lastTsNs = nextLastTsNs
		}
		time.Sleep(1733 * time.Millisecond)
	}
}
func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom int64) (int64, error) {

	/*
		Each filer reads the "filer.store.id", which is the store's signature when filer starts.

		When reading from other filers' local meta changes:
		* if the received change does not contain signature from self, apply the change to current filer store.

		Upon connecting to other filers, need to remember their signature and their offsets.
	*/

	var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse)
	lastPersistTime := time.Now()
	lastTsNs := startFrom

	peerSignature, err := ma.readFilerStoreSignature(peer)
	if err != nil {
		return lastTsNs, fmt.Errorf("connecting to peer filer %s: %v", peer, err)
	}

	// when filer store is not shared by multiple filers
	if peerSignature != f.Signature {
		if prevTsNs, err := ma.readOffset(f, peer, peerSignature); err == nil {
			lastTsNs = prevTsNs
			defer func(prevTsNs int64) {
				if lastTsNs != prevTsNs && lastTsNs != lastPersistTime.UnixNano() {
					if err := ma.updateOffset(f, peer, peerSignature, lastTsNs); err == nil {
						glog.V(0).Infof("last sync time with %s at %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
					} else {
						glog.Errorf("failed to save last sync time with %s at %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
					}
				}
			}(prevTsNs)
		}

		glog.V(0).Infof("follow peer: %v, last %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
		var counter int64
		var synced bool
		maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) {
			if err := Replay(f.Store, event); err != nil {
				glog.Errorf("failed to replay metadata change from %v: %v", peer, err)
				return
			}
			counter++
			if lastPersistTime.Add(time.Minute).Before(time.Now()) {
				if err := ma.updateOffset(f, peer, peerSignature, event.TsNs); err == nil {
					if event.TsNs < time.Now().Add(-2*time.Minute).UnixNano() {
						glog.V(0).Infof("sync with %s progressed to: %v %0.2f/sec", peer, time.Unix(0, event.TsNs), float64(counter)/60.0)
					} else if !synced {
						synced = true
						glog.V(0).Infof("synced with %s", peer)
					}
					lastPersistTime = time.Now()
					counter = 0
				} else {
					glog.V(0).Infof("failed to update offset for %v: %v", peer, err)
				}
			}
		}
	}

	processEventFn := func(event *filer_pb.SubscribeMetadataResponse) error {
		data, err := proto.Marshal(event)
		if err != nil {
			glog.Errorf("failed to marshal subscribed filer_pb.SubscribeMetadataResponse %+v: %v", event, err)
			return err
		}
		dir := event.Directory
		// println("received meta change", dir, "size", len(data))
		ma.MetaLogBuffer.AddDataToBuffer([]byte(dir), data, event.TsNs)
		if maybeReplicateMetadataChange != nil {
			maybeReplicateMetadataChange(event)
		}
		return nil
	}

	glog.V(0).Infof("subscribing remote %s meta change: %v, clientId:%d", peer, time.Unix(0, lastTsNs), ma.filer.UniqueFilerId)
	err = pb.WithFilerClient(true, 0, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		atomic.AddInt32(&ma.filer.UniqueFilerEpoch, 1)
		stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
			ClientName:  "filer:" + string(self),
			PathPrefix:  "/",
			SinceNs:     lastTsNs,
			ClientId:    ma.filer.UniqueFilerId,
			ClientEpoch: atomic.LoadInt32(&ma.filer.UniqueFilerEpoch),
		})
		if err != nil {
			glog.V(0).Infof("SubscribeLocalMetadata %v: %v", peer, err)
			return fmt.Errorf("subscribe: %v", err)
		}

		for {
			resp, listenErr := stream.Recv()
			if listenErr == io.EOF {
				return nil
			}
			if listenErr != nil {
				glog.V(0).Infof("SubscribeLocalMetadata stream %v: %v", peer, listenErr)
				return listenErr
			}

			if err := processEventFn(resp); err != nil {
				glog.V(0).Infof("SubscribeLocalMetadata process %v: %v", resp, err)
				return fmt.Errorf("process %v: %v", resp, err)
			}
			f.onMetadataChangeEvent(resp)
			lastTsNs = resp.TsNs
		}
	})
	return lastTsNs, err
}
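// Note on the two replication modes above (a reading aid, not new behavior): when the peer's
// store signature equals f.Signature, both filers share one metadata store, so replaying peer
// events would double-apply them; only the in-memory MetaLogBuffer is fed. When the signatures
// differ, events are also replayed into the local store, and the peer's progress is checkpointed
// under a key derived from its signature (see GetPeerMetaOffsetKey below), letting a restarted
// filer resume from the persisted offset instead of startFrom.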
func (ma *MetaAggregator) readFilerStoreSignature(peer pb.ServerAddress) (sig int32, err error) {
	err = pb.WithFilerClient(false, 0, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
		if err != nil {
			return err
		}
		sig = resp.Signature
		return nil
	})
	return
}
const (
	MetaOffsetPrefix = "Meta"
)

// GetPeerMetaOffsetKey builds the KV key under which the last-synced timestamp
// for a peer filer store (identified by its signature) is persisted.
func GetPeerMetaOffsetKey(peerSignature int32) []byte {
	key := []byte(MetaOffsetPrefix + "xxxx")
	util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature))
	return key
}
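// Example (assuming util.Uint32toBytes writes the value big-endian): a peer signature of
// 0x0A0B0C0D produces the key "Meta" followed by the bytes {0x0A, 0x0B, 0x0C, 0x0D},
// i.e. one fixed 8-byte KV key per peer filer store signature.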
func (ma *MetaAggregator) readOffset(f *Filer, peer pb.ServerAddress, peerSignature int32) (lastTsNs int64, err error) {
	key := GetPeerMetaOffsetKey(peerSignature)
	value, err := f.Store.KvGet(context.Background(), key)
	if err != nil {
		return 0, fmt.Errorf("readOffset %s : %v", peer, err)
	}
	lastTsNs = int64(util.BytesToUint64(value))
	glog.V(0).Infof("readOffset %s : %d", peer, lastTsNs)
	return
}
func (ma *MetaAggregator) updateOffset(f *Filer, peer pb.ServerAddress, peerSignature int32, lastTsNs int64) (err error) {
	key := GetPeerMetaOffsetKey(peerSignature)
	value := make([]byte, 8)
	util.Uint64toBytes(value, uint64(lastTsNs))
	err = f.Store.KvPut(context.Background(), key, value)
	if err != nil {
		return fmt.Errorf("updateOffset %s : %v", peer, err)
	}
	glog.V(4).Infof("updateOffset %s : %d", peer, lastTsNs)
	return
}