command_volume_fsck.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610
  1. package shell
  2. import (
  3. "bufio"
  4. "context"
  5. "flag"
  6. "fmt"
  7. "io"
  8. "io/ioutil"
  9. "math"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "path/filepath"
  14. "sync"
  15. "github.com/chrislusf/seaweedfs/weed/filer"
  16. "github.com/chrislusf/seaweedfs/weed/operation"
  17. "github.com/chrislusf/seaweedfs/weed/pb"
  18. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  19. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  20. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  21. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  22. "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
  23. "github.com/chrislusf/seaweedfs/weed/storage/types"
  24. "github.com/chrislusf/seaweedfs/weed/util"
  25. )
  26. func init() {
  27. Commands = append(Commands, &commandVolumeFsck{})
  28. }
  29. type commandVolumeFsck struct {
  30. env *CommandEnv
  31. }
  32. func (c *commandVolumeFsck) Name() string {
  33. return "volume.fsck"
  34. }
  35. func (c *commandVolumeFsck) Help() string {
  36. return `check all volumes to find entries not used by the filer
  37. Important assumption!!!
  38. the system is all used by one filer.
  39. This command works this way:
  40. 1. collect all file ids from all volumes, as set A
  41. 2. collect all file ids from the filer, as set B
  42. 3. find out the set A subtract B
  43. If -findMissingChunksInFiler is enabled, this works
  44. in a reverse way:
  45. 1. collect all file ids from all volumes, as set A
  46. 2. collect all file ids from the filer, as set B
  47. 3. find out the set B subtract A
  48. `
  49. }
  50. func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  51. fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  52. verbose := fsckCommand.Bool("v", false, "verbose mode")
  53. findMissingChunksInFiler := fsckCommand.Bool("findMissingChunksInFiler", false, "see \"help volume.fsck\"")
  54. findMissingChunksInFilerPath := fsckCommand.String("findMissingChunksInFilerPath", "/", "used together with findMissingChunksInFiler")
  55. applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only!> after detection, delete missing data from volumes / delete missing file entries from filer")
  56. purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler")
  57. if err = fsckCommand.Parse(args); err != nil {
  58. return nil
  59. }
  60. if err = commandEnv.confirmIsLocked(args); err != nil {
  61. return
  62. }
  63. c.env = commandEnv
  64. // create a temp folder
  65. tempFolder, err := os.MkdirTemp("", "sw_fsck")
  66. if err != nil {
  67. return fmt.Errorf("failed to create temp folder: %v", err)
  68. }
  69. if *verbose {
  70. fmt.Fprintf(writer, "working directory: %s\n", tempFolder)
  71. }
  72. defer os.RemoveAll(tempFolder)
  73. // collect all volume id locations
  74. volumeIdToVInfo, err := c.collectVolumeIds(commandEnv, *verbose, writer)
  75. if err != nil {
  76. return fmt.Errorf("failed to collect all volume locations: %v", err)
  77. }
  78. // collect each volume file ids
  79. for volumeId, vinfo := range volumeIdToVInfo {
  80. err = c.collectOneVolumeFileIds(tempFolder, volumeId, vinfo, *verbose, writer)
  81. if err != nil {
  82. return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
  83. }
  84. }
  85. if *findMissingChunksInFiler {
  86. // collect all filer file ids and paths
  87. if err = c.collectFilerFileIdAndPaths(volumeIdToVInfo, tempFolder, writer, *findMissingChunksInFilerPath, *verbose, *purgeAbsent); err != nil {
  88. return fmt.Errorf("collectFilerFileIdAndPaths: %v", err)
  89. }
  90. // for each volume, check filer file ids
  91. if err = c.findFilerChunksMissingInVolumeServers(volumeIdToVInfo, tempFolder, writer, *verbose, *applyPurging); err != nil {
  92. return fmt.Errorf("findFilerChunksMissingInVolumeServers: %v", err)
  93. }
  94. } else {
  95. // collect all filer file ids
  96. if err = c.collectFilerFileIds(volumeIdToVInfo, tempFolder, writer, *verbose); err != nil {
  97. return fmt.Errorf("failed to collect file ids from filer: %v", err)
  98. }
  99. // volume file ids subtract filer file ids
  100. if err = c.findExtraChunksInVolumeServers(volumeIdToVInfo, tempFolder, writer, *verbose, *applyPurging); err != nil {
  101. return fmt.Errorf("findExtraChunksInVolumeServers: %v", err)
  102. }
  103. }
  104. return nil
  105. }
  106. func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint32]VInfo, tempFolder string, writer io.Writer, filerPath string, verbose bool, purgeAbsent bool) error {
  107. if verbose {
  108. fmt.Fprintf(writer, "checking each file from filer ...\n")
  109. }
  110. files := make(map[uint32]*os.File)
  111. for vid := range volumeIdToServer {
  112. dst, openErr := os.OpenFile(getFilerFileIdFile(tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  113. if openErr != nil {
  114. return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(tempFolder, vid), openErr)
  115. }
  116. files[vid] = dst
  117. }
  118. defer func() {
  119. for _, f := range files {
  120. f.Close()
  121. }
  122. }()
  123. type Item struct {
  124. vid uint32
  125. fileKey uint64
  126. cookie uint32
  127. path util.FullPath
  128. }
  129. return doTraverseBfsAndSaving(c.env, nil, filerPath, false, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
  130. if verbose && entry.Entry.IsDirectory {
  131. fmt.Fprintf(writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name))
  132. }
  133. dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64)
  134. if resolveErr != nil {
  135. return nil
  136. }
  137. dChunks = append(dChunks, mChunks...)
  138. for _, chunk := range dChunks {
  139. outputChan <- &Item{
  140. vid: chunk.Fid.VolumeId,
  141. fileKey: chunk.Fid.FileKey,
  142. cookie: chunk.Fid.Cookie,
  143. path: util.NewFullPath(entry.Dir, entry.Entry.Name),
  144. }
  145. }
  146. return nil
  147. }, func(outputChan chan interface{}) {
  148. buffer := make([]byte, 16)
  149. for item := range outputChan {
  150. i := item.(*Item)
  151. if f, ok := files[i.vid]; ok {
  152. util.Uint64toBytes(buffer, i.fileKey)
  153. util.Uint32toBytes(buffer[8:], i.cookie)
  154. util.Uint32toBytes(buffer[12:], uint32(len(i.path)))
  155. f.Write(buffer)
  156. f.Write([]byte(i.path))
  157. // fmt.Fprintf(writer, "%d,%x%08x %d %s\n", i.vid, i.fileKey, i.cookie, len(i.path), i.path)
  158. } else {
  159. fmt.Fprintf(writer, "%d,%x%08x %s volume not found\n", i.vid, i.fileKey, i.cookie, i.path)
  160. if purgeAbsent {
  161. fmt.Printf("deleting path %s after volume not found", i.path)
  162. c.httpDelete(i.path, verbose)
  163. }
  164. }
  165. }
  166. })
  167. }
  168. func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging bool) error {
  169. for volumeId, vinfo := range volumeIdToVInfo {
  170. checkErr := c.oneVolumeFileIdsCheckOneVolume(tempFolder, volumeId, writer, verbose, applyPurging)
  171. if checkErr != nil {
  172. return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
  173. }
  174. }
  175. return nil
  176. }
  177. func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging bool) error {
  178. var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64
  179. for volumeId, vinfo := range volumeIdToVInfo {
  180. inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, volumeId, writer, verbose)
  181. if checkErr != nil {
  182. return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
  183. }
  184. totalInUseCount += inUseCount
  185. totalOrphanChunkCount += uint64(len(orphanFileIds))
  186. totalOrphanDataSize += orphanDataSize
  187. if verbose {
  188. for _, fid := range orphanFileIds {
  189. fmt.Fprintf(writer, "%s\n", fid)
  190. }
  191. }
  192. if applyPurging && len(orphanFileIds) > 0 {
  193. if verbose {
  194. fmt.Fprintf(writer, "purging process for volume %d", volumeId)
  195. }
  196. if vinfo.isEcVolume {
  197. fmt.Fprintf(writer, "skip purging for Erasure Coded volume %d.\n", volumeId)
  198. continue
  199. }
  200. needleVID := needle.VolumeId(volumeId)
  201. if vinfo.isReadOnly {
  202. err := markVolumeWritable(c.env.option.GrpcDialOption, needleVID, vinfo.server, true)
  203. if err != nil {
  204. return fmt.Errorf("mark volume %d read/write: %v", volumeId, err)
  205. }
  206. fmt.Fprintf(writer, "temporarily marked %d on server %v writable for forced purge\n", volumeId, vinfo.server)
  207. defer markVolumeWritable(c.env.option.GrpcDialOption, needleVID, vinfo.server, false)
  208. }
  209. fmt.Fprintf(writer, "marked %d on server %v writable for forced purge\n", volumeId, vinfo.server)
  210. if verbose {
  211. fmt.Fprintf(writer, "purging files from volume %d\n", volumeId)
  212. }
  213. if err := c.purgeFileIdsForOneVolume(volumeId, orphanFileIds, writer); err != nil {
  214. return fmt.Errorf("purging volume %d: %v", volumeId, err)
  215. }
  216. }
  217. }
  218. if !applyPurging {
  219. pct := float64(totalOrphanChunkCount*100) / (float64(totalOrphanChunkCount + totalInUseCount))
  220. fmt.Fprintf(writer, "\nTotal\t\tentries:%d\torphan:%d\t%.2f%%\t%dB\n",
  221. totalOrphanChunkCount+totalInUseCount, totalOrphanChunkCount, pct, totalOrphanDataSize)
  222. fmt.Fprintf(writer, "This could be normal if multiple filers or no filers are used.\n")
  223. }
  224. if totalOrphanChunkCount == 0 {
  225. fmt.Fprintf(writer, "no orphan data\n")
  226. //return nil
  227. }
  228. return nil
  229. }
  230. func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId uint32, vinfo VInfo, verbose bool, writer io.Writer) error {
  231. if verbose {
  232. fmt.Fprintf(writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server)
  233. }
  234. return operation.WithVolumeServerClient(false, vinfo.server, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  235. ext := ".idx"
  236. if vinfo.isEcVolume {
  237. ext = ".ecx"
  238. }
  239. copyFileClient, err := volumeServerClient.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
  240. VolumeId: volumeId,
  241. Ext: ext,
  242. CompactionRevision: math.MaxUint32,
  243. StopOffset: math.MaxInt64,
  244. Collection: vinfo.collection,
  245. IsEcVolume: vinfo.isEcVolume,
  246. IgnoreSourceFileNotFound: false,
  247. })
  248. if err != nil {
  249. return fmt.Errorf("failed to start copying volume %d%s: %v", volumeId, ext, err)
  250. }
  251. err = writeToFile(copyFileClient, getVolumeFileIdFile(tempFolder, volumeId))
  252. if err != nil {
  253. return fmt.Errorf("failed to copy %d%s from %s: %v", volumeId, ext, vinfo.server, err)
  254. }
  255. return nil
  256. })
  257. }
  258. func (c *commandVolumeFsck) collectFilerFileIds(volumeIdToServer map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool) error {
  259. if verbose {
  260. fmt.Fprintf(writer, "collecting file ids from filer ...\n")
  261. }
  262. files := make(map[uint32]*os.File)
  263. for vid := range volumeIdToServer {
  264. dst, openErr := os.OpenFile(getFilerFileIdFile(tempFolder, vid), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  265. if openErr != nil {
  266. return fmt.Errorf("failed to create file %s: %v", getFilerFileIdFile(tempFolder, vid), openErr)
  267. }
  268. files[vid] = dst
  269. }
  270. defer func() {
  271. for _, f := range files {
  272. f.Close()
  273. }
  274. }()
  275. type Item struct {
  276. vid uint32
  277. fileKey uint64
  278. }
  279. return doTraverseBfsAndSaving(c.env, nil, "/", false, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
  280. dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64)
  281. if resolveErr != nil {
  282. if verbose {
  283. fmt.Fprintf(writer, "resolving manifest chunks in %s: %v\n", util.NewFullPath(entry.Dir, entry.Entry.Name), resolveErr)
  284. }
  285. return nil
  286. }
  287. dChunks = append(dChunks, mChunks...)
  288. for _, chunk := range dChunks {
  289. outputChan <- &Item{
  290. vid: chunk.Fid.VolumeId,
  291. fileKey: chunk.Fid.FileKey,
  292. }
  293. }
  294. return nil
  295. }, func(outputChan chan interface{}) {
  296. buffer := make([]byte, 8)
  297. for item := range outputChan {
  298. i := item.(*Item)
  299. util.Uint64toBytes(buffer, i.fileKey)
  300. files[i.vid].Write(buffer)
  301. }
  302. })
  303. }
  304. func (c *commandVolumeFsck) oneVolumeFileIdsCheckOneVolume(tempFolder string, volumeId uint32, writer io.Writer, verbose bool, applyPurging bool) (err error) {
  305. if verbose {
  306. fmt.Fprintf(writer, "find missing file chunks in volume %d ...\n", volumeId)
  307. }
  308. db := needle_map.NewMemDb()
  309. defer db.Close()
  310. if err = db.LoadFromIdx(getVolumeFileIdFile(tempFolder, volumeId)); err != nil {
  311. return
  312. }
  313. file := getFilerFileIdFile(tempFolder, volumeId)
  314. fp, err := os.Open(file)
  315. if err != nil {
  316. return
  317. }
  318. defer fp.Close()
  319. type Item struct {
  320. fileKey uint64
  321. cookie uint32
  322. path util.FullPath
  323. }
  324. br := bufio.NewReader(fp)
  325. buffer := make([]byte, 16)
  326. item := &Item{}
  327. var readSize int
  328. for {
  329. readSize, err = io.ReadFull(br, buffer)
  330. if err != nil || readSize != 16 {
  331. break
  332. }
  333. item.fileKey = util.BytesToUint64(buffer[:8])
  334. item.cookie = util.BytesToUint32(buffer[8:12])
  335. pathSize := util.BytesToUint32(buffer[12:16])
  336. pathBytes := make([]byte, int(pathSize))
  337. n, err := io.ReadFull(br, pathBytes)
  338. if err != nil {
  339. fmt.Fprintf(writer, "%d,%x%08x in unexpected error: %v\n", volumeId, item.fileKey, item.cookie, err)
  340. }
  341. if n != int(pathSize) {
  342. fmt.Fprintf(writer, "%d,%x%08x %d unexpected file name size %d\n", volumeId, item.fileKey, item.cookie, pathSize, n)
  343. }
  344. item.path = util.FullPath(string(pathBytes))
  345. needleId := types.NeedleId(item.fileKey)
  346. if _, found := db.Get(needleId); !found {
  347. fmt.Fprintf(writer, "%s\n", item.path)
  348. if applyPurging {
  349. // defining the URL this way automatically escapes complex path names
  350. c.httpDelete(item.path, verbose)
  351. }
  352. }
  353. }
  354. return nil
  355. }
  356. func (c *commandVolumeFsck) httpDelete(path util.FullPath, verbose bool) {
  357. req, err := http.NewRequest(http.MethodDelete, "", nil)
  358. req.URL = &url.URL{
  359. Scheme: "http",
  360. Host: c.env.option.FilerAddress.ToHttpAddress(),
  361. Path: string(path),
  362. }
  363. if verbose {
  364. fmt.Printf("full HTTP delete request to be sent: %v\n", req)
  365. }
  366. if err != nil {
  367. fmt.Errorf("HTTP delete request error: %v\n", err)
  368. }
  369. client := &http.Client{}
  370. resp, err := client.Do(req)
  371. if err != nil {
  372. fmt.Errorf("DELETE fetch error: %v\n", err)
  373. }
  374. defer resp.Body.Close()
  375. _, err = ioutil.ReadAll(resp.Body)
  376. if err != nil {
  377. fmt.Errorf("DELETE response error: %v\n", err)
  378. }
  379. if verbose {
  380. fmt.Println("delete response Status : ", resp.Status)
  381. fmt.Println("delete response Headers : ", resp.Header)
  382. }
  383. }
  384. func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) {
  385. db := needle_map.NewMemDb()
  386. defer db.Close()
  387. if err = db.LoadFromIdx(getVolumeFileIdFile(tempFolder, volumeId)); err != nil {
  388. return
  389. }
  390. filerFileIdsData, err := os.ReadFile(getFilerFileIdFile(tempFolder, volumeId))
  391. if err != nil {
  392. return
  393. }
  394. dataLen := len(filerFileIdsData)
  395. if dataLen%8 != 0 {
  396. return 0, nil, 0, fmt.Errorf("filer data is corrupted")
  397. }
  398. for i := 0; i < len(filerFileIdsData); i += 8 {
  399. fileKey := util.BytesToUint64(filerFileIdsData[i : i+8])
  400. db.Delete(types.NeedleId(fileKey))
  401. inUseCount++
  402. }
  403. var orphanFileCount uint64
  404. db.AscendingVisit(func(n needle_map.NeedleValue) error {
  405. // fmt.Printf("%d,%x\n", volumeId, n.Key)
  406. orphanFileIds = append(orphanFileIds, fmt.Sprintf("%d,%s00000000", volumeId, n.Key.String()))
  407. orphanFileCount++
  408. orphanDataSize += uint64(n.Size)
  409. return nil
  410. })
  411. if orphanFileCount > 0 {
  412. pct := float64(orphanFileCount*100) / (float64(orphanFileCount + inUseCount))
  413. fmt.Fprintf(writer, "volume:%d\tentries:%d\torphan:%d\t%.2f%%\t%dB\n",
  414. volumeId, orphanFileCount+inUseCount, orphanFileCount, pct, orphanDataSize)
  415. }
  416. return
  417. }
  418. type VInfo struct {
  419. server pb.ServerAddress
  420. collection string
  421. isEcVolume bool
  422. isReadOnly bool
  423. }
  424. func (c *commandVolumeFsck) collectVolumeIds(commandEnv *CommandEnv, verbose bool, writer io.Writer) (volumeIdToServer map[uint32]VInfo, err error) {
  425. if verbose {
  426. fmt.Fprintf(writer, "collecting volume id and locations from master ...\n")
  427. }
  428. volumeIdToServer = make(map[uint32]VInfo)
  429. // collect topology information
  430. topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
  431. if err != nil {
  432. return
  433. }
  434. eachDataNode(topologyInfo, func(dc string, rack RackId, t *master_pb.DataNodeInfo) {
  435. for _, diskInfo := range t.DiskInfos {
  436. for _, vi := range diskInfo.VolumeInfos {
  437. volumeIdToServer[vi.Id] = VInfo{
  438. server: pb.NewServerAddressFromDataNode(t),
  439. collection: vi.Collection,
  440. isEcVolume: false,
  441. isReadOnly: vi.ReadOnly,
  442. }
  443. }
  444. for _, ecShardInfo := range diskInfo.EcShardInfos {
  445. volumeIdToServer[ecShardInfo.Id] = VInfo{
  446. server: pb.NewServerAddressFromDataNode(t),
  447. collection: ecShardInfo.Collection,
  448. isEcVolume: true,
  449. isReadOnly: true,
  450. }
  451. }
  452. }
  453. })
  454. if verbose {
  455. fmt.Fprintf(writer, "collected %d volumes and locations.\n", len(volumeIdToServer))
  456. }
  457. return
  458. }
  459. func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds []string, writer io.Writer) (err error) {
  460. fmt.Fprintf(writer, "purging orphan data for volume %d...\n", volumeId)
  461. locations, found := c.env.MasterClient.GetLocations(volumeId)
  462. if !found {
  463. return fmt.Errorf("failed to find volume %d locations", volumeId)
  464. }
  465. resultChan := make(chan []*volume_server_pb.DeleteResult, len(locations))
  466. var wg sync.WaitGroup
  467. for _, location := range locations {
  468. wg.Add(1)
  469. go func(server pb.ServerAddress, fidList []string) {
  470. defer wg.Done()
  471. if deleteResults, deleteErr := operation.DeleteFilesAtOneVolumeServer(server, c.env.option.GrpcDialOption, fidList, false); deleteErr != nil {
  472. err = deleteErr
  473. } else if deleteResults != nil {
  474. resultChan <- deleteResults
  475. }
  476. }(location.ServerAddress(), fileIds)
  477. }
  478. wg.Wait()
  479. close(resultChan)
  480. for results := range resultChan {
  481. for _, result := range results {
  482. if result.Error != "" {
  483. fmt.Fprintf(writer, "purge error: %s\n", result.Error)
  484. }
  485. }
  486. }
  487. return
  488. }
  // getVolumeFileIdFile returns the path "<tempFolder>/<vid>.idx", where the
  // copied index of volume vid is stored.
  func getVolumeFileIdFile(tempFolder string, vid uint32) string {
  	fileName := fmt.Sprint(vid) + ".idx"
  	return filepath.Join(tempFolder, fileName)
  }
  // getFilerFileIdFile returns the path "<tempFolder>/<vid>.fid", where the
  // file ids that the filer references on volume vid are stored.
  func getFilerFileIdFile(tempFolder string, vid uint32) string {
  	fileName := fmt.Sprint(vid) + ".fid"
  	return filepath.Join(tempFolder, fileName)
  }
  495. func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string) error {
  496. flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
  497. dst, err := os.OpenFile(fileName, flags, 0644)
  498. if err != nil {
  499. return nil
  500. }
  501. defer dst.Close()
  502. for {
  503. resp, receiveErr := client.Recv()
  504. if receiveErr == io.EOF {
  505. break
  506. }
  507. if receiveErr != nil {
  508. return fmt.Errorf("receiving %s: %v", fileName, receiveErr)
  509. }
  510. dst.Write(resp.FileContent)
  511. }
  512. return nil
  513. }