# Copyright (C) 2012-2025 Zammad Foundation, https://zammad-foundation.org/
  module Import
    # Import of data from a remote OTRS system.
    #
    # `start` and `connection_test` are the public entry points defined here;
    # every other method in this module is a private import step.
    module OTRS
      extend Import::Helper
      extend Import::OTRS::ImportStats
      extend Import::OTRS::Async
      extend Import::OTRS::Diff
      extend self

      # Start import with specific parameters.
      # Useful for debugging and for continuing from the point where the last
      # unsuccessful import stopped.
      #
      # @param args [Hash] options forwarded to the threaded Ticket import:
      #   +:threads+ (number of worker threads, default 8),
      #   +:limit+ (records requested per batch, default 20),
      #   +:offset+ (offset of the very first batch, default 0) and
      #   +:diff+ (handed through unchanged to the requester).
      # @return [true] once all import steps have run
      #
      # @example Normal usage
      #   Import::OTRS.start
      #
      # @example Run the task single-threaded and start from offset 1000
      #   Import::OTRS.start(threads: 1, offset: 1000)
      def start(args = {})
        log 'Start import...'

        checks
        prerequisites
        base_objects
        updateable_objects
        customer_user
        threaded_import('Ticket', args)

        true
      end

      # Delegates to the requester's connection test.
      #
      # @return whatever Import::OTRS::Requester.connection_test returns
      def connection_test
        Import::OTRS::Requester.connection_test
      end

      private

      # Runs the pre-flight checks before anything is imported.
      # NOTE(review): check_import_mode and check_system_init_done are not
      # defined in this file; presumably they come from Import::Helper - confirm.
      def checks
        check_import_mode
        check_system_init_done
        connection_test
      end

      # Creates records that must exist before the import threads start.
      def prerequisites
        # make sure to create store type otherwise
        # it might lead to race conditions while
        # creating it in different import threads
        Store::Object.create_if_not_exists(name: 'Ticket::Article')
      end

      # Imports one kind of remote object in a single request.
      #
      # @param remote_object [String] remote object name, e.g. 'State'
      # @param args [Hash] optional +:limit+, +:offset+ and +:diff+
      # @return false when no records were loaded, otherwise the result of the
      #   factory import
      def import(remote_object, args = {})
        log "loading #{remote_object}..."
        import_action(remote_object, args)
      end

      # Imports one kind of remote object in batches, spread over several
      # threads. Every thread works on its own interleaved sequence of offsets
      # and stops as soon as one of its batches comes back empty.
      #
      # @param remote_object [String] remote object name, e.g. 'Ticket'
      # @param args [Hash] +:threads+ (default 8), +:limit+ (default 20),
      #   +:offset+ (default 0) and +:diff+
      # @return [Array<Thread>] the finished worker threads
      def threaded_import(remote_object, args = {})
        thread_count      = args[:threads] || 8
        limit             = args[:limit] || 20
        start_offset_base = args[:offset] || 0

        # This is a process-wide switch: an exception in any worker thread is
        # re-raised in the main thread instead of being silently dropped.
        Thread.abort_on_exception = true

        workers = (1..thread_count).map do |thread_no|
          Thread.new do
            run_import_thread(
              remote_object,
              thread_no:         thread_no,
              thread_count:      thread_count,
              limit:             limit,
              start_offset_base: start_offset_base,
              diff:              args[:diff]
            )
          end
        end

        workers.each(&:join)
      end

      # Body of a single worker thread started by threaded_import.
      #
      # @param remote_object [String] remote object name
      # @param thread_no [Integer] 1-based number of this worker
      # @param thread_count [Integer] total number of workers
      # @param limit [Integer] records requested per batch
      # @param start_offset_base [Integer] offset of the very first batch
      # @param diff passed through unchanged to imported?
      def run_import_thread(remote_object, thread_no:, thread_count:, limit:, start_offset_base:, diff:)
        Thread.current.name = "OTRS import thread #{thread_no}"

        # In some environments the Model.reset_column_information
        # is not reflected to threads. So an import error message appears.
        # Reset needed model column information for each thread.
        reset_database_information

        # NOTE(review): these thread-locals are kept under their original keys;
        # presumably helpers outside this file (e.g. log) read them - confirm
        # before renaming or removing.
        Thread.current[:thread_no]  = thread_no
        Thread.current[:loop_count] = 0

        log "Importing #{remote_object} in steps of #{limit}"

        # Worker n starts (n - 1) batches in and then jumps ahead by one batch
        # per worker on every round, so the workers never request the same offset.
        thread_offset_base = (thread_no - 1) * limit
        thread_step        = thread_count * limit

        loop do
          offset = (Thread.current[:loop_count] * thread_step) + thread_offset_base + start_offset_base

          break unless imported?(
            remote_object: remote_object,
            limit:         limit,
            offset:        offset,
            diff:          diff
          )

          Thread.current[:loop_count] += 1
        end
      ensure
        # Close this thread's database connection even when a batch raised.
        ActiveRecord::Base.connection.close
      end

      # Imports one kind of remote object batch by batch in the calling thread
      # until a batch comes back empty.
      #
      # @param remote_object [String] remote object name, e.g. 'CustomerUser'
      # @param args [Hash] +:limit+ (default 20) and +:diff+
      def limit_import(remote_object, args = {})
        offset = 0
        limit  = args[:limit] || 20

        log "Importing #{remote_object} in steps of #{limit}"

        loop do
          break unless imported?(
            remote_object: remote_object,
            limit:         limit,
            offset:        offset,
            diff:          args[:diff]
          )

          offset += limit
        end
      end

      # Loads and imports a single batch.
      #
      # @param args [Hash] +:remote_object+, +:limit+, +:offset+ and +:diff+
      # @return [Boolean] false when import_action returned a falsy value
      #   (e.g. no records were left), true otherwise
      def imported?(args)
        log "loading #{args[:limit]} #{args[:remote_object]} starting at #{args[:offset]}..."

        result = import_action(args[:remote_object], limit: args[:limit], offset: args[:offset], diff: args[:diff])
        result ? true : false
      end

      # Requests records from the remote system and hands them to the matching
      # factory.
      #
      # @param remote_object [String] remote object name
      # @param args [Hash] optional +:limit+, +:offset+ and +:diff+
      # @return false when the requester returned nothing, otherwise the return
      #   value of the factory's import.
      #   NOTE(review): callers treat a falsy factory result as "no more work";
      #   confirm every factory returns a truthy value.
      def import_action(remote_object, args = {})
        records = Import::OTRS::Requester.load(remote_object, limit: args[:limit], offset: args[:offset], diff: args[:diff])

        if records.blank?
          log '... no more work.'
          return false
        end

        factory_class(remote_object).import(records)
      end

      # Resolves the factory constant for a remote object name,
      # e.g. 'Ticket' => Import::OTRS::TicketFactory.
      def factory_class(object)
        "Import::OTRS::#{object}Factory".constantize
      end

      # sync settings
      def base_objects
        import('SysConfig')
        import('DynamicField')
      end

      # Imports the object kinds that are fetched in one request each,
      # in this fixed order.
      def updateable_objects
        import('State')
        import('Priority')
        import('Queue')
        import('User')
        import('Customer')
      end

      # Imports customer users sequentially in batches of 50.
      def customer_user
        limit_import('CustomerUser', limit: 50)
      end

      # Resets the cached column information of the Ticket model.
      def reset_database_information
        ::Ticket.reset_column_information
      end
    end
  end