client.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. // Package client provides a ntfy client to publish and subscribe to topics
  2. package client
  3. import (
  4. "bufio"
  5. "context"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "heckel.io/ntfy/v2/log"
  10. "heckel.io/ntfy/v2/util"
  11. "io"
  12. "net/http"
  13. "regexp"
  14. "strings"
  15. "sync"
  16. "time"
  17. )
  18. const (
  19. // MessageEvent identifies a message event
  20. MessageEvent = "message"
  21. )
  22. const (
  23. maxResponseBytes = 4096
  24. )
  25. var (
  26. topicRegex = regexp.MustCompile(`^[-_A-Za-z0-9]{1,64}$`) // Same as in server/server.go
  27. )
  28. // Client is the ntfy client that can be used to publish and subscribe to ntfy topics
  29. type Client struct {
  30. Messages chan *Message
  31. config *Config
  32. subscriptions map[string]*subscription
  33. mu sync.Mutex
  34. }
  35. // Message is a struct that represents a ntfy message
  36. type Message struct { // TODO combine with server.message
  37. ID string
  38. Event string
  39. Time int64
  40. Topic string
  41. Message string
  42. Title string
  43. Priority int
  44. Tags []string
  45. Click string
  46. Icon string
  47. Attachment *Attachment
  48. // Additional fields
  49. TopicURL string
  50. SubscriptionID string
  51. Raw string
  52. }
  53. // Attachment represents a message attachment
  54. type Attachment struct {
  55. Name string `json:"name"`
  56. Type string `json:"type,omitempty"`
  57. Size int64 `json:"size,omitempty"`
  58. Expires int64 `json:"expires,omitempty"`
  59. URL string `json:"url"`
  60. Owner string `json:"-"` // IP address of uploader, used for rate limiting
  61. }
  62. type subscription struct {
  63. ID string
  64. topicURL string
  65. cancel context.CancelFunc
  66. }
  67. // New creates a new Client using a given Config
  68. func New(config *Config) *Client {
  69. return &Client{
  70. Messages: make(chan *Message, 50), // Allow reading a few messages
  71. config: config,
  72. subscriptions: make(map[string]*subscription),
  73. }
  74. }
  75. // Publish sends a message to a specific topic, optionally using options.
  76. // See PublishReader for details.
  77. func (c *Client) Publish(topic, message string, options ...PublishOption) (*Message, error) {
  78. return c.PublishReader(topic, strings.NewReader(message), options...)
  79. }
  80. // PublishReader sends a message to a specific topic, optionally using options.
  81. //
  82. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  83. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  84. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  85. //
  86. // To pass title, priority and tags, check out WithTitle, WithPriority, WithTagsList, WithDelay, WithNoCache,
  87. // WithNoFirebase, and the generic WithHeader.
  88. func (c *Client) PublishReader(topic string, body io.Reader, options ...PublishOption) (*Message, error) {
  89. topicURL, err := c.expandTopicURL(topic)
  90. if err != nil {
  91. return nil, err
  92. }
  93. req, err := http.NewRequest("POST", topicURL, body)
  94. if err != nil {
  95. return nil, err
  96. }
  97. for _, option := range options {
  98. if err := option(req); err != nil {
  99. return nil, err
  100. }
  101. }
  102. log.Debug("%s Publishing message with headers %s", util.ShortTopicURL(topicURL), req.Header)
  103. resp, err := http.DefaultClient.Do(req)
  104. if err != nil {
  105. return nil, err
  106. }
  107. defer resp.Body.Close()
  108. b, err := io.ReadAll(io.LimitReader(resp.Body, maxResponseBytes))
  109. if err != nil {
  110. return nil, err
  111. }
  112. if resp.StatusCode != http.StatusOK {
  113. return nil, errors.New(strings.TrimSpace(string(b)))
  114. }
  115. m, err := toMessage(string(b), topicURL, "")
  116. if err != nil {
  117. return nil, err
  118. }
  119. return m, nil
  120. }
  121. // Poll queries a topic for all (or a limited set) of messages. Unlike Subscribe, this method only polls for
  122. // messages and does not subscribe to messages that arrive after this call.
  123. //
  124. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  125. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  126. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  127. //
  128. // By default, all messages will be returned, but you can change this behavior using a SubscribeOption.
  129. // See WithSince, WithSinceAll, WithSinceUnixTime, WithScheduled, and the generic WithQueryParam.
  130. func (c *Client) Poll(topic string, options ...SubscribeOption) ([]*Message, error) {
  131. topicURL, err := c.expandTopicURL(topic)
  132. if err != nil {
  133. return nil, err
  134. }
  135. ctx := context.Background()
  136. messages := make([]*Message, 0)
  137. msgChan := make(chan *Message)
  138. errChan := make(chan error)
  139. log.Debug("%s Polling from topic", util.ShortTopicURL(topicURL))
  140. options = append(options, WithPoll())
  141. go func() {
  142. err := performSubscribeRequest(ctx, msgChan, topicURL, "", options...)
  143. close(msgChan)
  144. errChan <- err
  145. }()
  146. for m := range msgChan {
  147. messages = append(messages, m)
  148. }
  149. return messages, <-errChan
  150. }
  151. // Subscribe subscribes to a topic to listen for newly incoming messages. The method starts a connection in the
  152. // background and returns new messages via the Messages channel.
  153. //
  154. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  155. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  156. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  157. //
  158. // By default, only new messages will be returned, but you can change this behavior using a SubscribeOption.
  159. // See WithSince, WithSinceAll, WithSinceUnixTime, WithScheduled, and the generic WithQueryParam.
  160. //
  161. // The method returns a unique subscriptionID that can be used in Unsubscribe.
  162. //
  163. // Example:
  164. //
  165. // c := client.New(client.NewConfig())
  166. // subscriptionID, _ := c.Subscribe("mytopic")
  167. // for m := range c.Messages {
  168. // fmt.Printf("New message: %s", m.Message)
  169. // }
  170. func (c *Client) Subscribe(topic string, options ...SubscribeOption) (string, error) {
  171. topicURL, err := c.expandTopicURL(topic)
  172. if err != nil {
  173. return "", err
  174. }
  175. c.mu.Lock()
  176. defer c.mu.Unlock()
  177. subscriptionID := util.RandomString(10)
  178. log.Debug("%s Subscribing to topic", util.ShortTopicURL(topicURL))
  179. ctx, cancel := context.WithCancel(context.Background())
  180. c.subscriptions[subscriptionID] = &subscription{
  181. ID: subscriptionID,
  182. topicURL: topicURL,
  183. cancel: cancel,
  184. }
  185. go handleSubscribeConnLoop(ctx, c.Messages, topicURL, subscriptionID, options...)
  186. return subscriptionID, nil
  187. }
  188. // Unsubscribe unsubscribes from a topic that has been previously subscribed to using the unique
  189. // subscriptionID returned in Subscribe.
  190. func (c *Client) Unsubscribe(subscriptionID string) {
  191. c.mu.Lock()
  192. defer c.mu.Unlock()
  193. sub, ok := c.subscriptions[subscriptionID]
  194. if !ok {
  195. return
  196. }
  197. delete(c.subscriptions, subscriptionID)
  198. sub.cancel()
  199. }
  200. func (c *Client) expandTopicURL(topic string) (string, error) {
  201. if strings.HasPrefix(topic, "http://") || strings.HasPrefix(topic, "https://") {
  202. return topic, nil
  203. } else if strings.Contains(topic, "/") {
  204. return fmt.Sprintf("https://%s", topic), nil
  205. }
  206. if !topicRegex.MatchString(topic) {
  207. return "", fmt.Errorf("invalid topic name: %s", topic)
  208. }
  209. return fmt.Sprintf("%s/%s", c.config.DefaultHost, topic), nil
  210. }
  211. func handleSubscribeConnLoop(ctx context.Context, msgChan chan *Message, topicURL, subcriptionID string, options ...SubscribeOption) {
  212. for {
  213. // TODO The retry logic is crude and may lose messages. It should record the last message like the
  214. // Android client, use since=, and do incremental backoff too
  215. if err := performSubscribeRequest(ctx, msgChan, topicURL, subcriptionID, options...); err != nil {
  216. log.Warn("%s Connection failed: %s", util.ShortTopicURL(topicURL), err.Error())
  217. }
  218. select {
  219. case <-ctx.Done():
  220. log.Info("%s Connection exited", util.ShortTopicURL(topicURL))
  221. return
  222. case <-time.After(10 * time.Second): // TODO Add incremental backoff
  223. }
  224. }
  225. }
  226. func performSubscribeRequest(ctx context.Context, msgChan chan *Message, topicURL string, subscriptionID string, options ...SubscribeOption) error {
  227. streamURL := fmt.Sprintf("%s/json", topicURL)
  228. log.Debug("%s Listening to %s", util.ShortTopicURL(topicURL), streamURL)
  229. req, err := http.NewRequestWithContext(ctx, http.MethodGet, streamURL, nil)
  230. if err != nil {
  231. return err
  232. }
  233. for _, option := range options {
  234. if err := option(req); err != nil {
  235. return err
  236. }
  237. }
  238. resp, err := http.DefaultClient.Do(req)
  239. if err != nil {
  240. return err
  241. }
  242. defer resp.Body.Close()
  243. if resp.StatusCode != http.StatusOK {
  244. b, err := io.ReadAll(io.LimitReader(resp.Body, maxResponseBytes))
  245. if err != nil {
  246. return err
  247. }
  248. return errors.New(strings.TrimSpace(string(b)))
  249. }
  250. scanner := bufio.NewScanner(resp.Body)
  251. for scanner.Scan() {
  252. messageJSON := scanner.Text()
  253. m, err := toMessage(messageJSON, topicURL, subscriptionID)
  254. if err != nil {
  255. return err
  256. }
  257. log.Trace("%s Message received: %s", util.ShortTopicURL(topicURL), messageJSON)
  258. if m.Event == MessageEvent {
  259. msgChan <- m
  260. }
  261. }
  262. return nil
  263. }
  264. func toMessage(s, topicURL, subscriptionID string) (*Message, error) {
  265. var m *Message
  266. if err := json.NewDecoder(strings.NewReader(s)).Decode(&m); err != nil {
  267. return nil, err
  268. }
  269. m.TopicURL = topicURL
  270. m.SubscriptionID = subscriptionID
  271. m.Raw = s
  272. return m, nil
  273. }