meta_aggregator.go

package filer

import (
	"context"
	"fmt"
	"io"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/protobuf/proto"

	"github.com/seaweedfs/seaweedfs/weed/cluster"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)

type MetaAggregator struct {
	filer            *Filer
	self             pb.ServerAddress
	isLeader         bool
	grpcDialOption   grpc.DialOption
	MetaLogBuffer    *log_buffer.LogBuffer
	peerStatuses     map[pb.ServerAddress]int // per-peer add count, guarded by peerStatusesLock
	peerStatusesLock sync.Mutex

	// notifying clients
	ListenersLock sync.Mutex
	ListenersCond *sync.Cond
}

// MetaAggregator only aggregates metadata changes "on the fly"; the aggregated
// logs are not re-persisted to disk. Historical entries come from what each
// filer's local metadata log has already persisted on disk.
func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc.DialOption) *MetaAggregator {
	t := &MetaAggregator{
		filer:          filer,
		self:           self,
		grpcDialOption: grpcDialOption,
		peerStatuses:   make(map[pb.ServerAddress]int),
	}
	t.ListenersCond = sync.NewCond(&t.ListenersLock)
	t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() {
		t.ListenersCond.Broadcast()
	})
	return t
}

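// A minimal usage sketch (not part of the original file; the peer address is
// hypothetical): a filer constructs the aggregator once at startup and then
// feeds it cluster membership events:
//
//	ma := NewMetaAggregator(myFiler, selfAddress, grpcDialOption)
//	ma.OnPeerUpdate(&master_pb.ClusterNodeUpdate{
//		NodeType: cluster.FilerType,
//		Address:  "peer-filer:8888",
//		IsAdd:    true,
//	}, time.Now())
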
func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
	if update.NodeType != cluster.FilerType {
		return
	}

	address := pb.ServerAddress(update.Address)
	if update.IsAdd {
		// every filer should subscribe to a new filer
		if ma.setActive(address, true) {
			go ma.loopSubscribeToOneFiler(ma.filer, ma.self, address, startFrom)
		}
	} else {
		ma.setActive(address, false)
	}
}

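// setActive reference-counts peer additions: only the first add for an address
// returns notDuplicated == true, so at most one subscription goroutine is
// started per peer; a removal drops the peer entirely.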
func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) (notDuplicated bool) {
	ma.peerStatusesLock.Lock()
	defer ma.peerStatusesLock.Unlock()
	if isActive {
		if _, found := ma.peerStatuses[address]; found {
			ma.peerStatuses[address] += 1
		} else {
			ma.peerStatuses[address] = 1
			notDuplicated = true
		}
	} else {
		delete(ma.peerStatuses, address)
	}
	return
}

func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) {
	ma.peerStatusesLock.Lock()
	defer ma.peerStatusesLock.Unlock()
	var count int
	count, isActive = ma.peerStatuses[address]
	return count > 0 && isActive
}

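// loopSubscribeToOneFiler keeps a subscription to one peer alive, resuming
// from the last seen timestamp and retrying after a short pause, until the
// peer is marked inactive.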
func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom time.Time) {
	lastTsNs := startFrom.UnixNano()
	for {
		glog.V(0).Infof("loopSubscribeToOneFiler read %s start from %v %d", peer, time.Unix(0, lastTsNs), lastTsNs)
		nextLastTsNs, err := ma.doSubscribeToOneFiler(f, self, peer, lastTsNs)
		if !ma.isActive(peer) {
			glog.V(0).Infof("stop subscribing remote %s meta change", peer)
			return
		}
		if err != nil {
			errLvl := glog.Level(0)
			if strings.Contains(err.Error(), "duplicated local subscription detected") {
				// expected when another subscription already exists; log quietly
				errLvl = glog.Level(4)
			}
			glog.V(errLvl).Infof("subscribing remote %s meta change: %v", peer, err)
		}
		if lastTsNs < nextLastTsNs {
			lastTsNs = nextLastTsNs
		}
		time.Sleep(1733 * time.Millisecond)
	}
}

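// doSubscribeToOneFiler runs one streaming subscription against a peer and
// returns the timestamp (ns) of the last event it processed, so the caller can
// resume from that point after a reconnect.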
func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom int64) (int64, error) {

	/*
		Each filer reads the "filer.store.id", which is the filer store's signature generated when the filer starts.
		When reading another filer's local meta changes:
		* if the received change does not carry this filer's own signature, apply the change to the current filer store.
		Upon connecting to another filer, remember its signature and the offset already synced from it.
	*/

	var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse)
	lastPersistTime := time.Now()
	lastTsNs := startFrom

	peerSignature, err := ma.readFilerStoreSignature(peer)
	if err != nil {
		return lastTsNs, fmt.Errorf("connecting to peer filer %s: %v", peer, err)
	}

	// a different store signature means the filer store is not shared by multiple
	// filers, so remote changes must be replayed into the local store and the
	// sync offset tracked
	if peerSignature != f.Signature {
		if prevTsNs, err := ma.readOffset(f, peer, peerSignature); err == nil {
			lastTsNs = prevTsNs
			defer func(prevTsNs int64) {
				if lastTsNs != prevTsNs && lastTsNs != lastPersistTime.UnixNano() {
					if err := ma.updateOffset(f, peer, peerSignature, lastTsNs); err == nil {
						glog.V(0).Infof("last sync time with %s at %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
					} else {
						glog.Errorf("failed to save last sync time with %s at %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
					}
				}
			}(prevTsNs)
		}

		glog.V(0).Infof("follow peer: %v, last %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
		var counter int64
		var synced bool
		maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) {
			if err := Replay(f.Store, event); err != nil {
				glog.Errorf("failed to replay metadata change from %v: %v", peer, err)
				return
			}
			counter++
			if lastPersistTime.Add(time.Minute).Before(time.Now()) {
				if err := ma.updateOffset(f, peer, peerSignature, event.TsNs); err == nil {
					if event.TsNs < time.Now().Add(-2*time.Minute).UnixNano() {
						glog.V(0).Infof("sync with %s progressed to: %v %0.2f/sec", peer, time.Unix(0, event.TsNs), float64(counter)/60.0)
					} else if !synced {
						synced = true
						glog.V(0).Infof("synced with %s", peer)
					}
					lastPersistTime = time.Now()
					counter = 0
				} else {
					glog.V(0).Infof("failed to update offset for %v: %v", peer, err)
				}
			}
		}
	}

	processEventFn := func(event *filer_pb.SubscribeMetadataResponse) error {
		data, err := proto.Marshal(event)
		if err != nil {
			glog.Errorf("failed to marshal subscribed filer_pb.SubscribeMetadataResponse %+v: %v", event, err)
			return err
		}
		dir := event.Directory
		// println("received meta change", dir, "size", len(data))
		ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs)
		if maybeReplicateMetadataChange != nil {
			maybeReplicateMetadataChange(event)
		}
		return nil
	}

	glog.V(0).Infof("subscribing remote %s meta change: %v, clientId:%d", peer, time.Unix(0, lastTsNs), ma.filer.UniqueFilerId)
	err = pb.WithFilerClient(true, 0, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		atomic.AddInt32(&ma.filer.UniqueFilerEpoch, 1)
		stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
			ClientName:  "filer:" + string(self),
			PathPrefix:  "/",
			SinceNs:     lastTsNs,
			ClientId:    ma.filer.UniqueFilerId,
			ClientEpoch: atomic.LoadInt32(&ma.filer.UniqueFilerEpoch),
		})
		if err != nil {
			return fmt.Errorf("subscribe: %v", err)
		}

		for {
			resp, listenErr := stream.Recv()
			if listenErr == io.EOF {
				return nil
			}
			if listenErr != nil {
				return listenErr
			}

			if err := processEventFn(resp); err != nil {
				return fmt.Errorf("process %v: %v", resp, err)
			}
			f.onMetadataChangeEvent(resp)
			lastTsNs = resp.TsNs
		}
	})
	return lastTsNs, err
}

func (ma *MetaAggregator) readFilerStoreSignature(peer pb.ServerAddress) (sig int32, err error) {
	err = pb.WithFilerClient(false, 0, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
		if err != nil {
			return err
		}
		sig = resp.Signature
		return nil
	})
	return
}

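// MetaOffsetPrefix namespaces the per-peer sync offsets stored in the filer's
// key-value store.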
const (
	MetaOffsetPrefix = "Meta"
)

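// GetPeerMetaOffsetKey builds the KV key for a peer's sync offset. Illustrative
// example (assuming util.Uint32toBytes writes big-endian): signature 0x01020304
// yields the 8 bytes "Meta" followed by {0x01, 0x02, 0x03, 0x04}.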
func GetPeerMetaOffsetKey(peerSignature int32) []byte {
	key := []byte(MetaOffsetPrefix + "xxxx")
	util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature))
	return key
}

func (ma *MetaAggregator) readOffset(f *Filer, peer pb.ServerAddress, peerSignature int32) (lastTsNs int64, err error) {
	key := GetPeerMetaOffsetKey(peerSignature)
	value, err := f.Store.KvGet(context.Background(), key)
	if err != nil {
		return 0, fmt.Errorf("readOffset %s : %v", peer, err)
	}

	lastTsNs = int64(util.BytesToUint64(value))
	glog.V(0).Infof("readOffset %s : %d", peer, lastTsNs)
	return
}

func (ma *MetaAggregator) updateOffset(f *Filer, peer pb.ServerAddress, peerSignature int32, lastTsNs int64) (err error) {
	key := GetPeerMetaOffsetKey(peerSignature)
	value := make([]byte, 8)
	util.Uint64toBytes(value, uint64(lastTsNs))
	err = f.Store.KvPut(context.Background(), key, value)
	if err != nil {
		return fmt.Errorf("updateOffset %s : %v", peer, err)
	}

	glog.V(4).Infof("updateOffset %s : %d", peer, lastTsNs)
	return
}
