PocoPocoPoco 中的通知,是消息源通过中间载体将消息发送给观察者,通知可以分为
同步通知和异步通知。
下图是同步通知,消息发送流程:
class Notification: public RefCountedObject
{
public:
typedef AutoPtr<Notification> Ptr;
Notification();
virtual std::string name() const;
protected:
virtual ~Notification();
};
类 NotificationCenter 类扮演了一个消息源的角色。下面是它的定义:
class NotificationCenter
{
public:
NotificationCenter();
/// Creates the NotificationCenter.
~NotificationCenter();
/// Destroys the NotificationCenter.
void addObserver(const AbstractObserver& observer);
/// Registers an observer with the NotificationCenter.
/// Usage:
/// Observer<MyClass, MyNotification> obs(*this, &MyClass::handleNotification);
/// notificationCenter.addObserver(obs);
///
/// Alternatively, the NObserver template class can be used instead of Observer.
void removeObserver(const AbstractObserver& observer);
/// Unregisters an observer with the NotificationCenter.
bool hasObserver(const AbstractObserver& observer) const;
/// Returns true if the observer is registered with this NotificationCenter.
void postNotification(Notification::Ptr pNotification);
/// Posts a notification to the NotificationCenter.
/// The NotificationCenter then delivers the notification
/// to all interested observers.
/// If an observer throws an exception, dispatching terminates
/// and the exception is rethrown to the caller.
/// Ownership of the notification object is claimed and the
/// notification is released before returning. Therefore,
/// a call like
/// notificationCenter.postNotification(new MyNotification);
/// does not result in a memory leak.
bool hasObservers() const;
/// Returns true iff there is at least one registered observer.
///
/// Can be used to improve performance if an expensive notification
/// shall only be created and posted if there are any observers.
std::size_t countObservers() const;
/// Returns the number of registered observers.
static NotificationCenter& defaultCenter();
/// Returns a reference to the default
/// NotificationCenter.
private:
typedef SharedPtr<AbstractObserver> AbstractObserverPtr;
typedef std::vector<AbstractObserverPtr> ObserverList;
ObserverList _observers;
mutable Mutex _mutex;
};
通过定义可以看出它是目标对象的集合std::vector<AbstractObserverPtr> _observers
。
通过调用函数 addObserver(const AbstractObserver& observer),可以完成目标对象
的注册过程。调用函数 removeObserver()则可以完成注销。函数 postNotification 是一个消息传递的过程,其定义如下:
void NotificationCenter::postNotification(Notification::Ptr pNotification)
{
poco_check_ptr (pNotification);
ScopedLockWithUnlock<Mutex> lock(_mutex);
ObserverList observersToNotify(_observers);
lock.unlock();
for (ObserverList::iterator it = observersToNotify.begin(); it != observersToNotify.end(); ++it)
{
(*it)->notify(pNotification);
}
}
可以看到它是向所有观察者发送消息。这里为了避免长期占用_observers,在发送时复制了一份。
class AbstractObserver
{
public:
AbstractObserver();
AbstractObserver(const AbstractObserver& observer);
virtual ~AbstractObserver();
AbstractObserver& operator = (const AbstractObserver& observer);
virtual void notify(Notification* pNf) const = 0;
virtual bool equals(const AbstractObserver& observer) const = 0;
virtual bool accepts(Notification* pNf, const char* pName = 0) const = 0;
virtual AbstractObserver* clone() const = 0;
virtual void disable() = 0;
};
所有接收类都需要继承AbstractObserver
template <class C, class N>
class Observer: public AbstractObserver
{
public:
typedef void (C::*Callback)(N*);
Observer(C& object, Callback method):
_pObject(&object),
_method(method)
{
}
Observer(const Observer& observer):
AbstractObserver(observer),
_pObject(observer._pObject),
_method(observer._method)
{
}
~Observer()
{
}
Observer& operator = (const Observer& observer)
{
if (&observer != this)
{
_pObject = observer._pObject;
_method = observer._method;
}
return *this;
}
void notify(Notification* pNf) const
{
Poco::Mutex::ScopedLock lock(_mutex);
if (_pObject)
{
N* pCastNf = dynamic_cast<N*>(pNf);
if (pCastNf)
{
pCastNf->duplicate();
(_pObject->*_method)(pCastNf);
}
}
}
bool equals(const AbstractObserver& abstractObserver) const
{
const Observer* pObs = dynamic_cast<const Observer*>(&abstractObserver);
return pObs && pObs->_pObject == _pObject && pObs->_method == _method;
}
bool accepts(Notification* pNf) const
{
return dynamic_cast<N*>(pNf) != 0;
}
AbstractObserver* clone() const
{
return new Observer(*this);
}
void disable()
{
Poco::Mutex::ScopedLock lock(_mutex);
_pObject = 0;
}
private:
Observer();
C* _pObject;
Callback _method;
mutable Poco::Mutex _mutex;
};
Observer 中存在一个类实例对象的指针_pObject,以及对应函数入口地址_method。
template <class C, class N>
class NObserver: public AbstractObserver
{
public:
typedef AutoPtr<N> NotificationPtr;
typedef void (C::*Callback)(const NotificationPtr&);
NObserver(C& object, Callback method):
_pObject(&object),
_method(method)
{
}
NObserver(const NObserver& observer):
AbstractObserver(observer),
_pObject(observer._pObject),
_method(observer._method)
{
}
~NObserver()
{
}
NObserver& operator = (const NObserver& observer)
{
if (&observer != this)
{
_pObject = observer._pObject;
_method = observer._method;
}
return *this;
}
void notify(Notification* pNf) const
{
Poco::Mutex::ScopedLock lock(_mutex);
if (_pObject)
{
N* pCastNf = dynamic_cast<N*>(pNf);
if (pCastNf)
{
NotificationPtr ptr(pCastNf, true);
(_pObject->*_method)(ptr);
}
}
}
bool equals(const AbstractObserver& abstractObserver) const
{
const NObserver* pObs = dynamic_cast<const NObserver*>(&abstractObserver);
return pObs && pObs->_pObject == _pObject && pObs->_method == _method;
}
bool accepts(Notification* pNf) const
{
return dynamic_cast<N*>(pNf) != 0;
}
AbstractObserver* clone() const
{
return new NObserver(*this);
}
void disable()
{
Poco::Mutex::ScopedLock lock(_mutex);
_pObject = 0;
}
private:
NObserver();
C* _pObject;
Callback _method;
mutable Poco::Mutex _mutex;
};
#include "Poco/NotificationCenter.h"
#include "Poco/Notification.h"
#include "Poco/Observer.h"
#include "Poco/NObserver.h"
#include "Poco/AutoPtr.h"
#include <iostream>
using Poco::NotificationCenter;
using Poco::Notification;
using Poco::Observer;
using Poco::NObserver;
using Poco::AutoPtr;
class BaseNotification: public Notification
{
};
class SubNotification: public BaseNotification
{
};
class Target
{
public:
void handleBase(BaseNotification* pNf)
{
std::cout << "handleBase: " << pNf->name() << std::endl;
pNf->release(); // we got ownership, so we must release
}
void handleSub(const AutoPtr<SubNotification>& pNf)
{
std::cout << "handleSub: " << pNf->name() << std::endl;
}
};
int main(int argc, char** argv)
{
NotificationCenter nc;
Target target;
nc.addObserver(
Observer<Target, BaseNotification>(target, &Target::handleBase)
);
nc.addObserver(
NObserver<Target, SubNotification>(target, &Target::handleSub)
);
nc.postNotification(new BaseNotification);
nc.postNotification(new SubNotification);
nc.removeObserver(
Observer<Target, BaseNotification>(target, &Target::handleBase)
);
nc.removeObserver(
NObserver<Target, SubNotification>(target, &Target::handleSub)
);
return 0;
}
总结:
类似于观察者模式,通过创建观察者,增加到NotificationCenter,通知中心遍历所有观察者,调用到观察者,的notify函数,然后回调到用户的函数。
Poco 中的异步通知是通过 NotificationQueue 类来实现的,同它功能类似还有类PriorityNotificationQueue 和 TimedNotificationQueue。不同的是 PriorityNotificationQueue类中对消息分了优先级,对优先级高的消息优先处理;而 TimedNotificationQueue 对消息给了时间戳,时间戳早的优先处理,而和其压入队列的时间无关。所以接下来我们主要关注NotificationQueue 的实现。
class Foundation_API NotificationQueue
/// A NotificationQueue object provides a way to implement asynchronous
/// notifications. This is especially useful for sending notifications
/// from one thread to another, for example from a background thread to
/// the main (user interface) thread.
///
/// The NotificationQueue can also be used to distribute work from
/// a controlling thread to one or more worker threads. Each worker thread
/// repeatedly calls waitDequeueNotification() and processes the
/// returned notification. Special care must be taken when shutting
/// down a queue with worker threads waiting for notifications.
/// The recommended sequence to shut down and destroy the queue is to
/// 1. set a termination flag for every worker thread
/// 2. call the wakeUpAll() method
/// 3. join each worker thread
/// 4. destroy the notification queue.
{
public:
NotificationQueue();
/// Creates the NotificationQueue.
~NotificationQueue();
/// Destroys the NotificationQueue.
void enqueueNotification(Notification::Ptr pNotification);
/// Enqueues the given notification by adding it to
/// the end of the queue (FIFO).
/// The queue takes ownership of the notification, thus
/// a call like
/// notificationQueue.enqueueNotification(new MyNotification);
/// does not result in a memory leak.
void enqueueUrgentNotification(Notification::Ptr pNotification);
/// Enqueues the given notification by adding it to
/// the front of the queue (LIFO). The event therefore gets processed
/// before all other events already in the queue.
/// The queue takes ownership of the notification, thus
/// a call like
/// notificationQueue.enqueueUrgentNotification(new MyNotification);
/// does not result in a memory leak.
Notification* dequeueNotification();
/// Dequeues the next pending notification.
/// Returns 0 (null) if no notification is available.
/// The caller gains ownership of the notification and
/// is expected to release it when done with it.
///
/// It is highly recommended that the result is immediately
/// assigned to a Notification::Ptr, to avoid potential
/// memory management issues.
Notification* waitDequeueNotification();
/// Dequeues the next pending notification.
/// If no notification is available, waits for a notification
/// to be enqueued.
/// The caller gains ownership of the notification and
/// is expected to release it when done with it.
/// This method returns 0 (null) if wakeUpWaitingThreads()
/// has been called by another thread.
///
/// It is highly recommended that the result is immediately
/// assigned to a Notification::Ptr, to avoid potential
/// memory management issues.
Notification* waitDequeueNotification(long milliseconds);
/// Dequeues the next pending notification.
/// If no notification is available, waits for a notification
/// to be enqueued up to the specified time.
/// Returns 0 (null) if no notification is available.
/// The caller gains ownership of the notification and
/// is expected to release it when done with it.
///
/// It is highly recommended that the result is immediately
/// assigned to a Notification::Ptr, to avoid potential
/// memory management issues.
void dispatch(NotificationCenter& notificationCenter);
/// Dispatches all queued notifications to the given
/// notification center.
void wakeUpAll();
/// Wakes up all threads that wait for a notification.
bool empty() const;
/// Returns true iff the queue is empty.
int size() const;
/// Returns the number of notifications in the queue.
void clear();
/// Removes all notifications from the queue.
bool hasIdleThreads() const;
/// Returns true if the queue has at least one thread waiting
/// for a notification.
static NotificationQueue& defaultQueue();
/// Returns a reference to the default
/// NotificationQueue.
protected:
Notification::Ptr dequeueOne();
private:
typedef std::deque<Notification::Ptr> NfQueue;
struct WaitInfo
{
Notification::Ptr pNf;
Event nfAvailable;
};
typedef std::deque<WaitInfo*> WaitQueue;
NfQueue _nfQueue;
WaitQueue _waitQueue;
mutable FastMutex _mutex;
};
定义可以看到 NotificationQueue 类管理了两个 deque 容器。其中一个是 WaitInfo对象的 deque,另一个是 Notification 对象的 deque。而 WaitInfo 一对一的对应了一个消息对象 pNf 和事件对象 nfAvailable,毫无疑问 Event 对象是用来同步多线程的。
Notification* NotificationQueue::waitDequeueNotification()
{
Notification::Ptr pNf;
WaitInfo* pWI = 0;
{
FastMutex::ScopedLock lock(_mutex);
pNf = dequeueOne();
if (pNf) return pNf.duplicate();
pWI = new WaitInfo;
_waitQueue.push_back(pWI);
}
pWI->nfAvailable.wait();
pNf = pWI->pNf;
delete pWI;
return pNf.duplicate();
}
消费者线程首先从 Notification 对象的 deque 中获取消息,如果消息获取不为空,则接返回处理,如果消息为空,则创建一个新的 WaitInfo 对象,并压入 WaitInfo 对象的deque。 消费者线程开始等待,直到生产者通知有消息的存在,然后再从 WaitInfo 对象中取出消息,返回处理。当消费者线程能从 Notification 对象的 deque 中获取到消息时,说明消费者处理消息的速度要比生成者低;反之则说明消费者处理消息的速度要比生成者高。
void NotificationQueue::enqueueNotification(Notification::Ptr pNotification)
{
poco_check_ptr (pNotification);
FastMutex::ScopedLock lock(_mutex);
if (_waitQueue.empty())
{
_nfQueue.push_back(pNotification);
}
else
{
WaitInfo* pWI = _waitQueue.front();
_waitQueue.pop_front();
pWI->pNf = pNotification;
pWI->nfAvailable.set();
}
}
生产者线程首先判断 WaitInfo 对象的 deque 是否为空,如果不为空,说明存在消费者线程等待,则从 deque 中获取一个 WaitInfo 对象,灌入 Notification 消息,释放信号量激活消费者线程;而如果为空,说明目前说有的消费者线程都在工作,则把消息暂时存入Notification 对象的 deque,等待消费者线程有空时处理。
#include "Poco/Notification.h"
#include "Poco/NotificationQueue.h"
#include "Poco/ThreadPool.h"
#include "Poco/Runnable.h"
#include "Poco/AutoPtr.h"
using Poco::Notification;
using Poco::NotificationQueue;
using Poco::ThreadPool;
using Poco::Runnable;
using Poco::AutoPtr;
class WorkNotification: public Notification
{
public:
WorkNotification(int data): _data(data) {}
int data() const
{
return _data;
}
private:
int _data;
};
class Worker: public Runnable
{
public:
Worker(NotificationQueue& queue): _queue(queue) {}
void run()
{
AutoPtr<Notification> pNf(_queue.waitDequeueNotification());
while (pNf)
{
WorkNotification* pWorkNf = dynamic_cast<WorkNotification*>(pNf.get());
if (pWorkNf)
{
// do some work
}
pNf = _queue.waitDequeueNotification();
}
}
private:
NotificationQueue& _queue;
};
int main(int argc, char** argv)
{
NotificationQueue queue;
Worker worker1(queue); // create worker threads
Worker worker2(queue);
ThreadPool::defaultPool().start(worker1); // start workers
ThreadPool::defaultPool().start(worker2);
// create some work
for (int i = 0; i < 100; ++i)
{
queue.enqueueNotification(new WorkNotification(i));
}
while (!queue.empty()) // wait until all work is done
Poco::Thread::sleep(100);
queue.wakeUpAll(); // tell workers they're done
ThreadPool::defaultPool().joinAll();
return 0;
}
总结
生产者入队列,消费者出队列,实现消息的异步处理。
https://pocoproject.org/slides/090-NotificationsEvents.pdf
https://blog.csdn.net/arau_sh/article/details/8673459