First
This commit is contained in:
@@ -0,0 +1,471 @@
|
||||
#pragma once
|
||||
|
||||
#include "tl/detail/prologue.h"
|
||||
#include "tl/result.h"
|
||||
#include <tl/deque.h>
|
||||
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
#include <cassert>
|
||||
|
||||
namespace tl
|
||||
{
|
||||
//////////////////////////////////////////////////////////////////////////
|
||||
|
||||
template <typename T>
|
||||
class blocking_queue
|
||||
{
|
||||
public:
|
||||
explicit blocking_queue(size_t max_size) noexcept;
|
||||
~blocking_queue() noexcept;
|
||||
|
||||
void exit() noexcept; //stop all processing, returns Exited
|
||||
void finish() noexcept; //once empty or full, stops all processing, returns Finished
|
||||
|
||||
enum class push_error_code
|
||||
{
|
||||
Full,
|
||||
Timeout,
|
||||
Finished,
|
||||
Exited,
|
||||
};
|
||||
|
||||
using push_result = result<void, push_error_code>;
|
||||
|
||||
push_result push(T const& t) noexcept;
|
||||
push_result try_push(T const& t) noexcept;
|
||||
push_result push_for(T const& t, chrono::system_clock::duration duration) noexcept;
|
||||
push_result push_until(T const& t, chrono::system_clock::time_point time_point) noexcept;
|
||||
|
||||
push_result push(T&& t) noexcept;
|
||||
push_result try_push(T&& t) noexcept;
|
||||
push_result push_for(T&& t, chrono::system_clock::duration duration) noexcept;
|
||||
push_result push_until(T&& t, chrono::system_clock::time_point time_point) noexcept;
|
||||
|
||||
enum class pop_error_code
|
||||
{
|
||||
Empty,
|
||||
Timeout,
|
||||
Exited,
|
||||
Finished,
|
||||
};
|
||||
|
||||
using pop_result = result<T, pop_error_code>;
|
||||
|
||||
pop_result pop() noexcept;
|
||||
pop_result try_pop() noexcept;
|
||||
pop_result pop_for(chrono::system_clock::duration duration) noexcept;
|
||||
pop_result pop_until(chrono::system_clock::time_point time_point) noexcept;
|
||||
|
||||
template <typename Container>
|
||||
using pop_many_result = result<Container, pop_error_code>;
|
||||
|
||||
template <typename Container>
|
||||
pop_many_result<Container> pop_many(size_t max) noexcept;
|
||||
template <typename Container>
|
||||
pop_many_result<Container> try_pop_many(size_t max) noexcept;
|
||||
template <typename Container>
|
||||
pop_many_result<Container> pop_many_for(size_t max, chrono::system_clock::duration duration) noexcept;
|
||||
template <typename Container>
|
||||
pop_many_result<Container> pop_many_until(size_t max, chrono::system_clock::time_point time_point) noexcept;
|
||||
|
||||
using pop_many_emplace_result = result<void, pop_error_code>;
|
||||
|
||||
template <typename Container>
|
||||
pop_many_emplace_result pop_many_emplace(Container& dst, size_t max) noexcept;
|
||||
template <typename Container>
|
||||
pop_many_emplace_result try_pop_many_emplace(Container& dst, size_t max) noexcept;
|
||||
template <typename Container>
|
||||
pop_many_emplace_result pop_many_emplace_for(Container& dst, size_t max, chrono::system_clock::duration duration) noexcept;
|
||||
template <typename Container>
|
||||
pop_many_emplace_result pop_many_emplace_until(Container& dst, size_t max, chrono::system_clock::time_point time_point) noexcept;
|
||||
|
||||
bool empty() const noexcept;
|
||||
bool full() const noexcept;
|
||||
size_t size() const noexcept;
|
||||
size_t max_size() const noexcept;
|
||||
size_t capacity() const noexcept;
|
||||
|
||||
private:
|
||||
push_result _push(T&& t, bool block, chrono::system_clock::time_point* time_point) noexcept;
|
||||
pop_result _pop(bool block, chrono::system_clock::time_point* time_point) noexcept;
|
||||
|
||||
template <typename Container>
|
||||
pop_many_result<Container> _pop_many_emplace(Container& dst, size_t max, bool block, chrono::system_clock::time_point* time_point) noexcept;
|
||||
|
||||
mutable std::mutex m_mutex;
|
||||
std::condition_variable m_cv;
|
||||
deque<T> m_queue;
|
||||
size_t m_max_size = 10;
|
||||
bool m_exit = false;
|
||||
bool m_finish = false;
|
||||
};
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////
|
||||
//////////////////////////////////////////////////////////////////////////
|
||||
//////////////////////////////////////////////////////////////////////////
|
||||
//////////////////////////////////////////////////////////////////////////
|
||||
|
||||
template <class T>
|
||||
blocking_queue<T>::blocking_queue(size_t max_size) noexcept
|
||||
: m_max_size(max_size)
|
||||
{
|
||||
TL_ASSERT(m_max_size > 0);
|
||||
}
|
||||
|
||||
template <class T>
|
||||
blocking_queue<T>::~blocking_queue() noexcept
|
||||
{
|
||||
exit();
|
||||
}
|
||||
|
||||
template <class T>
|
||||
void blocking_queue<T>::exit() noexcept
|
||||
{
|
||||
{
|
||||
std::unique_lock<std::mutex> lg(m_mutex);
|
||||
m_exit = true;
|
||||
}
|
||||
m_cv.notify_all();
|
||||
}
|
||||
|
||||
template <class T>
|
||||
void blocking_queue<T>::finish() noexcept
|
||||
{
|
||||
{
|
||||
std::unique_lock<std::mutex> lg(m_mutex);
|
||||
m_finish = true;
|
||||
}
|
||||
m_cv.notify_all();
|
||||
}
|
||||
|
||||
template <class T>
|
||||
bool blocking_queue<T>::empty() const noexcept
|
||||
{
|
||||
std::unique_lock<std::mutex> lg(m_mutex);
|
||||
return m_queue.empty();
|
||||
}
|
||||
|
||||
template <class T>
|
||||
bool blocking_queue<T>::full() const noexcept
|
||||
{
|
||||
std::unique_lock<std::mutex> lg(m_mutex);
|
||||
return m_queue.size() == m_max_size;
|
||||
}
|
||||
|
||||
template <class T>
|
||||
size_t blocking_queue<T>::size() const noexcept
|
||||
{
|
||||
std::unique_lock<std::mutex> lg(m_mutex);
|
||||
return m_queue.size();
|
||||
}
|
||||
|
||||
template <class T>
|
||||
size_t blocking_queue<T>::max_size() const noexcept
|
||||
{
|
||||
return capacity();
|
||||
}
|
||||
|
||||
template <class T>
|
||||
size_t blocking_queue<T>::capacity() const noexcept
|
||||
{
|
||||
return m_max_size;
|
||||
}
|
||||
|
||||
template <class T>
|
||||
typename blocking_queue<T>::push_result blocking_queue<T>::push(T const& t) noexcept
|
||||
{
|
||||
return _push(T(t), true, nullptr);
|
||||
}
|
||||
|
||||
template <class T>
|
||||
typename blocking_queue<T>::push_result blocking_queue<T>::try_push(T const& t) noexcept
|
||||
{
|
||||
return _push(T(t), false, nullptr);
|
||||
}
|
||||
|
||||
template <class T>
|
||||
typename blocking_queue<T>::push_result blocking_queue<T>::push(T&& t) noexcept
|
||||
{
|
||||
return _push(std::move(t), true, nullptr);
|
||||
}
|
||||
|
||||
template <class T>
|
||||
typename blocking_queue<T>::push_result blocking_queue<T>::try_push(T&& t) noexcept
|
||||
{
|
||||
return _push(std::move(t), false, nullptr);
|
||||
}
|
||||
|
||||
template <class T>
|
||||
typename blocking_queue<T>::push_result blocking_queue<T>::push_for(T const& t, chrono::system_clock::duration duration) noexcept
|
||||
{
|
||||
chrono::system_clock::time_point until = chrono::system_clock::now() + duration;
|
||||
return _push(T(t), true, &until);
|
||||
}
|
||||
|
||||
template <class T>
|
||||
typename blocking_queue<T>::push_result blocking_queue<T>::push_until(T const& t, chrono::system_clock::time_point time_point) noexcept
|
||||
{
|
||||
return _push(T(t), true, &time_point);
|
||||
}
|
||||
|
||||
template <class T>
|
||||
typename blocking_queue<T>::push_result blocking_queue<T>::push_for(T&& t, chrono::system_clock::duration duration) noexcept
|
||||
{
|
||||
chrono::system_clock::time_point until = chrono::system_clock::now() + duration;
|
||||
return _push(std::move(t), true, &until);
|
||||
}
|
||||
|
||||
template <class T>
|
||||
typename blocking_queue<T>::push_result blocking_queue<T>::push_until(T&& t, chrono::system_clock::time_point time_point) noexcept
|
||||
{
|
||||
return _push(std::move(t), true, &time_point);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
typename blocking_queue<T>::pop_result blocking_queue<T>::pop() noexcept
|
||||
{
|
||||
return _pop(true, nullptr);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
typename blocking_queue<T>::pop_result blocking_queue<T>::try_pop() noexcept
|
||||
{
|
||||
return _pop(false, nullptr);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
typename blocking_queue<T>::pop_result blocking_queue<T>::pop_for(chrono::system_clock::duration duration) noexcept
|
||||
{
|
||||
chrono::system_clock::time_point until = chrono::system_clock::now() + duration;
|
||||
return _pop(true, &until);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
typename blocking_queue<T>::pop_result blocking_queue<T>::pop_until(chrono::system_clock::time_point time_point) noexcept
|
||||
{
|
||||
return _pop(true, &time_point);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
template <typename Container>
|
||||
result<Container, typename blocking_queue<T>::pop_error_code> blocking_queue<T>::pop_many(size_t max) noexcept
|
||||
{
|
||||
Container container;
|
||||
return _pop_many_emplace(container, max, true, nullptr);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
template <typename Container>
|
||||
result<Container, typename blocking_queue<T>::pop_error_code> blocking_queue<T>::try_pop_many(size_t max) noexcept
|
||||
{
|
||||
Container container;
|
||||
return _pop_many_emplace(container, max, false, nullptr);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
template <typename Container>
|
||||
result<Container, typename blocking_queue<T>::pop_error_code> blocking_queue<T>::pop_many_for(size_t max, chrono::system_clock::duration duration) noexcept
|
||||
{
|
||||
Container container;
|
||||
chrono::system_clock::time_point until = chrono::system_clock::now() + duration;
|
||||
return _pop_many_emplace(container, max, true, &until);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
template <typename Container>
|
||||
result<Container, typename blocking_queue<T>::pop_error_code> blocking_queue<T>::pop_many_until(size_t max, chrono::system_clock::time_point time_point) noexcept
|
||||
{
|
||||
Container container;
|
||||
return _pop_many_emplace(container, max, true, &time_point);
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
template <typename Container>
|
||||
typename blocking_queue<T>::pop_many_emplace_result blocking_queue<T>::pop_many_emplace(Container& dst, size_t max) noexcept
|
||||
{
|
||||
pop_many_result<Container> result = _pop_many_emplace(dst, max, true, nullptr);
|
||||
return (result.has_value()) ? success() : pop_many_emplace_result(result.error().code());
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
template <typename Container>
|
||||
typename blocking_queue<T>::pop_many_emplace_result blocking_queue<T>::try_pop_many_emplace(Container& dst, size_t max) noexcept
|
||||
{
|
||||
pop_many_result<Container> result = _pop_many_emplace(dst, max, false, nullptr);
|
||||
return (result.has_value()) ? success() : pop_many_emplace_result(result.error().code());
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
template <typename Container>
|
||||
typename blocking_queue<T>::pop_many_emplace_result blocking_queue<T>::pop_many_emplace_for(Container& dst, size_t max, chrono::system_clock::duration duration) noexcept
|
||||
{
|
||||
chrono::system_clock::time_point until = chrono::system_clock::now() + duration;
|
||||
pop_many_result<Container> result = _pop_many_emplace(dst, max, true, &until);
|
||||
return (result.has_value()) ? success() : pop_many_emplace_result(result.error().code());
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
template <typename Container>
|
||||
typename blocking_queue<T>::pop_many_emplace_result blocking_queue<T>::pop_many_emplace_until(Container& dst, size_t max, chrono::system_clock::time_point time_point) noexcept
|
||||
{
|
||||
pop_many_result<Container> result = _pop_many_emplace(dst, max, true, &time_point);
|
||||
return (result.has_value()) ? success() : pop_many_emplace_result(result.error().code());
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////
|
||||
|
||||
template <class T>
|
||||
typename blocking_queue<T>::push_result blocking_queue<T>::_push(T&& t, bool block, chrono::system_clock::time_point* time_point) noexcept
|
||||
{
|
||||
bool pushed_one = false;
|
||||
bool timed_out = false;
|
||||
|
||||
const auto system_tp = time_point ? std::chrono::system_clock::now() + std::chrono::nanoseconds(chrono::duration_cast<chrono::nanoseconds>(*time_point - chrono::system_clock::now()).count()) : std::chrono::system_clock::now();
|
||||
|
||||
//send the current datagram
|
||||
{
|
||||
std::unique_lock<std::mutex> lg(m_mutex);
|
||||
if (block)
|
||||
{
|
||||
while ((m_queue.size() >= m_max_size) & (!m_exit) & (!timed_out))
|
||||
{
|
||||
if (m_finish)
|
||||
return push_result(push_error_code::Finished);
|
||||
|
||||
if (time_point)
|
||||
{
|
||||
m_cv.wait_until(lg, system_tp);
|
||||
if (chrono::system_clock::now() >= *time_point)
|
||||
timed_out = true;
|
||||
}
|
||||
else
|
||||
m_cv.wait(lg);
|
||||
}
|
||||
|
||||
if (m_exit)
|
||||
return push_result(push_error_code::Exited);
|
||||
}
|
||||
|
||||
if (m_queue.size() < m_max_size)
|
||||
{
|
||||
pushed_one = true;
|
||||
m_queue.push_back(std::move(t));
|
||||
}
|
||||
else if (m_finish)
|
||||
return push_result(push_error_code::Finished);
|
||||
}
|
||||
|
||||
if (pushed_one)
|
||||
{
|
||||
m_cv.notify_all();
|
||||
return success();
|
||||
}
|
||||
return timed_out ? push_result(push_error_code::Timeout) : push_result(push_error_code::Full);
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////
|
||||
|
||||
template <typename T>
|
||||
template <typename Container>
|
||||
result<Container, typename blocking_queue<T>::pop_error_code> blocking_queue<T>::_pop_many_emplace(Container& dst, size_t max, bool block, chrono::system_clock::time_point* time_point) noexcept
|
||||
{
|
||||
const auto system_tp = time_point ? std::chrono::system_clock::now() + std::chrono::nanoseconds(chrono::duration_cast<chrono::nanoseconds>(*time_point - chrono::system_clock::now()).count()) : std::chrono::system_clock::now();
|
||||
|
||||
bool got_some = false;
|
||||
bool timed_out = false;
|
||||
{
|
||||
//wait for data
|
||||
std::unique_lock<std::mutex> lg(m_mutex);
|
||||
if (block)
|
||||
{
|
||||
while (m_queue.empty() & (!m_exit) & (!timed_out))
|
||||
{
|
||||
if (m_finish)
|
||||
return pop_many_result<Container>(pop_error_code::Finished);
|
||||
|
||||
if (time_point)
|
||||
{
|
||||
m_cv.wait_until(lg, system_tp);
|
||||
if (chrono::system_clock::now() >= *time_point)
|
||||
timed_out = true;
|
||||
}
|
||||
else
|
||||
m_cv.wait(lg);
|
||||
}
|
||||
|
||||
if (m_exit)
|
||||
return pop_many_result<Container>(pop_error_code::Exited);
|
||||
}
|
||||
|
||||
if (m_queue.empty() && m_finish)
|
||||
return pop_many_result<Container>(pop_error_code::Finished);
|
||||
|
||||
while (!m_queue.empty())
|
||||
{
|
||||
if (dst.size() >= max)
|
||||
break;
|
||||
|
||||
got_some = true;
|
||||
dst.push_back(std::move(m_queue.front()));
|
||||
m_queue.pop_front();
|
||||
}
|
||||
}
|
||||
if (got_some)
|
||||
{
|
||||
m_cv.notify_all();
|
||||
return pop_many_result<Container>(std::move(dst));
|
||||
}
|
||||
return timed_out ? pop_many_result<Container>(pop_error_code::Timeout) : pop_many_result<Container>(pop_error_code::Empty);
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////
|
||||
|
||||
template <typename T>
|
||||
typename blocking_queue<T>::pop_result blocking_queue<T>::_pop(bool block, chrono::system_clock::time_point* time_point) noexcept
|
||||
{
|
||||
const auto system_tp = time_point ? std::chrono::system_clock::now() + std::chrono::nanoseconds(chrono::duration_cast<chrono::nanoseconds>(*time_point - chrono::system_clock::now()).count()) : std::chrono::system_clock::now();
|
||||
|
||||
optional<T> opt_dst;
|
||||
bool timed_out = false;
|
||||
{
|
||||
//wait for data
|
||||
std::unique_lock<std::mutex> lg(m_mutex);
|
||||
if (block)
|
||||
{
|
||||
while (m_queue.empty() & (!m_exit) & (!timed_out))
|
||||
{
|
||||
if (m_finish)
|
||||
return pop_result(pop_error_code::Finished);
|
||||
|
||||
if (time_point)
|
||||
{
|
||||
m_cv.wait_until(lg, system_tp);
|
||||
if (chrono::system_clock::now() >= *time_point)
|
||||
timed_out = true;
|
||||
}
|
||||
else
|
||||
m_cv.wait(lg);
|
||||
}
|
||||
|
||||
if (m_exit)
|
||||
return pop_result(pop_error_code::Exited);
|
||||
}
|
||||
|
||||
if (!m_queue.empty())
|
||||
{
|
||||
opt_dst = std::move(m_queue.front());
|
||||
m_queue.pop_front();
|
||||
}
|
||||
else if (m_finish)
|
||||
return pop_result(pop_error_code::Finished);
|
||||
}
|
||||
|
||||
if (opt_dst.has_value())
|
||||
{
|
||||
m_cv.notify_all();
|
||||
return std::move(opt_dst.value());
|
||||
}
|
||||
return timed_out ? pop_result(pop_error_code::Timeout) : pop_result(pop_error_code::Empty);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user