且构网


Observer pattern + visitor pattern for a message system

Updated: 2023-12-01 16:45:10

Some metaprogramming boilerplate:

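// headers used by the code in this answer:
#include <algorithm>   // std::remove_if
#include <cstddef>     // std::size_t
#include <functional>  // std::function
#include <memory>      // std::shared_ptr, std::weak_ptr
#include <tuple>       // std::tuple, std::get
#include <type_traits> // std::integral_constant
#include <utility>     // std::move, std::forward
#include <vector>      // std::vector

// a bundle of types: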
template<class...>struct types{using type=types;};

// a type that does nothing but carry a type around
// without being that type:
template<class T>struct tag{using type=T;};

// a template that undoes the `tag` operation above:
template<class Tag>using type_t=typename Tag::type;

// a shorter way to say `std::integral_constant<size_t, x>`:
template<std::size_t i>struct index:std::integral_constant<std::size_t, i>{};

Get the index of a type in a types<...>:

// this code takes a type T, and a types<...> and returns
// the index of the type in there.
// index_of
namespace details {
  template<class T, class Types>
  struct index_of{};
}
template<class T, class Types>
using index_of_t=type_t<details::index_of<T,Types>>;
namespace details {
  // if the first entry in the list of types is T,
  // our value is 0
  template<class T, class...Ts>struct index_of<T, types<T,Ts...>>:
    tag< index<0> >
  {};
  // otherwise, it is 1 plus our value on the tail of the list:
  template<class T, class T0, class...Ts>
  struct index_of<T, types<T0, Ts...>>:
    tag< index< index_of_t<T,types<Ts...>>{}+1 > >
  {};
}
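
As a quick sanity check, the sketch below restates the boilerplate above in condensed form and verifies a few indices at compile time. The namespace `sketch`, the alias `message_types` and the helper `position_of_double` are hypothetical names introduced only for this illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <type_traits>

namespace sketch {
template<class...> struct types { using type = types; };
template<class T> struct tag { using type = T; };
template<class Tag> using type_t = typename Tag::type;
template<std::size_t i>
struct index : std::integral_constant<std::size_t, i> {};

namespace details {
  template<class T, class Types> struct index_of {};
}
template<class T, class Types>
using index_of_t = type_t<details::index_of<T, Types>>;
namespace details {
  // T is the first entry: index 0
  template<class T, class... Ts>
  struct index_of<T, types<T, Ts...>> : tag<index<0>> {};
  // otherwise: 1 + index in the tail
  template<class T, class T0, class... Ts>
  struct index_of<T, types<T0, Ts...>>
    : tag<index<index_of_t<T, types<Ts...>>{} + 1>> {};
}

// Hypothetical list of message types, used only for this check.
using message_types = types<int, char, double>;

static_assert(index_of_t<int, message_types>::value == 0, "int is first");
static_assert(index_of_t<char, message_types>::value == 1, "char is second");
static_assert(index_of_t<double, message_types>::value == 2, "double is third");

// The index is an ordinary compile-time constant, so it also converts
// to a plain std::size_t at run time.
inline std::size_t position_of_double() {
  std::size_t i = index_of_t<double, message_types>{};
  assert(i == 2);
  return i;
}
}  // namespace sketch
```

Asking for a type that is not in the list (for example `index_of_t<float, message_types>`) fails to compile, because the empty primary template has no nested `type`.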

Here is a single "channel" broadcaster (it sends one kind of message):

// a token is a shared pointer to anything
// below, it tends to be a shared pointer to a std::function
// all we care about is the lifetime, however:
using token = std::shared_ptr<void>;
template<class M>
struct broadcaster {
  // f is the type of something that can eat our message:
  using f = std::function< void(M) >;
  // we keep a vector of weak pointers to people who can eat
  // our message.  This lets them manage lifetime independently:
  std::vector<std::weak_ptr<f>> listeners;

  // reg is register.  You pass in a function to eat the message
  // it returns a token.  So long as the token, or a copy of it,
  // survives, broadcaster will continue to send stuff at the
  // function you pass in:
  token reg( f target ) {
    // if thread safe, (write)lock here
    auto sp = std::make_shared<f>(std::move(target));
    listeners.push_back( sp );
    return sp;
    // unlock here
  }
  // removes dead listeners:
  void trim() {
    // if thread safe, (try write)lock here
    // and/or have trim take a lock as an argument
    listeners.erase(
      std::remove_if( begin(listeners), end(listeners), [](auto&& p){
        return p.expired();
      } ),
      listeners.end()
    );
    // unlock here
  }
  // Sends a message M m to every listener who is not dead:
  void send( M m ) {
    trim(); // remove dead listeners
    // (read) lock here
    auto tmp_copy = listeners; // copy the listeners, just in case
    // unlock here

    for (auto w:tmp_copy) {
      auto p = w.lock();
      if (p) (*p)(m);
    }
  }
};

Here is a multi-channel subject that supports any number of different message types (fixed at compile time). If the message type does not match one of them, send and/or reg will fail to compile. You are responsible for deciding whether a message is a const&, a value, or something else. Trying to reg an rvalue message won't work. The intent is that M is passed to reg and send explicitly.

// fancy wrapper around a tuple of broadcasters:
template<class...Ts>
struct subject {
  std::tuple<broadcaster<Ts>...> stations;
  // helper function that gets a broadcaster compatible
  // with a message type M:
  template<class M>
  broadcaster<M>& station() {
    return std::get< index_of_t<M, types<Ts...>>{} >( stations );
  }
  // register a message of type M.  You should call with M explicit usually:
  template<class M>
  token reg( std::function<void(M)> listener ) {
    return station<M>().reg(std::move(listener));
  }
  // send a message of type M.  You should explicitly pass M usually:
  template<class M>
  void send( M m ) {
    station<M>().send(std::forward<M>(m));
  }
};

Live example.

When you reg, it returns a token, aka std::shared_ptr<void>. For as long as this token (or a copy of it) survives, messages will flow. Once it goes away, messages to the registered callback stop. Typically that means a listener should maintain a std::vector<token>, and can then freely reg lambdas that capture this.
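
To make the token lifetime rule concrete, here is a minimal sketch of a listener that stores its tokens in a std::vector<token>. The broadcaster is a condensed copy of the one above (no trimming, no locking); the `counter` class, the `demo` function and the namespace `sketch` are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

namespace sketch {
using token = std::shared_ptr<void>;

// Condensed single-channel broadcaster (no trim, no locks).
template<class M>
struct broadcaster {
  using f = std::function<void(M)>;
  std::vector<std::weak_ptr<f>> listeners;
  token reg(f target) {
    auto sp = std::make_shared<f>(std::move(target));
    listeners.push_back(sp);
    return sp;
  }
  void send(M m) {
    auto tmp_copy = listeners;
    for (auto const& w : tmp_copy)
      if (auto p = w.lock()) (*p)(m);
  }
};

// Hypothetical listener: it owns its tokens, so its callbacks are
// disconnected automatically when it is destroyed.
struct counter {
  int total = 0;
  std::vector<token> tokens;
  explicit counter(broadcaster<int>& b) {
    tokens.push_back(b.reg([this](int x) { total += x; }));
  }
  // The lambda captures `this`, so the object must not be copied.
  counter(counter const&) = delete;
  counter& operator=(counter const&) = delete;
};

inline int demo() {
  broadcaster<int> b;
  int seen = 0;
  {
    counter c(b);
    b.send(2);
    b.send(3);
    seen = c.total;  // both messages were delivered
  }
  // `c` and its tokens are gone: the weak_ptr has expired, so this
  // send reaches nobody and never touches the destroyed object.
  assert(b.listeners.front().expired());
  b.send(100);
  return seen;
}
}  // namespace sketch
```

Because the tokens are members of the listener, the callback can never outlive the object it captures, which is what makes capturing this safe here.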

In C++14/1z, the above gets a bit nicer (for one, we can do away with types<...> and index_of).
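
One way this could look in C++14 is sketched below: std::get can select a tuple element by type, so station() no longer needs an index. This is an illustration under that assumption, not necessarily what the author had in mind; the broadcaster is a condensed copy, and `demo` and the namespace `sketch` are hypothetical names.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <tuple>
#include <utility>
#include <vector>

namespace sketch {
using token = std::shared_ptr<void>;

// Condensed single-channel broadcaster (no trim, no locks).
template<class M>
struct broadcaster {
  using f = std::function<void(M)>;
  std::vector<std::weak_ptr<f>> listeners;
  token reg(f target) {
    auto sp = std::make_shared<f>(std::move(target));
    listeners.push_back(sp);
    return sp;
  }
  void send(M m) {
    auto tmp_copy = listeners;
    for (auto const& w : tmp_copy)
      if (auto p = w.lock()) (*p)(m);
  }
};

template<class... Ts>
struct subject {
  std::tuple<broadcaster<Ts>...> stations;
  // C++14: select the tuple element by type. This fails to compile
  // unless broadcaster<M> occurs exactly once in the tuple.
  template<class M>
  broadcaster<M>& station() { return std::get<broadcaster<M>>(stations); }
  template<class M>
  token reg(std::function<void(M)> listener) {
    return station<M>().reg(std::move(listener));
  }
  template<class M>
  void send(M m) { station<M>().send(std::forward<M>(m)); }
};

inline std::string demo() {
  subject<int, std::string> s;
  std::string log;
  token a = s.reg<int>([&](int x) { log += "int:" + std::to_string(x) + ";"; });
  token b = s.reg<std::string>([&](std::string str) { log += "str:" + str + ";"; });
  s.send<int>(7);
  s.send<std::string>("hi");
  assert(!log.empty());
  return log;
}
}  // namespace sketch
```

As in the original subject, M is passed explicitly to reg and send, and an unknown message type is still a compile-time error.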

If you add a listener during a broadcast cycle, it will not receive that broadcast. If you remove a listener during a broadcast cycle, it will not receive anything after the point at which you removed it.
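
The following sketch exercises both rules with a condensed copy of the broadcaster (trimming and locking omitted, which does not change the outcome here). The listener names `first`, `second` and `late`, the `demo` function and the namespace `sketch` are hypothetical.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

namespace sketch {
using token = std::shared_ptr<void>;

// Condensed single-channel broadcaster (no trim, no locks).
template<class M>
struct broadcaster {
  using f = std::function<void(M)>;
  std::vector<std::weak_ptr<f>> listeners;
  token reg(f target) {
    auto sp = std::make_shared<f>(std::move(target));
    listeners.push_back(sp);
    return sp;
  }
  void send(M m) {
    auto tmp_copy = listeners;  // snapshot taken before any callback runs
    for (auto const& w : tmp_copy)
      if (auto p = w.lock()) (*p)(m);
  }
};

inline std::string demo() {
  broadcaster<int> b;
  std::string log;
  token late, second;
  token first = b.reg([&](int) {
    log += "first;";
    // Added during the cycle: not in the snapshot, so not called yet.
    if (!late) late = b.reg([&](int) { log += "late;"; });
    // Removed during the cycle: its weak_ptr expires before it is reached.
    second.reset();
  });
  second = b.reg([&](int) { log += "second;"; });

  b.send(1);  // only "first" runs
  log += "|";
  b.send(2);  // "first" and "late" run; "second" is gone for good
  assert(!second);
  return log;
}
}  // namespace sketch
```

The snapshot (`tmp_copy`) is what keeps the loop safe while callbacks call reg, and the weak_ptr check is what makes removal take effect immediately.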

The thread-safety comments are set up for reader/writer locks on broadcaster.

Memory allocated for dead listeners of a given broadcaster is reclaimed when trim or send is called. However, the std::function itself will have been destroyed long before, so only a limited amount of memory is wasted until the next send. I do the cleanup there because we are going to iterate over the list of listeners anyhow, so we might as well clean up any mess first.
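
A small sketch of that behaviour, using a condensed broadcaster with only reg and trim: dropping a token destroys the stored std::function right away (observed here through the use count of a captured shared_ptr), while the expired slot stays in the vector until trim runs. The names `trim_report`, `payload`, `demo` and the namespace `sketch` are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

namespace sketch {
using token = std::shared_ptr<void>;

// Condensed single-channel broadcaster (reg and trim only, no locks).
template<class M>
struct broadcaster {
  using f = std::function<void(M)>;
  std::vector<std::weak_ptr<f>> listeners;
  token reg(f target) {
    auto sp = std::make_shared<f>(std::move(target));
    listeners.push_back(sp);
    return sp;
  }
  void trim() {
    listeners.erase(
      std::remove_if(listeners.begin(), listeners.end(),
                     [](std::weak_ptr<f> const& p) { return p.expired(); }),
      listeners.end());
  }
};

struct trim_report {
  std::size_t slots_before_trim;
  std::size_t slots_after_trim;
  long payload_owners_after_drop;
};

inline trim_report demo() {
  broadcaster<int> b;
  auto payload = std::make_shared<int>(42);
  token keep = b.reg([](int) {});
  token drop = b.reg([payload](int) {});  // the callback co-owns payload
  assert(payload.use_count() == 2);

  drop.reset();  // destroys the stored std::function immediately
  trim_report r;
  r.payload_owners_after_drop = payload.use_count();
  r.slots_before_trim = b.listeners.size();  // expired slot still present
  b.trim();
  r.slots_after_trim = b.listeners.size();   // expired slot removed
  return r;
}
}  // namespace sketch
```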

This solution has no RTTI or dynamic casting, and messages are only sent to listeners who understand them.

In C++17 things get simpler. Drop all of the metaprogramming boilerplate, remove subject (keep broadcaster) and just do this to handle more than one channel:

template<class...Ms>
struct broadcasters : broadcaster<Ms>... {
  using broadcaster<Ms>::reg...;
  using broadcaster<Ms>::send...;

  template<class M>
  broadcaster<M>& station() { return *this; }
};

This broadcasters is now a nearly drop-in improvement on subject above.

Due to improvements in std::function since C++11, the reg function usually does the right thing unless the signal options are overly similar. If you do run into problems with reg or send, you are forced to call .station<type>().reg(blah).

But 99 times out of 100 you can just call .reg( lambda ) and .send( msg ), and overload resolution does the right thing.
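
As a usage sketch (it needs a C++17 compiler for the pack expansion in the using-declarations), the code below registers and sends two unrelated message types without naming them, then falls back to station<M>() explicitly. The message structs `ping` and `text`, the `demo` function and the namespace `sketch` are hypothetical; the broadcaster is a condensed copy without trimming or locking.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

namespace sketch {
using token = std::shared_ptr<void>;

// Condensed single-channel broadcaster (no trim, no locks).
template<class M>
struct broadcaster {
  using f = std::function<void(M)>;
  std::vector<std::weak_ptr<f>> listeners;
  token reg(f target) {
    auto sp = std::make_shared<f>(std::move(target));
    listeners.push_back(sp);
    return sp;
  }
  void send(M m) {
    auto tmp_copy = listeners;
    for (auto const& w : tmp_copy)
      if (auto p = w.lock()) (*p)(m);
  }
};

template<class... Ms>
struct broadcasters : broadcaster<Ms>... {
  using broadcaster<Ms>::reg...;
  using broadcaster<Ms>::send...;
  template<class M>
  broadcaster<M>& station() { return *this; }
};

// Hypothetical, clearly distinct message types.
struct ping { int id; };
struct text { std::string body; };

inline std::string demo() {
  broadcasters<ping, text> b;
  std::string log;
  // Overload resolution picks the channel from the lambda's parameter.
  token a = b.reg([&](ping p) { log += "ping " + std::to_string(p.id) + ";"; });
  token c = b.reg([&](text t) { log += "text " + t.body + ";"; });
  b.send(ping{1});
  b.send(text{"hello"});
  // Explicit fallback for cases where the overloads are ambiguous.
  token d = b.station<ping>().reg([&](ping) { log += "explicit;"; });
  b.station<ping>().send(ping{2});
  assert(!log.empty());
  return log;
}
}  // namespace sketch
```

This works because a lambda taking `ping` is not callable with a `text`, so only one reg overload is viable. With message types that convert to each other (say int and double), both overloads would be viable and the explicit station<M>() form would be needed.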

Live example.

And here is the entire system augmented with a modular drop-in thread safety system:

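#include <mutex>        // std::mutex, std::unique_lock
#include <shared_mutex> // std::shared_timed_mutex, std::shared_lock

struct not_thread_safe {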
    struct not_lock {~not_lock(){}};
    auto lock() const { return not_lock{}; }
};
struct mutex_thread_safe {
    auto lock() const { return std::unique_lock<std::mutex>(m); }
private:
    mutable std::mutex m;
};
struct rw_thread_safe {
    auto lock() { return std::unique_lock<std::shared_timed_mutex>(m); }
    auto lock() const { return std::shared_lock<std::shared_timed_mutex>(m); }
private:
    mutable std::shared_timed_mutex m;
};
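// forwards lock() to the most-derived class D (CRTP); the unnamed second
// parameter only makes each instantiation a distinct base class
template<class D, class>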
struct derived_ts {
    auto lock() { return static_cast<D*>(this)->lock(); }
    auto lock() const { return static_cast<D const*>(this)->lock(); }
};
using token = std::shared_ptr<void>;
template<class M, class TS=not_thread_safe>
struct broadcaster:
  TS
{
  using f = std::function< void(M) >;
  mutable std::vector<std::weak_ptr<f>> listeners;
  token reg( f target )
  {
    auto l = this->lock();
    auto sp = std::make_shared<f>(std::move(target));
    listeners.push_back( sp );
    return sp;
  }
  // logically const, but not really:
  void trim() const {
    auto l = const_cast<broadcaster&>(*this).lock();
    auto it = std::remove_if( listeners.begin(), listeners.end(), [](auto&& p){
      return p.expired();
    } );
    listeners.erase( it, listeners.end() );
  }
  // logically const, but not really:
  void send( M m ) const
  {
    trim(); // remove dead listeners
    auto tmp_copy = [this]{
      auto l = this->lock();
      return listeners; // copy the listeners, just in case
    }();

    for (auto w:tmp_copy) {
      auto p = w.lock();
      if (p) (*p)(m);
    }
  }
};
template<class TS, class...Ms>
struct basic_broadcasters :
    TS,
    broadcaster<Ms, derived_ts<basic_broadcasters<TS, Ms...>, Ms> >... 
{
  using TS::lock;
  using broadcaster<Ms, derived_ts<basic_broadcasters<TS, Ms...>, Ms> >::reg...;
  using broadcaster<Ms, derived_ts<basic_broadcasters<TS, Ms...>, Ms> >::send...;

  template<class M>
  broadcaster<M, derived_ts<basic_broadcasters<TS, Ms...>, M>>& station() { return *this; }
  template<class M>
  broadcaster<M, derived_ts<basic_broadcasters<TS, Ms...>, M>> const& station() const { return *this; }
};
template<class...Ms>
using broadcasters = basic_broadcasters<rw_thread_safe, Ms...>;

Live example.

broadcasters<Messages...> is now a read-write locked broadcasting class that uses one common shared lock to synchronize every broadcast queue.

basic_broadcasters<not_thread_safe, Messages...> instead creates one with no locking (i.e., it isn't thread safe).
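
The policies differ only in what lock() returns, and for rw_thread_safe the const-ness of the caller decides between a writer and a reader lock. The sketch below copies two of the policies and reports which kind of lock each access gets. The `kind` overloads, the `lock_kinds` helper and the namespace `sketch` are hypothetical names added for illustration.

```cpp
#include <cassert>
#include <mutex>
#include <shared_mutex>
#include <string>

namespace sketch {
struct not_thread_safe {
  struct not_lock { ~not_lock() {} };
  auto lock() const { return not_lock{}; }
};
struct rw_thread_safe {
  // non-const access: exclusive (writer) lock
  auto lock() { return std::unique_lock<std::shared_timed_mutex>(m); }
  // const access: shared (reader) lock
  auto lock() const { return std::shared_lock<std::shared_timed_mutex>(m); }
private:
  mutable std::shared_timed_mutex m;
};

// Name the kind of lock by overloading on its type.
inline std::string kind(std::unique_lock<std::shared_timed_mutex> const&) { return "exclusive"; }
inline std::string kind(std::shared_lock<std::shared_timed_mutex> const&) { return "shared"; }
inline std::string kind(not_thread_safe::not_lock const&) { return "none"; }

// Report which lock a policy hands out for a mutating (non-const)
// access and for a read-only (const) access, in that order.
template<class TS>
std::string lock_kinds() {
  TS ts;
  std::string result;
  {
    auto l = ts.lock();  // released at the end of this scope
    result += kind(l);
  }
  result += "/";
  TS const& cts = ts;
  {
    auto l = cts.lock();  // taken only after the first lock is released
    result += kind(l);
  }
  assert(!result.empty());
  return result;
}
}  // namespace sketch
```

This mirrors how the broadcaster uses the policy: reg and trim go through the non-const lock(), while the listener copy in send goes through the const one.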