Files
TL/include/tl/blocking_queue.h
T
jeanlemotan 8297b0b45f First
2024-07-02 18:06:33 +02:00

472 lines
14 KiB
C++

#pragma once
#include "tl/detail/prologue.h"
#include "tl/result.h"
#include <tl/deque.h>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <utility>
namespace tl
{
//////////////////////////////////////////////////////////////////////////
// Bounded FIFO queue meant to be shared between threads.
// Every operation that touches the underlying deque takes m_mutex; blocking
// operations wait on the single condition variable m_cv, which is notified
// after every successful push or pop and by exit() / finish().
// All operations are noexcept and report failures through result<> error codes.
template <typename T>
class blocking_queue
{
public:
// Creates an empty queue that accepts at most max_size elements (asserted > 0).
explicit blocking_queue(size_t max_size) noexcept;
// Calls exit(), which wakes any thread still waiting on the queue.
~blocking_queue() noexcept;
// Sets the exit flag and wakes every waiter. From then on the blocking
// push/pop calls return Exited; the try_* variants do not test the flag.
void exit() noexcept;
// Sets the finish flag and wakes every waiter. From then on a push that finds
// the queue full, or a pop that finds it empty, returns Finished.
void finish() noexcept;
// Reasons a push can fail.
enum class push_error_code
{
Full, // non-blocking push found the queue at max_size
Timeout, // the deadline passed while the queue was still full
Finished, // finish() was called and the queue is full
Exited, // exit() was called (blocking pushes only)
};
using push_result = result<void, push_error_code>;
// Copying pushes: a copy of t is made and handed to the queue.
// push      - waits, without deadline, until there is room.
// try_push  - never waits.
// push_for  - waits at most `duration`, measured from the call.
// push_until- waits until `time_point` at the latest.
push_result push(T const& t) noexcept;
push_result try_push(T const& t) noexcept;
push_result push_for(T const& t, chrono::system_clock::duration duration) noexcept;
push_result push_until(T const& t, chrono::system_clock::time_point time_point) noexcept;
// Moving pushes: same waiting behaviour as above; t is moved from only when
// the element is actually enqueued.
push_result push(T&& t) noexcept;
push_result try_push(T&& t) noexcept;
push_result push_for(T&& t, chrono::system_clock::duration duration) noexcept;
push_result push_until(T&& t, chrono::system_clock::time_point time_point) noexcept;
// Reasons a pop can fail.
enum class pop_error_code
{
Empty, // non-blocking pop found nothing to take
Timeout, // the deadline passed and nothing was taken
Exited, // exit() was called (blocking pops only)
Finished, // finish() was called and the queue is empty
};
using pop_result = result<T, pop_error_code>;
// Single-element pops; the front element is moved out of the queue.
// pop / try_pop / pop_for / pop_until wait exactly like their push counterparts,
// except that they wait for the queue to become non-empty.
pop_result pop() noexcept;
pop_result try_pop() noexcept;
pop_result pop_for(chrono::system_clock::duration duration) noexcept;
pop_result pop_until(chrono::system_clock::time_point time_point) noexcept;
// Batch pops returning a new Container. Container must be default-constructible
// and provide size() and push_back(); elements are appended front-first until
// the queue is empty or the container holds `max` elements.
template <typename Container>
using pop_many_result = result<Container, pop_error_code>;
template <typename Container>
pop_many_result<Container> pop_many(size_t max) noexcept;
template <typename Container>
pop_many_result<Container> try_pop_many(size_t max) noexcept;
template <typename Container>
pop_many_result<Container> pop_many_for(size_t max, chrono::system_clock::duration duration) noexcept;
template <typename Container>
pop_many_result<Container> pop_many_until(size_t max, chrono::system_clock::time_point time_point) noexcept;
// Batch pops into a caller-supplied container. `max` is compared against
// dst.size(), so elements already present in dst count towards the limit.
using pop_many_emplace_result = result<void, pop_error_code>;
template <typename Container>
pop_many_emplace_result pop_many_emplace(Container& dst, size_t max) noexcept;
template <typename Container>
pop_many_emplace_result try_pop_many_emplace(Container& dst, size_t max) noexcept;
template <typename Container>
pop_many_emplace_result pop_many_emplace_for(Container& dst, size_t max, chrono::system_clock::duration duration) noexcept;
template <typename Container>
pop_many_emplace_result pop_many_emplace_until(Container& dst, size_t max, chrono::system_clock::time_point time_point) noexcept;
// Snapshot queries; empty(), full() and size() lock the mutex, so the answer
// may be stale by the time the caller acts on it.
bool empty() const noexcept;
bool full() const noexcept;
size_t size() const noexcept;
// max_size() forwards to capacity(); both return the limit given to the constructor.
size_t max_size() const noexcept;
size_t capacity() const noexcept;
private:
// Shared implementations. `block` selects waiting vs. non-waiting behaviour;
// a null `time_point` means "no deadline".
push_result _push(T&& t, bool block, chrono::system_clock::time_point* time_point) noexcept;
pop_result _pop(bool block, chrono::system_clock::time_point* time_point) noexcept;
template <typename Container>
pop_many_result<Container> _pop_many_emplace(Container& dst, size_t max, bool block, chrono::system_clock::time_point* time_point) noexcept;
mutable std::mutex m_mutex; // guards m_queue, m_exit and m_finish; mutable so const queries can lock
std::condition_variable m_cv; // shared by producers and consumers
deque<T> m_queue;
size_t m_max_size = 10; // always overwritten by the constructor
bool m_exit = false; // set by exit()
bool m_finish = false; // set by finish()
};
//////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////
template <class T>
blocking_queue<T>::blocking_queue(size_t max_size) noexcept
: m_max_size(max_size)
{
TL_ASSERT(m_max_size > 0);
}
// Raises the exit flag on destruction so that threads waiting on the
// condition variable are woken up.
template <class T>
blocking_queue<T>::~blocking_queue() noexcept
{
    this->exit();
}
// Marks the queue as exited and wakes every thread waiting on it.
// The flag is written under the mutex; the notification is sent after the
// mutex has been released.
template <class T>
void blocking_queue<T>::exit() noexcept
{
    {
        const std::lock_guard<std::mutex> guard(m_mutex);
        m_exit = true;
    }
    m_cv.notify_all();
}
// Marks the queue as finished and wakes every thread waiting on it.
// The flag is written under the mutex; the notification is sent after the
// mutex has been released.
template <class T>
void blocking_queue<T>::finish() noexcept
{
    {
        const std::lock_guard<std::mutex> guard(m_mutex);
        m_finish = true;
    }
    m_cv.notify_all();
}
// Returns whether the queue held no elements at the moment the mutex was taken.
template <class T>
bool blocking_queue<T>::empty() const noexcept
{
    const std::lock_guard<std::mutex> guard(m_mutex);
    return m_queue.empty();
}
// Returns whether the queue held exactly m_max_size elements at the moment
// the mutex was taken.
template <class T>
bool blocking_queue<T>::full() const noexcept
{
    const std::lock_guard<std::mutex> guard(m_mutex);
    const auto count = m_queue.size();
    return count == m_max_size;
}
// Returns the number of queued elements at the moment the mutex was taken.
template <class T>
size_t blocking_queue<T>::size() const noexcept
{
    const std::lock_guard<std::mutex> guard(m_mutex);
    return m_queue.size();
}
// Alias of capacity(): the maximum number of elements the queue accepts.
template <class T>
size_t blocking_queue<T>::max_size() const noexcept
{
    return this->capacity();
}
// Returns the element limit given to the constructor. No lock is taken.
template <class T>
size_t blocking_queue<T>::capacity() const noexcept
{
    return this->m_max_size;
}
// Copying push without deadline: copies t and waits until the copy can be enqueued.
template <class T>
typename blocking_queue<T>::push_result blocking_queue<T>::push(T const& t) noexcept
{
    T copy(t);
    return _push(std::move(copy), /*block*/ true, /*time_point*/ nullptr);
}
// Copying push that never waits: copies t and enqueues the copy only if
// there is room right now.
template <class T>
typename blocking_queue<T>::push_result blocking_queue<T>::try_push(T const& t) noexcept
{
    T copy(t);
    return _push(std::move(copy), /*block*/ false, /*time_point*/ nullptr);
}
// Moving push without deadline: waits until t can be enqueued.
template <class T>
typename blocking_queue<T>::push_result blocking_queue<T>::push(T&& t) noexcept
{
    return this->_push(std::move(t), /*block*/ true, /*time_point*/ nullptr);
}
// Moving push that never waits: enqueues t only if there is room right now.
template <class T>
typename blocking_queue<T>::push_result blocking_queue<T>::try_push(T&& t) noexcept
{
    return this->_push(std::move(t), /*block*/ false, /*time_point*/ nullptr);
}
// Copying push with a relative timeout. The deadline is computed from
// chrono::system_clock::now() at the time of the call.
template <class T>
typename blocking_queue<T>::push_result blocking_queue<T>::push_for(T const& t, chrono::system_clock::duration duration) noexcept
{
    chrono::system_clock::time_point deadline = chrono::system_clock::now() + duration;
    T copy(t);
    return _push(std::move(copy), /*block*/ true, &deadline);
}
// Copying push with an absolute deadline: copies t and waits for room until
// time_point at the latest.
template <class T>
typename blocking_queue<T>::push_result blocking_queue<T>::push_until(T const& t, chrono::system_clock::time_point time_point) noexcept
{
    T copy(t);
    return _push(std::move(copy), /*block*/ true, &time_point);
}
// Moving push with a relative timeout. The deadline is computed from
// chrono::system_clock::now() at the time of the call.
template <class T>
typename blocking_queue<T>::push_result blocking_queue<T>::push_for(T&& t, chrono::system_clock::duration duration) noexcept
{
    chrono::system_clock::time_point deadline = chrono::system_clock::now() + duration;
    return this->_push(std::move(t), /*block*/ true, &deadline);
}
// Moving push with an absolute deadline: waits for room until time_point at the latest.
template <class T>
typename blocking_queue<T>::push_result blocking_queue<T>::push_until(T&& t, chrono::system_clock::time_point time_point) noexcept
{
    return this->_push(std::move(t), /*block*/ true, &time_point);
}
// Pop without deadline: waits until an element is available.
template <typename T>
typename blocking_queue<T>::pop_result blocking_queue<T>::pop() noexcept
{
    return this->_pop(/*block*/ true, /*time_point*/ nullptr);
}
// Pop that never waits: takes the front element only if one is present right now.
template <typename T>
typename blocking_queue<T>::pop_result blocking_queue<T>::try_pop() noexcept
{
    return this->_pop(/*block*/ false, /*time_point*/ nullptr);
}
// Pop with a relative timeout. The deadline is computed from
// chrono::system_clock::now() at the time of the call.
template <typename T>
typename blocking_queue<T>::pop_result blocking_queue<T>::pop_for(chrono::system_clock::duration duration) noexcept
{
    chrono::system_clock::time_point deadline = chrono::system_clock::now() + duration;
    return this->_pop(/*block*/ true, &deadline);
}
// Pop with an absolute deadline: waits for an element until time_point at the latest.
template <typename T>
typename blocking_queue<T>::pop_result blocking_queue<T>::pop_until(chrono::system_clock::time_point time_point) noexcept
{
    return this->_pop(/*block*/ true, &time_point);
}
// Batch pop without deadline: waits for data, then returns up to `max`
// elements in a newly default-constructed Container.
template <typename T>
template <typename Container>
result<Container, typename blocking_queue<T>::pop_error_code> blocking_queue<T>::pop_many(size_t max) noexcept
{
    Container out;
    return this->_pop_many_emplace(out, max, /*block*/ true, /*time_point*/ nullptr);
}
// Batch pop that never waits: returns up to `max` elements that are present
// right now in a newly default-constructed Container.
template <typename T>
template <typename Container>
result<Container, typename blocking_queue<T>::pop_error_code> blocking_queue<T>::try_pop_many(size_t max) noexcept
{
    Container out;
    return this->_pop_many_emplace(out, max, /*block*/ false, /*time_point*/ nullptr);
}
// Batch pop with a relative timeout. The deadline is computed from
// chrono::system_clock::now() after the output container has been created.
template <typename T>
template <typename Container>
result<Container, typename blocking_queue<T>::pop_error_code> blocking_queue<T>::pop_many_for(size_t max, chrono::system_clock::duration duration) noexcept
{
    Container out;
    chrono::system_clock::time_point deadline = chrono::system_clock::now() + duration;
    return this->_pop_many_emplace(out, max, /*block*/ true, &deadline);
}
// Batch pop with an absolute deadline: waits for data until time_point at the
// latest, then returns up to `max` elements in a new Container.
template <typename T>
template <typename Container>
result<Container, typename blocking_queue<T>::pop_error_code> blocking_queue<T>::pop_many_until(size_t max, chrono::system_clock::time_point time_point) noexcept
{
    Container out;
    return this->_pop_many_emplace(out, max, /*block*/ true, &time_point);
}
// Batch pop into a caller-owned container, waiting without deadline for data.
// Elements are appended to dst until the queue is empty or dst.size() reaches max.
// Returns success when at least one element was appended, otherwise the error
// code reported by _pop_many_emplace.
template <typename T>
template <typename Container>
typename blocking_queue<T>::pop_many_emplace_result blocking_queue<T>::pop_many_emplace(Container& dst, size_t max) noexcept
{
    pop_many_result<Container> popped = _pop_many_emplace(dst, max, true, nullptr);
    if (!popped.has_value())
        return pop_many_emplace_result(popped.error().code());
    // On success _pop_many_emplace moves dst into the result it returns, which
    // leaves the caller's container moved-from. Hand the contents back so the
    // popped elements (and whatever dst held before) are not lost.
    // NOTE(review): assumes tl::result exposes value() next to has_value() - confirm.
    dst = std::move(popped.value());
    return success();
}
// Batch pop into a caller-owned container that never waits.
// Elements are appended to dst until the queue is empty or dst.size() reaches max.
// Returns success when at least one element was appended, otherwise the error
// code reported by _pop_many_emplace.
template <typename T>
template <typename Container>
typename blocking_queue<T>::pop_many_emplace_result blocking_queue<T>::try_pop_many_emplace(Container& dst, size_t max) noexcept
{
    pop_many_result<Container> popped = _pop_many_emplace(dst, max, false, nullptr);
    if (!popped.has_value())
        return pop_many_emplace_result(popped.error().code());
    // On success _pop_many_emplace moves dst into the result it returns, which
    // leaves the caller's container moved-from. Hand the contents back so the
    // popped elements (and whatever dst held before) are not lost.
    // NOTE(review): assumes tl::result exposes value() next to has_value() - confirm.
    dst = std::move(popped.value());
    return success();
}
// Batch pop into a caller-owned container with a relative timeout; the
// deadline is computed from chrono::system_clock::now() at the time of the call.
// Elements are appended to dst until the queue is empty or dst.size() reaches max.
// Returns success when at least one element was appended, otherwise the error
// code reported by _pop_many_emplace.
template <typename T>
template <typename Container>
typename blocking_queue<T>::pop_many_emplace_result blocking_queue<T>::pop_many_emplace_for(Container& dst, size_t max, chrono::system_clock::duration duration) noexcept
{
    chrono::system_clock::time_point until = chrono::system_clock::now() + duration;
    pop_many_result<Container> popped = _pop_many_emplace(dst, max, true, &until);
    if (!popped.has_value())
        return pop_many_emplace_result(popped.error().code());
    // On success _pop_many_emplace moves dst into the result it returns, which
    // leaves the caller's container moved-from. Hand the contents back so the
    // popped elements (and whatever dst held before) are not lost.
    // NOTE(review): assumes tl::result exposes value() next to has_value() - confirm.
    dst = std::move(popped.value());
    return success();
}
// Batch pop into a caller-owned container with an absolute deadline.
// Elements are appended to dst until the queue is empty or dst.size() reaches max.
// Returns success when at least one element was appended, otherwise the error
// code reported by _pop_many_emplace.
template <typename T>
template <typename Container>
typename blocking_queue<T>::pop_many_emplace_result blocking_queue<T>::pop_many_emplace_until(Container& dst, size_t max, chrono::system_clock::time_point time_point) noexcept
{
    pop_many_result<Container> popped = _pop_many_emplace(dst, max, true, &time_point);
    if (!popped.has_value())
        return pop_many_emplace_result(popped.error().code());
    // On success _pop_many_emplace moves dst into the result it returns, which
    // leaves the caller's container moved-from. Hand the contents back so the
    // popped elements (and whatever dst held before) are not lost.
    // NOTE(review): assumes tl::result exposes value() next to has_value() - confirm.
    dst = std::move(popped.value());
    return success();
}
//////////////////////////////////////////////////////////////////////////
// Shared implementation of every push variant.
//   t          - element to enqueue; moved from only if it is actually stored.
//   block      - when true, wait while the queue is full.
//   time_point - optional deadline for the wait; nullptr means no deadline.
// Returns success, or Finished / Exited / Timeout / Full.
// NOTE(review): the exit flag is only tested on the blocking path, so a
// non-blocking push still succeeds after exit() - confirm this is intended.
template <class T>
typename blocking_queue<T>::push_result blocking_queue<T>::_push(T&& t, bool block, chrono::system_clock::time_point* time_point) noexcept
{
    // The condition variable needs a std::chrono deadline: rebuild one from the
    // time remaining until *time_point. Without a deadline the value is unused.
    const auto system_tp = time_point
        ? std::chrono::system_clock::now() + std::chrono::nanoseconds(chrono::duration_cast<chrono::nanoseconds>(*time_point - chrono::system_clock::now()).count())
        : std::chrono::system_clock::now();

    bool deadline_passed = false;
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        if (block)
        {
            while (m_queue.size() >= m_max_size && !m_exit && !deadline_passed)
            {
                // A finished queue that is full will never accept this element.
                if (m_finish)
                    return push_result(push_error_code::Finished);
                if (time_point == nullptr)
                {
                    m_cv.wait(lock);
                    continue;
                }
                m_cv.wait_until(lock, system_tp);
                deadline_passed = chrono::system_clock::now() >= *time_point;
            }
            if (m_exit)
                return push_result(push_error_code::Exited);
        }

        if (m_queue.size() >= m_max_size)
        {
            // Still no room: report why, most specific reason first.
            if (m_finish)
                return push_result(push_error_code::Finished);
            return deadline_passed ? push_result(push_error_code::Timeout) : push_result(push_error_code::Full);
        }
        m_queue.push_back(std::move(t));
    }

    // Notify after the lock is released so woken threads can take it right away.
    m_cv.notify_all();
    return success();
}
//////////////////////////////////////////////////////////////////////////
// Shared implementation of every batch pop.
//   dst        - container that receives the elements through push_back.
//   max        - upper bound for dst.size(); elements already in dst count.
//   block      - when true, wait while the queue is empty.
//   time_point - optional deadline for the wait; nullptr means no deadline.
// On success dst itself is moved into the returned result, so the caller's
// container is left in a moved-from state. Otherwise returns
// Finished / Exited / Timeout / Empty and leaves dst untouched.
// NOTE(review): the exit flag is only tested on the blocking path, so a
// non-blocking pop still succeeds after exit() - confirm this is intended.
template <typename T>
template <typename Container>
result<Container, typename blocking_queue<T>::pop_error_code> blocking_queue<T>::_pop_many_emplace(Container& dst, size_t max, bool block, chrono::system_clock::time_point* time_point) noexcept
{
    // The condition variable needs a std::chrono deadline: rebuild one from the
    // time remaining until *time_point. Without a deadline the value is unused.
    const auto system_tp = time_point
        ? std::chrono::system_clock::now() + std::chrono::nanoseconds(chrono::duration_cast<chrono::nanoseconds>(*time_point - chrono::system_clock::now()).count())
        : std::chrono::system_clock::now();

    bool deadline_passed = false;
    bool moved_any = false;
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        if (block)
        {
            while (m_queue.empty() && !m_exit && !deadline_passed)
            {
                // A finished queue that is empty will never produce data.
                if (m_finish)
                    return pop_many_result<Container>(pop_error_code::Finished);
                if (time_point == nullptr)
                {
                    m_cv.wait(lock);
                    continue;
                }
                m_cv.wait_until(lock, system_tp);
                deadline_passed = chrono::system_clock::now() >= *time_point;
            }
            if (m_exit)
                return pop_many_result<Container>(pop_error_code::Exited);
        }

        if (m_queue.empty() && m_finish)
            return pop_many_result<Container>(pop_error_code::Finished);

        // Drain front-first until the queue runs dry or dst reaches its limit.
        while (!m_queue.empty() && dst.size() < max)
        {
            dst.push_back(std::move(m_queue.front()));
            m_queue.pop_front();
            moved_any = true;
        }
    }

    if (!moved_any)
        return deadline_passed ? pop_many_result<Container>(pop_error_code::Timeout) : pop_many_result<Container>(pop_error_code::Empty);

    // Notify after the lock is released so woken threads can take it right away.
    m_cv.notify_all();
    return pop_many_result<Container>(std::move(dst));
}
//////////////////////////////////////////////////////////////////////////
// Shared implementation of every single-element pop.
//   block      - when true, wait while the queue is empty.
//   time_point - optional deadline for the wait; nullptr means no deadline.
// Returns the front element, or Finished / Exited / Timeout / Empty.
// NOTE(review): the exit flag is only tested on the blocking path, so a
// non-blocking pop still succeeds after exit() - confirm this is intended.
template <typename T>
typename blocking_queue<T>::pop_result blocking_queue<T>::_pop(bool block, chrono::system_clock::time_point* time_point) noexcept
{
    // The condition variable needs a std::chrono deadline: rebuild one from the
    // time remaining until *time_point. Without a deadline the value is unused.
    const auto system_tp = time_point
        ? std::chrono::system_clock::now() + std::chrono::nanoseconds(chrono::duration_cast<chrono::nanoseconds>(*time_point - chrono::system_clock::now()).count())
        : std::chrono::system_clock::now();

    optional<T> front;
    bool deadline_passed = false;
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        if (block)
        {
            while (m_queue.empty() && !m_exit && !deadline_passed)
            {
                // A finished queue that is empty will never produce data.
                if (m_finish)
                    return pop_result(pop_error_code::Finished);
                if (time_point == nullptr)
                {
                    m_cv.wait(lock);
                    continue;
                }
                m_cv.wait_until(lock, system_tp);
                deadline_passed = chrono::system_clock::now() >= *time_point;
            }
            if (m_exit)
                return pop_result(pop_error_code::Exited);
        }

        if (m_queue.empty())
        {
            if (m_finish)
                return pop_result(pop_error_code::Finished);
        }
        else
        {
            // Move the element out while the lock is still held.
            front = std::move(m_queue.front());
            m_queue.pop_front();
        }
    }

    if (!front.has_value())
        return deadline_passed ? pop_result(pop_error_code::Timeout) : pop_result(pop_error_code::Empty);

    // Notify after the lock is released so woken threads can take it right away.
    m_cv.notify_all();
    return std::move(front.value());
}
}