command_ec_encode.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "sync"
  8. "time"
  9. "google.golang.org/grpc"
  10. "github.com/chrislusf/seaweedfs/weed/operation"
  11. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  13. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  14. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  15. "github.com/chrislusf/seaweedfs/weed/wdclient"
  16. )
  17. func init() {
  18. Commands = append(Commands, &commandEcEncode{})
  19. }
  20. type commandEcEncode struct {
  21. }
  22. func (c *commandEcEncode) Name() string {
  23. return "ec.encode"
  24. }
// Help returns the usage text for ec.encode: the two accepted flag
// combinations, the three steps the command performs, and notes on how the
// number of available volume servers affects how many failures can be
// tolerated.
func (c *commandEcEncode) Help() string {
	return `apply erasure coding to a volume
ec.encode [-collection=""] [-fullPercent=95] [-quietFor=1h]
ec.encode [-collection=""] [-volumeId=<volume_id>]
This command will:
1. freeze one volume
2. apply erasure coding to the volume
3. move the encoded shards to multiple volume servers
The erasure coding is 10.4. So ideally you have more than 14 volume servers, and you can afford
to lose 4 volume servers.
If the number of volumes are not high, the worst case is that you only have 4 volume servers,
and the shards are spread as 4,4,3,3, respectively. You can afford to lose one volume server.
If you only have less than 4 volume servers, with erasure coding, at least you can afford to
have 4 corrupted shard files.
`
}
  41. func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  42. encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  43. volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
  44. collection := encodeCommand.String("collection", "", "the collection name")
  45. fullPercentage := encodeCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size")
  46. quietPeriod := encodeCommand.Duration("quietFor", time.Hour, "select volumes without no writes for this period")
  47. if err = encodeCommand.Parse(args); err != nil {
  48. return nil
  49. }
  50. ctx := context.Background()
  51. vid := needle.VolumeId(*volumeId)
  52. // volumeId is provided
  53. if vid != 0 {
  54. return doEcEncode(ctx, commandEnv, *collection, vid)
  55. }
  56. // apply to all volumes in the collection
  57. volumeIds, err := collectVolumeIdsForEcEncode(ctx, commandEnv, *collection, *fullPercentage, *quietPeriod)
  58. if err != nil {
  59. return err
  60. }
  61. fmt.Printf("ec encode volumes: %v\n", volumeIds)
  62. for _, vid := range volumeIds {
  63. if err = doEcEncode(ctx, commandEnv, *collection, vid); err != nil {
  64. return err
  65. }
  66. }
  67. return nil
  68. }
  69. func doEcEncode(ctx context.Context, commandEnv *CommandEnv, collection string, vid needle.VolumeId) (err error) {
  70. // find volume location
  71. locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
  72. if !found {
  73. return fmt.Errorf("volume %d not found", vid)
  74. }
  75. // fmt.Printf("found ec %d shards on %v\n", vid, locations)
  76. // mark the volume as readonly
  77. err = markVolumeReadonly(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), locations)
  78. if err != nil {
  79. return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
  80. }
  81. // generate ec shards
  82. err = generateEcShards(ctx, commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, locations[0].Url)
  83. if err != nil {
  84. return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
  85. }
  86. // balance the ec shards to current cluster
  87. err = spreadEcShards(ctx, commandEnv, vid, collection, locations)
  88. if err != nil {
  89. return fmt.Errorf("spread ec shards for volume %d from %s: %v", vid, locations[0].Url, err)
  90. }
  91. return nil
  92. }
  93. func markVolumeReadonly(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location) error {
  94. for _, location := range locations {
  95. err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
  96. _, markErr := volumeServerClient.VolumeMarkReadonly(ctx, &volume_server_pb.VolumeMarkReadonlyRequest{
  97. VolumeId: uint32(volumeId),
  98. })
  99. return markErr
  100. })
  101. if err != nil {
  102. return err
  103. }
  104. }
  105. return nil
  106. }
  107. func generateEcShards(ctx context.Context, grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error {
  108. err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
  109. _, genErr := volumeServerClient.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{
  110. VolumeId: uint32(volumeId),
  111. Collection: collection,
  112. })
  113. return genErr
  114. })
  115. return err
  116. }
  117. func spreadEcShards(ctx context.Context, commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location) (err error) {
  118. allEcNodes, totalFreeEcSlots, err := collectEcNodes(ctx, commandEnv, "")
  119. if err != nil {
  120. return err
  121. }
  122. if totalFreeEcSlots < erasure_coding.TotalShardsCount {
  123. return fmt.Errorf("not enough free ec shard slots. only %d left", totalFreeEcSlots)
  124. }
  125. allocatedDataNodes := allEcNodes
  126. if len(allocatedDataNodes) > erasure_coding.TotalShardsCount {
  127. allocatedDataNodes = allocatedDataNodes[:erasure_coding.TotalShardsCount]
  128. }
  129. // calculate how many shards to allocate for these servers
  130. allocatedEcIds := balancedEcDistribution(allocatedDataNodes)
  131. // ask the data nodes to copy from the source volume server
  132. copiedShardIds, err := parallelCopyEcShardsFromSource(ctx, commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0])
  133. if err != nil {
  134. return err
  135. }
  136. // unmount the to be deleted shards
  137. err = unmountEcShards(ctx, commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds)
  138. if err != nil {
  139. return err
  140. }
  141. // ask the source volume server to clean up copied ec shards
  142. err = sourceServerDeleteEcShards(ctx, commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds)
  143. if err != nil {
  144. return fmt.Errorf("source delete copied ecShards %s %d.%v: %v", existingLocations[0].Url, volumeId, copiedShardIds, err)
  145. }
  146. // ask the source volume server to delete the original volume
  147. for _, location := range existingLocations {
  148. err = deleteVolume(ctx, commandEnv.option.GrpcDialOption, volumeId, location.Url)
  149. if err != nil {
  150. return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err)
  151. }
  152. }
  153. return err
  154. }
  155. func parallelCopyEcShardsFromSource(ctx context.Context, grpcDialOption grpc.DialOption,
  156. targetServers []*EcNode, allocatedEcIds [][]uint32,
  157. volumeId needle.VolumeId, collection string, existingLocation wdclient.Location) (actuallyCopied []uint32, err error) {
  158. // parallelize
  159. shardIdChan := make(chan []uint32, len(targetServers))
  160. var wg sync.WaitGroup
  161. for i, server := range targetServers {
  162. if len(allocatedEcIds[i]) <= 0 {
  163. continue
  164. }
  165. wg.Add(1)
  166. go func(server *EcNode, allocatedEcShardIds []uint32) {
  167. defer wg.Done()
  168. copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(ctx, grpcDialOption, server,
  169. allocatedEcShardIds, volumeId, collection, existingLocation.Url)
  170. if copyErr != nil {
  171. err = copyErr
  172. } else {
  173. shardIdChan <- copiedShardIds
  174. server.addEcVolumeShards(volumeId, collection, copiedShardIds)
  175. }
  176. }(server, allocatedEcIds[i])
  177. }
  178. wg.Wait()
  179. close(shardIdChan)
  180. if err != nil {
  181. return nil, err
  182. }
  183. for shardIds := range shardIdChan {
  184. actuallyCopied = append(actuallyCopied, shardIds...)
  185. }
  186. return
  187. }
  188. func balancedEcDistribution(servers []*EcNode) (allocated [][]uint32) {
  189. allocated = make([][]uint32, len(servers))
  190. allocatedShardIdIndex := uint32(0)
  191. serverIndex := 0
  192. for allocatedShardIdIndex < erasure_coding.TotalShardsCount {
  193. if servers[serverIndex].freeEcSlot > 0 {
  194. allocated[serverIndex] = append(allocated[serverIndex], allocatedShardIdIndex)
  195. allocatedShardIdIndex++
  196. }
  197. serverIndex++
  198. if serverIndex >= len(servers) {
  199. serverIndex = 0
  200. }
  201. }
  202. return allocated
  203. }
  204. func collectVolumeIdsForEcEncode(ctx context.Context, commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
  205. var resp *master_pb.VolumeListResponse
  206. err = commandEnv.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
  207. resp, err = client.VolumeList(ctx, &master_pb.VolumeListRequest{})
  208. return err
  209. })
  210. if err != nil {
  211. return
  212. }
  213. quietSeconds := int64(quietPeriod / time.Second)
  214. nowUnixSeconds := time.Now().Unix()
  215. fmt.Printf("ec encode volumes quiet for: %d seconds\n", quietSeconds)
  216. vidMap := make(map[uint32]bool)
  217. eachDataNode(resp.TopologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  218. for _, v := range dn.VolumeInfos {
  219. if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds {
  220. if float64(v.Size) > fullPercentage/100*float64(resp.VolumeSizeLimitMb)*1024*1024 {
  221. vidMap[v.Id] = true
  222. }
  223. }
  224. }
  225. })
  226. for vid := range vidMap {
  227. vids = append(vids, needle.VolumeId(vid))
  228. }
  229. return
  230. }