123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603 |
- package server
- import (
- "database/sql"
- "encoding/json"
- "errors"
- "fmt"
- _ "github.com/mattn/go-sqlite3" // SQLite driver
- "heckel.io/ntfy/util"
- "log"
- "strings"
- "time"
- )
// errUnexpectedMessageType is the sentinel error AddMessage returns when it is
// handed a message whose Event is not messageEvent.
var errUnexpectedMessageType = errors.New("unexpected message type")
- // Messages cache
//
// SQL statements run against the "messages" table. In that table "id" is the
// auto-incrementing row ID and "mid" is the message ID; the queries below treat
// published = 1 as "visible to readers" and published = 0 as "still scheduled".
- const (
// createMessagesTableQuery creates the messages table and its mid/topic
// indexes between BEGIN and COMMIT.
- createMessagesTableQuery = `
- BEGIN;
- CREATE TABLE IF NOT EXISTS messages (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- mid TEXT NOT NULL,
- time INT NOT NULL,
- topic TEXT NOT NULL,
- message TEXT NOT NULL,
- title TEXT NOT NULL,
- priority INT NOT NULL,
- tags TEXT NOT NULL,
- click TEXT NOT NULL,
- actions TEXT NOT NULL,
- attachment_name TEXT NOT NULL,
- attachment_type TEXT NOT NULL,
- attachment_size INT NOT NULL,
- attachment_expires INT NOT NULL,
- attachment_url TEXT NOT NULL,
- attachment_owner TEXT NOT NULL,
- encoding TEXT NOT NULL,
- published INT NOT NULL
- );
- CREATE INDEX IF NOT EXISTS idx_mid ON messages (mid);
- CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
- COMMIT;
- `
// insertMessageQuery inserts one message row; it takes one positional
// parameter per listed column, in the listed order.
- insertMessageQuery = `
- INSERT INTO messages (mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published)
- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
- `
// pruneMessagesQuery deletes published messages whose time is before the
// bound timestamp; unpublished (scheduled) rows are kept.
- pruneMessagesQuery = `DELETE FROM messages WHERE time < ? AND published = 1`
// selectRowIDFromMessageID resolves a (topic, message ID) pair to its row ID.
- selectRowIDFromMessageID = `SELECT id FROM messages WHERE topic = ? AND mid = ?`
// selectMessagesSinceTimeQuery returns the published messages of a topic with
// time >= the bound timestamp, ordered by time and then row ID.
- selectMessagesSinceTimeQuery = `
- SELECT mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
- FROM messages
- WHERE topic = ? AND time >= ? AND published = 1
- ORDER BY time, id
- `
// selectMessagesSinceTimeIncludeScheduledQuery is the same selection without
// the published filter, so scheduled messages are included.
- selectMessagesSinceTimeIncludeScheduledQuery = `
- SELECT mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
- FROM messages
- WHERE topic = ? AND time >= ?
- ORDER BY time, id
- `
// selectMessagesSinceIDQuery returns the published messages of a topic whose
// row ID is greater than the bound row ID.
- selectMessagesSinceIDQuery = `
- SELECT mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
- FROM messages
- WHERE topic = ? AND id > ? AND published = 1
- ORDER BY time, id
- `
// selectMessagesSinceIDIncludeScheduledQuery returns the messages of a topic
// that either come after the bound row ID or are not yet published.
- selectMessagesSinceIDIncludeScheduledQuery = `
- SELECT mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
- FROM messages
- WHERE topic = ? AND (id > ? OR published = 0)
- ORDER BY time, id
- `
// selectMessagesDueQuery returns unpublished messages (any topic) whose time
// is at or before the bound timestamp.
- selectMessagesDueQuery = `
- SELECT mid, time, topic, message, title, priority, tags, click, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_owner, encoding
- FROM messages
- WHERE time <= ? AND published = 0
- ORDER BY time, id
- `
// updateMessagePublishedQuery flags the rows with the bound message ID as published.
- updateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE mid = ?`
// selectMessagesCountQuery counts all rows; setupCacheDB also uses it to probe
// whether the messages table exists at all.
- selectMessagesCountQuery = `SELECT COUNT(*) FROM messages`
// selectMessageCountForTopicQuery counts the rows of one topic (published or not).
- selectMessageCountForTopicQuery = `SELECT COUNT(*) FROM messages WHERE topic = ?`
// selectTopicsQuery lists each distinct topic that has at least one row.
- selectTopicsQuery = `SELECT topic FROM messages GROUP BY topic`
// selectAttachmentsSizeQuery sums attachment_size over an owner's attachments
// whose attachment_expires is >= the bound timestamp; yields 0 when none match.
- selectAttachmentsSizeQuery = `SELECT IFNULL(SUM(attachment_size), 0) FROM messages WHERE attachment_owner = ? AND attachment_expires >= ?`
// selectAttachmentsExpiredQuery lists the message IDs whose attachment_expires
// is set (> 0) and lies before the bound timestamp.
- selectAttachmentsExpiredQuery = `SELECT mid FROM messages WHERE attachment_expires > 0 AND attachment_expires < ?`
- )
- // Schema management queries
//
// The schema version is kept in a single row (id = 1) of the schemaVersion
// table. Each migrateXToY constant below holds the statements for one upgrade
// step; the matching migrateFromX function runs it and then records version Y.
- const (
// currentSchemaVersion is the version written into brand-new databases and the
// version at which setupCacheDB stops migrating.
- currentSchemaVersion = 6
// createSchemaVersionTableQuery creates the schemaVersion table if it is missing.
- createSchemaVersionTableQuery = `
- CREATE TABLE IF NOT EXISTS schemaVersion (
- id INT PRIMARY KEY,
- version INT NOT NULL
- );
- `
// insertSchemaVersion creates the single version row (id = 1); updateSchemaVersion
// and selectSchemaVersionQuery then write and read that row.
- insertSchemaVersion = `INSERT INTO schemaVersion VALUES (1, ?)`
- updateSchemaVersion = `UPDATE schemaVersion SET version = ? WHERE id = 1`
- selectSchemaVersionQuery = `SELECT version FROM schemaVersion WHERE id = 1`
- // 0 -> 1: add the title, priority and tags columns
- migrate0To1AlterMessagesTableQuery = `
- BEGIN;
- ALTER TABLE messages ADD COLUMN title TEXT NOT NULL DEFAULT('');
- ALTER TABLE messages ADD COLUMN priority INT NOT NULL DEFAULT(0);
- ALTER TABLE messages ADD COLUMN tags TEXT NOT NULL DEFAULT('');
- COMMIT;
- `
- // 1 -> 2: add the published column; existing rows default to 1 (published)
- migrate1To2AlterMessagesTableQuery = `
- ALTER TABLE messages ADD COLUMN published INT NOT NULL DEFAULT(1);
- `
- // 2 -> 3: add the click and attachment_* columns
- migrate2To3AlterMessagesTableQuery = `
- BEGIN;
- ALTER TABLE messages ADD COLUMN click TEXT NOT NULL DEFAULT('');
- ALTER TABLE messages ADD COLUMN attachment_name TEXT NOT NULL DEFAULT('');
- ALTER TABLE messages ADD COLUMN attachment_type TEXT NOT NULL DEFAULT('');
- ALTER TABLE messages ADD COLUMN attachment_size INT NOT NULL DEFAULT('0');
- ALTER TABLE messages ADD COLUMN attachment_expires INT NOT NULL DEFAULT('0');
- ALTER TABLE messages ADD COLUMN attachment_owner TEXT NOT NULL DEFAULT('');
- ALTER TABLE messages ADD COLUMN attachment_url TEXT NOT NULL DEFAULT('');
- COMMIT;
- `
- // 3 -> 4: add the encoding column
- migrate3To4AlterMessagesTableQuery = `
- ALTER TABLE messages ADD COLUMN encoding TEXT NOT NULL DEFAULT('');
- `
- // 4 -> 5: rebuild the table so that "id" becomes an auto-incrementing row ID;
- // the previous "id" values are copied into the new "mid" column
- migrate4To5AlterMessagesTableQuery = `
- BEGIN;
- CREATE TABLE IF NOT EXISTS messages_new (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- mid TEXT NOT NULL,
- time INT NOT NULL,
- topic TEXT NOT NULL,
- message TEXT NOT NULL,
- title TEXT NOT NULL,
- priority INT NOT NULL,
- tags TEXT NOT NULL,
- click TEXT NOT NULL,
- attachment_name TEXT NOT NULL,
- attachment_type TEXT NOT NULL,
- attachment_size INT NOT NULL,
- attachment_expires INT NOT NULL,
- attachment_url TEXT NOT NULL,
- attachment_owner TEXT NOT NULL,
- encoding TEXT NOT NULL,
- published INT NOT NULL
- );
- CREATE INDEX IF NOT EXISTS idx_mid ON messages_new (mid);
- CREATE INDEX IF NOT EXISTS idx_topic ON messages_new (topic);
- INSERT
- INTO messages_new (
- mid, time, topic, message, title, priority, tags, click, attachment_name, attachment_type,
- attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published)
- SELECT
- id, time, topic, message, title, priority, tags, click, attachment_name, attachment_type,
- attachment_size, attachment_expires, attachment_url, attachment_owner, encoding, published
- FROM messages;
- DROP TABLE messages;
- ALTER TABLE messages_new RENAME TO messages;
- COMMIT;
- `
- // 5 -> 6: add the actions column
- migrate5To6AlterMessagesTableQuery = `
- ALTER TABLE messages ADD COLUMN actions TEXT NOT NULL DEFAULT('');
- `
- )
// messageCache stores and retrieves messages through a SQL database handle.
type messageCache struct {
	// db is the handle all statements of this type are executed on.
	db *sql.DB
	// nop, when true, makes AddMessage return without storing the message.
	nop bool
}
- // newSqliteCache creates a SQLite file-backed cache
- func newSqliteCache(filename string, nop bool) (*messageCache, error) {
- db, err := sql.Open("sqlite3", filename)
- if err != nil {
- return nil, err
- }
- if err := setupCacheDB(db); err != nil {
- return nil, err
- }
- return &messageCache{
- db: db,
- nop: nop,
- }, nil
- }
- // newMemCache creates an in-memory cache
- func newMemCache() (*messageCache, error) {
- return newSqliteCache(createMemoryFilename(), false)
- }
- // newNopCache creates an in-memory cache that discards all messages;
- // it is always empty and can be used if caching is entirely disabled
- func newNopCache() (*messageCache, error) {
- return newSqliteCache(createMemoryFilename(), true)
- }
- // createMemoryFilename creates a unique memory filename to use for the SQLite backend.
- // From mattn/go-sqlite3: "Each connection to ":memory:" opens a brand new in-memory
- // sql database, so if the stdlib's sql engine happens to open another connection and
- // you've only specified ":memory:", that connection will see a brand new database.
- // A workaround is to use "file::memory:?cache=shared" (or "file:foobar?mode=memory&cache=shared").
- // Every connection to this string will point to the same in-memory database."
- func createMemoryFilename() string {
- return fmt.Sprintf("file:%s?mode=memory&cache=shared", util.RandomString(10))
- }
- func (c *messageCache) AddMessage(m *message) error {
- if m.Event != messageEvent {
- return errUnexpectedMessageType
- }
- if c.nop {
- return nil
- }
- published := m.Time <= time.Now().Unix()
- tags := strings.Join(m.Tags, ",")
- var attachmentName, attachmentType, attachmentURL, attachmentOwner string
- var attachmentSize, attachmentExpires int64
- if m.Attachment != nil {
- attachmentName = m.Attachment.Name
- attachmentType = m.Attachment.Type
- attachmentSize = m.Attachment.Size
- attachmentExpires = m.Attachment.Expires
- attachmentURL = m.Attachment.URL
- attachmentOwner = m.Attachment.Owner
- }
- var actionsStr string
- if len(m.Actions) > 0 {
- actionsBytes, err := json.Marshal(m.Actions)
- if err != nil {
- return err
- }
- actionsStr = string(actionsBytes)
- }
- _, err := c.db.Exec(
- insertMessageQuery,
- m.ID,
- m.Time,
- m.Topic,
- m.Message,
- m.Title,
- m.Priority,
- tags,
- m.Click,
- actionsStr,
- attachmentName,
- attachmentType,
- attachmentSize,
- attachmentExpires,
- attachmentURL,
- attachmentOwner,
- m.Encoding,
- published,
- )
- return err
- }
- func (c *messageCache) Messages(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
- if since.IsNone() {
- return make([]*message, 0), nil
- } else if since.IsID() {
- return c.messagesSinceID(topic, since, scheduled)
- }
- return c.messagesSinceTime(topic, since, scheduled)
- }
- func (c *messageCache) messagesSinceTime(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
- var rows *sql.Rows
- var err error
- if scheduled {
- rows, err = c.db.Query(selectMessagesSinceTimeIncludeScheduledQuery, topic, since.Time().Unix())
- } else {
- rows, err = c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix())
- }
- if err != nil {
- return nil, err
- }
- return readMessages(rows)
- }
- func (c *messageCache) messagesSinceID(topic string, since sinceMarker, scheduled bool) ([]*message, error) {
- idrows, err := c.db.Query(selectRowIDFromMessageID, topic, since.ID())
- if err != nil {
- return nil, err
- }
- defer idrows.Close()
- if !idrows.Next() {
- return c.messagesSinceTime(topic, sinceAllMessages, scheduled)
- }
- var rowID int64
- if err := idrows.Scan(&rowID); err != nil {
- return nil, err
- }
- idrows.Close()
- var rows *sql.Rows
- if scheduled {
- rows, err = c.db.Query(selectMessagesSinceIDIncludeScheduledQuery, topic, rowID)
- } else {
- rows, err = c.db.Query(selectMessagesSinceIDQuery, topic, rowID)
- }
- if err != nil {
- return nil, err
- }
- return readMessages(rows)
- }
- func (c *messageCache) MessagesDue() ([]*message, error) {
- rows, err := c.db.Query(selectMessagesDueQuery, time.Now().Unix())
- if err != nil {
- return nil, err
- }
- return readMessages(rows)
- }
- func (c *messageCache) MarkPublished(m *message) error {
- _, err := c.db.Exec(updateMessagePublishedQuery, m.ID)
- return err
- }
- func (c *messageCache) MessageCount(topic string) (int, error) {
- rows, err := c.db.Query(selectMessageCountForTopicQuery, topic)
- if err != nil {
- return 0, err
- }
- defer rows.Close()
- var count int
- if !rows.Next() {
- return 0, errors.New("no rows found")
- }
- if err := rows.Scan(&count); err != nil {
- return 0, err
- } else if err := rows.Err(); err != nil {
- return 0, err
- }
- return count, nil
- }
- func (c *messageCache) Topics() (map[string]*topic, error) {
- rows, err := c.db.Query(selectTopicsQuery)
- if err != nil {
- return nil, err
- }
- defer rows.Close()
- topics := make(map[string]*topic)
- for rows.Next() {
- var id string
- if err := rows.Scan(&id); err != nil {
- return nil, err
- }
- topics[id] = newTopic(id)
- }
- if err := rows.Err(); err != nil {
- return nil, err
- }
- return topics, nil
- }
- func (c *messageCache) Prune(olderThan time.Time) error {
- _, err := c.db.Exec(pruneMessagesQuery, olderThan.Unix())
- return err
- }
- func (c *messageCache) AttachmentBytesUsed(owner string) (int64, error) {
- rows, err := c.db.Query(selectAttachmentsSizeQuery, owner, time.Now().Unix())
- if err != nil {
- return 0, err
- }
- defer rows.Close()
- var size int64
- if !rows.Next() {
- return 0, errors.New("no rows found")
- }
- if err := rows.Scan(&size); err != nil {
- return 0, err
- } else if err := rows.Err(); err != nil {
- return 0, err
- }
- return size, nil
- }
- func (c *messageCache) AttachmentsExpired() ([]string, error) {
- rows, err := c.db.Query(selectAttachmentsExpiredQuery, time.Now().Unix())
- if err != nil {
- return nil, err
- }
- defer rows.Close()
- ids := make([]string, 0)
- for rows.Next() {
- var id string
- if err := rows.Scan(&id); err != nil {
- return nil, err
- }
- ids = append(ids, id)
- }
- if err := rows.Err(); err != nil {
- return nil, err
- }
- return ids, nil
- }
- func readMessages(rows *sql.Rows) ([]*message, error) {
- defer rows.Close()
- messages := make([]*message, 0)
- for rows.Next() {
- var timestamp, attachmentSize, attachmentExpires int64
- var priority int
- var id, topic, msg, title, tagsStr, click, actionsStr, attachmentName, attachmentType, attachmentURL, attachmentOwner, encoding string
- err := rows.Scan(
- &id,
- ×tamp,
- &topic,
- &msg,
- &title,
- &priority,
- &tagsStr,
- &click,
- &actionsStr,
- &attachmentName,
- &attachmentType,
- &attachmentSize,
- &attachmentExpires,
- &attachmentURL,
- &attachmentOwner,
- &encoding,
- )
- if err != nil {
- return nil, err
- }
- var tags []string
- if tagsStr != "" {
- tags = strings.Split(tagsStr, ",")
- }
- var actions []*action
- if actionsStr != "" {
- if err := json.Unmarshal([]byte(actionsStr), &actions); err != nil {
- return nil, err
- }
- }
- var att *attachment
- if attachmentName != "" && attachmentURL != "" {
- att = &attachment{
- Name: attachmentName,
- Type: attachmentType,
- Size: attachmentSize,
- Expires: attachmentExpires,
- URL: attachmentURL,
- Owner: attachmentOwner,
- }
- }
- messages = append(messages, &message{
- ID: id,
- Time: timestamp,
- Event: messageEvent,
- Topic: topic,
- Message: msg,
- Title: title,
- Priority: priority,
- Tags: tags,
- Click: click,
- Actions: actions,
- Attachment: att,
- Encoding: encoding,
- })
- }
- if err := rows.Err(); err != nil {
- return nil, err
- }
- return messages, nil
- }
- func setupCacheDB(db *sql.DB) error {
- // If 'messages' table does not exist, this must be a new database
- rowsMC, err := db.Query(selectMessagesCountQuery)
- if err != nil {
- return setupNewCacheDB(db)
- }
- rowsMC.Close()
- // If 'messages' table exists, check 'schemaVersion' table
- schemaVersion := 0
- rowsSV, err := db.Query(selectSchemaVersionQuery)
- if err == nil {
- defer rowsSV.Close()
- if !rowsSV.Next() {
- return errors.New("cannot determine schema version: cache file may be corrupt")
- }
- if err := rowsSV.Scan(&schemaVersion); err != nil {
- return err
- }
- rowsSV.Close()
- }
- // Do migrations
- if schemaVersion == currentSchemaVersion {
- return nil
- } else if schemaVersion == 0 {
- return migrateFrom0(db)
- } else if schemaVersion == 1 {
- return migrateFrom1(db)
- } else if schemaVersion == 2 {
- return migrateFrom2(db)
- } else if schemaVersion == 3 {
- return migrateFrom3(db)
- } else if schemaVersion == 4 {
- return migrateFrom4(db)
- } else if schemaVersion == 5 {
- return migrateFrom5(db)
- }
- return fmt.Errorf("unexpected schema version found: %d", schemaVersion)
- }
- func setupNewCacheDB(db *sql.DB) error {
- if _, err := db.Exec(createMessagesTableQuery); err != nil {
- return err
- }
- if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
- return err
- }
- if _, err := db.Exec(insertSchemaVersion, currentSchemaVersion); err != nil {
- return err
- }
- return nil
- }
- func migrateFrom0(db *sql.DB) error {
- log.Print("Migrating cache database schema: from 0 to 1")
- if _, err := db.Exec(migrate0To1AlterMessagesTableQuery); err != nil {
- return err
- }
- if _, err := db.Exec(createSchemaVersionTableQuery); err != nil {
- return err
- }
- if _, err := db.Exec(insertSchemaVersion, 1); err != nil {
- return err
- }
- return migrateFrom1(db)
- }
- func migrateFrom1(db *sql.DB) error {
- log.Print("Migrating cache database schema: from 1 to 2")
- if _, err := db.Exec(migrate1To2AlterMessagesTableQuery); err != nil {
- return err
- }
- if _, err := db.Exec(updateSchemaVersion, 2); err != nil {
- return err
- }
- return migrateFrom2(db)
- }
- func migrateFrom2(db *sql.DB) error {
- log.Print("Migrating cache database schema: from 2 to 3")
- if _, err := db.Exec(migrate2To3AlterMessagesTableQuery); err != nil {
- return err
- }
- if _, err := db.Exec(updateSchemaVersion, 3); err != nil {
- return err
- }
- return migrateFrom3(db)
- }
- func migrateFrom3(db *sql.DB) error {
- log.Print("Migrating cache database schema: from 3 to 4")
- if _, err := db.Exec(migrate3To4AlterMessagesTableQuery); err != nil {
- return err
- }
- if _, err := db.Exec(updateSchemaVersion, 4); err != nil {
- return err
- }
- return migrateFrom4(db)
- }
- func migrateFrom4(db *sql.DB) error {
- log.Print("Migrating cache database schema: from 4 to 5")
- if _, err := db.Exec(migrate4To5AlterMessagesTableQuery); err != nil {
- return err
- }
- if _, err := db.Exec(updateSchemaVersion, 5); err != nil {
- return err
- }
- return migrateFrom5(db)
- }
- func migrateFrom5(db *sql.DB) error {
- log.Print("Migrating cache database schema: from 5 to 6")
- if _, err := db.Exec(migrate5To6AlterMessagesTableQuery); err != nil {
- return err
- }
- if _, err := db.Exec(updateSchemaVersion, 6); err != nil {
- return err
- }
- return nil // Update this when a new version is added
- }
|