volume_loading.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package storage
  2. import (
  3. "fmt"
  4. "os"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/stats"
  7. "github.com/chrislusf/seaweedfs/weed/storage/backend"
  8. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  9. "github.com/syndtr/goleveldb/leveldb/opt"
  10. "github.com/chrislusf/seaweedfs/weed/glog"
  11. )
  12. func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) {
  13. v = &Volume{dir: dirname, Collection: collection, Id: id}
  14. v.SuperBlock = SuperBlock{}
  15. v.needleMapKind = needleMapKind
  16. e = v.load(false, false, needleMapKind, 0)
  17. return
  18. }
  19. func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType, preallocate int64) error {
  20. var e error
  21. fileName := v.FileName()
  22. alreadyHasSuperBlock := false
  23. // open dat file
  24. if exists, canRead, canWrite, modifiedTime, fileSize := checkFile(fileName + ".dat"); exists {
  25. if !canRead {
  26. return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
  27. }
  28. var dataFile *os.File
  29. if canWrite {
  30. dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
  31. } else {
  32. glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
  33. dataFile, e = os.Open(fileName + ".dat")
  34. v.readOnly = true
  35. }
  36. v.lastModifiedTsSeconds = uint64(modifiedTime.Unix())
  37. if fileSize >= _SuperBlockSize {
  38. alreadyHasSuperBlock = true
  39. }
  40. v.DataBackend = backend.NewDiskFile(dataFile)
  41. } else {
  42. if createDatIfMissing {
  43. v.DataBackend, e = createVolumeFile(fileName+".dat", preallocate, v.MemoryMapMaxSizeMb)
  44. } else {
  45. return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
  46. }
  47. }
  48. if e != nil {
  49. if !os.IsPermission(e) {
  50. return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, e)
  51. } else {
  52. return fmt.Errorf("load data file %s.dat: %v", fileName, e)
  53. }
  54. }
  55. if alreadyHasSuperBlock {
  56. e = v.readSuperBlock()
  57. } else {
  58. if !v.SuperBlock.Initialized() {
  59. return fmt.Errorf("volume %s.dat not initialized", fileName)
  60. }
  61. e = v.maybeWriteSuperBlock()
  62. }
  63. if e == nil && alsoLoadIndex {
  64. var indexFile *os.File
  65. if v.readOnly {
  66. glog.V(1).Infoln("open to read file", fileName+".idx")
  67. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
  68. return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, e)
  69. }
  70. } else {
  71. glog.V(1).Infoln("open to write file", fileName+".idx")
  72. if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
  73. return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
  74. }
  75. }
  76. if v.lastAppendAtNs, e = CheckVolumeDataIntegrity(v, indexFile); e != nil {
  77. v.readOnly = true
  78. glog.V(0).Infof("volumeDataIntegrityChecking failed %v", e)
  79. }
  80. switch needleMapKind {
  81. case NeedleMapInMemory:
  82. glog.V(0).Infoln("loading index", fileName+".idx", "to memory readonly", v.readOnly)
  83. if v.nm, e = LoadCompactNeedleMap(indexFile); e != nil {
  84. glog.V(0).Infof("loading index %s to memory error: %v", fileName+".idx", e)
  85. }
  86. case NeedleMapLevelDb:
  87. glog.V(0).Infoln("loading leveldb", fileName+".ldb")
  88. opts := &opt.Options{
  89. BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB
  90. WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB
  91. CompactionTableSizeMultiplier: 10, // default value is 1
  92. }
  93. if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil {
  94. glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
  95. }
  96. case NeedleMapLevelDbMedium:
  97. glog.V(0).Infoln("loading leveldb medium", fileName+".ldb")
  98. opts := &opt.Options{
  99. BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
  100. WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
  101. CompactionTableSizeMultiplier: 10, // default value is 1
  102. }
  103. if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil {
  104. glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
  105. }
  106. case NeedleMapLevelDbLarge:
  107. glog.V(0).Infoln("loading leveldb large", fileName+".ldb")
  108. opts := &opt.Options{
  109. BlockCacheCapacity: 8 * 1024 * 1024, // default value is 8MiB
  110. WriteBuffer: 4 * 1024 * 1024, // default value is 4MiB
  111. CompactionTableSizeMultiplier: 10, // default value is 1
  112. }
  113. if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil {
  114. glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
  115. }
  116. }
  117. }
  118. stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Inc()
  119. return e
  120. }
  // checkFile stats filename and reports what it found.
  //
  // Returns:
  //   - exists: false only when os.Stat reports that the file does not exist.
  //   - canRead / canWrite: whether the owner-read (0400) / owner-write (0200)
  //     permission bit is set in the file mode. These are mode bits, not a
  //     check of the calling process's effective access.
  //   - modTime, fileSize: the modification time and size from the stat result.
  //
  // When os.Stat fails for any reason other than "not exist", the file is
  // reported as existing with canRead and canWrite both false and zero
  // modTime/fileSize, so callers treat it as unreadable rather than missing.
  func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time, fileSize int64) {
  	fi, err := os.Stat(filename)
  	if os.IsNotExist(err) {
  		return false, false, false, time.Time{}, 0
  	}
  	if err != nil {
  		// fi is nil here; dereferencing it would panic.
  		return true, false, false, time.Time{}, 0
  	}

  	mode := fi.Mode()
  	canRead = mode&0400 != 0
  	canWrite = mode&0200 != 0
  	return true, canRead, canWrite, fi.ModTime(), fi.Size()
  }