// packet_queue.h (3.0 KB)
  1. #pragma once
  #include "udp_recv_packet.h"

  #include <library/cpp/deprecated/atomic/atomic.h>
  #include <library/cpp/deprecated/atomic/atomic_ops.h>
  #include <library/cpp/threading/chunk_queue/queue.h>

  #include <util/network/init.h>
  #include <util/system/event.h>
  #include <util/system/yassert.h>

  #include <climits>
  #include <cstddef>
  #include <type_traits>
  #include <utility>
  10. namespace NNetlibaSocket {
  /// Pair of IPv6 socket addresses stored next to each queued packet.
  ///
  /// The struct is a plain aggregate: it has no constructors and does not
  /// initialize its fields, so callers must fill both addresses themselves.
  struct TPacketMeta {
      /// Address recorded as the remote endpoint; TLockFreePacketQueue::Pop()
      /// copies it into its `srcAddr` output.
      ::sockaddr_in6 RemoteAddr;

      /// Address recorded as the local endpoint; TLockFreePacketQueue::Pop()
      /// copies it into its `dstAddr` output.
      ::sockaddr_in6 MyAddr;
  };
  15. template <size_t TTNumWriterThreads>
  16. class TLockFreePacketQueue {
  17. private:
  18. static constexpr int MAX_PACKETS_IN_QUEUE = INT_MAX;
  19. static constexpr int CMD_QUEUE_RESERVE = 1 << 20;
  20. static constexpr int MAX_DATA_IN_QUEUE = 32 << 20;
  21. typedef std::pair<TUdpRecvPacket*, TPacketMeta> TPacket;
  22. typedef std::conditional_t<TTNumWriterThreads == 1, NThreading::TOneOneQueue<TPacket>, NThreading::TManyOneQueue<TPacket, TTNumWriterThreads>> TImpl;
  23. mutable TImpl Queue;
  24. mutable TSystemEvent QueueEvent;
  25. mutable TAtomic NumPackets;
  26. TAtomic DataSize;
  27. public:
  28. TLockFreePacketQueue()
  29. : NumPackets(0)
  30. , DataSize(0)
  31. {
  32. }
  33. ~TLockFreePacketQueue() {
  34. TPacket packet;
  35. while (Queue.Dequeue(packet)) {
  36. delete packet.first;
  37. }
  38. }
  39. bool IsDataPartFull() const {
  40. return (AtomicGet(NumPackets) >= MAX_PACKETS_IN_QUEUE || AtomicGet(DataSize) >= MAX_DATA_IN_QUEUE - CMD_QUEUE_RESERVE);
  41. }
  42. bool Push(TUdpRecvPacket* packet, const TPacketMeta& meta) {
  43. // simulate OS behavior on buffer overflow - drop packets.
  44. // yeah it contains small data race (we can add little bit more packets, but nobody cares)
  45. if (AtomicGet(NumPackets) >= MAX_PACKETS_IN_QUEUE || AtomicGet(DataSize) >= MAX_DATA_IN_QUEUE) {
  46. return false;
  47. }
  48. AtomicAdd(NumPackets, 1);
  49. AtomicAdd(DataSize, packet->DataSize);
  50. Y_ASSERT(packet->DataStart == 0);
  51. Queue.Enqueue(TPacket(std::make_pair(packet, meta)));
  52. QueueEvent.Signal();
  53. return true;
  54. }
  55. bool Pop(TUdpRecvPacket** packet, sockaddr_in6* srcAddr, sockaddr_in6* dstAddr) {
  56. TPacket p;
  57. if (!Queue.Dequeue(p)) {
  58. QueueEvent.Reset();
  59. if (!Queue.Dequeue(p)) {
  60. return false;
  61. }
  62. QueueEvent.Signal();
  63. }
  64. *packet = p.first;
  65. *srcAddr = p.second.RemoteAddr;
  66. *dstAddr = p.second.MyAddr;
  67. AtomicSub(NumPackets, 1);
  68. AtomicSub(DataSize, (*packet)->DataSize);
  69. Y_ASSERT(AtomicGet(NumPackets) >= 0 && AtomicGet(DataSize) >= 0);
  70. return true;
  71. }
  72. bool IsEmpty() const {
  73. return AtomicAdd(NumPackets, 0) == 0;
  74. }
  75. TSystemEvent& GetEvent() const {
  76. return QueueEvent;
  77. }
  78. };
  79. }