# attachment_factory.rb (3.6 KB)
  module Import
    module OTRS
      class Article
        # Imports the attachments of an OTRS article into the local Store,
        # attached to the corresponding local 'Ticket::Article' record.
        #
        # Each incoming attachment is a Hash with Base64 encoded 'Filename' and
        # 'Content' values plus the 'ContentType', 'ContentID' and
        # 'ContentAlternative' meta data that is stored as Store preferences.
        #
        # When the current thread carries a Thread.current[:thread_no] value,
        # imports of attachments with the same content fingerprint (SHA256) are
        # serialized across threads via a per-fingerprint queue.
        module AttachmentFactory
          extend Import::Helper

          # rubocop:disable Style/ModuleFunction
          extend self
          # rubocop:enable Style/ModuleFunction

          # Imports the given attachments for a local article.
          #
          # @param args [Hash] options
          #   - :attachments   [Array<Hash>, nil] attachments to import (nil is treated as empty)
          #   - :local_article the local article the attachments belong to
          # @return [void]
          def import(args)
            attachments   = args[:attachments] || []
            local_article = args[:local_article]

            return if skip_import?(attachments, local_article)

            perform_import(attachments, local_article)
          end

          private

          # Imports every attachment of the list, one after another.
          def perform_import(attachments, local_article)
            attachments.each { |attachment| import_single(local_article, attachment) }
          end

          # Decodes a single attachment and stores it for the local article.
          #
          # Database conflicts (RecordNotUnique / StatementInvalid) are retried
          # after a short random sleep, up to 3 attempts in total; the last
          # failure is re-raised. Any other error is logged together with the
          # inspected attachment and re-raised.
          def import_single(local_article, attachment)
            # Tag the file name as UTF-8 once, so the log lines and the Store
            # record see the same string.
            decoded_filename = Base64.decode64(attachment['Filename']).force_encoding('utf-8')
            decoded_content  = Base64.decode64(attachment['Content'])

            # TODO: should be done by a/the Storage object
            # to handle fingerprinting
            sha = Digest::SHA256.hexdigest(decoded_content)

            begin
              # Enqueue exactly once. `retry` restarts only the inner begin
              # block below, while `ensure` runs a single time when this outer
              # block is left. Enqueueing inside the retried section would push
              # this thread's id again on every retry but remove only one entry,
              # leaving a stale id that blocks other threads waiting for the
              # same fingerprint.
              queueing(sha, decoded_filename)

              retries = 3
              begin
                log "Ticket #{local_article.ticket_id}, Article #{local_article.id} - Starting import for fingerprint #{sha} (#{decoded_filename})... Queue: #{@sha_queue[sha]}."

                ActiveRecord::Base.transaction do
                  Store.add(
                    object:        'Ticket::Article',
                    o_id:          local_article.id,
                    filename:      decoded_filename,
                    data:          decoded_content,
                    preferences:   {
                      'Mime-Type'           => attachment['ContentType'],
                      'Content-ID'          => attachment['ContentID'],
                      'content-alternative' => attachment['ContentAlternative'],
                    },
                    created_by_id: 1,
                  )
                end

                log "Ticket #{local_article.ticket_id}, Article #{local_article.id} - Finished import for fingerprint #{sha} (#{decoded_filename})... Queue: #{@sha_queue[sha]}."
              rescue ActiveRecord::RecordNotUnique, ActiveRecord::StatementInvalid => e
                log "Ticket #{local_article.ticket_id} - #{sha} - #{e.class}: #{e}"
                sleep rand 3
                retry if !(retries -= 1).zero?
                raise
              rescue => e
                log "Ticket #{local_article.ticket_id} - #{sha} - #{e}: #{attachment.inspect}"
                raise
              end
            ensure
              queue_cleanup(sha)
            end
          end

          # Decides whether the import can be skipped.
          #
          # NOTE: this predicate has a side effect. When the number of local
          # attachments differs from the number of incoming ones, all local
          # attachments are deleted so the import starts from a clean state.
          #
          # @return [Boolean] true if the counts already match or there is
          #   nothing to import
          def skip_import?(attachments, local_article)
            local_attachments = local_article.attachments
            return true if local_attachments.count == attachments.count

            # get a common ground
            local_attachments.each(&:delete)

            attachments.blank?
          end

          # Appends the current thread to the queue of the given fingerprint and
          # blocks (polling) until it is the first entry of that queue.
          # Without an active queue id only the queue structure is initialized.
          def queueing(sha, decoded_filename)
            # this is (currently) needed for avoiding
            # race conditions inserting attachments with
            # the same fingerprint in the DB in concurrent threads
            @sha_queue ||= {}
            @sha_queue[sha] ||= []

            return if !queueing_active?

            @sha_queue[sha].push(queue_id)

            sleep_time = 0.25
            while @sha_queue[sha].first != queue_id
              log "Found active import for fingerprint #{sha} (#{decoded_filename})... sleeping #{sleep_time} seconds. Queue: #{@sha_queue[sha]}."
              sleep sleep_time
            end
          end

          # Removes the head entry of the fingerprint queue, which lets the
          # next waiting thread (if any) proceed.
          def queue_cleanup(sha)
            return if !queueing_active?

            @sha_queue[sha].shift
          end

          # @return [true, nil] true if the current thread has a queue id
          def queueing_active?
            return if !queue_id

            true
          end

          # Identifier of the current thread within the fingerprint queues.
          # NOTE(review): presumably set by the code that spawns the import
          # threads - not visible in this file.
          def queue_id
            Thread.current[:thread_no]
          end
        end
      end
    end
  end