store.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. package storage
  2. import (
  3. "fmt"
  4. "path/filepath"
  5. "strings"
  6. "sync/atomic"
  7. "google.golang.org/grpc"
  8. "github.com/chrislusf/seaweedfs/weed/glog"
  9. "github.com/chrislusf/seaweedfs/weed/pb"
  10. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  11. "github.com/chrislusf/seaweedfs/weed/stats"
  12. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  13. "github.com/chrislusf/seaweedfs/weed/storage/super_block"
  14. . "github.com/chrislusf/seaweedfs/weed/storage/types"
  15. )
  // MAX_TTL_VOLUME_REMOVAL_DELAY is the grace period, in minutes, that
  // CollectHeartbeat passes to expiredLongEnough before an expired volume is
  // removed.
  const MAX_TTL_VOLUME_REMOVAL_DELAY = 10
  19. /*
  20. * A VolumeServer contains one Store
  21. */
  22. type Store struct {
  23. MasterAddress string
  24. grpcDialOption grpc.DialOption
  25. volumeSizeLimit uint64 //read from the master
  26. Ip string
  27. Port int
  28. PublicUrl string
  29. Locations []*DiskLocation
  30. dataCenter string //optional informaton, overwriting master setting if exists
  31. rack string //optional information, overwriting master setting if exists
  32. connected bool
  33. NeedleMapType NeedleMapType
  34. NewVolumesChan chan master_pb.VolumeShortInformationMessage
  35. DeletedVolumesChan chan master_pb.VolumeShortInformationMessage
  36. NewEcShardsChan chan master_pb.VolumeEcShardInformationMessage
  37. DeletedEcShardsChan chan master_pb.VolumeEcShardInformationMessage
  38. }
  39. func (s *Store) String() (str string) {
  40. str = fmt.Sprintf("Ip:%s, Port:%d, PublicUrl:%s, dataCenter:%s, rack:%s, connected:%v, volumeSizeLimit:%d", s.Ip, s.Port, s.PublicUrl, s.dataCenter, s.rack, s.connected, s.GetVolumeSizeLimit())
  41. return
  42. }
  43. func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) {
  44. s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind}
  45. s.Locations = make([]*DiskLocation, 0)
  46. for i := 0; i < len(dirnames); i++ {
  47. location := NewDiskLocation(dirnames[i], maxVolumeCounts[i])
  48. location.loadExistingVolumes(needleMapKind)
  49. s.Locations = append(s.Locations, location)
  50. stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i]))
  51. }
  52. s.NewVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 3)
  53. s.DeletedVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 3)
  54. s.NewEcShardsChan = make(chan master_pb.VolumeEcShardInformationMessage, 3)
  55. s.DeletedEcShardsChan = make(chan master_pb.VolumeEcShardInformationMessage, 3)
  56. return
  57. }
  58. func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32) error {
  59. rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement)
  60. if e != nil {
  61. return e
  62. }
  63. ttl, e := needle.ReadTTL(ttlString)
  64. if e != nil {
  65. return e
  66. }
  67. e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb)
  68. return e
  69. }
  70. func (s *Store) DeleteCollection(collection string) (e error) {
  71. for _, location := range s.Locations {
  72. e = location.DeleteCollectionFromDiskLocation(collection)
  73. if e != nil {
  74. return
  75. }
  76. // let the heartbeat send the list of volumes, instead of sending the deleted volume ids to DeletedVolumesChan
  77. }
  78. return
  79. }
  80. func (s *Store) findVolume(vid needle.VolumeId) *Volume {
  81. for _, location := range s.Locations {
  82. if v, found := location.FindVolume(vid); found {
  83. return v
  84. }
  85. }
  86. return nil
  87. }
  88. func (s *Store) FindFreeLocation() (ret *DiskLocation) {
  89. max := 0
  90. for _, location := range s.Locations {
  91. currentFreeCount := location.MaxVolumeCount - location.VolumesLen()
  92. if currentFreeCount > max {
  93. max = currentFreeCount
  94. ret = location
  95. }
  96. }
  97. return ret
  98. }
  99. func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error {
  100. if s.findVolume(vid) != nil {
  101. return fmt.Errorf("Volume Id %d already exists!", vid)
  102. }
  103. if location := s.FindFreeLocation(); location != nil {
  104. glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
  105. location.Directory, vid, collection, replicaPlacement, ttl)
  106. if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb); err == nil {
  107. location.SetVolume(vid, volume)
  108. glog.V(0).Infof("add volume %d", vid)
  109. s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
  110. Id: uint32(vid),
  111. Collection: collection,
  112. ReplicaPlacement: uint32(replicaPlacement.Byte()),
  113. Version: uint32(volume.Version()),
  114. Ttl: ttl.ToUint32(),
  115. }
  116. return nil
  117. } else {
  118. return err
  119. }
  120. }
  121. return fmt.Errorf("No more free space left")
  122. }
  123. func (s *Store) VolumeInfos() []*VolumeInfo {
  124. var stats []*VolumeInfo
  125. for _, location := range s.Locations {
  126. location.volumesLock.RLock()
  127. for k, v := range location.volumes {
  128. s := &VolumeInfo{
  129. Id: needle.VolumeId(k),
  130. Size: v.ContentSize(),
  131. Collection: v.Collection,
  132. ReplicaPlacement: v.ReplicaPlacement,
  133. Version: v.Version(),
  134. FileCount: int(v.FileCount()),
  135. DeleteCount: int(v.DeletedCount()),
  136. DeletedByteCount: v.DeletedSize(),
  137. ReadOnly: v.noWriteOrDelete || v.noWriteCanDelete,
  138. Ttl: v.Ttl,
  139. CompactRevision: uint32(v.CompactionRevision),
  140. }
  141. s.RemoteStorageName, s.RemoteStorageKey = v.RemoteStorageNameKey()
  142. stats = append(stats, s)
  143. }
  144. location.volumesLock.RUnlock()
  145. }
  146. sortVolumeInfos(stats)
  147. return stats
  148. }
  149. func (s *Store) SetDataCenter(dataCenter string) {
  150. s.dataCenter = dataCenter
  151. }
  152. func (s *Store) SetRack(rack string) {
  153. s.rack = rack
  154. }
  155. func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
  156. var volumeMessages []*master_pb.VolumeInformationMessage
  157. maxVolumeCount := 0
  158. var maxFileKey NeedleId
  159. collectionVolumeSize := make(map[string]uint64)
  160. for _, location := range s.Locations {
  161. var deleteVids []needle.VolumeId
  162. maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
  163. location.volumesLock.RLock()
  164. for _, v := range location.volumes {
  165. if maxFileKey < v.MaxFileKey() {
  166. maxFileKey = v.MaxFileKey()
  167. }
  168. if !v.expired(s.GetVolumeSizeLimit()) {
  169. volumeMessages = append(volumeMessages, v.ToVolumeInformationMessage())
  170. } else {
  171. if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
  172. deleteVids = append(deleteVids, v.Id)
  173. } else {
  174. glog.V(0).Infoln("volume", v.Id, "is expired.")
  175. }
  176. }
  177. fileSize, _, _ := v.FileStat()
  178. collectionVolumeSize[v.Collection] += fileSize
  179. }
  180. location.volumesLock.RUnlock()
  181. if len(deleteVids) > 0 {
  182. // delete expired volumes.
  183. location.volumesLock.Lock()
  184. for _, vid := range deleteVids {
  185. location.deleteVolumeById(vid)
  186. glog.V(0).Infoln("volume", vid, "is deleted.")
  187. }
  188. location.volumesLock.Unlock()
  189. }
  190. }
  191. for col, size := range collectionVolumeSize {
  192. stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal").Set(float64(size))
  193. }
  194. return &master_pb.Heartbeat{
  195. Ip: s.Ip,
  196. Port: uint32(s.Port),
  197. PublicUrl: s.PublicUrl,
  198. MaxVolumeCount: uint32(maxVolumeCount),
  199. MaxFileKey: NeedleIdToUint64(maxFileKey),
  200. DataCenter: s.dataCenter,
  201. Rack: s.rack,
  202. Volumes: volumeMessages,
  203. HasNoVolumes: len(volumeMessages) == 0,
  204. }
  205. }
  206. func (s *Store) Close() {
  207. for _, location := range s.Locations {
  208. location.Close()
  209. }
  210. }
  211. func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error) {
  212. if v := s.findVolume(i); v != nil {
  213. if v.noWriteOrDelete || v.noWriteCanDelete {
  214. err = fmt.Errorf("volume %d is read only", i)
  215. return
  216. }
  217. if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(size, v.Version())) {
  218. _, size, isUnchanged, err = v.writeNeedle(n)
  219. } else {
  220. err = fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
  221. }
  222. return
  223. }
  224. glog.V(0).Infoln("volume", i, "not found!")
  225. err = fmt.Errorf("volume %d not found on %s:%d", i, s.Ip, s.Port)
  226. return
  227. }
  228. func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (uint32, error) {
  229. if v := s.findVolume(i); v != nil {
  230. if v.noWriteOrDelete {
  231. return 0, fmt.Errorf("volume %d is read only", i)
  232. }
  233. if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(0, v.Version())) {
  234. return v.deleteNeedle(n)
  235. } else {
  236. return 0, fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
  237. }
  238. }
  239. return 0, fmt.Errorf("volume %d not found on %s:%d", i, s.Ip, s.Port)
  240. }
  241. func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle) (int, error) {
  242. if v := s.findVolume(i); v != nil {
  243. return v.readNeedle(n)
  244. }
  245. return 0, fmt.Errorf("volume %d not found", i)
  246. }
  247. func (s *Store) GetVolume(i needle.VolumeId) *Volume {
  248. return s.findVolume(i)
  249. }
  250. func (s *Store) HasVolume(i needle.VolumeId) bool {
  251. v := s.findVolume(i)
  252. return v != nil
  253. }
  254. func (s *Store) MarkVolumeReadonly(i needle.VolumeId) error {
  255. v := s.findVolume(i)
  256. if v == nil {
  257. return fmt.Errorf("volume %d not found", i)
  258. }
  259. v.noWriteOrDelete = true
  260. return nil
  261. }
  262. func (s *Store) MountVolume(i needle.VolumeId) error {
  263. for _, location := range s.Locations {
  264. if found := location.LoadVolume(i, s.NeedleMapType); found == true {
  265. glog.V(0).Infof("mount volume %d", i)
  266. v := s.findVolume(i)
  267. s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
  268. Id: uint32(v.Id),
  269. Collection: v.Collection,
  270. ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
  271. Version: uint32(v.Version()),
  272. Ttl: v.Ttl.ToUint32(),
  273. }
  274. return nil
  275. }
  276. }
  277. return fmt.Errorf("volume %d not found on disk", i)
  278. }
  279. func (s *Store) UnmountVolume(i needle.VolumeId) error {
  280. v := s.findVolume(i)
  281. if v == nil {
  282. return nil
  283. }
  284. message := master_pb.VolumeShortInformationMessage{
  285. Id: uint32(v.Id),
  286. Collection: v.Collection,
  287. ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
  288. Version: uint32(v.Version()),
  289. Ttl: v.Ttl.ToUint32(),
  290. }
  291. for _, location := range s.Locations {
  292. if err := location.UnloadVolume(i); err == nil {
  293. glog.V(0).Infof("UnmountVolume %d", i)
  294. s.DeletedVolumesChan <- message
  295. return nil
  296. }
  297. }
  298. return fmt.Errorf("volume %d not found on disk", i)
  299. }
  300. func (s *Store) DeleteVolume(i needle.VolumeId) error {
  301. v := s.findVolume(i)
  302. if v == nil {
  303. return nil
  304. }
  305. message := master_pb.VolumeShortInformationMessage{
  306. Id: uint32(v.Id),
  307. Collection: v.Collection,
  308. ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
  309. Version: uint32(v.Version()),
  310. Ttl: v.Ttl.ToUint32(),
  311. }
  312. for _, location := range s.Locations {
  313. if error := location.deleteVolumeById(i); error == nil {
  314. glog.V(0).Infof("DeleteVolume %d", i)
  315. s.DeletedVolumesChan <- message
  316. return nil
  317. }
  318. }
  319. return fmt.Errorf("volume %d not found on disk", i)
  320. }
  321. func (s *Store) ConfigureVolume(i needle.VolumeId, replication string) error {
  322. for _, location := range s.Locations {
  323. fileInfo, found := location.LocateVolume(i)
  324. if !found {
  325. continue
  326. }
  327. // load, modify, save
  328. baseFileName := strings.TrimSuffix(fileInfo.Name(), filepath.Ext(fileInfo.Name()))
  329. vifFile := filepath.Join(location.Directory, baseFileName + ".vif")
  330. volumeInfo, _, err := pb.MaybeLoadVolumeInfo(vifFile)
  331. if err != nil {
  332. return fmt.Errorf("volume %d fail to load vif", i)
  333. }
  334. volumeInfo.Replication = replication
  335. err = pb.SaveVolumeInfo(vifFile, volumeInfo)
  336. if err != nil {
  337. return fmt.Errorf("volume %d fail to save vif", i)
  338. }
  339. return nil
  340. }
  341. return fmt.Errorf("volume %d not found on disk", i)
  342. }
  343. func (s *Store) SetVolumeSizeLimit(x uint64) {
  344. atomic.StoreUint64(&s.volumeSizeLimit, x)
  345. }
  346. func (s *Store) GetVolumeSizeLimit() uint64 {
  347. return atomic.LoadUint64(&s.volumeSizeLimit)
  348. }