topic.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package topic
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/filer"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  6. "time"
  7. )
  // Core identity types for a message-queue topic.
  type (
  	// Namespace is the namespace component of a topic's identity. It is a
  	// distinct string type so it cannot be mixed up with a plain topic name.
  	Namespace string

  	// Topic identifies a topic by the namespace it lives in and its name.
  	Topic struct {
  		Namespace Namespace // namespace the topic belongs to
  		Name      string    // topic name within that namespace
  	}
  )
  13. func NewTopic(namespace Namespace, name string) Topic {
  14. return Topic{
  15. Namespace: namespace,
  16. Name: name,
  17. }
  18. }
  19. func FromPbTopic(topic *mq_pb.Topic) Topic {
  20. return Topic{
  21. Namespace: Namespace(topic.Namespace),
  22. Name: topic.Name,
  23. }
  24. }
  25. func (tp Topic) String() string {
  26. return fmt.Sprintf("%s.%s", tp.Namespace, tp.Name)
  27. }
  28. type Segment struct {
  29. Topic Topic
  30. Id int32
  31. Partition Partition
  32. LastModified time.Time
  33. }
  34. func FromPbSegment(segment *mq_pb.Segment) *Segment {
  35. return &Segment{
  36. Topic: Topic{
  37. Namespace: Namespace(segment.Namespace),
  38. Name: segment.Topic,
  39. },
  40. Id: segment.Id,
  41. Partition: Partition{
  42. RangeStart: segment.Partition.RangeStart,
  43. RangeStop: segment.Partition.RangeStop,
  44. RingSize: segment.Partition.RingSize,
  45. },
  46. }
  47. }
  48. func (segment *Segment) ToPbSegment() *mq_pb.Segment {
  49. return &mq_pb.Segment{
  50. Namespace: string(segment.Topic.Namespace),
  51. Topic: segment.Topic.Name,
  52. Id: segment.Id,
  53. Partition: &mq_pb.Partition{
  54. RingSize: segment.Partition.RingSize,
  55. RangeStart: segment.Partition.RangeStart,
  56. RangeStop: segment.Partition.RangeStop,
  57. },
  58. }
  59. }
  60. func (segment *Segment) DirAndName() (dir string, name string) {
  61. dir = fmt.Sprintf("%s/%s/%s", filer.TopicsDir, segment.Topic.Namespace, segment.Topic.Name)
  62. name = fmt.Sprintf("%4d.segment", segment.Id)
  63. return
  64. }