command_ec_decode.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "google.golang.org/grpc"
  8. "github.com/chrislusf/seaweedfs/weed/operation"
  9. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  10. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  11. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  12. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  13. )
  14. func init() {
  15. Commands = append(Commands, &commandEcDecode{})
  16. }
  17. type commandEcDecode struct {
  18. }
  19. func (c *commandEcDecode) Name() string {
  20. return "ec.decode"
  21. }
  22. func (c *commandEcDecode) Help() string {
  23. return `decode a erasure coded volume into a normal volume
  24. ec.decode [-collection=""] [-volumeId=<volume_id>]
  25. `
  26. }
  27. func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  28. if err = commandEnv.confirmIsLocked(); err != nil {
  29. return
  30. }
  31. encodeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  32. volumeId := encodeCommand.Int("volumeId", 0, "the volume id")
  33. collection := encodeCommand.String("collection", "", "the collection name")
  34. if err = encodeCommand.Parse(args); err != nil {
  35. return nil
  36. }
  37. vid := needle.VolumeId(*volumeId)
  38. // collect topology information
  39. topologyInfo, err := collectTopologyInfo(commandEnv)
  40. if err != nil {
  41. return err
  42. }
  43. // volumeId is provided
  44. if vid != 0 {
  45. return doEcDecode(commandEnv, topologyInfo, *collection, vid)
  46. }
  47. // apply to all volumes in the collection
  48. volumeIds := collectEcShardIds(topologyInfo, *collection)
  49. fmt.Printf("ec encode volumes: %v\n", volumeIds)
  50. for _, vid := range volumeIds {
  51. if err = doEcDecode(commandEnv, topologyInfo, *collection, vid); err != nil {
  52. return err
  53. }
  54. }
  55. return nil
  56. }
  57. func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId) (err error) {
  58. // find volume location
  59. nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid)
  60. fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcIndexBits)
  61. // collect ec shards to the server with most space
  62. targetNodeLocation, err := collectEcShards(commandEnv, nodeToEcIndexBits, collection, vid)
  63. if err != nil {
  64. return fmt.Errorf("collectEcShards for volume %d: %v", vid, err)
  65. }
  66. // generate a normal volume
  67. err = generateNormalVolume(commandEnv.option.GrpcDialOption, needle.VolumeId(vid), collection, targetNodeLocation)
  68. if err != nil {
  69. return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err)
  70. }
  71. // delete the previous ec shards
  72. err = mountVolumeAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcIndexBits, vid)
  73. if err != nil {
  74. return fmt.Errorf("delete ec shards for volume %d: %v", vid, err)
  75. }
  76. return nil
  77. }
  78. func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection, targetNodeLocation string, nodeToEcIndexBits map[string]erasure_coding.ShardBits, vid needle.VolumeId) error {
  79. // mount volume
  80. if err := operation.WithVolumeServerClient(targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  81. _, mountErr := volumeServerClient.VolumeMount(context.Background(), &volume_server_pb.VolumeMountRequest{
  82. VolumeId: uint32(vid),
  83. })
  84. return mountErr
  85. }); err != nil {
  86. return fmt.Errorf("mountVolumeAndDeleteEcShards mount volume %d on %s: %v", vid, targetNodeLocation, err)
  87. }
  88. // unmount ec shards
  89. for location, ecIndexBits := range nodeToEcIndexBits {
  90. fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
  91. err := unmountEcShards(grpcDialOption, vid, location, ecIndexBits.ToUint32Slice())
  92. if err != nil {
  93. return fmt.Errorf("mountVolumeAndDeleteEcShards unmount ec volume %d on %s: %v", vid, location, err)
  94. }
  95. }
  96. // delete ec shards
  97. for location, ecIndexBits := range nodeToEcIndexBits {
  98. fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
  99. err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice())
  100. if err != nil {
  101. return fmt.Errorf("mountVolumeAndDeleteEcShards delete ec volume %d on %s: %v", vid, location, err)
  102. }
  103. }
  104. return nil
  105. }
  106. func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer string) error {
  107. fmt.Printf("generateNormalVolume from ec volume %d on %s\n", vid, sourceVolumeServer)
  108. err := operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  109. _, genErr := volumeServerClient.VolumeEcShardsToVolume(context.Background(), &volume_server_pb.VolumeEcShardsToVolumeRequest{
  110. VolumeId: uint32(vid),
  111. Collection: collection,
  112. })
  113. return genErr
  114. })
  115. return err
  116. }
  117. func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[string]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation string, err error) {
  118. maxShardCount := 0
  119. var exisitngEcIndexBits erasure_coding.ShardBits
  120. for loc, ecIndexBits := range nodeToEcIndexBits {
  121. toBeCopiedShardCount := ecIndexBits.MinusParityShards().ShardIdCount()
  122. if toBeCopiedShardCount > maxShardCount {
  123. maxShardCount = toBeCopiedShardCount
  124. targetNodeLocation = loc
  125. exisitngEcIndexBits = ecIndexBits
  126. }
  127. }
  128. fmt.Printf("collectEcShards: ec volume %d collect shards to %s from: %+v\n", vid, targetNodeLocation, nodeToEcIndexBits)
  129. var copiedEcIndexBits erasure_coding.ShardBits
  130. for loc, ecIndexBits := range nodeToEcIndexBits {
  131. if loc == targetNodeLocation {
  132. continue
  133. }
  134. needToCopyEcIndexBits := ecIndexBits.Minus(exisitngEcIndexBits).MinusParityShards()
  135. if needToCopyEcIndexBits.ShardIdCount() == 0 {
  136. continue
  137. }
  138. err = operation.WithVolumeServerClient(targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  139. fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation)
  140. _, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
  141. VolumeId: uint32(vid),
  142. Collection: collection,
  143. ShardIds: needToCopyEcIndexBits.ToUint32Slice(),
  144. CopyEcxFile: false,
  145. CopyEcjFile: true,
  146. CopyVifFile: true,
  147. SourceDataNode: loc,
  148. })
  149. if copyErr != nil {
  150. return fmt.Errorf("copy %d.%v %s => %s : %v\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation, copyErr)
  151. }
  152. return nil
  153. })
  154. if err != nil {
  155. break
  156. }
  157. copiedEcIndexBits = copiedEcIndexBits.Plus(needToCopyEcIndexBits)
  158. }
  159. nodeToEcIndexBits[targetNodeLocation] = exisitngEcIndexBits.Plus(copiedEcIndexBits)
  160. return targetNodeLocation, err
  161. }
  162. func collectTopologyInfo(commandEnv *CommandEnv) (topoInfo *master_pb.TopologyInfo, err error) {
  163. var resp *master_pb.VolumeListResponse
  164. err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
  165. resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
  166. return err
  167. })
  168. if err != nil {
  169. return
  170. }
  171. return resp.TopologyInfo, nil
  172. }
  173. func collectEcShardInfos(topoInfo *master_pb.TopologyInfo, selectedCollection string, vid needle.VolumeId) (ecShardInfos []*master_pb.VolumeEcShardInformationMessage) {
  174. eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  175. for _, v := range dn.EcShardInfos {
  176. if v.Collection == selectedCollection && v.Id == uint32(vid) {
  177. ecShardInfos = append(ecShardInfos, v)
  178. }
  179. }
  180. })
  181. return
  182. }
  183. func collectEcShardIds(topoInfo *master_pb.TopologyInfo, selectedCollection string) (vids []needle.VolumeId) {
  184. vidMap := make(map[uint32]bool)
  185. eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  186. for _, v := range dn.EcShardInfos {
  187. if v.Collection == selectedCollection {
  188. vidMap[v.Id] = true
  189. }
  190. }
  191. })
  192. for vid := range vidMap {
  193. vids = append(vids, needle.VolumeId(vid))
  194. }
  195. return
  196. }
  197. func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId) map[string]erasure_coding.ShardBits {
  198. nodeToEcIndexBits := make(map[string]erasure_coding.ShardBits)
  199. eachDataNode(topoInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
  200. for _, v := range dn.EcShardInfos {
  201. if v.Id == uint32(vid) {
  202. nodeToEcIndexBits[dn.Id] = erasure_coding.ShardBits(v.EcIndexBits)
  203. }
  204. }
  205. })
  206. return nodeToEcIndexBits
  207. }