// needle_map_leveldb.go

package storage

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/errors"
	"github.com/syndtr/goleveldb/leveldb/opt"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/storage/idx"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
	. "github.com/seaweedfs/seaweedfs/weed/storage/types"
	"github.com/seaweedfs/seaweedfs/weed/util"
)
// mark it every watermarkBatchSize operations
const watermarkBatchSize = 10000

var watermarkKey = []byte("idx_entry_watermark")
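// The value stored under watermarkKey records how many .idx entries have
// already been replayed into this LevelDB. It is persisted only on every
// watermarkBatchSize-th write, so after a restart generateLevelDbFile can
// resume idx.WalkIndexFile from that entry count instead of rebuilding the
// whole database.
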
type LevelDbNeedleMap struct {
	baseNeedleMapper
	dbFileName    string
	db            *leveldb.DB
	ldbOpts       *opt.Options
	ldbAccessLock sync.RWMutex
	exitChan      chan bool
	// no need to use atomic
	accessFlag  int64
	ldbTimeout  int64
	recordCount uint64
}
func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Options, ldbTimeout int64) (m *LevelDbNeedleMap, err error) {
	m = &LevelDbNeedleMap{dbFileName: dbFileName}
	m.indexFile = indexFile
	if !isLevelDbFresh(dbFileName, indexFile) {
		glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
		generateLevelDbFile(dbFileName, indexFile)
		glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
	}
	if stat, err := indexFile.Stat(); err != nil {
		glog.Fatalf("stat file %s: %v", indexFile.Name(), err)
	} else {
		m.indexFileOffset = stat.Size()
	}
	glog.V(1).Infof("Opening %s...", dbFileName)

	if m.ldbTimeout == 0 {
		if m.db, err = leveldb.OpenFile(dbFileName, opts); err != nil {
			if errors.IsCorrupted(err) {
				m.db, err = leveldb.RecoverFile(dbFileName, opts)
			}
			if err != nil {
				return
			}
		}
		glog.V(0).Infof("Loading %s... , watermark: %d", dbFileName, getWatermark(m.db))
		m.recordCount = uint64(m.indexFileOffset / NeedleMapEntrySize)
		watermark := (m.recordCount / watermarkBatchSize) * watermarkBatchSize
		err = setWatermark(m.db, watermark)
		if err != nil {
			glog.Fatalf("set watermark for %s error: %s\n", dbFileName, err)
			return
		}
	}
	mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile)
	if indexLoadError != nil {
		return nil, indexLoadError
	}
	m.mapMetric = *mm
	m.ldbTimeout = ldbTimeout
	if m.ldbTimeout > 0 {
		m.ldbOpts = opts
		m.exitChan = make(chan bool, 1)
		m.accessFlag = 0
		go lazyLoadingRoutine(m)
	}
	return
}

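// isLevelDbFresh reports whether the LevelDB store looks newer than the .idx
// file. Writes always go to the index file first, so if the LevelDB LOG file
// was modified after the index file, the store is treated as up to date;
// otherwise NewLevelDbNeedleMap regenerates it from the index.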
func isLevelDbFresh(dbFileName string, indexFile *os.File) bool {
	// normally we always write to index file first
	dbLogFile, err := os.Open(filepath.Join(dbFileName, "LOG"))
	if err != nil {
		return false
	}
	defer dbLogFile.Close()
	dbStat, dbStatErr := dbLogFile.Stat()
	indexStat, indexStatErr := indexFile.Stat()
	if dbStatErr != nil || indexStatErr != nil {
		glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr)
		return false
	}
	return dbStat.ModTime().After(indexStat.ModTime())
}

func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
	db, err := leveldb.OpenFile(dbFileName, nil)
	if err != nil {
		return err
	}
	defer db.Close()
	watermark := getWatermark(db)
	if stat, err := indexFile.Stat(); err != nil {
		glog.Fatalf("stat file %s: %v", indexFile.Name(), err)
		return err
	} else {
		if watermark*NeedleMapEntrySize > uint64(stat.Size()) {
			glog.Warningf("wrong watermark %d for filesize %d", watermark, stat.Size())
		}
		glog.V(0).Infof("generateLevelDbFile %s, watermark %d, num of entries:%d", dbFileName, watermark, (uint64(stat.Size())-watermark*NeedleMapEntrySize)/NeedleMapEntrySize)
	}
	return idx.WalkIndexFile(indexFile, watermark, func(key NeedleId, offset Offset, size Size) error {
		if !offset.IsZero() && size.IsValid() {
			levelDbWrite(db, key, offset, size, false, 0)
		} else {
			levelDbDelete(db, key)
		}
		return nil
	})
}

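// Each needle is stored in LevelDB as a fixed-size record: the key is the
// NeedleIdSize-byte needle id, and the value is the OffsetSize-byte offset
// followed by the SizeSize-byte size (see needle_map.ToBytes in levelDbWrite
// and the decoding in Get below).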
func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) {
	bytes := make([]byte, NeedleIdSize)
	if m.ldbTimeout > 0 {
		m.ldbAccessLock.RLock()
		defer m.ldbAccessLock.RUnlock()
		loadErr := reloadLdb(m)
		if loadErr != nil {
			return nil, false
		}
	}
	NeedleIdToBytes(bytes[0:NeedleIdSize], key)
	data, err := m.db.Get(bytes, nil)
	if err != nil || len(data) != OffsetSize+SizeSize {
		return nil, false
	}
	offset := BytesToOffset(data[0:OffsetSize])
	size := BytesToSize(data[OffsetSize : OffsetSize+SizeSize])
	return &needle_map.NeedleValue{Key: key, Offset: offset, Size: size}, true
}

func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size Size) error {
	var oldSize Size
	var watermark uint64
	if m.ldbTimeout > 0 {
		m.ldbAccessLock.RLock()
		defer m.ldbAccessLock.RUnlock()
		loadErr := reloadLdb(m)
		if loadErr != nil {
			return loadErr
		}
	}
	if oldNeedle, ok := m.Get(key); ok {
		oldSize = oldNeedle.Size
	}
	m.logPut(key, oldSize, size)
	// write to index file first
	if err := m.appendToIndexFile(key, offset, size); err != nil {
		return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err)
	}
	m.recordCount++
	if m.recordCount%watermarkBatchSize != 0 {
		watermark = 0
	} else {
		watermark = (m.recordCount / watermarkBatchSize) * watermarkBatchSize
		glog.V(1).Infof("put cnt:%d for %s,watermark: %d", m.recordCount, m.dbFileName, watermark)
	}
	// only persist the watermark when a batch boundary was reached
	return levelDbWrite(m.db, key, offset, size, watermark != 0, watermark)
}

func getWatermark(db *leveldb.DB) uint64 {
	data, err := db.Get(watermarkKey, nil)
	if err != nil || len(data) != 8 {
		glog.V(1).Infof("read previous watermark from db: %v, %d", err, len(data))
		return 0
	}
	return util.BytesToUint64(data)
}

func setWatermark(db *leveldb.DB, watermark uint64) error {
	glog.V(3).Infof("set watermark %d", watermark)
	wmBytes := make([]byte, 8)
	util.Uint64toBytes(wmBytes, watermark)
	if err := db.Put(watermarkKey, wmBytes, nil); err != nil {
		return fmt.Errorf("failed to setWatermark: %v", err)
	}
	return nil
}

func levelDbWrite(db *leveldb.DB, key NeedleId, offset Offset, size Size, updateWatermark bool, watermark uint64) error {
	bytes := needle_map.ToBytes(key, offset, size)
	if err := db.Put(bytes[0:NeedleIdSize], bytes[NeedleIdSize:NeedleIdSize+OffsetSize+SizeSize], nil); err != nil {
		return fmt.Errorf("failed to write leveldb: %v", err)
	}
	// set watermark
	if updateWatermark {
		return setWatermark(db, watermark)
	}
	return nil
}

func levelDbDelete(db *leveldb.DB, key NeedleId) error {
	bytes := make([]byte, NeedleIdSize)
	NeedleIdToBytes(bytes, key)
	return db.Delete(bytes, nil)
}

func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error {
	var watermark uint64
	if m.ldbTimeout > 0 {
		m.ldbAccessLock.RLock()
		defer m.ldbAccessLock.RUnlock()
		loadErr := reloadLdb(m)
		if loadErr != nil {
			return loadErr
		}
	}
	oldNeedle, found := m.Get(key)
	if !found || oldNeedle.Size.IsDeleted() {
		return nil
	}
	m.logDelete(oldNeedle.Size)
	// write to index file first
	if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil {
		return err
	}
	m.recordCount++
	if m.recordCount%watermarkBatchSize != 0 {
		watermark = 0
	} else {
		watermark = (m.recordCount / watermarkBatchSize) * watermarkBatchSize
	}
	// only persist the watermark when a batch boundary was reached
	return levelDbWrite(m.db, key, oldNeedle.Offset, -oldNeedle.Size, watermark != 0, watermark)
}

func (m *LevelDbNeedleMap) Close() {
	if m.indexFile != nil {
		indexFileName := m.indexFile.Name()
		if err := m.indexFile.Sync(); err != nil {
			glog.Warningf("sync file %s failed: %v", indexFileName, err)
		}
		if err := m.indexFile.Close(); err != nil {
			glog.Warningf("close index file %s failed: %v", indexFileName, err)
		}
	}
	if m.db != nil {
		if err := m.db.Close(); err != nil {
			glog.Warningf("close levelDB failed: %v", err)
		}
	}
	if m.ldbTimeout > 0 {
		m.exitChan <- true
	}
}

func (m *LevelDbNeedleMap) Destroy() error {
	m.Close()
	os.Remove(m.indexFile.Name())
	return os.RemoveAll(m.dbFileName)
}

func (m *LevelDbNeedleMap) UpdateNeedleMap(v *Volume, indexFile *os.File, opts *opt.Options, ldbTimeout int64) error {
	if v.nm != nil {
		v.nm.Close()
		v.nm = nil
	}
	defer func() {
		if v.tmpNm != nil {
			v.tmpNm.Close()
			v.tmpNm = nil
		}
	}()
	levelDbFile := v.FileName(".ldb")
	m.indexFile = indexFile
	err := os.RemoveAll(levelDbFile)
	if err != nil {
		return err
	}
	if err = os.Rename(v.FileName(".cpldb"), levelDbFile); err != nil {
		return fmt.Errorf("rename %s: %v", levelDbFile, err)
	}
	db, err := leveldb.OpenFile(levelDbFile, opts)
	if err != nil {
		if errors.IsCorrupted(err) {
			db, err = leveldb.RecoverFile(levelDbFile, opts)
		}
		if err != nil {
			return err
		}
	}
	m.db = db

	stat, e := indexFile.Stat()
	if e != nil {
		glog.Fatalf("stat file %s: %v", indexFile.Name(), e)
		return e
	}
	m.indexFileOffset = stat.Size()
	m.recordCount = uint64(stat.Size() / NeedleMapEntrySize)

	// set watermark
	watermark := (m.recordCount / watermarkBatchSize) * watermarkBatchSize
	err = setWatermark(db, watermark)
	if err != nil {
		glog.Fatalf("setting watermark failed %s: %v", indexFile.Name(), err)
		return err
	}
	v.nm = m
	v.tmpNm = nil
	m.ldbTimeout = ldbTimeout
	if m.ldbTimeout > 0 {
		m.ldbOpts = opts
		m.exitChan = make(chan bool, 1)
		m.accessFlag = 0
		go lazyLoadingRoutine(m)
	}
	return e
}

func (m *LevelDbNeedleMap) DoOffsetLoading(v *Volume, indexFile *os.File, startFrom uint64) (err error) {
	glog.V(0).Infof("loading idx to leveldb from offset %d for file: %s", startFrom, indexFile.Name())
	dbFileName := v.FileName(".cpldb")
	db, dbErr := leveldb.OpenFile(dbFileName, nil)
	defer func() {
		if dbErr == nil {
			db.Close()
		}
		if err != nil {
			os.RemoveAll(dbFileName)
		}
	}()
	if dbErr != nil {
		if errors.IsCorrupted(dbErr) {
			db, dbErr = leveldb.RecoverFile(dbFileName, nil)
		}
		if dbErr != nil {
			return dbErr
		}
	}
	err = idx.WalkIndexFile(indexFile, startFrom, func(key NeedleId, offset Offset, size Size) (e error) {
		m.mapMetric.FileCounter++
		bytes := make([]byte, NeedleIdSize)
		NeedleIdToBytes(bytes[0:NeedleIdSize], key)
		// fresh loading
		if startFrom == 0 {
			m.mapMetric.FileByteCounter += uint64(size)
			e = levelDbWrite(db, key, offset, size, false, 0)
			return e
		}
		// incremental loading
		data, err := db.Get(bytes, nil)
		if err != nil {
			if !strings.Contains(strings.ToLower(err.Error()), "not found") {
				// unexpected error
				return err
			}
			// new needle, unlikely to happen
			m.mapMetric.FileByteCounter += uint64(size)
			e = levelDbWrite(db, key, offset, size, false, 0)
		} else {
			// needle is found
			oldSize := BytesToSize(data[OffsetSize : OffsetSize+SizeSize])
			oldOffset := BytesToOffset(data[0:OffsetSize])
			if !offset.IsZero() && size.IsValid() {
				// updated needle
				m.mapMetric.FileByteCounter += uint64(size)
				if !oldOffset.IsZero() && oldSize.IsValid() {
					m.mapMetric.DeletionCounter++
					m.mapMetric.DeletionByteCounter += uint64(oldSize)
				}
				e = levelDbWrite(db, key, offset, size, false, 0)
			} else {
				// deleted needle
				m.mapMetric.DeletionCounter++
				m.mapMetric.DeletionByteCounter += uint64(oldSize)
				e = levelDbDelete(db, key)
			}
		}
		return e
	})
	return err
}

func reloadLdb(m *LevelDbNeedleMap) (err error) {
	if m.db != nil {
		return nil
	}
	glog.V(1).Infof("reloading leveldb %s", m.dbFileName)
	m.accessFlag = 1
	if m.db, err = leveldb.OpenFile(m.dbFileName, m.ldbOpts); err != nil {
		if errors.IsCorrupted(err) {
			m.db, err = leveldb.RecoverFile(m.dbFileName, m.ldbOpts)
		}
		if err != nil {
			glog.Fatalf("RecoverFile %s failed:%v", m.dbFileName, err)
			return err
		}
	}
	return nil
}

func unloadLdb(m *LevelDbNeedleMap) (err error) {
	m.ldbAccessLock.Lock()
	defer m.ldbAccessLock.Unlock()
	if m.db != nil {
		glog.V(1).Infof("reached max idle count, unload leveldb, %s", m.dbFileName)
		m.db.Close()
		m.db = nil
	}
	return nil
}

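// lazyLoadingRoutine implements the idle-unload side of the lazy-loading mode
// (ldbTimeout > 0): reloadLdb reopens the database on demand when a
// Get/Put/Delete finds it unloaded and sets accessFlag, while this routine
// wakes up every hour, counts intervals in which accessFlag stayed zero, and
// once that count reaches ldbTimeout closes the database via unloadLdb.
// Sending true on exitChan (done in Close) stops the routine.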
func lazyLoadingRoutine(m *LevelDbNeedleMap) (err error) {
	glog.V(1).Infof("lazyLoadingRoutine %s", m.dbFileName)
	accessRecord := int64(1)
	for {
		select {
		case exit := <-m.exitChan:
			if exit {
				glog.V(1).Infof("exit from lazyLoadingRoutine")
				return nil
			}
		case <-time.After(time.Hour * 1):
			glog.V(1).Infof("timeout %s", m.dbFileName)
			if m.accessFlag == 0 {
				accessRecord++
				glog.V(1).Infof("accessRecord++")
				if accessRecord >= m.ldbTimeout {
					unloadLdb(m)
				}
			} else {
				glog.V(1).Infof("reset accessRecord %s", m.dbFileName)
				// reset accessRecord
				accessRecord = 0
			}
			continue
		}
	}
}
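
// Typical lifecycle, as a rough sketch only (the real caller is the volume
// loading code; someNeedleId, someOffset, and someSize below are placeholders,
// not helpers defined in this file):
//
//	nm, err := NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, 0)
//	if err != nil {
//		return err
//	}
//	defer nm.Close()
//	_ = nm.Put(someNeedleId, someOffset, someSize) // appends to the .idx file, then writes to LevelDB
//	if nv, ok := nm.Get(someNeedleId); ok {
//		_ = nv.Offset // decoded from the stored offset+size value
//	}
//	_ = nm.Delete(someNeedleId, someOffset) // writes a tombstone to the .idx file, negates the size in LevelDB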