123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243 |
- import { create, get, has, isEmpty, isPlainObject } from 'lodash-es'
- import path from 'node:path'
- import knex from 'knex'
- import fs from 'node:fs/promises'
- import Objection from 'objection'
- import PGPubSub from 'pg-pubsub'
- import semver from 'semver'
- import { createDeferred } from '../helpers/common.mjs'
- import migrationSource from '../db/migrator-source.mjs'
- // const migrateFromLegacy = require('../db/legacy')
- import { setTimeout } from 'node:timers/promises'
- /**
- * ORM DB module
- */
- export default {
- Objection,
- knex: null,
- listener: null,
- config: null,
- VERSION: null,
- LEGACY: false,
- onReady: createDeferred(),
- connectAttempts: 0,
- /**
- * Initialize DB
- */
- async init (workerMode = false) {
- WIKI.logger.info('Checking DB configuration...')
- // Fetch DB Config
- this.config = (!isEmpty(process.env.DATABASE_URL))
- ? process.env.DATABASE_URL
- : {
- host: WIKI.config.db.host.toString(),
- user: WIKI.config.db.user.toString(),
- password: WIKI.config.db.pass.toString(),
- database: WIKI.config.db.db.toString(),
- port: WIKI.config.db.port
- }
- // Handle SSL Options
- let dbUseSSL = (WIKI.config.db.ssl === true || WIKI.config.db.ssl === 'true' || WIKI.config.db.ssl === 1 || WIKI.config.db.ssl === '1')
- let sslOptions = null
- if (dbUseSSL && isPlainObject(this.config) && get(WIKI.config.db, 'sslOptions.auto', null) === false) {
- sslOptions = WIKI.config.db.sslOptions
- sslOptions.rejectUnauthorized = sslOptions.rejectUnauthorized !== false
- if (sslOptions.ca && sslOptions.ca.indexOf('-----') !== 0) {
- sslOptions.ca = await fs.readFile(path.resolve(WIKI.ROOTPATH, sslOptions.ca), 'utf-8')
- }
- if (sslOptions.cert) {
- sslOptions.cert = await fs.readFile(path.resolve(WIKI.ROOTPATH, sslOptions.cert), 'utf-8')
- }
- if (sslOptions.key) {
- sslOptions.key = await fs.readFile(path.resolve(WIKI.ROOTPATH, sslOptions.key), 'utf-8')
- }
- if (sslOptions.pfx) {
- sslOptions.pfx = await fs.readFile(path.resolve(WIKI.ROOTPATH, sslOptions.pfx), 'utf-8')
- }
- } else {
- sslOptions = true
- }
- // Handle inline SSL CA Certificate mode
- if (!isEmpty(process.env.DB_SSL_CA)) {
- const chunks = []
- for (let i = 0, charsLength = process.env.DB_SSL_CA.length; i < charsLength; i += 64) {
- chunks.push(process.env.DB_SSL_CA.substring(i, i + 64))
- }
- dbUseSSL = true
- sslOptions = {
- rejectUnauthorized: true,
- ca: '-----BEGIN CERTIFICATE-----\n' + chunks.join('\n') + '\n-----END CERTIFICATE-----\n'
- }
- }
- if (dbUseSSL && isPlainObject(this.config)) {
- this.config.ssl = (sslOptions === true) ? { rejectUnauthorized: true } : sslOptions
- }
- // Initialize Knex
- this.knex = knex({
- client: 'pg',
- useNullAsDefault: true,
- asyncStackTraces: WIKI.IS_DEBUG,
- connection: this.config,
- searchPath: [WIKI.config.db.schema],
- pool: {
- ...workerMode ? { min: 0, max: 1 } : WIKI.config.pool
- // FIXME: Throws DeprecatingWarning because Knex encapsulates this into a promisify...
- // async afterCreate (conn, done) {
- // // -> Set Connection App Name
- // if (workerMode) {
- // await conn.query(`set application_name = 'Wiki.js - ${WIKI.INSTANCE_ID}'`)
- // } else {
- // await conn.query(`set application_name = 'Wiki.js - ${WIKI.INSTANCE_ID}:MAIN'`)
- // }
- // done()
- // }
- },
- debug: WIKI.IS_DEBUG
- })
- Objection.Model.knex(this.knex)
- // Load DB Models
- WIKI.logger.info('Loading DB models...')
- const models = (await import(path.join(WIKI.SERVERPATH, 'models/index.mjs'))).default
- // Connect
- await this.connect()
- // Check DB Version
- const resVersion = await this.knex.raw('SHOW server_version;')
- const dbVersion = semver.coerce(resVersion.rows[0].server_version, { loose: true })
- this.VERSION = dbVersion.version
- this.LEGACY = dbVersion.major < 16
- if (dbVersion.major < 12) {
- WIKI.logger.error(`Your PostgreSQL database version (${dbVersion.major}) is too old and unsupported by Wiki.js. Requires >= 12. Exiting...`)
- process.exit(1)
- }
- WIKI.logger.info(`PostgreSQL ${dbVersion.version} [ ${this.LEGACY ? 'LEGACY MODE' : 'OK'} ]`)
- // Run Migrations
- if (!workerMode) {
- await this.migrateFromLegacy()
- await this.syncSchemas()
- }
- return {
- ...this,
- ...models
- }
- },
- /**
- * Subscribe to database LISTEN / NOTIFY for multi-instances events
- */
- async subscribeToNotifications () {
- let connSettings = this.knex.client.connectionSettings
- if (typeof connSettings === 'string') {
- const encodedName = encodeURIComponent(`Wiki.js - ${WIKI.INSTANCE_ID}:PSUB`)
- if (connSettings.indexOf('?') > 0) {
- connSettings = `${connSettings}&ApplicationName=${encodedName}`
- } else {
- connSettings = `${connSettings}?ApplicationName=${encodedName}`
- }
- } else {
- connSettings.application_name = `Wiki.js - ${WIKI.INSTANCE_ID}:PSUB`
- }
- this.listener = new PGPubSub(connSettings, {
- log (ev) {
- WIKI.logger.debug(ev)
- }
- })
- // -> Outbound events handling
- this.listener.addChannel('wiki', payload => {
- if (has(payload, 'event') && payload.source !== WIKI.INSTANCE_ID) {
- WIKI.logger.info(`Received event ${payload.event} from instance ${payload.source}: [ OK ]`)
- WIKI.events.inbound.emit(payload.event, payload.value)
- }
- })
- WIKI.events.outbound.onAny(this.notifyViaDB)
- // -> Listen to inbound events
- WIKI.auth.subscribeToEvents()
- WIKI.configSvc.subscribeToEvents()
- WIKI.db.pages.subscribeToEvents()
- WIKI.logger.info('PG PubSub Listener initialized successfully: [ OK ]')
- },
- /**
- * Unsubscribe from database LISTEN / NOTIFY
- */
- async unsubscribeToNotifications () {
- if (this.listener) {
- WIKI.events.outbound.offAny(this.notifyViaDB)
- WIKI.events.inbound.removeAllListeners()
- this.listener.close()
- }
- },
- /**
- * Publish event via database NOTIFY
- *
- * @param {string} event Event fired
- * @param {object} value Payload of the event
- */
- notifyViaDB (event, value) {
- WIKI.db.listener.publish('wiki', {
- source: WIKI.INSTANCE_ID,
- event,
- value
- })
- },
- /**
- * Attempt initial connection
- */
- async connect () {
- try {
- WIKI.logger.info('Connecting to database...')
- await this.knex.raw('SELECT 1 + 1;')
- WIKI.logger.info('Database Connection Successful [ OK ]')
- } catch (err) {
- if (this.connectAttempts < 10) {
- if (err.code) {
- WIKI.logger.error(`Database Connection Error: ${err.code} ${err.address}:${err.port}`)
- } else {
- WIKI.logger.error(`Database Connection Error: ${err.message}`)
- }
- WIKI.logger.warn(`Will retry in 3 seconds... [Attempt ${++this.connectAttempts} of 10]`)
- await setTimeout(3000)
- await this.connect()
- } else {
- throw err
- }
- }
- },
- /**
- * Migrate DB Schemas
- */
- async syncSchemas () {
- WIKI.logger.info('Ensuring DB schema exists...')
- await this.knex.raw(`CREATE SCHEMA IF NOT EXISTS ${WIKI.config.db.schema}`)
- WIKI.logger.info('Ensuring DB migrations have been applied...')
- return this.knex.migrate.latest({
- tableName: 'migrations',
- migrationSource,
- schemaName: WIKI.config.db.schema
- })
- },
- /**
- * Migrate DB Schemas from 2.x
- */
- async migrateFromLegacy () {
- // return migrateFromLegacy.migrate(self.knex)
- }
- }
|