/* Editor's note: COTD Entry: Flexible Synchronization Objects by Justin Wilder [justin@bigradio.com] */ #include #include #ifdef _MT namespace utility { // Defines an interface for synchronization devices that can be waited on. class waitable { public: virtual ~waitable() throw() { for (handle_vector::size_type n = 0; n < hvect.size(); ++n) ::CloseHandle(hvect[n]); } // Waits for this object to be signalled. Returns a positive integer context value, or less than // zero for an error or timeout. The timeout value defaults to infinite (no timeout). int wait(unsigned int timeout = INFINITE) const throw() { DWORD ret = ::WaitForMultipleObjects(hvect.size(), &hvect[0], wait_all, timeout); if ((ret >= WAIT_OBJECT_0) && (ret < (WAIT_OBJECT_0 + hvect.size()))) return ret - WAIT_OBJECT_0; else if ((ret >= WAIT_ABANDONED_0) && (ret < (WAIT_ABANDONED_0 + hvect.size()))) return ret - WAIT_ABANDONED_0; else return -1; } protected: typedef std::vector handle_vector; handle_vector hvect; bool wait_all; waitable(handle_vector::size_type res = 0, bool wait_all = false) throw() : wait_all(wait_all) { hvect.reserve(res); } waitable(const waitable& c) throw() : hvect(c.dup_handles()), wait_all(c.wait_all) {} handle_vector dup_handles() const throw() { handle_vector ret(hvect.size()); for (handle_vector::size_type n = 0; n < hvect.size(); ++n) ret[n] = dup_handle(hvect[n]); return ret; } HANDLE dup_handle(HANDLE h) const throw() { if (!DuplicateHandle(GetCurrentProcess(), h, GetCurrentProcess(), &h, 0, FALSE, DUPLICATE_SAME_ACCESS)) return NULL; else return h; } void insert(const handle_vector& hvect) throw() { for (handle_vector::size_type n = 0; n < hvect.size(); ++n) this->hvect.push_back(hvect[n]); } void insert(const waitable& c) throw() { insert(c.dup_handles()); } void insert(HANDLE h) throw() { hvect.push_back(h); } HANDLE get_handle(handle_vector::size_type index = 0) const throw() { assert(index < hvect.size()); return hvect[index]; } handle_vector::size_type size() const throw() { return hvect.size(); } friend class waitable_group; friend class timer; }; // Groups multiple waitable objects into a single waitable object, that will be // signalled when all component objects are signalled (wait_all == true), or when // any one component object is signalled (wait_all == false, the default). class waitable_group : public waitable { public: waitable_group(const waitable_group& c) throw() : waitable(c) {} explicit waitable_group(const waitable& w1, bool wait_all = false) throw() : waitable(w1.size(), wait_all) { insert(w1); } waitable_group(const waitable& w1, const waitable& w2, bool wait_all = false) throw() : waitable(w1.size() + w2.size(), wait_all) { insert(w1); insert(w2); } waitable_group(const waitable& w1, const waitable& w2, const waitable& w3, bool wait_all = false) throw() : waitable(w1.size() + w2.size() + w3.size(), wait_all) { insert(w1); insert(w2); insert(w3); } waitable_group(const waitable& w1, const waitable& w2, const waitable& w3, const waitable& w4, bool wait_all = false) throw() : waitable(w1.size() + w2.size() + w3.size() + w4.size(), wait_all) { insert(w1); insert(w2); insert(w3); insert(w4); } waitable_group(const waitable* wa, unsigned int len, bool wait_all = false) throw() : waitable(0, wait_all) { assert((len > 0) && (len < 1000)); for (unsigned int n = 0; n < len; ++n) insert(wa[n]); } }; // TODO: make this derive from waitable. // A lock sentry. The object it's constructed with i\s lock()'ed when the sentry is created, and // unlock()'ed when the sentry is destroyed. template class lock_sentry { public: lock_sentry(const _LOCK_TYPE& _l) : _l(_l) { _l.lock(); } ~lock_sentry() { _l.unlock(); } private: const _LOCK_TYPE& _l; }; // Implements mutual-exclusion object. class mutex : public waitable { public: typedef lock_sentry sentry; mutex() throw() { insert(::CreateMutex(NULL, FALSE, NULL)); assert(get_handle() != NULL); } // Blocks until the mutex can be locked. void lock() const throw() { wait(); } // Unlocks the mutex. Every call to lock must be matched with a subsequent unlock. void unlock() const throw() { ::ReleaseMutex(get_handle()); } }; // A waitable event. class event : public waitable { public: event() throw() { insert(CreateEvent(NULL, FALSE, FALSE, NULL)); assert(get_handle() != NULL); } // Creates a copy of this event. Copied events can be used interchangeably (i.e. a function // call on one is the same as a function call on another). The underlying system object is only // destroyed once all copies of an event have been destroyed. event(const event& c) : waitable(c) {} // Releases exactly one thread waiting on this event. If no threads are waiting at the time of the signal, the // event will remain signalled until a thread waits on it (and is subsequently released). void signal() const throw() { SetEvent(get_handle()); } // Releases zero or one waiting thread. If no threads are waiting at the time of the pulse, // no threads are released. void pulse() const throw() { PulseEvent(get_handle()); } // Resets the event. If the event is currently signalled, this function sets it to unsignalled. void reset() const throw() { ResetEvent(get_handle()); } // INCOMPLETE: it's hard to implement these two with win32 events. May need to create another // manual-reset event for this one to work. Not implemented because it's currently unused. // Releases every thread currently waiting on this event. void broadcast() const throw() { assert2(false, "waitable::broadcast() not currently supported"); } // Releases every thread currently waiting on this event, as well as every thread that will wait on it // in the future, until reset is called. void set() const throw() { assert2(false, "waitable::set() not currently supported"); } friend inline unsigned int wait(const event* ea, unsigned int len) throw(); }; // Your typical semaphore. It can be waited on in conjunction with any other waitable object. class semaphore : public waitable { public: // Initializes the semaphore with the given number of locks, and the given number initially locked. // 'size' must be greater than zero and 'locked' cannot be greater than 'size'. semaphore(unsigned int size, unsigned int locked = 0) throw() { assert((size > 0) && (locked <= size)); insert(::CreateSemaphore(NULL, size - locked, size, NULL)); assert(get_handle() != NULL); } // Creates a copy of this semaphore. Copied semaphores can be used interchangeably (i.e. a function // call on one is the same as a function call on another). The underlying system object is only // destroyed once all copies of a semaphore have been destroyed. semaphore(const semaphore& c) : waitable(c) {} // Takes a lock on the semaphore. Blocks if none are currently available. This function has // the same effect as waitable::wait(), and can be used interchangeably with it. void lock() const throw() { wait(); } // Releases a lock or a waiting thread (a thread blocking on waitable::wait()) on the semaphore. // Every lock (or wait) must be followed by an accompanying call to unlock. void unlock() const throw() { ::ReleaseSemaphore(get_handle(), 1, NULL); } }; // A timer that calls a callback function (or function object) at a specified time. template class callback_timer { public: typedef _CALLBACK callback_type; typedef unsigned int time_type; explicit callback_timer(const callback_type& cb = callback_type()) throw() : cb(cb), started(false) { } // Constructs the timer initially started. explicit callback_timer(time_type period, bool periodic = false, callback_type& cb = callback_type()) throw(std::runtime_error) : cb(cb), started(false) { if (!start(period, periodic)) throw std::runtime_error("Failed to start timer."); } callback_timer(const timer& c) throw() : cb(c.cb), started(false) {} ~callback_timer() { stop(); } // Starts the timer to be signalled in period milliseconds. If periodic is false (default) // the timer will only be signalled once, if periodic is true, it will be signalled every // period milliseconds. bool start(time_type period, bool periodic = false) throw() { if (started == true) return false; thread_args* args = new thread_args(period, absolute_time(), periodic, cb, halt_event, going_mutex); if (_beginthread(thread_entry, 0, args) == -1) { delete args; return false; } started = true; return true; } // Stops the timer. void stop() throw() { halt_event.signal(); going_mutex.lock(); // Wait for timer thread to cease. started = false; going_mutex.unlock(); } protected: callback_type cb; event halt_event; mutex going_mutex; bool started; struct absolute_time { absolute_time() { _ftime(&t); } absolute_time(const absolute_time& c) { t.time = c.t.time; t.millitm = c.t.millitm; } absolute_time& operator=(const absolute_time& c) { t.time = c.t.time; t.millitm = c.t.millitm; return *this; } absolute_time& operator+=(const absolute_time& c) { t.millitm += c.t.millitm; if (t.millitm >= 1000) { t.time += 1; t.millitm -= 1000; } t.time += c.t.time; return *this; } absolute_time& operator-=(const absolute_time& c) { t.millitm -= c.t.millitm; if ((signed)t.millitm < 0) { t.time -= 1; t.millitm += 1000; } t.time -= c.t.time; return *this; } absolute_time operator+(const absolute_time& c) const { absolute_time temp(*this); return temp += c; } absolute_time operator-(const absolute_time& c) const { absolute_time temp(*this); return temp -= c; } operator time_type () const { return t.time * 1000 + t.millitm; } protected: _timeb t; }; struct thread_args { thread_args() {} thread_args(time_type period, const absolute_time& start, bool periodic, const callback_type& cb, const event& halt_event, const mutex& going_mutex) : period(period), start(start), periodic(periodic), cb(cb), halt_event(halt_event), going_mutex(going_mutex) {} thread_args(const thread_args& c) : period(c.period), start(c.start), periodic(c.periodic), cb(c.cb), halt_event(c.halt_event), going_mutex(c.going_mutex) {} time_type period; absolute_time start; bool periodic; callback_type cb; event halt_event; mutex going_mutex; }; static void thread_entry(void* context) { assert(context != NULL); std::auto_ptr args((thread_args*)context); args->going_mutex.lock(); while (true) { time_type time_left = (absolute_time() - args->start) % args->period; if (args->halt_event.wait(time_left) < 0) { args->cb(); if (args->periodic) continue; } break; } args->going_mutex.unlock(); } }; // A waitable timer. Win32 is supposed to have a waitable timer, but it only works for NT. // So I had to implement one myself. class timer : public waitable { public: typedef unsigned int time_type; // Constructs the timer initially stopped. timer() throw() : t(callback(signal_event)) {} // Constructs the timer initially started. explicit timer(time_type period, bool periodic = false) throw(std::runtime_error) : t(period, periodic, callback(signal_event)) {} timer(const timer& c) : waitable(c), t(c.t), signal_event(c.signal_event) {} // Starts the timer to be signalled in period milliseconds. If periodic is false (default) // the timer will only be signalled once, if periodic is true, it will be signalled every // period milliseconds. bool start(time_type period, bool periodic = false) throw() { return t.start(period, periodic); } // Stops the timer. This function will not change the signal state of the timer (if it was signalled // prior to the call, it remains so). void stop() throw() { t.stop(); } protected: class callback { public: explicit callback(const event& e) : e(e) {} callback(const callback& c) : e(c.e) {} void operator()() { e.signal(); } protected: event e; }; typedef callback_timer timer_type; timer_type t; event signal_event; }; }; //namespace utility { #endif //_MT