123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284 |
- package server
- import (
- "database/sql"
- "errors"
- "fmt"
- _ "github.com/mattn/go-sqlite3" // SQLite driver
- "log"
- "strings"
- "time"
- )
// Messages cache
//
// SQL statements used by sqliteCache. All statements use positional "?"
// placeholders; the argument order is documented next to each statement.
// The "published" column is stored as an integer flag: rows are selected as
// already delivered with published = 1 and as still pending with published = 0.
const (
	// createMessagesTableQuery creates the messages table and its topic index
	// in one transaction. Both statements are idempotent (IF NOT EXISTS).
	createMessagesTableQuery = `
		BEGIN;
		CREATE TABLE IF NOT EXISTS messages (
			id VARCHAR(20) PRIMARY KEY,
			time INT NOT NULL,
			topic VARCHAR(64) NOT NULL,
			message VARCHAR(512) NOT NULL,
			title VARCHAR(256) NOT NULL,
			priority INT NOT NULL,
			tags VARCHAR(256) NOT NULL,
			published INT NOT NULL
		);
		CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
		COMMIT;
	`

	// insertMessageQuery args: id, time, topic, message, title, priority, tags, published.
	insertMessageQuery = `INSERT INTO messages (id, time, topic, message, title, priority, tags, published) VALUES (?, ?, ?, ?, ?, ?, ?, ?)`
	// pruneMessagesQuery args: cutoff time. Only published rows are deleted.
	pruneMessagesQuery = `DELETE FROM messages WHERE time < ? AND published = 1`

	// selectMessagesSinceTimeQuery args: topic, since. Published rows only, oldest first.
	selectMessagesSinceTimeQuery = `
		SELECT id, time, topic, message, title, priority, tags
		FROM messages
		WHERE topic = ? AND time >= ? AND published = 1
		ORDER BY time ASC
	`

	// selectMessagesSinceTimeIncludeScheduledQuery args: topic, since. Same as
	// selectMessagesSinceTimeQuery but without the published filter.
	selectMessagesSinceTimeIncludeScheduledQuery = `
		SELECT id, time, topic, message, title, priority, tags
		FROM messages
		WHERE topic = ? AND time >= ?
		ORDER BY time ASC
	`

	// selectMessagesDueQuery args: now. Unpublished rows whose time has been
	// reached, across all topics; no ordering is requested.
	selectMessagesDueQuery = `
		SELECT id, time, topic, message, title, priority, tags
		FROM messages
		WHERE time <= ? AND published = 0
	`

	// updateMessagePublishedQuery args: id.
	updateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE id = ?`
	// selectMessagesCountQuery takes no args; setupDB also uses it to probe
	// whether the messages table exists.
	selectMessagesCountQuery = `SELECT COUNT(*) FROM messages`
	// selectMessageCountForTopicQuery args: topic.
	selectMessageCountForTopicQuery = `SELECT COUNT(*) FROM messages WHERE topic = ?`
	// selectTopicsQuery takes no args and yields each distinct topic once.
	selectTopicsQuery = `SELECT topic FROM messages GROUP BY topic`
)
// Schema management queries
//
// The schemaVersion table holds a single row (id = 1) whose version column
// records which migrations have been applied to the messages table.
const (
	// currentSchemaVersion is the version written for freshly created
	// databases and the version at which setupDB performs no migration.
	currentSchemaVersion = 2

	// createSchemaVersionTableQuery is idempotent (IF NOT EXISTS).
	createSchemaVersionTableQuery = `
		CREATE TABLE IF NOT EXISTS schemaVersion (
			id INT PRIMARY KEY,
			version INT NOT NULL
		);
	`

	// insertSchemaVersion args: version. Creates the single row with id = 1.
	insertSchemaVersion = `INSERT INTO schemaVersion VALUES (1, ?)`
	// updateSchemaVersion args: version.
	updateSchemaVersion = `UPDATE schemaVersion SET version = ? WHERE id = 1`
	// selectSchemaVersionQuery takes no args.
	selectSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1`

	// 0 -> 1
	// Adds title, priority and tags; the defaults let existing rows satisfy
	// the NOT NULL constraints.
	migrate0To1AlterMessagesTableQuery = `
		BEGIN;
		ALTER TABLE messages ADD COLUMN title VARCHAR(256) NOT NULL DEFAULT('');
		ALTER TABLE messages ADD COLUMN priority INT NOT NULL DEFAULT(0);
		ALTER TABLE messages ADD COLUMN tags VARCHAR(256) NOT NULL DEFAULT('');
		COMMIT;
	`

	// 1 -> 2
	// Adds published; DEFAULT(1) marks every pre-existing row as published.
	migrate1To2AlterMessagesTableQuery = `
		ALTER TABLE messages ADD COLUMN published INT NOT NULL DEFAULT(1);
	`
)
- type sqliteCache struct {
- db *sql.DB
- }
- var _ cache = (*sqliteCache)(nil)
- func newSqliteCache(filename string) (*sqliteCache, error) {
- db, err := sql.Open("sqlite3", filename)
- if err != nil {
- return nil, err
- }
- if err := setupDB(db); err != nil {
- return nil, err
- }
- return &sqliteCache{
- db: db,
- }, nil
- }
- func (c *sqliteCache) AddMessage(m *message) error {
- if m.Event != messageEvent {
- return errUnexpectedMessageType
- }
- published := m.Time <= time.Now().Unix()
- _, err := c.db.Exec(insertMessageQuery, m.ID, m.Time, m.Topic, m.Message, m.Title, m.Priority, strings.Join(m.Tags, ","), published)
- return err
- }
- func (c *sqliteCache) Messages(topic string, since sinceTime, scheduled bool) ([]*message, error) {
- if since.IsNone() {
- return make([]*message, 0), nil
- }
- var rows *sql.Rows
- var err error
- if scheduled {
- rows, err = c.db.Query(selectMessagesSinceTimeIncludeScheduledQuery, topic, since.Time().Unix())
- } else {
- rows, err = c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix())
- }
- if err != nil {
- return nil, err
- }
- return readMessages(rows)
- }
- func (c *sqliteCache) MessagesDue() ([]*message, error) {
- rows, err := c.db.Query(selectMessagesDueQuery, time.Now().Unix())
- if err != nil {
- return nil, err
- }
- return readMessages(rows)
- }
- func (c *sqliteCache) MarkPublished(m *message) error {
- _, err := c.db.Exec(updateMessagePublishedQuery, m.ID)
- return err
- }
- func (c *sqliteCache) MessageCount(topic string) (int, error) {
- rows, err := c.db.Query(selectMessageCountForTopicQuery, topic)
- if err != nil {
- return 0, err
- }
- defer rows.Close()
- var count int
- if !rows.Next() {
- return 0, errors.New("no rows found")
- }
- if err := rows.Scan(&count); err != nil {
- return 0, err
- } else if err := rows.Err(); err != nil {
- return 0, err
- }
- return count, nil
- }
- func (c *sqliteCache) Topics() (map[string]*topic, error) {
- rows, err := c.db.Query(selectTopicsQuery)
- if err != nil {
- return nil, err
- }
- defer rows.Close()
- topics := make(map[string]*topic)
- for rows.Next() {
- var id string
- if err := rows.Scan(&id); err != nil {
- return nil, err
- }
- topics[id] = newTopic(id)
- }
- if err := rows.Err(); err != nil {
- return nil, err
- }
- return topics, nil
- }
- func (c *sqliteCache) Prune(olderThan time.Time) error {
- _, err := c.db.Exec(pruneMessagesQuery, olderThan.Unix())
- return err
- }
- func readMessages(rows *sql.Rows) ([]*message, error) {
- defer rows.Close()
- messages := make([]*message, 0)
- for rows.Next() {
- var timestamp int64
- var priority int
- var id, topic, msg, title, tagsStr string
- if err := rows.Scan(&id, ×tamp, &topic, &msg, &title, &priority, &tagsStr); err != nil {
- return nil, err
- }
- var tags []string
- if tagsStr != "" {
- tags = strings.Split(tagsStr, ",")
- }
- messages = append(messages, &message{
- ID: id,
- Time: timestamp,
- Event: messageEvent,
- Topic: topic,
- Message: msg,
- Title: title,
- Priority: priority,
- Tags: tags,
- })
- }
- if err := rows.Err(); err != nil {
- return nil, err
- }
- return messages, nil
- }
- func setupDB(db *sql.DB) error {
- // If 'messages' table does not exist, this must be a new database
- rowsMC, err := db.Query(selectMessagesCountQuery)
- if err != nil {
- return setupNewDB(db)
- }
- rowsMC.Close()
- // If 'messages' table exists, check 'schemaVersion' table
- schemaVersion := 0
- rowsSV, err := db.Query(selectSchemaVersionQuery)
- if err == nil {
- defer rowsSV.Close()
- if !rowsSV.Next() {
- return errors.New("cannot determine schema version: cache file may be corrupt")
- }
- if err := rowsSV.Scan(&schemaVersion); err != nil {
- return err
- }
- rowsSV.Close()
- }
- // Do migrations
- if schemaVersion == currentSchemaVersion {
- return nil
- } else if schemaVersion == 0 {
- return migrateFrom0(db)
- } else if schemaVersion == 1 {
- return migrateFrom1(db)
- }
- return fmt.Errorf("unexpected schema version found: %d", schemaVersion)
- }
- func setupNewDB(db *sql.DB) error {
- if _, err := db.Exec(createMessagesTableQuery); err != nil {
- return err
- }
- if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
- return err
- }
- if _, err := db.Exec(insertSchemaVersion, currentSchemaVersion); err != nil {
- return err
- }
- return nil
- }
- func migrateFrom0(db *sql.DB) error {
- log.Print("Migrating cache database schema: from 0 to 1")
- if _, err := db.Exec(migrate0To1AlterMessagesTableQuery); err != nil {
- return err
- }
- if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
- return err
- }
- if _, err := db.Exec(insertSchemaVersion, 1); err != nil {
- return err
- }
- return migrateFrom1(db)
- }
- func migrateFrom1(db *sql.DB) error {
- log.Print("Migrating cache database schema: from 1 to 2")
- if _, err := db.Exec(migrate1To2AlterMessagesTableQuery); err != nil {
- return err
- }
- if _, err := db.Exec(updateSchemaVersion, 2); err != nil {
- return err
- }
- return nil // Update this when a new version is added
- }
|