message_cache.go 13 KB


  1. package server
  2. import (
  3. "database/sql"
  4. "encoding/json"
  5. "errors"
  6. "heckel.io/ntfy/v2/log"
  7. "heckel.io/ntfy/v2/util"
  8. "net/netip"
  9. "strings"
  10. "time"
  11. )
  12. type MessageCache interface {
  13. AddMessage(m *message) error
  14. AddMessages(ms []*message) error
  15. Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error)
  16. MessagesDue() ([]*message, error)
  17. MessagesExpired() ([]string, error)
  18. Message(id string) (*message, error)
  19. MarkPublished(m *message) error
  20. MessageCounts() (map[string]int, error)
  21. Topics() (map[string]*topic, error)
  22. DeleteMessages(ids ...string) error
  23. ExpireMessages(topics ...string) error
  24. AttachmentsExpired() ([]string, error)
  25. MarkAttachmentsDeleted(ids ...string) error
  26. AttachmentBytesUsedBySender(sender string) (int64, error)
  27. AttachmentBytesUsedByUser(userID string) (int64, error)
  28. UpdateStats(messages int64) error
  29. Stats() (messages int64, err error)
  30. DB() *sql.DB
  31. Close() error
  32. }
  33. type commonMessageCache struct {
  34. db *sql.DB
  35. queue *util.BatchingQueue[*message]
  36. queries *messageCacheQueries
  37. }
  38. var _ MessageCache = (*commonMessageCache)(nil)
  39. type messageCacheQueries struct {
  40. insertMessage string
  41. deleteMessage string
  42. updateMessagesForTopicExpiry string
  43. selectRowIDFromMessageID string // Do not include topic, see #336 and TestServer_PollSinceID_MultipleTopics
  44. selectMessagesByID string
  45. selectMessagesSinceTime string
  46. selectMessagesSinceTimeIncludeScheduled string
  47. selectMessagesSinceID string
  48. selectMessagesSinceIDIncludeScheduled string
  49. selectMessagesDue string
  50. selectMessagesExpired string
  51. updateMessagePublished string
  52. selectMessageCountPerTopic string
  53. selectTopics string
  54. updateAttachmentDeleted string
  55. selectAttachmentsExpired string
  56. selectAttachmentsSizeBySender string
  57. selectAttachmentsSizeByUserID string
  58. selectStats string
  59. updateStats string
  60. }
  61. // AddMessage stores a message to the message cache synchronously, or queues it to be stored at a later date asyncronously.
  62. // The message is queued only if "batchSize" or "batchTimeout" are passed to the constructor.
  63. func (c *commonMessageCache) AddMessage(m *message) error {
  64. if c.queue != nil {
  65. c.queue.Enqueue(m)
  66. return nil
  67. }
  68. return c.AddMessages([]*message{m})
  69. }
  70. // AddMessages synchronously stores a match of messages. If the database is locked, the transaction waits until
  71. // SQLite's busy_timeout is exceeded before erroring out.
  72. func (c *commonMessageCache) AddMessages(ms []*message) error {
  73. if len(ms) == 0 {
  74. return nil
  75. }
  76. start := time.Now()
  77. tx, err := c.db.Begin()
  78. if err != nil {
  79. return err
  80. }
  81. defer tx.Rollback()
  82. stmt, err := tx.Prepare(c.queries.insertMessage)
  83. if err != nil {
  84. return err
  85. }
  86. defer stmt.Close()
  87. for _, m := range ms {
  88. if m.Event != messageEvent {
  89. return errUnexpectedMessageType
  90. }
  91. published := m.Time <= time.Now().Unix()
  92. tags := strings.Join(m.Tags, ",")
  93. var attachmentName, attachmentType, attachmentURL string
  94. var attachmentSize, attachmentExpires int64
  95. var attachmentDeleted bool
  96. if m.Attachment != nil {
  97. attachmentName = m.Attachment.Name
  98. attachmentType = m.Attachment.Type
  99. attachmentSize = m.Attachment.Size
  100. attachmentExpires = m.Attachment.Expires
  101. attachmentURL = m.Attachment.URL
  102. }
  103. var actionsStr string
  104. if len(m.Actions) > 0 {
  105. actionsBytes, err := json.Marshal(m.Actions)
  106. if err != nil {
  107. return err
  108. }
  109. actionsStr = string(actionsBytes)
  110. }
  111. var sender string
  112. if m.Sender.IsValid() {
  113. sender = m.Sender.String()
  114. }
  115. _, err := stmt.Exec(
  116. m.ID,
  117. m.Time,
  118. m.Expires,
  119. m.Topic,
  120. m.Message,
  121. m.Title,
  122. m.Priority,
  123. tags,
  124. m.Click,
  125. m.Icon,
  126. actionsStr,
  127. attachmentName,
  128. attachmentType,
  129. attachmentSize,
  130. attachmentExpires,
  131. attachmentURL,
  132. attachmentDeleted, // Always false
  133. sender,
  134. m.User,
  135. m.ContentType,
  136. m.Encoding,
  137. published,
  138. )
  139. if err != nil {
  140. return err
  141. }
  142. }
  143. if err := tx.Commit(); err != nil {
  144. log.Tag(tagMessageCache).Err(err).Error("Writing %d message(s) failed (took %v)", len(ms), time.Since(start))
  145. return err
  146. }
  147. log.Tag(tagMessageCache).Debug("Wrote %d message(s) in %v", len(ms), time.Since(start))
  148. return nil
  149. }
  150. func (c *commonMessageCache) Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
  151. if since.IsNone() {
  152. return make([]*message, 0), nil
  153. } else if since.IsID() {
  154. return c.messagesSinceID(topic, since, scheduled)
  155. }
  156. return c.messagesSinceTime(topic, since, scheduled)
  157. }
  158. func (c *commonMessageCache) messagesSinceTime(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
  159. var rows *sql.Rows
  160. var err error
  161. if scheduled {
  162. rows, err = c.db.Query(c.queries.selectMessagesSinceTimeIncludeScheduled, topic, since.Time().Unix())
  163. } else {
  164. rows, err = c.db.Query(c.queries.selectMessagesSinceTime, topic, since.Time().Unix())
  165. }
  166. if err != nil {
  167. return nil, err
  168. }
  169. return readMessages(rows)
  170. }
  171. func (c *commonMessageCache) messagesSinceID(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
  172. idrows, err := c.db.Query(c.queries.selectRowIDFromMessageID, since.ID())
  173. if err != nil {
  174. return nil, err
  175. }
  176. defer idrows.Close()
  177. if !idrows.Next() {
  178. return c.messagesSinceTime(topic, sinceAllMessages, scheduled)
  179. }
  180. var rowID int64
  181. if err := idrows.Scan(&rowID); err != nil {
  182. return nil, err
  183. }
  184. idrows.Close()
  185. var rows *sql.Rows
  186. if scheduled {
  187. rows, err = c.db.Query(c.queries.selectMessagesSinceIDIncludeScheduled, topic, rowID)
  188. } else {
  189. rows, err = c.db.Query(c.queries.selectMessagesSinceID, topic, rowID)
  190. }
  191. if err != nil {
  192. return nil, err
  193. }
  194. return readMessages(rows)
  195. }
  196. func (c *commonMessageCache) MessagesDue() ([]*message, error) {
  197. rows, err := c.db.Query(c.queries.selectMessagesDue, time.Now().Unix())
  198. if err != nil {
  199. return nil, err
  200. }
  201. return readMessages(rows)
  202. }
  203. // MessagesExpired returns a list of IDs for messages that have expires (should be deleted)
  204. func (c *commonMessageCache) MessagesExpired() ([]string, error) {
  205. rows, err := c.db.Query(c.queries.selectMessagesExpired, time.Now().Unix())
  206. if err != nil {
  207. return nil, err
  208. }
  209. defer rows.Close()
  210. ids := make([]string, 0)
  211. for rows.Next() {
  212. var id string
  213. if err := rows.Scan(&id); err != nil {
  214. return nil, err
  215. }
  216. ids = append(ids, id)
  217. }
  218. if err := rows.Err(); err != nil {
  219. return nil, err
  220. }
  221. return ids, nil
  222. }
  223. func (c *commonMessageCache) Message(id string) (*message, error) {
  224. rows, err := c.db.Query(c.queries.selectMessagesByID, id)
  225. if err != nil {
  226. return nil, err
  227. } else if !rows.Next() {
  228. return nil, errMessageNotFound
  229. }
  230. defer rows.Close()
  231. return readMessage(rows)
  232. }
  233. func (c *commonMessageCache) MarkPublished(m *message) error {
  234. _, err := c.db.Exec(c.queries.updateMessagePublished, m.ID)
  235. return err
  236. }
  237. func (c *commonMessageCache) MessageCounts() (map[string]int, error) {
  238. rows, err := c.db.Query(c.queries.selectMessageCountPerTopic)
  239. if err != nil {
  240. return nil, err
  241. }
  242. defer rows.Close()
  243. var topic string
  244. var count int
  245. counts := make(map[string]int)
  246. for rows.Next() {
  247. if err := rows.Scan(&topic, &count); err != nil {
  248. return nil, err
  249. } else if err := rows.Err(); err != nil {
  250. return nil, err
  251. }
  252. counts[topic] = count
  253. }
  254. return counts, nil
  255. }
  256. func (c *commonMessageCache) Topics() (map[string]*topic, error) {
  257. rows, err := c.db.Query(c.queries.selectTopics)
  258. if err != nil {
  259. return nil, err
  260. }
  261. defer rows.Close()
  262. topics := make(map[string]*topic)
  263. for rows.Next() {
  264. var id string
  265. if err := rows.Scan(&id); err != nil {
  266. return nil, err
  267. }
  268. topics[id] = newTopic(id)
  269. }
  270. if err := rows.Err(); err != nil {
  271. return nil, err
  272. }
  273. return topics, nil
  274. }
  275. func (c *commonMessageCache) DeleteMessages(ids ...string) error {
  276. tx, err := c.db.Begin()
  277. if err != nil {
  278. return err
  279. }
  280. defer tx.Rollback()
  281. for _, id := range ids {
  282. if _, err := tx.Exec(c.queries.deleteMessage, id); err != nil {
  283. return err
  284. }
  285. }
  286. return tx.Commit()
  287. }
  288. func (c *commonMessageCache) ExpireMessages(topics ...string) error {
  289. tx, err := c.db.Begin()
  290. if err != nil {
  291. return err
  292. }
  293. defer tx.Rollback()
  294. for _, t := range topics {
  295. if _, err := tx.Exec(c.queries.updateMessagesForTopicExpiry, time.Now().Unix()-1, t); err != nil {
  296. return err
  297. }
  298. }
  299. return tx.Commit()
  300. }
  301. func (c *commonMessageCache) AttachmentsExpired() ([]string, error) {
  302. rows, err := c.db.Query(c.queries.selectAttachmentsExpired, time.Now().Unix())
  303. if err != nil {
  304. return nil, err
  305. }
  306. defer rows.Close()
  307. ids := make([]string, 0)
  308. for rows.Next() {
  309. var id string
  310. if err := rows.Scan(&id); err != nil {
  311. return nil, err
  312. }
  313. ids = append(ids, id)
  314. }
  315. if err := rows.Err(); err != nil {
  316. return nil, err
  317. }
  318. return ids, nil
  319. }
  320. func (c *commonMessageCache) MarkAttachmentsDeleted(ids ...string) error {
  321. tx, err := c.db.Begin()
  322. if err != nil {
  323. return err
  324. }
  325. defer tx.Rollback()
  326. for _, id := range ids {
  327. if _, err := tx.Exec(c.queries.updateAttachmentDeleted, id); err != nil {
  328. return err
  329. }
  330. }
  331. return tx.Commit()
  332. }
  333. func (c *commonMessageCache) AttachmentBytesUsedBySender(sender string) (int64, error) {
  334. rows, err := c.db.Query(c.queries.selectAttachmentsSizeBySender, sender, time.Now().Unix())
  335. if err != nil {
  336. return 0, err
  337. }
  338. return c.readAttachmentBytesUsed(rows)
  339. }
  340. func (c *commonMessageCache) AttachmentBytesUsedByUser(userID string) (int64, error) {
  341. rows, err := c.db.Query(c.queries.selectAttachmentsSizeByUserID, userID, time.Now().Unix())
  342. if err != nil {
  343. return 0, err
  344. }
  345. return c.readAttachmentBytesUsed(rows)
  346. }
  347. func (c *commonMessageCache) readAttachmentBytesUsed(rows *sql.Rows) (int64, error) {
  348. defer rows.Close()
  349. var size int64
  350. if !rows.Next() {
  351. return 0, errors.New("no rows found")
  352. }
  353. if err := rows.Scan(&size); err != nil {
  354. return 0, err
  355. } else if err := rows.Err(); err != nil {
  356. return 0, err
  357. }
  358. return size, nil
  359. }
  360. func (c *commonMessageCache) processMessageBatches() {
  361. if c.queue == nil {
  362. return
  363. }
  364. for messages := range c.queue.Dequeue() {
  365. if err := c.AddMessages(messages); err != nil {
  366. log.Tag(tagMessageCache).Err(err).Error("Cannot write message batch")
  367. }
  368. }
  369. }
  370. func (c *commonMessageCache) UpdateStats(messages int64) error {
  371. _, err := c.db.Exec(c.queries.updateStats, messages)
  372. return err
  373. }
  374. func (c *commonMessageCache) Stats() (messages int64, err error) {
  375. rows, err := c.db.Query(c.queries.selectStats)
  376. if err != nil {
  377. return 0, err
  378. }
  379. defer rows.Close()
  380. if !rows.Next() {
  381. return 0, errNoRows
  382. }
  383. if err := rows.Scan(&messages); err != nil {
  384. return 0, err
  385. }
  386. return messages, nil
  387. }
  388. func (c *commonMessageCache) DB() *sql.DB {
  389. return c.db
  390. }
  391. func (c *commonMessageCache) Close() error {
  392. return c.db.Close()
  393. }
  394. func readMessages(rows *sql.Rows) ([]*message, error) {
  395. defer rows.Close()
  396. messages := make([]*message, 0)
  397. for rows.Next() {
  398. m, err := readMessage(rows)
  399. if err != nil {
  400. return nil, err
  401. }
  402. messages = append(messages, m)
  403. }
  404. if err := rows.Err(); err != nil {
  405. return nil, err
  406. }
  407. return messages, nil
  408. }
  409. func readMessage(rows *sql.Rows) (*message, error) {
  410. var timestamp, expires, attachmentSize, attachmentExpires int64
  411. var priority int
  412. var id, topic, msg, title, tagsStr, click, icon, actionsStr, attachmentName, attachmentType, attachmentURL, sender, user, contentType, encoding string
  413. err := rows.Scan(
  414. &id,
  415. &timestamp,
  416. &expires,
  417. &topic,
  418. &msg,
  419. &title,
  420. &priority,
  421. &tagsStr,
  422. &click,
  423. &icon,
  424. &actionsStr,
  425. &attachmentName,
  426. &attachmentType,
  427. &attachmentSize,
  428. &attachmentExpires,
  429. &attachmentURL,
  430. &sender,
  431. &user,
  432. &contentType,
  433. &encoding,
  434. )
  435. if err != nil {
  436. return nil, err
  437. }
  438. var tags []string
  439. if tagsStr != "" {
  440. tags = strings.Split(tagsStr, ",")
  441. }
  442. var actions []*action
  443. if actionsStr != "" {
  444. if err := json.Unmarshal([]byte(actionsStr), &actions); err != nil {
  445. return nil, err
  446. }
  447. }
  448. senderIP, err := netip.ParseAddr(sender)
  449. if err != nil {
  450. senderIP = netip.Addr{} // if no IP stored in database, return invalid address
  451. }
  452. var att *attachment
  453. if attachmentName != "" && attachmentURL != "" {
  454. att = &attachment{
  455. Name: attachmentName,
  456. Type: attachmentType,
  457. Size: attachmentSize,
  458. Expires: attachmentExpires,
  459. URL: attachmentURL,
  460. }
  461. }
  462. return &message{
  463. ID: id,
  464. Time: timestamp,
  465. Expires: expires,
  466. Event: messageEvent,
  467. Topic: topic,
  468. Message: msg,
  469. Title: title,
  470. Priority: priority,
  471. Tags: tags,
  472. Click: click,
  473. Icon: icon,
  474. Actions: actions,
  475. Attachment: att,
  476. Sender: senderIP, // Must parse assuming database must be correct
  477. User: user,
  478. ContentType: contentType,
  479. Encoding: encoding,
  480. }, nil
  481. }