http_cache.cpp 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604
  1. #include "http.h"
  2. #include "http_proxy.h"
  3. #include "http_cache.h"
  4. #include <library/cpp/actors/core/actor_bootstrapped.h>
  5. #include <library/cpp/actors/core/executor_pool_basic.h>
  6. #include <library/cpp/actors/core/log.h>
  7. #include <library/cpp/actors/core/scheduler_basic.h>
  8. #include <library/cpp/actors/http/http.h>
  9. #include <library/cpp/digest/md5/md5.h>
  10. #include <util/digest/multi.h>
  11. #include <util/generic/queue.h>
  12. #include <util/string/cast.h>
  13. namespace NHttp {
  14. class THttpOutgoingCacheActor : public NActors::TActorBootstrapped<THttpOutgoingCacheActor>, THttpConfig {
  15. public:
  16. using TBase = NActors::TActorBootstrapped<THttpOutgoingCacheActor>;
  17. NActors::TActorId HttpProxyId;
  18. TGetCachePolicy GetCachePolicy;
  19. static constexpr TDuration RefreshTimeout = TDuration::Seconds(1);
  20. struct TCacheKey {
  21. TString Host;
  22. TString URL;
  23. TString Headers;
  24. operator size_t() const {
  25. return MultiHash(Host, URL, Headers);
  26. }
  27. TString GetId() const {
  28. return MD5::Calc(Host + ':' + URL + ':' + Headers);
  29. }
  30. };
  31. struct TCacheRecord {
  32. TInstant RefreshTime;
  33. TInstant DeathTime;
  34. TCachePolicy CachePolicy;
  35. NHttp::THttpOutgoingRequestPtr Request;
  36. NHttp::THttpOutgoingRequestPtr OutgoingRequest;
  37. TDuration Timeout;
  38. NHttp::THttpIncomingResponsePtr Response;
  39. TString Error;
  40. TVector<NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr> Waiters;
  41. TCacheRecord(const TCachePolicy cachePolicy)
  42. : CachePolicy(cachePolicy)
  43. {}
  44. bool IsValid() const {
  45. return Response != nullptr || !Error.empty();
  46. }
  47. void UpdateResponse(NHttp::THttpIncomingResponsePtr response, const TString& error, TInstant now) {
  48. if (error.empty() || Response == nullptr || !CachePolicy.KeepOnError) {
  49. Response = response;
  50. Error = error;
  51. }
  52. RefreshTime = now + CachePolicy.TimeToRefresh;
  53. if (CachePolicy.PaceToRefresh) {
  54. RefreshTime += TDuration::MilliSeconds(RandomNumber<ui64>() % CachePolicy.PaceToRefresh.MilliSeconds());
  55. }
  56. }
  57. TString GetName() const {
  58. return TStringBuilder() << (Request->Secure ? "https://" : "http://") << Request->Host << Request->URL;
  59. }
  60. };
  61. struct TRefreshRecord {
  62. TCacheKey Key;
  63. TInstant RefreshTime;
  64. bool operator <(const TRefreshRecord& b) const {
  65. return RefreshTime > b.RefreshTime;
  66. }
  67. };
  68. THashMap<TCacheKey, TCacheRecord> Cache;
  69. TPriorityQueue<TRefreshRecord> RefreshQueue;
  70. THashMap<THttpOutgoingRequest*, TCacheKey> OutgoingRequests;
  71. THttpOutgoingCacheActor(const NActors::TActorId& httpProxyId, TGetCachePolicy getCachePolicy)
  72. : HttpProxyId(httpProxyId)
  73. , GetCachePolicy(std::move(getCachePolicy))
  74. {}
  75. static constexpr char ActorName[] = "HTTP_OUT_CACHE_ACTOR";
  76. void Bootstrap(const NActors::TActorContext&) {
  77. //
  78. Become(&THttpOutgoingCacheActor::StateWork, RefreshTimeout, new NActors::TEvents::TEvWakeup());
  79. }
  80. static TString GetCacheHeadersKey(const NHttp::THttpOutgoingRequest* request, const TCachePolicy& policy) {
  81. TStringBuilder key;
  82. if (!policy.HeadersToCacheKey.empty()) {
  83. NHttp::THeaders headers(request->Headers);
  84. for (const TString& header : policy.HeadersToCacheKey) {
  85. key << headers[header];
  86. }
  87. }
  88. return key;
  89. }
  90. static TCacheKey GetCacheKey(const NHttp::THttpOutgoingRequest* request, const TCachePolicy& policy) {
  91. return { ToString(request->Host), ToString(request->URL), GetCacheHeadersKey(request, policy) };
  92. }
  93. void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const NActors::TActorContext& ctx) {
  94. ctx.Send(event->Forward(HttpProxyId));
  95. }
  96. void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) {
  97. ctx.Send(event->Forward(HttpProxyId));
  98. }
  99. void Handle(NHttp::TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
  100. ctx.Send(event->Forward(HttpProxyId));
  101. }
  102. void Handle(NHttp::TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext& ctx) {
  103. ctx.Send(event->Forward(HttpProxyId));
  104. }
  105. void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr event, const NActors::TActorContext& ctx) {
  106. NHttp::THttpOutgoingRequestPtr request(event->Get()->Request);
  107. NHttp::THttpIncomingResponsePtr response(event->Get()->Response);
  108. auto itRequests = OutgoingRequests.find(request.Get());
  109. if (itRequests == OutgoingRequests.end()) {
  110. LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown request " << request->Host << request->URL);
  111. return;
  112. }
  113. auto key = itRequests->second;
  114. OutgoingRequests.erase(itRequests);
  115. auto it = Cache.find(key);
  116. if (it == Cache.end()) {
  117. LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown cache key " << request->Host << request->URL);
  118. return;
  119. }
  120. TCacheRecord& cacheRecord = it->second;
  121. cacheRecord.OutgoingRequest.Reset();
  122. for (auto& waiter : cacheRecord.Waiters) {
  123. NHttp::THttpIncomingResponsePtr response2;
  124. TString error2;
  125. if (response != nullptr) {
  126. response2 = response->Duplicate(waiter->Get()->Request);
  127. }
  128. if (!event->Get()->Error.empty()) {
  129. error2 = event->Get()->Error;
  130. }
  131. ctx.Send(waiter->Sender, new NHttp::TEvHttpProxy::TEvHttpIncomingResponse(waiter->Get()->Request, response2, error2));
  132. }
  133. cacheRecord.Waiters.clear();
  134. TString error;
  135. if (event->Get()->Error.empty()) {
  136. if (event->Get()->Response != nullptr && event->Get()->Response->Status != "200") {
  137. error = event->Get()->Response->Message;
  138. }
  139. } else {
  140. error = event->Get()->Error;
  141. }
  142. if (!error.empty()) {
  143. LOG_WARN_S(ctx, HttpLog, "Error from " << cacheRecord.GetName() << ": " << error);
  144. }
  145. LOG_DEBUG_S(ctx, HttpLog, "OutgoingUpdate " << cacheRecord.GetName());
  146. cacheRecord.UpdateResponse(response, event->Get()->Error, ctx.Now());
  147. RefreshQueue.push({it->first, it->second.RefreshTime});
  148. LOG_DEBUG_S(ctx, HttpLog, "OutgoingSchedule " << cacheRecord.GetName() << " at " << cacheRecord.RefreshTime << " until " << cacheRecord.DeathTime);
  149. }
  150. void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
  151. const NHttp::THttpOutgoingRequest* request = event->Get()->Request.Get();
  152. auto policy = GetCachePolicy(request);
  153. if (policy.TimeToExpire == TDuration()) {
  154. ctx.Send(event->Forward(HttpProxyId));
  155. return;
  156. }
  157. auto key = GetCacheKey(request, policy);
  158. auto it = Cache.find(key);
  159. if (it != Cache.end()) {
  160. if (it->second.IsValid()) {
  161. LOG_DEBUG_S(ctx, HttpLog, "OutgoingRespond "
  162. << it->second.GetName()
  163. << " ("
  164. << ((it->second.Response != nullptr) ? ToString(it->second.Response->Size()) : TString("error"))
  165. << ")");
  166. NHttp::THttpIncomingResponsePtr response = it->second.Response;
  167. if (response != nullptr) {
  168. response = response->Duplicate(event->Get()->Request);
  169. }
  170. ctx.Send(event->Sender,
  171. new NHttp::TEvHttpProxy::TEvHttpIncomingResponse(event->Get()->Request,
  172. response,
  173. it->second.Error));
  174. it->second.DeathTime = ctx.Now() + it->second.CachePolicy.TimeToExpire; // prolong active cache items
  175. return;
  176. }
  177. } else {
  178. it = Cache.emplace(key, policy).first;
  179. it->second.Request = event->Get()->Request;
  180. it->second.Timeout = event->Get()->Timeout;
  181. it->second.OutgoingRequest = it->second.Request->Duplicate();
  182. OutgoingRequests[it->second.OutgoingRequest.Get()] = key;
  183. LOG_DEBUG_S(ctx, HttpLog, "OutgoingInitiate " << it->second.GetName());
  184. ctx.Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(it->second.OutgoingRequest, it->second.Timeout));
  185. }
  186. it->second.DeathTime = ctx.Now() + it->second.CachePolicy.TimeToExpire;
  187. it->second.Waiters.emplace_back(std::move(event));
  188. }
  189. void HandleRefresh(const NActors::TActorContext& ctx) {
  190. while (!RefreshQueue.empty() && RefreshQueue.top().RefreshTime <= ctx.Now()) {
  191. TRefreshRecord rrec = RefreshQueue.top();
  192. RefreshQueue.pop();
  193. auto it = Cache.find(rrec.Key);
  194. if (it != Cache.end()) {
  195. if (it->second.DeathTime > ctx.Now()) {
  196. LOG_DEBUG_S(ctx, HttpLog, "OutgoingRefresh " << it->second.GetName());
  197. it->second.OutgoingRequest = it->second.Request->Duplicate();
  198. OutgoingRequests[it->second.OutgoingRequest.Get()] = it->first;
  199. ctx.Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(it->second.OutgoingRequest, it->second.Timeout));
  200. } else {
  201. LOG_DEBUG_S(ctx, HttpLog, "OutgoingForget " << it->second.GetName());
  202. if (it->second.OutgoingRequest) {
  203. OutgoingRequests.erase(it->second.OutgoingRequest.Get());
  204. }
  205. Cache.erase(it);
  206. }
  207. }
  208. }
  209. ctx.Schedule(RefreshTimeout, new NActors::TEvents::TEvWakeup());
  210. }
  211. STFUNC(StateWork) {
  212. switch (ev->GetTypeRewrite()) {
  213. HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingResponse, Handle);
  214. HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest, Handle);
  215. HFunc(NHttp::TEvHttpProxy::TEvAddListeningPort, Handle);
  216. HFunc(NHttp::TEvHttpProxy::TEvRegisterHandler, Handle);
  217. HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle);
  218. HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse, Handle);
  219. CFunc(NActors::TEvents::TSystem::Wakeup, HandleRefresh);
  220. }
  221. }
  222. };
  223. const TDuration THttpOutgoingCacheActor::RefreshTimeout;
  224. class THttpIncomingCacheActor : public NActors::TActorBootstrapped<THttpIncomingCacheActor>, THttpConfig {
  225. public:
  226. using TBase = NActors::TActorBootstrapped<THttpIncomingCacheActor>;
  227. NActors::TActorId HttpProxyId;
  228. TGetCachePolicy GetCachePolicy;
  229. static constexpr TDuration RefreshTimeout = TDuration::Seconds(1);
  230. THashMap<TString, TActorId> Handlers;
  231. struct TCacheKey {
  232. TString Host;
  233. TString URL;
  234. TString Headers;
  235. operator size_t() const {
  236. return MultiHash(Host, URL, Headers);
  237. }
  238. TString GetId() const {
  239. return MD5::Calc(Host + ':' + URL + ':' + Headers);
  240. }
  241. };
  242. struct TCacheRecord {
  243. TInstant RefreshTime;
  244. TInstant DeathTime;
  245. TCachePolicy CachePolicy;
  246. TString CacheId;
  247. NHttp::THttpIncomingRequestPtr Request;
  248. TDuration Timeout;
  249. NHttp::THttpOutgoingResponsePtr Response;
  250. TVector<NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr> Waiters;
  251. ui32 Retries = 0;
  252. bool Enqueued = false;
  253. TCacheRecord(const TCachePolicy cachePolicy)
  254. : CachePolicy(cachePolicy)
  255. {}
  256. bool IsValid() const {
  257. return Response != nullptr;
  258. }
  259. void InitRequest(NHttp::THttpIncomingRequestPtr request) {
  260. Request = request;
  261. if (CachePolicy.TimeToExpire) {
  262. DeathTime = NActors::TlsActivationContext->Now() + CachePolicy.TimeToExpire;
  263. }
  264. }
  265. void UpdateResponse(NHttp::THttpOutgoingResponsePtr response, const TString& error, TInstant now) {
  266. if (error.empty() || !CachePolicy.KeepOnError) {
  267. Response = response;
  268. }
  269. Retries = 0;
  270. if (CachePolicy.TimeToRefresh) {
  271. RefreshTime = now + CachePolicy.TimeToRefresh;
  272. if (CachePolicy.PaceToRefresh) {
  273. RefreshTime += TDuration::MilliSeconds(RandomNumber<ui64>() % CachePolicy.PaceToRefresh.MilliSeconds());
  274. }
  275. }
  276. }
  277. void UpdateExpireTime() {
  278. if (CachePolicy.TimeToExpire) {
  279. DeathTime = NActors::TlsActivationContext->Now() + CachePolicy.TimeToExpire;
  280. }
  281. }
  282. TString GetName() const {
  283. return TStringBuilder() << (Request->Endpoint->Secure ? "https://" : "http://") << Request->Host << Request->URL
  284. << " (" << CacheId << ")";
  285. }
  286. };
  287. struct TRefreshRecord {
  288. TCacheKey Key;
  289. TInstant RefreshTime;
  290. bool operator <(const TRefreshRecord& b) const {
  291. return RefreshTime > b.RefreshTime;
  292. }
  293. };
  294. THashMap<TCacheKey, TCacheRecord> Cache;
  295. TPriorityQueue<TRefreshRecord> RefreshQueue;
  296. THashMap<THttpIncomingRequest*, TCacheKey> IncomingRequests;
  297. THttpIncomingCacheActor(const NActors::TActorId& httpProxyId, TGetCachePolicy getCachePolicy)
  298. : HttpProxyId(httpProxyId)
  299. , GetCachePolicy(std::move(getCachePolicy))
  300. {}
  301. static constexpr char ActorName[] = "HTTP_IN_CACHE_ACTOR";
  302. void Bootstrap(const NActors::TActorContext&) {
  303. //
  304. Become(&THttpIncomingCacheActor::StateWork, RefreshTimeout, new NActors::TEvents::TEvWakeup());
  305. }
  306. static TString GetCacheHeadersKey(const NHttp::THttpIncomingRequest* request, const TCachePolicy& policy) {
  307. TStringBuilder key;
  308. if (!policy.HeadersToCacheKey.empty()) {
  309. NHttp::THeaders headers(request->Headers);
  310. for (const TString& header : policy.HeadersToCacheKey) {
  311. key << headers[header];
  312. }
  313. }
  314. return key;
  315. }
  316. static TCacheKey GetCacheKey(const NHttp::THttpIncomingRequest* request, const TCachePolicy& policy) {
  317. return { ToString(request->Host), ToString(request->URL), GetCacheHeadersKey(request, policy) };
  318. }
  319. TActorId GetRequestHandler(NHttp::THttpIncomingRequestPtr request) {
  320. TStringBuf url = request->URL.Before('?');
  321. THashMap<TString, TActorId>::iterator it;
  322. while (!url.empty()) {
  323. it = Handlers.find(url);
  324. if (it != Handlers.end()) {
  325. return it->second;
  326. } else {
  327. if (url.EndsWith('/')) {
  328. url.Trunc(url.size() - 1);
  329. }
  330. size_t pos = url.rfind('/');
  331. if (pos == TStringBuf::npos) {
  332. break;
  333. } else {
  334. url = url.substr(0, pos + 1);
  335. }
  336. }
  337. }
  338. return {};
  339. }
  340. void SendCacheRequest(const TCacheKey& cacheKey, TCacheRecord& cacheRecord, const NActors::TActorContext& ctx) {
  341. cacheRecord.Request = cacheRecord.Request->Duplicate();
  342. cacheRecord.Request->AcceptEncoding.Clear(); // disable compression
  343. IncomingRequests[cacheRecord.Request.Get()] = cacheKey;
  344. TActorId handler = GetRequestHandler(cacheRecord.Request);
  345. if (handler) {
  346. Send(handler, new NHttp::TEvHttpProxy::TEvHttpIncomingRequest(cacheRecord.Request));
  347. } else {
  348. LOG_ERROR_S(ctx, HttpLog, "Can't find cache handler for " << cacheRecord.GetName());
  349. }
  350. }
  351. void DropCacheRecord(THashMap<TCacheKey, TCacheRecord>::iterator it) {
  352. if (it->second.Request) {
  353. IncomingRequests.erase(it->second.Request.Get());
  354. }
  355. for (auto& waiter : it->second.Waiters) {
  356. NHttp::THttpOutgoingResponsePtr response;
  357. response = waiter->Get()->Request->CreateResponseGatewayTimeout("Timeout", "text/plain");
  358. Send(waiter->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
  359. }
  360. Cache.erase(it);
  361. }
  362. void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr event, const NActors::TActorContext& ctx) {
  363. ctx.Send(event->Forward(HttpProxyId));
  364. }
  365. void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
  366. ctx.Send(event->Forward(HttpProxyId));
  367. }
  368. void Handle(NHttp::TEvHttpProxy::TEvAddListeningPort::TPtr event, const NActors::TActorContext& ctx) {
  369. ctx.Send(event->Forward(HttpProxyId));
  370. }
  371. void Handle(NHttp::TEvHttpProxy::TEvRegisterHandler::TPtr event, const NActors::TActorContext& ctx) {
  372. Handlers[event->Get()->Path] = event->Get()->Handler;
  373. ctx.Send(HttpProxyId, new NHttp::TEvHttpProxy::TEvRegisterHandler(event->Get()->Path, ctx.SelfID));
  374. }
  375. void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse::TPtr event, const NActors::TActorContext& ctx) {
  376. NHttp::THttpIncomingRequestPtr request(event->Get()->Response->GetRequest());
  377. NHttp::THttpOutgoingResponsePtr response(event->Get()->Response);
  378. auto itRequests = IncomingRequests.find(request.Get());
  379. if (itRequests == IncomingRequests.end()) {
  380. LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown request " << request->Host << request->URL);
  381. return;
  382. }
  383. TCacheKey key = itRequests->second;
  384. auto it = Cache.find(key);
  385. if (it == Cache.end()) {
  386. LOG_ERROR_S(ctx, HttpLog, "Cache received response to unknown cache key " << request->Host << request->URL);
  387. return;
  388. }
  389. IncomingRequests.erase(itRequests);
  390. TCacheRecord& cacheRecord = it->second;
  391. TStringBuf status;
  392. TString error;
  393. if (event->Get()->Response != nullptr) {
  394. status = event->Get()->Response->Status;
  395. if (!status.StartsWith("2")) {
  396. error = event->Get()->Response->Message;
  397. }
  398. }
  399. if (cacheRecord.CachePolicy.RetriesCount > 0) {
  400. auto itStatusToRetry = std::find(cacheRecord.CachePolicy.StatusesToRetry.begin(), cacheRecord.CachePolicy.StatusesToRetry.end(), status);
  401. if (itStatusToRetry != cacheRecord.CachePolicy.StatusesToRetry.end()) {
  402. if (cacheRecord.Retries < cacheRecord.CachePolicy.RetriesCount) {
  403. ++cacheRecord.Retries;
  404. LOG_WARN_S(ctx, HttpLog, "IncomingRetry " << cacheRecord.GetName() << ": " << status << " " << error);
  405. SendCacheRequest(key, cacheRecord, ctx);
  406. return;
  407. }
  408. }
  409. }
  410. for (auto& waiter : cacheRecord.Waiters) {
  411. NHttp::THttpOutgoingResponsePtr response2;
  412. response2 = response->Duplicate(waiter->Get()->Request);
  413. ctx.Send(waiter->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response2));
  414. }
  415. cacheRecord.Waiters.clear();
  416. if (!error.empty()) {
  417. LOG_WARN_S(ctx, HttpLog, "Error from " << cacheRecord.GetName() << ": " << error);
  418. if (!cacheRecord.Response) {
  419. LOG_DEBUG_S(ctx, HttpLog, "IncomingDiscard " << cacheRecord.GetName());
  420. DropCacheRecord(it);
  421. return;
  422. }
  423. }
  424. if (cacheRecord.CachePolicy.TimeToRefresh) {
  425. LOG_DEBUG_S(ctx, HttpLog, "IncomingUpdate " << cacheRecord.GetName());
  426. cacheRecord.UpdateResponse(response, error, ctx.Now());
  427. if (!cacheRecord.Enqueued) {
  428. RefreshQueue.push({it->first, it->second.RefreshTime});
  429. cacheRecord.Enqueued = true;
  430. }
  431. LOG_DEBUG_S(ctx, HttpLog, "IncomingSchedule " << cacheRecord.GetName() << " at " << cacheRecord.RefreshTime << " until " << cacheRecord.DeathTime);
  432. } else {
  433. LOG_DEBUG_S(ctx, HttpLog, "IncomingDrop " << cacheRecord.GetName());
  434. DropCacheRecord(it);
  435. }
  436. }
  437. void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr event, const NActors::TActorContext& ctx) {
  438. const NHttp::THttpIncomingRequest* request = event->Get()->Request.Get();
  439. TCachePolicy policy = GetCachePolicy(request);
  440. if (policy.TimeToExpire == TDuration() && policy.RetriesCount == 0) {
  441. TActorId handler = GetRequestHandler(event->Get()->Request);
  442. if (handler) {
  443. ctx.Send(event->Forward(handler));
  444. }
  445. return;
  446. }
  447. auto key = GetCacheKey(request, policy);
  448. auto it = Cache.find(key);
  449. if (it != Cache.end() && !policy.DiscardCache) {
  450. it->second.UpdateExpireTime();
  451. if (it->second.IsValid()) {
  452. LOG_DEBUG_S(ctx, HttpLog, "IncomingRespond "
  453. << it->second.GetName()
  454. << " ("
  455. << ((it->second.Response != nullptr) ? ToString(it->second.Response->Size()) : TString("error"))
  456. << ")");
  457. NHttp::THttpOutgoingResponsePtr response = it->second.Response;
  458. if (response != nullptr) {
  459. response = response->Duplicate(event->Get()->Request);
  460. }
  461. ctx.Send(event->Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
  462. return;
  463. }
  464. } else {
  465. it = Cache.emplace(key, policy).first;
  466. it->second.CacheId = key.GetId(); // for debugging
  467. it->second.InitRequest(event->Get()->Request);
  468. if (policy.DiscardCache) {
  469. LOG_DEBUG_S(ctx, HttpLog, "IncomingDiscardCache " << it->second.GetName());
  470. }
  471. LOG_DEBUG_S(ctx, HttpLog, "IncomingInitiate " << it->second.GetName());
  472. SendCacheRequest(key, it->second, ctx);
  473. }
  474. it->second.Waiters.emplace_back(std::move(event));
  475. }
  476. void HandleRefresh(const NActors::TActorContext& ctx) {
  477. while (!RefreshQueue.empty() && RefreshQueue.top().RefreshTime <= ctx.Now()) {
  478. TRefreshRecord rrec = RefreshQueue.top();
  479. RefreshQueue.pop();
  480. auto it = Cache.find(rrec.Key);
  481. if (it != Cache.end()) {
  482. it->second.Enqueued = false;
  483. if (it->second.DeathTime > ctx.Now()) {
  484. LOG_DEBUG_S(ctx, HttpLog, "IncomingRefresh " << it->second.GetName());
  485. SendCacheRequest(it->first, it->second, ctx);
  486. } else {
  487. LOG_DEBUG_S(ctx, HttpLog, "IncomingForget " << it->second.GetName());
  488. DropCacheRecord(it);
  489. }
  490. }
  491. }
  492. ctx.Schedule(RefreshTimeout, new NActors::TEvents::TEvWakeup());
  493. }
  494. STFUNC(StateWork) {
  495. switch (ev->GetTypeRewrite()) {
  496. HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingResponse, Handle);
  497. HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest, Handle);
  498. HFunc(NHttp::TEvHttpProxy::TEvAddListeningPort, Handle);
  499. HFunc(NHttp::TEvHttpProxy::TEvRegisterHandler, Handle);
  500. HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle);
  501. HFunc(NHttp::TEvHttpProxy::TEvHttpOutgoingResponse, Handle);
  502. CFunc(NActors::TEvents::TSystem::Wakeup, HandleRefresh);
  503. }
  504. }
  505. };
  506. TCachePolicy GetDefaultCachePolicy(const THttpRequest* request, const TCachePolicy& defaultPolicy) {
  507. TCachePolicy policy = defaultPolicy;
  508. THeaders headers(request->Headers);
  509. TStringBuf cacheControl(headers["Cache-Control"]);
  510. while (TStringBuf cacheItem = cacheControl.NextTok(',')) {
  511. Trim(cacheItem, ' ');
  512. if (cacheItem == "no-store" || cacheItem == "no-cache") {
  513. policy.DiscardCache = true;
  514. }
  515. TStringBuf itemName = cacheItem.NextTok('=');
  516. TrimEnd(itemName, ' ');
  517. TrimBegin(cacheItem, ' ');
  518. if (itemName == "max-age") {
  519. policy.TimeToRefresh = policy.TimeToExpire = TDuration::Seconds(FromString(cacheItem));
  520. }
  521. if (itemName == "min-fresh") {
  522. policy.TimeToRefresh = policy.TimeToExpire = TDuration::Seconds(FromString(cacheItem));
  523. }
  524. if (itemName == "stale-if-error") {
  525. policy.KeepOnError = true;
  526. }
  527. }
  528. return policy;
  529. }
  530. NActors::IActor* CreateHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) {
  531. return new THttpOutgoingCacheActor(httpProxyId, std::move(cachePolicy));
  532. }
  533. NActors::IActor* CreateOutgoingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) {
  534. return new THttpOutgoingCacheActor(httpProxyId, std::move(cachePolicy));
  535. }
  536. NActors::IActor* CreateIncomingHttpCache(const NActors::TActorId& httpProxyId, TGetCachePolicy cachePolicy) {
  537. return new THttpIncomingCacheActor(httpProxyId, std::move(cachePolicy));
  538. }
  539. }