client.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. // Package client provides a ntfy client to publish and subscribe to topics
  2. package client
  3. import (
  4. "bufio"
  5. "context"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. "heckel.io/ntfy/util"
  10. "io"
  11. "log"
  12. "net/http"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
  17. // Event type constants
  18. const (
  19. MessageEvent = "message"
  20. KeepaliveEvent = "keepalive"
  21. OpenEvent = "open"
  22. PollRequestEvent = "poll_request"
  23. )
  24. const (
  25. maxResponseBytes = 4096
  26. )
  27. // Client is the ntfy client that can be used to publish and subscribe to ntfy topics
  28. type Client struct {
  29. Messages chan *Message
  30. config *Config
  31. subscriptions map[string]*subscription
  32. mu sync.Mutex
  33. }
  34. // Message is a struct that represents a ntfy message
  35. type Message struct { // TODO combine with server.message
  36. ID string
  37. Event string
  38. Time int64
  39. Topic string
  40. Message string
  41. Title string
  42. Priority int
  43. Tags []string
  44. Click string
  45. Attachment *Attachment
  46. // Additional fields
  47. TopicURL string
  48. SubscriptionID string
  49. Raw string
  50. }
  51. // Attachment represents a message attachment
  52. type Attachment struct {
  53. Name string `json:"name"`
  54. Type string `json:"type,omitempty"`
  55. Size int64 `json:"size,omitempty"`
  56. Expires int64 `json:"expires,omitempty"`
  57. URL string `json:"url"`
  58. Owner string `json:"-"` // IP address of uploader, used for rate limiting
  59. }
  60. type subscription struct {
  61. ID string
  62. topicURL string
  63. cancel context.CancelFunc
  64. }
  65. // New creates a new Client using a given Config
  66. func New(config *Config) *Client {
  67. return &Client{
  68. Messages: make(chan *Message, 50), // Allow reading a few messages
  69. config: config,
  70. subscriptions: make(map[string]*subscription),
  71. }
  72. }
  73. // Publish sends a message to a specific topic, optionally using options.
  74. // See PublishReader for details.
  75. func (c *Client) Publish(topic, message string, options ...PublishOption) (*Message, error) {
  76. return c.PublishReader(topic, strings.NewReader(message), options...)
  77. }
  78. // PublishReader sends a message to a specific topic, optionally using options.
  79. //
  80. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  81. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  82. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  83. //
  84. // To pass title, priority and tags, check out WithTitle, WithPriority, WithTagsList, WithDelay, WithNoCache,
  85. // WithNoFirebase, and the generic WithHeader.
  86. func (c *Client) PublishReader(topic string, body io.Reader, options ...PublishOption) (*Message, error) {
  87. topicURL := c.expandTopicURL(topic)
  88. req, _ := http.NewRequest("POST", topicURL, body)
  89. for _, option := range options {
  90. if err := option(req); err != nil {
  91. return nil, err
  92. }
  93. }
  94. resp, err := http.DefaultClient.Do(req)
  95. if err != nil {
  96. return nil, err
  97. }
  98. defer resp.Body.Close()
  99. b, err := io.ReadAll(io.LimitReader(resp.Body, maxResponseBytes))
  100. if err != nil {
  101. return nil, err
  102. }
  103. if resp.StatusCode != http.StatusOK {
  104. return nil, errors.New(strings.TrimSpace(string(b)))
  105. }
  106. m, err := toMessage(string(b), topicURL, "")
  107. if err != nil {
  108. return nil, err
  109. }
  110. return m, nil
  111. }
  112. // Poll queries a topic for all (or a limited set) of messages. Unlike Subscribe, this method only polls for
  113. // messages and does not subscribe to messages that arrive after this call.
  114. //
  115. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  116. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  117. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  118. //
  119. // By default, all messages will be returned, but you can change this behavior using a SubscribeOption.
  120. // See WithSince, WithSinceAll, WithSinceUnixTime, WithScheduled, and the generic WithQueryParam.
  121. func (c *Client) Poll(topic string, options ...SubscribeOption) ([]*Message, error) {
  122. ctx := context.Background()
  123. messages := make([]*Message, 0)
  124. msgChan := make(chan *Message)
  125. errChan := make(chan error)
  126. topicURL := c.expandTopicURL(topic)
  127. options = append(options, WithPoll())
  128. go func() {
  129. err := performSubscribeRequest(ctx, msgChan, topicURL, "", options...)
  130. close(msgChan)
  131. errChan <- err
  132. }()
  133. for m := range msgChan {
  134. messages = append(messages, m)
  135. }
  136. return messages, <-errChan
  137. }
  138. // Subscribe subscribes to a topic to listen for newly incoming messages. The method starts a connection in the
  139. // background and returns new messages via the Messages channel.
  140. //
  141. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  142. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  143. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  144. //
  145. // By default, only new messages will be returned, but you can change this behavior using a SubscribeOption.
  146. // See WithSince, WithSinceAll, WithSinceUnixTime, WithScheduled, and the generic WithQueryParam.
  147. //
  148. // The method returns a unique subscriptionID that can be used in Unsubscribe.
  149. //
  150. // Example:
  151. // c := client.New(client.NewConfig())
  152. // subscriptionID := c.Subscribe("mytopic")
  153. // for m := range c.Messages {
  154. // fmt.Printf("New message: %s", m.Message)
  155. // }
  156. func (c *Client) Subscribe(topic string, options ...SubscribeOption) string {
  157. c.mu.Lock()
  158. defer c.mu.Unlock()
  159. subscriptionID := util.RandomString(10)
  160. topicURL := c.expandTopicURL(topic)
  161. ctx, cancel := context.WithCancel(context.Background())
  162. c.subscriptions[subscriptionID] = &subscription{
  163. ID: subscriptionID,
  164. topicURL: topicURL,
  165. cancel: cancel,
  166. }
  167. go handleSubscribeConnLoop(ctx, c.Messages, topicURL, subscriptionID, options...)
  168. return subscriptionID
  169. }
  170. // Unsubscribe unsubscribes from a topic that has been previously subscribed to using the unique
  171. // subscriptionID returned in Subscribe.
  172. func (c *Client) Unsubscribe(subscriptionID string) {
  173. c.mu.Lock()
  174. defer c.mu.Unlock()
  175. sub, ok := c.subscriptions[subscriptionID]
  176. if !ok {
  177. return
  178. }
  179. delete(c.subscriptions, subscriptionID)
  180. sub.cancel()
  181. }
  182. // UnsubscribeAll unsubscribes from a topic that has been previously subscribed with Subscribe.
  183. // If there are multiple subscriptions matching the topic, all of them are unsubscribed from.
  184. //
  185. // A topic can be either a full URL (e.g. https://myhost.lan/mytopic), a short URL which is then prepended https://
  186. // (e.g. myhost.lan -> https://myhost.lan), or a short name which is expanded using the default host in the
  187. // config (e.g. mytopic -> https://ntfy.sh/mytopic).
  188. func (c *Client) UnsubscribeAll(topic string) {
  189. c.mu.Lock()
  190. defer c.mu.Unlock()
  191. topicURL := c.expandTopicURL(topic)
  192. for _, sub := range c.subscriptions {
  193. if sub.topicURL == topicURL {
  194. delete(c.subscriptions, sub.ID)
  195. sub.cancel()
  196. }
  197. }
  198. }
  199. func (c *Client) expandTopicURL(topic string) string {
  200. if strings.HasPrefix(topic, "http://") || strings.HasPrefix(topic, "https://") {
  201. return topic
  202. } else if strings.Contains(topic, "/") {
  203. return fmt.Sprintf("https://%s", topic)
  204. }
  205. return fmt.Sprintf("%s/%s", c.config.DefaultHost, topic)
  206. }
  207. func handleSubscribeConnLoop(ctx context.Context, msgChan chan *Message, topicURL, subcriptionID string, options ...SubscribeOption) {
  208. for {
  209. // TODO The retry logic is crude and may lose messages. It should record the last message like the
  210. // Android client, use since=, and do incremental backoff too
  211. if err := performSubscribeRequest(ctx, msgChan, topicURL, subcriptionID, options...); err != nil {
  212. log.Printf("Connection to %s failed: %s", topicURL, err.Error())
  213. }
  214. select {
  215. case <-ctx.Done():
  216. log.Printf("Connection to %s exited", topicURL)
  217. return
  218. case <-time.After(10 * time.Second): // TODO Add incremental backoff
  219. }
  220. }
  221. }
  222. func performSubscribeRequest(ctx context.Context, msgChan chan *Message, topicURL string, subscriptionID string, options ...SubscribeOption) error {
  223. req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/json", topicURL), nil)
  224. if err != nil {
  225. return err
  226. }
  227. for _, option := range options {
  228. if err := option(req); err != nil {
  229. return err
  230. }
  231. }
  232. resp, err := http.DefaultClient.Do(req)
  233. if err != nil {
  234. return err
  235. }
  236. defer resp.Body.Close()
  237. if resp.StatusCode != http.StatusOK {
  238. b, err := io.ReadAll(io.LimitReader(resp.Body, maxResponseBytes))
  239. if err != nil {
  240. return err
  241. }
  242. return errors.New(strings.TrimSpace(string(b)))
  243. }
  244. scanner := bufio.NewScanner(resp.Body)
  245. for scanner.Scan() {
  246. m, err := toMessage(scanner.Text(), topicURL, subscriptionID)
  247. if err != nil {
  248. return err
  249. }
  250. if m.Event == MessageEvent {
  251. msgChan <- m
  252. }
  253. }
  254. return nil
  255. }
  256. func toMessage(s, topicURL, subscriptionID string) (*Message, error) {
  257. var m *Message
  258. if err := json.NewDecoder(strings.NewReader(s)).Decode(&m); err != nil {
  259. return nil, err
  260. }
  261. m.TopicURL = topicURL
  262. m.SubscriptionID = subscriptionID
  263. m.Raw = s
  264. return m, nil
  265. }