command_s3_circuitbreaker.go 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. package shell
  2. import (
  3. "bytes"
  4. "flag"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/filer"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  9. "io"
  10. "strconv"
  11. "strings"
  12. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  13. )
// LoadConfig is the function Do uses to read the stored circuit breaker
// configuration into a buffer. It defaults to loadConfig.
// NOTE(review): presumably exported as a variable so tests can substitute a
// stub that does not need a filer — confirm against the test files.
var LoadConfig = loadConfig
  15. func init() {
  16. Commands = append(Commands, &commandS3CircuitBreaker{})
  17. }
// commandS3CircuitBreaker implements the "s3.circuitBreaker" shell command.
// It carries no state; all inputs come from the command line flags parsed in Do.
type commandS3CircuitBreaker struct {
}
// Name returns the command name, "s3.circuitBreaker".
func (c *commandS3CircuitBreaker) Name() string {
	return "s3.circuitBreaker"
}
  23. func (c *commandS3CircuitBreaker) Help() string {
  24. return `configure and apply s3 circuit breaker options for each bucket
  25. # examples
  26. # add circuit breaker config for global
  27. s3.circuitBreaker -global -type count -actions Read,Write -values 500,200 -apply
  28. # disable global config
  29. s3.circuitBreaker -global -disable -apply
  30. # add circuit breaker config for buckets x,y,z
  31. s3.circuitBreaker -buckets x,y,z -type count -actions Read,Write -values 200,100 -apply
  32. # disable circuit breaker config of x
  33. s3.circuitBreaker -buckets x -disable -apply
  34. # delete circuit breaker config of x
  35. s3.circuitBreaker -buckets x -delete -apply
  36. # clear all circuit breaker config
  37. s3.circuitBreaker -delete -apply
  38. `
  39. }
// HasTag reports whether the command carries the given tag; this command
// reports false for every tag.
func (c *commandS3CircuitBreaker) HasTag(CommandTag) bool {
	return false
}
  43. func (c *commandS3CircuitBreaker) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  44. dir := s3_constants.CircuitBreakerConfigDir
  45. file := s3_constants.CircuitBreakerConfigFile
  46. s3CircuitBreakerCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  47. buckets := s3CircuitBreakerCommand.String("buckets", "", "the bucket name(s) to configure, eg: -buckets x,y,z")
  48. global := s3CircuitBreakerCommand.Bool("global", false, "configure global circuit breaker")
  49. actions := s3CircuitBreakerCommand.String("actions", "", "comma separated actions names: Read,Write,List,Tagging,Admin")
  50. limitType := s3CircuitBreakerCommand.String("type", "", "'Count' or 'MB'; Count represents the number of simultaneous requests, and MB represents the content size of all simultaneous requests")
  51. values := s3CircuitBreakerCommand.String("values", "", "comma separated values")
  52. disabled := s3CircuitBreakerCommand.Bool("disable", false, "disable global or buckets circuit breaker")
  53. deleted := s3CircuitBreakerCommand.Bool("delete", false, "delete circuit breaker config")
  54. apply := s3CircuitBreakerCommand.Bool("apply", false, "update and apply current configuration")
  55. if err = s3CircuitBreakerCommand.Parse(args); err != nil {
  56. return nil
  57. }
  58. var buf bytes.Buffer
  59. err = LoadConfig(commandEnv, dir, file, &buf)
  60. if err != nil {
  61. return err
  62. }
  63. cbCfg := &s3_pb.S3CircuitBreakerConfig{
  64. Buckets: make(map[string]*s3_pb.S3CircuitBreakerOptions),
  65. }
  66. if buf.Len() > 0 {
  67. if err = filer.ParseS3ConfigurationFromBytes(buf.Bytes(), cbCfg); err != nil {
  68. return err
  69. }
  70. }
  71. if *deleted {
  72. cmdBuckets, cmdActions, _, err := c.initActionsAndValues(buckets, actions, limitType, values, true)
  73. if err != nil {
  74. return err
  75. }
  76. if len(cmdBuckets) <= 0 && !*global {
  77. if len(cmdActions) > 0 {
  78. deleteGlobalActions(cbCfg, cmdActions, limitType)
  79. if cbCfg.Buckets != nil {
  80. var allBuckets []string
  81. for bucket := range cbCfg.Buckets {
  82. allBuckets = append(allBuckets, bucket)
  83. }
  84. deleteBucketsActions(allBuckets, cbCfg, cmdActions, limitType)
  85. }
  86. } else {
  87. cbCfg.Global = nil
  88. cbCfg.Buckets = nil
  89. }
  90. } else {
  91. if len(cmdBuckets) > 0 {
  92. deleteBucketsActions(cmdBuckets, cbCfg, cmdActions, limitType)
  93. }
  94. if *global {
  95. deleteGlobalActions(cbCfg, cmdActions, nil)
  96. }
  97. }
  98. } else {
  99. cmdBuckets, cmdActions, cmdValues, err := c.initActionsAndValues(buckets, actions, limitType, values, *disabled)
  100. if err != nil {
  101. return err
  102. }
  103. if len(cmdActions) > 0 && len(*buckets) <= 0 && !*global {
  104. return fmt.Errorf("one of -global and -buckets must be specified")
  105. }
  106. if len(*buckets) > 0 {
  107. for _, bucket := range cmdBuckets {
  108. var cbOptions *s3_pb.S3CircuitBreakerOptions
  109. var exists bool
  110. if cbOptions, exists = cbCfg.Buckets[bucket]; !exists {
  111. cbOptions = &s3_pb.S3CircuitBreakerOptions{}
  112. cbCfg.Buckets[bucket] = cbOptions
  113. }
  114. cbOptions.Enabled = !*disabled
  115. if len(cmdActions) > 0 {
  116. err = insertOrUpdateValues(cbOptions, cmdActions, cmdValues, limitType)
  117. if err != nil {
  118. return err
  119. }
  120. }
  121. if len(cbOptions.Actions) <= 0 && !cbOptions.Enabled {
  122. delete(cbCfg.Buckets, bucket)
  123. }
  124. }
  125. }
  126. if *global {
  127. globalOptions := cbCfg.Global
  128. if globalOptions == nil {
  129. globalOptions = &s3_pb.S3CircuitBreakerOptions{Actions: make(map[string]int64, len(cmdActions))}
  130. cbCfg.Global = globalOptions
  131. }
  132. globalOptions.Enabled = !*disabled
  133. if len(cmdActions) > 0 {
  134. err = insertOrUpdateValues(globalOptions, cmdActions, cmdValues, limitType)
  135. if err != nil {
  136. return err
  137. }
  138. }
  139. if len(globalOptions.Actions) <= 0 && !globalOptions.Enabled {
  140. cbCfg.Global = nil
  141. }
  142. }
  143. }
  144. buf.Reset()
  145. err = filer.ProtoToText(&buf, cbCfg)
  146. if err != nil {
  147. return err
  148. }
  149. _, _ = fmt.Fprintf(writer, string(buf.Bytes()))
  150. _, _ = fmt.Fprintln(writer)
  151. if *apply {
  152. if err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  153. return filer.SaveInsideFiler(client, dir, file, buf.Bytes())
  154. }); err != nil {
  155. return err
  156. }
  157. }
  158. return nil
  159. }
  160. func loadConfig(commandEnv *CommandEnv, dir string, file string, buf *bytes.Buffer) error {
  161. if err := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  162. return filer.ReadEntry(commandEnv.MasterClient, client, dir, file, buf)
  163. }); err != nil && err != filer_pb.ErrNotFound {
  164. return err
  165. }
  166. return nil
  167. }
  168. func insertOrUpdateValues(cbOptions *s3_pb.S3CircuitBreakerOptions, cmdActions []string, cmdValues []int64, limitType *string) error {
  169. if len(*limitType) == 0 {
  170. return fmt.Errorf("type not valid, only 'count' and 'bytes' are allowed")
  171. }
  172. if cbOptions.Actions == nil {
  173. cbOptions.Actions = make(map[string]int64, len(cmdActions))
  174. }
  175. if len(cmdValues) > 0 {
  176. for i, action := range cmdActions {
  177. cbOptions.Actions[s3_constants.Concat(action, *limitType)] = cmdValues[i]
  178. }
  179. }
  180. return nil
  181. }
  182. func deleteBucketsActions(cmdBuckets []string, cbCfg *s3_pb.S3CircuitBreakerConfig, cmdActions []string, limitType *string) {
  183. if cbCfg.Buckets == nil {
  184. return
  185. }
  186. if len(cmdActions) == 0 {
  187. for _, bucket := range cmdBuckets {
  188. delete(cbCfg.Buckets, bucket)
  189. }
  190. } else {
  191. for _, bucket := range cmdBuckets {
  192. if cbOption, ok := cbCfg.Buckets[bucket]; ok {
  193. if len(cmdActions) > 0 && cbOption.Actions != nil {
  194. for _, action := range cmdActions {
  195. delete(cbOption.Actions, s3_constants.Concat(action, *limitType))
  196. }
  197. }
  198. if len(cbOption.Actions) == 0 && !cbOption.Enabled {
  199. delete(cbCfg.Buckets, bucket)
  200. }
  201. }
  202. }
  203. }
  204. if len(cbCfg.Buckets) == 0 {
  205. cbCfg.Buckets = nil
  206. }
  207. }
  208. func deleteGlobalActions(cbCfg *s3_pb.S3CircuitBreakerConfig, cmdActions []string, limitType *string) {
  209. globalOptions := cbCfg.Global
  210. if globalOptions == nil {
  211. return
  212. }
  213. if len(cmdActions) == 0 && globalOptions.Actions != nil {
  214. globalOptions.Actions = nil
  215. return
  216. } else {
  217. for _, action := range cmdActions {
  218. delete(globalOptions.Actions, s3_constants.Concat(action, *limitType))
  219. }
  220. }
  221. if len(globalOptions.Actions) == 0 && !globalOptions.Enabled {
  222. cbCfg.Global = nil
  223. }
  224. }
  225. func (c *commandS3CircuitBreaker) initActionsAndValues(buckets, actions, limitType, values *string, parseValues bool) (cmdBuckets, cmdActions []string, cmdValues []int64, err error) {
  226. if len(*buckets) > 0 {
  227. cmdBuckets = strings.Split(*buckets, ",")
  228. }
  229. if len(*actions) > 0 {
  230. cmdActions = strings.Split(*actions, ",")
  231. //check action valid
  232. for _, action := range cmdActions {
  233. var found bool
  234. for _, allowedAction := range s3_constants.AllowedActions {
  235. if allowedAction == action {
  236. found = true
  237. }
  238. }
  239. if !found {
  240. return nil, nil, nil, fmt.Errorf("value(%s) of flag[-action] not valid, allowed actions: %v", *actions, s3_constants.AllowedActions)
  241. }
  242. }
  243. }
  244. if !parseValues {
  245. if len(cmdActions) < 0 {
  246. for _, action := range s3_constants.AllowedActions {
  247. cmdActions = append(cmdActions, action)
  248. }
  249. }
  250. if len(*limitType) > 0 {
  251. switch *limitType {
  252. case s3_constants.LimitTypeCount:
  253. elements := strings.Split(*values, ",")
  254. if len(cmdActions) != len(elements) {
  255. if len(elements) != 1 || len(elements) == 0 {
  256. return nil, nil, nil, fmt.Errorf("count of flag[-actions] and flag[-counts] not equal")
  257. }
  258. v, err := strconv.Atoi(elements[0])
  259. if err != nil {
  260. return nil, nil, nil, fmt.Errorf("value of -values must be a legal number(s)")
  261. }
  262. for range cmdActions {
  263. cmdValues = append(cmdValues, int64(v))
  264. }
  265. } else {
  266. for _, value := range elements {
  267. v, err := strconv.Atoi(value)
  268. if err != nil {
  269. return nil, nil, nil, fmt.Errorf("value of -values must be a legal number(s)")
  270. }
  271. cmdValues = append(cmdValues, int64(v))
  272. }
  273. }
  274. case s3_constants.LimitTypeBytes:
  275. elements := strings.Split(*values, ",")
  276. if len(cmdActions) != len(elements) {
  277. if len(elements) != 1 || len(elements) == 0 {
  278. return nil, nil, nil, fmt.Errorf("values count of -actions and -values not equal")
  279. }
  280. v, err := parseMBToBytes(elements[0])
  281. if err != nil {
  282. return nil, nil, nil, fmt.Errorf("value of -max must be a legal number(s)")
  283. }
  284. for range cmdActions {
  285. cmdValues = append(cmdValues, v)
  286. }
  287. } else {
  288. for _, value := range elements {
  289. v, err := parseMBToBytes(value)
  290. if err != nil {
  291. return nil, nil, nil, fmt.Errorf("value of -max must be a legal number(s)")
  292. }
  293. cmdValues = append(cmdValues, v)
  294. }
  295. }
  296. default:
  297. return nil, nil, nil, fmt.Errorf("type not valid, only 'count' and 'bytes' are allowed")
  298. }
  299. } else {
  300. *limitType = ""
  301. }
  302. }
  303. return cmdBuckets, cmdActions, cmdValues, nil
  304. }
  305. func parseMBToBytes(valStr string) (int64, error) {
  306. v, err := strconv.Atoi(valStr)
  307. v *= 1024 * 1024
  308. return int64(v), err
  309. }