webdav_server.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path"
  8. "strings"
  9. "time"
  10. "github.com/seaweedfs/seaweedfs/weed/util/buffered_writer"
  11. "golang.org/x/net/webdav"
  12. "google.golang.org/grpc"
  13. "github.com/seaweedfs/seaweedfs/weed/operation"
  14. "github.com/seaweedfs/seaweedfs/weed/pb"
  15. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  16. "github.com/seaweedfs/seaweedfs/weed/util"
  17. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  18. "github.com/seaweedfs/seaweedfs/weed/filer"
  19. "github.com/seaweedfs/seaweedfs/weed/glog"
  20. "github.com/seaweedfs/seaweedfs/weed/security"
  21. )
  22. type WebDavOption struct {
  23. Filer pb.ServerAddress
  24. FilerRootPath string
  25. DomainName string
  26. BucketsPath string
  27. GrpcDialOption grpc.DialOption
  28. Collection string
  29. Replication string
  30. DiskType string
  31. Uid uint32
  32. Gid uint32
  33. Cipher bool
  34. CacheDir string
  35. CacheSizeMB int64
  36. MaxMB int
  37. }
  38. type WebDavServer struct {
  39. option *WebDavOption
  40. secret security.SigningKey
  41. filer *filer.Filer
  42. grpcDialOption grpc.DialOption
  43. Handler *webdav.Handler
  44. }
  45. func max(x, y int64) int64 {
  46. if x <= y {
  47. return y
  48. }
  49. return x
  50. }
  51. func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
  52. fs, _ := NewWebDavFileSystem(option)
  53. // Fix no set filer.path , accessing "/" returns "//"
  54. if option.FilerRootPath == "/" {
  55. option.FilerRootPath = ""
  56. }
  57. // filer.path non "/" option means we are accessing filer's sub-folders
  58. if option.FilerRootPath != "" {
  59. fs = NewWrappedFs(fs, path.Clean(option.FilerRootPath))
  60. }
  61. ws = &WebDavServer{
  62. option: option,
  63. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  64. Handler: &webdav.Handler{
  65. FileSystem: fs,
  66. LockSystem: webdav.NewMemLS(),
  67. },
  68. }
  69. return ws, nil
  70. }
  71. // adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go
  72. type WebDavFileSystem struct {
  73. option *WebDavOption
  74. secret security.SigningKey
  75. grpcDialOption grpc.DialOption
  76. chunkCache *chunk_cache.TieredChunkCache
  77. readerCache *filer.ReaderCache
  78. signature int32
  79. }
  80. type FileInfo struct {
  81. name string
  82. size int64
  83. mode os.FileMode
  84. modifiedTime time.Time
  85. etag string
  86. isDirectory bool
  87. }
  88. func (fi *FileInfo) Name() string { return fi.name }
  89. func (fi *FileInfo) Size() int64 { return fi.size }
  90. func (fi *FileInfo) Mode() os.FileMode { return fi.mode }
  91. func (fi *FileInfo) ModTime() time.Time { return fi.modifiedTime }
  92. func (fi *FileInfo) IsDir() bool { return fi.isDirectory }
  93. func (fi *FileInfo) Sys() interface{} { return nil }
  94. func (fi *FileInfo) ETag(ctx context.Context) (string, error) {
  95. return fi.etag, nil
  96. }
  97. type WebDavFile struct {
  98. fs *WebDavFileSystem
  99. name string
  100. isDirectory bool
  101. off int64
  102. entry *filer_pb.Entry
  103. visibleIntervals *filer.IntervalList[*filer.VisibleInterval]
  104. reader io.ReaderAt
  105. bufWriter *buffered_writer.BufferedWriteCloser
  106. }
  107. func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
  108. cacheUniqueId := util.Md5String([]byte("webdav" + string(option.Filer) + util.Version()))[0:8]
  109. cacheDir := path.Join(option.CacheDir, cacheUniqueId)
  110. os.MkdirAll(cacheDir, os.FileMode(0755))
  111. chunkCache := chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
  112. t := &WebDavFileSystem{
  113. option: option,
  114. chunkCache: chunkCache,
  115. signature: util.RandomInt32(),
  116. }
  117. t.readerCache = filer.NewReaderCache(32, chunkCache, filer.LookupFn(t))
  118. return t, nil
  119. }
  120. var _ = filer_pb.FilerClient(&WebDavFileSystem{})
  121. func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  122. return pb.WithGrpcClient(streamingMode, fs.signature, func(grpcConnection *grpc.ClientConn) error {
  123. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  124. return fn(client)
  125. }, fs.option.Filer.ToGrpcAddress(), false, fs.option.GrpcDialOption)
  126. }
  127. func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
  128. return location.Url
  129. }
  130. func (fs *WebDavFileSystem) GetDataCenter() string {
  131. return ""
  132. }
  133. func clearName(name string) (string, error) {
  134. slashed := strings.HasSuffix(name, "/")
  135. name = path.Clean(name)
  136. if !strings.HasSuffix(name, "/") && slashed {
  137. name += "/"
  138. }
  139. if !strings.HasPrefix(name, "/") {
  140. return "", os.ErrInvalid
  141. }
  142. return name, nil
  143. }
  144. func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error {
  145. glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath)
  146. if !strings.HasSuffix(fullDirPath, "/") {
  147. fullDirPath += "/"
  148. }
  149. var err error
  150. if fullDirPath, err = clearName(fullDirPath); err != nil {
  151. return err
  152. }
  153. _, err = fs.stat(ctx, fullDirPath)
  154. if err == nil {
  155. return os.ErrExist
  156. }
  157. return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  158. dir, name := util.FullPath(fullDirPath).DirAndName()
  159. request := &filer_pb.CreateEntryRequest{
  160. Directory: dir,
  161. Entry: &filer_pb.Entry{
  162. Name: name,
  163. IsDirectory: true,
  164. Attributes: &filer_pb.FuseAttributes{
  165. Mtime: time.Now().Unix(),
  166. Crtime: time.Now().Unix(),
  167. FileMode: uint32(perm | os.ModeDir),
  168. Uid: fs.option.Uid,
  169. Gid: fs.option.Gid,
  170. },
  171. },
  172. Signatures: []int32{fs.signature},
  173. }
  174. glog.V(1).Infof("mkdir: %v", request)
  175. if err := filer_pb.CreateEntry(client, request); err != nil {
  176. return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
  177. }
  178. return nil
  179. })
  180. }
  181. func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
  182. glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag)
  183. var err error
  184. if fullFilePath, err = clearName(fullFilePath); err != nil {
  185. return nil, err
  186. }
  187. if flag&os.O_CREATE != 0 {
  188. // file should not have / suffix.
  189. if strings.HasSuffix(fullFilePath, "/") {
  190. return nil, os.ErrInvalid
  191. }
  192. _, err = fs.stat(ctx, fullFilePath)
  193. if err == nil {
  194. if flag&os.O_EXCL != 0 {
  195. return nil, os.ErrExist
  196. }
  197. fs.removeAll(ctx, fullFilePath)
  198. }
  199. dir, name := util.FullPath(fullFilePath).DirAndName()
  200. err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  201. if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  202. Directory: dir,
  203. Entry: &filer_pb.Entry{
  204. Name: name,
  205. IsDirectory: perm&os.ModeDir > 0,
  206. Attributes: &filer_pb.FuseAttributes{
  207. Mtime: 0,
  208. Crtime: time.Now().Unix(),
  209. FileMode: uint32(perm),
  210. Uid: fs.option.Uid,
  211. Gid: fs.option.Gid,
  212. TtlSec: 0,
  213. },
  214. },
  215. Signatures: []int32{fs.signature},
  216. }); err != nil {
  217. return fmt.Errorf("create %s: %v", fullFilePath, err)
  218. }
  219. return nil
  220. })
  221. if err != nil {
  222. return nil, err
  223. }
  224. return &WebDavFile{
  225. fs: fs,
  226. name: fullFilePath,
  227. isDirectory: false,
  228. bufWriter: buffered_writer.NewBufferedWriteCloser(fs.option.MaxMB * 1024 * 1024),
  229. }, nil
  230. }
  231. fi, err := fs.stat(ctx, fullFilePath)
  232. if err != nil {
  233. return nil, os.ErrNotExist
  234. }
  235. if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
  236. fullFilePath += "/"
  237. }
  238. return &WebDavFile{
  239. fs: fs,
  240. name: fullFilePath,
  241. isDirectory: false,
  242. bufWriter: buffered_writer.NewBufferedWriteCloser(fs.option.MaxMB * 1024 * 1024),
  243. }, nil
  244. }
  245. func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error {
  246. var err error
  247. if fullFilePath, err = clearName(fullFilePath); err != nil {
  248. return err
  249. }
  250. dir, name := util.FullPath(fullFilePath).DirAndName()
  251. return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature})
  252. }
  253. func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
  254. glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name)
  255. return fs.removeAll(ctx, name)
  256. }
  257. func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error {
  258. glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName)
  259. var err error
  260. if oldName, err = clearName(oldName); err != nil {
  261. return err
  262. }
  263. if newName, err = clearName(newName); err != nil {
  264. return err
  265. }
  266. of, err := fs.stat(ctx, oldName)
  267. if err != nil {
  268. return os.ErrExist
  269. }
  270. if of.IsDir() {
  271. if strings.HasSuffix(oldName, "/") {
  272. oldName = strings.TrimRight(oldName, "/")
  273. }
  274. if strings.HasSuffix(newName, "/") {
  275. newName = strings.TrimRight(newName, "/")
  276. }
  277. }
  278. _, err = fs.stat(ctx, newName)
  279. if err == nil {
  280. return os.ErrExist
  281. }
  282. oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
  283. newDir, newBaseName := util.FullPath(newName).DirAndName()
  284. return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  285. request := &filer_pb.AtomicRenameEntryRequest{
  286. OldDirectory: oldDir,
  287. OldName: oldBaseName,
  288. NewDirectory: newDir,
  289. NewName: newBaseName,
  290. }
  291. _, err := client.AtomicRenameEntry(ctx, request)
  292. if err != nil {
  293. return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err)
  294. }
  295. return nil
  296. })
  297. }
  298. func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) {
  299. var err error
  300. if fullFilePath, err = clearName(fullFilePath); err != nil {
  301. return nil, err
  302. }
  303. fullpath := util.FullPath(fullFilePath)
  304. var fi FileInfo
  305. entry, err := filer_pb.GetEntry(fs, fullpath)
  306. if entry == nil {
  307. return nil, os.ErrNotExist
  308. }
  309. if err != nil {
  310. return nil, err
  311. }
  312. fi.size = int64(filer.FileSize(entry))
  313. fi.name = string(fullpath)
  314. fi.mode = os.FileMode(entry.Attributes.FileMode)
  315. fi.modifiedTime = time.Unix(entry.Attributes.Mtime, 0)
  316. fi.etag = filer.ETag(entry)
  317. fi.isDirectory = entry.IsDirectory
  318. if fi.name == "/" {
  319. fi.modifiedTime = time.Now()
  320. fi.isDirectory = true
  321. }
  322. return &fi, nil
  323. }
  324. func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) {
  325. glog.V(2).Infof("WebDavFileSystem.Stat %v", name)
  326. return fs.stat(ctx, name)
  327. }
  328. func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
  329. uploader, uploaderErr := operation.NewUploader()
  330. if uploaderErr != nil {
  331. glog.V(0).Infof("upload data %v: %v", f.name, uploaderErr)
  332. return nil, fmt.Errorf("upload data: %v", uploaderErr)
  333. }
  334. fileId, uploadResult, flushErr, _ := uploader.UploadWithRetry(
  335. f.fs,
  336. &filer_pb.AssignVolumeRequest{
  337. Count: 1,
  338. Replication: f.fs.option.Replication,
  339. Collection: f.fs.option.Collection,
  340. DiskType: f.fs.option.DiskType,
  341. Path: name,
  342. },
  343. &operation.UploadOption{
  344. Filename: f.name,
  345. Cipher: f.fs.option.Cipher,
  346. IsInputCompressed: false,
  347. MimeType: "",
  348. PairMap: nil,
  349. },
  350. func(host, fileId string) string {
  351. return fmt.Sprintf("http://%s/%s", host, fileId)
  352. },
  353. reader,
  354. )
  355. if flushErr != nil {
  356. glog.V(0).Infof("upload data %v: %v", f.name, flushErr)
  357. return nil, fmt.Errorf("upload data: %v", flushErr)
  358. }
  359. if uploadResult.Error != "" {
  360. glog.V(0).Infof("upload failure %v: %v", f.name, flushErr)
  361. return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
  362. }
  363. return uploadResult.ToPbFileChunk(fileId, offset, tsNs), nil
  364. }
  365. func (f *WebDavFile) Write(buf []byte) (int, error) {
  366. glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
  367. fullPath := util.FullPath(f.name)
  368. dir, _ := fullPath.DirAndName()
  369. var getErr error
  370. ctx := context.Background()
  371. if f.entry == nil {
  372. f.entry, getErr = filer_pb.GetEntry(f.fs, fullPath)
  373. }
  374. if f.entry == nil {
  375. return 0, getErr
  376. }
  377. if getErr != nil {
  378. return 0, getErr
  379. }
  380. if f.bufWriter.FlushFunc == nil {
  381. f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) {
  382. var chunk *filer_pb.FileChunk
  383. chunk, flushErr = f.saveDataAsChunk(util.NewBytesReader(data), f.name, offset, time.Now().UnixNano())
  384. if flushErr != nil {
  385. if f.entry.Attributes.Mtime == 0 {
  386. if err := f.fs.removeAll(ctx, f.name); err != nil {
  387. glog.Errorf("bufWriter.Flush remove file error: %+v", f.name)
  388. }
  389. }
  390. return fmt.Errorf("%s upload result: %v", f.name, flushErr)
  391. }
  392. f.entry.Content = nil
  393. f.entry.Chunks = append(f.entry.GetChunks(), chunk)
  394. return flushErr
  395. }
  396. f.bufWriter.CloseFunc = func() error {
  397. manifestedChunks, manifestErr := filer.MaybeManifestize(f.saveDataAsChunk, f.entry.GetChunks())
  398. if manifestErr != nil {
  399. // not good, but should be ok
  400. glog.V(0).Infof("file %s close MaybeManifestize: %v", f.name, manifestErr)
  401. } else {
  402. f.entry.Chunks = manifestedChunks
  403. }
  404. flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  405. f.entry.Attributes.Mtime = time.Now().Unix()
  406. request := &filer_pb.UpdateEntryRequest{
  407. Directory: dir,
  408. Entry: f.entry,
  409. Signatures: []int32{f.fs.signature},
  410. }
  411. if _, err := client.UpdateEntry(ctx, request); err != nil {
  412. return fmt.Errorf("update %s: %v", f.name, err)
  413. }
  414. return nil
  415. })
  416. return flushErr
  417. }
  418. }
  419. written, err := f.bufWriter.Write(buf)
  420. if err == nil {
  421. f.entry.Attributes.FileSize = uint64(max(f.off+int64(written), int64(f.entry.Attributes.FileSize)))
  422. glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
  423. f.off += int64(written)
  424. }
  425. return written, err
  426. }
  427. func (f *WebDavFile) Close() error {
  428. glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
  429. err := f.bufWriter.Close()
  430. if f.entry != nil {
  431. f.entry = nil
  432. f.visibleIntervals = nil
  433. }
  434. return err
  435. }
  436. func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
  437. glog.V(2).Infof("WebDavFileSystem.Read %v", f.name)
  438. if f.entry == nil {
  439. f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  440. }
  441. if f.entry == nil {
  442. return 0, err
  443. }
  444. if err != nil {
  445. return 0, err
  446. }
  447. fileSize := int64(filer.FileSize(f.entry))
  448. if fileSize == 0 {
  449. return 0, io.EOF
  450. }
  451. if f.visibleIntervals == nil {
  452. f.visibleIntervals, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.GetChunks(), 0, fileSize)
  453. f.reader = nil
  454. }
  455. if f.reader == nil {
  456. chunkViews := filer.ViewFromVisibleIntervals(f.visibleIntervals, 0, fileSize)
  457. f.reader = filer.NewChunkReaderAtFromClient(f.fs.readerCache, chunkViews, fileSize)
  458. }
  459. readSize, err = f.reader.ReadAt(p, f.off)
  460. glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize))
  461. f.off += int64(readSize)
  462. if err != nil && err != io.EOF {
  463. glog.Errorf("file read %s: %v", f.name, err)
  464. }
  465. return
  466. }
  467. func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
  468. glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count)
  469. dir, _ := util.FullPath(f.name).DirAndName()
  470. err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
  471. fi := FileInfo{
  472. size: int64(filer.FileSize(entry)),
  473. name: entry.Name,
  474. mode: os.FileMode(entry.Attributes.FileMode),
  475. modifiedTime: time.Unix(entry.Attributes.Mtime, 0),
  476. isDirectory: entry.IsDirectory,
  477. }
  478. if !strings.HasSuffix(fi.name, "/") && fi.IsDir() {
  479. fi.name += "/"
  480. }
  481. glog.V(4).Infof("entry: %v", fi.name)
  482. ret = append(ret, &fi)
  483. return nil
  484. })
  485. if err != nil {
  486. return nil, err
  487. }
  488. old := f.off
  489. if old >= int64(len(ret)) {
  490. if count > 0 {
  491. return nil, io.EOF
  492. }
  493. return nil, nil
  494. }
  495. if count > 0 {
  496. f.off += int64(count)
  497. if f.off > int64(len(ret)) {
  498. f.off = int64(len(ret))
  499. }
  500. } else {
  501. f.off = int64(len(ret))
  502. old = 0
  503. }
  504. return ret[old:f.off], nil
  505. }
  506. func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
  507. glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence)
  508. ctx := context.Background()
  509. var err error
  510. switch whence {
  511. case io.SeekStart:
  512. f.off = 0
  513. case io.SeekEnd:
  514. if fi, err := f.fs.stat(ctx, f.name); err != nil {
  515. return 0, err
  516. } else {
  517. f.off = fi.Size()
  518. }
  519. }
  520. f.off += offset
  521. return f.off, err
  522. }
  523. func (f *WebDavFile) Stat() (os.FileInfo, error) {
  524. glog.V(2).Infof("WebDavFile.Stat %v", f.name)
  525. ctx := context.Background()
  526. return f.fs.stat(ctx, f.name)
  527. }