message_cache_sqlite_test.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. package server
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "github.com/stretchr/testify/assert"
  6. "github.com/stretchr/testify/require"
  7. "testing"
  8. "time"
  9. )
  10. func TestSqliteCache_Migration_From0(t *testing.T) {
  11. filename := newSqliteTestCacheFile(t)
  12. db, err := sql.Open("sqlite3", filename)
  13. require.Nil(t, err)
  14. // Create "version 0" schema
  15. _, err = db.Exec(`
  16. BEGIN;
  17. CREATE TABLE IF NOT EXISTS messages (
  18. id VARCHAR(20) PRIMARY KEY,
  19. time INT NOT NULL,
  20. topic VARCHAR(64) NOT NULL,
  21. message VARCHAR(1024) NOT NULL
  22. );
  23. CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
  24. COMMIT;
  25. `)
  26. require.Nil(t, err)
  27. // Insert a bunch of messages
  28. for i := 0; i < 10; i++ {
  29. _, err = db.Exec(`INSERT INTO messages (id, time, topic, message) VALUES (?, ?, ?, ?)`,
  30. fmt.Sprintf("abcd%d", i), time.Now().Unix(), "mytopic", fmt.Sprintf("some message %d", i))
  31. require.Nil(t, err)
  32. }
  33. require.Nil(t, db.Close())
  34. // Create cache to trigger migration
  35. c := newSqliteTestCacheFromFile(t, filename, "")
  36. checkSchemaVersion(t, c.db)
  37. messages, err := c.Messages("mytopic", sinceAllMessages, false)
  38. require.Nil(t, err)
  39. require.Equal(t, 10, len(messages))
  40. require.Equal(t, "some message 5", messages[5].Message)
  41. require.Equal(t, "", messages[5].Title)
  42. require.Nil(t, messages[5].Tags)
  43. require.Equal(t, 0, messages[5].Priority)
  44. }
  45. func TestSqliteCache_Migration_From1(t *testing.T) {
  46. filename := newSqliteTestCacheFile(t)
  47. db, err := sql.Open("sqlite3", filename)
  48. require.Nil(t, err)
  49. // Create "version 1" schema
  50. _, err = db.Exec(`
  51. CREATE TABLE IF NOT EXISTS messages (
  52. id VARCHAR(20) PRIMARY KEY,
  53. time INT NOT NULL,
  54. topic VARCHAR(64) NOT NULL,
  55. message VARCHAR(512) NOT NULL,
  56. title VARCHAR(256) NOT NULL,
  57. priority INT NOT NULL,
  58. tags VARCHAR(256) NOT NULL
  59. );
  60. CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
  61. CREATE TABLE IF NOT EXISTS schemaVersion (
  62. id INT PRIMARY KEY,
  63. version INT NOT NULL
  64. );
  65. INSERT INTO schemaVersion (id, version) VALUES (1, 1);
  66. `)
  67. require.Nil(t, err)
  68. // Insert a bunch of messages
  69. for i := 0; i < 10; i++ {
  70. _, err = db.Exec(`INSERT INTO messages (id, time, topic, message, title, priority, tags) VALUES (?, ?, ?, ?, ?, ?, ?)`,
  71. fmt.Sprintf("abcd%d", i), time.Now().Unix(), "mytopic", fmt.Sprintf("some message %d", i), "", 0, "")
  72. require.Nil(t, err)
  73. }
  74. require.Nil(t, db.Close())
  75. // Create cache to trigger migration
  76. c := newSqliteTestCacheFromFile(t, filename, "")
  77. checkSchemaVersion(t, c.db)
  78. // Add delayed message
  79. delayedMessage := newDefaultMessage("mytopic", "some delayed message")
  80. delayedMessage.Time = time.Now().Add(time.Minute).Unix()
  81. require.Nil(t, c.AddMessage(delayedMessage))
  82. // 10, not 11!
  83. messages, err := c.Messages("mytopic", sinceAllMessages, false)
  84. require.Nil(t, err)
  85. require.Equal(t, 10, len(messages))
  86. // 11!
  87. messages, err = c.Messages("mytopic", sinceAllMessages, true)
  88. require.Nil(t, err)
  89. require.Equal(t, 11, len(messages))
  90. }
  91. func TestSqliteCache_Migration_From9(t *testing.T) {
  92. // This primarily tests the awkward migration that introduces the "expires" column.
  93. // The migration logic has to update the column, using the existing "cache-duration" value.
  94. filename := newSqliteTestCacheFile(t)
  95. db, err := sql.Open("sqlite3", filename)
  96. require.Nil(t, err)
  97. // Create "version 8" schema
  98. _, err = db.Exec(`
  99. BEGIN;
  100. CREATE TABLE IF NOT EXISTS messages (
  101. id INTEGER PRIMARY KEY AUTOINCREMENT,
  102. mid TEXT NOT NULL,
  103. time INT NOT NULL,
  104. topic TEXT NOT NULL,
  105. message TEXT NOT NULL,
  106. title TEXT NOT NULL,
  107. priority INT NOT NULL,
  108. tags TEXT NOT NULL,
  109. click TEXT NOT NULL,
  110. icon TEXT NOT NULL,
  111. actions TEXT NOT NULL,
  112. attachment_name TEXT NOT NULL,
  113. attachment_type TEXT NOT NULL,
  114. attachment_size INT NOT NULL,
  115. attachment_expires INT NOT NULL,
  116. attachment_url TEXT NOT NULL,
  117. sender TEXT NOT NULL,
  118. encoding TEXT NOT NULL,
  119. published INT NOT NULL
  120. );
  121. CREATE INDEX IF NOT EXISTS idx_mid ON messages (mid);
  122. CREATE INDEX IF NOT EXISTS idx_time ON messages (time);
  123. CREATE INDEX IF NOT EXISTS idx_topic ON messages (topic);
  124. CREATE TABLE IF NOT EXISTS schemaVersion (
  125. id INT PRIMARY KEY,
  126. version INT NOT NULL
  127. );
  128. INSERT INTO schemaVersion (id, version) VALUES (1, 9);
  129. COMMIT;
  130. `)
  131. require.Nil(t, err)
  132. // Insert a bunch of messages
  133. insertQuery := `
  134. INSERT INTO messages (mid, time, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, encoding, published)
  135. VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
  136. `
  137. for i := 0; i < 10; i++ {
  138. _, err = db.Exec(
  139. insertQuery,
  140. fmt.Sprintf("abcd%d", i),
  141. time.Now().Unix(),
  142. "mytopic",
  143. fmt.Sprintf("some message %d", i),
  144. "", // title
  145. 0, // priority
  146. "", // tags
  147. "", // click
  148. "", // icon
  149. "", // actions
  150. "", // attachment_name
  151. "", // attachment_type
  152. 0, // attachment_size
  153. 0, // attachment_type
  154. "", // attachment_url
  155. "9.9.9.9", // sender
  156. "", // encoding
  157. 1, // published
  158. )
  159. require.Nil(t, err)
  160. }
  161. // Create cache to trigger migration
  162. cacheDuration := 17 * time.Hour
  163. c, err := newSqliteMessageCache(filename, "", cacheDuration, 0, 0, false)
  164. require.Nil(t, err)
  165. checkSchemaVersion(t, c.db)
  166. // Check version
  167. rows, err := db.Query(`SELECT version FROM main.schemaVersion WHERE id = 1`)
  168. require.Nil(t, err)
  169. require.True(t, rows.Next())
  170. var version int
  171. require.Nil(t, rows.Scan(&version))
  172. require.Equal(t, currentSchemaVersion, version)
  173. messages, err := c.Messages("mytopic", sinceAllMessages, false)
  174. require.Nil(t, err)
  175. require.Equal(t, 10, len(messages))
  176. for _, m := range messages {
  177. require.True(t, m.Expires > time.Now().Add(cacheDuration-5*time.Second).Unix())
  178. require.True(t, m.Expires < time.Now().Add(cacheDuration+5*time.Second).Unix())
  179. }
  180. }
  181. func TestSqliteCache_StartupQueries_WAL(t *testing.T) {
  182. filename := newSqliteTestCacheFile(t)
  183. startupQueries := `pragma journal_mode = WAL;
  184. pragma synchronous = normal;
  185. pragma temp_store = memory;`
  186. db, err := newSqliteMessageCache(filename, startupQueries, time.Hour, 0, 0, false)
  187. require.Nil(t, err)
  188. require.Nil(t, db.AddMessage(newDefaultMessage("mytopic", "some message")))
  189. require.FileExists(t, filename)
  190. require.FileExists(t, filename+"-wal")
  191. require.FileExists(t, filename+"-shm")
  192. }
  193. func TestSqliteCache_StartupQueries_None(t *testing.T) {
  194. filename := newSqliteTestCacheFile(t)
  195. startupQueries := ""
  196. db, err := newSqliteMessageCache(filename, startupQueries, time.Hour, 0, 0, false)
  197. require.Nil(t, err)
  198. require.Nil(t, db.AddMessage(newDefaultMessage("mytopic", "some message")))
  199. require.FileExists(t, filename)
  200. require.NoFileExists(t, filename+"-wal")
  201. require.NoFileExists(t, filename+"-shm")
  202. }
  203. func TestSqliteCache_StartupQueries_Fail(t *testing.T) {
  204. filename := newSqliteTestCacheFile(t)
  205. startupQueries := `xx error`
  206. _, err := newSqliteMessageCache(filename, startupQueries, time.Hour, 0, 0, false)
  207. require.Error(t, err)
  208. }
  209. func TestMemCache_NopCache(t *testing.T) {
  210. c, _ := newNopCache()
  211. assert.Nil(t, c.AddMessage(newDefaultMessage("mytopic", "my message")))
  212. messages, err := c.Messages("mytopic", sinceAllMessages, false)
  213. assert.Nil(t, err)
  214. assert.Empty(t, messages)
  215. topics, err := c.Topics()
  216. assert.Nil(t, err)
  217. assert.Empty(t, topics)
  218. }
  219. func checkSchemaVersion(t *testing.T, db *sql.DB) {
  220. rows, err := db.Query(`SELECT version FROM schemaVersion`)
  221. require.Nil(t, err)
  222. require.True(t, rows.Next())
  223. var schemaVersion int
  224. require.Nil(t, rows.Scan(&schemaVersion))
  225. require.Equal(t, currentSchemaVersion, schemaVersion)
  226. require.Nil(t, rows.Close())
  227. }