472 lines
14 KiB
C++
472 lines
14 KiB
C++
#pragma once
|
|
|
|
#include "tl/detail/prologue.h"
|
|
#include "tl/result.h"
|
|
#include <tl/deque.h>
|
|
|
|
#include <mutex>
|
|
#include <condition_variable>
|
|
#include <cassert>
|
|
|
|
namespace tl
|
|
{
|
|
//////////////////////////////////////////////////////////////////////////
|
|
|
|
template <typename T>
|
|
class blocking_queue
|
|
{
|
|
public:
|
|
explicit blocking_queue(size_t max_size) noexcept;
|
|
~blocking_queue() noexcept;
|
|
|
|
void exit() noexcept; //stop all processing, returns Exited
|
|
void finish() noexcept; //once empty or full, stops all processing, returns Finished
|
|
|
|
enum class push_error_code
|
|
{
|
|
Full,
|
|
Timeout,
|
|
Finished,
|
|
Exited,
|
|
};
|
|
|
|
using push_result = result<void, push_error_code>;
|
|
|
|
push_result push(T const& t) noexcept;
|
|
push_result try_push(T const& t) noexcept;
|
|
push_result push_for(T const& t, chrono::system_clock::duration duration) noexcept;
|
|
push_result push_until(T const& t, chrono::system_clock::time_point time_point) noexcept;
|
|
|
|
push_result push(T&& t) noexcept;
|
|
push_result try_push(T&& t) noexcept;
|
|
push_result push_for(T&& t, chrono::system_clock::duration duration) noexcept;
|
|
push_result push_until(T&& t, chrono::system_clock::time_point time_point) noexcept;
|
|
|
|
enum class pop_error_code
|
|
{
|
|
Empty,
|
|
Timeout,
|
|
Exited,
|
|
Finished,
|
|
};
|
|
|
|
using pop_result = result<T, pop_error_code>;
|
|
|
|
pop_result pop() noexcept;
|
|
pop_result try_pop() noexcept;
|
|
pop_result pop_for(chrono::system_clock::duration duration) noexcept;
|
|
pop_result pop_until(chrono::system_clock::time_point time_point) noexcept;
|
|
|
|
template <typename Container>
|
|
using pop_many_result = result<Container, pop_error_code>;
|
|
|
|
template <typename Container>
|
|
pop_many_result<Container> pop_many(size_t max) noexcept;
|
|
template <typename Container>
|
|
pop_many_result<Container> try_pop_many(size_t max) noexcept;
|
|
template <typename Container>
|
|
pop_many_result<Container> pop_many_for(size_t max, chrono::system_clock::duration duration) noexcept;
|
|
template <typename Container>
|
|
pop_many_result<Container> pop_many_until(size_t max, chrono::system_clock::time_point time_point) noexcept;
|
|
|
|
using pop_many_emplace_result = result<void, pop_error_code>;
|
|
|
|
template <typename Container>
|
|
pop_many_emplace_result pop_many_emplace(Container& dst, size_t max) noexcept;
|
|
template <typename Container>
|
|
pop_many_emplace_result try_pop_many_emplace(Container& dst, size_t max) noexcept;
|
|
template <typename Container>
|
|
pop_many_emplace_result pop_many_emplace_for(Container& dst, size_t max, chrono::system_clock::duration duration) noexcept;
|
|
template <typename Container>
|
|
pop_many_emplace_result pop_many_emplace_until(Container& dst, size_t max, chrono::system_clock::time_point time_point) noexcept;
|
|
|
|
bool empty() const noexcept;
|
|
bool full() const noexcept;
|
|
size_t size() const noexcept;
|
|
size_t max_size() const noexcept;
|
|
size_t capacity() const noexcept;
|
|
|
|
private:
|
|
push_result _push(T&& t, bool block, chrono::system_clock::time_point* time_point) noexcept;
|
|
pop_result _pop(bool block, chrono::system_clock::time_point* time_point) noexcept;
|
|
|
|
template <typename Container>
|
|
pop_many_result<Container> _pop_many_emplace(Container& dst, size_t max, bool block, chrono::system_clock::time_point* time_point) noexcept;
|
|
|
|
mutable std::mutex m_mutex;
|
|
std::condition_variable m_cv;
|
|
deque<T> m_queue;
|
|
size_t m_max_size = 10;
|
|
bool m_exit = false;
|
|
bool m_finish = false;
|
|
};
|
|
|
|
//////////////////////////////////////////////////////////////////////////
|
|
//////////////////////////////////////////////////////////////////////////
|
|
//////////////////////////////////////////////////////////////////////////
|
|
//////////////////////////////////////////////////////////////////////////
|
|
|
|
template <class T>
|
|
blocking_queue<T>::blocking_queue(size_t max_size) noexcept
|
|
: m_max_size(max_size)
|
|
{
|
|
TL_ASSERT(m_max_size > 0);
|
|
}
|
|
|
|
template <class T>
|
|
blocking_queue<T>::~blocking_queue() noexcept
|
|
{
|
|
exit();
|
|
}
|
|
|
|
template <class T>
|
|
void blocking_queue<T>::exit() noexcept
|
|
{
|
|
{
|
|
std::unique_lock<std::mutex> lg(m_mutex);
|
|
m_exit = true;
|
|
}
|
|
m_cv.notify_all();
|
|
}
|
|
|
|
template <class T>
|
|
void blocking_queue<T>::finish() noexcept
|
|
{
|
|
{
|
|
std::unique_lock<std::mutex> lg(m_mutex);
|
|
m_finish = true;
|
|
}
|
|
m_cv.notify_all();
|
|
}
|
|
|
|
template <class T>
|
|
bool blocking_queue<T>::empty() const noexcept
|
|
{
|
|
std::unique_lock<std::mutex> lg(m_mutex);
|
|
return m_queue.empty();
|
|
}
|
|
|
|
template <class T>
|
|
bool blocking_queue<T>::full() const noexcept
|
|
{
|
|
std::unique_lock<std::mutex> lg(m_mutex);
|
|
return m_queue.size() == m_max_size;
|
|
}
|
|
|
|
template <class T>
|
|
size_t blocking_queue<T>::size() const noexcept
|
|
{
|
|
std::unique_lock<std::mutex> lg(m_mutex);
|
|
return m_queue.size();
|
|
}
|
|
|
|
template <class T>
|
|
size_t blocking_queue<T>::max_size() const noexcept
|
|
{
|
|
return capacity();
|
|
}
|
|
|
|
template <class T>
|
|
size_t blocking_queue<T>::capacity() const noexcept
|
|
{
|
|
return m_max_size;
|
|
}
|
|
|
|
template <class T>
|
|
typename blocking_queue<T>::push_result blocking_queue<T>::push(T const& t) noexcept
|
|
{
|
|
return _push(T(t), true, nullptr);
|
|
}
|
|
|
|
template <class T>
|
|
typename blocking_queue<T>::push_result blocking_queue<T>::try_push(T const& t) noexcept
|
|
{
|
|
return _push(T(t), false, nullptr);
|
|
}
|
|
|
|
template <class T>
|
|
typename blocking_queue<T>::push_result blocking_queue<T>::push(T&& t) noexcept
|
|
{
|
|
return _push(std::move(t), true, nullptr);
|
|
}
|
|
|
|
template <class T>
|
|
typename blocking_queue<T>::push_result blocking_queue<T>::try_push(T&& t) noexcept
|
|
{
|
|
return _push(std::move(t), false, nullptr);
|
|
}
|
|
|
|
template <class T>
|
|
typename blocking_queue<T>::push_result blocking_queue<T>::push_for(T const& t, chrono::system_clock::duration duration) noexcept
|
|
{
|
|
chrono::system_clock::time_point until = chrono::system_clock::now() + duration;
|
|
return _push(T(t), true, &until);
|
|
}
|
|
|
|
template <class T>
|
|
typename blocking_queue<T>::push_result blocking_queue<T>::push_until(T const& t, chrono::system_clock::time_point time_point) noexcept
|
|
{
|
|
return _push(T(t), true, &time_point);
|
|
}
|
|
|
|
template <class T>
|
|
typename blocking_queue<T>::push_result blocking_queue<T>::push_for(T&& t, chrono::system_clock::duration duration) noexcept
|
|
{
|
|
chrono::system_clock::time_point until = chrono::system_clock::now() + duration;
|
|
return _push(std::move(t), true, &until);
|
|
}
|
|
|
|
template <class T>
|
|
typename blocking_queue<T>::push_result blocking_queue<T>::push_until(T&& t, chrono::system_clock::time_point time_point) noexcept
|
|
{
|
|
return _push(std::move(t), true, &time_point);
|
|
}
|
|
|
|
template <typename T>
|
|
typename blocking_queue<T>::pop_result blocking_queue<T>::pop() noexcept
|
|
{
|
|
return _pop(true, nullptr);
|
|
}
|
|
|
|
template <typename T>
|
|
typename blocking_queue<T>::pop_result blocking_queue<T>::try_pop() noexcept
|
|
{
|
|
return _pop(false, nullptr);
|
|
}
|
|
|
|
template <typename T>
|
|
typename blocking_queue<T>::pop_result blocking_queue<T>::pop_for(chrono::system_clock::duration duration) noexcept
|
|
{
|
|
chrono::system_clock::time_point until = chrono::system_clock::now() + duration;
|
|
return _pop(true, &until);
|
|
}
|
|
|
|
template <typename T>
|
|
typename blocking_queue<T>::pop_result blocking_queue<T>::pop_until(chrono::system_clock::time_point time_point) noexcept
|
|
{
|
|
return _pop(true, &time_point);
|
|
}
|
|
|
|
template <typename T>
|
|
template <typename Container>
|
|
result<Container, typename blocking_queue<T>::pop_error_code> blocking_queue<T>::pop_many(size_t max) noexcept
|
|
{
|
|
Container container;
|
|
return _pop_many_emplace(container, max, true, nullptr);
|
|
}
|
|
|
|
template <typename T>
|
|
template <typename Container>
|
|
result<Container, typename blocking_queue<T>::pop_error_code> blocking_queue<T>::try_pop_many(size_t max) noexcept
|
|
{
|
|
Container container;
|
|
return _pop_many_emplace(container, max, false, nullptr);
|
|
}
|
|
|
|
template <typename T>
|
|
template <typename Container>
|
|
result<Container, typename blocking_queue<T>::pop_error_code> blocking_queue<T>::pop_many_for(size_t max, chrono::system_clock::duration duration) noexcept
|
|
{
|
|
Container container;
|
|
chrono::system_clock::time_point until = chrono::system_clock::now() + duration;
|
|
return _pop_many_emplace(container, max, true, &until);
|
|
}
|
|
|
|
template <typename T>
|
|
template <typename Container>
|
|
result<Container, typename blocking_queue<T>::pop_error_code> blocking_queue<T>::pop_many_until(size_t max, chrono::system_clock::time_point time_point) noexcept
|
|
{
|
|
Container container;
|
|
return _pop_many_emplace(container, max, true, &time_point);
|
|
}
|
|
|
|
template <typename T>
|
|
template <typename Container>
|
|
typename blocking_queue<T>::pop_many_emplace_result blocking_queue<T>::pop_many_emplace(Container& dst, size_t max) noexcept
|
|
{
|
|
pop_many_result<Container> result = _pop_many_emplace(dst, max, true, nullptr);
|
|
return (result.has_value()) ? success() : pop_many_emplace_result(result.error().code());
|
|
}
|
|
|
|
template <typename T>
|
|
template <typename Container>
|
|
typename blocking_queue<T>::pop_many_emplace_result blocking_queue<T>::try_pop_many_emplace(Container& dst, size_t max) noexcept
|
|
{
|
|
pop_many_result<Container> result = _pop_many_emplace(dst, max, false, nullptr);
|
|
return (result.has_value()) ? success() : pop_many_emplace_result(result.error().code());
|
|
}
|
|
|
|
template <typename T>
|
|
template <typename Container>
|
|
typename blocking_queue<T>::pop_many_emplace_result blocking_queue<T>::pop_many_emplace_for(Container& dst, size_t max, chrono::system_clock::duration duration) noexcept
|
|
{
|
|
chrono::system_clock::time_point until = chrono::system_clock::now() + duration;
|
|
pop_many_result<Container> result = _pop_many_emplace(dst, max, true, &until);
|
|
return (result.has_value()) ? success() : pop_many_emplace_result(result.error().code());
|
|
}
|
|
|
|
template <typename T>
|
|
template <typename Container>
|
|
typename blocking_queue<T>::pop_many_emplace_result blocking_queue<T>::pop_many_emplace_until(Container& dst, size_t max, chrono::system_clock::time_point time_point) noexcept
|
|
{
|
|
pop_many_result<Container> result = _pop_many_emplace(dst, max, true, &time_point);
|
|
return (result.has_value()) ? success() : pop_many_emplace_result(result.error().code());
|
|
}
|
|
|
|
//////////////////////////////////////////////////////////////////////////
|
|
|
|
template <class T>
|
|
typename blocking_queue<T>::push_result blocking_queue<T>::_push(T&& t, bool block, chrono::system_clock::time_point* time_point) noexcept
|
|
{
|
|
bool pushed_one = false;
|
|
bool timed_out = false;
|
|
|
|
const auto system_tp = time_point ? std::chrono::system_clock::now() + std::chrono::nanoseconds(chrono::duration_cast<chrono::nanoseconds>(*time_point - chrono::system_clock::now()).count()) : std::chrono::system_clock::now();
|
|
|
|
//send the current datagram
|
|
{
|
|
std::unique_lock<std::mutex> lg(m_mutex);
|
|
if (block)
|
|
{
|
|
while ((m_queue.size() >= m_max_size) & (!m_exit) & (!timed_out))
|
|
{
|
|
if (m_finish)
|
|
return push_result(push_error_code::Finished);
|
|
|
|
if (time_point)
|
|
{
|
|
m_cv.wait_until(lg, system_tp);
|
|
if (chrono::system_clock::now() >= *time_point)
|
|
timed_out = true;
|
|
}
|
|
else
|
|
m_cv.wait(lg);
|
|
}
|
|
|
|
if (m_exit)
|
|
return push_result(push_error_code::Exited);
|
|
}
|
|
|
|
if (m_queue.size() < m_max_size)
|
|
{
|
|
pushed_one = true;
|
|
m_queue.push_back(std::move(t));
|
|
}
|
|
else if (m_finish)
|
|
return push_result(push_error_code::Finished);
|
|
}
|
|
|
|
if (pushed_one)
|
|
{
|
|
m_cv.notify_all();
|
|
return success();
|
|
}
|
|
return timed_out ? push_result(push_error_code::Timeout) : push_result(push_error_code::Full);
|
|
}
|
|
|
|
//////////////////////////////////////////////////////////////////////////
|
|
|
|
template <typename T>
|
|
template <typename Container>
|
|
result<Container, typename blocking_queue<T>::pop_error_code> blocking_queue<T>::_pop_many_emplace(Container& dst, size_t max, bool block, chrono::system_clock::time_point* time_point) noexcept
|
|
{
|
|
const auto system_tp = time_point ? std::chrono::system_clock::now() + std::chrono::nanoseconds(chrono::duration_cast<chrono::nanoseconds>(*time_point - chrono::system_clock::now()).count()) : std::chrono::system_clock::now();
|
|
|
|
bool got_some = false;
|
|
bool timed_out = false;
|
|
{
|
|
//wait for data
|
|
std::unique_lock<std::mutex> lg(m_mutex);
|
|
if (block)
|
|
{
|
|
while (m_queue.empty() & (!m_exit) & (!timed_out))
|
|
{
|
|
if (m_finish)
|
|
return pop_many_result<Container>(pop_error_code::Finished);
|
|
|
|
if (time_point)
|
|
{
|
|
m_cv.wait_until(lg, system_tp);
|
|
if (chrono::system_clock::now() >= *time_point)
|
|
timed_out = true;
|
|
}
|
|
else
|
|
m_cv.wait(lg);
|
|
}
|
|
|
|
if (m_exit)
|
|
return pop_many_result<Container>(pop_error_code::Exited);
|
|
}
|
|
|
|
if (m_queue.empty() && m_finish)
|
|
return pop_many_result<Container>(pop_error_code::Finished);
|
|
|
|
while (!m_queue.empty())
|
|
{
|
|
if (dst.size() >= max)
|
|
break;
|
|
|
|
got_some = true;
|
|
dst.push_back(std::move(m_queue.front()));
|
|
m_queue.pop_front();
|
|
}
|
|
}
|
|
if (got_some)
|
|
{
|
|
m_cv.notify_all();
|
|
return pop_many_result<Container>(std::move(dst));
|
|
}
|
|
return timed_out ? pop_many_result<Container>(pop_error_code::Timeout) : pop_many_result<Container>(pop_error_code::Empty);
|
|
}
|
|
|
|
//////////////////////////////////////////////////////////////////////////
|
|
|
|
template <typename T>
|
|
typename blocking_queue<T>::pop_result blocking_queue<T>::_pop(bool block, chrono::system_clock::time_point* time_point) noexcept
|
|
{
|
|
const auto system_tp = time_point ? std::chrono::system_clock::now() + std::chrono::nanoseconds(chrono::duration_cast<chrono::nanoseconds>(*time_point - chrono::system_clock::now()).count()) : std::chrono::system_clock::now();
|
|
|
|
optional<T> opt_dst;
|
|
bool timed_out = false;
|
|
{
|
|
//wait for data
|
|
std::unique_lock<std::mutex> lg(m_mutex);
|
|
if (block)
|
|
{
|
|
while (m_queue.empty() & (!m_exit) & (!timed_out))
|
|
{
|
|
if (m_finish)
|
|
return pop_result(pop_error_code::Finished);
|
|
|
|
if (time_point)
|
|
{
|
|
m_cv.wait_until(lg, system_tp);
|
|
if (chrono::system_clock::now() >= *time_point)
|
|
timed_out = true;
|
|
}
|
|
else
|
|
m_cv.wait(lg);
|
|
}
|
|
|
|
if (m_exit)
|
|
return pop_result(pop_error_code::Exited);
|
|
}
|
|
|
|
if (!m_queue.empty())
|
|
{
|
|
opt_dst = std::move(m_queue.front());
|
|
m_queue.pop_front();
|
|
}
|
|
else if (m_finish)
|
|
return pop_result(pop_error_code::Finished);
|
|
}
|
|
|
|
if (opt_dst.has_value())
|
|
{
|
|
m_cv.notify_all();
|
|
return std::move(opt_dst.value());
|
|
}
|
|
return timed_out ? pop_result(pop_error_code::Timeout) : pop_result(pop_error_code::Empty);
|
|
}
|
|
}
|