disk_location.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. package storage
  2. import (
  3. "io/ioutil"
  4. "os"
  5. "strings"
  6. "sync"
  7. "fmt"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. )
  10. type DiskLocation struct {
  11. Directory string
  12. MaxVolumeCount int
  13. volumes map[VolumeId]*Volume
  14. sync.RWMutex
  15. }
  16. func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation {
  17. location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount}
  18. location.volumes = make(map[VolumeId]*Volume)
  19. return location
  20. }
  21. func (l *DiskLocation) volumeIdFromPath(dir os.FileInfo) (VolumeId, string, error) {
  22. name := dir.Name()
  23. if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
  24. collection := ""
  25. base := name[:len(name)-len(".dat")]
  26. i := strings.LastIndex(base, "_")
  27. if i > 0 {
  28. collection, base = base[0:i], base[i+1:]
  29. }
  30. vol, err := NewVolumeId(base)
  31. return vol, collection, err
  32. }
  33. return 0, "", fmt.Errorf("Path is not a volume: %s", name)
  34. }
  35. func (l *DiskLocation) loadExistingVolume(dir os.FileInfo, needleMapKind NeedleMapType, mutex *sync.RWMutex) {
  36. name := dir.Name()
  37. if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
  38. vid, collection, err := l.volumeIdFromPath(dir)
  39. if err == nil {
  40. mutex.RLock()
  41. _, found := l.volumes[vid]
  42. mutex.RUnlock()
  43. if !found {
  44. if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0); e == nil {
  45. mutex.Lock()
  46. l.volumes[vid] = v
  47. mutex.Unlock()
  48. glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s",
  49. l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String())
  50. } else {
  51. glog.V(0).Infof("new volume %s error %s", name, e)
  52. }
  53. }
  54. }
  55. }
  56. }
  57. func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, concurrentFlag bool) {
  58. var concurrency int
  59. if concurrentFlag {
  60. //You could choose a better optimized concurency value after testing at your environment
  61. concurrency = 10
  62. } else {
  63. concurrency = 1
  64. }
  65. task_queue := make(chan os.FileInfo, 10*concurrency)
  66. go func() {
  67. if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
  68. for _, dir := range dirs {
  69. task_queue <- dir
  70. }
  71. }
  72. close(task_queue)
  73. }()
  74. var wg sync.WaitGroup
  75. var mutex sync.RWMutex
  76. for workerNum := 0; workerNum < concurrency; workerNum++ {
  77. wg.Add(1)
  78. go func() {
  79. defer wg.Done()
  80. for dir := range task_queue {
  81. l.loadExistingVolume(dir, needleMapKind, &mutex)
  82. }
  83. }()
  84. }
  85. wg.Wait()
  86. }
  87. func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
  88. l.Lock()
  89. defer l.Unlock()
  90. l.concurrentLoadingVolumes(needleMapKind, true)
  91. glog.V(0).Infoln("Store started on dir:", l.Directory, "with", len(l.volumes), "volumes", "max", l.MaxVolumeCount)
  92. }
  93. func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) {
  94. l.Lock()
  95. defer l.Unlock()
  96. for k, v := range l.volumes {
  97. if v.Collection == collection {
  98. e = l.deleteVolumeById(k)
  99. if e != nil {
  100. return
  101. }
  102. }
  103. }
  104. return
  105. }
  106. func (l *DiskLocation) deleteVolumeById(vid VolumeId) (e error) {
  107. v, ok := l.volumes[vid]
  108. if !ok {
  109. return
  110. }
  111. e = v.Destroy()
  112. if e != nil {
  113. return
  114. }
  115. delete(l.volumes, vid)
  116. return
  117. }
  118. func (l *DiskLocation) LoadVolume(vid VolumeId, needleMapKind NeedleMapType) bool {
  119. if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
  120. for _, dir := range dirs {
  121. volId, _, err := l.volumeIdFromPath(dir)
  122. if vid == volId && err == nil {
  123. var mutex sync.RWMutex
  124. l.loadExistingVolume(dir, needleMapKind, &mutex)
  125. return true
  126. }
  127. }
  128. }
  129. return false
  130. }
  131. func (l *DiskLocation) DeleteVolume(vid VolumeId) error {
  132. l.Lock()
  133. defer l.Unlock()
  134. _, ok := l.volumes[vid]
  135. if !ok {
  136. return fmt.Errorf("Volume not found, VolumeId: %d", vid)
  137. }
  138. return l.deleteVolumeById(vid)
  139. }
  140. func (l *DiskLocation) UnloadVolume(vid VolumeId) error {
  141. l.Lock()
  142. defer l.Unlock()
  143. v, ok := l.volumes[vid]
  144. if !ok {
  145. return fmt.Errorf("Volume not loaded, VolumeId: %d", vid)
  146. }
  147. v.Close()
  148. delete(l.volumes, vid)
  149. return nil
  150. }
  151. func (l *DiskLocation) SetVolume(vid VolumeId, volume *Volume) {
  152. l.Lock()
  153. defer l.Unlock()
  154. l.volumes[vid] = volume
  155. }
  156. func (l *DiskLocation) FindVolume(vid VolumeId) (*Volume, bool) {
  157. l.RLock()
  158. defer l.RUnlock()
  159. v, ok := l.volumes[vid]
  160. return v, ok
  161. }
  162. func (l *DiskLocation) VolumesLen() int {
  163. l.RLock()
  164. defer l.RUnlock()
  165. return len(l.volumes)
  166. }
  167. func (l *DiskLocation) Close() {
  168. l.Lock()
  169. defer l.Unlock()
  170. for _, v := range l.volumes {
  171. v.Close()
  172. }
  173. return
  174. }