db.mjs 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. import { create, get, has, isEmpty, isPlainObject } from 'lodash-es'
  2. import path from 'node:path'
  3. import knex from 'knex'
  4. import fs from 'node:fs/promises'
  5. import Objection from 'objection'
  6. import PGPubSub from 'pg-pubsub'
  7. import semver from 'semver'
  8. import { createDeferred } from '../helpers/common.mjs'
  9. import migrationSource from '../db/migrator-source.mjs'
  10. // const migrateFromLegacy = require('../db/legacy')
  11. import { setTimeout } from 'node:timers/promises'
  12. /**
  13. * ORM DB module
  14. */
  15. export default {
  16. Objection,
  17. knex: null,
  18. listener: null,
  19. config: null,
  20. VERSION: null,
  21. LEGACY: false,
  22. onReady: createDeferred(),
  23. connectAttempts: 0,
  24. /**
  25. * Initialize DB
  26. */
  27. async init (workerMode = false) {
  28. WIKI.logger.info('Checking DB configuration...')
  29. // Fetch DB Config
  30. this.config = (!isEmpty(process.env.DATABASE_URL))
  31. ? process.env.DATABASE_URL
  32. : {
  33. host: WIKI.config.db.host.toString(),
  34. user: WIKI.config.db.user.toString(),
  35. password: WIKI.config.db.pass.toString(),
  36. database: WIKI.config.db.db.toString(),
  37. port: WIKI.config.db.port
  38. }
  39. // Handle SSL Options
  40. let dbUseSSL = (WIKI.config.db.ssl === true || WIKI.config.db.ssl === 'true' || WIKI.config.db.ssl === 1 || WIKI.config.db.ssl === '1')
  41. let sslOptions = null
  42. if (dbUseSSL && isPlainObject(this.config) && get(WIKI.config.db, 'sslOptions.auto', null) === false) {
  43. sslOptions = WIKI.config.db.sslOptions
  44. sslOptions.rejectUnauthorized = sslOptions.rejectUnauthorized !== false
  45. if (sslOptions.ca && sslOptions.ca.indexOf('-----') !== 0) {
  46. sslOptions.ca = await fs.readFile(path.resolve(WIKI.ROOTPATH, sslOptions.ca), 'utf-8')
  47. }
  48. if (sslOptions.cert) {
  49. sslOptions.cert = await fs.readFile(path.resolve(WIKI.ROOTPATH, sslOptions.cert), 'utf-8')
  50. }
  51. if (sslOptions.key) {
  52. sslOptions.key = await fs.readFile(path.resolve(WIKI.ROOTPATH, sslOptions.key), 'utf-8')
  53. }
  54. if (sslOptions.pfx) {
  55. sslOptions.pfx = await fs.readFile(path.resolve(WIKI.ROOTPATH, sslOptions.pfx), 'utf-8')
  56. }
  57. } else {
  58. sslOptions = true
  59. }
  60. // Handle inline SSL CA Certificate mode
  61. if (!isEmpty(process.env.DB_SSL_CA)) {
  62. const chunks = []
  63. for (let i = 0, charsLength = process.env.DB_SSL_CA.length; i < charsLength; i += 64) {
  64. chunks.push(process.env.DB_SSL_CA.substring(i, i + 64))
  65. }
  66. dbUseSSL = true
  67. sslOptions = {
  68. rejectUnauthorized: true,
  69. ca: '-----BEGIN CERTIFICATE-----\n' + chunks.join('\n') + '\n-----END CERTIFICATE-----\n'
  70. }
  71. }
  72. if (dbUseSSL && isPlainObject(this.config)) {
  73. this.config.ssl = (sslOptions === true) ? { rejectUnauthorized: true } : sslOptions
  74. }
  75. // Initialize Knex
  76. this.knex = knex({
  77. client: 'pg',
  78. useNullAsDefault: true,
  79. asyncStackTraces: WIKI.IS_DEBUG,
  80. connection: this.config,
  81. searchPath: [WIKI.config.db.schema],
  82. pool: {
  83. ...workerMode ? { min: 0, max: 1 } : WIKI.config.pool
  84. // FIXME: Throws DeprecatingWarning because Knex encapsulates this into a promisify...
  85. // async afterCreate (conn, done) {
  86. // // -> Set Connection App Name
  87. // if (workerMode) {
  88. // await conn.query(`set application_name = 'Wiki.js - ${WIKI.INSTANCE_ID}'`)
  89. // } else {
  90. // await conn.query(`set application_name = 'Wiki.js - ${WIKI.INSTANCE_ID}:MAIN'`)
  91. // }
  92. // done()
  93. // }
  94. },
  95. debug: WIKI.IS_DEBUG
  96. })
  97. Objection.Model.knex(this.knex)
  98. // Load DB Models
  99. WIKI.logger.info('Loading DB models...')
  100. const models = (await import(path.join(WIKI.SERVERPATH, 'models/index.mjs'))).default
  101. // Connect
  102. await this.connect()
  103. // Check DB Version
  104. const resVersion = await this.knex.raw('SHOW server_version;')
  105. const dbVersion = semver.coerce(resVersion.rows[0].server_version, { loose: true })
  106. this.VERSION = dbVersion.version
  107. this.LEGACY = dbVersion.major < 16
  108. if (dbVersion.major < 12) {
  109. WIKI.logger.error(`Your PostgreSQL database version (${dbVersion.major}) is too old and unsupported by Wiki.js. Requires >= 12. Exiting...`)
  110. process.exit(1)
  111. }
  112. WIKI.logger.info(`PostgreSQL ${dbVersion.version} [ ${this.LEGACY ? 'LEGACY MODE' : 'OK'} ]`)
  113. // Run Migrations
  114. if (!workerMode) {
  115. await this.migrateFromLegacy()
  116. await this.syncSchemas()
  117. }
  118. return {
  119. ...this,
  120. ...models
  121. }
  122. },
  123. /**
  124. * Subscribe to database LISTEN / NOTIFY for multi-instances events
  125. */
  126. async subscribeToNotifications () {
  127. let connSettings = this.knex.client.connectionSettings
  128. if (typeof connSettings === 'string') {
  129. const encodedName = encodeURIComponent(`Wiki.js - ${WIKI.INSTANCE_ID}:PSUB`)
  130. if (connSettings.indexOf('?') > 0) {
  131. connSettings = `${connSettings}&ApplicationName=${encodedName}`
  132. } else {
  133. connSettings = `${connSettings}?ApplicationName=${encodedName}`
  134. }
  135. } else {
  136. connSettings.application_name = `Wiki.js - ${WIKI.INSTANCE_ID}:PSUB`
  137. }
  138. this.listener = new PGPubSub(connSettings, {
  139. log (ev) {
  140. WIKI.logger.debug(ev)
  141. }
  142. })
  143. // -> Outbound events handling
  144. this.listener.addChannel('wiki', payload => {
  145. if (has(payload, 'event') && payload.source !== WIKI.INSTANCE_ID) {
  146. WIKI.logger.info(`Received event ${payload.event} from instance ${payload.source}: [ OK ]`)
  147. WIKI.events.inbound.emit(payload.event, payload.value)
  148. }
  149. })
  150. WIKI.events.outbound.onAny(this.notifyViaDB)
  151. // -> Listen to inbound events
  152. WIKI.auth.subscribeToEvents()
  153. WIKI.configSvc.subscribeToEvents()
  154. WIKI.db.pages.subscribeToEvents()
  155. WIKI.logger.info('PG PubSub Listener initialized successfully: [ OK ]')
  156. },
  157. /**
  158. * Unsubscribe from database LISTEN / NOTIFY
  159. */
  160. async unsubscribeToNotifications () {
  161. if (this.listener) {
  162. WIKI.events.outbound.offAny(this.notifyViaDB)
  163. WIKI.events.inbound.removeAllListeners()
  164. this.listener.close()
  165. }
  166. },
  167. /**
  168. * Publish event via database NOTIFY
  169. *
  170. * @param {string} event Event fired
  171. * @param {object} value Payload of the event
  172. */
  173. notifyViaDB (event, value) {
  174. WIKI.db.listener.publish('wiki', {
  175. source: WIKI.INSTANCE_ID,
  176. event,
  177. value
  178. })
  179. },
  180. /**
  181. * Attempt initial connection
  182. */
  183. async connect () {
  184. try {
  185. WIKI.logger.info('Connecting to database...')
  186. await this.knex.raw('SELECT 1 + 1;')
  187. WIKI.logger.info('Database Connection Successful [ OK ]')
  188. } catch (err) {
  189. if (this.connectAttempts < 10) {
  190. if (err.code) {
  191. WIKI.logger.error(`Database Connection Error: ${err.code} ${err.address}:${err.port}`)
  192. } else {
  193. WIKI.logger.error(`Database Connection Error: ${err.message}`)
  194. }
  195. WIKI.logger.warn(`Will retry in 3 seconds... [Attempt ${++this.connectAttempts} of 10]`)
  196. await setTimeout(3000)
  197. await this.connect()
  198. } else {
  199. throw err
  200. }
  201. }
  202. },
  203. /**
  204. * Migrate DB Schemas
  205. */
  206. async syncSchemas () {
  207. WIKI.logger.info('Ensuring DB schema exists...')
  208. await this.knex.raw(`CREATE SCHEMA IF NOT EXISTS ${WIKI.config.db.schema}`)
  209. WIKI.logger.info('Ensuring DB migrations have been applied...')
  210. return this.knex.migrate.latest({
  211. tableName: 'migrations',
  212. migrationSource,
  213. schemaName: WIKI.config.db.schema
  214. })
  215. },
  216. /**
  217. * Migrate DB Schemas from 2.x
  218. */
  219. async migrateFromLegacy () {
  220. // return migrateFromLegacy.migrate(self.knex)
  221. }
  222. }