Intricacies of C++ Callbacks

I. Overview

Recently I have been working on a C++ library that wraps a third-party, asynchronous, non-HFT financial API from a broker. Most of the API calls adhere to the following pattern:

void requestData(int id, configuration parameters);
void dataPart1(int id, some data);
void dataPart2(int id, some data);
...
void dataPartN(int id, some data);
void dataEnd(int id);   // usually absent when the data arrives in a single call
void cancelDataRequest(int id);  // does not exist for every group of calls
void error(int id, error info);

Each id must be unique, and there can be many outstanding requests for the same data with different configuration parameters and ids. Because the API is asynchronous, these requests may complete in any order. Uniqueness of the id is easily guaranteed with a global atomic int.
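
As a minimal sketch of the atomic id idea, something like the following would do. The names nextRequestId and checkIdsAreDistinct are my own and are not part of the broker API; relaxed memory ordering is an assumption that holds as long as the counter is only used for uniqueness.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical helper (not part of the broker API): hands out request ids
// that are unique for the lifetime of the process.
inline int nextRequestId() noexcept
{
    // Function-local static so the counter is initialized exactly once.
    static std::atomic<int> s_NextId{ 0 };
    // fetch_add returns the previous value; relaxed ordering is enough
    // because only uniqueness matters, not ordering against other memory.
    return s_NextId.fetch_add(1, std::memory_order_relaxed);
}

// Small self-check: two consecutive ids never collide.
inline void checkIdsAreDistinct()
{
    const int first{ nextRequestId() };
    const int second{ nextRequestId() };
    assert(first != second);
}
```

Every thread that calls nextRequestId() gets a different value, so no additional locking is needed around the request bookkeeping just to pick an id.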

This lends itself well to encapsulation into a class to handle the verification of the configuration parameters, the storage of the data, and the various synchronizations. An example of this type of pattern is as follows:

class RequestSomeData
{
public:
    // Define enumerations
    enum class Wait : uint8_t
    {
        Timeout,
        Errored,
        Completed,
    };

    // define constructors/destructors
    explicit RequestSomeData(Configuration Parameters);
    ~RequestSomeData();

    // Prevent copying/moving
    RequestSomeData(const RequestSomeData&) = delete;
    RequestSomeData(RequestSomeData&&) noexcept = delete;
    RequestSomeData& operator=(const RequestSomeData&) = delete;
    RequestSomeData& operator=(RequestSomeData&&) noexcept = delete;

    // Interface functions
    [[nodiscard]] const Data& data() const noexcept;

    /// wait for the request to complete, error, or update
    template<typename Rep = int64_t, typename Period = std::ratio<1>>
    Wait wait(const std::chrono::duration<Rep, Period>& time = std::chrono::seconds{ 86400 }) noexcept;

private:
    // Callback functions
    void dataPart1(int id, some data);
    void dataPart2(int id, some data);
    ...
    void dataPartN(int id, some data);

    // Helper Functions
    /// Send the command to get the data
    void sendCommand();

    /// Send the cancel command
    void cancelCommand();

    /// register callbacks for this data request
    void registerCallbacks();

    /// unregister callbacks for this data request
    void unregisterCallbacks();

    // member variables
    mutable std::shared_mutex m_DataLock;
    Data m_Data;   // same type that data() returns
};

This article is not about the design of this class but about the issues that can arise with C++ callbacks, many of which are the same issues found in pub/sub systems.

II. First Iteration

As a first attempt, let's look at how you might call a set of callbacks associated with an event. From now on we will ignore the id for simplicity. Something like this may be produced:

// Data is the payload type from the class above
using Callback = std::function<void(const Data&)>;
using CallbackId = std::size_t;

std::vector<std::pair<CallbackId, Callback>> m_Callbacks;
std::mutex m_CallbacksLock;
CallbackId m_NextCallbackId{ 0 };

// add callback; returns an id because std::function cannot be compared with ==
CallbackId addCallback(Callback callback)
{
   std::unique_lock lock{m_CallbacksLock};
   m_Callbacks.emplace_back(m_NextCallbackId, std::move(callback));
   return m_NextCallbackId++;
}

// remove callback
bool removeCallback(CallbackId id)
{
   std::unique_lock lock{m_CallbacksLock};
   for(auto itr{std::cbegin(m_Callbacks)}; itr != std::cend(m_Callbacks); ++itr)
   {
      if (itr->first == id)
      {
         m_Callbacks.erase(itr);
         return true;
      }
   }
   return false;
}

// call callbacks
void callCallbacks(const Data& receivedData)
{
   std::unique_lock lock{m_CallbacksLock};
   for(auto& entry : m_Callbacks)
   {
       entry.second(receivedData);
   }
}

While this looks perfectly safe, there is one large issue: the lock on the callbacks is held during all of the potentially time-consuming invocations. The invocations can account for the majority of the lock hold time, and they block any thread trying to add or remove a callback. Another issue is that the mutex prevents callbacks for a different set of configuration parameters from being invoked at the same time, which limits the asynchronous nature of the API.

III. Improve Callbacks Lock Held

In an attempt to reduce the lock hold duration and allow multiple simultaneous invocations, the following might be attempted:

// Data is the payload type from the class above
using Callback = std::function<void(const Data&)>;
using CallbackId = std::size_t;

std::vector<std::pair<CallbackId, Callback>> m_Callbacks;
std::shared_mutex m_CallbacksLock;
CallbackId m_NextCallbackId{ 0 };

// add callback; returns an id because std::function cannot be compared with ==
CallbackId addCallback(Callback callback)
{
   std::unique_lock lock{m_CallbacksLock};
   m_Callbacks.emplace_back(m_NextCallbackId, std::move(callback));
   return m_NextCallbackId++;
}

// remove callback
bool removeCallback(CallbackId id)
{
   std::unique_lock lock{m_CallbacksLock};
   for(auto itr{std::cbegin(m_Callbacks)}; itr != std::cend(m_Callbacks); ++itr)
   {
      if (itr->first == id)
      {
         m_Callbacks.erase(itr);
         return true;
      }
   }
   return false;
}

// call callbacks
void callCallbacks(const Data& receivedData)
{
   // copy the callbacks under a shared (read) lock
   std::vector<Callback> callbacks;
   {
      std::shared_lock lock{m_CallbacksLock};
      callbacks.reserve(m_Callbacks.size());
      for(const auto& entry : m_Callbacks)
      {
         callbacks.emplace_back(entry.second);
      }
   }

   // invoke the copies outside the lock
   for(auto& callback : callbacks)
   {
      callback(receivedData);
   }
}

So we make a copy of the callbacks under the lock, which should be much faster than the actual invocations, and then invoke the copies outside the lock. We also use a shared mutex, which allows multiple invocations with different parameters to proceed simultaneously. While this greatly improves the lock hold time, it introduces a subtle race condition: a callback may be invoked after it has been removed, at which point it may no longer be valid.

IV. Improve On Race Condition

We can improve on the callback removal race condition by changing the std::function to std::shared_ptr<std::function> and using std::weak_ptr. An example is as follows:

// Data is the payload type from the class above
using Callback = std::function<void(const Data&)>;

std::vector<std::shared_ptr<Callback>> m_Callbacks;
std::shared_mutex m_CallbacksLock;

// call callbacks
void callCallbacks(const Data& receivedData)
{
   // take weak pointers to the callbacks under a shared (read) lock
   std::vector<std::weak_ptr<Callback>> callbacks;
   {
      std::shared_lock lock{m_CallbacksLock};
      callbacks.reserve(m_Callbacks.size());
      for(const auto& callback : m_Callbacks)
      {
         callbacks.emplace_back(callback);
      }
   }

   for(auto& wcallback : callbacks)
   {
      // lock() yields an empty pointer if the callback has already been removed
      if (auto callback = wcallback.lock())
      {
         (*callback)(receivedData);
      }
   }
}

If the callback is removed before being called, then the weak pointer cannot be converted to a shared pointer and the callback is not called. However, there is an even more subtle race condition here: the functor itself, or the object containing the functor, can be deleted while the callback is executing, which results in undefined behavior, most likely a crash.
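
To make the first half of that concrete, here is a small single-threaded sketch of the weak pointer behavior. The function name invokeBeforeAndAfterRemoval is mine, and resetting the owning shared_ptr stands in for removing the callback from the list; it does not reproduce the threading of the real code.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Returns how many times the callback ran: once while it is still owned,
// and not again after the owning shared_ptr has been reset ("removed").
inline int invokeBeforeAndAfterRemoval()
{
    int calls{ 0 };
    auto owner{ std::make_shared<std::function<void()>>([&calls] { ++calls; }) };
    std::weak_ptr<std::function<void()>> weak{ owner };

    if (auto callback = weak.lock())   // still registered: lock() succeeds
    {
        (*callback)();
    }

    owner.reset();                     // simulate removal from the callback list

    if (auto callback = weak.lock())   // expired: lock() returns an empty pointer
    {
        (*callback)();                 // not reached
    }

    assert(weak.expired());
    return calls;
}
```

Note that the shared pointer returned by lock() only keeps the std::function object alive. It does nothing for whatever the function captures by reference or by raw pointer, which is the remaining problem described above.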

V. Callback Wrapper

We can overcome the problem with the shared pointer mentioned above by creating a callback wrapper and storing a shared pointer to the callback wrapper instead of to the functor directly.

namespace CallbackWrapperDetail
{
    /// Invoke return implementation helper
    /// @tparam ReturnType the return type of the callback
    template<typename ReturnType>
    struct invoke_return_impl
    {
        // if void return type use bool; otherwise std::optional<ReturnType>
        using type = std::conditional_t<std::is_same_v<ReturnType, void>, bool, std::optional<ReturnType>>;
    };

    /// Get the invoke return type which changes if the return type of the callback is void or not
    /// @tparam CallbackType the callback type
    /// @tparam ArgsTypes the arguments types to the callback
    template<typename CallbackType, typename... ArgsTypes>
    struct invoke_return
    {
        using type = typename invoke_return_impl<std::invoke_result_t<CallbackType, ArgsTypes...>>::type;
    };

    /// Helper function to return the type 
    /// @tparam CallbackType the callback type
    /// @tparam ArgsTypes the arguments types to the callback
    template<typename CallbackType, typename... ArgsTypes>
    using invoke_return_t = typename invoke_return<CallbackType, ArgsTypes...>::type;
}

/// This is a callback Wrapper to avoid lifetime issues
/// @tparam CallbackType the type of the callback
template<typename CallbackType>
class CallbackWrapper
{
public:
    explicit CallbackWrapper(CallbackType callback) : m_Acquired{ false }, m_Callback{ std::move(callback) } {}

    ~CallbackWrapper() = default;

    // Prevent copying and moving (the std::atomic member is neither copyable nor movable)
    CallbackWrapper(const CallbackWrapper&) = delete;
    CallbackWrapper(CallbackWrapper&&) = delete;
    CallbackWrapper& operator=(const CallbackWrapper&) = delete;
    CallbackWrapper& operator=(CallbackWrapper&&) = delete;

    // interface functions

    /// destroy the ability for this callback to be invoked
    /// @tparam Rep the wait time count type
    /// @tparam Period the wait time tick period
    /// @param time the wait time duration
    /// @return returns true if successful before timeout
    template<typename Rep = int64_t, typename Period = std::ratio<1>>
    [[nodiscard]] bool destroy(const std::chrono::duration<Rep, Period>& time = std::chrono::seconds{ 86400 }) noexcept
    {
        // we destroy invocation by marking unacquirable
        const auto startTime{ std::chrono::steady_clock::now() };
        while (!acquire())
        {
            const auto elapsed{ std::chrono::steady_clock::now() - startTime };
            if (elapsed > time)
            {
                // timeout
                return false;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds{ 10 });
        }
        return true;
    }

    /// Invoke the call back with the given parameters
    /// @tparam ArgsTypes the argument types for the callback
    /// @tparam ReturnType the return type of the invoke function
    /// @param args the arguments to pass to the callback
    /// @return the return type of the callback
    template<typename... ArgsTypes, typename ReturnType = CallbackWrapperDetail::invoke_return_t<CallbackType, ArgsTypes...>>
    [[nodiscard]] ReturnType invoke(ArgsTypes... args)
    {
        constexpr bool voidReturn{ std::is_same_v<ReturnType, bool> };

        if constexpr (voidReturn)
        {
            // we invoke a function by acquire it.  If it is unacquirable then the function has most likely gone out of scope
            if (!acquire())
            {
                return false;
            }

            // invoke the function
           m_Callback(std::forward<ArgsTypes>(args)...);

           // release
           if (!release())
           {
                // should not happen
                assert(false);
           }

           return true;
        }
        else
        {
            // we invoke a function by acquire it.  If it is unacquirable then the function has most likely gone out of scope
            if (!acquire())
            {
               return ReturnType{};
            }

           // invoke the function
           ReturnType result{ m_Callback(std::forward<ArgsTypes>(args)...) };

           // release
           if (!release())
           {
               // should not happen
               assert(false);
           }

           return result;
        }
    }

private:
    // helper functions

    /// Whether the callback was acquired or not
    /// @returns returns true if callback acquired
    [[nodiscard]] bool acquire() noexcept
    {
        bool expected{ false };
        const bool exchanged{ std::atomic_compare_exchange_strong(&m_Acquired, &expected, true) };
        return exchanged;
    }

    /// Whether the callback was released or not
    /// @returns returns true if callback released
    [[nodiscard]] bool release() noexcept
   {
        bool expected{ true };
        const bool exchanged{ std::atomic_compare_exchange_strong(&m_Acquired, &expected, false) };
        return exchanged;
    }

    // member variables
    mutable std::atomic<bool> m_Acquired;
    CallbackType m_Callback;
};        
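
The invoke_return_t helper in the listing above decides what invoke() returns. As a quick check of that mapping, here is a compact restatement of the trait with two static_asserts. The namespace InvokeReturnSketch and the callbacks onTick and midPrice are made up for this illustration only.

```cpp
#include <optional>
#include <type_traits>

namespace InvokeReturnSketch
{
    // void becomes bool; any other return type R becomes std::optional<R>
    template<typename ReturnType>
    using wrapped_return_t =
        std::conditional_t<std::is_void_v<ReturnType>, bool, std::optional<ReturnType>>;

    template<typename CallbackType, typename... ArgsTypes>
    using invoke_return_t = wrapped_return_t<std::invoke_result_t<CallbackType, ArgsTypes...>>;

    // Hypothetical callbacks used only to exercise the trait.
    inline void onTick(int) {}
    inline double midPrice(double bid, double ask) { return (bid + ask) / 2.0; }

    // A void callback reports success or failure as a bool...
    static_assert(std::is_same_v<invoke_return_t<decltype(&onTick), int>, bool>);
    // ...while a value-returning callback yields an optional that is empty on failure.
    static_assert(std::is_same_v<invoke_return_t<decltype(&midPrice), double, double>,
                                 std::optional<double>>);
}
```

Either way the caller can test the result in an if statement to find out whether the callback actually ran.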

The interesting part of this is the atomic bool, which is acquired before invoking the callback. While a callback is executing, destroy() cannot acquire the flag and therefore blocks, so the callback cannot be unregistered, i.e. removed from the callback list, in the middle of an invocation.
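
The flag protocol is easier to see in isolation. The following is a stripped-down, single-threaded sketch of it. InvocationGate and gateProtocolHolds are names I made up for the illustration, and the member compare_exchange_strong is used in place of the free function, which should be equivalent here.

```cpp
#include <atomic>
#include <cassert>

// Minimal stand-in for the acquire/release protocol of the wrapper.
class InvocationGate
{
public:
    // Try to take the gate; succeeds only if nobody holds it.
    [[nodiscard]] bool acquire() noexcept
    {
        bool expected{ false };
        return m_Acquired.compare_exchange_strong(expected, true);
    }

    // Give the gate back after an invocation has finished.
    [[nodiscard]] bool release() noexcept
    {
        bool expected{ true };
        return m_Acquired.compare_exchange_strong(expected, false);
    }

private:
    std::atomic<bool> m_Acquired{ false };
};

// Walks through the states of the protocol on a single thread.
inline bool gateProtocolHolds()
{
    InvocationGate gate;
    const bool invoked{ gate.acquire() };          // an invocation starts
    const bool destroyBlocked{ !gate.acquire() };  // destroy() would have to wait here
    const bool released{ gate.release() };         // the invocation ends
    const bool destroyed{ gate.acquire() };        // destroy() wins and never releases
    const bool invokeRefused{ !gate.acquire() };   // later invocations are refused
    return invoked && destroyBlocked && released && destroyed && invokeRefused;
}
```

One consequence of using a single flag, as far as I can tell, is that a second thread trying to invoke the same callback while it is already running is refused as well, so each wrapper runs at most one invocation at a time.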

With this new helper class and already discussed optimizations, the invocation of callbacks now becomes as follows:

template<typename ContainerType, typename... Args>
[[nodiscard]] size_t TWSResponse::invokeCallbacks(ContainerType& cbContainer, Args... args)
{
    // typename is required before C++20 because this is a dependent type
    using callback_type = typename decltype(cbContainer.callbacks)::mapped_type::element_type;
    
    std::vector<std::pair<callbackId_type, std::weak_ptr<callback_type>>> lockFreeCallbacks;
    // get weak pointers to the callbacks
    {
        std::shared_lock readLock{ cbContainer.lock };
        const auto& callbacks{ cbContainer.callbacks };
        lockFreeCallbacks.reserve(callbacks.size());
        for (auto& [id, pCallback] : callbacks)
        {
            lockFreeCallbacks.emplace_back(id, pCallback);
        }
    }

    // invoke callbacks outside the lock
    // Note: In future may throw this to a thread
    size_t numInvoked{ 0U };
    for (auto& [id, wpCallback] : lockFreeCallbacks)
    {
        // try to acquire shared pointer to guarantee callback still valid
        const auto pCallback{ wpCallback.lock() };
        if (pCallback)
        {
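            // pass copies: forwarding inside the loop would move from args on the
            // first iteration and leave moved-from values for later callbacks
            if (pCallback->invoke(id, args...))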
            {
                ++numInvoked;
            }
        }
    }
    return numInvoked;
}        

The corresponding register and unregister functions become as follows:

template<typename ContainerType,  typename CallbackType>
[[nodiscard]] auto TWSResponse::registerCallback(ContainerType& cbContainer, CallbackType&& callback) -> callbackId_type
{
    // wrap the callback once; std::decay_t strips the reference when an lvalue is passed
    auto pCallback{ std::make_shared<CallbackWrapper<std::decay_t<CallbackType>>>(std::forward<CallbackType>(callback)) };

    // add the callback, overwriting any existing callback with the same id
    std::unique_lock writeLock{ cbContainer.lock };
    const auto id{ nextCallbackId(cbContainer.callbackId) };
    cbContainer.callbacks.insert_or_assign(id, std::move(pCallback));
    return id;
}

template<typename ContainerType>
bool TWSResponse::unregisterCallback(ContainerType& cbContainer, const callbackId_type id)
{
    // unregister the callback
    if (id == s_InvalidCallbackId)
    {
        return true;
    }

    {
        std::unique_lock writeLock{ cbContainer.lock };
        auto& callbacks{ cbContainer.callbacks };
        const auto itr{ callbacks.find(id) };
        if (itr == std::end(callbacks))
        {
            return false;
        }
        // destroy the callback (access through the iterator to avoid shadowing the id parameter)
        const auto destroyed{ itr->second->destroy() };
        if (destroyed)
        {
            // safe to remove
            callbacks.erase(itr);
        }
        else
        {
            // should not get here
            assert(false);
        }
        return destroyed;
    }
}        

These functions can be easily modified to pass known fixed parameters, which in the case described above would be the int reqId that the variants take.

VI. Summary

I hope you have enjoyed this overview of C++ callbacks and some of the gotchas along the way. Any comments or feedback are appreciated.

