Browse Source

Restoring authorship annotation for <>. Commit 2 of 2.

blaze 3 years ago

+ 202 - 202

@@ -1,17 +1,17 @@
-#include "dnscache.h" 
+#include "dnscache.h"
 #include "probes.h"
-#include "timekeeper.h" 
-#include <contrib/libs/c-ares/ares.h> 
-#include <util/system/guard.h> 
+#include "timekeeper.h"
+#include <contrib/libs/c-ares/ares.h>
+#include <util/system/guard.h>
 #include <util/datetime/systime.h>
 const TDnsCache::THost TDnsCache::NullHost;
 static_assert(sizeof(ares_channel) == sizeof(void*), "expect sizeof(ares_channel) == sizeof(void *)");
 TDnsCache::TDnsCache(bool allowIpv4, bool allowIpv6, time_t lifetime, time_t neg, ui32 timeout)
     : EntryLifetime(lifetime)
     , NegativeLifetime(neg)
@@ -30,28 +30,28 @@ TDnsCache::TDnsCache(bool allowIpv4, bool allowIpv6, time_t lifetime, time_t neg
-    ares_channel chan; 
-    if (ares_init(&chan) != ARES_SUCCESS) { 
+    ares_channel chan;
+    if (ares_init(&chan) != ARES_SUCCESS) {
-        ythrow yexception() << "ares_init() failed"; 
-    } 
-    Channel = chan; 
+        ythrow yexception() << "ares_init() failed";
+    }
+    Channel = chan;
 TDnsCache::~TDnsCache(void) {
-    ares_channel chan = static_cast<ares_channel>(Channel); 
-    ares_cancel(chan); 
-    ares_destroy(chan); 
+    ares_channel chan = static_cast<ares_channel>(Channel);
+    ares_cancel(chan);
+    ares_destroy(chan);
 #ifdef _win_
 TString TDnsCache::GetHostByAddr(const NAddr::IRemoteAddr& addr) {
     in6_addr key;
@@ -146,14 +146,14 @@ void TDnsCache::GetAllAddresses(
 void TDnsCache::GetStats(ui64& a_cache_hits, ui64& a_cache_misses,
                          ui64& ptr_cache_hits, ui64& ptr_cache_misses) {
-    TGuard<TMutex> lock(CacheMtx); 
-    a_cache_hits = ACacheHits; 
-    a_cache_misses = ACacheMisses; 
-    ptr_cache_hits = PtrCacheHits; 
-    ptr_cache_misses = PtrCacheMisses; 
+    TGuard<TMutex> lock(CacheMtx);
+    a_cache_hits = ACacheHits;
+    a_cache_misses = ACacheMisses;
+    ptr_cache_hits = PtrCacheHits;
+    ptr_cache_misses = PtrCacheMisses;
 bool TDnsCache::THost::IsStale(int family, const TDnsCache* ctx) const noexcept {
     time_t resolved = family == AF_INET ? ResolvedV4 : ResolvedV6;
     time_t notfound = family == AF_INET ? NotFoundV4 : NotFoundV6;
@@ -174,247 +174,247 @@ TDnsCache::Resolve(const TString& hostname, int family, bool cacheOnly) {
         return NullHost;
-    THostCache::iterator p; 
+    THostCache::iterator p;
     Y_ASSERT(family == AF_INET || family == AF_INET6);
-    { 
-        TGuard<TMutex> lock(CacheMtx); 
-        p = HostCache.find(hostname); 
-        if (p != HostCache.end()) { 
+    {
+        TGuard<TMutex> lock(CacheMtx);
+        p = HostCache.find(hostname);
+        if (p != HostCache.end()) {
             if (!p->second.IsStale(family, this)) {
-                /* Recently resolved, just return cached value */ 
-                ACacheHits += 1; 
+                /* Recently resolved, just return cached value */
+                ACacheHits += 1;
                 THost& host = p->second;
                 LWPROBE(ResolveFromCache, hostname, family, host.AddrsV4ToString(), host.AddrsV6ToString(), ACacheHits);
                 return host;
             } else {
                 LWPROBE(ResolveCacheTimeout, hostname);
-            } 
-        } else { 
-            /* Never resolved, create cache entry */ 
+            }
+        } else {
+            /* Never resolved, create cache entry */
             LWPROBE(ResolveCacheNew, hostname);
             p = HostCache.insert(std::make_pair(hostname, THost())).first;
-        } 
-        ACacheMisses += 1; 
-    } 
+        }
+        ACacheMisses += 1;
+    }
     if (cacheOnly)
         return NullHost;
     TAtomic& inprogress = (family == AF_INET ? p->second.InProgressV4 : p->second.InProgressV6);
-    { 
-        /* This way only! CacheMtx should always be taken AFTER AresMtx, 
-         * because later in ares_process it can only be done this way. 
-         * Lock order reversal will cause deadlock in unfortunate monents. 
-         */ 
-        TGuard<TMutex> areslock(AresMtx); 
-        TGuard<TMutex> cachelock(CacheMtx); 
+    {
+        /* This way only! CacheMtx should always be taken AFTER AresMtx,
+         * because later in ares_process it can only be done this way.
+         * Lock order reversal will cause deadlock in unfortunate monents.
+         */
+        TGuard<TMutex> areslock(AresMtx);
+        TGuard<TMutex> cachelock(CacheMtx);
         if (!inprogress) {
-            ares_channel chan = static_cast<ares_channel>(Channel); 
+            ares_channel chan = static_cast<ares_channel>(Channel);
             TGHBNContext* ctx = new TGHBNContext();
-            ctx->Owner = this; 
-            ctx->Hostname = hostname; 
-            ctx->Family = family; 
+            ctx->Owner = this;
+            ctx->Hostname = hostname;
+            ctx->Family = family;
             AtomicSet(inprogress, 1);
-            ares_gethostbyname(chan, hostname.c_str(), family, 
+            ares_gethostbyname(chan, hostname.c_str(), family,
                                &TDnsCache::GHBNCallback, ctx);
-        } 
-    } 
-    WaitTask(inprogress); 
+        }
+    }
+    WaitTask(inprogress);
     LWPROBE(ResolveDone, hostname, family, p->second.AddrsV4ToString(), p->second.AddrsV6ToString());
-    return p->second; 
+    return p->second;
 bool TDnsCache::ValidateHName(const TString& name) const noexcept {
     return name.size() > 0;
 const TDnsCache::TAddr& TDnsCache::ResolveAddr(const in6_addr& addr, int family) {
-    TAddrCache::iterator p; 
-    { 
-        TGuard<TMutex> lock(CacheMtx); 
-        p = AddrCache.find(addr); 
-        if (p != AddrCache.end()) { 
+    TAddrCache::iterator p;
+    {
+        TGuard<TMutex> lock(CacheMtx);
+        p = AddrCache.find(addr);
+        if (p != AddrCache.end()) {
             if (TTimeKeeper::GetTime() - p->second.Resolved < EntryLifetime || TTimeKeeper::GetTime() - p->second.NotFound < NegativeLifetime) {
-                /* Recently resolved, just return cached value */ 
-                PtrCacheHits += 1; 
-                return p->second; 
-            } 
-        } else { 
-            /* Never resolved, create cache entry */ 
+                /* Recently resolved, just return cached value */
+                PtrCacheHits += 1;
+                return p->second;
+            }
+        } else {
+            /* Never resolved, create cache entry */
             p = AddrCache.insert(std::make_pair(addr, TAddr())).first;
-        } 
-        PtrCacheMisses += 1; 
-    } 
-    { 
-        /* This way only! CacheMtx should always be taken AFTER AresMtx, 
-         * because later in ares_process it can only be done this way. 
-         * Lock order reversal will cause deadlock in unfortunate monents. 
-         */ 
-        TGuard<TMutex> areslock(AresMtx); 
-        TGuard<TMutex> cachelock(CacheMtx); 
+        }
+        PtrCacheMisses += 1;
+    }
+    {
+        /* This way only! CacheMtx should always be taken AFTER AresMtx,
+         * because later in ares_process it can only be done this way.
+         * Lock order reversal will cause deadlock in unfortunate monents.
+         */
+        TGuard<TMutex> areslock(AresMtx);
+        TGuard<TMutex> cachelock(CacheMtx);
         if (!p->second.InProgress) {
-            ares_channel chan = static_cast<ares_channel>(Channel); 
+            ares_channel chan = static_cast<ares_channel>(Channel);
             TGHBAContext* ctx = new TGHBAContext();
-            ctx->Owner = this; 
-            ctx->Addr = addr; 
+            ctx->Owner = this;
+            ctx->Addr = addr;
             AtomicSet(p->second.InProgress, 1);
-            ares_gethostbyaddr(chan, &addr, 
+            ares_gethostbyaddr(chan, &addr,
                                family == AF_INET ? sizeof(in_addr) : sizeof(in6_addr),
                                family, &TDnsCache::GHBACallback, ctx);
-        } 
-    } 
-    WaitTask(p->second.InProgress); 
-    return p->second; 
+        }
+    }
+    WaitTask(p->second.InProgress);
+    return p->second;
 void TDnsCache::WaitTask(TAtomic& flag) {
     const TInstant start = TInstant(TTimeKeeper::GetTimeval());
     while (AtomicGet(flag)) {
-        ares_channel chan = static_cast<ares_channel>(Channel); 
-        struct pollfd pfd[ARES_GETSOCK_MAXNUM]; 
-        int nfds; 
-        ares_socket_t socks[ARES_GETSOCK_MAXNUM]; 
-        int bits; 
-        { 
-            TGuard<TMutex> lock(AresMtx); 
-            bits = ares_getsock(chan, socks, ARES_GETSOCK_MAXNUM); 
-            if (bits == 0) { 
-                /* other thread did our job */ 
-                continue; 
-            } 
-        } 
-        for (nfds = 0; nfds < ARES_GETSOCK_MAXNUM; nfds++) { 
-            pfd[nfds].events = 0; 
-            pfd[nfds].revents = 0; 
-            if (ARES_GETSOCK_READABLE(bits, nfds)) { 
-                pfd[nfds].fd = socks[nfds]; 
+        ares_channel chan = static_cast<ares_channel>(Channel);
+        struct pollfd pfd[ARES_GETSOCK_MAXNUM];
+        int nfds;
+        ares_socket_t socks[ARES_GETSOCK_MAXNUM];
+        int bits;
+        {
+            TGuard<TMutex> lock(AresMtx);
+            bits = ares_getsock(chan, socks, ARES_GETSOCK_MAXNUM);
+            if (bits == 0) {
+                /* other thread did our job */
+                continue;
+            }
+        }
+        for (nfds = 0; nfds < ARES_GETSOCK_MAXNUM; nfds++) {
+            pfd[nfds].events = 0;
+            pfd[nfds].revents = 0;
+            if (ARES_GETSOCK_READABLE(bits, nfds)) {
+                pfd[nfds].fd = socks[nfds];
                 pfd[nfds].events |= POLLRDNORM | POLLIN;
-            } 
-            if (ARES_GETSOCK_WRITABLE(bits, nfds)) { 
-                pfd[nfds].fd = socks[nfds]; 
+            }
+            if (ARES_GETSOCK_WRITABLE(bits, nfds)) {
+                pfd[nfds].fd = socks[nfds];
                 pfd[nfds].events |= POLLWRNORM | POLLOUT;
-            } 
-            if (pfd[nfds].events == 0) { 
-                break; 
-            } 
-        } 
+            }
+            if (pfd[nfds].events == 0) {
+                break;
+            }
+        }
         Y_ASSERT(nfds != 0);
         const TDuration left = TInstant(TTimeKeeper::GetTimeval()) - start;
         const TDuration wait = Max(Timeout - left, TDuration::Zero());
         int rv = poll(pfd, nfds, wait.MilliSeconds());
-        if (rv == -1) { 
-            if (errno == EINTR) { 
-                continue; 
-            } 
-            /* Unknown error in select, can't recover. Just pretend there was no reply */ 
-            rv = 0; 
-        } 
-        if (rv == 0) { 
-            /* poll() timed out */ 
-            TGuard<TMutex> lock(AresMtx); 
-            ares_process_fd(chan, ARES_SOCKET_BAD, ARES_SOCKET_BAD); 
-        } else { 
-            for (int i = 0; i < nfds; i++) { 
-                if (pfd[i].revents == 0) { 
-                    continue; 
-                } 
-                TGuard<TMutex> lock(AresMtx); 
-                ares_process_fd(chan, 
+        if (rv == -1) {
+            if (errno == EINTR) {
+                continue;
+            }
+            /* Unknown error in select, can't recover. Just pretend there was no reply */
+            rv = 0;
+        }
+        if (rv == 0) {
+            /* poll() timed out */
+            TGuard<TMutex> lock(AresMtx);
+            ares_process_fd(chan, ARES_SOCKET_BAD, ARES_SOCKET_BAD);
+        } else {
+            for (int i = 0; i < nfds; i++) {
+                if (pfd[i].revents == 0) {
+                    continue;
+                }
+                TGuard<TMutex> lock(AresMtx);
+                ares_process_fd(chan,
                                 pfd[i].revents & (POLLRDNORM | POLLIN)
                                     ? pfd[i].fd
                                     : ARES_SOCKET_BAD,
                                 pfd[i].revents & (POLLWRNORM | POLLOUT)
                                     ? pfd[i].fd
                                     : ARES_SOCKET_BAD);
-            } 
-        } 
+            }
+        }
         if (start + Timeout <= TInstant(TTimeKeeper::GetTimeval())) {
-    } 
+    }
 void TDnsCache::GHBNCallback(void* arg, int status, int, struct hostent* info) {
     THolder<TGHBNContext> ctx(static_cast<TGHBNContext*>(arg));
-    TGuard<TMutex> lock(ctx->Owner->CacheMtx); 
-    THostCache::iterator p = ctx->Owner->HostCache.find(ctx->Hostname); 
+    TGuard<TMutex> lock(ctx->Owner->CacheMtx);
+    THostCache::iterator p = ctx->Owner->HostCache.find(ctx->Hostname);
     Y_ASSERT(p != ctx->Owner->HostCache.end());
     time_t& resolved = (ctx->Family == AF_INET ? p->second.ResolvedV4 : p->second.ResolvedV6);
     time_t& notfound = (ctx->Family == AF_INET ? p->second.NotFoundV4 : p->second.NotFoundV6);
     TAtomic& inprogress = (ctx->Family == AF_INET ? p->second.InProgressV4 : p->second.InProgressV6);
-    if (status == ARES_SUCCESS) { 
-        if (info->h_addrtype == AF_INET) { 
-            p->second.AddrsV4.clear(); 
+    if (status == ARES_SUCCESS) {
+        if (info->h_addrtype == AF_INET) {
+            p->second.AddrsV4.clear();
             for (int i = 0; info->h_addr_list[i] != nullptr; i++) {
-            } 
-            /* It is possible to ask ares for IPv6 and have IPv4 addrs instead, 
-               so take care and set V4 timers anyway. 
-             */ 
-            p->second.ResolvedV4 = TTimeKeeper::GetTime(); 
-            p->second.ResolvedV4 = 0; 
+            }
+            /* It is possible to ask ares for IPv6 and have IPv4 addrs instead,
+               so take care and set V4 timers anyway.
+             */
+            p->second.ResolvedV4 = TTimeKeeper::GetTime();
+            p->second.ResolvedV4 = 0;
             AtomicSet(p->second.InProgressV4, 0);
-        } else if (info->h_addrtype == AF_INET6) { 
-            p->second.AddrsV6.clear(); 
+        } else if (info->h_addrtype == AF_INET6) {
+            p->second.AddrsV6.clear();
             for (int i = 0; info->h_addr_list[i] != nullptr; i++) {
                 p->second.AddrsV6.push_back(*(struct in6_addr*)(info->h_addr_list[i]));
-            } 
-        } else { 
+            }
+        } else {
             Y_FAIL("unknown address type in ares callback");
-        } 
-        resolved = TTimeKeeper::GetTime(); 
-        notfound = 0; 
-    } else { 
-        notfound = TTimeKeeper::GetTime(); 
-        resolved = 0; 
-    } 
+        }
+        resolved = TTimeKeeper::GetTime();
+        notfound = 0;
+    } else {
+        notfound = TTimeKeeper::GetTime();
+        resolved = 0;
+    }
     AtomicSet(inprogress, 0);
 void TDnsCache::GHBACallback(void* arg, int status, int, struct hostent* info) {
     THolder<TGHBAContext> ctx(static_cast<TGHBAContext*>(arg));
-    TGuard<TMutex> lock(ctx->Owner->CacheMtx); 
-    TAddrCache::iterator p = ctx->Owner->AddrCache.find(ctx->Addr); 
+    TGuard<TMutex> lock(ctx->Owner->CacheMtx);
+    TAddrCache::iterator p = ctx->Owner->AddrCache.find(ctx->Addr);
     Y_ASSERT(p != ctx->Owner->AddrCache.end());
-    if (status == ARES_SUCCESS) { 
-        p->second.Hostname = info->h_name; 
-        p->second.Resolved = TTimeKeeper::GetTime(); 
-        p->second.NotFound = 0; 
-    } else { 
-        p->second.NotFound = TTimeKeeper::GetTime(); 
-        p->second.Resolved = 0; 
-    } 
+    if (status == ARES_SUCCESS) {
+        p->second.Hostname = info->h_name;
+        p->second.Resolved = TTimeKeeper::GetTime();
+        p->second.NotFound = 0;
+    } else {
+        p->second.NotFound = TTimeKeeper::GetTime();
+        p->second.Resolved = 0;
+    }
     AtomicSet(p->second.InProgress, 0);
 TString TDnsCache::THost::AddrsV4ToString() const {
     TStringStream ss;

+ 69 - 69

@@ -1,57 +1,57 @@
-#pragma once 
+#pragma once
 #include <contrib/libs/c-ares/ares.h>
-#include <util/generic/map.h> 
-#include <util/generic/vector.h> 
-#include <util/network/address.h> 
-#include <util/system/mutex.h> 
+#include <util/generic/map.h>
+#include <util/generic/vector.h>
+#include <util/network/address.h>
+#include <util/system/mutex.h>
 #include <util/datetime/base.h>
-/** Asynchronous DNS resolver. 
- * 
- * This is NOT general purpose resolver! It is designed with very specific assumptions: 
- * 1) there is relatively small and rarely changed set of resolved names (like, server pool in cluster) 
- * 2) this names supposed to have addresses, absense of A record is equal to DNS error 
- * 3) most of the time IP addresses do not change 
- * 4) it's OK to return old IP address when DNS server not responding in time 
- */ 
-class TDnsCache { 
+/** Asynchronous DNS resolver.
+ *
+ * This is NOT general purpose resolver! It is designed with very specific assumptions:
+ * 1) there is relatively small and rarely changed set of resolved names (like, server pool in cluster)
+ * 2) this names supposed to have addresses, absense of A record is equal to DNS error
+ * 3) most of the time IP addresses do not change
+ * 4) it's OK to return old IP address when DNS server not responding in time
+ */
+class TDnsCache {
     TDnsCache(bool allowIpv4 = true, bool allowIpv6 = true, time_t entry_lifetime = 1800, time_t neg_lifetime = 1, ui32 request_timeout = 500000);
     TString GetHostByAddr(const NAddr::IRemoteAddr&);
     // ip in network byte order
     TIpHost Get(const TString& host);
-    /* use with AF_INET, AF_INET6 or AF_UNSPEC */ 
+    /* use with AF_INET, AF_INET6 or AF_UNSPEC */
     NAddr::IRemoteAddrPtr GetAddr(const TString& host,
                                   int family,
                                   TIpPort port = 0,
                                   bool cacheOnly = false);
     void GetAllAddresses(const TString& host, TVector<NAddr::IRemoteAddrPtr>&);
     void GetStats(ui64& a_cache_hits, ui64& a_cache_misses,
                   ui64& ptr_cache_hits, ui64& ptr_cache_misses);
     bool ValidateHName(const TString& host) const noexcept;
-    struct TGHBNContext { 
+    struct TGHBNContext {
         TDnsCache* Owner;
         TString Hostname;
-        int Family; 
-    }; 
-    struct TGHBAContext { 
+        int Family;
+    };
+    struct TGHBAContext {
         TDnsCache* Owner;
-        in6_addr Addr; 
-    }; 
+        in6_addr Addr;
+    };
     struct THost {
         THost() noexcept {
@@ -60,7 +60,7 @@ private:
         time_t ResolvedV4 = 0;
         time_t NotFoundV4 = 0;
         TAtomic InProgressV4 = 0;
         TVector<in6_addr> AddrsV6;
         time_t ResolvedV6 = 0;
         time_t NotFoundV6 = 0;
@@ -70,8 +70,8 @@ private:
         TString AddrsV6ToString() const;
         bool IsStale(int family, const TDnsCache* ctx) const noexcept;
-    }; 
+    };
     typedef TMap<TString, THost> THostCache;
     struct TAddr {
@@ -79,54 +79,54 @@ private:
         time_t Resolved = 0;
         time_t NotFound = 0;
         TAtomic InProgress = 0;
-    }; 
-    /* IRemoteAddr is annoingly hard to use, so I'll use in6_addr as key 
-     * and put v4 addrs in it. 
-     */ 
-    struct TAddrCmp { 
+    };
+    /* IRemoteAddr is annoingly hard to use, so I'll use in6_addr as key
+     * and put v4 addrs in it.
+     */
+    struct TAddrCmp {
         bool operator()(const in6_addr& left, const in6_addr& right) const {
-            for (size_t i = 0; i < sizeof(left); i++) { 
-                if (left.s6_addr[i] < right.s6_addr[i]) { 
-                    return true; 
-                } else if (left.s6_addr[i] > right.s6_addr[i]) { 
-                    return false; 
-                } 
-            } 
-            // equal 
-            return false; 
-        } 
-    }; 
+            for (size_t i = 0; i < sizeof(left); i++) {
+                if (left.s6_addr[i] < right.s6_addr[i]) {
+                    return true;
+                } else if (left.s6_addr[i] > right.s6_addr[i]) {
+                    return false;
+                }
+            }
+            // equal
+            return false;
+        }
+    };
     typedef TMap<in6_addr, TAddr, TAddrCmp> TAddrCache;
     const THost& Resolve(const TString&, int family, bool cacheOnly = false);
     const TAddr& ResolveAddr(const in6_addr&, int family);
     void WaitTask(TAtomic&);
     static void GHBNCallback(void* arg, int status, int timeouts,
                              struct hostent* info);
     static void GHBACallback(void* arg, int status, int timeouts,
                              struct hostent* info);
-    const time_t EntryLifetime; 
-    const time_t NegativeLifetime; 
+    const time_t EntryLifetime;
+    const time_t NegativeLifetime;
     const TDuration Timeout;
     const bool AllowIpV4;
     const bool AllowIpV6;
-    TMutex CacheMtx; 
-    THostCache HostCache; 
-    TAddrCache AddrCache; 
-    ui64 ACacheHits; 
-    ui64 ACacheMisses; 
-    ui64 PtrCacheHits; 
-    ui64 PtrCacheMisses; 
+    TMutex CacheMtx;
+    THostCache HostCache;
+    TAddrCache AddrCache;
+    ui64 ACacheHits;
+    ui64 ACacheMisses;
+    ui64 PtrCacheHits;
+    ui64 PtrCacheMisses;
     const static THost NullHost;
-    TMutex AresMtx; 
+    TMutex AresMtx;
     void* Channel;
     struct TAresLibInit {
@@ -145,4 +145,4 @@ private:
     static TAresLibInit InitAresLib;

+ 31 - 31

@@ -1,31 +1,31 @@
-#pragma once 
+#pragma once
 #include <util/datetime/base.h>
-#include <util/generic/singleton.h> 
+#include <util/generic/singleton.h>
 #include <util/string/cast.h>
-#include <util/system/thread.h> 
+#include <util/system/thread.h>
 #include <util/system/event.h>
 #include <util/system/env.h>
 #include <cstdlib>
-/* Keeps current time accurate up to 1/10 second */ 
-class TTimeKeeper { 
+/* Keeps current time accurate up to 1/10 second */
+class TTimeKeeper {
     static TInstant GetNow(void) {
         return TInstant::MicroSeconds(GetTime());
-    static time_t GetTime(void) { 
-        return Singleton<TTimeKeeper>()->CurrentTime.tv_sec; 
-    } 
+    static time_t GetTime(void) {
+        return Singleton<TTimeKeeper>()->CurrentTime.tv_sec;
+    }
     static const struct timeval& GetTimeval(void) {
-        return Singleton<TTimeKeeper>()->CurrentTime; 
-    } 
-    TTimeKeeper() 
+        return Singleton<TTimeKeeper>()->CurrentTime;
+    }
+    TTimeKeeper()
         : Thread(&TTimeKeeper::Worker, this)
         ConstTime = !!GetEnv("TEST_TIME");
@@ -40,31 +40,31 @@ public:
             gettimeofday(&CurrentTime, nullptr);
-    } 
-    ~TTimeKeeper() { 
+    }
+    ~TTimeKeeper() {
         if (!ConstTime) {
-    } 
-    static const ui32 UpdateInterval = 100000; 
-    struct timeval CurrentTime; 
+    }
+    static const ui32 UpdateInterval = 100000;
+    struct timeval CurrentTime;
     bool ConstTime;
     TSystemEvent Exit;
-    TThread Thread; 
+    TThread Thread;
     static void* Worker(void* arg) {
         TTimeKeeper* owner = static_cast<TTimeKeeper*>(arg);
         do {
-            /* Race condition may occur here but locking looks too expensive */ 
+            /* Race condition may occur here but locking looks too expensive */
             gettimeofday(&owner->CurrentTime, nullptr);
         } while (!owner->Exit.WaitT(TDuration::MicroSeconds(UpdateInterval)));
         return nullptr;
-    } 
+    }

+ 112 - 112

@@ -1,44 +1,44 @@
-#pragma once 
-#include <util/datetime/base.h> 
-#include <util/system/mutex.h> 
+#pragma once
+#include <util/datetime/base.h>
+#include <util/system/mutex.h>
 #include <util/system/hp_timer.h>
-/* Token bucket. 
- * Makes flow of *inflow* units per second in average, with up to *capacity* bursts. 
- * Do not use for STRICT flow control. 
- */ 
-/* samples: create and use quoter sending 1000 bytes per second on average, 
-   with up to 60 seconds quota buildup. 
-   TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL); 
-   for (;;) { 
-      T *msg = get_message(); 
-      quoter.Sleep(); 
-      quoter.Use(msg->GetSize()); 
-      send_message(msg); 
-   } 
-   ---------------------------- 
-   TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL); 
-   for (;;) { 
-      T *msg = get_message(); 
-      while (! quoter.IsAvail()) { 
-          // do something else 
-      } 
-      quoter.Use(msg->GetSize()); 
-      send_message(msg); 
-   } 
+/* Token bucket.
+ * Makes flow of *inflow* units per second in average, with up to *capacity* bursts.
+ * Do not use for STRICT flow control.
+ */
+/* samples: create and use quoter sending 1000 bytes per second on average,
+   with up to 60 seconds quota buildup.
+   TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL);
+   for (;;) {
+      T *msg = get_message();
+      quoter.Sleep();
+      quoter.Use(msg->GetSize());
+      send_message(msg);
+   }
+   ----------------------------
+   TBucketQuoter quoter(1000, 60000, NULL, NULL, NULL);
+   for (;;) {
+      T *msg = get_message();
+      while (! quoter.IsAvail()) {
+          // do something else
+      }
+      quoter.Use(msg->GetSize());
+      send_message(msg);
+   }
 struct TInstantTimerMs {
     using TTime = TInstant;
     static constexpr ui64 Resolution = 1000ull; // milliseconds
@@ -69,8 +69,8 @@ struct THPTimerUs {
 template <typename StatCounter, typename Lock = TMutex, typename Timer = TInstantTimerMs>
-class TBucketQuoter { 
+class TBucketQuoter {
     using TTime = typename Timer::TTime;
     struct TResult {
@@ -79,43 +79,43 @@ public:
         ui64 Seqno;
-    /* fixed quota */ 
+    /* fixed quota */
     TBucketQuoter(ui64 inflow, ui64 capacity, StatCounter* msgPassed = nullptr,
                   StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr,
                   StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr)
-        : MsgPassed(msgPassed) 
-        , BucketUnderflows(bucketUnderflows) 
-        , TokensUsed(tokensUsed) 
+        : MsgPassed(msgPassed)
+        , BucketUnderflows(bucketUnderflows)
+        , TokensUsed(tokensUsed)
         , UsecWaited(usecWaited)
         , AggregateInflow(aggregateInflow)
         , Bucket(fill ? capacity : 0)
         , LastAdd(Timer::Now())
-        , InflowTokensPerSecond(&FixedInflow) 
-        , BucketTokensCapacity(&FixedCapacity) 
-        , FixedInflow(inflow) 
-        , FixedCapacity(capacity) 
-    { 
-        /* no-op */ 
-    } 
-    /* adjustable quotas */ 
+        , InflowTokensPerSecond(&FixedInflow)
+        , BucketTokensCapacity(&FixedCapacity)
+        , FixedInflow(inflow)
+        , FixedCapacity(capacity)
+    {
+        /* no-op */
+    }
+    /* adjustable quotas */
     TBucketQuoter(TAtomic* inflow, TAtomic* capacity, StatCounter* msgPassed = nullptr,
                   StatCounter* bucketUnderflows = nullptr, StatCounter* tokensUsed = nullptr,
                   StatCounter* usecWaited = nullptr, bool fill = false, StatCounter* aggregateInflow = nullptr)
-        : MsgPassed(msgPassed) 
-        , BucketUnderflows(bucketUnderflows) 
-        , TokensUsed(tokensUsed) 
+        : MsgPassed(msgPassed)
+        , BucketUnderflows(bucketUnderflows)
+        , TokensUsed(tokensUsed)
         , UsecWaited(usecWaited)
         , AggregateInflow(aggregateInflow)
         , Bucket(fill ? AtomicGet(*capacity) : 0)
         , LastAdd(Timer::Now())
-        , InflowTokensPerSecond(inflow) 
-        , BucketTokensCapacity(capacity) 
-    { 
-        /* no-op */ 
-    } 
-    bool IsAvail() { 
+        , InflowTokensPerSecond(inflow)
+        , BucketTokensCapacity(capacity)
+    {
+        /* no-op */
+    }
+    bool IsAvail() {
         TGuard<Lock> g(BucketMutex);
         if (Bucket < 0) {
@@ -125,21 +125,21 @@ public:
         return (Bucket >= 0);
     bool IsAvail(TResult& res) {
         TGuard<Lock> g(BucketMutex);
         res.Before = Bucket;
-        FillBucket(); 
+        FillBucket();
         res.After = Bucket;
         res.Seqno = ++Seqno;
-        if (Bucket < 0) { 
-            if (BucketUnderflows) { 
-                (*BucketUnderflows)++; 
-            } 
-        } 
-        return (Bucket >= 0); 
-    } 
+        if (Bucket < 0) {
+            if (BucketUnderflows) {
+                (*BucketUnderflows)++;
+            }
+        }
+        return (Bucket >= 0);
+    }
     ui64 GetAvail() {
         TGuard<Lock> g(BucketMutex);
@@ -158,8 +158,8 @@ public:
     void Use(ui64 tokens, bool sleep = false) {
         TGuard<Lock> g(BucketMutex);
         UseNoLock(tokens, sleep);
-    } 
+    }
     void Use(ui64 tokens, TResult& res, bool sleep = false) {
         TGuard<Lock> g(BucketMutex);
         res.Before = Bucket;
@@ -167,11 +167,11 @@ public:
         res.After = Bucket;
         res.Seqno = ++Seqno;
     i64 UseAndFill(ui64 tokens) {
         TGuard<Lock> g(BucketMutex);
-        FillBucket(); 
+        FillBucket();
         return Bucket;
@@ -192,14 +192,14 @@ public:
         TGuard<Lock> g(BucketMutex);
-        if (Bucket >= 0) { 
-            return 0; 
-        } 
-        ui32 usec = (-Bucket * 1000000) / (*InflowTokensPerSecond); 
-        return usec; 
-    } 
+        if (Bucket >= 0) {
+            return 0;
+        }
+        ui32 usec = (-Bucket * 1000000) / (*InflowTokensPerSecond);
+        return usec;
+    }
     ui32 GetWaitTime(TResult& res) {
         TGuard<Lock> g(BucketMutex);
         res.Before = Bucket;
@@ -213,22 +213,22 @@ public:
         return usec;
-    void Sleep() { 
+    void Sleep() {
         while (!IsAvail()) {
-            ui32 delay = GetWaitTime(); 
-            if (delay != 0) { 
-                usleep(delay); 
+            ui32 delay = GetWaitTime();
+            if (delay != 0) {
+                usleep(delay);
                 if (UsecWaited) {
                     (*UsecWaited) += delay;
-            } 
-        } 
-    } 
-    void FillBucket() { 
+            }
+        }
+    }
+    void FillBucket() {
         TTime now = Timer::Now();
         ui64 elapsed = Timer::Duration(LastAdd, now);
         if (*InflowTokensPerSecond * elapsed >= Timer::Resolution) {
             ui64 inflow = *InflowTokensPerSecond * elapsed / Timer::Resolution;
@@ -236,14 +236,14 @@ private:
                 *AggregateInflow += inflow;
             Bucket += inflow;
-            if (Bucket > *BucketTokensCapacity) { 
-                Bucket = *BucketTokensCapacity; 
-            } 
-            LastAdd = now; 
-        } 
-    } 
+            if (Bucket > *BucketTokensCapacity) {
+                Bucket = *BucketTokensCapacity;
+            }
+            LastAdd = now;
+        }
+    }
     void UseNoLock(ui64 tokens, bool sleep = false) {
         if (sleep)
@@ -268,14 +268,14 @@ private:
     StatCounter* TokensUsed;
     StatCounter* UsecWaited;
     StatCounter* AggregateInflow;
-    i64 Bucket; 
+    i64 Bucket;
     TTime LastAdd;
     Lock BucketMutex;
     ui64 Seqno = 0;
     TAtomic* InflowTokensPerSecond;
     TAtomic* BucketTokensCapacity;
-    TAtomic FixedInflow; 
-    TAtomic FixedCapacity; 
+    TAtomic FixedInflow;
+    TAtomic FixedCapacity;

+ 3 - 3

@@ -49,10 +49,10 @@ extern "C" void* malloc(size_t size) {
     return LFAlloc(size);
-extern "C" void* valloc(size_t size) { 
+extern "C" void* valloc(size_t size) {
     return LFVAlloc(size);
 extern "C" int posix_memalign(void** memptr, size_t alignment, size_t size) {
     return LFPosixMemalign(memptr, alignment, size);

+ 2 - 2

@@ -163,8 +163,8 @@ namespace NBus {
         } else {
             GetHeader()->FlagsInternal &= ~(MESSAGE_COMPRESS_INTERNAL);
-    } 
+    }
     void TBusMessage::SetCompressedResponse(bool v) {
         if (v) {
             GetHeader()->FlagsInternal |= MESSAGE_COMPRESS_RESPONSE;

+ 3 - 3

@@ -51,7 +51,7 @@ SRCS(
@@ -63,6 +63,6 @@ PEERDIR(

+ 1 - 1

@@ -98,7 +98,7 @@ namespace NBus {
             return NCodecs::ICodec::GetInstance("snappy");
     class TBusSyncSourceSession: public TAtomicRefCount<TBusSyncSourceSession> {
         friend class TBusMessageQueue;

+ 1 - 1

@@ -27,7 +27,7 @@ namespace NMonitoring {
 #define OUTPUT_NAMED_COUNTER(var, name) out << name << ": \t" << var << NMonitoring::PrettyNum(var, prettyBuf, 32) << '\n'
-#define OUTPUT_COUNTER(var) OUTPUT_NAMED_COUNTER(var, #var); 
     char* PrettyNumShort(i64 val, char* buf, size_t size);
     char* PrettyNum(i64 val, char* buf, size_t size);

+ 7 - 7

@@ -18,15 +18,15 @@ inline void Shuffle(TRandIter begin, TRandIterEnd end) {
         Shuffle(begin, end, TFastRng64(Seed()));
-template <typename TRandIter, typename TRandIterEnd, typename TRandGen> 
+template <typename TRandIter, typename TRandIterEnd, typename TRandGen>
 inline void Shuffle(TRandIter begin, TRandIterEnd end, TRandGen&& gen) {
-    const size_t sz = end - begin; 
-    for (size_t i = 1; i < sz; ++i) { 
+    const size_t sz = end - begin;
+    for (size_t i = 1; i < sz; ++i) {
         DoSwap(*(begin + i), *(begin + gen.Uniform(i + 1)));
-    } 
+    }
 template <typename TRange>
 inline void ShuffleRange(TRange& range) {

Some files were not shown because too many files changed in this diff