123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859 |
- class SearchIndexBackend
- def self.info
- url = Setting.get('es_url').to_s
- return if url.blank?
- Rails.logger.info "# curl -X GET \"#{url}\""
- response = UserAgent.get(
- url,
- {},
- {
- json: true,
- open_timeout: 8,
- read_timeout: 12,
- user: Setting.get('es_user'),
- password: Setting.get('es_password'),
- }
- )
- Rails.logger.info "# #{response.code}"
- if response.success?
- installed_version = response.data.dig('version', 'number')
- raise "Unable to get elasticsearch version from response: #{response.inspect}" if installed_version.blank?
- version_supported = Gem::Version.new(installed_version) < Gem::Version.new('8')
- raise "Version #{installed_version} of configured elasticsearch is not supported." if !version_supported
- version_supported = Gem::Version.new(installed_version) > Gem::Version.new('2.3')
- raise "Version #{installed_version} of configured elasticsearch is not supported." if !version_supported
- return response.data
- end
- raise humanized_error(
- verb: 'GET',
- url: url,
- response: response,
- )
- end
- def self.processors(data)
- data.each do |key, items|
- url = "#{Setting.get('es_url')}/#{key}"
- items.each do |item|
- if item[:action] == 'delete'
- Rails.logger.info "# curl -X DELETE \"#{url}\""
- response = UserAgent.delete(
- url,
- {
- json: true,
- open_timeout: 8,
- read_timeout: 12,
- user: Setting.get('es_user'),
- password: Setting.get('es_password'),
- }
- )
- Rails.logger.info "# #{response.code}"
- next if response.success?
- next if response.code.to_s == '404'
- raise humanized_error(
- verb: 'DELETE',
- url: url,
- response: response,
- )
- end
- Rails.logger.info "# curl -X PUT \"#{url}\" \\"
- Rails.logger.debug { "-d '#{data.to_json}'" }
- item.delete(:action)
- response = UserAgent.put(
- url,
- item,
- {
- json: true,
- open_timeout: 8,
- read_timeout: 12,
- user: Setting.get('es_user'),
- password: Setting.get('es_password'),
- }
- )
- Rails.logger.info "# #{response.code}"
- next if response.success?
- raise humanized_error(
- verb: 'PUT',
- url: url,
- payload: item,
- response: response,
- )
- end
- end
- true
- end
- def self.index(data)
- url = build_url(data[:name], nil, false, false)
- return if url.blank?
- if data[:action] && data[:action] == 'delete'
- return SearchIndexBackend.remove(data[:name])
- end
- Rails.logger.info "# curl -X PUT \"#{url}\" \\"
- Rails.logger.debug { "-d '#{data[:data].to_json}'" }
-
-
-
-
-
-
- response = UserAgent.put(
- url,
- data[:data],
- {
- json: true,
- open_timeout: 8,
- read_timeout: 30,
- user: Setting.get('es_user'),
- password: Setting.get('es_password'),
- }
- )
- Rails.logger.info "# #{response.code}"
- return true if response.success?
- raise humanized_error(
- verb: 'PUT',
- url: url,
- payload: data[:data],
- response: response,
- )
- end
- def self.add(type, data)
- url = build_url(type, data['id'])
- return if url.blank?
- Rails.logger.info "# curl -X POST \"#{url}\" \\"
- Rails.logger.debug { "-d '#{data.to_json}'" }
- response = UserAgent.post(
- url,
- data,
- {
- json: true,
- open_timeout: 8,
- read_timeout: 16,
- user: Setting.get('es_user'),
- password: Setting.get('es_password'),
- }
- )
- Rails.logger.info "# #{response.code}"
- return true if response.success?
- raise humanized_error(
- verb: 'POST',
- url: url,
- payload: data,
- response: response,
- )
- end
- def self.remove(type, o_id = nil)
- url = build_url(type, o_id, false, false)
- return if url.blank?
- Rails.logger.info "# curl -X DELETE \"#{url}\""
- response = UserAgent.delete(
- url,
- {
- open_timeout: 8,
- read_timeout: 16,
- user: Setting.get('es_user'),
- password: Setting.get('es_password'),
- }
- )
- Rails.logger.info "# #{response.code}"
- return true if response.success?
- return true if response.code.to_s == '400'
- humanized_error = humanized_error(
- verb: 'DELETE',
- url: url,
- response: response,
- )
- Rails.logger.info "NOTICE: can't delete index: #{humanized_error}"
- false
- end
- def self.search(query, index, options = {})
- if !index.is_a? Array
- return search_by_index(query, index, options)
- end
- index
- .map { |local_index| search_by_index(query, local_index, options) }
- .compact
- .flatten(1)
- end
- def self.search_by_index(query, index, options = {})
- return [] if query.blank?
- url = build_url
- return if url.blank?
- url += build_search_url(index)
-
- condition = {
- 'query_string' => {
- 'query' => append_wildcard_to_simple_query(query),
- 'default_operator' => 'AND',
- 'analyze_wildcard' => true,
- }
- }
- if (fields = options.dig(:highlight_fields_by_indexes, index.to_sym))
- condition['query_string']['fields'] = fields
- end
- query_data = build_query(condition, options)
- if (fields = options.dig(:highlight_fields_by_indexes, index.to_sym))
- fields_for_highlight = fields.each_with_object({}) { |elem, memo| memo[elem] = {} }
- query_data[:highlight] = { fields: fields_for_highlight }
- end
- Rails.logger.info "# curl -X POST \"#{url}\" \\"
- Rails.logger.debug { " -d'#{query_data.to_json}'" }
- response = UserAgent.get(
- url,
- query_data,
- {
- json: true,
- open_timeout: 5,
- read_timeout: 14,
- user: Setting.get('es_user'),
- password: Setting.get('es_password'),
- }
- )
- Rails.logger.info "# #{response.code}"
- if !response.success?
- Rails.logger.error humanized_error(
- verb: 'GET',
- url: url,
- payload: query_data,
- response: response,
- )
- return []
- end
- data = response.data&.dig('hits', 'hits')
- return [] if !data
- data.map do |item|
- Rails.logger.info "... #{item['_type']} #{item['_id']}"
- output = {
- id: item['_id'],
- type: index,
- }
- if options.dig(:highlight_fields_by_indexes, index.to_sym)
- output[:highlight] = item['highlight']
- end
- output
- end
- end
- def self.search_by_index_sort(sort_by = nil, order_by = nil)
- result = []
- sort_by&.each_with_index do |value, index|
- next if value.blank?
- next if order_by&.at(index).blank?
-
- if value !~ /\./ && value !~ /_(time|date|till|id|ids|at)$/
- value += '.raw'
- end
- result.push(
- value => {
- order: order_by[index],
- },
- )
- end
- if result.blank?
- result.push(
- updated_at: {
- order: 'desc',
- },
- )
- end
- result.push('_score')
- result
- end
- def self.selectors(index, selectors = nil, options = {}, aggs_interval = nil)
- raise 'no selectors given' if !selectors
- url = build_url(nil, nil, false, false)
- return if url.blank?
- url += build_search_url(index)
- data = selector2query(selectors, options, aggs_interval)
- Rails.logger.info "# curl -X POST \"#{url}\" \\"
- Rails.logger.debug { " -d'#{data.to_json}'" }
- response = UserAgent.get(
- url,
- data,
- {
- json: true,
- open_timeout: 5,
- read_timeout: 14,
- user: Setting.get('es_user'),
- password: Setting.get('es_password'),
- }
- )
- Rails.logger.info "# #{response.code}"
- if !response.success?
- raise humanized_error(
- verb: 'GET',
- url: url,
- payload: data,
- response: response,
- )
- end
- Rails.logger.debug { response.data.to_json }
- if aggs_interval.blank? || aggs_interval[:interval].blank?
- ticket_ids = []
- response.data['hits']['hits'].each do |item|
- ticket_ids.push item['_id']
- end
- return {
- count: response.data['hits']['total'],
- ticket_ids: ticket_ids,
- }
- end
- response.data
- end
- DEFAULT_SELECTOR_OPTIONS = {
- limit: 10
- }.freeze
- def self.selector2query(selector, options, aggs_interval)
- options = DEFAULT_QUERY_OPTIONS.merge(options.deep_symbolize_keys)
- query_must = []
- query_must_not = []
- relative_map = {
- day: 'd',
- year: 'y',
- month: 'M',
- hour: 'h',
- minute: 'm',
- }
- if selector.present?
- selector.each do |key, data|
- key_tmp = key.sub(/^.+?\./, '')
- t = {}
-
- if data['value'].is_a?(Array)
- data['value'].each do |value|
- if value.is_a?(String) && value =~ /::/
- key_tmp += '.raw'
- break
- end
- end
- elsif data['value'].is_a?(String)
- if /::/.match?(data['value'])
- key_tmp += '.raw'
- end
- end
-
- if data['operator'] == 'is' || data['operator'] == 'is not' || data['operator'] == 'contains' || data['operator'] == 'contains not'
- if data['value'].is_a?(Array)
- t[:terms] = {}
- t[:terms][key_tmp] = data['value']
- else
- t[:term] = {}
- t[:term][key_tmp] = data['value']
- end
- if data['operator'] == 'is' || data['operator'] == 'contains'
- query_must.push t
- elsif data['operator'] == 'is not' || data['operator'] == 'contains not'
- query_must_not.push t
- end
- elsif data['operator'] == 'contains all' || data['operator'] == 'contains one' || data['operator'] == 'contains all not' || data['operator'] == 'contains one not'
- values = data['value'].split(',').map(&:strip)
- t[:query_string] = {}
- if data['operator'] == 'contains all'
- t[:query_string][:query] = "#{key_tmp}:\"#{values.join('" AND "')}\""
- query_must.push t
- elsif data['operator'] == 'contains one not'
- t[:query_string][:query] = "#{key_tmp}:\"#{values.join('" OR "')}\""
- query_must_not.push t
- elsif data['operator'] == 'contains one'
- t[:query_string][:query] = "#{key_tmp}:\"#{values.join('" OR "')}\""
- query_must.push t
- elsif data['operator'] == 'contains all not'
- t[:query_string][:query] = "#{key_tmp}:\"#{values.join('" AND "')}\""
- query_must_not.push t
- end
-
- elsif data['operator'] == 'within last (relative)' || data['operator'] == 'within next (relative)'
- range = relative_map[data['range'].to_sym]
- if range.blank?
- raise "Invalid relative_map for range '#{data['range']}'."
- end
- t[:range] = {}
- t[:range][key_tmp] = {}
- if data['operator'] == 'within last (relative)'
- t[:range][key_tmp][:gte] = "now-#{data['value']}#{range}"
- else
- t[:range][key_tmp][:lt] = "now+#{data['value']}#{range}"
- end
- query_must.push t
-
- elsif data['operator'] == 'before (relative)' || data['operator'] == 'after (relative)'
- range = relative_map[data['range'].to_sym]
- if range.blank?
- raise "Invalid relative_map for range '#{data['range']}'."
- end
- t[:range] = {}
- t[:range][key_tmp] = {}
- if data['operator'] == 'before (relative)'
- t[:range][key_tmp][:lt] = "now-#{data['value']}#{range}"
- else
- t[:range][key_tmp][:gt] = "now+#{data['value']}#{range}"
- end
- query_must.push t
-
- elsif data['operator'] == 'before (absolute)' || data['operator'] == 'after (absolute)'
- t[:range] = {}
- t[:range][key_tmp] = {}
- if data['operator'] == 'before (absolute)'
- t[:range][key_tmp][:lt] = (data['value'])
- else
- t[:range][key_tmp][:gt] = (data['value'])
- end
- query_must.push t
- else
- raise "unknown operator '#{data['operator']}' for #{key}"
- end
- end
- end
- data = {
- query: {},
- size: options[:limit],
- }
-
- if aggs_interval.present?
- if aggs_interval[:interval].present?
- data[:size] = 0
- data[:aggs] = {
- time_buckets: {
- date_histogram: {
- field: aggs_interval[:field],
- interval: aggs_interval[:interval],
- }
- }
- }
- if aggs_interval[:timezone].present?
- data[:aggs][:time_buckets][:date_histogram][:time_zone] = aggs_interval[:timezone]
- end
- end
- r = {}
- r[:range] = {}
- r[:range][aggs_interval[:field]] = {
- from: aggs_interval[:from],
- to: aggs_interval[:to],
- }
- query_must.push r
- end
- data[:query][:bool] ||= {}
- if query_must.present?
- data[:query][:bool][:must] = query_must
- end
- if query_must_not.present?
- data[:query][:bool][:must_not] = query_must_not
- end
-
- if aggs_interval.present? && aggs_interval[:field].present? && aggs_interval[:interval].blank?
- sort = []
- sort[0] = {}
- sort[0][aggs_interval[:field]] = {
- order: 'desc'
- }
- sort[1] = '_score'
- data['sort'] = sort
- end
- data
- end
- def self.enabled?
- return false if Setting.get('es_url').blank?
- true
- end
- def self.build_index_name(index)
- local_index = "#{Setting.get('es_index')}_#{Rails.env}"
- "#{local_index}_#{index.underscore.tr('/', '_')}"
- end
- def self.build_url(type = nil, o_id = nil, pipeline = true, with_type = true)
- return if !SearchIndexBackend.enabled?
-
- index = "#{Setting.get('es_index')}_#{Rails.env}"
- if Setting.get('es_multi_index') == false
- url = Setting.get('es_url')
- url = if type
- url_pipline = Setting.get('es_pipeline')
- if url_pipline.present?
- url_pipline = "?pipeline=#{url_pipline}"
- end
- if o_id
- "#{url}/#{index}/#{type}/#{o_id}#{url_pipline}"
- else
- "#{url}/#{index}/#{type}#{url_pipline}"
- end
- else
- "#{url}/#{index}"
- end
- return url
- end
-
- url = Setting.get('es_url')
- if pipeline == true
- url_pipline = Setting.get('es_pipeline')
- if url_pipline.present?
- url_pipline = "?pipeline=#{url_pipline}"
- end
- end
- if type
- index = build_index_name(type)
- if with_type == false
- return "#{url}/#{index}"
- end
- if o_id
- return "#{url}/#{index}/_doc/#{o_id}#{url_pipline}"
- end
- return "#{url}/#{index}/_doc#{url_pipline}"
- end
- "#{url}/"
- end
- def self.build_search_url(index)
-
- if Setting.get('es_multi_index') == false
- if index
- return "/#{index}/_search"
- end
- return '/_search'
- end
-
- "#{build_index_name(index)}/_doc/_search"
- end
- def self.humanized_error(verb:, url:, payload: nil, response:)
- prefix = "Unable to process #{verb} request to elasticsearch URL '#{url}'."
- suffix = "\n\nResponse:\n#{response.inspect}\n\nPayload:\n#{payload.inspect}"
- if payload.respond_to?(:to_json)
- suffix += "\n\nPayload size: #{payload.to_json.bytesize / 1024 / 1024}M"
- end
- message = if response&.error&.match?('Connection refused')
- "Elasticsearch is not reachable, probably because it's not running or even installed."
- elsif url.end_with?('pipeline/zammad-attachment', 'pipeline=zammad-attachment') && response.code == 400
- 'The installed attachment plugin could not handle the request payload. Ensure that the correct attachment plugin is installed (5.6 => ingest-attachment, 2.4 - 5.5 => mapper-attachments).'
- else
- 'Check the response and payload for detailed information: '
- end
- result = "#{prefix} #{message}#{suffix}"
- Rails.logger.error result.first(40_000)
- result
- end
-
- def self.append_wildcard_to_simple_query(query)
- query.strip!
- query += '*' if !query.match?(/:/)
- query
- end
- DEFAULT_QUERY_OPTIONS = {
- from: 0,
- limit: 10
- }.freeze
- def self.build_query(condition, options = {})
- options = DEFAULT_QUERY_OPTIONS.merge(options.deep_symbolize_keys)
- data = {
- from: options[:from],
- size: options[:limit],
- sort: search_by_index_sort(options[:sort_by], options[:order_by]),
- query: {
- bool: {
- must: []
- }
- }
- }
- if (extension = options.dig(:query_extension))
- data[:query].deep_merge! extension.deep_dup
- end
- data[:query][:bool][:must].push condition
- data
- end
- end
|