// webdav_server.go

  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "os"
  7. "path"
  8. "strings"
  9. "time"
  10. "github.com/seaweedfs/seaweedfs/weed/util/buffered_writer"
  11. "golang.org/x/net/webdav"
  12. "google.golang.org/grpc"
  13. "github.com/seaweedfs/seaweedfs/weed/operation"
  14. "github.com/seaweedfs/seaweedfs/weed/pb"
  15. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  16. "github.com/seaweedfs/seaweedfs/weed/util"
  17. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  18. "github.com/seaweedfs/seaweedfs/weed/filer"
  19. "github.com/seaweedfs/seaweedfs/weed/glog"
  20. "github.com/seaweedfs/seaweedfs/weed/security"
  21. )
  22. type WebDavOption struct {
  23. Filer pb.ServerAddress
  24. FilerRootPath string
  25. DomainName string
  26. BucketsPath string
  27. GrpcDialOption grpc.DialOption
  28. Collection string
  29. Replication string
  30. DiskType string
  31. Uid uint32
  32. Gid uint32
  33. Cipher bool
  34. CacheDir string
  35. CacheSizeMB int64
  36. }
  37. type WebDavServer struct {
  38. option *WebDavOption
  39. secret security.SigningKey
  40. filer *filer.Filer
  41. grpcDialOption grpc.DialOption
  42. Handler *webdav.Handler
  43. }
  44. func max(x, y int64) int64 {
  45. if x <= y {
  46. return y
  47. }
  48. return x
  49. }
  50. func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
  51. fs, _ := NewWebDavFileSystem(option)
  52. // Fix no set filer.path , accessing "/" returns "//"
  53. if option.FilerRootPath == "/" {
  54. option.FilerRootPath = ""
  55. }
  56. // filer.path non "/" option means we are accessing filer's sub-folders
  57. if option.FilerRootPath != "" {
  58. fs = NewWrappedFs(fs, path.Clean(option.FilerRootPath))
  59. }
  60. ws = &WebDavServer{
  61. option: option,
  62. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
  63. Handler: &webdav.Handler{
  64. FileSystem: fs,
  65. LockSystem: webdav.NewMemLS(),
  66. },
  67. }
  68. return ws, nil
  69. }
  70. // adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go
  71. type WebDavFileSystem struct {
  72. option *WebDavOption
  73. secret security.SigningKey
  74. grpcDialOption grpc.DialOption
  75. chunkCache *chunk_cache.TieredChunkCache
  76. readerCache *filer.ReaderCache
  77. signature int32
  78. }
  79. type FileInfo struct {
  80. name string
  81. size int64
  82. mode os.FileMode
  83. modifiedTime time.Time
  84. isDirectory bool
  85. }
  86. func (fi *FileInfo) Name() string { return fi.name }
  87. func (fi *FileInfo) Size() int64 { return fi.size }
  88. func (fi *FileInfo) Mode() os.FileMode { return fi.mode }
  89. func (fi *FileInfo) ModTime() time.Time { return fi.modifiedTime }
  90. func (fi *FileInfo) IsDir() bool { return fi.isDirectory }
  91. func (fi *FileInfo) Sys() interface{} { return nil }
  92. type WebDavFile struct {
  93. fs *WebDavFileSystem
  94. name string
  95. isDirectory bool
  96. off int64
  97. entry *filer_pb.Entry
  98. visibleIntervals *filer.IntervalList[*filer.VisibleInterval]
  99. reader io.ReaderAt
  100. bufWriter *buffered_writer.BufferedWriteCloser
  101. }
  102. func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
  103. cacheUniqueId := util.Md5String([]byte("webdav" + string(option.Filer) + util.Version()))[0:8]
  104. cacheDir := path.Join(option.CacheDir, cacheUniqueId)
  105. os.MkdirAll(cacheDir, os.FileMode(0755))
  106. chunkCache := chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
  107. t := &WebDavFileSystem{
  108. option: option,
  109. chunkCache: chunkCache,
  110. signature: util.RandomInt32(),
  111. }
  112. t.readerCache = filer.NewReaderCache(32, chunkCache, filer.LookupFn(t))
  113. return t, nil
  114. }
  115. var _ = filer_pb.FilerClient(&WebDavFileSystem{})
  116. func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  117. return pb.WithGrpcClient(streamingMode, fs.signature, func(grpcConnection *grpc.ClientConn) error {
  118. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  119. return fn(client)
  120. }, fs.option.Filer.ToGrpcAddress(), false, fs.option.GrpcDialOption)
  121. }
  122. func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
  123. return location.Url
  124. }
  125. func (fs *WebDavFileSystem) GetDataCenter() string {
  126. return ""
  127. }
  128. func clearName(name string) (string, error) {
  129. slashed := strings.HasSuffix(name, "/")
  130. name = path.Clean(name)
  131. if !strings.HasSuffix(name, "/") && slashed {
  132. name += "/"
  133. }
  134. if !strings.HasPrefix(name, "/") {
  135. return "", os.ErrInvalid
  136. }
  137. return name, nil
  138. }
  139. func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error {
  140. glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath)
  141. if !strings.HasSuffix(fullDirPath, "/") {
  142. fullDirPath += "/"
  143. }
  144. var err error
  145. if fullDirPath, err = clearName(fullDirPath); err != nil {
  146. return err
  147. }
  148. _, err = fs.stat(ctx, fullDirPath)
  149. if err == nil {
  150. return os.ErrExist
  151. }
  152. return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  153. dir, name := util.FullPath(fullDirPath).DirAndName()
  154. request := &filer_pb.CreateEntryRequest{
  155. Directory: dir,
  156. Entry: &filer_pb.Entry{
  157. Name: name,
  158. IsDirectory: true,
  159. Attributes: &filer_pb.FuseAttributes{
  160. Mtime: time.Now().Unix(),
  161. Crtime: time.Now().Unix(),
  162. FileMode: uint32(perm | os.ModeDir),
  163. Uid: fs.option.Uid,
  164. Gid: fs.option.Gid,
  165. },
  166. },
  167. Signatures: []int32{fs.signature},
  168. }
  169. glog.V(1).Infof("mkdir: %v", request)
  170. if err := filer_pb.CreateEntry(client, request); err != nil {
  171. return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
  172. }
  173. return nil
  174. })
  175. }
  176. func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
  177. glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag)
  178. var err error
  179. if fullFilePath, err = clearName(fullFilePath); err != nil {
  180. return nil, err
  181. }
  182. if flag&os.O_CREATE != 0 {
  183. // file should not have / suffix.
  184. if strings.HasSuffix(fullFilePath, "/") {
  185. return nil, os.ErrInvalid
  186. }
  187. _, err = fs.stat(ctx, fullFilePath)
  188. if err == nil {
  189. if flag&os.O_EXCL != 0 {
  190. return nil, os.ErrExist
  191. }
  192. fs.removeAll(ctx, fullFilePath)
  193. }
  194. dir, name := util.FullPath(fullFilePath).DirAndName()
  195. err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  196. if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  197. Directory: dir,
  198. Entry: &filer_pb.Entry{
  199. Name: name,
  200. IsDirectory: perm&os.ModeDir > 0,
  201. Attributes: &filer_pb.FuseAttributes{
  202. Mtime: time.Now().Unix(),
  203. Crtime: time.Now().Unix(),
  204. FileMode: uint32(perm),
  205. Uid: fs.option.Uid,
  206. Gid: fs.option.Gid,
  207. TtlSec: 0,
  208. },
  209. },
  210. Signatures: []int32{fs.signature},
  211. }); err != nil {
  212. return fmt.Errorf("create %s: %v", fullFilePath, err)
  213. }
  214. return nil
  215. })
  216. if err != nil {
  217. return nil, err
  218. }
  219. return &WebDavFile{
  220. fs: fs,
  221. name: fullFilePath,
  222. isDirectory: false,
  223. bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
  224. }, nil
  225. }
  226. fi, err := fs.stat(ctx, fullFilePath)
  227. if err != nil {
  228. return nil, os.ErrNotExist
  229. }
  230. if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
  231. fullFilePath += "/"
  232. }
  233. return &WebDavFile{
  234. fs: fs,
  235. name: fullFilePath,
  236. isDirectory: false,
  237. bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
  238. }, nil
  239. }
  240. func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error {
  241. var err error
  242. if fullFilePath, err = clearName(fullFilePath); err != nil {
  243. return err
  244. }
  245. dir, name := util.FullPath(fullFilePath).DirAndName()
  246. return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature})
  247. }
  248. func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
  249. glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name)
  250. return fs.removeAll(ctx, name)
  251. }
  252. func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error {
  253. glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName)
  254. var err error
  255. if oldName, err = clearName(oldName); err != nil {
  256. return err
  257. }
  258. if newName, err = clearName(newName); err != nil {
  259. return err
  260. }
  261. of, err := fs.stat(ctx, oldName)
  262. if err != nil {
  263. return os.ErrExist
  264. }
  265. if of.IsDir() {
  266. if strings.HasSuffix(oldName, "/") {
  267. oldName = strings.TrimRight(oldName, "/")
  268. }
  269. if strings.HasSuffix(newName, "/") {
  270. newName = strings.TrimRight(newName, "/")
  271. }
  272. }
  273. _, err = fs.stat(ctx, newName)
  274. if err == nil {
  275. return os.ErrExist
  276. }
  277. oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
  278. newDir, newBaseName := util.FullPath(newName).DirAndName()
  279. return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  280. request := &filer_pb.AtomicRenameEntryRequest{
  281. OldDirectory: oldDir,
  282. OldName: oldBaseName,
  283. NewDirectory: newDir,
  284. NewName: newBaseName,
  285. }
  286. _, err := client.AtomicRenameEntry(ctx, request)
  287. if err != nil {
  288. return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err)
  289. }
  290. return nil
  291. })
  292. }
  293. func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) {
  294. var err error
  295. if fullFilePath, err = clearName(fullFilePath); err != nil {
  296. return nil, err
  297. }
  298. fullpath := util.FullPath(fullFilePath)
  299. var fi FileInfo
  300. entry, err := filer_pb.GetEntry(fs, fullpath)
  301. if entry == nil {
  302. return nil, os.ErrNotExist
  303. }
  304. if err != nil {
  305. return nil, err
  306. }
  307. fi.size = int64(filer.FileSize(entry))
  308. fi.name = string(fullpath)
  309. fi.mode = os.FileMode(entry.Attributes.FileMode)
  310. fi.modifiedTime = time.Unix(entry.Attributes.Mtime, 0)
  311. fi.isDirectory = entry.IsDirectory
  312. if fi.name == "/" {
  313. fi.modifiedTime = time.Now()
  314. fi.isDirectory = true
  315. }
  316. return &fi, nil
  317. }
  318. func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) {
  319. glog.V(2).Infof("WebDavFileSystem.Stat %v", name)
  320. return fs.stat(ctx, name)
  321. }
  322. func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
  323. fileId, uploadResult, flushErr, _ := operation.UploadWithRetry(
  324. f.fs,
  325. &filer_pb.AssignVolumeRequest{
  326. Count: 1,
  327. Replication: f.fs.option.Replication,
  328. Collection: f.fs.option.Collection,
  329. DiskType: f.fs.option.DiskType,
  330. Path: name,
  331. },
  332. &operation.UploadOption{
  333. Filename: f.name,
  334. Cipher: f.fs.option.Cipher,
  335. IsInputCompressed: false,
  336. MimeType: "",
  337. PairMap: nil,
  338. },
  339. func(host, fileId string) string {
  340. return fmt.Sprintf("http://%s/%s", host, fileId)
  341. },
  342. reader,
  343. )
  344. if flushErr != nil {
  345. glog.V(0).Infof("upload data %v: %v", f.name, flushErr)
  346. return nil, fmt.Errorf("upload data: %v", flushErr)
  347. }
  348. if uploadResult.Error != "" {
  349. glog.V(0).Infof("upload failure %v: %v", f.name, flushErr)
  350. return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
  351. }
  352. return uploadResult.ToPbFileChunk(fileId, offset, tsNs), nil
  353. }
  354. func (f *WebDavFile) Write(buf []byte) (int, error) {
  355. glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
  356. dir, _ := util.FullPath(f.name).DirAndName()
  357. var getErr error
  358. ctx := context.Background()
  359. if f.entry == nil {
  360. f.entry, getErr = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  361. }
  362. if f.entry == nil {
  363. return 0, getErr
  364. }
  365. if getErr != nil {
  366. return 0, getErr
  367. }
  368. if f.bufWriter.FlushFunc == nil {
  369. f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) {
  370. var chunk *filer_pb.FileChunk
  371. chunk, flushErr = f.saveDataAsChunk(util.NewBytesReader(data), f.name, offset, time.Now().UnixNano())
  372. if flushErr != nil {
  373. return fmt.Errorf("%s upload result: %v", f.name, flushErr)
  374. }
  375. f.entry.Content = nil
  376. f.entry.Chunks = append(f.entry.GetChunks(), chunk)
  377. return flushErr
  378. }
  379. f.bufWriter.CloseFunc = func() error {
  380. manifestedChunks, manifestErr := filer.MaybeManifestize(f.saveDataAsChunk, f.entry.GetChunks())
  381. if manifestErr != nil {
  382. // not good, but should be ok
  383. glog.V(0).Infof("file %s close MaybeManifestize: %v", f.name, manifestErr)
  384. } else {
  385. f.entry.Chunks = manifestedChunks
  386. }
  387. flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  388. f.entry.Attributes.Mtime = time.Now().Unix()
  389. request := &filer_pb.UpdateEntryRequest{
  390. Directory: dir,
  391. Entry: f.entry,
  392. Signatures: []int32{f.fs.signature},
  393. }
  394. if _, err := client.UpdateEntry(ctx, request); err != nil {
  395. return fmt.Errorf("update %s: %v", f.name, err)
  396. }
  397. return nil
  398. })
  399. return flushErr
  400. }
  401. }
  402. written, err := f.bufWriter.Write(buf)
  403. if err == nil {
  404. f.entry.Attributes.FileSize = uint64(max(f.off+int64(written), int64(f.entry.Attributes.FileSize)))
  405. glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
  406. f.off += int64(written)
  407. }
  408. return written, err
  409. }
  410. func (f *WebDavFile) Close() error {
  411. glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
  412. err := f.bufWriter.Close()
  413. if f.entry != nil {
  414. f.entry = nil
  415. f.visibleIntervals = nil
  416. }
  417. return err
  418. }
  419. func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
  420. glog.V(2).Infof("WebDavFileSystem.Read %v", f.name)
  421. if f.entry == nil {
  422. f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
  423. }
  424. if f.entry == nil {
  425. return 0, err
  426. }
  427. if err != nil {
  428. return 0, err
  429. }
  430. fileSize := int64(filer.FileSize(f.entry))
  431. if fileSize == 0 {
  432. return 0, io.EOF
  433. }
  434. if f.visibleIntervals == nil {
  435. f.visibleIntervals, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.GetChunks(), 0, fileSize)
  436. f.reader = nil
  437. }
  438. if f.reader == nil {
  439. chunkViews := filer.ViewFromVisibleIntervals(f.visibleIntervals, 0, fileSize)
  440. f.reader = filer.NewChunkReaderAtFromClient(f.fs.readerCache, chunkViews, fileSize)
  441. }
  442. readSize, err = f.reader.ReadAt(p, f.off)
  443. glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize))
  444. f.off += int64(readSize)
  445. if err != nil && err != io.EOF {
  446. glog.Errorf("file read %s: %v", f.name, err)
  447. }
  448. return
  449. }
  450. func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
  451. glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count)
  452. dir, _ := util.FullPath(f.name).DirAndName()
  453. err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
  454. fi := FileInfo{
  455. size: int64(filer.FileSize(entry)),
  456. name: entry.Name,
  457. mode: os.FileMode(entry.Attributes.FileMode),
  458. modifiedTime: time.Unix(entry.Attributes.Mtime, 0),
  459. isDirectory: entry.IsDirectory,
  460. }
  461. if !strings.HasSuffix(fi.name, "/") && fi.IsDir() {
  462. fi.name += "/"
  463. }
  464. glog.V(4).Infof("entry: %v", fi.name)
  465. ret = append(ret, &fi)
  466. return nil
  467. })
  468. if err != nil {
  469. return nil, err
  470. }
  471. old := f.off
  472. if old >= int64(len(ret)) {
  473. if count > 0 {
  474. return nil, io.EOF
  475. }
  476. return nil, nil
  477. }
  478. if count > 0 {
  479. f.off += int64(count)
  480. if f.off > int64(len(ret)) {
  481. f.off = int64(len(ret))
  482. }
  483. } else {
  484. f.off = int64(len(ret))
  485. old = 0
  486. }
  487. return ret[old:f.off], nil
  488. }
  489. func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
  490. glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence)
  491. ctx := context.Background()
  492. var err error
  493. switch whence {
  494. case io.SeekStart:
  495. f.off = 0
  496. case io.SeekEnd:
  497. if fi, err := f.fs.stat(ctx, f.name); err != nil {
  498. return 0, err
  499. } else {
  500. f.off = fi.Size()
  501. }
  502. }
  503. f.off += offset
  504. return f.off, err
  505. }
  506. func (f *WebDavFile) Stat() (os.FileInfo, error) {
  507. glog.V(2).Infof("WebDavFile.Stat %v", f.name)
  508. ctx := context.Background()
  509. return f.fs.stat(ctx, f.name)
  510. }