volume_grpc_admin.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  7. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  8. "github.com/chrislusf/seaweedfs/weed/storage/super_block"
  9. )
  10. func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server_pb.DeleteCollectionRequest) (*volume_server_pb.DeleteCollectionResponse, error) {
  11. resp := &volume_server_pb.DeleteCollectionResponse{}
  12. err := vs.store.DeleteCollection(req.Collection)
  13. if err != nil {
  14. glog.Errorf("delete collection %s: %v", req.Collection, err)
  15. } else {
  16. glog.V(2).Infof("delete collection %v", req)
  17. }
  18. return resp, err
  19. }
  20. func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_pb.AllocateVolumeRequest) (*volume_server_pb.AllocateVolumeResponse, error) {
  21. resp := &volume_server_pb.AllocateVolumeResponse{}
  22. err := vs.store.AddVolume(
  23. needle.VolumeId(req.VolumeId),
  24. req.Collection,
  25. vs.needleMapKind,
  26. req.Replication,
  27. req.Ttl,
  28. req.Preallocate,
  29. req.MemoryMapMaxSizeMb,
  30. )
  31. if err != nil {
  32. glog.Errorf("assign volume %v: %v", req, err)
  33. } else {
  34. glog.V(2).Infof("assign volume %v", req)
  35. }
  36. return resp, err
  37. }
  38. func (vs *VolumeServer) VolumeMount(ctx context.Context, req *volume_server_pb.VolumeMountRequest) (*volume_server_pb.VolumeMountResponse, error) {
  39. resp := &volume_server_pb.VolumeMountResponse{}
  40. err := vs.store.MountVolume(needle.VolumeId(req.VolumeId))
  41. if err != nil {
  42. glog.Errorf("volume mount %v: %v", req, err)
  43. } else {
  44. glog.V(2).Infof("volume mount %v", req)
  45. }
  46. return resp, err
  47. }
  48. func (vs *VolumeServer) VolumeUnmount(ctx context.Context, req *volume_server_pb.VolumeUnmountRequest) (*volume_server_pb.VolumeUnmountResponse, error) {
  49. resp := &volume_server_pb.VolumeUnmountResponse{}
  50. err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId))
  51. if err != nil {
  52. glog.Errorf("volume unmount %v: %v", req, err)
  53. } else {
  54. glog.V(2).Infof("volume unmount %v", req)
  55. }
  56. return resp, err
  57. }
  58. func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.VolumeDeleteRequest) (*volume_server_pb.VolumeDeleteResponse, error) {
  59. resp := &volume_server_pb.VolumeDeleteResponse{}
  60. err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId))
  61. if err != nil {
  62. glog.Errorf("volume delete %v: %v", req, err)
  63. } else {
  64. glog.V(2).Infof("volume delete %v", req)
  65. }
  66. return resp, err
  67. }
  68. func (vs *VolumeServer) VolumeConfigure(ctx context.Context, req *volume_server_pb.VolumeConfigureRequest) (*volume_server_pb.VolumeConfigureResponse, error) {
  69. resp := &volume_server_pb.VolumeConfigureResponse{}
  70. // check replication format
  71. if _, err := super_block.NewReplicaPlacementFromString(req.Replication); err != nil {
  72. resp.Error = fmt.Sprintf("volume configure replication %v: %v", req, err)
  73. return resp, nil
  74. }
  75. // unmount
  76. if err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId)); err != nil {
  77. glog.Errorf("volume configure unmount %v: %v", req, err)
  78. resp.Error = fmt.Sprintf("volume configure unmount %v: %v", req, err)
  79. return resp, nil
  80. }
  81. // modify the volume info file
  82. if err := vs.store.ConfigureVolume(needle.VolumeId(req.VolumeId), req.Replication); err != nil {
  83. glog.Errorf("volume configure %v: %v", req, err)
  84. resp.Error = fmt.Sprintf("volume configure %v: %v", req, err)
  85. return resp, nil
  86. }
  87. // mount
  88. if err := vs.store.MountVolume(needle.VolumeId(req.VolumeId)); err != nil {
  89. glog.Errorf("volume configure mount %v: %v", req, err)
  90. resp.Error = fmt.Sprintf("volume configure mount %v: %v", req, err)
  91. return resp, nil
  92. }
  93. return resp, nil
  94. }
  95. func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_server_pb.VolumeMarkReadonlyRequest) (*volume_server_pb.VolumeMarkReadonlyResponse, error) {
  96. resp := &volume_server_pb.VolumeMarkReadonlyResponse{}
  97. err := vs.store.MarkVolumeReadonly(needle.VolumeId(req.VolumeId))
  98. if err != nil {
  99. glog.Errorf("volume mark readonly %v: %v", req, err)
  100. } else {
  101. glog.V(2).Infof("volume mark readonly %v", req)
  102. }
  103. return resp, err
  104. }