index.js 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. const config = require('../config');
  2. const Metadata = require('../metadata');
  3. const mozlog = require('../log');
  4. const createRedisClient = require('./redis');
  5. function getPrefix(seconds) {
  6. return Math.max(Math.floor(seconds / 86400), 1);
  7. }
  8. class DB {
  9. constructor(config) {
  10. let Storage = null;
  11. if (config.s3_bucket) {
  12. Storage = require('./s3');
  13. } else if (config.gcs_bucket) {
  14. Storage = require('./gcs');
  15. } else {
  16. Storage = require('./fs');
  17. }
  18. this.log = mozlog('send.storage');
  19. this.storage = new Storage(config, this.log);
  20. this.redis = createRedisClient(config);
  21. this.redis.on('error', err => {
  22. this.log.error('Redis:', err);
  23. });
  24. }
  25. async ttl(id) {
  26. const result = await this.redis.ttlAsync(id);
  27. return Math.ceil(result) * 1000;
  28. }
  29. async getPrefixedInfo(id) {
  30. const [prefix, dead, flagged] = await this.redis.hmgetAsync(
  31. id,
  32. 'prefix',
  33. 'dead',
  34. 'flagged'
  35. );
  36. return {
  37. filePath: `${prefix}-${id}`,
  38. flagged,
  39. dead
  40. };
  41. }
  42. async length(id) {
  43. const { filePath } = await this.getPrefixedInfo(id);
  44. return this.storage.length(filePath);
  45. }
  46. async get(id) {
  47. const info = await this.getPrefixedInfo(id);
  48. if (info.dead || info.flagged) {
  49. throw new Error(info.flagged ? 'flagged' : 'dead');
  50. }
  51. const length = await this.storage.length(info.filePath);
  52. return { length, stream: this.storage.getStream(info.filePath) };
  53. }
  54. async set(id, file, meta, expireSeconds = config.default_expire_seconds) {
  55. const prefix = getPrefix(expireSeconds);
  56. const filePath = `${prefix}-${id}`;
  57. await this.storage.set(filePath, file);
  58. if (meta) {
  59. this.redis.hmset(id, { prefix, ...meta });
  60. } else {
  61. this.redis.hset(id, 'prefix', prefix);
  62. }
  63. this.redis.expire(id, expireSeconds);
  64. }
  65. setField(id, key, value) {
  66. this.redis.hset(id, key, value);
  67. }
  68. async incrementField(id, key, increment = 1) {
  69. return await this.redis.hincrbyAsync(id, key, increment);
  70. }
  71. async kill(id) {
  72. const { filePath, dead } = await this.getPrefixedInfo(id);
  73. if (!dead) {
  74. this.redis.hset(id, 'dead', 1);
  75. this.storage.del(filePath);
  76. }
  77. }
  78. async flag(id) {
  79. await this.kill(id);
  80. this.redis.hset(id, 'flagged', 1);
  81. }
  82. async del(id) {
  83. const { filePath } = await this.getPrefixedInfo(id);
  84. this.redis.del(id);
  85. this.storage.del(filePath);
  86. }
  87. async ping() {
  88. await this.redis.pingAsync();
  89. await this.storage.ping();
  90. }
  91. async metadata(id) {
  92. const result = await this.redis.hgetallAsync(id);
  93. return result && new Metadata({ id, ...result }, this);
  94. }
  95. }
  96. module.exports = new DB(config);