command_ec_encode.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "sync"
  8. "time"
  9. "google.golang.org/grpc"
  10. "github.com/chrislusf/seaweedfs/weed/operation"
  11. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  12. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  13. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  14. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  15. "github.com/chrislusf/seaweedfs/weed/wdclient"
  16. )
  17. func init() {
  18. Commands = append(Commands, &commandEcEncode{})
  19. }
  20. type commandEcEncode struct {
  21. }
  22. func (c *commandEcEncode) Name() string {
  23. return "ec.encode"
  24. }
  25. func (c *commandEcEncode) Help() string {
  26. return `apply erasure coding to a volume
  27. ec.encode [-collection=""] [-fullPercent=95] [-quietFor=1h]
  28. ec.encode [-collection=""] [-volumeId=<volume_id>]
  29. This command will:
  30. 1. freeze one volume
  31. 2. apply erasure coding to the volume
  32. 3. move the encoded shards to multiple volume servers
  33. The erasure coding is 10.4. So ideally you have more than 14 volume servers, and you can afford
  34. to lose 4 volume servers.
  35. If the number of volumes are not high, the worst case is that you only have 4 volume servers,
  36. and the shards are spread as 4,4,3,3, respectively. You can afford to lose one volume server.
  37. If you only have less than 4 volume servers, with erasure coding, at least you can afford to
  38. have 4 corrupted shard files.
  39. `
  40. }
  41. func (c *commandEcEncode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  42. if err = commandEnv.confirmIsLocked(); err != nil {
  43. return
  44. }
  45. encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  46. volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
  47. collection := encodeCommand.String("collection", "", "the collection name")
  48. fullPercentage := encodeCommand.Float64("fullPercent", 95, "the volume reaches the percentage of max volume size")
  49. quietPeriod := encodeCommand.Duration("quietFor", time.Hour, "select volumes without no writes for this period")
  50. parallelCopy := encodeCommand.Bool("parallelCopy", true, "copy shards in parallel")
  51. if err = encodeCommand.Parse(args); err != nil {
  52. return nil
  53. }
  54. vid := needle.VolumeId(*volumeId)
  55. // volumeId is provided
  56. if vid != 0 {
  57. return doEcEncode(commandEnv, *collection, vid, *parallelCopy)
  58. }
  59. // apply to all volumes in the collection
  60. volumeIds, err := collectVolumeIdsForEcEncode(commandEnv, *collection, *fullPercentage, *quietPeriod)
  61. if err != nil {
  62. return err
  63. }
  64. fmt.Printf("ec encode volumes: %v\n", volumeIds)
  65. for _, vid := range volumeIds {
  66. if err = doEcEncode(commandEnv, *collection, vid, *parallelCopy); err != nil {
  67. return err
  68. }
  69. }
  70. return nil
  71. }
  72. func doEcEncode(commandEnv *CommandEnv, collection string, vid needle.VolumeId, parallelCopy bool) (err error) {
  73. // find volume location
  74. locations, found := commandEnv.MasterClient.GetLocations(uint32(vid))
  75. if !found {
  76. return fmt.Errorf("volume %d not found", vid)
  77. }
  78. // fmt.Printf("found ec %d shards on %v\n", vid, locations)
  79. // mark the volume as readonly
  80. err = markVolumeReadonly(commandEnv.option.GrpcDialOption, vid, locations)
  81. if err != nil {
  82. return fmt.Errorf("mark volume %d as readonly on %s: %v", vid, locations[0].Url, err)
  83. }
  84. // generate ec shards
  85. err = generateEcShards(commandEnv.option.GrpcDialOption, vid, collection, locations[0].Url)
  86. if err != nil {
  87. return fmt.Errorf("generate ec shards for volume %d on %s: %v", vid, locations[0].Url, err)
  88. }
  89. // balance the ec shards to current cluster
  90. err = spreadEcShards(commandEnv, vid, collection, locations, parallelCopy)
  91. if err != nil {
  92. return fmt.Errorf("spread ec shards for volume %d from %s: %v", vid, locations[0].Url, err)
  93. }
  94. return nil
  95. }
  96. func markVolumeReadonly(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, locations []wdclient.Location) error {
  97. for _, location := range locations {
  98. fmt.Printf("markVolumeReadonly %d on %s ...\n", volumeId, location.Url)
  99. err := operation.WithVolumeServerClient(location.Url, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  100. _, markErr := volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
  101. VolumeId: uint32(volumeId),
  102. })
  103. return markErr
  104. })
  105. if err != nil {
  106. return err
  107. }
  108. }
  109. return nil
  110. }
  111. func generateEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, collection string, sourceVolumeServer string) error {
  112. fmt.Printf("generateEcShards %s %d on %s ...\n", collection, volumeId, sourceVolumeServer)
  113. err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  114. _, genErr := volumeServerClient.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{
  115. VolumeId: uint32(volumeId),
  116. Collection: collection,
  117. })
  118. return genErr
  119. })
  120. return err
  121. }
  122. func spreadEcShards(commandEnv *CommandEnv, volumeId needle.VolumeId, collection string, existingLocations []wdclient.Location, parallelCopy bool) (err error) {
  123. allEcNodes, totalFreeEcSlots, err := collectEcNodes(commandEnv, "")
  124. if err != nil {
  125. return err
  126. }
  127. if totalFreeEcSlots < erasure_coding.TotalShardsCount {
  128. return fmt.Errorf("not enough free ec shard slots. only %d left", totalFreeEcSlots)
  129. }
  130. allocatedDataNodes := allEcNodes
  131. if len(allocatedDataNodes) > erasure_coding.TotalShardsCount {
  132. allocatedDataNodes = allocatedDataNodes[:erasure_coding.TotalShardsCount]
  133. }
  134. // calculate how many shards to allocate for these servers
  135. allocatedEcIds := balancedEcDistribution(allocatedDataNodes)
  136. // ask the data nodes to copy from the source volume server
  137. copiedShardIds, err := parallelCopyEcShardsFromSource(commandEnv.option.GrpcDialOption, allocatedDataNodes, allocatedEcIds, volumeId, collection, existingLocations[0], parallelCopy)
  138. if err != nil {
  139. return err
  140. }
  141. // unmount the to be deleted shards
  142. err = unmountEcShards(commandEnv.option.GrpcDialOption, volumeId, existingLocations[0].Url, copiedShardIds)
  143. if err != nil {
  144. return err
  145. }
  146. // ask the source volume server to clean up copied ec shards
  147. err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, volumeId, existingLocations[0].Url, copiedShardIds)
  148. if err != nil {
  149. return fmt.Errorf("source delete copied ecShards %s %d.%v: %v", existingLocations[0].Url, volumeId, copiedShardIds, err)
  150. }
  151. // ask the source volume server to delete the original volume
  152. for _, location := range existingLocations {
  153. fmt.Printf("delete volume %d from %s\n", volumeId, location.Url)
  154. err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, location.Url)
  155. if err != nil {
  156. return fmt.Errorf("deleteVolume %s volume %d: %v", location.Url, volumeId, err)
  157. }
  158. }
  159. return err
  160. }
  161. func parallelCopyEcShardsFromSource(grpcDialOption grpc.DialOption, targetServers []*EcNode, allocatedEcIds [][]uint32, volumeId needle.VolumeId, collection string, existingLocation wdclient.Location, parallelCopy bool) (actuallyCopied []uint32, err error) {
  162. fmt.Printf("parallelCopyEcShardsFromSource %d %s\n", volumeId, existingLocation.Url)
  163. var wg sync.WaitGroup
  164. shardIdChan := make(chan []uint32, len(targetServers))
  165. copyFunc := func(server *EcNode, allocatedEcShardIds []uint32) {
  166. defer wg.Done()
  167. copiedShardIds, copyErr := oneServerCopyAndMountEcShardsFromSource(grpcDialOption, server,
  168. allocatedEcShardIds, volumeId, collection, existingLocation.Url)
  169. if copyErr != nil {
  170. err = copyErr
  171. } else {
  172. shardIdChan <- copiedShardIds
  173. server.addEcVolumeShards(volumeId, collection, copiedShardIds)
  174. }
  175. }
  176. // maybe parallelize
  177. for i, server := range targetServers {
  178. if len(allocatedEcIds[i]) <= 0 {
  179. continue
  180. }
  181. wg.Add(1)
  182. if parallelCopy {
  183. go copyFunc(server, allocatedEcIds[i])
  184. } else {
  185. copyFunc(server, allocatedEcIds[i])
  186. }
  187. }
  188. wg.Wait()
  189. close(shardIdChan)
  190. if err != nil {
  191. return nil, err
  192. }
  193. for shardIds := range shardIdChan {
  194. actuallyCopied = append(actuallyCopied, shardIds...)
  195. }
  196. return
  197. }
  198. func balancedEcDistribution(servers []*EcNode) (allocated [][]uint32) {
  199. allocated = make([][]uint32, len(servers))
  200. allocatedShardIdIndex := uint32(0)
  201. serverIndex := 0
  202. for allocatedShardIdIndex < erasure_coding.TotalShardsCount {
  203. if servers[serverIndex].freeEcSlot > 0 {
  204. allocated[serverIndex] = append(allocated[serverIndex], allocatedShardIdIndex)
  205. allocatedShardIdIndex++
  206. }
  207. serverIndex++
  208. if serverIndex >= len(servers) {
  209. serverIndex = 0
  210. }
  211. }
  212. return allocated
  213. }
  214. func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
  215. // collect topology information
  216. topologyInfo, volumeSizeLimitMb, err := collectTopologyInfo(commandEnv)
  217. if err != nil {
  218. return
  219. }
  220. quietSeconds := int64(quietPeriod / time.Second)
  221. nowUnixSeconds := time.Now().Unix()
  222. fmt.Printf("collect volumes quiet for: %d seconds\n", quietSeconds)
  223. vidMap := make(map[uint32]bool)
  224. eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  225. for _, diskInfo := range dn.DiskInfos {
  226. for _, v := range diskInfo.VolumeInfos {
  227. if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds {
  228. if float64(v.Size) > fullPercentage/100*float64(volumeSizeLimitMb)*1024*1024 {
  229. vidMap[v.Id] = true
  230. }
  231. }
  232. }
  233. }
  234. })
  235. for vid := range vidMap {
  236. vids = append(vids, needle.VolumeId(vid))
  237. }
  238. return
  239. }