123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310 |
import fs from 'node:fs/promises'
import os from 'node:os'
import path from 'node:path'
import { pathToFileURL } from 'node:url'
import cronparser from 'cron-parser'
import { camelCase, find, remove } from 'lodash-es'
import { DateTime } from 'luxon'
import { DynamicThreadPool } from 'poolifier'
import { v4 as uuid } from 'uuid'
import { createDeferred } from '../helpers/common.mjs'
- export default {
- workerPool: null,
- maxWorkers: 1,
- activeWorkers: 0,
- pollingRef: null,
- scheduledRef: null,
- tasks: null,
- completionPromises: [],
- async init () {
- this.maxWorkers = WIKI.config.scheduler.workers === 'auto' ? (os.cpus().length - 1) : WIKI.config.scheduler.workers
- if (this.maxWorkers < 1) { this.maxWorkers = 1 }
- WIKI.logger.info(`Initializing Worker Pool (Limit: ${this.maxWorkers})...`)
- this.workerPool = new DynamicThreadPool(1, this.maxWorkers, path.join(WIKI.SERVERPATH, 'worker.mjs'), {
- errorHandler: (err) => WIKI.logger.warn(err),
- exitHandler: () => WIKI.logger.debug('A worker has gone offline.'),
- onlineHandler: () => WIKI.logger.debug('New worker is online.')
- })
- this.tasks = {}
- for (const f of (await fs.readdir(path.join(WIKI.SERVERPATH, 'tasks/simple')))) {
- const taskName = camelCase(f.replace('.mjs', ''))
- this.tasks[taskName] = (await import(path.join(WIKI.SERVERPATH, 'tasks/simple', f))).task
- }
- return this
- },
- async start () {
- WIKI.logger.info('Starting Scheduler...')
- // -> Add PostgreSQL Sub Channel
- WIKI.db.listener.addChannel('scheduler', async payload => {
- switch (payload.event) {
- case 'newJob': {
- if (this.activeWorkers < this.maxWorkers) {
- this.activeWorkers++
- await this.processJob()
- this.activeWorkers--
- }
- break
- }
- case 'jobCompleted': {
- const jobPromise = find(this.completionPromises, ['id', payload.id])
- if (jobPromise) {
- if (payload.state === 'success') {
- jobPromise.resolve()
- } else {
- jobPromise.reject(new Error(payload.errorMessage))
- }
- setTimeout(() => {
- remove(this.completionPromises, ['id', payload.id])
- })
- }
- break
- }
- }
- })
- // -> Start scheduled jobs check
- this.scheduledRef = setInterval(async () => {
- this.addScheduled()
- }, WIKI.config.scheduler.scheduledCheck * 1000)
- // -> Add scheduled jobs on init
- await this.addScheduled()
- // -> Start job polling
- this.pollingRef = setInterval(async () => {
- this.processJob()
- }, WIKI.config.scheduler.pollingCheck * 1000)
- WIKI.logger.info('Scheduler: [ STARTED ]')
- },
- /**
- * Add a job to the scheduler
- * @param {Object} opts - Job options
- * @param {string} opts.task - The task name to execute.
- * @param {Object} [opts.payload={}] - An optional data object to pass to the job.
- * @param {Date} [opts.waitUntil] - An optional datetime after which the task is allowed to run.
- * @param {Number} [opts.maxRetries] - The number of times this job can be restarted upon failure. Uses server defaults if not provided.
- * @param {Boolean} [opts.isScheduled=false] - Whether this is a scheduled job.
- * @param {Boolean} [opts.notify=true] - Whether to notify all instances that a new job is available.
- * @param {Boolean} [opts.promise=false] - Whether to return a promise property that resolves when the job completes.
- * @returns {Promise}
- */
- async addJob ({ task, payload = {}, waitUntil, maxRetries, isScheduled = false, notify = true, promise = false }) {
- try {
- const jobId = uuid()
- const jobDefer = createDeferred()
- if (promise) {
- this.completionPromises.push({
- id: jobId,
- added: DateTime.utc(),
- resolve: jobDefer.resolve,
- reject: jobDefer.reject
- })
- }
- await WIKI.db.knex('jobs')
- .insert({
- id: jobId,
- task,
- useWorker: !(typeof this.tasks[task] === 'function'),
- payload,
- maxRetries: maxRetries ?? WIKI.config.scheduler.maxRetries,
- isScheduled,
- waitUntil,
- createdBy: WIKI.INSTANCE_ID
- })
- if (notify) {
- WIKI.db.listener.publish('scheduler', {
- source: WIKI.INSTANCE_ID,
- event: 'newJob',
- id: jobId
- })
- }
- return {
- id: jobId,
- ...promise && { promise: jobDefer.promise }
- }
- } catch (err) {
- WIKI.logger.warn(`Failed to add job to scheduler: ${err.message}`)
- }
- },
- async processJob () {
- let jobIds = []
- try {
- const availableWorkers = this.maxWorkers - this.activeWorkers
- if (availableWorkers < 1) {
- WIKI.logger.debug('All workers are busy. Cannot process more jobs at the moment.')
- return
- }
- await WIKI.db.knex.transaction(async trx => {
- const jobs = await trx('jobs')
- .whereIn('id', WIKI.db.knex.raw(`(SELECT id FROM jobs WHERE ("waitUntil" IS NULL OR "waitUntil" <= NOW()) ORDER BY id FOR UPDATE SKIP LOCKED LIMIT ${availableWorkers})`))
- .returning('*')
- .del()
- if (jobs && jobs.length > 0) {
- for (const job of jobs) {
- WIKI.logger.info(`Processing new job ${job.id}: ${job.task}...`)
- // -> Add to Job History
- await WIKI.db.knex('jobHistory').insert({
- id: job.id,
- task: job.task,
- state: 'active',
- useWorker: job.useWorker,
- wasScheduled: job.isScheduled,
- payload: job.payload,
- attempt: job.retries + 1,
- maxRetries: job.maxRetries,
- executedBy: WIKI.INSTANCE_ID,
- createdAt: job.createdAt
- }).onConflict('id').merge({
- executedBy: WIKI.INSTANCE_ID,
- startedAt: new Date()
- })
- jobIds.push(job.id)
- // -> Start working on it
- try {
- if (job.useWorker) {
- await this.workerPool.execute({
- ...job,
- INSTANCE_ID: `${WIKI.INSTANCE_ID}:WKR`
- })
- } else {
- await this.tasks[job.task](job.payload)
- }
- // -> Update job history (success)
- await WIKI.db.knex('jobHistory').where({
- id: job.id
- }).update({
- state: 'completed',
- completedAt: new Date()
- })
- WIKI.logger.info(`Completed job ${job.id}: ${job.task}`)
- WIKI.db.listener.publish('scheduler', {
- source: WIKI.INSTANCE_ID,
- event: 'jobCompleted',
- state: 'success',
- id: job.id
- })
- } catch (err) {
- WIKI.logger.warn(`Failed to complete job ${job.id}: ${job.task} [ FAILED ]`)
- WIKI.logger.warn(err)
- // -> Update job history (fail)
- await WIKI.db.knex('jobHistory').where({
- id: job.id
- }).update({
- attempt: job.retries + 1,
- state: 'failed',
- lastErrorMessage: err.message
- })
- WIKI.db.listener.publish('scheduler', {
- source: WIKI.INSTANCE_ID,
- event: 'jobCompleted',
- state: 'failed',
- id: job.id,
- errorMessage: err.message
- })
- // -> Reschedule for retry
- if (job.retries < job.maxRetries) {
- const backoffDelay = (2 ** job.retries) * WIKI.config.scheduler.retryBackoff
- await trx('jobs').insert({
- ...job,
- retries: job.retries + 1,
- waitUntil: DateTime.utc().plus({ seconds: backoffDelay }).toJSDate(),
- updatedAt: new Date()
- })
- WIKI.logger.warn(`Rescheduling new attempt for job ${job.id}: ${job.task}...`)
- }
- }
- }
- }
- })
- } catch (err) {
- WIKI.logger.warn(err)
- if (jobIds && jobIds.length > 0) {
- WIKI.db.knex('jobHistory').whereIn('id', jobIds).update({
- state: 'interrupted',
- lastErrorMessage: err.message
- })
- }
- }
- },
- async addScheduled () {
- try {
- await WIKI.db.knex.transaction(async trx => {
- // -> Acquire lock
- const jobLock = await trx('jobLock')
- .where(
- 'key',
- WIKI.db.knex('jobLock')
- .select('key')
- .where('key', 'cron')
- .andWhere('lastCheckedAt', '<=', DateTime.utc().minus({ minutes: 5 }).toISO())
- .forUpdate()
- .skipLocked()
- .limit(1)
- ).update({
- lastCheckedBy: WIKI.INSTANCE_ID,
- lastCheckedAt: DateTime.utc().toISO()
- })
- if (jobLock > 0) {
- WIKI.logger.info(`Scheduling future planned jobs...`)
- const scheduledJobs = await WIKI.db.knex('jobSchedule')
- if (scheduledJobs?.length > 0) {
- // -> Get existing scheduled jobs
- const existingJobs = await WIKI.db.knex('jobs').where('isScheduled', true)
- let totalAdded = 0
- for (const job of scheduledJobs) {
- // -> Get next planned iterations
- const plannedIterations = cronparser.parseExpression(job.cron, {
- startDate: DateTime.utc().toJSDate(),
- endDate: DateTime.utc().plus({ days: 1, minutes: 5 }).toJSDate(),
- iterator: true,
- tz: 'UTC'
- })
- // -> Add a maximum of 10 future iterations for a single task
- let addedFutureJobs = 0
- while (true) {
- try {
- const next = plannedIterations.next()
- // -> Ensure this iteration isn't already scheduled
- if (!existingJobs.some(j => j.task === job.task && j.waitUntil.getTime() === next.value.getTime())) {
- this.addJob({
- task: job.task,
- useWorker: !(typeof this.tasks[job.task] === 'function'),
- payload: job.payload,
- isScheduled: true,
- waitUntil: next.value.toISOString(),
- notify: false
- })
- addedFutureJobs++
- totalAdded++
- }
- // -> No more iterations for this period or max iterations count reached
- if (next.done || addedFutureJobs >= 10) { break }
- } catch (err) {
- break
- }
- }
- }
- if (totalAdded > 0) {
- WIKI.logger.info(`Scheduled ${totalAdded} new future planned jobs: [ OK ]`)
- } else {
- WIKI.logger.info(`No new future planned jobs to schedule: [ OK ]`)
- }
- }
- }
- })
- } catch (err) {
- WIKI.logger.warn(err)
- }
- },
- async stop () {
- WIKI.logger.info('Stopping Scheduler...')
- clearInterval(this.scheduledRef)
- clearInterval(this.pollingRef)
- await this.workerPool.destroy()
- WIKI.logger.info('Scheduler: [ STOPPED ]')
- }
- }
|