Browse Source

Added option to use multiple session instances.

Martin Edenhofer 7 years ago
parent
commit
486802770b

+ 0 - 1
app/assets/javascripts/app/controllers/ticket_overview.coffee

@@ -956,7 +956,6 @@ class Table extends App.Controller
     ticketListShow = []
     for ticket in tickets
       ticketListShow.push App.Ticket.find(ticket.id)
-    console.log('overview', overview)
     @overview = App.Overview.find(overview.id)
     @table.update(
       overviewAttributes: @overview.view.s

+ 48 - 16
lib/sessions.rb

@@ -109,8 +109,11 @@ returns
 =end
 
   def self.session_exists?(client_id)
-    client_ids = sessions
-    client_ids.include? client_id.to_s
+    session_dir = "#{@path}/#{client_id}"
+    return false if !File.exist?(session_dir)
+    session_file = "#{session_dir}/session"
+    return false if !File.exist?(session_file)
+    true
   end
 
 =begin
@@ -247,14 +250,14 @@ returns
     data         = nil
 
     # if no session dir exists, session got destoried
-    if !File.exist? session_dir
+    if !File.exist?(session_dir)
       destroy(client_id)
-      log('debug', "missing session directory for '#{client_id}', remove session.")
+      log('debug', "missing session directory #{session_dir} for '#{client_id}', remove session.")
       return
     end
 
     # if only session file is missing, then it's an error behavior
-    if !File.exist? session_file
+    if !File.exist?(session_file)
       destroy(client_id)
       log('error', "missing session file for '#{client_id}', remove session.")
       return
@@ -558,16 +561,47 @@ remove all session and spool messages
     data
   end
 
-  def self.jobs
+  def self.jobs(node_id = nil)
 
     # just make sure that spool path exists
     if !File.exist?(@path)
       FileUtils.mkpath @path
     end
 
+    # dispatch sessions
+    if node_id && node_id.zero?
+      loop do
+
+        # nodes
+        nodes_stats = Sessions::Node.stats
+
+        client_ids = sessions
+        client_ids.each do |client_id|
+
+          # ask nodes for nodes
+          next if nodes_stats[client_id]
+
+          # assigne to node
+          Sessions::Node.session_assigne(client_id)
+        end
+        sleep 1
+      end
+    end
+
     Thread.abort_on_exception = true
     loop do
-      client_ids = sessions
+
+      if node_id
+
+        # register node
+        Sessions::Node.register(node_id)
+
+        # watch for assigned sessions
+        client_ids = Sessions::Node.sessions_by(node_id)
+      else
+        client_ids = sessions
+      end
+
       client_ids.each do |client_id|
 
         # connection already open, ignore
@@ -586,7 +620,7 @@ remove all session and spool messages
 
         @@client_threads[client_id] = true
         @@client_threads[client_id] = Thread.new do
-          thread_client(client_id)
+          thread_client(client_id, 0, Time.now.utc, node_id)
           @@client_threads[client_id] = nil
           log('debug', "close client (#{client_id}) thread")
           if ActiveRecord::Base.connection.owner == Thread.current
@@ -629,10 +663,10 @@ returns
 
 =end
 
-  def self.thread_client(client_id, try_count = 0, try_run_time = Time.now.utc)
-    log('debug', "LOOP #{client_id} - #{try_count}")
+  def self.thread_client(client_id, try_count = 0, try_run_time = Time.now.utc, node_id)
+    log('debug', "LOOP #{node_id}.#{client_id} - #{try_count}")
     begin
-      Sessions::Client.new(client_id)
+      Sessions::Client.new(client_id, node_id)
     rescue => e
       log('error', "thread_client #{client_id} exited with error #{e.inspect}")
       log('error', e.backtrace.join("\n  ") )
@@ -654,13 +688,11 @@ returns
 
       # restart job again
       if try_run_max > try_count
-        thread_client(client_id, try_count, try_run_time)
-        return
+        thread_client(client_id, try_count, try_run_time, node_id)
       end
-
-      raise "STOP thread_client for client #{client_id} after #{try_run_max} tries"
+      raise "STOP thread_client for client #{node_id}.#{client_id} after #{try_run_max} tries"
     end
-    log('debug', "/LOOP #{client_id} - #{try_count}")
+    log('debug', "/LOOP #{node_id}.#{client_id} - #{try_count}")
   end
 
   def self.symbolize_keys(hash)

+ 6 - 2
lib/sessions/client.rb

@@ -1,7 +1,8 @@
 class Sessions::Client
 
-  def initialize(client_id)
+  def initialize(client_id, node_id)
     @client_id = client_id
+    @node_id = node_id
     log '---client start ws connection---'
     fetch
     log '---client exiting ws connection---'
@@ -22,6 +23,9 @@ class Sessions::Client
     loop_count = 0
     loop do
 
+      # check if session still exists
+      return if !Sessions.session_exists?(@client_id)
+
       # get connection user
       session_data = Sessions.get(@client_id)
       return if !session_data
@@ -71,6 +75,6 @@ class Sessions::Client
   end
 
   def log(msg)
-    Rails.logger.debug "client(#{@client_id}) #{msg}"
+    Rails.logger.debug "client(#{@node_id}.#{@client_id}) #{msg}"
   end
 end

+ 169 - 0
lib/sessions/node.rb

@@ -0,0 +1,169 @@
+module Sessions::Node
+
+  # get application root directory
+  @root = Dir.pwd.to_s
+  if @root.blank? || @root == '/'
+    @root = Rails.root
+  end
+
+  # get working directories
+  @path = "#{@root}/tmp/session_node_#{Rails.env}"
+
+  def self.session_assigne(client_id, force = false)
+
+    # get available nodes
+    nodes = Sessions::Node.registered
+    session_count = {}
+
+    nodes.each do |node|
+      count = Sessions::Node.sessions_by(node['node_id'], force).count
+      session_count[node['node_id']] = count
+    end
+
+    # search for lowest session count
+    node_id = nil
+    node_count = nil
+    session_count.each do |local_node_id, count|
+      next if !node_count.nil? && count > node_count
+      node_count = count
+      node_id = local_node_id
+    end
+
+    # assigne session
+    Rails.logger.info "Assigne session to node #{node_id} (#{client_id})"
+    Sessions::Node.sessions_for(node_id, client_id)
+
+    # write node status file
+    node_id
+  end
+
+  def self.cleanup
+    FileUtils.rm_rf @path
+  end
+
+  def self.registered
+    path = "#{@path}/*.status"
+    nodes = []
+    files = Dir.glob(path)
+    files.each do |filename|
+      File.open(filename, 'rb') do |file|
+        file.flock(File::LOCK_SH)
+        content = file.read
+        file.flock(File::LOCK_UN)
+        begin
+          data = JSON.parse(content)
+          nodes.push data
+        rescue => e
+          Rails.logger.error "can't parse status file #{filename}, #{e.inspect}"
+          #to_delete.push "#{path}/#{entry}"
+          #next
+        end
+      end
+    end
+    nodes
+  end
+
+  def self.register(node_id)
+    if !File.exist?(@path)
+      FileUtils.mkpath @path
+    end
+
+    status_file = "#{@path}/#{node_id}.status"
+
+    # write node status file
+    data = {
+      updated_at_human: Time.now.utc,
+      updated_at: Time.now.utc.to_i,
+      node_id: node_id.to_s,
+      pid: $PROCESS_ID,
+    }
+    content = data.to_json
+
+    # store session data in session file
+    File.open(status_file, 'wb') do |file|
+      file.write content
+    end
+
+  end
+
+  def self.stats
+    # read node sessions
+    path = "#{@path}/*.session"
+
+    sessions = {}
+    files = Dir.glob(path)
+    files.each do |filename|
+      File.open(filename, 'rb') do |file|
+        file.flock(File::LOCK_SH)
+        content = file.read
+        file.flock(File::LOCK_UN)
+        begin
+          next if content.blank?
+          data = JSON.parse(content)
+          next if data.blank?
+          next if data['client_id'].blank?
+          sessions[data['client_id']] = data['node_id']
+        rescue => e
+          Rails.logger.error "can't parse session file #{filename}, #{e.inspect}"
+          #to_delete.push "#{path}/#{entry}"
+          #next
+        end
+      end
+    end
+    sessions
+  end
+
+  def self.sessions_for(node_id, client_id)
+    if !File.exist?(@path)
+      FileUtils.mkpath @path
+    end
+
+    status_file = "#{@path}/#{node_id}.#{client_id}.session"
+
+    # write node status file
+    data = {
+      updated_at_human: Time.now.utc,
+      updated_at: Time.now.utc.to_i,
+      node_id: node_id.to_s,
+      client_id: client_id.to_s,
+      pid: $PROCESS_ID,
+    }
+    content = data.to_json
+
+    # store session data in session file
+    File.open(status_file, 'wb') do |file|
+      file.write content
+    end
+
+  end
+
+  def self.sessions_by(node_id, force = false)
+
+    # read node sessions
+    path = "#{@path}/#{node_id}.*.session"
+
+    sessions = []
+    files = Dir.glob(path)
+    files.each do |filename|
+      File.open(filename, 'rb') do |file|
+        file.flock(File::LOCK_SH)
+        content = file.read
+        file.flock(File::LOCK_UN)
+        begin
+          next if content.blank?
+          data = JSON.parse(content)
+          next if data.blank?
+          next if data['client_id'].blank?
+          next if !Sessions.session_exists?(data['client_id']) && force == false
+          sessions.push data['client_id']
+        rescue => e
+          Rails.logger.error "can't parse session file #{filename}, #{e.inspect}"
+          #to_delete.push "#{path}/#{entry}"
+          #next
+        end
+      end
+    end
+    sessions
+  end
+
+end