#pragma once #if !defined(INCLUDE_FUTURE_INL_H) #error "you should never include future-inl.h directly" #endif // INCLUDE_FUTURE_INL_H namespace NThreading { namespace NImpl { //////////////////////////////////////////////////////////////////////////////// template using TCallback = std::function&)>; template using TCallbackList = TVector>; // TODO: small vector //////////////////////////////////////////////////////////////////////////////// [[noreturn]] void ThrowFutureException(TStringBuf message, const TSourceLocation& source); enum class TError { Error }; template class TFutureState: public TAtomicRefCount> { enum { NotReady, ExceptionSet, ValueMoved, // keep the ordering of this and following values ValueSet, ValueRead, }; private: mutable TAtomic State; TAdaptiveLock StateLock; TCallbackList Callbacks; mutable THolder ReadyEvent; std::exception_ptr Exception; union { char NullValue; T Value; }; void AccessValue(TDuration timeout, int acquireState) const { TAtomicBase state = AtomicGet(State); if (Y_UNLIKELY(state == NotReady)) { if (timeout == TDuration::Zero()) { ::NThreading::NImpl::ThrowFutureException("value not set"sv, __LOCATION__); } if (!Wait(timeout)) { ::NThreading::NImpl::ThrowFutureException("wait timeout"sv, __LOCATION__); } state = AtomicGet(State); } TryRethrowWithState(state); switch (AtomicGetAndCas(&State, acquireState, ValueSet)) { case ValueSet: break; case ValueRead: if (acquireState != ValueRead) { ::NThreading::NImpl::ThrowFutureException("value being read"sv, __LOCATION__); } break; case ValueMoved: ::NThreading::NImpl::ThrowFutureException("value was moved"sv, __LOCATION__); default: Y_ASSERT(state == ValueSet); } } public: TFutureState() : State(NotReady) , NullValue(0) { } template TFutureState(TT&& value) : State(ValueSet) , Value(std::forward(value)) { } TFutureState(std::exception_ptr exception, TError) : State(ExceptionSet) , Exception(std::move(exception)) , NullValue(0) { } ~TFutureState() { if (State >= ValueMoved) { // ValueMoved, ValueSet, ValueRead Value.~T(); } } bool HasValue() const { return AtomicGet(State) >= ValueMoved; // ValueMoved, ValueSet, ValueRead } void TryRethrow() const { TAtomicBase state = AtomicGet(State); TryRethrowWithState(state); } bool HasException() const { return AtomicGet(State) == ExceptionSet; } const T& GetValue(TDuration timeout = TDuration::Zero()) const { AccessValue(timeout, ValueRead); return Value; } T ExtractValue(TDuration timeout = TDuration::Zero()) { AccessValue(timeout, ValueMoved); return std::move(Value); } template void SetValue(TT&& value) { bool success = TrySetValue(std::forward(value)); if (Y_UNLIKELY(!success)) { ::NThreading::NImpl::ThrowFutureException("value already set"sv, __LOCATION__); } } template bool TrySetValue(TT&& value) { TSystemEvent* readyEvent = nullptr; TCallbackList callbacks; with_lock (StateLock) { TAtomicBase state = AtomicGet(State); if (Y_UNLIKELY(state != NotReady)) { return false; } new (&Value) T(std::forward(value)); readyEvent = ReadyEvent.Get(); callbacks = std::move(Callbacks); AtomicSet(State, ValueSet); } if (readyEvent) { readyEvent->Signal(); } if (callbacks) { TFuture temp(this); for (auto& callback : callbacks) { callback(temp); } } return true; } void SetException(std::exception_ptr e) { bool success = TrySetException(std::move(e)); if (Y_UNLIKELY(!success)) { ::NThreading::NImpl::ThrowFutureException("value already set"sv, __LOCATION__); } } bool TrySetException(std::exception_ptr e) { TSystemEvent* readyEvent; TCallbackList callbacks; with_lock (StateLock) { TAtomicBase state = AtomicGet(State); if (Y_UNLIKELY(state != NotReady)) { return false; } Exception = std::move(e); readyEvent = ReadyEvent.Get(); callbacks = std::move(Callbacks); AtomicSet(State, ExceptionSet); } if (readyEvent) { readyEvent->Signal(); } if (callbacks) { TFuture temp(this); for (auto& callback : callbacks) { callback(temp); } } return true; } template bool Subscribe(F&& func) { with_lock (StateLock) { TAtomicBase state = AtomicGet(State); if (state == NotReady) { Callbacks.emplace_back(std::forward(func)); return true; } } return false; } void Wait() const { Wait(TInstant::Max()); } bool Wait(TDuration timeout) const { return Wait(timeout.ToDeadLine()); } bool Wait(TInstant deadline) const { TSystemEvent* readyEvent = nullptr; with_lock (StateLock) { TAtomicBase state = AtomicGet(State); if (state != NotReady) { return true; } if (!ReadyEvent) { ReadyEvent.Reset(new TSystemEvent()); } readyEvent = ReadyEvent.Get(); } Y_ASSERT(readyEvent); return readyEvent->WaitD(deadline); } void TryRethrowWithState(TAtomicBase state) const { if (Y_UNLIKELY(state == ExceptionSet)) { Y_ASSERT(Exception); std::rethrow_exception(Exception); } } }; //////////////////////////////////////////////////////////////////////////////// template <> class TFutureState: public TAtomicRefCount> { enum { NotReady, ValueSet, ExceptionSet, }; private: TAtomic State; TAdaptiveLock StateLock; TCallbackList Callbacks; mutable THolder ReadyEvent; std::exception_ptr Exception; public: TFutureState(bool valueSet = false) : State(valueSet ? ValueSet : NotReady) { } TFutureState(std::exception_ptr exception, TError) : State(ExceptionSet) , Exception(std::move(exception)) { } bool HasValue() const { return AtomicGet(State) == ValueSet; } void TryRethrow() const { TAtomicBase state = AtomicGet(State); TryRethrowWithState(state); } bool HasException() const { return AtomicGet(State) == ExceptionSet; } void GetValue(TDuration timeout = TDuration::Zero()) const { TAtomicBase state = AtomicGet(State); if (Y_UNLIKELY(state == NotReady)) { if (timeout == TDuration::Zero()) { ::NThreading::NImpl::ThrowFutureException("value not set"sv, __LOCATION__); } if (!Wait(timeout)) { ::NThreading::NImpl::ThrowFutureException("wait timeout"sv, __LOCATION__); } state = AtomicGet(State); } TryRethrowWithState(state); Y_ASSERT(state == ValueSet); } void SetValue() { bool success = TrySetValue(); if (Y_UNLIKELY(!success)) { ::NThreading::NImpl::ThrowFutureException("value already set"sv, __LOCATION__); } } bool TrySetValue() { TSystemEvent* readyEvent = nullptr; TCallbackList callbacks; with_lock (StateLock) { TAtomicBase state = AtomicGet(State); if (Y_UNLIKELY(state != NotReady)) { return false; } readyEvent = ReadyEvent.Get(); callbacks = std::move(Callbacks); AtomicSet(State, ValueSet); } if (readyEvent) { readyEvent->Signal(); } if (callbacks) { TFuture temp(this); for (auto& callback : callbacks) { callback(temp); } } return true; } void SetException(std::exception_ptr e) { bool success = TrySetException(std::move(e)); if (Y_UNLIKELY(!success)) { ::NThreading::NImpl::ThrowFutureException("value already set"sv, __LOCATION__); } } bool TrySetException(std::exception_ptr e) { TSystemEvent* readyEvent = nullptr; TCallbackList callbacks; with_lock (StateLock) { TAtomicBase state = AtomicGet(State); if (Y_UNLIKELY(state != NotReady)) { return false; } Exception = std::move(e); readyEvent = ReadyEvent.Get(); callbacks = std::move(Callbacks); AtomicSet(State, ExceptionSet); } if (readyEvent) { readyEvent->Signal(); } if (callbacks) { TFuture temp(this); for (auto& callback : callbacks) { callback(temp); } } return true; } template bool Subscribe(F&& func) { with_lock (StateLock) { TAtomicBase state = AtomicGet(State); if (state == NotReady) { Callbacks.emplace_back(std::forward(func)); return true; } } return false; } void Wait() const { Wait(TInstant::Max()); } bool Wait(TDuration timeout) const { return Wait(timeout.ToDeadLine()); } bool Wait(TInstant deadline) const { TSystemEvent* readyEvent = nullptr; with_lock (StateLock) { TAtomicBase state = AtomicGet(State); if (state != NotReady) { return true; } if (!ReadyEvent) { ReadyEvent.Reset(new TSystemEvent()); } readyEvent = ReadyEvent.Get(); } Y_ASSERT(readyEvent); return readyEvent->WaitD(deadline); } void TryRethrowWithState(TAtomicBase state) const { if (Y_UNLIKELY(state == ExceptionSet)) { Y_ASSERT(Exception); std::rethrow_exception(Exception); } } }; //////////////////////////////////////////////////////////////////////////////// template inline void SetValueImpl(TPromise& promise, const T& value) { promise.SetValue(value); } template inline void SetValueImpl(TPromise& promise, T&& value) { promise.SetValue(std::move(value)); } template inline void SetValueImpl(TPromise& promise, const TFuture& future, std::enable_if_t::value, bool> = false) { future.Subscribe([=](const TFuture& f) mutable { T const* value; try { value = &f.GetValue(); } catch (...) { promise.SetException(std::current_exception()); return; } promise.SetValue(*value); }); } template inline void SetValueImpl(TPromise& promise, const TFuture& future) { future.Subscribe([=](const TFuture& f) mutable { try { f.TryRethrow(); } catch (...) { promise.SetException(std::current_exception()); return; } promise.SetValue(); }); } template inline void SetValue(TPromise& promise, F&& func) { try { SetValueImpl(promise, func()); } catch (...) { const bool success = promise.TrySetException(std::current_exception()); if (Y_UNLIKELY(!success)) { throw; } } } template inline void SetValue(TPromise& promise, F&& func, std::enable_if_t>::value, bool> = false) { try { func(); } catch (...) { promise.SetException(std::current_exception()); return; } promise.SetValue(); } } //////////////////////////////////////////////////////////////////////////////// class TFutureStateId { private: const void* Id; public: template explicit TFutureStateId(const NImpl::TFutureState& state) : Id(&state) { } const void* Value() const noexcept { return Id; } }; inline bool operator==(const TFutureStateId& l, const TFutureStateId& r) { return l.Value() == r.Value(); } inline bool operator!=(const TFutureStateId& l, const TFutureStateId& r) { return !(l == r); } //////////////////////////////////////////////////////////////////////////////// template inline TFuture::TFuture(const TIntrusivePtr& state) noexcept : State(state) { } template inline void TFuture::Swap(TFuture& other) { State.Swap(other.State); } template inline bool TFuture::HasValue() const { return State && State->HasValue(); } template inline const T& TFuture::GetValue(TDuration timeout) const { EnsureInitialized(); return State->GetValue(timeout); } template inline T TFuture::ExtractValue(TDuration timeout) { EnsureInitialized(); return State->ExtractValue(timeout); } template inline const T& TFuture::GetValueSync() const { return GetValue(TDuration::Max()); } template inline T TFuture::ExtractValueSync() { return ExtractValue(TDuration::Max()); } template inline void TFuture::TryRethrow() const { if (State) { State->TryRethrow(); } } template inline bool TFuture::HasException() const { return State && State->HasException(); } template inline void TFuture::Wait() const { EnsureInitialized(); return State->Wait(); } template inline bool TFuture::Wait(TDuration timeout) const { EnsureInitialized(); return State->Wait(timeout); } template inline bool TFuture::Wait(TInstant deadline) const { EnsureInitialized(); return State->Wait(deadline); } template template inline const TFuture& TFuture::Subscribe(F&& func) const { EnsureInitialized(); if (!State->Subscribe(std::forward(func))) { func(*this); } return *this; } template template inline const TFuture& TFuture::NoexceptSubscribe(F&& func) const noexcept { return Subscribe(std::forward(func)); } template template inline TFuture>> TFuture::Apply(F&& func) const { auto promise = NewPromise>>(); Subscribe([promise, func = std::forward(func)](const TFuture& future) mutable { NImpl::SetValue(promise, [&]() { return func(future); }); }); return promise; } template inline TFuture TFuture::IgnoreResult() const { auto promise = NewPromise(); Subscribe([=](const TFuture& future) mutable { NImpl::SetValueImpl(promise, future); }); return promise; } template inline bool TFuture::Initialized() const { return bool(State); } template inline TMaybe TFuture::StateId() const noexcept { return State != nullptr ? MakeMaybe(*State) : Nothing(); } template inline void TFuture::EnsureInitialized() const { if (!State) { ::NThreading::NImpl::ThrowFutureException("state not initialized"sv, __LOCATION__); } } //////////////////////////////////////////////////////////////////////////////// inline TFuture::TFuture(const TIntrusivePtr& state) noexcept : State(state) { } inline void TFuture::Swap(TFuture& other) { State.Swap(other.State); } inline bool TFuture::HasValue() const { return State && State->HasValue(); } inline void TFuture::GetValue(TDuration timeout) const { EnsureInitialized(); State->GetValue(timeout); } inline void TFuture::GetValueSync() const { GetValue(TDuration::Max()); } inline void TFuture::TryRethrow() const { if (State) { State->TryRethrow(); } } inline bool TFuture::HasException() const { return State && State->HasException(); } inline void TFuture::Wait() const { EnsureInitialized(); return State->Wait(); } inline bool TFuture::Wait(TDuration timeout) const { EnsureInitialized(); return State->Wait(timeout); } inline bool TFuture::Wait(TInstant deadline) const { EnsureInitialized(); return State->Wait(deadline); } template inline const TFuture& TFuture::Subscribe(F&& func) const { EnsureInitialized(); if (!State->Subscribe(std::forward(func))) { func(*this); } return *this; } template inline const TFuture& TFuture::NoexceptSubscribe(F&& func) const noexcept { return Subscribe(std::forward(func)); } template inline TFuture>> TFuture::Apply(F&& func) const { auto promise = NewPromise>>(); Subscribe([promise, func = std::forward(func)](const TFuture& future) mutable { NImpl::SetValue(promise, [&]() { return func(future); }); }); return promise; } template inline TFuture TFuture::Return(const R& value) const { auto promise = NewPromise(); Subscribe([=](const TFuture& future) mutable { try { future.TryRethrow(); } catch (...) { promise.SetException(std::current_exception()); return; } promise.SetValue(value); }); return promise; } inline bool TFuture::Initialized() const { return bool(State); } inline TMaybe TFuture::StateId() const noexcept { return State != nullptr ? MakeMaybe(*State) : Nothing(); } inline void TFuture::EnsureInitialized() const { if (!State) { ::NThreading::NImpl::ThrowFutureException("state not initialized"sv, __LOCATION__); } } //////////////////////////////////////////////////////////////////////////////// template inline TPromise::TPromise(const TIntrusivePtr& state) noexcept : State(state) { } template inline void TPromise::Swap(TPromise& other) { State.Swap(other.State); } template inline const T& TPromise::GetValue() const { EnsureInitialized(); return State->GetValue(); } template inline T TPromise::ExtractValue() { EnsureInitialized(); return State->ExtractValue(); } template inline bool TPromise::HasValue() const { return State && State->HasValue(); } template inline void TPromise::SetValue(const T& value) { EnsureInitialized(); State->SetValue(value); } template inline void TPromise::SetValue(T&& value) { EnsureInitialized(); State->SetValue(std::move(value)); } template inline bool TPromise::TrySetValue(const T& value) { EnsureInitialized(); return State->TrySetValue(value); } template inline bool TPromise::TrySetValue(T&& value) { EnsureInitialized(); return State->TrySetValue(std::move(value)); } template inline void TPromise::TryRethrow() const { if (State) { State->TryRethrow(); } } template inline bool TPromise::HasException() const { return State && State->HasException(); } template inline void TPromise::SetException(const TString& e) { EnsureInitialized(); State->SetException(std::make_exception_ptr(yexception() << e)); } template inline void TPromise::SetException(std::exception_ptr e) { EnsureInitialized(); State->SetException(std::move(e)); } template inline bool TPromise::TrySetException(std::exception_ptr e) { EnsureInitialized(); return State->TrySetException(std::move(e)); } template inline TFuture TPromise::GetFuture() const { EnsureInitialized(); return TFuture(State); } template inline TPromise::operator TFuture() const { return GetFuture(); } template inline bool TPromise::Initialized() const { return bool(State); } template inline void TPromise::EnsureInitialized() const { if (!State) { ::NThreading::NImpl::ThrowFutureException("state not initialized"sv, __LOCATION__); } } //////////////////////////////////////////////////////////////////////////////// inline TPromise::TPromise(const TIntrusivePtr& state) noexcept : State(state) { } inline void TPromise::Swap(TPromise& other) { State.Swap(other.State); } inline void TPromise::GetValue() const { EnsureInitialized(); State->GetValue(); } inline bool TPromise::HasValue() const { return State && State->HasValue(); } inline void TPromise::SetValue() { EnsureInitialized(); State->SetValue(); } inline bool TPromise::TrySetValue() { EnsureInitialized(); return State->TrySetValue(); } inline void TPromise::TryRethrow() const { if(State) { State->TryRethrow(); } } inline bool TPromise::HasException() const { return State && State->HasException(); } inline void TPromise::SetException(const TString& e) { EnsureInitialized(); State->SetException(std::make_exception_ptr(yexception() << e)); } inline void TPromise::SetException(std::exception_ptr e) { EnsureInitialized(); State->SetException(std::move(e)); } inline bool TPromise::TrySetException(std::exception_ptr e) { EnsureInitialized(); return State->TrySetException(std::move(e)); } inline TFuture TPromise::GetFuture() const { EnsureInitialized(); return TFuture(State); } inline TPromise::operator TFuture() const { return GetFuture(); } inline bool TPromise::Initialized() const { return bool(State); } inline void TPromise::EnsureInitialized() const { if (!State) { ::NThreading::NImpl::ThrowFutureException("state not initialized"sv, __LOCATION__); } } //////////////////////////////////////////////////////////////////////////////// template inline TPromise NewPromise() { return {new NImpl::TFutureState()}; } inline TPromise NewPromise() { return {new NImpl::TFutureState()}; } template inline TFuture MakeFuture(const T& value) { return {new NImpl::TFutureState(value)}; } template inline TFuture> MakeFuture(T&& value) { return {new NImpl::TFutureState>(std::forward(value))}; } template inline TFuture MakeFuture() { struct TCache { TFuture Instance{new NImpl::TFutureState(Default())}; TCache() { // Immediately advance state from ValueSet to ValueRead. // This should prevent corrupting shared value with an ExtractValue() call. Y_UNUSED(Instance.GetValue()); } }; return Singleton()->Instance; } template inline TFuture MakeErrorFuture(std::exception_ptr exception) { return {new NImpl::TFutureState(std::move(exception), NImpl::TError::Error)}; } inline TFuture MakeFuture() { struct TCache { TFuture Instance{new NImpl::TFutureState(true)}; }; return Singleton()->Instance; } }