123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- # Copyright (C) 2012-2025 Zammad Foundation, https://zammad-foundation.org/
- module Import
- module OTRS
- extend Import::Helper
- extend Import::OTRS::ImportStats
- extend Import::OTRS::Async
- extend Import::OTRS::Diff
- extend self
- # Start import with specific parameters.
- # Useful for debug and continuing from breakpoint of last not success import
- #
- # @example
- # Import::OTRS::start() - Nomrmal usage
- #
- # Import::OTRS::start(thread: 1, offset: 1000) - Run the task in Single-Thread and start from offset 1000
- def start(args = {})
- log 'Start import...'
- checks
- prerequisites
- base_objects
- updateable_objects
- customer_user
- threaded_import('Ticket', args)
- true
- end
- def connection_test
- Import::OTRS::Requester.connection_test
- end
- private
- def checks
- check_import_mode
- check_system_init_done
- connection_test
- end
- def prerequisites
- # make sure to create store type otherwise
- # it might lead to race conditions while
- # creating it in different import threads
- Store::Object.create_if_not_exists(name: 'Ticket::Article')
- end
- def import(remote_object, args = {})
- log "loading #{remote_object}..."
- import_action(remote_object, args)
- end
- def threaded_import(remote_object, args = {})
- thread_count = args[:threads] || 8
- limit = args[:limit] || 20
- start_offset_base = args[:offset] || 0
- Thread.abort_on_exception = true
- threads = {}
- (1..thread_count).each do |thread|
- threads[thread] = Thread.new do
- # In some environments the Model.reset_column_information
- # is not reflected to threads. So an import error message appears.
- # Reset needed model column information for each thread.
- reset_database_information
- Thread.current[:thread_no] = thread
- Thread.current[:loop_count] = 0
- log "Importing #{remote_object} in steps of #{limit}"
- loop do
- # get the offset for the current thread and loop count
- thread_offset_base = (Thread.current[:thread_no] - 1) * limit
- thread_step = thread_count * limit
- offset = (Thread.current[:loop_count] * thread_step) + thread_offset_base + start_offset_base
- break if !imported?(
- remote_object: remote_object,
- limit: limit,
- offset: offset,
- diff: args[:diff]
- )
- Thread.current[:loop_count] += 1
- end
- ActiveRecord::Base.connection.close
- end
- end
- (1..thread_count).each do |thread| # rubocop:disable Style/CombinableLoops
- threads[thread].join
- end
- end
- def limit_import(remote_object, args = {})
- offset = 0
- limit = args[:limit] || 20
- log "Importing #{remote_object} in steps of #{limit}"
- loop do
- break if !imported?(
- remote_object: remote_object,
- limit: limit,
- offset: offset,
- diff: args[:diff]
- )
- offset += limit
- end
- end
- def imported?(args)
- log "loading #{args[:limit]} #{args[:remote_object]} starting at #{args[:offset]}..."
- return false if !import_action(args[:remote_object], limit: args[:limit], offset: args[:offset], diff: args[:diff])
- true
- end
- def import_action(remote_object, args = {})
- records = Import::OTRS::Requester.load(remote_object, limit: args[:limit], offset: args[:offset], diff: args[:diff])
- if records.blank?
- log '... no more work.'
- return false
- end
- factory_class(remote_object).import(records)
- end
- def factory_class(object)
- "Import::OTRS::#{object}Factory".constantize
- end
- # sync settings
- def base_objects
- import('SysConfig')
- import('DynamicField')
- end
- def updateable_objects
- import('State')
- import('Priority')
- import('Queue')
- import('User')
- import('Customer')
- end
- def customer_user
- limit_import('CustomerUser', limit: 50)
- end
- def reset_database_information
- ::Ticket.reset_column_information
- end
- end
- end
|