etcd_sequencer.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. package sequence
  2. /*
  3. Note :
  4. (1) store the sequence in the ETCD cluster and in a local file (sequencer.dat)
  5. (2) fetch sequences from the ETCD cluster in batches, and store the max sequence id in the local file
  6. (3) the available sequence range is [currentSeqId, maxSeqId); when currentSeqId >= maxSeqId, fetch a new maxSeqId from ETCD.
  7. */
  8. import (
  9. "context"
  10. "fmt"
  11. "sync"
  12. "time"
  13. "io"
  14. "os"
  15. "strconv"
  16. "strings"
  17. "github.com/chrislusf/seaweedfs/weed/util/log"
  18. "go.etcd.io/etcd/client"
  19. )
  20. const (
  21. // EtcdKeyPrefix = "/seaweedfs"
  22. EtcdKeySequence = "/master/sequence"
  23. EtcdContextTimeoutSecond = 100 * time.Second
  24. DefaultEtcdSteps uint64 = 500 // internal counter
  25. SequencerFileName = "sequencer.dat"
  26. FileMaxSequenceLength = 128
  27. )
  28. type EtcdSequencer struct {
  29. sequenceLock sync.Mutex
  30. // available sequence range : [currentSeqId, maxSeqId)
  31. currentSeqId uint64
  32. maxSeqId uint64
  33. keysAPI client.KeysAPI
  34. seqFile *os.File
  35. }
  36. func NewEtcdSequencer(etcdUrls string, metaFolder string) (*EtcdSequencer, error) {
  37. file, err := openSequenceFile(metaFolder + "/" + SequencerFileName)
  38. if nil != err {
  39. return nil, fmt.Errorf("open sequence file fialed, %v", err)
  40. }
  41. cli, err := client.New(client.Config{
  42. Endpoints: strings.Split(etcdUrls, ","),
  43. Username: "",
  44. Password: "",
  45. })
  46. if err != nil {
  47. return nil, err
  48. }
  49. keysApi := client.NewKeysAPI(cli)
  50. // TODO: the current sequence id in local file is not used
  51. maxValue, _, err := readSequenceFile(file)
  52. if err != nil {
  53. return nil, fmt.Errorf("read sequence from file failed, %v", err)
  54. }
  55. log.Tracef("read sequence from file : %d", maxValue)
  56. newSeq, err := setMaxSequenceToEtcd(keysApi, maxValue)
  57. if err != nil {
  58. return nil, err
  59. }
  60. sequencer := &EtcdSequencer{maxSeqId: newSeq,
  61. currentSeqId: newSeq,
  62. keysAPI: keysApi,
  63. seqFile: file,
  64. }
  65. return sequencer, nil
  66. }
  67. func (es *EtcdSequencer) NextFileId(count uint64) uint64 {
  68. es.sequenceLock.Lock()
  69. defer es.sequenceLock.Unlock()
  70. if (es.currentSeqId + count) >= es.maxSeqId {
  71. reqSteps := DefaultEtcdSteps
  72. if count > DefaultEtcdSteps {
  73. reqSteps += count
  74. }
  75. maxId, err := batchGetSequenceFromEtcd(es.keysAPI, reqSteps)
  76. log.Tracef("get max sequence id from etcd, %d", maxId)
  77. if err != nil {
  78. log.Error(err)
  79. return 0
  80. }
  81. es.currentSeqId, es.maxSeqId = maxId-reqSteps, maxId
  82. log.Tracef("current id : %d, max id : %d", es.currentSeqId, es.maxSeqId)
  83. if err := writeSequenceFile(es.seqFile, es.maxSeqId, es.currentSeqId); err != nil {
  84. log.Errorf("flush sequence to file failed, %v", err)
  85. }
  86. }
  87. ret := es.currentSeqId
  88. es.currentSeqId += count
  89. return ret
  90. }
  91. /**
  92. instead of collecting the max value from volume server,
  93. the max value should be saved in local config file and ETCD cluster
  94. */
  95. func (es *EtcdSequencer) SetMax(seenValue uint64) {
  96. es.sequenceLock.Lock()
  97. defer es.sequenceLock.Unlock()
  98. if seenValue > es.maxSeqId {
  99. maxId, err := setMaxSequenceToEtcd(es.keysAPI, seenValue)
  100. if err != nil {
  101. log.Errorf("set Etcd Max sequence failed : %v", err)
  102. return
  103. }
  104. es.currentSeqId, es.maxSeqId = maxId, maxId
  105. if err := writeSequenceFile(es.seqFile, maxId, maxId); err != nil {
  106. log.Errorf("flush sequence to file failed, %v", err)
  107. }
  108. }
  109. }
  110. func (es *EtcdSequencer) GetMax() uint64 {
  111. return es.maxSeqId
  112. }
  113. func (es *EtcdSequencer) Peek() uint64 {
  114. return es.currentSeqId
  115. }
  116. func batchGetSequenceFromEtcd(kvApi client.KeysAPI, step uint64) (uint64, error) {
  117. if step <= 0 {
  118. return 0, fmt.Errorf("the step must be large than 1")
  119. }
  120. ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond)
  121. var endSeqValue uint64 = 0
  122. defer cancel()
  123. for {
  124. getResp, err := kvApi.Get(ctx, EtcdKeySequence, &client.GetOptions{Recursive: false, Quorum: true})
  125. if err != nil {
  126. return 0, err
  127. }
  128. if getResp.Node == nil {
  129. continue
  130. }
  131. prevValue := getResp.Node.Value
  132. prevSeqValue, err := strconv.ParseUint(prevValue, 10, 64)
  133. if err != nil {
  134. return 0, fmt.Errorf("get sequence from etcd failed, %v", err)
  135. }
  136. endSeqValue = prevSeqValue + step
  137. endSeqStr := strconv.FormatUint(endSeqValue, 10)
  138. _, err = kvApi.Set(ctx, EtcdKeySequence, endSeqStr, &client.SetOptions{PrevValue: prevValue})
  139. if err == nil {
  140. break
  141. }
  142. log.Error(err)
  143. }
  144. return endSeqValue, nil
  145. }
  146. /**
  147. update the value of the key EtcdKeySequence in ETCD cluster with the parameter of maxSeq,
  148. when the value of the key EtcdKeySequence is equal to or large than the parameter maxSeq,
  149. return the value of EtcdKeySequence in the ETCD cluster;
  150. when the value of the EtcdKeySequence is less than the parameter maxSeq,
  151. return the value of the parameter maxSeq
  152. */
  153. func setMaxSequenceToEtcd(kvApi client.KeysAPI, maxSeq uint64) (uint64, error) {
  154. maxSeqStr := strconv.FormatUint(maxSeq, 10)
  155. ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond)
  156. defer cancel()
  157. for {
  158. getResp, err := kvApi.Get(ctx, EtcdKeySequence, &client.GetOptions{Recursive: false, Quorum: true})
  159. if err != nil {
  160. if ce, ok := err.(client.Error); ok && (ce.Code == client.ErrorCodeKeyNotFound) {
  161. _, err := kvApi.Create(ctx, EtcdKeySequence, maxSeqStr)
  162. if err == nil {
  163. continue
  164. }
  165. if ce, ok = err.(client.Error); ok && (ce.Code == client.ErrorCodeNodeExist) {
  166. continue
  167. }
  168. return 0, err
  169. } else {
  170. return 0, err
  171. }
  172. }
  173. if getResp.Node == nil {
  174. continue
  175. }
  176. prevSeqStr := getResp.Node.Value
  177. prevSeq, err := strconv.ParseUint(prevSeqStr, 10, 64)
  178. if err != nil {
  179. return 0, err
  180. }
  181. if prevSeq >= maxSeq {
  182. return prevSeq, nil
  183. }
  184. _, err = kvApi.Set(ctx, EtcdKeySequence, maxSeqStr, &client.SetOptions{PrevValue: prevSeqStr})
  185. if err != nil {
  186. return 0, err
  187. }
  188. }
  189. }
  190. func openSequenceFile(file string) (*os.File, error) {
  191. _, err := os.Stat(file)
  192. if os.IsNotExist(err) {
  193. fid, err := os.OpenFile(file, os.O_RDWR|os.O_CREATE, 0644)
  194. if err != nil {
  195. return nil, err
  196. }
  197. if err := writeSequenceFile(fid, 1, 0); err != nil {
  198. return nil, err
  199. }
  200. return fid, nil
  201. } else {
  202. return os.OpenFile(file, os.O_RDWR|os.O_CREATE, 0644)
  203. }
  204. }
  205. /*
  206. read sequence and step from sequence file
  207. */
  208. func readSequenceFile(file *os.File) (uint64, uint64, error) {
  209. sequence := make([]byte, FileMaxSequenceLength)
  210. size, err := file.ReadAt(sequence, 0)
  211. if (err != nil) && (err != io.EOF) {
  212. err := fmt.Errorf("cannot read file %s, %v", file.Name(), err)
  213. return 0, 0, err
  214. }
  215. sequence = sequence[0:size]
  216. seqs := strings.Split(string(sequence), ":")
  217. maxId, err := strconv.ParseUint(seqs[0], 10, 64)
  218. if err != nil {
  219. return 0, 0, fmt.Errorf("parse sequence from file failed, %v", err)
  220. }
  221. if len(seqs) > 1 {
  222. step, err := strconv.ParseUint(seqs[1], 10, 64)
  223. if err != nil {
  224. return 0, 0, fmt.Errorf("parse sequence from file failed, %v", err)
  225. }
  226. return maxId, step, nil
  227. }
  228. return maxId, 0, nil
  229. }
  230. /**
  231. write the sequence and step to sequence file
  232. */
  233. func writeSequenceFile(file *os.File, sequence, step uint64) error {
  234. _ = step
  235. seqStr := fmt.Sprintf("%d:%d", sequence, sequence)
  236. if _, err := file.Seek(0, 0); err != nil {
  237. err = fmt.Errorf("cannot seek to the beginning of %s: %v", file.Name(), err)
  238. return err
  239. }
  240. if err := file.Truncate(0); err != nil {
  241. return fmt.Errorf("truncate sequence file faield : %v", err)
  242. }
  243. if _, err := file.WriteString(seqStr); err != nil {
  244. return fmt.Errorf("write file %s failed, %v", file.Name(), err)
  245. }
  246. if err := file.Sync(); err != nil {
  247. return fmt.Errorf("flush file %s failed, %v", file.Name(), err)
  248. }
  249. return nil
  250. }
  251. // the UT helper method
  252. // func deleteEtcdKey(kvApi client.KeysAPI, key string) error {
  253. // ctx, cancel := context.WithTimeout(context.Background(), EtcdContextTimeoutSecond)
  254. // defer cancel()
  255. // _, err := kvApi.Delete(ctx, key, &client.DeleteOptions{Dir: false})
  256. // if err != nil {
  257. // return err
  258. // }
  259. // return nil
  260. // }