commands.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package shell
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/operation"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
  8. "io"
  9. "net/url"
  10. "strconv"
  11. "strings"
  12. "google.golang.org/grpc"
  13. "github.com/seaweedfs/seaweedfs/weed/pb"
  14. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  15. "github.com/seaweedfs/seaweedfs/weed/util"
  16. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  17. "github.com/seaweedfs/seaweedfs/weed/wdclient/exclusive_locks"
  18. )
  19. type ShellOptions struct {
  20. Masters *string
  21. GrpcDialOption grpc.DialOption
  22. // shell transient context
  23. FilerHost string
  24. FilerPort int64
  25. FilerGroup *string
  26. FilerAddress pb.ServerAddress
  27. Directory string
  28. }
  29. type CommandEnv struct {
  30. env map[string]string
  31. MasterClient *wdclient.MasterClient
  32. option *ShellOptions
  33. locker *exclusive_locks.ExclusiveLocker
  34. }
  35. type command interface {
  36. Name() string
  37. Help() string
  38. Do([]string, *CommandEnv, io.Writer) error
  39. }
  40. var (
  41. Commands = []command{}
  42. )
  43. func NewCommandEnv(options *ShellOptions) *CommandEnv {
  44. ce := &CommandEnv{
  45. env: make(map[string]string),
  46. MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, *options.FilerGroup, pb.AdminShellClient, "", "", "", pb.ServerAddresses(*options.Masters).ToAddressMap()),
  47. option: options,
  48. }
  49. ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient, "admin")
  50. return ce
  51. }
  52. func (ce *CommandEnv) parseUrl(input string) (path string, err error) {
  53. if strings.HasPrefix(input, "http") {
  54. err = fmt.Errorf("http://<filer>:<port> prefix is not supported any more")
  55. return
  56. }
  57. if !strings.HasPrefix(input, "/") {
  58. input = util.Join(ce.option.Directory, input)
  59. }
  60. return input, err
  61. }
  62. func (ce *CommandEnv) isDirectory(path string) bool {
  63. return ce.checkDirectory(path) == nil
  64. }
  65. func (ce *CommandEnv) confirmIsLocked(args []string) error {
  66. if ce.locker.IsLocked() {
  67. return nil
  68. }
  69. ce.locker.SetMessage(fmt.Sprintf("%v", args))
  70. return fmt.Errorf("need to run \"lock\" first to continue")
  71. }
  72. func (ce *CommandEnv) isLocked() bool {
  73. if ce == nil {
  74. return true
  75. }
  76. return ce.locker.IsLocked()
  77. }
  78. func (ce *CommandEnv) checkDirectory(path string) error {
  79. dir, name := util.FullPath(path).DirAndName()
  80. exists, err := filer_pb.Exists(ce, dir, name, true)
  81. if !exists {
  82. return fmt.Errorf("%s is not a directory", path)
  83. }
  84. return err
  85. }
  86. var _ = filer_pb.FilerClient(&CommandEnv{})
  87. func (ce *CommandEnv) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  88. return pb.WithGrpcFilerClient(streamingMode, 0, ce.option.FilerAddress, ce.option.GrpcDialOption, fn)
  89. }
  90. func (ce *CommandEnv) AdjustedUrl(location *filer_pb.Location) string {
  91. return location.Url
  92. }
  93. func (ce *CommandEnv) GetDataCenter() string {
  94. return ce.MasterClient.DataCenter
  95. }
  // parseFilerUrl splits an entry path given as an http(s) URL into host, port
  // and path.
  //
  // Inputs that do not start with "http" produce an error. For accepted
  // inputs, filerServer is the URL's hostname, path is the URL's path, and
  // filerPort is the parsed port, or 0 when the URL carries no port. Errors
  // from url.Parse and from the port conversion are returned as err.
  func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) {
  	if !strings.HasPrefix(entryPath, "http") {
  		err = fmt.Errorf("path should have full url /path/to/dirOrFile : %s", entryPath)
  		return
  	}

  	parsed, parseErr := url.Parse(entryPath)
  	if parseErr != nil {
  		err = parseErr
  		return
  	}

  	filerServer = parsed.Hostname()
  	path = parsed.Path
  	if port := parsed.Port(); port != "" {
  		filerPort, err = strconv.ParseInt(port, 10, 32)
  	}
  	return
  }
  // findInputDirectory picks the directory argument from args: the last
  // argument, unless there are no arguments or the last one starts with "-",
  // in which case "." is returned.
  func findInputDirectory(args []string) (input string) {
  	if n := len(args); n > 0 {
  		if last := args[n-1]; !strings.HasPrefix(last, "-") {
  			return last
  		}
  	}
  	return "."
  }
  124. func readNeedleMeta(grpcDialOption grpc.DialOption, volumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (resp *volume_server_pb.ReadNeedleMetaResponse, err error) {
  125. err = operation.WithVolumeServerClient(false, volumeServer, grpcDialOption,
  126. func(client volume_server_pb.VolumeServerClient) error {
  127. if resp, err = client.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{
  128. VolumeId: volumeId,
  129. NeedleId: uint64(needleValue.Key),
  130. Offset: needleValue.Offset.ToActualOffset(),
  131. Size: int32(needleValue.Size),
  132. }); err != nil {
  133. return err
  134. }
  135. return nil
  136. },
  137. )
  138. return
  139. }
  140. func readNeedleStatus(grpcDialOption grpc.DialOption, sourceVolumeServer pb.ServerAddress, volumeId uint32, needleValue needle_map.NeedleValue) (resp *volume_server_pb.VolumeNeedleStatusResponse, err error) {
  141. err = operation.WithVolumeServerClient(false, sourceVolumeServer, grpcDialOption,
  142. func(client volume_server_pb.VolumeServerClient) error {
  143. if resp, err = client.VolumeNeedleStatus(context.Background(), &volume_server_pb.VolumeNeedleStatusRequest{
  144. VolumeId: volumeId,
  145. NeedleId: uint64(needleValue.Key),
  146. }); err != nil {
  147. return err
  148. }
  149. return nil
  150. },
  151. )
  152. return
  153. }