engine.js 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. const _ = require('lodash')
  2. const stream = require('stream')
  3. const Promise = require('bluebird')
  4. const pipeline = Promise.promisify(stream.pipeline)
  5. /* global WIKI */
  6. module.exports = {
  7. async activate() {
  8. // not used
  9. },
  10. async deactivate() {
  11. // not used
  12. },
  13. /**
  14. * INIT
  15. */
  16. async init() {
  17. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Initializing...`)
  18. switch (this.config.apiVersion) {
  19. case '7.x':
  20. const { Client: Client7 } = require('elasticsearch7')
  21. this.client = new Client7({
  22. nodes: this.config.hosts.split(',').map(_.trim),
  23. sniffOnStart: this.config.sniffOnStart,
  24. sniffInterval: (this.config.sniffInterval > 0) ? this.config.sniffInterval : false,
  25. name: 'wiki-js'
  26. })
  27. break
  28. case '6.x':
  29. const { Client: Client6 } = require('elasticsearch6')
  30. this.client = new Client6({
  31. nodes: this.config.hosts.split(',').map(_.trim),
  32. sniffOnStart: this.config.sniffOnStart,
  33. sniffInterval: (this.config.sniffInterval > 0) ? this.config.sniffInterval : false,
  34. name: 'wiki-js'
  35. })
  36. break
  37. default:
  38. throw new Error('Unsupported version of elasticsearch! Update your settings in the Administration Area.')
  39. }
  40. // -> Create Search Index
  41. await this.createIndex()
  42. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Initialization completed.`)
  43. },
  44. /**
  45. * Create Index
  46. */
  47. async createIndex() {
  48. try {
  49. const indexExists = await this.client.indices.exists({ index: this.config.indexName })
  50. if (!indexExists.body) {
  51. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Creating index...`)
  52. try {
  53. const idxBody = {
  54. properties: {
  55. suggest: { type: 'completion' },
  56. title: { type: 'text', boost: 10.0 },
  57. description: { type: 'text', boost: 3.0 },
  58. content: { type: 'text', boost: 1.0 },
  59. locale: { type: 'keyword' },
  60. path: { type: 'text' },
  61. tags: { type: 'text', boost: 8.0 }
  62. }
  63. }
  64. await this.client.indices.create({
  65. index: this.config.indexName,
  66. body: {
  67. mappings: (this.config.apiVersion === '6.x') ? {
  68. _doc: idxBody
  69. } : idxBody,
  70. settings: {
  71. analysis: {
  72. analyzer: {
  73. default: {
  74. type: this.config.analyzer
  75. }
  76. }
  77. }
  78. }
  79. }
  80. })
  81. } catch (err) {
  82. WIKI.logger.error(`(SEARCH/ELASTICSEARCH) Create Index Error: `, _.get(err, 'meta.body.error', err))
  83. }
  84. }
  85. } catch (err) {
  86. WIKI.logger.error(`(SEARCH/ELASTICSEARCH) Index Check Error: `, _.get(err, 'meta.body.error', err))
  87. }
  88. },
  89. /**
  90. * QUERY
  91. *
  92. * @param {String} q Query
  93. * @param {Object} opts Additional options
  94. */
  95. async query(q, opts) {
  96. try {
  97. const results = await this.client.search({
  98. index: this.config.indexName,
  99. body: {
  100. query: {
  101. simple_query_string: {
  102. query: `*${q}*`,
  103. fields: ['title^20', 'description^3', 'tags^8', 'content^1'],
  104. default_operator: 'and',
  105. analyze_wildcard: true
  106. }
  107. },
  108. from: 0,
  109. size: 50,
  110. _source: ['title', 'description', 'path', 'locale'],
  111. suggest: {
  112. suggestions: {
  113. text: q,
  114. completion: {
  115. field: 'suggest',
  116. size: 5,
  117. skip_duplicates: true,
  118. fuzzy: true
  119. }
  120. }
  121. }
  122. }
  123. })
  124. return {
  125. results: _.get(results, 'body.hits.hits', []).map(r => ({
  126. id: r._id,
  127. locale: r._source.locale,
  128. path: r._source.path,
  129. title: r._source.title,
  130. description: r._source.description
  131. })),
  132. suggestions: _.reject(_.get(results, 'suggest.suggestions', []).map(s => _.get(s, 'options[0].text', false)), s => !s),
  133. totalHits: _.get(results, 'body.hits.total.value', _.get(results, 'body.hits.total', 0))
  134. }
  135. } catch (err) {
  136. WIKI.logger.warn('Search Engine Error: ', _.get(err, 'meta.body.error', err))
  137. }
  138. },
  139. /**
  140. * Build tags field
  141. * @param id
  142. * @returns {Promise<*|*[]>}
  143. */
  144. async buildTags(id) {
  145. const tags = await WIKI.models.pages.query().findById(id).select('*').withGraphJoined('tags')
  146. return (tags.tags && tags.tags.length > 0) ? tags.tags.map(function (tag) {
  147. return tag.title
  148. }) : []
  149. },
  150. /**
  151. * Build suggest field
  152. */
  153. buildSuggest(page) {
  154. return _.reject(_.uniq(_.concat(
  155. page.title.split(' ').map(s => ({
  156. input: s,
  157. weight: 10
  158. })),
  159. page.description.split(' ').map(s => ({
  160. input: s,
  161. weight: 3
  162. })),
  163. page.safeContent.split(' ').map(s => ({
  164. input: s,
  165. weight: 1
  166. }))
  167. )), ['input', ''])
  168. },
  169. /**
  170. * CREATE
  171. *
  172. * @param {Object} page Page to create
  173. */
  174. async created(page) {
  175. await this.client.index({
  176. index: this.config.indexName,
  177. type: '_doc',
  178. id: page.hash,
  179. body: {
  180. suggest: this.buildSuggest(page),
  181. locale: page.localeCode,
  182. path: page.path,
  183. title: page.title,
  184. description: page.description,
  185. content: page.safeContent,
  186. tags: await this.buildTags(page.id)
  187. },
  188. refresh: true
  189. })
  190. },
  191. /**
  192. * UPDATE
  193. *
  194. * @param {Object} page Page to update
  195. */
  196. async updated(page) {
  197. await this.client.index({
  198. index: this.config.indexName,
  199. type: '_doc',
  200. id: page.hash,
  201. body: {
  202. suggest: this.buildSuggest(page),
  203. locale: page.localeCode,
  204. path: page.path,
  205. title: page.title,
  206. description: page.description,
  207. content: page.safeContent,
  208. tags: await this.buildTags(page.id)
  209. },
  210. refresh: true
  211. })
  212. },
  213. /**
  214. * DELETE
  215. *
  216. * @param {Object} page Page to delete
  217. */
  218. async deleted(page) {
  219. await this.client.delete({
  220. index: this.config.indexName,
  221. type: '_doc',
  222. id: page.hash,
  223. refresh: true
  224. })
  225. },
  226. /**
  227. * RENAME
  228. *
  229. * @param {Object} page Page to rename
  230. */
  231. async renamed(page) {
  232. await this.client.delete({
  233. index: this.config.indexName,
  234. type: '_doc',
  235. id: page.hash,
  236. refresh: true
  237. })
  238. await this.client.index({
  239. index: this.config.indexName,
  240. type: '_doc',
  241. id: page.destinationHash,
  242. body: {
  243. suggest: this.buildSuggest(page),
  244. locale: page.destinationLocaleCode,
  245. path: page.destinationPath,
  246. title: page.title,
  247. description: page.description,
  248. content: page.safeContent,
  249. tags: await this.buildTags(page.id)
  250. },
  251. refresh: true
  252. })
  253. },
  254. /**
  255. * REBUILD INDEX
  256. */
  257. async rebuild() {
  258. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Rebuilding Index...`)
  259. await this.client.indices.delete({ index: this.config.indexName })
  260. await this.createIndex()
  261. const MAX_INDEXING_BYTES = 10 * Math.pow(2, 20) - Buffer.from('[').byteLength - Buffer.from(']').byteLength // 10 MB
  262. const MAX_INDEXING_COUNT = 1000
  263. const COMMA_BYTES = Buffer.from(',').byteLength
  264. let chunks = []
  265. let bytes = 0
  266. const processDocument = async (cb, doc) => {
  267. try {
  268. if (doc) {
  269. const docBytes = Buffer.from(JSON.stringify(doc)).byteLength
  270. doc['tags'] = await this.buildTags(doc.realId)
  271. // -> Current batch exceeds size limit, flush
  272. if (docBytes + COMMA_BYTES + bytes >= MAX_INDEXING_BYTES) {
  273. await flushBuffer()
  274. }
  275. if (chunks.length > 0) {
  276. bytes += COMMA_BYTES
  277. }
  278. bytes += docBytes
  279. chunks.push(doc)
  280. // -> Current batch exceeds count limit, flush
  281. if (chunks.length >= MAX_INDEXING_COUNT) {
  282. await flushBuffer()
  283. }
  284. } else {
  285. // -> End of stream, flush
  286. await flushBuffer()
  287. }
  288. cb()
  289. } catch (err) {
  290. cb(err)
  291. }
  292. }
  293. const flushBuffer = async () => {
  294. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Sending batch of ${chunks.length}...`)
  295. try {
  296. await this.client.bulk({
  297. index: this.config.indexName,
  298. body: _.reduce(chunks, (result, doc) => {
  299. result.push({
  300. index: {
  301. _index: this.config.indexName,
  302. _type: '_doc',
  303. _id: doc.id
  304. }
  305. })
  306. doc.safeContent = WIKI.models.pages.cleanHTML(doc.render)
  307. result.push({
  308. suggest: this.buildSuggest(doc),
  309. tags: doc.tags,
  310. locale: doc.locale,
  311. path: doc.path,
  312. title: doc.title,
  313. description: doc.description,
  314. content: doc.safeContent
  315. })
  316. return result
  317. }, []),
  318. refresh: true
  319. })
  320. } catch (err) {
  321. WIKI.logger.warn('(SEARCH/ELASTICSEARCH) Failed to send batch to elasticsearch: ', err)
  322. }
  323. chunks.length = 0
  324. bytes = 0
  325. }
  326. // Added real id in order to fetch page tags from the query
  327. await pipeline(
  328. WIKI.models.knex.column({ id: 'hash' }, 'path', { locale: 'localeCode' }, 'title', 'description', 'render', { realId: 'id' }).select().from('pages').where({
  329. isPublished: true,
  330. isPrivate: false
  331. }).stream(),
  332. new stream.Transform({
  333. objectMode: true,
  334. transform: async (chunk, enc, cb) => processDocument(cb, chunk),
  335. flush: async (cb) => processDocument(cb)
  336. })
  337. )
  338. WIKI.logger.info(`(SEARCH/ELASTICSEARCH) Index rebuilt successfully.`)
  339. }
  340. }