123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236 |
- package weed_server
- import (
- "context"
- "fmt"
- "math/rand/v2"
- "net/http"
- "strconv"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/operation"
- "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
- "github.com/seaweedfs/seaweedfs/weed/storage/backend/memory_map"
- "github.com/seaweedfs/seaweedfs/weed/storage/needle"
- "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
- "github.com/seaweedfs/seaweedfs/weed/storage/types"
- "github.com/seaweedfs/seaweedfs/weed/topology"
- "github.com/seaweedfs/seaweedfs/weed/util"
- util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
- )
- func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) {
- collectionName := r.FormValue("collection")
- collection, ok := ms.Topo.FindCollection(collectionName)
- if !ok {
- writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", collectionName))
- return
- }
- for _, server := range collection.ListVolumeServers() {
- err := operation.WithVolumeServerClient(false, server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
- _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
- Collection: collection.Name,
- })
- return deleteErr
- })
- if err != nil {
- writeJsonError(w, r, http.StatusInternalServerError, err)
- return
- }
- }
- ms.Topo.DeleteCollection(collectionName)
- w.WriteHeader(http.StatusNoContent)
- return
- }
- func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) {
- m := make(map[string]interface{})
- m["Version"] = util.Version()
- m["Topology"] = ms.Topo.ToInfo()
- writeJsonQuiet(w, r, http.StatusOK, m)
- }
- func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Request) {
- gcString := r.FormValue("garbageThreshold")
- gcThreshold := ms.option.GarbageThreshold
- if gcString != "" {
- var err error
- gcThreshold, err = strconv.ParseFloat(gcString, 32)
- if err != nil {
- glog.V(0).Infof("garbageThreshold %s is not a valid float number: %v", gcString, err)
- writeJsonError(w, r, http.StatusNotAcceptable, fmt.Errorf("garbageThreshold %s is not a valid float number", gcString))
- return
- }
- }
- // glog.Infoln("garbageThreshold =", gcThreshold)
- ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, ms.option.MaxParallelVacuumPerServer, 0, "", ms.preallocateSize, false)
- ms.dirStatusHandler(w, r)
- }
- func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
- count := uint64(0)
- option, err := ms.getVolumeGrowOption(r)
- if err != nil {
- writeJsonError(w, r, http.StatusNotAcceptable, err)
- return
- }
- glog.V(0).Infof("volumeGrowHandler received %v from %v", option.String(), r.RemoteAddr)
- if count, err = strconv.ParseUint(r.FormValue("count"), 10, 32); err == nil {
- replicaCount := int64(count * uint64(option.ReplicaPlacement.GetCopyCount()))
- if ms.Topo.AvailableSpaceFor(option) < replicaCount {
- err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.AvailableSpaceFor(option), replicaCount)
- } else if !ms.Topo.DataCenterExists(option.DataCenter) {
- err = fmt.Errorf("data center %v not found in topology", option.DataCenter)
- } else {
- var newVidLocations []*master_pb.VolumeLocation
- newVidLocations, err = ms.vg.GrowByCountAndType(ms.grpcDialOption, uint32(count), option, ms.Topo)
- count = uint64(len(newVidLocations))
- }
- } else {
- err = fmt.Errorf("can not parse parameter count %s", r.FormValue("count"))
- }
- if err != nil {
- writeJsonError(w, r, http.StatusNotAcceptable, err)
- } else {
- writeJsonQuiet(w, r, http.StatusOK, map[string]interface{}{"count": count})
- }
- }
- func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Request) {
- m := make(map[string]interface{})
- m["Version"] = util.Version()
- m["Volumes"] = ms.Topo.ToVolumeMap()
- writeJsonQuiet(w, r, http.StatusOK, m)
- }
- func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) {
- vid, _, _, _, _ := parseURLPath(r.URL.Path)
- collection := r.FormValue("collection")
- location := ms.findVolumeLocation(collection, vid)
- if location.Error == "" {
- loc := location.Locations[rand.IntN(len(location.Locations))]
- url, _ := util_http.NormalizeUrl(loc.PublicUrl)
- if r.URL.RawQuery != "" {
- url = url + r.URL.Path + "?" + r.URL.RawQuery
- } else {
- url = url + r.URL.Path
- }
- http.Redirect(w, r, url, http.StatusPermanentRedirect)
- } else {
- writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %s not found: %s", vid, location.Error))
- }
- }
- func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
- if ms.Topo.IsLeader() {
- submitForClientHandler(w, r, func(ctx context.Context) pb.ServerAddress { return ms.option.Master }, ms.grpcDialOption)
- } else {
- masterUrl, err := ms.Topo.Leader()
- if err != nil {
- writeJsonError(w, r, http.StatusInternalServerError, err)
- } else {
- submitForClientHandler(w, r, func(ctx context.Context) pb.ServerAddress { return masterUrl }, ms.grpcDialOption)
- }
- }
- }
- func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGrowOption, error) {
- replicationString := r.FormValue("replication")
- if replicationString == "" {
- replicationString = ms.option.DefaultReplicaPlacement
- }
- replicaPlacement, err := super_block.NewReplicaPlacementFromString(replicationString)
- if err != nil {
- return nil, err
- }
- ttl, err := needle.ReadTTL(r.FormValue("ttl"))
- if err != nil {
- return nil, err
- }
- memoryMapMaxSizeMb, err := memory_map.ReadMemoryMapMaxSizeMb(r.FormValue("memoryMapMaxSizeMb"))
- if err != nil {
- return nil, err
- }
- diskType := types.ToDiskType(r.FormValue("disk"))
- preallocate := ms.preallocateSize
- if r.FormValue("preallocate") != "" {
- preallocate, err = strconv.ParseInt(r.FormValue("preallocate"), 10, 64)
- if err != nil {
- return nil, fmt.Errorf("Failed to parse int64 preallocate = %s: %v", r.FormValue("preallocate"), err)
- }
- }
- volumeGrowOption := &topology.VolumeGrowOption{
- Collection: r.FormValue("collection"),
- ReplicaPlacement: replicaPlacement,
- Ttl: ttl,
- DiskType: diskType,
- Preallocate: preallocate,
- DataCenter: r.FormValue("dataCenter"),
- Rack: r.FormValue("rack"),
- DataNode: r.FormValue("dataNode"),
- MemoryMapMaxSizeMb: memoryMapMaxSizeMb,
- }
- return volumeGrowOption, nil
- }
- func (ms *MasterServer) collectionInfoHandler(w http.ResponseWriter, r *http.Request) {
- //get collection from request
- collectionName := r.FormValue("collection")
- if collectionName == "" {
- writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection is required"))
- return
- }
- //output details of the volumes?
- detail := r.FormValue("detail") == "true"
- //collect collection info
- collection, ok := ms.Topo.FindCollection(collectionName)
- if !ok {
- writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", collectionName))
- return
- }
- volumeLayouts := collection.GetAllVolumeLayouts()
- if detail {
- //prepare the json response
- all_stats := make([]map[string]interface{}, len(volumeLayouts))
- for i, volumeLayout := range volumeLayouts {
- volumeLayoutStats := volumeLayout.Stats()
- m := make(map[string]interface{})
- m["Version"] = util.Version()
- m["Collection"] = collectionName
- m["TotalSize"] = volumeLayoutStats.TotalSize
- m["FileCount"] = volumeLayoutStats.FileCount
- m["UsedSize"] = volumeLayoutStats.UsedSize
- all_stats[i] = m
- }
- //write it
- writeJsonQuiet(w, r, http.StatusOK, all_stats)
- } else {
- //prepare the json response
- collectionStats := map[string]interface{}{
- "Version": util.Version(),
- "Collection": collectionName,
- "TotalSize": uint64(0),
- "FileCount": uint64(0),
- "UsedSize": uint64(0),
- "VolumeCount": uint64(0),
- }
- for _, volumeLayout := range volumeLayouts {
- volumeLayoutStats := volumeLayout.Stats()
- collectionStats["TotalSize"] = collectionStats["TotalSize"].(uint64) + volumeLayoutStats.TotalSize
- collectionStats["FileCount"] = collectionStats["FileCount"].(uint64) + volumeLayoutStats.FileCount
- collectionStats["UsedSize"] = collectionStats["UsedSize"].(uint64) + volumeLayoutStats.UsedSize
- collectionStats["VolumeCount"] = collectionStats["VolumeCount"].(uint64) + 1
- }
- //write it
- writeJsonQuiet(w, r, http.StatusOK, collectionStats)
- }
- }
|