cache_sqlite.go 7.2 KB


  1. package server
  2. import (
  3. "database/sql"
  4. "errors"
  5. "fmt"
  6. _ "github.com/mattn/go-sqlite3" // SQLite driver
  7. "log"
  8. "strings"
  9. "time"
  10. )
// Messages cache: SQL statements operating on the 'messages' table.
const (
	// createMessagesTableQuery creates the 'messages' table and an index on
	// its topic column, both only if they do not exist yet, wrapped in a
	// single BEGIN/COMMIT.
	createMessagesTableQuery = `
BEGIN;
CREATE TABLE IF NOT EXISTS messages (
id VARCHAR(20) PRIMARY KEY,
time INT NOT NULL,
topic VARCHAR(64) NOT NULL,
message VARCHAR(512) NOT NULL,
title VARCHAR(256) NOT NULL,
priority INT NOT NULL,
tags VARCHAR(256) NOT NULL,
published INT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
COMMIT;
`
	// insertMessageQuery inserts one row; the eight placeholders are, in order:
	// id, time, topic, message, title, priority, tags, published.
	insertMessageQuery = `INSERT INTO messages (id, time, topic, message, title, priority, tags, published) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`
	// pruneMessagesQuery deletes published messages whose time is strictly
	// lower than the given value; unpublished rows are never pruned by it.
	pruneMessagesQuery = `DELETE FROM messages WHERE time < ? AND published = 1`
	// selectMessagesSinceTimeQuery returns the published messages of a topic
	// with time >= the given value, oldest first.
	selectMessagesSinceTimeQuery = `
SELECT id, time, topic, message, title, priority, tags
FROM messages
WHERE topic = ? AND time >= ? AND published = 1
ORDER BY time ASC
`
	// selectMessagesSinceTimeIncludeScheduledQuery is the same as
	// selectMessagesSinceTimeQuery but without the published filter, so rows
	// with published = 0 are returned as well.
	selectMessagesSinceTimeIncludeScheduledQuery = `
SELECT id, time, topic, message, title, priority, tags
FROM messages
WHERE topic = ? AND time >= ?
ORDER BY time ASC
`
	// selectMessagesDueQuery returns unpublished messages (across all topics)
	// whose time is <= the given value. No ordering is specified.
	selectMessagesDueQuery = `
SELECT id, time, topic, message, title, priority, tags
FROM messages
WHERE time <= ? AND published = 0
`
	// updateMessagePublishedQuery sets published = 1 on the row with the given id.
	updateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE id = ?`
	// selectMessagesCountQuery counts all rows of the 'messages' table.
	selectMessagesCountQuery = `SELECT COUNT(*) FROM messages`
	// selectMessageCountForTopicQuery counts all rows of a topic, regardless
	// of their published flag.
	selectMessageCountForTopicQuery = `SELECT COUNT(*) FROM messages WHERE topic = ?`
	// selectTopicsQuery returns each distinct topic that has at least one row.
	selectTopicsQuery = `SELECT topic FROM messages GROUP BY topic`
)
// Schema management queries
const (
	// currentSchemaVersion is the schema version written for new databases
	// and the version at which no migration is performed.
	currentSchemaVersion = 2
	// createSchemaVersionTableQuery creates the 'schemaVersion' table if it
	// does not exist yet.
	createSchemaVersionTableQuery = `
CREATE TABLE IF NOT EXISTS schemaVersion (
id INT PRIMARY KEY,
version INT NOT NULL
);
`
	// insertSchemaVersion inserts the version row; its id is fixed to 1 and
	// the placeholder is the version number.
	insertSchemaVersion = `INSERT INTO schemaVersion VALUES (1, ?)`
	// updateSchemaVersion overwrites the version stored in the row with id 1.
	updateSchemaVersion = `UPDATE schemaVersion SET version = ? WHERE id = 1`
	// selectSchemaVersionQuery reads the version stored in the row with id 1.
	selectSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1`
	// 0 -> 1: adds the title, priority and tags columns, each with a default
	// value so that existing rows remain valid.
	migrate0To1AlterMessagesTableQuery = `
BEGIN;
ALTER TABLE messages ADD COLUMN title VARCHAR(256) NOT NULL DEFAULT('');
ALTER TABLE messages ADD COLUMN priority INT NOT NULL DEFAULT(0);
ALTER TABLE messages ADD COLUMN tags VARCHAR(256) NOT NULL DEFAULT('');
COMMIT;
`
	// 1 -> 2: adds the published column; the default of 1 marks all existing
	// rows as published.
	migrate1To2AlterMessagesTableQuery = `
ALTER TABLE messages ADD COLUMN published INT NOT NULL DEFAULT(1);
`
)
// sqliteCache is a message cache that stores its data in a SQL database
// reached through database/sql (opened with the "sqlite3" driver by
// newSqliteCache).
type sqliteCache struct {
	db *sql.DB // handle used by every method for queries and statements
}

// Compile-time check that *sqliteCache is assignable to the cache type
// (presumably an interface declared elsewhere in the package).
var _ cache = (*sqliteCache)(nil)
  81. func newSqliteCache(filename string) (*sqliteCache, error) {
  82. db, err := sql.Open("sqlite3", filename)
  83. if err != nil {
  84. return nil, err
  85. }
  86. if err := setupDB(db); err != nil {
  87. return nil, err
  88. }
  89. return &sqliteCache{
  90. db: db,
  91. }, nil
  92. }
  93. func (c *sqliteCache) AddMessage(m *message) error {
  94. if m.Event != messageEvent {
  95. return errUnexpectedMessageType
  96. }
  97. published := m.Time <= time.Now().Unix()
  98. _, err := c.db.Exec(insertMessageQuery, m.ID, m.Time, m.Topic, m.Message, m.Title, m.Priority, strings.Join(m.Tags, ","), published)
  99. return err
  100. }
  101. func (c *sqliteCache) Messages(topic string, since sinceTime, scheduled bool) ([]*message, error) {
  102. if since.IsNone() {
  103. return make([]*message, 0), nil
  104. }
  105. var rows *sql.Rows
  106. var err error
  107. if scheduled {
  108. rows, err = c.db.Query(selectMessagesSinceTimeIncludeScheduledQuery, topic, since.Time().Unix())
  109. } else {
  110. rows, err = c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix())
  111. }
  112. if err != nil {
  113. return nil, err
  114. }
  115. return readMessages(rows)
  116. }
  117. func (c *sqliteCache) MessagesDue() ([]*message, error) {
  118. rows, err := c.db.Query(selectMessagesDueQuery, time.Now().Unix())
  119. if err != nil {
  120. return nil, err
  121. }
  122. return readMessages(rows)
  123. }
  124. func (c *sqliteCache) MarkPublished(m *message) error {
  125. _, err := c.db.Exec(updateMessagePublishedQuery, m.ID)
  126. return err
  127. }
  128. func (c *sqliteCache) MessageCount(topic string) (int, error) {
  129. rows, err := c.db.Query(selectMessageCountForTopicQuery, topic)
  130. if err != nil {
  131. return 0, err
  132. }
  133. defer rows.Close()
  134. var count int
  135. if !rows.Next() {
  136. return 0, errors.New("no rows found")
  137. }
  138. if err := rows.Scan(&count); err != nil {
  139. return 0, err
  140. } else if err := rows.Err(); err != nil {
  141. return 0, err
  142. }
  143. return count, nil
  144. }
  145. func (c *sqliteCache) Topics() (map[string]*topic, error) {
  146. rows, err := c.db.Query(selectTopicsQuery)
  147. if err != nil {
  148. return nil, err
  149. }
  150. defer rows.Close()
  151. topics := make(map[string]*topic)
  152. for rows.Next() {
  153. var id string
  154. if err := rows.Scan(&id); err != nil {
  155. return nil, err
  156. }
  157. topics[id] = newTopic(id)
  158. }
  159. if err := rows.Err(); err != nil {
  160. return nil, err
  161. }
  162. return topics, nil
  163. }
  164. func (c *sqliteCache) Prune(olderThan time.Time) error {
  165. _, err := c.db.Exec(pruneMessagesQuery, olderThan.Unix())
  166. return err
  167. }
  168. func readMessages(rows *sql.Rows) ([]*message, error) {
  169. defer rows.Close()
  170. messages := make([]*message, 0)
  171. for rows.Next() {
  172. var timestamp int64
  173. var priority int
  174. var id, topic, msg, title, tagsStr string
  175. if err := rows.Scan(&id, &timestamp, &topic, &msg, &title, &priority, &tagsStr); err != nil {
  176. return nil, err
  177. }
  178. var tags []string
  179. if tagsStr != "" {
  180. tags = strings.Split(tagsStr, ",")
  181. }
  182. messages = append(messages, &message{
  183. ID: id,
  184. Time: timestamp,
  185. Event: messageEvent,
  186. Topic: topic,
  187. Message: msg,
  188. Title: title,
  189. Priority: priority,
  190. Tags: tags,
  191. })
  192. }
  193. if err := rows.Err(); err != nil {
  194. return nil, err
  195. }
  196. return messages, nil
  197. }
  198. func setupDB(db *sql.DB) error {
  199. // If 'messages' table does not exist, this must be a new database
  200. rowsMC, err := db.Query(selectMessagesCountQuery)
  201. if err != nil {
  202. return setupNewDB(db)
  203. }
  204. rowsMC.Close()
  205. // If 'messages' table exists, check 'schemaVersion' table
  206. schemaVersion := 0
  207. rowsSV, err := db.Query(selectSchemaVersionQuery)
  208. if err == nil {
  209. defer rowsSV.Close()
  210. if !rowsSV.Next() {
  211. return errors.New("cannot determine schema version: cache file may be corrupt")
  212. }
  213. if err := rowsSV.Scan(&schemaVersion); err != nil {
  214. return err
  215. }
  216. rowsSV.Close()
  217. }
  218. // Do migrations
  219. if schemaVersion == currentSchemaVersion {
  220. return nil
  221. } else if schemaVersion == 0 {
  222. return migrateFrom0(db)
  223. } else if schemaVersion == 1 {
  224. return migrateFrom1(db)
  225. }
  226. return fmt.Errorf("unexpected schema version found: %d", schemaVersion)
  227. }
  228. func setupNewDB(db *sql.DB) error {
  229. if _, err := db.Exec(createMessagesTableQuery); err != nil {
  230. return err
  231. }
  232. if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
  233. return err
  234. }
  235. if _, err := db.Exec(insertSchemaVersion, currentSchemaVersion); err != nil {
  236. return err
  237. }
  238. return nil
  239. }
  240. func migrateFrom0(db *sql.DB) error {
  241. log.Print("Migrating cache database schema: from 0 to 1")
  242. if _, err := db.Exec(migrate0To1AlterMessagesTableQuery); err != nil {
  243. return err
  244. }
  245. if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
  246. return err
  247. }
  248. if _, err := db.Exec(insertSchemaVersion, 1); err != nil {
  249. return err
  250. }
  251. return migrateFrom1(db)
  252. }
  253. func migrateFrom1(db *sql.DB) error {
  254. log.Print("Migrating cache database schema: from 1 to 2")
  255. if _, err := db.Exec(migrate1To2AlterMessagesTableQuery); err != nil {
  256. return err
  257. }
  258. if _, err := db.Exec(updateSchemaVersion, 2); err != nil {
  259. return err
  260. }
  261. return nil // Update this when a new version is added
  262. }