volume_loading.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. package storage
  2. import (
  3. "fmt"
  4. "os"
  5. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  6. "github.com/syndtr/goleveldb/leveldb/opt"
  7. "github.com/seaweedfs/seaweedfs/weed/glog"
  8. "github.com/seaweedfs/seaweedfs/weed/stats"
  9. "github.com/seaweedfs/seaweedfs/weed/storage/backend"
  10. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  11. "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
  12. "github.com/seaweedfs/seaweedfs/weed/util"
  13. )
  14. func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind) (v *Volume, err error) {
  15. v = &Volume{dir: dirname, Collection: collection, Id: id}
  16. v.SuperBlock = super_block.SuperBlock{}
  17. v.needleMapKind = needleMapKind
  18. err = v.load(false, false, needleMapKind, 0)
  19. return
  20. }
  21. func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapKind, preallocate int64) (err error) {
  22. alreadyHasSuperBlock := false
  23. hasLoadedVolume := false
  24. defer func() {
  25. if !hasLoadedVolume {
  26. if v.nm != nil {
  27. v.nm.Close()
  28. v.nm = nil
  29. }
  30. if v.DataBackend != nil {
  31. v.DataBackend.Close()
  32. v.DataBackend = nil
  33. }
  34. }
  35. }()
  36. hasVolumeInfoFile := v.maybeLoadVolumeInfo()
  37. if v.HasRemoteFile() {
  38. v.noWriteCanDelete = true
  39. v.noWriteOrDelete = false
  40. glog.V(0).Infof("loading volume %d from remote %v", v.Id, v.volumeInfo)
  41. v.LoadRemoteFile()
  42. alreadyHasSuperBlock = true
  43. } else if exists, canRead, canWrite, modifiedTime, fileSize := util.CheckFile(v.FileName(".dat")); exists {
  44. // open dat file
  45. if !canRead {
  46. return fmt.Errorf("cannot read Volume Data file %s", v.FileName(".dat"))
  47. }
  48. var dataFile *os.File
  49. if canWrite {
  50. dataFile, err = os.OpenFile(v.FileName(".dat"), os.O_RDWR|os.O_CREATE, 0644)
  51. } else {
  52. glog.V(0).Infof("opening %s in READONLY mode", v.FileName(".dat"))
  53. dataFile, err = os.Open(v.FileName(".dat"))
  54. v.noWriteOrDelete = true
  55. }
  56. v.lastModifiedTsSeconds = uint64(modifiedTime.Unix())
  57. if fileSize >= super_block.SuperBlockSize {
  58. alreadyHasSuperBlock = true
  59. }
  60. v.DataBackend = backend.NewDiskFile(dataFile)
  61. } else {
  62. if createDatIfMissing {
  63. v.DataBackend, err = backend.CreateVolumeFile(v.FileName(".dat"), preallocate, v.MemoryMapMaxSizeMb)
  64. } else {
  65. return fmt.Errorf("volume data file %s does not exist", v.FileName(".dat"))
  66. }
  67. }
  68. if err != nil {
  69. if !os.IsPermission(err) {
  70. return fmt.Errorf("cannot load volume data %s: %v", v.FileName(".dat"), err)
  71. } else {
  72. return fmt.Errorf("load data file %s: %v", v.FileName(".dat"), err)
  73. }
  74. }
  75. if alreadyHasSuperBlock {
  76. err = v.readSuperBlock()
  77. if err == nil {
  78. v.volumeInfo.Version = uint32(v.SuperBlock.Version)
  79. }
  80. glog.V(0).Infof("readSuperBlock volume %d version %v", v.Id, v.SuperBlock.Version)
  81. if v.HasRemoteFile() {
  82. // maybe temporary network problem
  83. glog.Errorf("readSuperBlock remote volume %d: %v", v.Id, err)
  84. err = nil
  85. }
  86. } else {
  87. if !v.SuperBlock.Initialized() {
  88. return fmt.Errorf("volume %s not initialized", v.FileName(".dat"))
  89. }
  90. err = v.maybeWriteSuperBlock()
  91. }
  92. if err == nil && alsoLoadIndex {
  93. // adjust for existing volumes with .idx together with .dat files
  94. if v.dirIdx != v.dir {
  95. if util.FileExists(v.DataFileName() + ".idx") {
  96. v.dirIdx = v.dir
  97. }
  98. }
  99. // check volume idx files
  100. if err := v.checkIdxFile(); err != nil {
  101. glog.Fatalf("check volume idx file %s: %v", v.FileName(".idx"), err)
  102. }
  103. var indexFile *os.File
  104. if v.noWriteOrDelete {
  105. glog.V(0).Infoln("open to read file", v.FileName(".idx"))
  106. if indexFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644); err != nil {
  107. return fmt.Errorf("cannot read Volume Index %s: %v", v.FileName(".idx"), err)
  108. }
  109. } else {
  110. glog.V(1).Infoln("open to write file", v.FileName(".idx"))
  111. if indexFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDWR|os.O_CREATE, 0644); err != nil {
  112. return fmt.Errorf("cannot write Volume Index %s: %v", v.FileName(".idx"), err)
  113. }
  114. }
  115. if v.lastAppendAtNs, err = CheckAndFixVolumeDataIntegrity(v, indexFile); err != nil {
  116. v.noWriteOrDelete = true
  117. glog.V(0).Infof("volumeDataIntegrityChecking failed %v", err)
  118. }
  119. if v.noWriteOrDelete || v.noWriteCanDelete {
  120. if v.nm, err = NewSortedFileNeedleMap(v.IndexFileName(), indexFile); err != nil {
  121. glog.V(0).Infof("loading sorted db %s error: %v", v.FileName(".sdx"), err)
  122. }
  123. } else {
  124. switch needleMapKind {
  125. case NeedleMapInMemory:
  126. if v.tmpNm != nil {
  127. glog.V(0).Infof("updating memory compact index %s ", v.FileName(".idx"))
  128. err = v.tmpNm.UpdateNeedleMap(v, indexFile, nil, 0)
  129. } else {
  130. glog.V(0).Infoln("loading memory index", v.FileName(".idx"), "to memory")
  131. if v.nm, err = LoadCompactNeedleMap(indexFile); err != nil {
  132. glog.V(0).Infof("loading index %s to memory error: %v", v.FileName(".idx"), err)
  133. }
  134. }
  135. case NeedleMapLevelDb:
  136. opts := &opt.Options{
  137. BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB
  138. WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB
  139. CompactionTableSizeMultiplier: 10, // default value is 1
  140. }
  141. if v.tmpNm != nil {
  142. glog.V(0).Infoln("updating leveldb index", v.FileName(".ldb"))
  143. err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts, v.ldbTimeout)
  144. } else {
  145. glog.V(0).Infoln("loading leveldb index", v.FileName(".ldb"))
  146. if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout); err != nil {
  147. glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
  148. }
  149. }
  150. case NeedleMapLevelDbMedium:
  151. opts := &opt.Options{
  152. BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
  153. WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
  154. CompactionTableSizeMultiplier: 10, // default value is 1
  155. }
  156. if v.tmpNm != nil {
  157. glog.V(0).Infoln("updating leveldb medium index", v.FileName(".ldb"))
  158. err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts, v.ldbTimeout)
  159. } else {
  160. glog.V(0).Infoln("loading leveldb medium index", v.FileName(".ldb"))
  161. if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout); err != nil {
  162. glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
  163. }
  164. }
  165. case NeedleMapLevelDbLarge:
  166. opts := &opt.Options{
  167. BlockCacheCapacity: 8 * 1024 * 1024, // default value is 8MiB
  168. WriteBuffer: 4 * 1024 * 1024, // default value is 4MiB
  169. CompactionTableSizeMultiplier: 10, // default value is 1
  170. }
  171. if v.tmpNm != nil {
  172. glog.V(0).Infoln("updating leveldb large index", v.FileName(".ldb"))
  173. err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts, v.ldbTimeout)
  174. } else {
  175. glog.V(0).Infoln("loading leveldb large index", v.FileName(".ldb"))
  176. if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout); err != nil {
  177. glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
  178. }
  179. }
  180. }
  181. }
  182. }
  183. if !hasVolumeInfoFile {
  184. v.volumeInfo.Version = uint32(v.SuperBlock.Version)
  185. v.volumeInfo.BytesOffset = uint32(types.OffsetSize)
  186. if err := v.SaveVolumeInfo(); err != nil {
  187. glog.Warningf("volume %d failed to save file info: %v", v.Id, err)
  188. }
  189. }
  190. stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Inc()
  191. if err == nil {
  192. hasLoadedVolume = true
  193. }
  194. return err
  195. }