gateway_server.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package weed_server
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/operation"
  4. "google.golang.org/grpc"
  5. "math/rand"
  6. "net/http"
  7. "github.com/chrislusf/seaweedfs/weed/util"
  8. _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
  9. _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
  10. _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
  11. _ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
  12. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
  13. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
  14. _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
  15. _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
  16. _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
  17. _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
  18. _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
  19. _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
  20. _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
  21. _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
  22. "github.com/chrislusf/seaweedfs/weed/glog"
  23. _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
  24. _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
  25. _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
  26. _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
  27. _ "github.com/chrislusf/seaweedfs/weed/notification/log"
  28. "github.com/chrislusf/seaweedfs/weed/security"
  29. )
  // GatewayOption holds the settings a GatewayServer is constructed from.
  type GatewayOption struct {
  	// Masters is the list of master servers. NewGatewayServer requires it to
  	// be non-empty, and getMaster picks one entry at random per call.
  	Masters []string

  	// Filers is a list of filer servers.
  	// NOTE(review): not read anywhere in the code visible in this file.
  	Filers []string

  	// MaxMB is not read anywhere in the code visible in this file;
  	// presumably a size limit in megabytes — TODO confirm against callers.
  	MaxMB int

  	// IsSecure makes blobsHandler look up a JWT before proxying a delete.
  	IsSecure bool
  }
  36. type GatewayServer struct {
  37. option *GatewayOption
  38. secret security.SigningKey
  39. grpcDialOption grpc.DialOption
  40. }
  41. func NewGatewayServer(defaultMux *http.ServeMux, option *GatewayOption) (fs *GatewayServer, err error) {
  42. fs = &GatewayServer{
  43. option: option,
  44. grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"),
  45. }
  46. if len(option.Masters) == 0 {
  47. glog.Fatal("master list is required!")
  48. }
  49. defaultMux.HandleFunc("/blobs/", fs.blobsHandler)
  50. defaultMux.HandleFunc("/files/", fs.filesHandler)
  51. defaultMux.HandleFunc("/topics/", fs.topicsHandler)
  52. return fs, nil
  53. }
  54. func (fs *GatewayServer) getMaster() string {
  55. randMaster := rand.Intn(len(fs.option.Masters))
  56. return fs.option.Masters[randMaster]
  57. }
  58. func (fs *GatewayServer) blobsHandler(w http.ResponseWriter, r *http.Request) {
  59. switch r.Method {
  60. case "DELETE":
  61. chunkId := r.URL.Path[len("/blobs/"):]
  62. fullUrl, err := operation.LookupFileId(fs.getMaster, chunkId)
  63. if err != nil {
  64. writeJsonError(w, r, http.StatusNotFound, err)
  65. return
  66. }
  67. var jwtAuthorization security.EncodedJwt
  68. if fs.option.IsSecure {
  69. jwtAuthorization = operation.LookupJwt(fs.getMaster(), chunkId)
  70. }
  71. body, statusCode, err := util.DeleteProxied(fullUrl, string(jwtAuthorization))
  72. if err != nil {
  73. writeJsonError(w, r, http.StatusNotFound, err)
  74. return
  75. }
  76. w.WriteHeader(statusCode)
  77. w.Write(body)
  78. case "POST":
  79. submitForClientHandler(w, r, fs.getMaster, fs.grpcDialOption)
  80. }
  81. }
  82. func (fs *GatewayServer) filesHandler(w http.ResponseWriter, r *http.Request) {
  83. switch r.Method {
  84. case "DELETE":
  85. case "POST":
  86. }
  87. }
  88. func (fs *GatewayServer) topicsHandler(w http.ResponseWriter, r *http.Request) {
  89. switch r.Method {
  90. case "POST":
  91. }
  92. }