123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- const moment = require('moment')
- const childProcess = require('child_process')
- const _ = require('lodash')
- const configHelper = require('../helpers/config')
- /* global WIKI */
/**
 * A single scheduled job, executed either in-process (by requiring
 * `../jobs/<name>`) or in a forked worker process.
 *
 * A job adds itself to the owning queue's `jobs` array when started and
 * removes itself again when stopped.
 */
class Job {
  /**
   * @param {Object} opts Job options
   * @param {string} opts.name Job name, used as the module name under `../jobs/` and as the worker `--job` argument
   * @param {boolean} [opts.immediate=false] Run as soon as the job is started instead of waiting one interval
   * @param {string} [opts.schedule='P1D'] Wait duration between runs, handed to `moment.duration`
   * @param {boolean} [opts.repeat=false] Queue another run after each run completes
   * @param {boolean} [opts.worker=false] Run the job in a forked child process
   * @param {Object} queue Owning queue; its `jobs` array is read and replaced by this class
   */
  constructor({
    name,
    immediate = false,
    schedule = 'P1D',
    repeat = false,
    worker = false
  }, queue) {
    this.queue = queue
    // Settled placeholder so stop() has something to return before the first run
    this.finished = Promise.resolve()
    this.name = name
    this.immediate = immediate
    this.schedule = moment.duration(schedule)
    this.repeat = repeat
    this.worker = worker
  }

  /**
   * Start Job
   *
   * Registers the job on the queue, then either runs it right away
   * (`immediate`) or waits one schedule interval first.
   *
   * @param {Object} data Job Data
   */
  start(data) {
    this.queue.jobs.push(this)
    if (this.immediate) {
      this.invoke(data)
    } else {
      this.enqueue(data)
    }
  }

  /**
   * Queue the next job run according to the wait duration
   *
   * Node coerces any timer delay above 2^31-1 ms (about 24.8 days) to 1 ms,
   * so longer schedules are split into a chain of capped timers instead of
   * being handed to `setTimeout` in one go.
   *
   * @param {Object} data Job Data
   */
  enqueue(data) {
    const MAX_DELAY_MS = 2147483647
    const arm = remaining => {
      if (remaining > MAX_DELAY_MS) {
        // Not due yet: sleep the maximum span, then re-arm with what is left.
        this.timeout = setTimeout(arm, MAX_DELAY_MS, remaining - MAX_DELAY_MS)
      } else {
        this.timeout = setTimeout(() => this.invoke(data), remaining)
      }
    }
    arm(this.schedule.asMilliseconds())
  }

  /**
   * Run the actual job
   *
   * Failures are logged as warnings and never rethrown. Afterwards the job is
   * either queued again (when `repeat` is set and it is still registered on
   * the queue) or stopped.
   *
   * @param {Object} data Job Data
   */
  async invoke(data) {
    try {
      if (this.worker) {
        // NOTE(review): `data` is interpolated into a string argument, so only
        // values with a meaningful string form survive - confirm against worker.js.
        const proc = childProcess.fork(`server/core/worker.js`, [
          `--job=${this.name}`,
          `--data=${data}`
        ], {
          cwd: WIKI.ROOTPATH,
          stdio: ['inherit', 'inherit', 'pipe', 'ipc']
        })
        // Collect the worker's stderr so it can be attached to a failure
        const stderr = []
        proc.stderr.on('data', chunk => stderr.push(chunk))
        this.finished = new Promise((resolve, reject) => {
          // Without a listener, a failed spawn would surface as an unhandled
          // 'error' event and this promise might never settle.
          proc.on('error', reject)
          proc.on('exit', (code, signal) => {
            const output = Buffer.concat(stderr).toString()
            if (code === 0) {
              resolve(output)
            } else {
              const err = new Error(`Error when running job ${this.name}: ${output}`)
              err.exitSignal = signal
              err.exitCode = code
              err.stderr = output
              reject(err)
            }
            proc.kill()
          })
        })
      } else {
        this.finished = require(`../jobs/${this.name}`)(data)
      }
      await this.finished
    } catch (err) {
      WIKI.logger.warn(err)
    }
    // A job removed from the queue while it was running must not be re-queued
    if (this.repeat && this.queue.jobs.includes(this)) {
      this.enqueue(data)
    } else {
      // stop() resolves/rejects with the last run, which was already logged above
      this.stop().catch(() => {})
    }
  }

  /**
   * Stop any future job invocation from occurring
   *
   * @returns {Promise} The promise of the most recent run; rejects if that run failed
   */
  async stop() {
    clearTimeout(this.timeout)
    this.queue.jobs = this.queue.jobs.filter(x => x !== this)
    return this.finished
  }
}
- module.exports = {
- jobs: [],
- init() {
- return this
- },
- start() {
- _.forOwn(WIKI.data.jobs, (queueParams, queueName) => {
- if (WIKI.config.offline && queueParams.offlineSkip) {
- WIKI.logger.warn(`Skipping job ${queueName} because offline mode is enabled. [SKIPPED]`)
- return
- }
- const schedule = (configHelper.isValidDurationString(queueParams.schedule)) ? queueParams.schedule : 'P1D'
- this.registerJob({
- name: _.kebabCase(queueName),
- immediate: _.get(queueParams, 'onInit', false),
- schedule: schedule,
- repeat: _.get(queueParams, 'repeat', false),
- worker: _.get(queueParams, 'worker', false)
- })
- })
- },
- registerJob(opts, data) {
- const job = new Job(opts, this)
- job.start(data)
- return job
- },
- async stop() {
- return Promise.all(this.jobs.map(job => job.stop()))
- }
- }
|