twitter.rb 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. # Copyright (C) 2012-2015 Zammad Foundation, http://zammad-foundation.org/
  2. class Channel::Driver::Twitter
  3. =begin
  4. fetch tweets from twitter account
  5. options = {
  6. adapter: 'twitter',
  7. auth: {
  8. consumer_key: consumer_key,
  9. consumer_secret: consumer_secret,
  10. oauth_token: armin_theo_token,
  11. oauth_token_secret: armin_theo_token_secret,
  12. },
  13. sync: {
  14. search: [
  15. {
  16. term: '#citheo42',
  17. group_id: 2,
  18. },
  19. {
  20. term: '#citheo24',
  21. group_id: 1,
  22. },
  23. ],
  24. mentions: {
  25. group_id: 2,
  26. },
  27. direct_messages: {
  28. group_id: 2,
  29. }
  30. }
  31. }
  32. instance = Channel::Driver::Twitter.new
  33. result = instance.fetch(options, channel)
  34. returns
  35. {
  36. result: 'ok',
  37. }
  38. =end
  # Fetch mentions, search results and direct messages for one Twitter
  # channel through the REST client.
  #
  # options - Hash with :auth (credentials, optionally referencing an
  #           external credential) and :sync (what to fetch).
  # channel - channel object the fetched items are processed for.
  #
  # Returns { result: 'ok', notice: '' }. Errors raised by one of the fetch
  # steps propagate to the caller.
  def fetch(options, channel)
    options = check_external_credential(options)

    @rest_client = TweetRest.new(options[:auth])
    @sync        = options[:sync]
    @channel     = channel

    Rails.logger.debug 'twitter fetch started'

    begin
      fetch_mentions
      fetch_search
      fetch_direct_messages
    ensure
      # release the clients even when one of the fetch steps raised
      disconnect
    end

    Rails.logger.debug 'twitter fetch completed'

    {
      result: 'ok',
      notice: '',
    }
  end
  55. =begin
  56. instance = Channel::Driver::Twitter.new
  57. instance.fetchable?(channel)
  58. =end
  # Tell whether the channel may be fetched right now.
  #
  # Always true in the test environment, and true when the channel has no
  # preferences or no :last_fetch entry yet. Otherwise fetching is throttled:
  # the result is false while the last fetch is less than 20 minutes ago.
  def fetchable?(channel)
    return true if Rails.env.test?

    preferences = channel.preferences
    last_fetch  = preferences && preferences[:last_fetch]
    return true if !last_fetch

    # only fetch once in 20 minutes
    !(last_fetch > Time.zone.now - 20.minutes)
  end
  67. =begin
  68. instance = Channel::Driver::Twitter.new
  69. instance.send(
  70. {
  71. adapter: 'twitter',
  72. auth: {
  73. consumer_key: consumer_key,
  74. consumer_secret: consumer_secret,
  75. oauth_token: armin_theo_token,
  76. oauth_token_secret: armin_theo_token_secret,
  77. },
  78. },
  79. twitter_attributes,
  80. notification
  81. )
  82. =end
  # Post an article to Twitter through the REST client.
  #
  # options       - Hash with :auth (credentials, optionally referencing an
  #                 external credential).
  # article       - article data handed to the REST client's from_article.
  # _notification - accepted for interface compatibility, not used here.
  #
  # Returns whatever from_article returns, or nil when the system runs in
  # import mode (nothing is sent then).
  def send(options, article, _notification = false)
    # return if we run import mode
    return if Setting.get('import_mode')

    options = check_external_credential(options)
    @rest_client = TweetRest.new(options[:auth])

    begin
      @rest_client.from_article(article)
    ensure
      # release the client even when posting failed
      disconnect
    end
  end
  # Disconnect the streaming client and the REST client, whichever of the
  # two has been set up on this instance.
  #
  # Returns the result of the REST client's disconnect, or nil when there is
  # no REST client.
  def disconnect
    if @stream_client
      @stream_client.disconnect
    end

    return unless @rest_client

    @rest_client.disconnect
  end
  96. =begin
  97. Channel::Driver::Twitter.streamable?
  98. returns
  99. true|false
  100. =end
  # Report whether this driver supports streaming; it always answers true.
  def self.streamable?
    true
  end
  104. =begin
  105. create stream endpoint from twitter account
  106. options = {
  107. adapter: 'twitter',
  108. auth: {
  109. consumer_key: consumer_key,
  110. consumer_secret: consumer_secret,
  111. oauth_token: armin_theo_token,
  112. oauth_token_secret: armin_theo_token_secret,
  113. },
  114. sync: {
  115. search: [
  116. {
  117. term: '#citheo42',
  118. group_id: 2,
  119. },
  120. {
  121. term: '#citheo24',
  122. group_id: 1,
  123. },
  124. ],
  125. mentions: {
  126. group_id: 2,
  127. },
  128. direct_messages: {
  129. group_id: 2,
  130. }
  131. }
  132. }
  133. instance = Channel::Driver::Twitter.new
  134. stream_instance = instance.stream_instance(channel)
  135. returns
  136. instance_of_stream_handle
  137. =end
  # Remember the channel and build the streaming client from the :auth
  # entry of the channel's options.
  #
  # Returns the newly created TweetStream instance (also kept in
  # @stream_client).
  def stream_instance(channel)
    @channel = channel
    @stream_client = TweetStream.new(@channel.options[:auth])
  end
  143. =begin
  144. stream tweets from twitter account
  145. instance.stream
  146. returns
  147. # endless loop
  148. =end
  # Run the stream, retrying once when Twitter answers "unauthorized".
  #
  # stream_start is attempted up to two times. When an attempt fails with
  # Twitter::Error::Unauthorized and another attempt is left, the method
  # waits 65 seconds and tries again; when the last attempt fails the same
  # way, a RuntimeError describing the failure is raised. Any other error
  # propagates unchanged.
  def stream
    sleep_on_unauthorized = 65
    max_attempts = 2

    max_attempts.times do |loop_count|
      begin
        stream_start
      rescue Twitter::Error::Unauthorized => e
        Rails.logger.info "Unable to stream, try #{loop_count}, error #{e.inspect}"

        # loop_count is zero-based, so the last attempt is max_attempts - 1;
        # give up there instead of sleeping and returning silently
        if loop_count >= max_attempts - 1
          raise "Unable to stream, try #{loop_count}, error #{e.inspect}"
        end

        Rails.logger.info "wait for #{sleep_on_unauthorized} sec. and try it again"
        sleep sleep_on_unauthorized
      end
    end
  end
  165. def stream_start
  166. sync = @channel.options['sync']
  167. raise 'Need channel.options[\'sync\'] for account, but no params found' if !sync
  168. filter = {}
  169. if sync['search']
  170. hashtags = []
  171. sync['search'].each do |item|
  172. next if item['term'].blank?
  173. next if item['term'] == '#'
  174. next if item['group_id'].blank?
  175. hashtags.push item['term']
  176. end
  177. filter[:track] = hashtags.join(',')
  178. end
  179. if sync['mentions'] && sync['mentions']['group_id'] != ''
  180. filter[:replies] = 'all'
  181. end
  182. return if filter.empty?
  183. @stream_client.client.user(filter) do |tweet|
  184. next if tweet.class != Twitter::Tweet && tweet.class != Twitter::DirectMessage
  185. # wait until own posts are stored in local database to prevent importing own tweets
  186. next if @stream_client.locale_sender?(tweet) && own_tweet_already_imported?(tweet)
  187. next if Ticket::Article.find_by(message_id: tweet.id)
  188. # check direct message
  189. if tweet.class == Twitter::DirectMessage
  190. if sync['direct_messages'] && sync['direct_messages']['group_id'] != ''
  191. next if @stream_client.direct_message_limit_reached(tweet, 2)
  192. @stream_client.to_group(tweet, sync['direct_messages']['group_id'], @channel)
  193. end
  194. next
  195. end
  196. next if !track_retweets? && tweet.retweet?
  197. next if @stream_client.tweet_limit_reached(tweet, 2)
  198. # check if it's mention
  199. if sync['mentions'] && sync['mentions']['group_id'].present?
  200. hit = false
  201. if tweet.user_mentions
  202. tweet.user_mentions.each do |user|
  203. if user.id.to_s == @channel.options['user']['id'].to_s
  204. hit = true
  205. end
  206. end
  207. end
  208. if hit
  209. @stream_client.to_group(tweet, sync['mentions']['group_id'], @channel)
  210. next
  211. end
  212. end
  213. # check hashtags
  214. if sync['search'] && tweet.hashtags
  215. hit = false
  216. sync['search'].each do |item|
  217. next if item['term'].blank?
  218. next if item['term'] == '#'
  219. next if item['group_id'].blank?
  220. tweet.hashtags.each do |hashtag|
  221. next if item['term'] !~ /^#/
  222. if item['term'].sub(/^#/, '') == hashtag.text
  223. hit = item
  224. end
  225. end
  226. end
  227. if hit
  228. @stream_client.to_group(tweet, hit['group_id'], @channel)
  229. next
  230. end
  231. end
  232. # check stings
  233. if sync['search']
  234. hit = false
  235. body = tweet.text
  236. sync['search'].each do |item|
  237. next if item['term'].blank?
  238. next if item['term'] == '#'
  239. next if item['group_id'].blank?
  240. if body =~ /#{item['term']}/
  241. hit = item
  242. end
  243. end
  244. if hit
  245. @stream_client.to_group(tweet, hit['group_id'], @channel)
  246. end
  247. end
  248. end
  249. end
  250. private
  # Run every configured search via the REST client and pass the resulting
  # tweets to the group of the matching search rule.
  #
  # Rules without term, with the bare term '#', or without group are skipped.
  # Per rule, tweets created more than 15 days before the channel was created
  # are skipped; once 20 tweets have been skipped this way, every remaining
  # tweet of that search is skipped as well. Processing of a search stops as
  # soon as the REST client reports that the tweet limit is reached.
  def fetch_search
    return if @sync[:search].blank?

    @sync[:search].each do |rule|
      term = rule[:term]
      next if term.blank? || term == '#' || rule[:group_id].blank?

      result_type = rule[:type] || 'mixed'
      Rails.logger.debug " - searching for '#{term}'"

      skipped_old       = 0
      skipped_old_limit = 20

      @rest_client.client.search(term, result_type: result_type).each do |tweet|
        next if !track_retweets? && tweet.retweet?

        # ignore older messages
        outdated = (@channel.created_at - 15.days) > tweet.created_at.dup.utc
        if outdated || skipped_old >= skipped_old_limit
          skipped_old += 1
          Rails.logger.debug "tweet to old: #{tweet.id}/#{tweet.created_at}"
          next
        end

        # own tweets may still be on their way into the database
        next if @rest_client.locale_sender?(tweet) && own_tweet_already_imported?(tweet)
        next if Ticket::Article.find_by(message_id: tweet.id)
        break if @rest_client.tweet_limit_reached(tweet)

        @rest_client.to_group(tweet, rule[:group_id], @channel)
      end
    end
  end
  # Read the mentions timeline via the REST client and pass the tweets to
  # the group configured for mentions.
  #
  # Does nothing without a mentions configuration or without a group in it.
  # Tweets created more than 15 days before the channel was created are
  # skipped; once 20 tweets have been skipped this way, every remaining tweet
  # is skipped as well. Processing stops as soon as the REST client reports
  # that the tweet limit is reached.
  def fetch_mentions
    mentions = @sync[:mentions]
    return if mentions.blank?
    return if mentions[:group_id].blank?

    Rails.logger.debug ' - searching for mentions'

    skipped_old       = 0
    skipped_old_limit = 20

    @rest_client.client.mentions_timeline.each do |tweet|
      next if !track_retweets? && tweet.retweet?

      # ignore older messages
      outdated = (@channel.created_at - 15.days) > tweet.created_at.dup.utc
      if outdated || skipped_old >= skipped_old_limit
        skipped_old += 1
        Rails.logger.debug "tweet to old: #{tweet.id}/#{tweet.created_at}"
        next
      end

      next if Ticket::Article.find_by(message_id: tweet.id)
      break if @rest_client.tweet_limit_reached(tweet)

      @rest_client.to_group(tweet, @sync[:mentions][:group_id], @channel)
    end
  end
  # Read direct messages via the REST client and pass them to the group
  # configured for direct messages.
  #
  # Does nothing without a direct_messages configuration or without a group
  # in it. Messages created more than 15 days before the channel was created
  # are skipped; once 20 messages have been skipped this way, every remaining
  # message is skipped as well. Processing stops as soon as the REST client
  # reports that the direct message limit is reached.
  def fetch_direct_messages
    direct_messages = @sync[:direct_messages]
    return if direct_messages.blank?
    return if direct_messages[:group_id].blank?

    Rails.logger.debug ' - searching for direct_messages'

    skipped_old       = 0
    skipped_old_limit = 20

    @rest_client.client.direct_messages(full_text: 'true').each do |message|
      # ignore older messages
      outdated = (@channel.created_at - 15.days) > message.created_at.dup.utc
      if outdated || skipped_old >= skipped_old_limit
        skipped_old += 1
        Rails.logger.debug "tweet to old: #{message.id}/#{message.created_at}"
        next
      end

      next if Ticket::Article.find_by(message_id: message.id)
      break if @rest_client.direct_message_limit_reached(message)

      @rest_client.to_group(message, @sync[:direct_messages][:group_id], @channel)
    end
  end
  # Resolve an external credential referenced in options[:auth].
  #
  # When options[:auth][:external_credential_id] is set, the credential is
  # looked up and its 'consumer_key' and 'consumer_secret' are written into
  # options[:auth] (the passed hash is modified in place).
  #
  # Returns the options hash. Raises RuntimeError when the referenced
  # credential does not exist.
  def check_external_credential(options)
    auth = options[:auth]
    credential_id = auth && auth[:external_credential_id]
    return options if !credential_id

    external_credential = ExternalCredential.find_by(id: credential_id)
    if !external_credential
      raise "No such ExternalCredential.find(#{credential_id})"
    end

    auth[:consumer_key]    = external_credential.credentials['consumer_key']
    auth[:consumer_secret] = external_credential.credentials['consumer_secret']

    options
  end
  # Read the 'track_retweets' flag from the channel's 'sync' options.
  #
  # Returns the stored value, or the first missing/false link of the chain
  # (options, then options['sync']) when the flag cannot be reached.
  def track_retweets?
    options = @channel.options
    sync    = options && options['sync']

    sync && sync['track_retweets']
  end
  # Wait for a tweet sent by ourselves to show up as article, so it is not
  # imported a second time.
  #
  # After an initial 4 second pause the method checks up to 12 times whether
  # an article with the tweet's id exists. Between checks it sleeps
  # (2 seconds per delayed job created before this call, at most 5) and it
  # stops waiting once no such delayed job is left. A final check follows.
  #
  # Returns true when the article exists, false otherwise.
  def own_tweet_already_imported?(tweet)
    event_time = Time.zone.now
    sleep 4

    already_imported = lambda do
      next false if !Ticket::Article.find_by(message_id: tweet.id)

      Rails.logger.debug "Own tweet already imported, skipping tweet #{tweet.id}"
      true
    end

    12.times do |attempt|
      return true if already_imported.call

      # jobs queued before we started may still create the article
      pending_jobs = Delayed::Job.where('created_at < ?', event_time).count
      break if pending_jobs.zero?

      pause = [2 * pending_jobs, 5].min
      Rails.logger.debug "Delay importing own tweets - sleep #{pause} (loop #{attempt})"
      sleep pause
    end

    already_imported.call
  end
  346. end