collection.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. package topology
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  5. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  6. "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
  7. "github.com/seaweedfs/seaweedfs/weed/util"
  8. )
  9. type Collection struct {
  10. Name string
  11. volumeSizeLimit uint64
  12. replicationAsMin bool
  13. storageType2VolumeLayout *util.ConcurrentReadMap
  14. }
  15. func NewCollection(name string, volumeSizeLimit uint64, replicationAsMin bool) *Collection {
  16. c := &Collection{
  17. Name: name,
  18. volumeSizeLimit: volumeSizeLimit,
  19. replicationAsMin: replicationAsMin,
  20. }
  21. c.storageType2VolumeLayout = util.NewConcurrentReadMap()
  22. return c
  23. }
  24. func (c *Collection) String() string {
  25. return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout)
  26. }
  27. func (c *Collection) GetOrCreateVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) *VolumeLayout {
  28. keyString := rp.String()
  29. if ttl != nil {
  30. keyString += ttl.String()
  31. }
  32. if diskType != types.HardDriveType {
  33. keyString += string(diskType)
  34. }
  35. vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} {
  36. return NewVolumeLayout(rp, ttl, diskType, c.volumeSizeLimit, c.replicationAsMin)
  37. })
  38. return vl.(*VolumeLayout)
  39. }
  40. func (c *Collection) GetVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) (*VolumeLayout, bool) {
  41. keyString := rp.String()
  42. if ttl != nil {
  43. keyString += ttl.String()
  44. }
  45. if diskType != types.HardDriveType {
  46. keyString += string(diskType)
  47. }
  48. vl, ok := c.storageType2VolumeLayout.Find(keyString)
  49. return vl.(*VolumeLayout), ok
  50. }
  51. func (c *Collection) GetAllVolumeLayouts() []*VolumeLayout {
  52. var vls []*VolumeLayout
  53. for _, vl := range c.storageType2VolumeLayout.Items() {
  54. if vl != nil {
  55. vls = append(vls, vl.(*VolumeLayout))
  56. }
  57. }
  58. return vls
  59. }
  60. func (c *Collection) DeleteVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) {
  61. keyString := rp.String()
  62. if ttl != nil {
  63. keyString += ttl.String()
  64. }
  65. if diskType != types.HardDriveType {
  66. keyString += string(diskType)
  67. }
  68. c.storageType2VolumeLayout.Delete(keyString)
  69. }
  70. func (c *Collection) Lookup(vid needle.VolumeId) []*DataNode {
  71. for _, vl := range c.storageType2VolumeLayout.Items() {
  72. if vl != nil {
  73. if list := vl.(*VolumeLayout).Lookup(vid); list != nil {
  74. return list
  75. }
  76. }
  77. }
  78. return nil
  79. }
  80. func (c *Collection) ListVolumeServers() (nodes []*DataNode) {
  81. for _, vl := range c.storageType2VolumeLayout.Items() {
  82. if vl != nil {
  83. if list := vl.(*VolumeLayout).ListVolumeServers(); list != nil {
  84. nodes = append(nodes, list...)
  85. }
  86. }
  87. }
  88. return
  89. }