webdav_server.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path"
  8. "strings"
  9. "time"
  10. "github.com/seaweedfs/seaweedfs/weed/util/buffered_writer"
  11. "golang.org/x/net/webdav"
  12. "google.golang.org/grpc"
  13. "github.com/seaweedfs/seaweedfs/weed/operation"
  14. "github.com/seaweedfs/seaweedfs/weed/pb"
  15. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  16. "github.com/seaweedfs/seaweedfs/weed/util"
  17. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  18. "github.com/seaweedfs/seaweedfs/weed/filer"
  19. "github.com/seaweedfs/seaweedfs/weed/glog"
  20. "github.com/seaweedfs/seaweedfs/weed/security"
  21. )
  22. type WebDavOption struct {
  23. Filer pb.ServerAddress
  24. FilerRootPath string
  25. DomainName string
  26. BucketsPath string
  27. GrpcDialOption grpc.DialOption
  28. Collection string
  29. Replication string
  30. DiskType string
  31. Uid uint32
  32. Gid uint32
  33. Cipher bool
  34. CacheDir string
  35. CacheSizeMB int64
  36. MaxMB int
  37. }
  38. type WebDavServer struct {
  39. option *WebDavOption
  40. secret security.SigningKey
  41. filer *filer.Filer
  42. grpcDialOption grpc.DialOption
  43. Handler *webdav.Handler
  44. }
  45. func max(x, y int64) int64 {
  46. if x <= y {
  47. return y
  48. }
  49. return x
  50. }
  51. func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
  52. fs, _ := NewWebDavFileSystem(option)
  53. // Fix no set filer.path , accessing "/" returns "//"
  54. if option.FilerRootPath == "/" {
  55. option.FilerRootPath = ""
  56. }
  57. // filer.path non "/" option means we are accessing filer's sub-folders
  58. if option.FilerRootPath != "" {
  59. fs = NewWrappedFs(fs, path.Clean(option.FilerRootPath))
  60. }
  61. ws = &WebDavServer{
  62. option: option,
  63. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  64. Handler: &webdav.Handler{
  65. FileSystem: fs,
  66. LockSystem: webdav.NewMemLS(),
  67. },
  68. }
  69. return ws, nil
  70. }
  71. // adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go
  72. type WebDavFileSystem struct {
  73. option *WebDavOption
  74. secret security.SigningKey
  75. grpcDialOption grpc.DialOption
  76. chunkCache *chunk_cache.TieredChunkCache
  77. readerCache *filer.ReaderCache
  78. signature int32
  79. }
  80. type FileInfo struct {
  81. name string
  82. size int64
  83. mode os.FileMode
  84. modifiedTime time.Time
  85. etag string
  86. isDirectory bool
  87. err error
  88. }
  89. func (fi *FileInfo) Name() string { return fi.name }
  90. func (fi *FileInfo) Size() int64 { return fi.size }
  91. func (fi *FileInfo) Mode() os.FileMode { return fi.mode }
  92. func (fi *FileInfo) ModTime() time.Time { return fi.modifiedTime }
  93. func (fi *FileInfo) IsDir() bool { return fi.isDirectory }
  94. func (fi *FileInfo) Sys() interface{} { return nil }
  95. func (fi *FileInfo) ETag(ctx context.Context) (string, error) {
  96. if fi.err != nil {
  97. return "", fi.err
  98. }
  99. return fi.etag, nil
  100. }
  101. type WebDavFile struct {
  102. fs *WebDavFileSystem
  103. name string
  104. isDirectory bool
  105. off int64
  106. entry *filer_pb.Entry
  107. visibleIntervals *filer.IntervalList[*filer.VisibleInterval]
  108. reader io.ReaderAt
  109. bufWriter *buffered_writer.BufferedWriteCloser
  110. }
  111. func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
  112. cacheUniqueId := util.Md5String([]byte("webdav" + string(option.Filer) + util.Version()))[0:8]
  113. cacheDir := path.Join(option.CacheDir, cacheUniqueId)
  114. os.MkdirAll(cacheDir, os.FileMode(0755))
  115. chunkCache := chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
  116. t := &WebDavFileSystem{
  117. option: option,
  118. chunkCache: chunkCache,
  119. signature: util.RandomInt32(),
  120. }
  121. t.readerCache = filer.NewReaderCache(32, chunkCache, filer.LookupFn(t))
  122. return t, nil
  123. }
  124. var _ = filer_pb.FilerClient(&WebDavFileSystem{})
  125. func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  126. return pb.WithGrpcClient(streamingMode, fs.signature, func(grpcConnection *grpc.ClientConn) error {
  127. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  128. return fn(client)
  129. }, fs.option.Filer.ToGrpcAddress(), false, fs.option.GrpcDialOption)
  130. }
  131. func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
  132. return location.Url
  133. }
  134. func (fs *WebDavFileSystem) GetDataCenter() string {
  135. return ""
  136. }
  137. func clearName(name string) (string, error) {
  138. slashed := strings.HasSuffix(name, "/")
  139. name = path.Clean(name)
  140. if !strings.HasSuffix(name, "/") && slashed {
  141. name += "/"
  142. }
  143. if !strings.HasPrefix(name, "/") {
  144. return "", os.ErrInvalid
  145. }
  146. return name, nil
  147. }
  148. func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error {
  149. glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath)
  150. if !strings.HasSuffix(fullDirPath, "/") {
  151. fullDirPath += "/"
  152. }
  153. var err error
  154. if fullDirPath, err = clearName(fullDirPath); err != nil {
  155. return err
  156. }
  157. _, err = fs.stat(ctx, fullDirPath)
  158. if err == nil {
  159. return os.ErrExist
  160. }
  161. return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  162. dir, name := util.FullPath(fullDirPath).DirAndName()
  163. request := &filer_pb.CreateEntryRequest{
  164. Directory: dir,
  165. Entry: &filer_pb.Entry{
  166. Name: name,
  167. IsDirectory: true,
  168. Attributes: &filer_pb.FuseAttributes{
  169. Mtime: time.Now().Unix(),
  170. Crtime: time.Now().Unix(),
  171. FileMode: uint32(perm | os.ModeDir),
  172. Uid: fs.option.Uid,
  173. Gid: fs.option.Gid,
  174. },
  175. },
  176. Signatures: []int32{fs.signature},
  177. }
  178. glog.V(1).Infof("mkdir: %v", request)
  179. if err := filer_pb.CreateEntry(client, request); err != nil {
  180. return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
  181. }
  182. return nil
  183. })
  184. }
  185. func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
  186. glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag)
  187. var err error
  188. if fullFilePath, err = clearName(fullFilePath); err != nil {
  189. return nil, err
  190. }
  191. if flag&os.O_CREATE != 0 {
  192. // file should not have / suffix.
  193. if strings.HasSuffix(fullFilePath, "/") {
  194. return nil, os.ErrInvalid
  195. }
  196. _, err = fs.stat(ctx, fullFilePath)
  197. if err == nil {
  198. if flag&os.O_EXCL != 0 {
  199. return nil, os.ErrExist
  200. }
  201. fs.removeAll(ctx, fullFilePath)
  202. }
  203. dir, name := util.FullPath(fullFilePath).DirAndName()
  204. err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  205. if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  206. Directory: dir,
  207. Entry: &filer_pb.Entry{
  208. Name: name,
  209. IsDirectory: perm&os.ModeDir > 0,
  210. Attributes: &filer_pb.FuseAttributes{
  211. Mtime: 0,
  212. Crtime: time.Now().Unix(),
  213. FileMode: uint32(perm),
  214. Uid: fs.option.Uid,
  215. Gid: fs.option.Gid,
  216. TtlSec: 0,
  217. },
  218. },
  219. Signatures: []int32{fs.signature},
  220. }); err != nil {
  221. return fmt.Errorf("create %s: %v", fullFilePath, err)
  222. }
  223. return nil
  224. })
  225. if err != nil {
  226. return nil, err
  227. }
  228. return &WebDavFile{
  229. fs: fs,
  230. name: fullFilePath,
  231. isDirectory: false,
  232. bufWriter: buffered_writer.NewBufferedWriteCloser(fs.option.MaxMB * 1024 * 1024),
  233. }, nil
  234. }
  235. fi, err := fs.stat(ctx, fullFilePath)
  236. if err != nil {
  237. if err == os.ErrNotExist {
  238. return nil, err
  239. }
  240. return &WebDavFile{fs: fs}, nil
  241. }
  242. if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
  243. fullFilePath += "/"
  244. }
  245. return &WebDavFile{
  246. fs: fs,
  247. name: fullFilePath,
  248. isDirectory: false,
  249. bufWriter: buffered_writer.NewBufferedWriteCloser(fs.option.MaxMB * 1024 * 1024),
  250. }, nil
  251. }
  252. func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error {
  253. var err error
  254. if fullFilePath, err = clearName(fullFilePath); err != nil {
  255. return err
  256. }
  257. dir, name := util.FullPath(fullFilePath).DirAndName()
  258. return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature})
  259. }
  260. func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
  261. glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name)
  262. return fs.removeAll(ctx, name)
  263. }
  264. func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error {
  265. glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName)
  266. var err error
  267. if oldName, err = clearName(oldName); err != nil {
  268. return err
  269. }
  270. if newName, err = clearName(newName); err != nil {
  271. return err
  272. }
  273. of, err := fs.stat(ctx, oldName)
  274. if err != nil {
  275. return os.ErrExist
  276. }
  277. if of.IsDir() {
  278. if strings.HasSuffix(oldName, "/") {
  279. oldName = strings.TrimRight(oldName, "/")
  280. }
  281. if strings.HasSuffix(newName, "/") {
  282. newName = strings.TrimRight(newName, "/")
  283. }
  284. }
  285. _, err = fs.stat(ctx, newName)
  286. if err == nil {
  287. return os.ErrExist
  288. }
  289. oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
  290. newDir, newBaseName := util.FullPath(newName).DirAndName()
  291. return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  292. request := &filer_pb.AtomicRenameEntryRequest{
  293. OldDirectory: oldDir,
  294. OldName: oldBaseName,
  295. NewDirectory: newDir,
  296. NewName: newBaseName,
  297. }
  298. _, err := client.AtomicRenameEntry(ctx, request)
  299. if err != nil {
  300. return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err)
  301. }
  302. return nil
  303. })
  304. }
  305. func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) {
  306. var err error
  307. if fullFilePath, err = clearName(fullFilePath); err != nil {
  308. return nil, err
  309. }
  310. fullpath := util.FullPath(fullFilePath)
  311. var fi FileInfo
  312. entry, err := filer_pb.GetEntry(fs, fullpath)
  313. if err != nil {
  314. if err == filer_pb.ErrNotFound {
  315. return nil, os.ErrNotExist
  316. }
  317. fi.err = err
  318. return &fi, nil
  319. }
  320. if entry == nil {
  321. return nil, os.ErrNotExist
  322. }
  323. fi.size = int64(filer.FileSize(entry))
  324. fi.name = string(fullpath)
  325. fi.mode = os.FileMode(entry.Attributes.FileMode)
  326. fi.modifiedTime = time.Unix(entry.Attributes.Mtime, 0)
  327. fi.etag = filer.ETag(entry)
  328. fi.isDirectory = entry.IsDirectory
  329. if fi.name == "/" {
  330. fi.modifiedTime = time.Now()
  331. fi.isDirectory = true
  332. }
  333. return &fi, nil
  334. }
  335. func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) {
  336. glog.V(2).Infof("WebDavFileSystem.Stat %v", name)
  337. return fs.stat(ctx, name)
  338. }
  339. func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
  340. uploader, uploaderErr := operation.NewUploader()
  341. if uploaderErr != nil {
  342. glog.V(0).Infof("upload data %v: %v", f.name, uploaderErr)
  343. return nil, fmt.Errorf("upload data: %v", uploaderErr)
  344. }
  345. fileId, uploadResult, flushErr, _ := uploader.UploadWithRetry(
  346. f.fs,
  347. &filer_pb.AssignVolumeRequest{
  348. Count: 1,
  349. Replication: f.fs.option.Replication,
  350. Collection: f.fs.option.Collection,
  351. DiskType: f.fs.option.DiskType,
  352. Path: name,
  353. },
  354. &operation.UploadOption{
  355. Filename: f.name,
  356. Cipher: f.fs.option.Cipher,
  357. IsInputCompressed: false,
  358. MimeType: "",
  359. PairMap: nil,
  360. },
  361. func(host, fileId string) string {
  362. return fmt.Sprintf("http://%s/%s", host, fileId)
  363. },
  364. reader,
  365. )
  366. if flushErr != nil {
  367. glog.V(0).Infof("upload data %v: %v", f.name, flushErr)
  368. return nil, fmt.Errorf("upload data: %v", flushErr)
  369. }
  370. if uploadResult.Error != "" {
  371. glog.V(0).Infof("upload failure %v: %v", f.name, flushErr)
  372. return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
  373. }
  374. return uploadResult.ToPbFileChunk(fileId, offset, tsNs), nil
  375. }
  376. func (f *WebDavFile) Write(buf []byte) (int, error) {
  377. glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
  378. fullPath := util.FullPath(f.name)
  379. dir, _ := fullPath.DirAndName()
  380. var getErr error
  381. ctx := context.Background()
  382. if f.entry == nil {
  383. f.entry, getErr = filer_pb.GetEntry(f.fs, fullPath)
  384. }
  385. if f.entry == nil {
  386. return 0, getErr
  387. }
  388. if getErr != nil {
  389. return 0, getErr
  390. }
  391. if f.bufWriter.FlushFunc == nil {
  392. f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) {
  393. var chunk *filer_pb.FileChunk
  394. chunk, flushErr = f.saveDataAsChunk(util.NewBytesReader(data), f.name, offset, time.Now().UnixNano())
  395. if flushErr != nil {
  396. if f.entry.Attributes.Mtime == 0 {
  397. if err := f.fs.removeAll(ctx, f.name); err != nil {
  398. glog.Errorf("bufWriter.Flush remove file error: %+v", f.name)
  399. }
  400. }
  401. return fmt.Errorf("%s upload result: %v", f.name, flushErr)
  402. }
  403. f.entry.Content = nil
  404. f.entry.Chunks = append(f.entry.GetChunks(), chunk)
  405. return flushErr
  406. }
  407. f.bufWriter.CloseFunc = func() error {
  408. manifestedChunks, manifestErr := filer.MaybeManifestize(f.saveDataAsChunk, f.entry.GetChunks())
  409. if manifestErr != nil {
  410. // not good, but should be ok
  411. glog.V(0).Infof("file %s close MaybeManifestize: %v", f.name, manifestErr)
  412. } else {
  413. f.entry.Chunks = manifestedChunks
  414. }
  415. flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  416. f.entry.Attributes.Mtime = time.Now().Unix()
  417. request := &filer_pb.UpdateEntryRequest{
  418. Directory: dir,
  419. Entry: f.entry,
  420. Signatures: []int32{f.fs.signature},
  421. }
  422. if _, err := client.UpdateEntry(ctx, request); err != nil {
  423. return fmt.Errorf("update %s: %v", f.name, err)
  424. }
  425. return nil
  426. })
  427. return flushErr
  428. }
  429. }
  430. written, err := f.bufWriter.Write(buf)
  431. if err == nil {
  432. f.entry.Attributes.FileSize = uint64(max(f.off+int64(written), int64(f.entry.Attributes.FileSize)))
  433. glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
  434. f.off += int64(written)
  435. }
  436. return written, err
  437. }
  438. func (f *WebDavFile) Close() error {
  439. glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
  440. if f.bufWriter == nil {
  441. return nil
  442. }
  443. err := f.bufWriter.Close()
  444. if f.entry != nil {
  445. f.entry = nil
  446. f.visibleIntervals = nil
  447. }
  448. return err
  449. }
  450. func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
  451. glog.V(2).Infof("WebDavFileSystem.Read %v", f.name)
  452. if f.entry == nil {
  453. f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  454. }
  455. if f.entry == nil {
  456. return 0, err
  457. }
  458. if err != nil {
  459. return 0, err
  460. }
  461. fileSize := int64(filer.FileSize(f.entry))
  462. if fileSize == 0 {
  463. return 0, io.EOF
  464. }
  465. if f.visibleIntervals == nil {
  466. f.visibleIntervals, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.GetChunks(), 0, fileSize)
  467. f.reader = nil
  468. }
  469. if f.reader == nil {
  470. chunkViews := filer.ViewFromVisibleIntervals(f.visibleIntervals, 0, fileSize)
  471. f.reader = filer.NewChunkReaderAtFromClient(f.fs.readerCache, chunkViews, fileSize)
  472. }
  473. readSize, err = f.reader.ReadAt(p, f.off)
  474. glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize))
  475. f.off += int64(readSize)
  476. if err != nil && err != io.EOF {
  477. glog.Errorf("file read %s: %v", f.name, err)
  478. }
  479. return
  480. }
  481. func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
  482. glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count)
  483. dir, _ := util.FullPath(f.name).DirAndName()
  484. err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
  485. fi := FileInfo{
  486. size: int64(filer.FileSize(entry)),
  487. name: entry.Name,
  488. mode: os.FileMode(entry.Attributes.FileMode),
  489. modifiedTime: time.Unix(entry.Attributes.Mtime, 0),
  490. isDirectory: entry.IsDirectory,
  491. }
  492. if !strings.HasSuffix(fi.name, "/") && fi.IsDir() {
  493. fi.name += "/"
  494. }
  495. glog.V(4).Infof("entry: %v", fi.name)
  496. ret = append(ret, &fi)
  497. return nil
  498. })
  499. if err != nil {
  500. return nil, err
  501. }
  502. old := f.off
  503. if old >= int64(len(ret)) {
  504. if count > 0 {
  505. return nil, io.EOF
  506. }
  507. return nil, nil
  508. }
  509. if count > 0 {
  510. f.off += int64(count)
  511. if f.off > int64(len(ret)) {
  512. f.off = int64(len(ret))
  513. }
  514. } else {
  515. f.off = int64(len(ret))
  516. old = 0
  517. }
  518. return ret[old:f.off], nil
  519. }
  520. func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
  521. glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence)
  522. ctx := context.Background()
  523. var err error
  524. switch whence {
  525. case io.SeekStart:
  526. f.off = 0
  527. case io.SeekEnd:
  528. if fi, err := f.fs.stat(ctx, f.name); err != nil {
  529. return 0, err
  530. } else {
  531. f.off = fi.Size()
  532. }
  533. }
  534. f.off += offset
  535. return f.off, err
  536. }
  537. func (f *WebDavFile) Stat() (os.FileInfo, error) {
  538. glog.V(2).Infof("WebDavFile.Stat %v", f.name)
  539. ctx := context.Background()
  540. return f.fs.stat(ctx, f.name)
  541. }