|
@@ -423,9 +423,10 @@ returns
|
|
}
|
|
}
|
|
files.sort.each { |entry|
|
|
files.sort.each { |entry|
|
|
filename = "#{path}/#{entry}"
|
|
filename = "#{path}/#{entry}"
|
|
- if /^send/ =~ entry
|
|
|
|
- data.push Sessions.queue_file_read(path, entry)
|
|
|
|
- end
|
|
|
|
|
|
+ next if entry !~ /^send/
|
|
|
|
+ message = Sessions.queue_file_read(path, entry)
|
|
|
|
+ next if !message
|
|
|
|
+ data.push message
|
|
}
|
|
}
|
|
data
|
|
data
|
|
end
|
|
end
|
|
@@ -434,12 +435,17 @@ returns
|
|
file_old = "#{path}#{filename}"
|
|
file_old = "#{path}#{filename}"
|
|
file_new = "#{path}a-#{filename}"
|
|
file_new = "#{path}a-#{filename}"
|
|
FileUtils.mv(file_old, file_new)
|
|
FileUtils.mv(file_old, file_new)
|
|
- all = ''
|
|
|
|
|
|
+ message = ''
|
|
File.open(file_new, 'rb') { |file|
|
|
File.open(file_new, 'rb') { |file|
|
|
- all = file.read
|
|
|
|
|
|
+ message = file.read
|
|
}
|
|
}
|
|
File.delete(file_new)
|
|
File.delete(file_new)
|
|
- JSON.parse(all)
|
|
|
|
|
|
+ begin
|
|
|
|
+ return JSON.parse(message)
|
|
|
|
+ rescue => e
|
|
|
|
+ log('error', "can't parse queue message: #{message}, #{e.inspect}")
|
|
|
|
+ return
|
|
|
|
+ end
|
|
end
|
|
end
|
|
|
|
|
|
def self.cleanup
|
|
def self.cleanup
|
|
@@ -478,12 +484,13 @@ returns
|
|
filename = "#{path}/#{entry}"
|
|
filename = "#{path}/#{entry}"
|
|
next if !File.exist?(filename)
|
|
next if !File.exist?(filename)
|
|
File.open(filename, 'rb') { |file|
|
|
File.open(filename, 'rb') { |file|
|
|
- all = file.read
|
|
|
|
- spool = JSON.parse(all)
|
|
|
|
|
|
+ message = file.read
|
|
begin
|
|
begin
|
|
|
|
+ spool = JSON.parse(message)
|
|
message_parsed = JSON.parse(spool['msg'])
|
|
message_parsed = JSON.parse(spool['msg'])
|
|
rescue => e
|
|
rescue => e
|
|
log('error', "can't parse spool message: #{message}, #{e.inspect}")
|
|
log('error', "can't parse spool message: #{message}, #{e.inspect}")
|
|
|
|
+ to_delete.push "#{path}/#{entry}"
|
|
next
|
|
next
|
|
end
|
|
end
|
|
|
|
|