// scheduler.mjs

  1. import { DynamicThreadPool } from 'poolifier'
  2. import os from 'node:os'
  3. import fs from 'node:fs/promises'
  4. import path from 'node:path'
  5. import cronparser from 'cron-parser'
  6. import { DateTime } from 'luxon'
  7. import { v4 as uuid } from 'uuid'
  8. import { createDeferred } from '../helpers/common.mjs'
  9. import { camelCase, find, remove } from 'lodash-es'
  /**
   * Job scheduler singleton.
   *
   * Jobs are rows in the `jobs` table. Each instance claims due rows, records the
   * attempt in `jobHistory`, runs the task either in-process (tasks loaded from
   * `tasks/simple`) or on the worker thread pool, and re-queues failed jobs with an
   * exponential backoff. Instances talk to each other over the `scheduler` channel of
   * `WIKI.db.listener` (`newJob` / `jobCompleted` events).
   */
  const scheduler = {
    /** poolifier DynamicThreadPool running jobs flagged `useWorker`; created by init(). */
    workerPool: null,
    /** Maximum number of jobs this instance works on at once; resolved from config in init(). */
    maxWorkers: 1,
    /** Number of job slots currently reserved by in-flight processJob() calls. */
    activeWorkers: 0,
    /** Interval handle of the job polling loop (set by start()). */
    pollingRef: null,
    /** Interval handle of the scheduled-jobs check (set by start()). */
    scheduledRef: null,
    /** Map of task name -> in-process task function; populated by init(). */
    tasks: null,
    /** Pending `{ id, added, resolve, reject }` entries for jobs added with `promise: true`. */
    completionPromises: [],

    /**
     * Create the worker pool and load the in-process tasks from `tasks/simple`.
     * @returns {Promise<Object>} This scheduler.
     */
    async init () {
      const configuredWorkers = WIKI.config.scheduler.workers
      this.maxWorkers = configuredWorkers === 'auto' ? (os.cpus().length - 1) : configuredWorkers
      if (this.maxWorkers < 1) { this.maxWorkers = 1 }
      WIKI.logger.info(`Initializing Worker Pool (Limit: ${this.maxWorkers})...`)
      this.workerPool = new DynamicThreadPool(1, this.maxWorkers, path.join(WIKI.SERVERPATH, 'worker.mjs'), {
        errorHandler: (err) => WIKI.logger.warn(err),
        exitHandler: () => WIKI.logger.debug('A worker has gone offline.'),
        onlineHandler: () => WIKI.logger.debug('New worker is online.')
      })
      this.tasks = {}
      const simpleTasksPath = path.join(WIKI.SERVERPATH, 'tasks/simple')
      for (const f of await fs.readdir(simpleTasksPath)) {
        // Only .mjs files are task modules; importing any other directory entry would fail.
        if (!f.endsWith('.mjs')) { continue }
        const taskName = camelCase(path.basename(f, '.mjs'))
        this.tasks[taskName] = (await import(path.join(simpleTasksPath, f))).task
      }
      return this
    },

    /**
     * Subscribe to the `scheduler` channel, queue the planned jobs once and start the
     * two periodic loops (scheduled-jobs check and job polling).
     * @returns {Promise<void>}
     */
    async start () {
      WIKI.logger.info('Starting Scheduler...')
      // -> Add PostgreSQL Sub Channel
      WIKI.db.listener.addChannel('scheduler', async payload => {
        switch (payload.event) {
          case 'newJob': {
            // processJob() reserves and releases worker slots itself. Reserving one here as
            // well would make it see one slot fewer than is actually free (none at all when
            // maxWorkers is 1).
            if (this.activeWorkers < this.maxWorkers) {
              await this.processJob()
            }
            break
          }
          case 'jobCompleted': {
            const jobPromise = find(this.completionPromises, ['id', payload.id])
            if (jobPromise) {
              if (payload.state === 'success') {
                jobPromise.resolve()
              } else {
                jobPromise.reject(new Error(payload.errorMessage))
              }
              // Remove the settled entry on a later tick
              setTimeout(() => {
                remove(this.completionPromises, ['id', payload.id])
              })
            }
            break
          }
        }
      })
      // -> Start scheduled jobs check
      // addScheduled() and processJob() catch and log their own errors, so the interval
      // callbacks can fire them without awaiting.
      this.scheduledRef = setInterval(() => {
        this.addScheduled()
      }, WIKI.config.scheduler.scheduledCheck * 1000)
      // -> Add scheduled jobs on init
      await this.addScheduled()
      // -> Start job polling
      this.pollingRef = setInterval(() => {
        this.processJob()
      }, WIKI.config.scheduler.pollingCheck * 1000)
      WIKI.logger.info('Scheduler: [ STARTED ]')
    },

    /**
     * Add a job to the scheduler
     * @param {Object} opts - Job options
     * @param {string} opts.task - The task name to execute.
     * @param {Object} [opts.payload={}] - An optional data object to pass to the job.
     * @param {Date} [opts.waitUntil] - An optional datetime after which the task is allowed to run.
     * @param {Number} [opts.maxRetries] - The number of times this job can be restarted upon failure. Uses server defaults if not provided.
     * @param {Boolean} [opts.isScheduled=false] - Whether this is a scheduled job.
     * @param {Boolean} [opts.notify=true] - Whether to notify all instances that a new job is available.
     * @param {Boolean} [opts.promise=false] - Whether to return a promise property that resolves when the job completes.
     * @returns {Promise<{id: string, promise?: Promise}|undefined>} The job id (plus the
     * completion promise when requested), or undefined when the job could not be added
     * (the failure is logged, not thrown).
     */
    async addJob ({ task, payload = {}, waitUntil, maxRetries, isScheduled = false, notify = true, promise = false }) {
      const jobId = uuid()
      try {
        const jobDefer = createDeferred()
        if (promise) {
          this.completionPromises.push({
            id: jobId,
            added: DateTime.utc(),
            resolve: jobDefer.resolve,
            reject: jobDefer.reject
          })
        }
        await WIKI.db.knex('jobs')
          .insert({
            id: jobId,
            task,
            // Tasks without an in-process implementation are sent to the worker pool
            useWorker: !(typeof this.tasks[task] === 'function'),
            payload,
            maxRetries: maxRetries ?? WIKI.config.scheduler.maxRetries,
            isScheduled,
            waitUntil,
            createdBy: WIKI.INSTANCE_ID
          })
        if (notify) {
          WIKI.db.listener.publish('scheduler', {
            source: WIKI.INSTANCE_ID,
            event: 'newJob',
            id: jobId
          })
        }
        return {
          id: jobId,
          ...promise && { promise: jobDefer.promise }
        }
      } catch (err) {
        // The job never reached the queue, so no 'jobCompleted' event will ever settle or
        // remove the completion entry registered above: drop it here.
        if (promise) {
          remove(this.completionPromises, ['id', jobId])
        }
        WIKI.logger.warn(`Failed to add job to scheduler: ${err.message}`)
      }
    },

    /**
     * Claim up to as many due jobs as there are free worker slots and run them one after
     * the other inside a single transaction.
     *
     * Each attempt is recorded in `jobHistory` and its outcome is published as a
     * `jobCompleted` event. A failed job with retries left is re-inserted with an
     * exponential backoff. Errors are logged, never thrown.
     * @returns {Promise<void>}
     */
    async processJob () {
      const availableWorkers = this.maxWorkers - this.activeWorkers
      if (availableWorkers < 1) {
        WIKI.logger.debug('All workers are busy. Cannot process more jobs at the moment.')
        return
      }
      // Reserve every free slot before the first await so that overlapping calls (polling
      // tick + 'newJob' notifications) cannot claim more jobs than maxWorkers allows.
      let reservedSlots = availableWorkers
      this.activeWorkers += reservedSlots
      const releaseSlots = (count) => {
        const released = Math.max(0, Math.min(count, reservedSlots))
        reservedSlots -= released
        this.activeWorkers -= released
      }
      const jobIds = []
      try {
        await WIKI.db.knex.transaction(async trx => {
          // Deleting the rows claims them; SKIP LOCKED leaves rows held by another
          // transaction alone.
          const jobs = (await trx('jobs')
            .whereIn('id', WIKI.db.knex.raw(`(SELECT id FROM jobs WHERE ("waitUntil" IS NULL OR "waitUntil" <= NOW()) ORDER BY id FOR UPDATE SKIP LOCKED LIMIT ${availableWorkers})`))
            .returning('*')
            .del()) ?? []
          // Hand back the slots that were reserved but not filled by a claimed job
          releaseSlots(reservedSlots - jobs.length)
          for (const job of jobs) {
            try {
              WIKI.logger.info(`Processing new job ${job.id}: ${job.task}...`)
              // -> Add to Job History
              await WIKI.db.knex('jobHistory').insert({
                id: job.id,
                task: job.task,
                state: 'active',
                useWorker: job.useWorker,
                wasScheduled: job.isScheduled,
                payload: job.payload,
                attempt: job.retries + 1,
                maxRetries: job.maxRetries,
                executedBy: WIKI.INSTANCE_ID,
                createdAt: job.createdAt
              }).onConflict('id').merge({
                executedBy: WIKI.INSTANCE_ID,
                startedAt: new Date()
              })
              jobIds.push(job.id)
              // -> Start working on it
              try {
                if (job.useWorker) {
                  await this.workerPool.execute({
                    ...job,
                    INSTANCE_ID: `${WIKI.INSTANCE_ID}:WKR`
                  })
                } else {
                  await this.tasks[job.task](job.payload)
                }
                // -> Update job history (success)
                await WIKI.db.knex('jobHistory').where({
                  id: job.id
                }).update({
                  state: 'completed',
                  completedAt: new Date()
                })
                WIKI.logger.info(`Completed job ${job.id}: ${job.task}`)
                WIKI.db.listener.publish('scheduler', {
                  source: WIKI.INSTANCE_ID,
                  event: 'jobCompleted',
                  state: 'success',
                  id: job.id
                })
              } catch (err) {
                WIKI.logger.warn(`Failed to complete job ${job.id}: ${job.task} [ FAILED ]`)
                WIKI.logger.warn(err)
                // -> Update job history (fail)
                await WIKI.db.knex('jobHistory').where({
                  id: job.id
                }).update({
                  attempt: job.retries + 1,
                  state: 'failed',
                  lastErrorMessage: err.message
                })
                WIKI.db.listener.publish('scheduler', {
                  source: WIKI.INSTANCE_ID,
                  event: 'jobCompleted',
                  state: 'failed',
                  id: job.id,
                  errorMessage: err.message
                })
                // -> Reschedule for retry
                if (job.retries < job.maxRetries) {
                  const backoffDelay = (2 ** job.retries) * WIKI.config.scheduler.retryBackoff
                  await trx('jobs').insert({
                    ...job,
                    retries: job.retries + 1,
                    waitUntil: DateTime.utc().plus({ seconds: backoffDelay }).toJSDate(),
                    updatedAt: new Date()
                  })
                  WIKI.logger.warn(`Rescheduling new attempt for job ${job.id}: ${job.task}...`)
                }
              }
            } finally {
              // This job no longer occupies a slot, whatever its outcome
              releaseSlots(1)
            }
          }
        })
      } catch (err) {
        WIKI.logger.warn(err)
        if (jobIds.length > 0) {
          try {
            // A knex query only runs once it is awaited; without the await this update
            // was never sent to the database.
            await WIKI.db.knex('jobHistory').whereIn('id', jobIds).update({
              state: 'interrupted',
              lastErrorMessage: err.message
            })
          } catch (updateErr) {
            // Keep processJob() from rejecting: it is called un-awaited from an interval
            WIKI.logger.warn(updateErr)
          }
        }
      } finally {
        // Safety net: give back whatever is still reserved if the transaction aborted early
        releaseSlots(reservedSlots)
      }
    },

    /**
     * Queue the upcoming iterations of every entry in `jobSchedule`.
     *
     * The `cron` row of `jobLock` is used as a lock: the pass only runs when that row was
     * last checked at least 5 minutes ago and is not locked by another transaction. For
     * each schedule, iterations within the next 1 day and 5 minutes that are not queued
     * yet are added, with at most 10 attempts per task. Errors are logged, never thrown.
     * @returns {Promise<void>}
     */
    async addScheduled () {
      try {
        await WIKI.db.knex.transaction(async trx => {
          // -> Acquire lock
          const jobLock = await trx('jobLock')
            .where(
              'key',
              WIKI.db.knex('jobLock')
                .select('key')
                .where('key', 'cron')
                .andWhere('lastCheckedAt', '<=', DateTime.utc().minus({ minutes: 5 }).toISO())
                .forUpdate()
                .skipLocked()
                .limit(1)
            ).update({
              lastCheckedBy: WIKI.INSTANCE_ID,
              lastCheckedAt: DateTime.utc().toISO()
            })
          if (jobLock > 0) {
            WIKI.logger.info(`Scheduling future planned jobs...`)
            const scheduledJobs = await WIKI.db.knex('jobSchedule')
            if (scheduledJobs?.length > 0) {
              // -> Get existing scheduled jobs
              const existingJobs = await WIKI.db.knex('jobs').where('isScheduled', true)
              let totalAdded = 0
              for (const job of scheduledJobs) {
                // -> Get next planned iterations
                let plannedIterations
                try {
                  plannedIterations = cronparser.parseExpression(job.cron, {
                    startDate: DateTime.utc().toJSDate(),
                    endDate: DateTime.utc().plus({ days: 1, minutes: 5 }).toJSDate(),
                    iterator: true,
                    tz: 'UTC'
                  })
                } catch (err) {
                  // One unparsable expression must not prevent the other tasks from being scheduled
                  WIKI.logger.warn(`Skipping scheduled task ${job.task}: invalid cron expression (${err.message})`)
                  continue
                }
                // -> Add a maximum of 10 future iterations for a single task
                let addedFutureJobs = 0
                while (true) {
                  try {
                    const next = plannedIterations.next()
                    // -> Ensure this iteration isn't already scheduled
                    const alreadyQueued = existingJobs.some(j => j.task === job.task && j.waitUntil?.getTime() === next.value.getTime())
                    if (!alreadyQueued) {
                      // Await the insert so the summary below (and the lock release) only
                      // happens once the jobs are actually queued.
                      const added = await this.addJob({
                        task: job.task,
                        payload: job.payload,
                        isScheduled: true,
                        waitUntil: next.value.toISOString(),
                        notify: false
                      })
                      // Attempts count towards the per-task cap, only successes towards the total
                      addedFutureJobs++
                      if (added) { totalAdded++ }
                    }
                    // -> No more iterations for this period or max iterations count reached
                    if (next.done || addedFutureJobs >= 10) { break }
                  } catch {
                    // Any error from the iterator ends this task's loop; presumably this is
                    // how running past endDate surfaces (confirm against cron-parser).
                    break
                  }
                }
              }
              if (totalAdded > 0) {
                WIKI.logger.info(`Scheduled ${totalAdded} new future planned jobs: [ OK ]`)
              } else {
                WIKI.logger.info(`No new future planned jobs to schedule: [ OK ]`)
              }
            }
          }
        })
      } catch (err) {
        WIKI.logger.warn(err)
      }
    },

    /**
     * Stop both periodic loops and destroy the worker pool.
     * Safe to call even if init() / start() never ran.
     * @returns {Promise<void>}
     */
    async stop () {
      WIKI.logger.info('Stopping Scheduler...')
      clearInterval(this.scheduledRef)
      clearInterval(this.pollingRef)
      this.scheduledRef = null
      this.pollingRef = null
      await this.workerPool?.destroy()
      WIKI.logger.info('Scheduler: [ STOPPED ]')
    }
  }

  export default scheduler