message_cache.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860
  1. package server
  2. import (
  3. "database/sql"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "net/netip"
  8. "strings"
  9. "time"
  10. _ "github.com/mattn/go-sqlite3" // SQLite driver
  11. "heckel.io/ntfy/log"
  12. "heckel.io/ntfy/util"
  13. )
  14. var (
  15. errUnexpectedMessageType = errors.New("unexpected message type")
  16. )
  17. // Messages cache
  18. const (
  19. createMessagesTableQuery = `
  20. BEGIN;
  21. CREATE TABLE IF NOT EXISTS messages (
  22. id INTEGER PRIMARY KEY AUTOINCREMENT,
  23. mid TEXT NOT NULL,
  24. time INT NOT NULL,
  25. expires INT NOT NULL,
  26. topic TEXT NOT NULL,
  27. message TEXT NOT NULL,
  28. title TEXT NOT NULL,
  29. priority INT NOT NULL,
  30. tags TEXT NOT NULL,
  31. click TEXT NOT NULL,
  32. icon TEXT NOT NULL,
  33. actions TEXT NOT NULL,
  34. attachment_name TEXT NOT NULL,
  35. attachment_type TEXT NOT NULL,
  36. attachment_size INT NOT NULL,
  37. attachment_expires INT NOT NULL,
  38. attachment_url TEXT NOT NULL,
  39. attachment_deleted INT NOT NULL,
  40. sender TEXT NOT NULL,
  41. user TEXT NOT NULL,
  42. encoding TEXT NOT NULL,
  43. published INT NOT NULL
  44. );
  45. CREATE INDEX IF NOT EXISTS idx_mid ON messages (mid);
  46. CREATE INDEX IF NOT EXISTS idx_time ON messages (time);
  47. CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
  48. CREATE INDEX IF NOT EXISTS idx_expires ON messages (expires);
  49. CREATE INDEX IF NOT EXISTS idx_attachment_expires ON messages (attachment_expires);
  50. COMMIT;
  51. `
  52. insertMessageQuery = `
  53. INSERT INTO messages (mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_deleted, sender, user, encoding, published)
  54. VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
  55. `
  56. deleteMessageQuery = `DELETE FROM messages WHERE mid = ?`
  57. updateMessagesForTopicExpiryQuery = `UPDATE messages SET expires = ? WHERE topic = ?`
  58. selectRowIDFromMessageID = `SELECT id FROM messages WHERE mid = ?` // Do not include topic, see #336 and TestServer_PollSinceID_MultipleTopics
  59. selectMessagesSinceTimeQuery = `
  60. SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, encoding
  61. FROM messages
  62. WHERE topic = ? AND time >= ? AND published = 1
  63. ORDER BY time, id
  64. `
  65. selectMessagesSinceTimeIncludeScheduledQuery = `
  66. SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, encoding
  67. FROM messages
  68. WHERE topic = ? AND time >= ?
  69. ORDER BY time, id
  70. `
  71. selectMessagesSinceIDQuery = `
  72. SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, encoding
  73. FROM messages
  74. WHERE topic = ? AND id > ? AND published = 1
  75. ORDER BY time, id
  76. `
  77. selectMessagesSinceIDIncludeScheduledQuery = `
  78. SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, encoding
  79. FROM messages
  80. WHERE topic = ? AND (id > ? OR published = 0)
  81. ORDER BY time, id
  82. `
  83. selectMessagesDueQuery = `
  84. SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, encoding
  85. FROM messages
  86. WHERE time <= ? AND published = 0
  87. ORDER BY time, id
  88. `
  89. selectMessagesExpiredQuery = `SELECT mid FROM messages WHERE expires <= ? AND published = 1`
  90. updateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE mid = ?`
  91. selectMessagesCountQuery = `SELECT COUNT(*) FROM messages`
  92. selectMessageCountPerTopicQuery = `SELECT topic, COUNT(*) FROM messages GROUP BY topic`
  93. selectTopicsQuery = `SELECT topic FROM messages GROUP BY topic`
  94. updateAttachmentDeleted = `UPDATE messages SET attachment_deleted = 1 WHERE mid = ?`
  95. selectAttachmentsExpiredQuery = `SELECT mid FROM messages WHERE attachment_expires > 0 AND attachment_expires <= ? AND attachment_deleted = 0`
  96. selectAttachmentsSizeBySenderQuery = `SELECT IFNULL(SUM(attachment_size), 0) FROM messages WHERE sender = ? AND attachment_expires >= ?`
  97. selectAttachmentsSizeByUserQuery = `SELECT IFNULL(SUM(attachment_size), 0) FROM messages WHERE user = ? AND attachment_expires >= ?`
  98. )
  99. // Schema management queries
  100. const (
  101. currentSchemaVersion = 10
  102. createSchemaVersionTableQuery = `
  103. CREATE TABLE IF NOT EXISTS schemaVersion (
  104. id INT PRIMARY KEY,
  105. version INT NOT NULL
  106. );
  107. `
  108. insertSchemaVersion = `INSERT INTO schemaVersion VALUES (1, ?)`
  109. updateSchemaVersion = `UPDATE schemaVersion SET version = ? WHERE id = 1`
  110. selectSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1`
  111. // 0 -> 1
  112. migrate0To1AlterMessagesTableQuery = `
  113. BEGIN;
  114. ALTER TABLE messages ADD COLUMN title TEXT NOT NULL DEFAULT('');
  115. ALTER TABLE messages ADD COLUMN priority INT NOT NULL DEFAULT(0);
  116. ALTER TABLE messages ADD COLUMN tags TEXT NOT NULL DEFAULT('');
  117. COMMIT;
  118. `
  119. // 1 -> 2
  120. migrate1To2AlterMessagesTableQuery = `
  121. ALTER TABLE messages ADD COLUMN published INT NOT NULL DEFAULT(1);
  122. `
  123. // 2 -> 3
  124. migrate2To3AlterMessagesTableQuery = `
  125. BEGIN;
  126. ALTER TABLE messages ADD COLUMN click TEXT NOT NULL DEFAULT('');
  127. ALTER TABLE messages ADD COLUMN attachment_name TEXT NOT NULL DEFAULT('');
  128. ALTER TABLE messages ADD COLUMN attachment_type TEXT NOT NULL DEFAULT('');
  129. ALTER TABLE messages ADD COLUMN attachment_size INT NOT NULL DEFAULT('0');
  130. ALTER TABLE messages ADD COLUMN attachment_expires INT NOT NULL DEFAULT('0');
  131. ALTER TABLE messages ADD COLUMN attachment_owner TEXT NOT NULL DEFAULT('');
  132. ALTER TABLE messages ADD COLUMN attachment_url TEXT NOT NULL DEFAULT('');
  133. COMMIT;
  134. `
  135. // 3 -> 4
  136. migrate3To4AlterMessagesTableQuery = `
  137. ALTER TABLE messages ADD COLUMN encoding TEXT NOT NULL DEFAULT('');
  138. `
  139. // 4 -> 5
  140. migrate4To5AlterMessagesTableQuery = `
  141. BEGIN;
  142. CREATE TABLE IF NOT EXISTS messages_new (
  143. id INTEGER PRIMARY KEY AUTOINCREMENT,
  144. mid TEXT NOT NULL,
  145. time INT NOT NULL,
  146. topic TEXT NOT NULL,
  147. message TEXT NOT NULL,
  148. title TEXT NOT NULL,
  149. priority INT NOT NULL,
  150. tags TEXT NOT NULL,
  151. click TEXT NOT NULL,
  152. attachment_name TEXT NOT NULL,
  153. attachment_type TEXT NOT NULL,
  154. attachment_size INT NOT NULL,
  155. attachment_expires INT NOT NULL,
  156. attachment_url TEXT NOT NULL,
  157. attachment_owner TEXT NOT NULL,
  158. encoding TEXT NOT NULL,
  159. published INT NOT NULL
  160. );
  161. CREATE INDEX IF NOT EXISTS idx_mid ON messages_new (mid);
  162. CREATE INDEX IF NOT EXISTS idx_topic ON messages_new (topic);
  163. INSERT
  164. INTO messages_new (
  165. mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type,
  166. attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published)
  167. SELECT
  168. id, time, topic, message, title, priority, tags, click, attachment_name, attachment_type,
  169. attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published
  170. FROM messages;
  171. DROP TABLE messages;
  172. ALTER TABLE messages_new RENAME TO messages;
  173. COMMIT;
  174. `
  175. // 5 -> 6
  176. migrate5To6AlterMessagesTableQuery = `
  177. ALTER TABLE messages ADD COLUMN actions TEXT NOT NULL DEFAULT('');
  178. `
  179. // 6 -> 7
  180. migrate6To7AlterMessagesTableQuery = `
  181. ALTER TABLE messages RENAME COLUMN attachment_owner TO sender;
  182. `
  183. // 7 -> 8
  184. migrate7To8AlterMessagesTableQuery = `
  185. ALTER TABLE messages ADD COLUMN icon TEXT NOT NULL DEFAULT('');
  186. `
  187. // 8 -> 9
  188. migrate8To9AlterMessagesTableQuery = `
  189. CREATE INDEX IF NOT EXISTS idx_time ON messages (time);
  190. `
  191. // 9 -> 10
  192. migrate9To10AlterMessagesTableQuery = `
  193. ALTER TABLE messages ADD COLUMN user TEXT NOT NULL DEFAULT('');
  194. ALTER TABLE messages ADD COLUMN attachment_deleted INT NOT NULL DEFAULT('0');
  195. ALTER TABLE messages ADD COLUMN expires INT NOT NULL DEFAULT('0');
  196. CREATE INDEX IF NOT EXISTS idx_expires ON messages (expires);
  197. CREATE INDEX IF NOT EXISTS idx_attachment_expires ON messages (attachment_expires);
  198. `
  199. migrate9To10UpdateMessageExpiryQuery = `UPDATE messages SET expires = time + ?`
  200. )
  201. var (
  202. migrations = map[int]func(db *sql.DB, cacheDuration time.Duration) error{
  203. 0: migrateFrom0,
  204. 1: migrateFrom1,
  205. 2: migrateFrom2,
  206. 3: migrateFrom3,
  207. 4: migrateFrom4,
  208. 5: migrateFrom5,
  209. 6: migrateFrom6,
  210. 7: migrateFrom7,
  211. 8: migrateFrom8,
  212. 9: migrateFrom9,
  213. }
  214. )
  215. type messageCache struct {
  216. db *sql.DB
  217. queue *util.BatchingQueue[*message]
  218. nop bool
  219. }
  220. // newSqliteCache creates a SQLite file-backed cache
  221. func newSqliteCache(filename, startupQueries string, cacheDuration time.Duration, batchSize int, batchTimeout time.Duration, nop bool) (*messageCache, error) {
  222. db, err := sql.Open("sqlite3", filename)
  223. if err != nil {
  224. return nil, err
  225. }
  226. if err := setupDB(db, startupQueries, cacheDuration); err != nil {
  227. return nil, err
  228. }
  229. var queue *util.BatchingQueue[*message]
  230. if batchSize > 0 || batchTimeout > 0 {
  231. queue = util.NewBatchingQueue[*message](batchSize, batchTimeout)
  232. }
  233. cache := &messageCache{
  234. db: db,
  235. queue: queue,
  236. nop: nop,
  237. }
  238. go cache.processMessageBatches()
  239. return cache, nil
  240. }
  241. // newMemCache creates an in-memory cache
  242. func newMemCache() (*messageCache, error) {
  243. return newSqliteCache(createMemoryFilename(), "", 0, 0, 0, false)
  244. }
  245. // newNopCache creates an in-memory cache that discards all messages;
  246. // it is always empty and can be used if caching is entirely disabled
  247. func newNopCache() (*messageCache, error) {
  248. return newSqliteCache(createMemoryFilename(), "", 0, 0, 0, true)
  249. }
  250. // createMemoryFilename creates a unique memory filename to use for the SQLite backend.
  251. // From mattn/go-sqlite3: "Each connection to ":memory:" opens a brand new in-memory
  252. // sql database, so if the stdlib's sql engine happens to open another connection and
  253. // you've only specified ":memory:", that connection will see a brand new database.
  254. // A workaround is to use "file::memory:?cache=shared" (or "file:foobar?mode=memory&cache=shared").
  255. // Every connection to this string will point to the same in-memory database."
  256. func createMemoryFilename() string {
  257. return fmt.Sprintf("file:%s?mode=memory&cache=shared", util.RandomString(10))
  258. }
  259. // AddMessage stores a message to the message cache synchronously, or queues it to be stored at a later date asyncronously.
  260. // The message is queued only if "batchSize" or "batchTimeout" are passed to the constructor.
  261. func (c *messageCache) AddMessage(m *message) error {
  262. if c.queue != nil {
  263. c.queue.Enqueue(m)
  264. return nil
  265. }
  266. return c.addMessages([]*message{m})
  267. }
  268. // addMessages synchronously stores a match of messages. If the database is locked, the transaction waits until
  269. // SQLite's busy_timeout is exceeded before erroring out.
  270. func (c *messageCache) addMessages(ms []*message) error {
  271. if c.nop {
  272. return nil
  273. }
  274. if len(ms) == 0 {
  275. return nil
  276. }
  277. start := time.Now()
  278. tx, err := c.db.Begin()
  279. if err != nil {
  280. return err
  281. }
  282. defer tx.Rollback()
  283. stmt, err := tx.Prepare(insertMessageQuery)
  284. if err != nil {
  285. return err
  286. }
  287. defer stmt.Close()
  288. for _, m := range ms {
  289. if m.Event != messageEvent {
  290. return errUnexpectedMessageType
  291. }
  292. published := m.Time <= time.Now().Unix()
  293. tags := strings.Join(m.Tags, ",")
  294. var attachmentName, attachmentType, attachmentURL string
  295. var attachmentSize, attachmentExpires, attachmentDeleted int64
  296. if m.Attachment != nil {
  297. attachmentName = m.Attachment.Name
  298. attachmentType = m.Attachment.Type
  299. attachmentSize = m.Attachment.Size
  300. attachmentExpires = m.Attachment.Expires
  301. attachmentURL = m.Attachment.URL
  302. }
  303. var actionsStr string
  304. if len(m.Actions) > 0 {
  305. actionsBytes, err := json.Marshal(m.Actions)
  306. if err != nil {
  307. return err
  308. }
  309. actionsStr = string(actionsBytes)
  310. }
  311. var sender string
  312. if m.Sender.IsValid() {
  313. sender = m.Sender.String()
  314. }
  315. _, err := stmt.Exec(
  316. m.ID,
  317. m.Time,
  318. m.Expires,
  319. m.Topic,
  320. m.Message,
  321. m.Title,
  322. m.Priority,
  323. tags,
  324. m.Click,
  325. m.Icon,
  326. actionsStr,
  327. attachmentName,
  328. attachmentType,
  329. attachmentSize,
  330. attachmentExpires,
  331. attachmentURL,
  332. attachmentDeleted, // Always zero
  333. sender,
  334. m.User,
  335. m.Encoding,
  336. published,
  337. )
  338. if err != nil {
  339. return err
  340. }
  341. }
  342. if err := tx.Commit(); err != nil {
  343. log.Error("Message Cache: Writing %d message(s) failed (took %v)", len(ms), time.Since(start))
  344. return err
  345. }
  346. log.Debug("Message Cache: Wrote %d message(s) in %v", len(ms), time.Since(start))
  347. return nil
  348. }
  349. func (c *messageCache) Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
  350. if since.IsNone() {
  351. return make([]*message, 0), nil
  352. } else if since.IsID() {
  353. return c.messagesSinceID(topic, since, scheduled)
  354. }
  355. return c.messagesSinceTime(topic, since, scheduled)
  356. }
  357. func (c *messageCache) messagesSinceTime(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
  358. var rows *sql.Rows
  359. var err error
  360. if scheduled {
  361. rows, err = c.db.Query(selectMessagesSinceTimeIncludeScheduledQuery, topic, since.Time().Unix())
  362. } else {
  363. rows, err = c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix())
  364. }
  365. if err != nil {
  366. return nil, err
  367. }
  368. return readMessages(rows)
  369. }
  370. func (c *messageCache) messagesSinceID(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
  371. idrows, err := c.db.Query(selectRowIDFromMessageID, since.ID())
  372. if err != nil {
  373. return nil, err
  374. }
  375. defer idrows.Close()
  376. if !idrows.Next() {
  377. return c.messagesSinceTime(topic, sinceAllMessages, scheduled)
  378. }
  379. var rowID int64
  380. if err := idrows.Scan(&rowID); err != nil {
  381. return nil, err
  382. }
  383. idrows.Close()
  384. var rows *sql.Rows
  385. if scheduled {
  386. rows, err = c.db.Query(selectMessagesSinceIDIncludeScheduledQuery, topic, rowID)
  387. } else {
  388. rows, err = c.db.Query(selectMessagesSinceIDQuery, topic, rowID)
  389. }
  390. if err != nil {
  391. return nil, err
  392. }
  393. return readMessages(rows)
  394. }
  395. func (c *messageCache) MessagesDue() ([]*message, error) {
  396. rows, err := c.db.Query(selectMessagesDueQuery, time.Now().Unix())
  397. if err != nil {
  398. return nil, err
  399. }
  400. return readMessages(rows)
  401. }
  402. // MessagesExpired returns a list of IDs for messages that have expires (should be deleted)
  403. func (c *messageCache) MessagesExpired() ([]string, error) {
  404. rows, err := c.db.Query(selectMessagesExpiredQuery, time.Now().Unix())
  405. if err != nil {
  406. return nil, err
  407. }
  408. defer rows.Close()
  409. ids := make([]string, 0)
  410. for rows.Next() {
  411. var id string
  412. if err := rows.Scan(&id); err != nil {
  413. return nil, err
  414. }
  415. ids = append(ids, id)
  416. }
  417. if err := rows.Err(); err != nil {
  418. return nil, err
  419. }
  420. return ids, nil
  421. }
  422. func (c *messageCache) MarkPublished(m *message) error {
  423. _, err := c.db.Exec(updateMessagePublishedQuery, m.ID)
  424. return err
  425. }
  426. func (c *messageCache) MessageCounts() (map[string]int, error) {
  427. rows, err := c.db.Query(selectMessageCountPerTopicQuery)
  428. if err != nil {
  429. return nil, err
  430. }
  431. defer rows.Close()
  432. var topic string
  433. var count int
  434. counts := make(map[string]int)
  435. for rows.Next() {
  436. if err := rows.Scan(&topic, &count); err != nil {
  437. return nil, err
  438. } else if err := rows.Err(); err != nil {
  439. return nil, err
  440. }
  441. counts[topic] = count
  442. }
  443. return counts, nil
  444. }
  445. func (c *messageCache) Topics() (map[string]*topic, error) {
  446. rows, err := c.db.Query(selectTopicsQuery)
  447. if err != nil {
  448. return nil, err
  449. }
  450. defer rows.Close()
  451. topics := make(map[string]*topic)
  452. for rows.Next() {
  453. var id string
  454. if err := rows.Scan(&id); err != nil {
  455. return nil, err
  456. }
  457. topics[id] = newTopic(id)
  458. }
  459. if err := rows.Err(); err != nil {
  460. return nil, err
  461. }
  462. return topics, nil
  463. }
  464. func (c *messageCache) DeleteMessages(ids ...string) error {
  465. tx, err := c.db.Begin()
  466. if err != nil {
  467. return err
  468. }
  469. defer tx.Rollback()
  470. for _, id := range ids {
  471. if _, err := tx.Exec(deleteMessageQuery, id); err != nil {
  472. return err
  473. }
  474. }
  475. return tx.Commit()
  476. }
  477. func (c *messageCache) ExpireMessages(topics ...string) error {
  478. tx, err := c.db.Begin()
  479. if err != nil {
  480. return err
  481. }
  482. defer tx.Rollback()
  483. for _, t := range topics {
  484. if _, err := tx.Exec(updateMessagesForTopicExpiryQuery, time.Now().Unix(), t); err != nil {
  485. return err
  486. }
  487. }
  488. return tx.Commit()
  489. }
  490. func (c *messageCache) AttachmentsExpired() ([]string, error) {
  491. rows, err := c.db.Query(selectAttachmentsExpiredQuery, time.Now().Unix())
  492. if err != nil {
  493. return nil, err
  494. }
  495. defer rows.Close()
  496. ids := make([]string, 0)
  497. for rows.Next() {
  498. var id string
  499. if err := rows.Scan(&id); err != nil {
  500. return nil, err
  501. }
  502. ids = append(ids, id)
  503. }
  504. if err := rows.Err(); err != nil {
  505. return nil, err
  506. }
  507. return ids, nil
  508. }
  509. func (c *messageCache) MarkAttachmentsDeleted(ids ...string) error {
  510. tx, err := c.db.Begin()
  511. if err != nil {
  512. return err
  513. }
  514. defer tx.Rollback()
  515. for _, id := range ids {
  516. if _, err := tx.Exec(updateAttachmentDeleted, id); err != nil {
  517. return err
  518. }
  519. }
  520. return tx.Commit()
  521. }
  522. func (c *messageCache) AttachmentBytesUsedBySender(sender string) (int64, error) {
  523. rows, err := c.db.Query(selectAttachmentsSizeBySenderQuery, sender, time.Now().Unix())
  524. if err != nil {
  525. return 0, err
  526. }
  527. return c.readAttachmentBytesUsed(rows)
  528. }
  529. func (c *messageCache) AttachmentBytesUsedByUser(user string) (int64, error) {
  530. rows, err := c.db.Query(selectAttachmentsSizeByUserQuery, user, time.Now().Unix())
  531. if err != nil {
  532. return 0, err
  533. }
  534. return c.readAttachmentBytesUsed(rows)
  535. }
  536. func (c *messageCache) readAttachmentBytesUsed(rows *sql.Rows) (int64, error) {
  537. defer rows.Close()
  538. var size int64
  539. if !rows.Next() {
  540. return 0, errors.New("no rows found")
  541. }
  542. if err := rows.Scan(&size); err != nil {
  543. return 0, err
  544. } else if err := rows.Err(); err != nil {
  545. return 0, err
  546. }
  547. return size, nil
  548. }
  549. func (c *messageCache) processMessageBatches() {
  550. if c.queue == nil {
  551. return
  552. }
  553. for messages := range c.queue.Dequeue() {
  554. if err := c.addMessages(messages); err != nil {
  555. log.Error("Message Cache: %s", err.Error())
  556. }
  557. }
  558. }
  559. func readMessages(rows *sql.Rows) ([]*message, error) {
  560. defer rows.Close()
  561. messages := make([]*message, 0)
  562. for rows.Next() {
  563. var timestamp, expires, attachmentSize, attachmentExpires int64
  564. var priority int
  565. var id, topic, msg, title, tagsStr, click, icon, actionsStr, attachmentName, attachmentType, attachmentURL, sender, user, encoding string
  566. err := rows.Scan(
  567. &id,
  568. &timestamp,
  569. &expires,
  570. &topic,
  571. &msg,
  572. &title,
  573. &priority,
  574. &tagsStr,
  575. &click,
  576. &icon,
  577. &actionsStr,
  578. &attachmentName,
  579. &attachmentType,
  580. &attachmentSize,
  581. &attachmentExpires,
  582. &attachmentURL,
  583. &sender,
  584. &user,
  585. &encoding,
  586. )
  587. if err != nil {
  588. return nil, err
  589. }
  590. var tags []string
  591. if tagsStr != "" {
  592. tags = strings.Split(tagsStr, ",")
  593. }
  594. var actions []*action
  595. if actionsStr != "" {
  596. if err := json.Unmarshal([]byte(actionsStr), &actions); err != nil {
  597. return nil, err
  598. }
  599. }
  600. senderIP, err := netip.ParseAddr(sender)
  601. if err != nil {
  602. senderIP = netip.Addr{} // if no IP stored in database, return invalid address
  603. }
  604. var att *attachment
  605. if attachmentName != "" && attachmentURL != "" {
  606. att = &attachment{
  607. Name: attachmentName,
  608. Type: attachmentType,
  609. Size: attachmentSize,
  610. Expires: attachmentExpires,
  611. URL: attachmentURL,
  612. }
  613. }
  614. messages = append(messages, &message{
  615. ID: id,
  616. Time: timestamp,
  617. Expires: expires,
  618. Event: messageEvent,
  619. Topic: topic,
  620. Message: msg,
  621. Title: title,
  622. Priority: priority,
  623. Tags: tags,
  624. Click: click,
  625. Icon: icon,
  626. Actions: actions,
  627. Attachment: att,
  628. Sender: senderIP, // Must parse assuming database must be correct
  629. User: user,
  630. Encoding: encoding,
  631. })
  632. }
  633. if err := rows.Err(); err != nil {
  634. return nil, err
  635. }
  636. return messages, nil
  637. }
  638. func setupDB(db *sql.DB, startupQueries string, cacheDuration time.Duration) error {
  639. // Run startup queries
  640. if startupQueries != "" {
  641. if _, err := db.Exec(startupQueries); err != nil {
  642. return err
  643. }
  644. }
  645. // If 'messages' table does not exist, this must be a new database
  646. rowsMC, err := db.Query(selectMessagesCountQuery)
  647. if err != nil {
  648. return setupNewCacheDB(db)
  649. }
  650. rowsMC.Close()
  651. // If 'messages' table exists, check 'schemaVersion' table
  652. schemaVersion := 0
  653. rowsSV, err := db.Query(selectSchemaVersionQuery)
  654. if err == nil {
  655. defer rowsSV.Close()
  656. if !rowsSV.Next() {
  657. return errors.New("cannot determine schema version: cache file may be corrupt")
  658. }
  659. if err := rowsSV.Scan(&schemaVersion); err != nil {
  660. return err
  661. }
  662. rowsSV.Close()
  663. }
  664. // Do migrations
  665. if schemaVersion == currentSchemaVersion {
  666. return nil
  667. } else if schemaVersion > currentSchemaVersion {
  668. return fmt.Errorf("unexpected schema version: version %d is higher than current version %d", schemaVersion, currentSchemaVersion)
  669. }
  670. for i := schemaVersion; i < currentSchemaVersion; i++ {
  671. fn, ok := migrations[i]
  672. if !ok {
  673. return fmt.Errorf("cannot find migration step from schema version %d to %d", i, i+1)
  674. } else if err := fn(db, cacheDuration); err != nil {
  675. return err
  676. }
  677. }
  678. return nil
  679. }
  680. func setupNewCacheDB(db *sql.DB) error {
  681. if _, err := db.Exec(createMessagesTableQuery); err != nil {
  682. return err
  683. }
  684. if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
  685. return err
  686. }
  687. if _, err := db.Exec(insertSchemaVersion, currentSchemaVersion); err != nil {
  688. return err
  689. }
  690. return nil
  691. }
  692. func migrateFrom0(db *sql.DB, _ time.Duration) error {
  693. log.Info("Migrating cache database schema: from 0 to 1")
  694. if _, err := db.Exec(migrate0To1AlterMessagesTableQuery); err != nil {
  695. return err
  696. }
  697. if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
  698. return err
  699. }
  700. if _, err := db.Exec(insertSchemaVersion, 1); err != nil {
  701. return err
  702. }
  703. return nil
  704. }
  705. func migrateFrom1(db *sql.DB, _ time.Duration) error {
  706. log.Info("Migrating cache database schema: from 1 to 2")
  707. if _, err := db.Exec(migrate1To2AlterMessagesTableQuery); err != nil {
  708. return err
  709. }
  710. if _, err := db.Exec(updateSchemaVersion, 2); err != nil {
  711. return err
  712. }
  713. return nil
  714. }
  715. func migrateFrom2(db *sql.DB, _ time.Duration) error {
  716. log.Info("Migrating cache database schema: from 2 to 3")
  717. if _, err := db.Exec(migrate2To3AlterMessagesTableQuery); err != nil {
  718. return err
  719. }
  720. if _, err := db.Exec(updateSchemaVersion, 3); err != nil {
  721. return err
  722. }
  723. return nil
  724. }
  725. func migrateFrom3(db *sql.DB, _ time.Duration) error {
  726. log.Info("Migrating cache database schema: from 3 to 4")
  727. if _, err := db.Exec(migrate3To4AlterMessagesTableQuery); err != nil {
  728. return err
  729. }
  730. if _, err := db.Exec(updateSchemaVersion, 4); err != nil {
  731. return err
  732. }
  733. return nil
  734. }
  735. func migrateFrom4(db *sql.DB, _ time.Duration) error {
  736. log.Info("Migrating cache database schema: from 4 to 5")
  737. if _, err := db.Exec(migrate4To5AlterMessagesTableQuery); err != nil {
  738. return err
  739. }
  740. if _, err := db.Exec(updateSchemaVersion, 5); err != nil {
  741. return err
  742. }
  743. return nil
  744. }
  745. func migrateFrom5(db *sql.DB, _ time.Duration) error {
  746. log.Info("Migrating cache database schema: from 5 to 6")
  747. if _, err := db.Exec(migrate5To6AlterMessagesTableQuery); err != nil {
  748. return err
  749. }
  750. if _, err := db.Exec(updateSchemaVersion, 6); err != nil {
  751. return err
  752. }
  753. return nil
  754. }
  755. func migrateFrom6(db *sql.DB, _ time.Duration) error {
  756. log.Info("Migrating cache database schema: from 6 to 7")
  757. if _, err := db.Exec(migrate6To7AlterMessagesTableQuery); err != nil {
  758. return err
  759. }
  760. if _, err := db.Exec(updateSchemaVersion, 7); err != nil {
  761. return err
  762. }
  763. return nil
  764. }
  765. func migrateFrom7(db *sql.DB, _ time.Duration) error {
  766. log.Info("Migrating cache database schema: from 7 to 8")
  767. if _, err := db.Exec(migrate7To8AlterMessagesTableQuery); err != nil {
  768. return err
  769. }
  770. if _, err := db.Exec(updateSchemaVersion, 8); err != nil {
  771. return err
  772. }
  773. return nil
  774. }
  775. func migrateFrom8(db *sql.DB, _ time.Duration) error {
  776. log.Info("Migrating cache database schema: from 8 to 9")
  777. if _, err := db.Exec(migrate8To9AlterMessagesTableQuery); err != nil {
  778. return err
  779. }
  780. if _, err := db.Exec(updateSchemaVersion, 9); err != nil {
  781. return err
  782. }
  783. return nil
  784. }
  785. func migrateFrom9(db *sql.DB, cacheDuration time.Duration) error {
  786. log.Info("Migrating cache database schema: from 9 to 10")
  787. tx, err := db.Begin()
  788. if err != nil {
  789. return err
  790. }
  791. defer tx.Rollback()
  792. if _, err := tx.Exec(migrate9To10AlterMessagesTableQuery); err != nil {
  793. return err
  794. }
  795. if _, err := tx.Exec(migrate9To10UpdateMessageExpiryQuery, int64(cacheDuration.Seconds())); err != nil {
  796. return err
  797. }
  798. if _, err := tx.Exec(updateSchemaVersion, 10); err != nil {
  799. return err
  800. }
  801. if err := tx.Commit(); err != nil {
  802. return err
  803. }
  804. return nil // Update this when a new version is added
  805. }