collection.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package topology
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  5. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  6. "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
  7. "github.com/seaweedfs/seaweedfs/weed/util"
  8. )
  9. type Collection struct {
  10. Name string
  11. volumeSizeLimit uint64
  12. replicationAsMin bool
  13. storageType2VolumeLayout *util.ConcurrentReadMap
  14. }
  15. func NewCollection(name string, volumeSizeLimit uint64, replicationAsMin bool) *Collection {
  16. c := &Collection{
  17. Name: name,
  18. volumeSizeLimit: volumeSizeLimit,
  19. replicationAsMin: replicationAsMin,
  20. }
  21. c.storageType2VolumeLayout = util.NewConcurrentReadMap()
  22. return c
  23. }
  24. func (c *Collection) String() string {
  25. return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout)
  26. }
  27. func (c *Collection) GetOrCreateVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) *VolumeLayout {
  28. keyString := rp.String()
  29. if ttl != nil {
  30. keyString += ttl.String()
  31. }
  32. if diskType != types.HardDriveType {
  33. keyString += string(diskType)
  34. }
  35. vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} {
  36. return NewVolumeLayout(rp, ttl, diskType, c.volumeSizeLimit, c.replicationAsMin)
  37. })
  38. return vl.(*VolumeLayout)
  39. }
  40. func (c *Collection) DeleteVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType types.DiskType) {
  41. keyString := rp.String()
  42. if ttl != nil {
  43. keyString += ttl.String()
  44. }
  45. if diskType != types.HardDriveType {
  46. keyString += string(diskType)
  47. }
  48. c.storageType2VolumeLayout.Delete(keyString)
  49. }
  50. func (c *Collection) Lookup(vid needle.VolumeId) []*DataNode {
  51. for _, vl := range c.storageType2VolumeLayout.Items() {
  52. if vl != nil {
  53. if list := vl.(*VolumeLayout).Lookup(vid); list != nil {
  54. return list
  55. }
  56. }
  57. }
  58. return nil
  59. }
  60. func (c *Collection) ListVolumeServers() (nodes []*DataNode) {
  61. for _, vl := range c.storageType2VolumeLayout.Items() {
  62. if vl != nil {
  63. if list := vl.(*VolumeLayout).ListVolumeServers(); list != nil {
  64. nodes = append(nodes, list...)
  65. }
  66. }
  67. }
  68. return
  69. }