Intricacies of C++ Callbacks
Bryan Zimmerman
Expert modern C++ 23+ low latency HPC Software, Hardware, FPGA, and RF Engineer, Consultant, BAZ Innovations, LLC Owner
I. Overview
Recently I have been working on a C++ library that wraps a third-party, non-HFT, asynchronous financial API from a broker. Most of the APIs adhere to the following pattern:
void requestData(int id, configuration parameters);
void dataPart1(int id, some data);
void dataPart2(int id, some data);
...
void dataPartN(int id, some data);
void dataEnd(int id); // usually present only when the data is returned over multiple calls
void cancelDataRequest(int id); // not available for every request group
void error(int id, error info);
Each id must be unique. There can be many outstanding requests for the same data, each with its own configuration parameters and id, and because the API is asynchronous they complete in an arbitrary order. Uniqueness is easily guaranteed with a global atomic int.
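A minimal sketch of such an id generator, assuming a hypothetical free function nextRequestId() (not part of the broker's API). fetch_add performs the increment as a single atomic read-modify-write, so concurrent callers never receive the same value; relaxed ordering is enough because only uniqueness matters, not ordering relative to other memory.

```cpp
#include <atomic>

// Hypothetical helper: returns a process-wide unique request id.
// fetch_add is one atomic read-modify-write, so no two callers get the same value.
// Relaxed ordering suffices because only uniqueness is required.
inline int nextRequestId() noexcept
{
    // Function-local static initialization is thread-safe since C++11.
    static std::atomic<int> s_NextId{ 1 };
    return s_NextId.fetch_add(1, std::memory_order_relaxed);
}
```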
This lends itself well to encapsulation into a class to handle the verification of the configuration parameters, the storage of the data, and the various synchronizations. An example of this type of pattern is as follows:
class RequestSomeData
{
public:
// Define enumerations
enum class Wait : uint8_t
{
Timeout,
Errored,
Completed,
};
// define constructors/destructors
explicit RequestSomeData(Configuration Parameters);
~RequestSomeData();
// Prevent copying/moving
RequestSomeData(const RequestSomeData&) = delete;
RequestSomeData(RequestSomeData&&) noexcept = delete;
RequestSomeData& operator=(const RequestSomeData&) = delete;
RequestSomeData& operator=(RequestSomeData&&) noexcept = delete;
// Interface functions
[[nodiscard]] const DataType& data() const noexcept;
/// wait for the request to complete, error, or time out
template<typename Rep = int64_t, typename Period = std::ratio<1>>
Wait wait(const std::chrono::duration<Rep, Period>& time = std::chrono::seconds{ 86400 }) noexcept;
private:
// Callback functions
void dataPart1(int id, some data);
void dataPart2(int id, some data);
...
void dataPartN(int id, some data);
// Helper Functions
/// Send the command to get the data
void sendCommand();
/// Send the cancel command
void cancelCommand();
/// register callbacks for this data request
void registerCallbacks();
/// unregister callbacks for this data request
void unregisterCallbacks();
// member variables
mutable std::shared_mutex m_DataLock;
DataType m_Data;
};
This article is not about the design of this class, but about the issues that can arise with C++ callbacks, many of which are the same as those found in publish/subscribe systems.
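For readers curious about the synchronization side of such a class, here is one possible way to implement a wait() that returns the Wait enumeration. It is a minimal sketch, not the author's implementation: RequestState, complete() and fail() are hypothetical names, and the assumption is that the dataEnd(id) and error(id, ...) callbacks call complete() and fail() respectively.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <ratio>

// Hypothetical completion state shared between the API callbacks and wait().
class RequestState
{
public:
    enum class Wait : std::uint8_t { Timeout, Errored, Completed };

    // Assumption: called from the dataEnd(id) callback.
    void complete() { finish(Status::Completed); }
    // Assumption: called from the error(id, ...) callback.
    void fail() { finish(Status::Errored); }

    // Blocks until the request completes or errors, or until time elapses.
    template<typename Rep = std::int64_t, typename Period = std::ratio<1>>
    Wait wait(const std::chrono::duration<Rep, Period>& time = std::chrono::seconds{ 86400 })
    {
        std::unique_lock lock{ m_Mutex };
        // The predicate guards against spurious wakeups and missed notifications.
        const bool finished{ m_Cv.wait_for(lock, time, [this] { return m_Status != Status::Pending; }) };
        if (!finished)
        {
            return Wait::Timeout;
        }
        return m_Status == Status::Completed ? Wait::Completed : Wait::Errored;
    }

private:
    enum class Status : std::uint8_t { Pending, Completed, Errored };

    void finish(const Status status)
    {
        {
            std::lock_guard lock{ m_Mutex };
            m_Status = status;
        }
        m_Cv.notify_all(); // wake every waiter after releasing the mutex
    }

    std::mutex m_Mutex;
    std::condition_variable m_Cv;
    Status m_Status{ Status::Pending };
};
```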
II. First Iteration
Let's start by looking at how you might invoke a set of callbacks associated with an event. From now on we will ignore the id parameter for simplicity. A first attempt might look something like this:
std::vector<std::function<void(const Data&)>> m_Callbacks;
std::mutex m_CallbacksLock;
// add callback
void addCallback(std::function<void(const Data&)> callback)
{
std::unique_lock lock{ m_CallbacksLock };
m_Callbacks.emplace_back(std::move(callback));
}
// remove callback
// Note: std::function has no operator== for comparing two std::function objects (only
// against nullptr), so a version that compiles needs another identity, e.g. an id
bool removeCallback(const std::function<void(const Data&)>& callback)
{
std::unique_lock lock{ m_CallbacksLock };
for (auto itr{ std::cbegin(m_Callbacks) }; itr != std::cend(m_Callbacks); ++itr)
{
if (callback == *itr)
{
m_Callbacks.erase(itr);
return true;
}
}
return false;
}
// call callbacks
void callCallbacks(const Data& data)
{
std::unique_lock lock{ m_CallbacksLock };
for (auto& callback : m_Callbacks)
{
callback(data);
}
}
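While this looks perfectly safe, there is one large issue: the callbacks lock is held for the entire duration of the potentially time-consuming invocations, which can account for most of the lock hold time and block threads trying to add or remove callbacks. Worse, if a callback itself calls addCallback() or removeCallback(), it tries to lock a std::mutex that its own thread already holds, which is undefined behavior and in practice usually a deadlock. Another issue is that the exclusive mutex serializes invocations, so callbacks for requests with different configuration parameters cannot run at the same time, limiting the asynchronous nature of the API.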
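Because std::function can only be compared against nullptr, a comparison-based removeCallback does not compile. A minimal runnable sketch of the same first iteration, assuming a hypothetical CallbackList class whose addCallback hands out an integer handle and a placeholder Data payload; note that the lock is still held for every invocation, exactly as in the version above.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <mutex>
#include <utility>

// Hypothetical payload type standing in for the received data.
struct Data
{
    int value{ 0 };
};

// First-iteration registry keyed by a hypothetical integer handle instead of
// comparing std::function objects.
class CallbackList
{
public:
    using Callback = std::function<void(const Data&)>;

    std::size_t addCallback(Callback callback)
    {
        std::lock_guard lock{ m_CallbacksLock };
        const auto id{ m_NextId++ };
        m_Callbacks.emplace(id, std::move(callback));
        return id;
    }

    bool removeCallback(const std::size_t id)
    {
        std::lock_guard lock{ m_CallbacksLock };
        return m_Callbacks.erase(id) == 1U;
    }

    void callCallbacks(const Data& data)
    {
        // Still the first iteration: the lock is held during every invocation.
        std::lock_guard lock{ m_CallbacksLock };
        for (auto& entry : m_Callbacks)
        {
            entry.second(data);
        }
    }

private:
    std::mutex m_CallbacksLock;
    std::map<std::size_t, Callback> m_Callbacks;
    std::size_t m_NextId{ 0U };
};
```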
III. Reduce the Callback Lock Hold Time
To reduce the lock hold time and allow multiple simultaneous invocations, the following might be attempted:
std::vector<std::function<void(const Data&)>> m_Callbacks;
std::shared_mutex m_CallbacksLock;
// add callback
void addCallback(std::function<void(const Data&)> callback)
{
std::unique_lock lock{ m_CallbacksLock };
m_Callbacks.emplace_back(std::move(callback));
}
// remove callback
bool removeCallback(const std::function<void(const Data&)>& callback)
{
std::unique_lock lock{ m_CallbacksLock };
for (auto itr{ std::cbegin(m_Callbacks) }; itr != std::cend(m_Callbacks); ++itr)
{
if (callback == *itr)
{
m_Callbacks.erase(itr);
return true;
}
}
return false;
}
// call callbacks
void callCallbacks(const Data& data)
{
std::vector<std::function<void(const Data&)>> callbacks;
{
// copy the callbacks under a shared (read) lock
std::shared_lock lock{ m_CallbacksLock };
callbacks.reserve(m_Callbacks.size());
for (const auto& callback : m_Callbacks)
{
callbacks.emplace_back(callback);
}
}
// invoke the copies outside the lock
for (auto& callback : callbacks)
{
callback(data);
}
}
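So we copy the callbacks under the lock, which should be much faster than the invocations themselves, and then invoke the copies outside the lock. We also switch to a std::shared_mutex so that multiple threads can take their copies concurrently, allowing callbacks for different configuration parameters to run simultaneously. While this greatly reduces the lock hold time, it introduces a subtle race condition: a callback can still be invoked after removeCallback() has returned, because the invoking thread is working from its own copy. If whatever the callback refers to (for example, a captured this pointer) is destroyed after the callback is removed, the invocation uses a dangling reference.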
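The race can be replayed deterministically on a single thread. A minimal sketch, assuming a hypothetical SnapshotList whose snapshot() mirrors the copy-under-lock half of callCallbacks: the invoking thread takes its copy, the owner removes the callback, and the copy still runs.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <mutex>
#include <shared_mutex>
#include <utility>
#include <vector>

// Hypothetical registry following the copy-then-invoke approach.
class SnapshotList
{
public:
    using Callback = std::function<void()>;

    std::size_t add(Callback callback)
    {
        std::unique_lock lock{ m_Lock };
        const auto id{ m_NextId++ };
        m_Callbacks.emplace(id, std::move(callback));
        return id;
    }

    bool remove(const std::size_t id)
    {
        std::unique_lock lock{ m_Lock };
        return m_Callbacks.erase(id) == 1U;
    }

    // First half of callCallbacks: copy the callbacks under the shared lock.
    std::vector<Callback> snapshot() const
    {
        std::shared_lock lock{ m_Lock };
        std::vector<Callback> copies;
        copies.reserve(m_Callbacks.size());
        for (const auto& entry : m_Callbacks)
        {
            copies.push_back(entry.second);
        }
        return copies;
    }

private:
    mutable std::shared_mutex m_Lock;
    std::map<std::size_t, Callback> m_Callbacks;
    std::size_t m_NextId{ 0U };
};

// Replays one bad interleaving on a single thread and returns the call count
// (or -1 if the removal unexpectedly failed).
int callsAfterRemoval()
{
    SnapshotList list;
    int calls{ 0 };
    const auto id{ list.add([&calls] { ++calls; }) };
    auto copies{ list.snapshot() };        // invoking thread: copy taken
    const bool removed{ list.remove(id) }; // owner: removal succeeds...
    for (auto& callback : copies)          // invoking thread: ...but the copy still runs
    {
        callback();
    }
    return removed ? calls : -1;
}
```

Here calls outlives the invocation, so the result is merely surprising; had it been a member of an object destroyed right after remove(), the final loop would be a use-after-free.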
IV. Improve on the Race Condition
We can improve on the callback removal race condition by storing each std::function in a std::shared_ptr and taking std::weak_ptr snapshots instead of copies. An example is as follows:
std::vector<std::shared_ptr<std::function<void(const Data&)>>> m_Callbacks;
std::shared_mutex m_CallbacksLock;
// call callbacks
void callCallbacks(const Data& data)
{
std::vector<std::weak_ptr<std::function<void(const Data&)>>> callbacks;
{
std::shared_lock lock{ m_CallbacksLock };
callbacks.reserve(m_Callbacks.size());
for (const auto& callback : m_Callbacks)
{
callbacks.emplace_back(callback);
}
}
for (auto& wcallback : callbacks)
{
// lock() fails if the callback was removed after the snapshot was taken
if (auto callback = wcallback.lock())
{
(*callback)(data);
}
}
}
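If the callback is removed before the invoking thread calls lock(), the weak pointer can no longer be converted to a shared pointer, so the callback is skipped. However, an even more subtle race remains: the removal can happen after lock() succeeds. The locked shared pointer keeps the functor itself alive, but removal does not wait for the invocation to finish, so the object that owns or is captured by the functor can be deleted while the callback is still running, which results in undefined behavior, most likely a crash.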
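Both outcomes can be replayed on a single thread. A minimal sketch, with a hypothetical local vector standing in for m_Callbacks and std::function<void()> used for simplicity: a removal that lands before lock() skips the callback, while a removal that lands after lock() does not stop it from running.

```cpp
#include <functional>
#include <memory>
#include <vector>

using Callback = std::function<void()>;

// Removal lands between taking the weak_ptr snapshot and calling lock().
int callsWhenRemovedBeforeLock()
{
    int calls{ 0 };
    std::vector<std::shared_ptr<Callback>> registry{ std::make_shared<Callback>([&calls] { ++calls; }) };
    const std::vector<std::weak_ptr<Callback>> snapshot(registry.begin(), registry.end());
    registry.clear(); // removal: the only shared_ptr is released
    for (const auto& weak : snapshot)
    {
        if (const auto callback{ weak.lock() }) // fails: the functor is gone
        {
            (*callback)();
        }
    }
    return calls;
}

// Removal lands after lock() has already succeeded.
int callsWhenRemovedAfterLock()
{
    int calls{ 0 };
    std::vector<std::shared_ptr<Callback>> registry{ std::make_shared<Callback>([&calls] { ++calls; }) };
    const std::vector<std::weak_ptr<Callback>> snapshot(registry.begin(), registry.end());
    for (const auto& weak : snapshot)
    {
        if (const auto callback{ weak.lock() })
        {
            registry.clear(); // removal: nothing makes it wait for the call below
            (*callback)();    // still runs; the locked shared_ptr keeps the functor alive
        }
    }
    return calls;
}
```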
V. Callback Wrapper
We can overcome the remaining race described above by creating a callback wrapper and storing a shared pointer to the wrapper instead of to the functor directly.
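#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <optional>
#include <ratio>
#include <thread>
#include <type_traits>
#include <utility>
namespace CallbackWrapperDetail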
{
/// Invoke return implementation helper
/// @tparam ReturnType the return type of the callback
template<typename ReturnType>
struct invoke_return_impl
{
// if void return type use bool; otherwise std::optional<ReturnType>
using type = std::conditional_t<std::is_same_v<ReturnType, void>, bool, std::optional<ReturnType>>;
};
/// Get the invoke return type which changes if the return type of the callback is void or not
/// @tparam CallbackType the callback type
/// @tparam ArgsTypes the arguments types to the callback
template<typename CallbackType, typename... ArgsTypes>
struct invoke_return
{
using type = typename invoke_return_impl<std::invoke_result_t<CallbackType, ArgsTypes...>>::type;
};
/// Helper alias for the invoke return type
/// @tparam CallbackType the callback type
/// @tparam ArgsTypes the arguments types to the callback
template<typename CallbackType, typename... ArgsTypes>
using invoke_return_t = typename invoke_return<CallbackType, ArgsTypes...>::type;
}
/// This is a callback Wrapper to avoid lifetime issues
/// @tparam CallbackType the type of the callback
template<typename CallbackType>
class CallbackWrapper
{
public:
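explicit CallbackWrapper(CallbackType callback)
: m_Acquired{ false }
, m_Callback{ std::move(callback) }
{
}
~CallbackWrapper() = default;
// Prevent copying/moving (std::atomic is neither copyable nor movable, so defaulted moves would be deleted anyway)
CallbackWrapper(const CallbackWrapper&) = delete;
CallbackWrapper(CallbackWrapper&&) noexcept = delete;
CallbackWrapper& operator=(const CallbackWrapper&) = delete;
CallbackWrapper& operator=(CallbackWrapper&&) noexcept = delete;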
// interface functions
/// destroy the ability for this callback to be invoked
/// @tparam Rep the wait time count type
/// @tparam Period the wait time tick period
/// @param time the maximum time to wait
/// @return returns true if successful before timeout
template<typename Rep = int64_t, typename Period = std::ratio<1>>
[[nodiscard]] bool destroy(const std::chrono::duration<Rep, Period>& time = std::chrono::seconds{ 86400 }) noexcept
{
// prevent further invocations by acquiring the flag and never releasing it
const auto startTime{ std::chrono::steady_clock::now() };
while (!acquire())
{
const auto currentTime{ std::chrono::steady_clock::now() };
const auto elapsed{ currentTime - startTime };
if (elapsed > time)
{
// timeout
return false;
}
std::this_thread::sleep_for(std::chrono::milliseconds{ 10 });
}
return true;
}
/// Invoke the call back with the given parameters
/// @tparam ArgsTypes the argument types for the callback
/// @tparam ReturnType the return type of the invoke function
/// @param args the arguments to pass to the callback
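/// @return for void callbacks, whether the callback was invoked; otherwise the callback's result, or an empty std::optional if it was not invoked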
template<typename... ArgsTypes, typename ReturnType = CallbackWrapperDetail::invoke_return_t<CallbackType, ArgsTypes...>>
[[nodiscard]] ReturnType invoke(ArgsTypes... args)
{
constexpr bool voidReturn{ std::is_same_v<ReturnType, bool> };
if constexpr (voidReturn)
{
// acquire the callback before invoking it; if it cannot be acquired, it has been destroyed or is already being invoked
if (!acquire())
{
return false;
}
// invoke the function
m_Callback(std::forward<ArgsTypes>(args)...);
// release
if (!release())
{
// should not happen
assert(false);
}
return true;
}
else
{
// acquire the callback before invoking it; if it cannot be acquired, it has been destroyed or is already being invoked
if (!acquire())
{
return ReturnType{};
}
// invoke the function
ReturnType result{ m_Callback(std::forward<ArgsTypes>(args)...) };
// release
if (!release())
{
// should not happen
assert(false);
}
return result;
}
}
private:
// helper functions
/// Whether the callback was acquired or not
/// @returns returns true if callback acquired
[[nodiscard]] bool acquire() noexcept
{
bool expected{ false };
const bool exchanged{ std::atomic_compare_exchange_strong(&m_Acquired, &expected, true) };
return exchanged;
}
/// Whether the callback was released or not
/// @returns returns true if callback released
[[nodiscard]] bool release() noexcept
{
bool expected{ true };
const bool exchanged{ std::atomic_compare_exchange_strong(&m_Acquired, &expected, false) };
return exchanged;
}
// member variables
mutable std::atomic<bool> m_Acquired;
CallbackType m_Callback;
};
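The key piece is the atomic bool. invoke() acquires it before calling the callback and releases it afterwards, while destroy() spins until it can acquire it and then never releases it. An unregister that races with an in-flight invocation therefore blocks until that invocation finishes, so the callback cannot be removed from the callback list while it is running, and any invocation that starts afterwards fails to acquire the flag and is skipped. Because there is a single flag per wrapper, a second thread that tries to invoke the same wrapper while it is already running is also skipped rather than queued.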
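The protocol can be exercised in isolation. A minimal sketch, assuming a hypothetical GuardedCallback that keeps only the acquire/release/destroy logic of CallbackWrapper (void callbacks, no timeout): while one thread is inside invoke(), a second invoke() of the same wrapper is skipped, destroy() waits for the in-flight call, and every invoke() after destroy() fails.

```cpp
#include <atomic>
#include <chrono>
#include <functional>
#include <thread>
#include <utility>

// Hypothetical, simplified version of the acquire/release/destroy protocol.
class GuardedCallback
{
public:
    explicit GuardedCallback(std::function<void()> callback) : m_Callback{ std::move(callback) } {}

    // Returns false if the callback is already running or has been destroyed.
    bool invoke()
    {
        if (!acquire())
        {
            return false;
        }
        m_Callback();
        m_Acquired.store(false); // release
        return true;
    }

    // Spins until no invocation is in flight, then keeps the flag set forever
    // so that every later invoke() fails.
    void destroy()
    {
        while (!acquire())
        {
            std::this_thread::sleep_for(std::chrono::milliseconds{ 1 });
        }
    }

private:
    bool acquire() noexcept
    {
        bool expected{ false }; // fresh each call: a failed CAS overwrites it
        return m_Acquired.compare_exchange_strong(expected, true);
    }

    std::atomic<bool> m_Acquired{ false };
    std::function<void()> m_Callback;
};
```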
With this new helper class and the optimizations already discussed, invoking the callbacks now looks like this:
template<typename ContainerType, typename... Args>
[[nodiscard]] size_t TWSResponse::invokeCallbacks(ContainerType& cbContainer, Args... args)
{
using callback_type = decltype(cbContainer.callbacks)::mapped_type::element_type;
std::vector<std::pair<callbackId_type, std::weak_ptr<callback_type>>> lockFreeCallbacks;
// get weak pointers to the callbacks
{
std::shared_lock readLock{ cbContainer.lock };
const auto& callbacks{ cbContainer.callbacks };
lockFreeCallbacks.reserve(callbacks.size());
for (auto& [id, pCallback] : callbacks)
{
lockFreeCallbacks.emplace_back(id, pCallback);
}
}
// invoke callbacks outside the lock
// Note: in the future this loop may be handed off to another thread
size_t numInvoked{ 0U };
for (auto& [id, wpCallback] : lockFreeCallbacks)
{
// try to acquire shared pointer to guarantee callback still valid
const auto pCallback{ wpCallback.lock() };
if (pCallback)
{
// pass args as lvalues: forwarding by-value args inside a loop would move them into the first callback only
if (pCallback->invoke(id, args...))
{
++numInvoked;
}
}
}
return numInvoked;
}
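Forwarding a by-value parameter pack inside a loop is a subtle trap for this kind of invoke function: with Args deduced as non-reference types, std::forward<Args>(args)... is equivalent to std::move, so the first callback receives the arguments and every later callback receives moved-from objects. A minimal demonstration, using a hypothetical Tracked type that records whether it has been moved from and plain std::function callbacks instead of the wrapper:

```cpp
#include <functional>
#include <utility>
#include <vector>

// Hypothetical type that records whether it has been moved from.
struct Tracked
{
    bool movedFrom{ false };
    Tracked() = default;
    Tracked(const Tracked&) = default; // a copy inherits the source's flag
    Tracked(Tracked&& other) noexcept : movedFrom{ other.movedFrom } { other.movedFrom = true; }
};

using Callback = std::function<void(Tracked)>;

// Same shape as the invoke loop: a by-value pack forwarded on every iteration.
template<typename... Args>
void invokeForwarding(const std::vector<Callback>& callbacks, Args... args)
{
    for (const auto& callback : callbacks)
    {
        callback(std::forward<Args>(args)...); // Args is not a reference, so this moves
    }
}

// Alternative: pass the pack as lvalues so each callback receives its own copy.
template<typename... Args>
void invokeCopying(const std::vector<Callback>& callbacks, Args... args)
{
    for (const auto& callback : callbacks)
    {
        callback(args...);
    }
}

// Records, for each of three callbacks in turn, whether it saw a moved-from argument.
template<typename InvokeFn>
std::vector<bool> whatCallbacksSaw(InvokeFn invokeAll)
{
    std::vector<bool> seen;
    const Callback record{ [&seen](Tracked t) { seen.push_back(t.movedFrom); } };
    invokeAll(std::vector<Callback>{ record, record, record });
    return seen;
}
```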
The corresponding register and unregister functions become the following:
template<typename ContainerType, typename CallbackType>
[[nodiscard]] auto TWSResponse::registerCallback(ContainerType& cbContainer, CallbackType&& callback) -> callbackId_type
{
// wrap the callback in the container's own wrapper type so the shared_ptr types match,
// and allocate it before taking the lock to keep the hold time short
using wrapper_type = typename decltype(cbContainer.callbacks)::mapped_type::element_type;
auto pCallback{ std::make_shared<wrapper_type>(std::forward<CallbackType>(callback)) };
// add the callback
std::unique_lock writeLock{ cbContainer.lock };
const auto id{ nextCallbackId(cbContainer.callbackId) };
// insert, or overwrite an existing callback with the same id; the callback is forwarded only once
cbContainer.callbacks.insert_or_assign(id, std::move(pCallback));
return id;
}
template<typename ContainerType>
bool TWSResponse::unregisterCallback(ContainerType& cbContainer, const callbackId_type id)
{
// unregister the callback
if (id == s_InvalidCallbackId)
{
return true;
}
{
std::unique_lock writeLock{ cbContainer.lock };
auto& callbacks{ cbContainer.callbacks };
const auto itr{ callbacks.find(id) };
if (itr == std::end(callbacks))
{
return false;
}
// destroy the callback
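// Note: destroy() blocks, while holding the write lock, until any in-flight invocation
// finishes; a callback that unregisters itself would therefore wait until the timeout
auto& callback{ itr->second };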
const auto destroyed{ callback->destroy() };
if (destroyed)
{
// safe to remove
callbacks.erase(itr);
}
else
{
// only reached if destroy() timed out
assert(false);
}
return destroyed;
}
}
These functions can easily be modified to pass known fixed parameters; in the case described above, these would be variants that take an int reqId.
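To see the pieces working together, here is a compact, self-contained sketch of the register/invoke/unregister flow. It is not the TWSResponse code: the container layout (lock, callbacks, callbackId), the callbackId_type and s_InvalidCallbackId definitions, the int payload, and the simplified Wrapper class are all assumptions made so that the example compiles on its own.

```cpp
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <utility>
#include <vector>

// Assumptions: 64-bit ids, 0 reserved as invalid, callbacks take (id, int payload).
using callbackId_type = std::uint64_t;
constexpr callbackId_type s_InvalidCallbackId{ 0U };
using Callback = std::function<void(callbackId_type, int)>;

// Compact stand-in for CallbackWrapper: void callbacks only, no timeout.
class Wrapper
{
public:
    explicit Wrapper(Callback callback) : m_Callback{ std::move(callback) } {}
    bool invoke(const callbackId_type id, const int value)
    {
        if (!acquire()) { return false; } // running elsewhere or destroyed
        m_Callback(id, value);
        m_Acquired.store(false);          // release
        return true;
    }
    void destroy()
    {
        while (!acquire()) { std::this_thread::sleep_for(std::chrono::milliseconds{ 1 }); }
    }
private:
    bool acquire() noexcept
    {
        bool expected{ false };
        return m_Acquired.compare_exchange_strong(expected, true);
    }
    std::atomic<bool> m_Acquired{ false };
    Callback m_Callback;
};

// Hypothetical layout of the ContainerType used above.
struct CallbackContainer
{
    std::shared_mutex lock;
    std::map<callbackId_type, std::shared_ptr<Wrapper>> callbacks;
    callbackId_type callbackId{ s_InvalidCallbackId };
};

callbackId_type registerCallback(CallbackContainer& c, Callback callback)
{
    auto pCallback{ std::make_shared<Wrapper>(std::move(callback)) }; // allocate outside the lock
    std::unique_lock writeLock{ c.lock };
    const auto id{ ++c.callbackId }; // starts at 1, so the invalid id is never handed out
    c.callbacks.insert_or_assign(id, std::move(pCallback));
    return id;
}

bool unregisterCallback(CallbackContainer& c, const callbackId_type id)
{
    if (id == s_InvalidCallbackId) { return true; }
    std::unique_lock writeLock{ c.lock };
    const auto itr{ c.callbacks.find(id) };
    if (itr == c.callbacks.end()) { return false; }
    itr->second->destroy(); // waits for an in-flight invoke() to finish
    c.callbacks.erase(itr);
    return true;
}

std::size_t invokeCallbacks(CallbackContainer& c, const int value)
{
    std::vector<std::pair<callbackId_type, std::weak_ptr<Wrapper>>> snapshot;
    {
        std::shared_lock readLock{ c.lock };
        snapshot.reserve(c.callbacks.size());
        for (const auto& [id, pCallback] : c.callbacks) { snapshot.emplace_back(id, pCallback); }
    }
    std::size_t numInvoked{ 0U };
    for (const auto& [id, wpCallback] : snapshot)
    {
        if (const auto pCallback{ wpCallback.lock() }; pCallback && pCallback->invoke(id, value)) { ++numInvoked; }
    }
    return numInvoked;
}
```

Allocating the wrapper before taking the write lock and inserting it with a single insert_or_assign keeps the lock hold time short and forwards the callback exactly once.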
VI. Summary
I hope you have enjoyed this overview of C++ callbacks and some of the gotchas along the way. Any comments or feedback are appreciated.