attachment_factory.rb 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  module Import
    module OTRS
      class Article
        # Stores the attachments delivered with an OTRS article as Store
        # entries of the matching local article.
        #
        # All methods are module-level (`extend self`). `log` is not defined in
        # this module; presumably it comes from Import::Helper (verify there).
        module AttachmentFactory
          extend Import::Helper
          extend self

          # Imports the given attachments for a local article unless
          # skip_import? decides there is nothing to do.
          #
          # @param args [Hash] reads :attachments (collection of attachment
          #   hashes, nil is treated as empty) and :local_article
          # @return [Object, nil] nil when the import was skipped, otherwise
          #   the result of iterating the attachments
          def import(args)
            attachments   = args[:attachments] || []
            local_article = args[:local_article]

            return if skip_import?(attachments, local_article)

            perform_import(attachments, local_article)
          end

          private

          # Imports every attachment, one after the other.
          def perform_import(attachments, local_article)
            attachments.each { |attachment| import_single(local_article, attachment) }
          end

          # Decodes a single attachment and adds it to the Store.
          #
          # Reads the Base64 encoded 'Filename' and 'Content' keys plus the
          # 'ContentType', 'ContentID' and 'ContentAlternative' keys of
          # +attachment+. ActiveRecord::RecordNotUnique and
          # ActiveRecord::StatementInvalid are retried (3 attempts in total,
          # with a random 0-2 second pause); every error is logged and
          # re-raised to the caller.
          def import_single(local_article, attachment)
            decoded_filename = Base64.decode64(attachment['Filename'])
            decoded_content  = Base64.decode64(attachment['Content'])

            # TODO: should be done by a/the Storage object
            # to handle fingerprinting
            sha = Digest::SHA256.hexdigest(decoded_content)

            retries = 3
            begin
              # `retry` re-runs this body WITHOUT running the `ensure` clause,
              # so queueing is called once per attempt while queue_cleanup runs
              # only once. queueing therefore has to be idempotent per thread.
              queueing(sha, decoded_filename)

              log "Ticket #{local_article.ticket_id}, Article #{local_article.id} - Starting import for fingerprint #{sha} (#{decoded_filename})... Queue: #{@sha_queue[sha]}."
              ActiveRecord::Base.transaction do
                Store.add(
                  object: 'Ticket::Article',
                  o_id: local_article.id,
                  filename: decoded_filename.force_encoding('utf-8'),
                  data: decoded_content,
                  preferences: {
                    'Mime-Type' => attachment['ContentType'],
                    'Content-ID' => attachment['ContentID'],
                    'content-alternative' => attachment['ContentAlternative'],
                  },
                  created_by_id: 1,
                )
              end
              log "Ticket #{local_article.ticket_id}, Article #{local_article.id} - Finished import for fingerprint #{sha} (#{decoded_filename})... Queue: #{@sha_queue[sha]}."
            rescue ActiveRecord::RecordNotUnique, ActiveRecord::StatementInvalid => e
              log "Ticket #{local_article.ticket_id} - #{sha} - #{e.class}: #{e}"
              sleep rand(3)
              retry unless (retries -= 1).zero?
              raise
            rescue => e
              log "Ticket #{local_article.ticket_id} - #{sha} - #{e}: #{attachment.inspect}"
              raise
            ensure
              queue_cleanup(sha)
            end
          end

          # Tells whether the import can be skipped.
          #
          # Side effect: when the number of local attachments differs from the
          # number of given attachments, all local attachments are deleted
          # before the result is returned.
          #
          # @return [Boolean] true when the counts already match or when there
          #   is nothing to import
          def skip_import?(attachments, local_article)
            local_attachments = local_article.attachments
            return true if local_attachments.count == attachments.count

            # get a common ground
            local_attachments.each(&:delete)

            attachments.blank?
          end

          # Registers the current thread in the per-fingerprint queue and
          # blocks (polling every 0.25 seconds) until it is first in line.
          #
          # The queue entry for +sha+ is always created because import_single
          # logs it; the waiting only happens when queueing is active.
          def queueing(sha, decoded_filename)
            # this is (currently) needed for avoiding
            # race conditions inserting attachments with
            # the same fingerprint in the DB in concurrent threads
            @sha_queue ||= {}
            @sha_queue[sha] ||= []

            return unless queueing_active?

            # Enqueue only once per thread: import_single calls this again on
            # every retry, and a duplicate id would survive the single
            # queue_cleanup and block later threads waiting on this fingerprint.
            @sha_queue[sha].push(queue_id) unless @sha_queue[sha].include?(queue_id)

            sleep_time = 0.25
            while @sha_queue[sha].first != queue_id
              log "Found active import for fingerprint #{sha} (#{decoded_filename})... sleeping #{sleep_time} seconds. Queue: #{@sha_queue[sha]}."
              sleep sleep_time
            end
          end

          # Removes the current thread from the queue of +sha+.
          #
          # Deletes the thread's own id instead of shifting the head, so an
          # interrupted wait can never drop the entry of another thread.
          def queue_cleanup(sha)
            return unless queueing_active?

            @sha_queue[sha].delete(queue_id)
          end

          # Queueing is only active when the current thread carries an id.
          def queueing_active?
            queue_id ? true : false
          end

          # Id of the current thread as stored in its :thread_no thread-local
          # (nil when the thread did not set one).
          def queue_id
            Thread.current[:thread_no]
          end
        end
      end
    end
  end