message_cache_pg.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package server
  2. import (
  3. "database/sql"
  4. _ "github.com/lib/pq" // PostgreSQL driver
  5. "heckel.io/ntfy/v2/util"
  6. "time"
  7. )
  8. // Messages cache
  9. const (
  10. pgCreateMessagesTableQuery = `
  11. BEGIN;
  12. CREATE TABLE IF NOT EXISTS messages (
  13. id SERIAL PRIMARY KEY,
  14. mid TEXT NOT NULL,
  15. time INT NOT NULL,
  16. expires INT NOT NULL,
  17. topic TEXT NOT NULL,
  18. message TEXT NOT NULL,
  19. title TEXT NOT NULL,
  20. priority INT NOT NULL,
  21. tags TEXT NOT NULL,
  22. click TEXT NOT NULL,
  23. icon TEXT NOT NULL,
  24. actions TEXT NOT NULL,
  25. attachment_name TEXT NOT NULL,
  26. attachment_type TEXT NOT NULL,
  27. attachment_size INT NOT NULL,
  28. attachment_expires INT NOT NULL,
  29. attachment_url TEXT NOT NULL,
  30. attachment_deleted BOOLEAN NOT NULL,
  31. sender TEXT NOT NULL,
  32. "user" TEXT NOT NULL,
  33. content_type TEXT NOT NULL,
  34. encoding TEXT NOT NULL,
  35. published BOOLEAN NOT NULL
  36. );
  37. CREATE INDEX IF NOT EXISTS idx_mid ON messages (mid);
  38. CREATE INDEX IF NOT EXISTS idx_time ON messages (time);
  39. CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
  40. CREATE INDEX IF NOT EXISTS idx_expires ON messages (expires);
  41. CREATE INDEX IF NOT EXISTS idx_sender ON messages (sender);
  42. CREATE INDEX IF NOT EXISTS idx_user ON messages ("user");
  43. CREATE INDEX IF NOT EXISTS idx_attachment_expires ON messages (attachment_expires);
  44. CREATE TABLE IF NOT EXISTS stats (
  45. key TEXT PRIMARY KEY,
  46. value INT
  47. );
  48. INSERT INTO stats (key, value) VALUES ('messages', 0);
  49. COMMIT;
  50. `
  51. pgSelectMessagesCountQuery = `SELECT COUNT(*) FROM messages`
  52. )
  53. var (
  54. pgMessageCacheQueries = &messageCacheQueries{
  55. insertMessage: `
  56. INSERT INTO messages (mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_deleted, sender, "user", content_type, encoding, published)
  57. VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22)
  58. `,
  59. deleteMessage: `DELETE FROM messages WHERE mid = $1`,
  60. updateMessagesForTopicExpiry: `UPDATE messages SET expires = $1 WHERE topic = $2`,
  61. selectRowIDFromMessageID: `SELECT id FROM messages WHERE mid = $1`, // Do not include topic, see #336 and TestServer_PollSinceID_MultipleTopics
  62. selectMessagesByID: `
  63. SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, "user", content_type, encoding
  64. FROM messages
  65. WHERE mid = $1
  66. `,
  67. selectMessagesSinceTime: `
  68. SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, "user", content_type, encoding
  69. FROM messages
  70. WHERE topic = $1 AND time >= $2 AND published = TRUE
  71. ORDER BY time, id
  72. `,
  73. selectMessagesSinceTimeIncludeScheduled: `
  74. SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, "user", content_type, encoding
  75. FROM messages
  76. WHERE topic = $1 AND time >= $2
  77. ORDER BY time, id
  78. `,
  79. selectMessagesSinceID: `
  80. SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, "user", content_type, encoding
  81. FROM messages
  82. WHERE topic = $1 AND id > $2 AND published = TRUE
  83. ORDER BY time, id
  84. `,
  85. selectMessagesSinceIDIncludeScheduled: `
  86. SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, "user", content_type, encoding
  87. FROM messages
  88. WHERE topic = $1 AND (id > $2 OR published = FALSE)
  89. ORDER BY time, id
  90. `,
  91. selectMessagesDue: `
  92. SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, "user", content_type, encoding
  93. FROM messages
  94. WHERE time <= $1 AND published = FALSE
  95. ORDER BY time, id
  96. `,
  97. selectMessagesExpired: `SELECT mid FROM messages WHERE expires <= $1 AND published = TRUE`,
  98. updateMessagePublished: `UPDATE messages SET published = TRUE WHERE mid = $1`,
  99. selectMessageCountPerTopic: `SELECT topic, COUNT(*) FROM messages GROUP BY topic`,
  100. selectTopics: `SELECT topic FROM messages GROUP BY topic`,
  101. updateAttachmentDeleted: `UPDATE messages SET attachment_deleted = TRUE WHERE mid = $1`,
  102. selectAttachmentsExpired: `SELECT mid FROM messages WHERE attachment_expires > 0 AND attachment_expires <= $1 AND attachment_deleted = FALSE`,
  103. selectAttachmentsSizeBySender: `SELECT COALESCE(SUM(attachment_size), 0) FROM messages WHERE "user" = '' AND sender = $1 AND attachment_expires >= $2`,
  104. selectAttachmentsSizeByUserID: `SELECT COALESCE(SUM(attachment_size), 0) FROM messages WHERE "user" = $1 AND attachment_expires >= $2`,
  105. selectStats: `SELECT value FROM stats WHERE key = 'messages'`,
  106. updateStats: `UPDATE stats SET value = $1 WHERE key = 'messages'`,
  107. }
  108. )
  109. type pgMessageCache struct {
  110. *commonMessageCache
  111. }
  112. var _ MessageCache = (*pgMessageCache)(nil)
  113. func newPgMessageCache(connectionString, startupQueries string, batchSize int, batchTimeout time.Duration) (*pgMessageCache, error) {
  114. db, err := sql.Open("postgres", connectionString)
  115. if err != nil {
  116. return nil, err
  117. }
  118. if err := setupPgMessagesDB(db, startupQueries); err != nil {
  119. return nil, err
  120. }
  121. var queue *util.BatchingQueue[*message]
  122. if batchSize > 0 || batchTimeout > 0 {
  123. queue = util.NewBatchingQueue[*message](batchSize, batchTimeout)
  124. }
  125. cache := &pgMessageCache{
  126. commonMessageCache: &commonMessageCache{
  127. db: db,
  128. queue: queue,
  129. queries: pgMessageCacheQueries,
  130. },
  131. }
  132. go cache.processMessageBatches()
  133. return cache, nil
  134. }
  135. func setupPgMessagesDB(db *sql.DB, startupQueries string) error {
  136. // Run startup queries
  137. if startupQueries != "" {
  138. if _, err := db.Exec(startupQueries); err != nil {
  139. return err
  140. }
  141. }
  142. // If 'messages' table does not exist, this must be a new database
  143. rowsMC, err := db.Query(pgSelectMessagesCountQuery)
  144. if err != nil {
  145. return setupNewPgCacheDB(db)
  146. }
  147. rowsMC.Close()
  148. return nil
  149. // FIXME schema migration
  150. }
  151. func setupNewPgCacheDB(db *sql.DB) error {
  152. if _, err := db.Exec(pgCreateMessagesTableQuery); err != nil {
  153. return err
  154. }
  155. /*
  156. // FIXME
  157. if _, err := db.Exec(pgCreateSchemaVersionTableQuery); err != nil {
  158. return err
  159. }
  160. if _, err := db.Exec(insertSchemaVersion, currentSchemaVersion); err != nil {
  161. return err
  162. }
  163. */
  164. return nil
  165. }