#pragma once #include "tl/detail/prologue.h" #include "tl/result.h" #include #include #include #include namespace tl { ////////////////////////////////////////////////////////////////////////// template class blocking_queue { public: explicit blocking_queue(size_t max_size) noexcept; ~blocking_queue() noexcept; void exit() noexcept; //stop all processing, returns Exited void finish() noexcept; //once empty or full, stops all processing, returns Finished enum class push_error_code { Full, Timeout, Finished, Exited, }; using push_result = result; push_result push(T const& t) noexcept; push_result try_push(T const& t) noexcept; push_result push_for(T const& t, chrono::system_clock::duration duration) noexcept; push_result push_until(T const& t, chrono::system_clock::time_point time_point) noexcept; push_result push(T&& t) noexcept; push_result try_push(T&& t) noexcept; push_result push_for(T&& t, chrono::system_clock::duration duration) noexcept; push_result push_until(T&& t, chrono::system_clock::time_point time_point) noexcept; enum class pop_error_code { Empty, Timeout, Exited, Finished, }; using pop_result = result; pop_result pop() noexcept; pop_result try_pop() noexcept; pop_result pop_for(chrono::system_clock::duration duration) noexcept; pop_result pop_until(chrono::system_clock::time_point time_point) noexcept; template using pop_many_result = result; template pop_many_result pop_many(size_t max) noexcept; template pop_many_result try_pop_many(size_t max) noexcept; template pop_many_result pop_many_for(size_t max, chrono::system_clock::duration duration) noexcept; template pop_many_result pop_many_until(size_t max, chrono::system_clock::time_point time_point) noexcept; using pop_many_emplace_result = result; template pop_many_emplace_result pop_many_emplace(Container& dst, size_t max) noexcept; template pop_many_emplace_result try_pop_many_emplace(Container& dst, size_t max) noexcept; template pop_many_emplace_result pop_many_emplace_for(Container& dst, size_t max, chrono::system_clock::duration duration) noexcept; template pop_many_emplace_result pop_many_emplace_until(Container& dst, size_t max, chrono::system_clock::time_point time_point) noexcept; bool empty() const noexcept; bool full() const noexcept; size_t size() const noexcept; size_t max_size() const noexcept; size_t capacity() const noexcept; private: push_result _push(T&& t, bool block, chrono::system_clock::time_point* time_point) noexcept; pop_result _pop(bool block, chrono::system_clock::time_point* time_point) noexcept; template pop_many_result _pop_many_emplace(Container& dst, size_t max, bool block, chrono::system_clock::time_point* time_point) noexcept; mutable std::mutex m_mutex; std::condition_variable m_cv; deque m_queue; size_t m_max_size = 10; bool m_exit = false; bool m_finish = false; }; ////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////// template blocking_queue::blocking_queue(size_t max_size) noexcept : m_max_size(max_size) { TL_ASSERT(m_max_size > 0); } template blocking_queue::~blocking_queue() noexcept { exit(); } template void blocking_queue::exit() noexcept { { std::unique_lock lg(m_mutex); m_exit = true; } m_cv.notify_all(); } template void blocking_queue::finish() noexcept { { std::unique_lock lg(m_mutex); m_finish = true; } m_cv.notify_all(); } template bool blocking_queue::empty() const noexcept { std::unique_lock lg(m_mutex); return m_queue.empty(); } template bool blocking_queue::full() const noexcept { std::unique_lock lg(m_mutex); return m_queue.size() == m_max_size; } template size_t blocking_queue::size() const noexcept { std::unique_lock lg(m_mutex); return m_queue.size(); } template size_t blocking_queue::max_size() const noexcept { return capacity(); } template size_t blocking_queue::capacity() const noexcept { return m_max_size; } template typename blocking_queue::push_result blocking_queue::push(T const& t) noexcept { return _push(T(t), true, nullptr); } template typename blocking_queue::push_result blocking_queue::try_push(T const& t) noexcept { return _push(T(t), false, nullptr); } template typename blocking_queue::push_result blocking_queue::push(T&& t) noexcept { return _push(std::move(t), true, nullptr); } template typename blocking_queue::push_result blocking_queue::try_push(T&& t) noexcept { return _push(std::move(t), false, nullptr); } template typename blocking_queue::push_result blocking_queue::push_for(T const& t, chrono::system_clock::duration duration) noexcept { chrono::system_clock::time_point until = chrono::system_clock::now() + duration; return _push(T(t), true, &until); } template typename blocking_queue::push_result blocking_queue::push_until(T const& t, chrono::system_clock::time_point time_point) noexcept { return _push(T(t), true, &time_point); } template typename blocking_queue::push_result blocking_queue::push_for(T&& t, chrono::system_clock::duration duration) noexcept { chrono::system_clock::time_point until = chrono::system_clock::now() + duration; return _push(std::move(t), true, &until); } template typename blocking_queue::push_result blocking_queue::push_until(T&& t, chrono::system_clock::time_point time_point) noexcept { return _push(std::move(t), true, &time_point); } template typename blocking_queue::pop_result blocking_queue::pop() noexcept { return _pop(true, nullptr); } template typename blocking_queue::pop_result blocking_queue::try_pop() noexcept { return _pop(false, nullptr); } template typename blocking_queue::pop_result blocking_queue::pop_for(chrono::system_clock::duration duration) noexcept { chrono::system_clock::time_point until = chrono::system_clock::now() + duration; return _pop(true, &until); } template typename blocking_queue::pop_result blocking_queue::pop_until(chrono::system_clock::time_point time_point) noexcept { return _pop(true, &time_point); } template template result::pop_error_code> blocking_queue::pop_many(size_t max) noexcept { Container container; return _pop_many_emplace(container, max, true, nullptr); } template template result::pop_error_code> blocking_queue::try_pop_many(size_t max) noexcept { Container container; return _pop_many_emplace(container, max, false, nullptr); } template template result::pop_error_code> blocking_queue::pop_many_for(size_t max, chrono::system_clock::duration duration) noexcept { Container container; chrono::system_clock::time_point until = chrono::system_clock::now() + duration; return _pop_many_emplace(container, max, true, &until); } template template result::pop_error_code> blocking_queue::pop_many_until(size_t max, chrono::system_clock::time_point time_point) noexcept { Container container; return _pop_many_emplace(container, max, true, &time_point); } template template typename blocking_queue::pop_many_emplace_result blocking_queue::pop_many_emplace(Container& dst, size_t max) noexcept { pop_many_result result = _pop_many_emplace(dst, max, true, nullptr); return (result.has_value()) ? success() : pop_many_emplace_result(result.error().code()); } template template typename blocking_queue::pop_many_emplace_result blocking_queue::try_pop_many_emplace(Container& dst, size_t max) noexcept { pop_many_result result = _pop_many_emplace(dst, max, false, nullptr); return (result.has_value()) ? success() : pop_many_emplace_result(result.error().code()); } template template typename blocking_queue::pop_many_emplace_result blocking_queue::pop_many_emplace_for(Container& dst, size_t max, chrono::system_clock::duration duration) noexcept { chrono::system_clock::time_point until = chrono::system_clock::now() + duration; pop_many_result result = _pop_many_emplace(dst, max, true, &until); return (result.has_value()) ? success() : pop_many_emplace_result(result.error().code()); } template template typename blocking_queue::pop_many_emplace_result blocking_queue::pop_many_emplace_until(Container& dst, size_t max, chrono::system_clock::time_point time_point) noexcept { pop_many_result result = _pop_many_emplace(dst, max, true, &time_point); return (result.has_value()) ? success() : pop_many_emplace_result(result.error().code()); } ////////////////////////////////////////////////////////////////////////// template typename blocking_queue::push_result blocking_queue::_push(T&& t, bool block, chrono::system_clock::time_point* time_point) noexcept { bool pushed_one = false; bool timed_out = false; const auto system_tp = time_point ? std::chrono::system_clock::now() + std::chrono::nanoseconds(chrono::duration_cast(*time_point - chrono::system_clock::now()).count()) : std::chrono::system_clock::now(); //send the current datagram { std::unique_lock lg(m_mutex); if (block) { while ((m_queue.size() >= m_max_size) & (!m_exit) & (!timed_out)) { if (m_finish) return push_result(push_error_code::Finished); if (time_point) { m_cv.wait_until(lg, system_tp); if (chrono::system_clock::now() >= *time_point) timed_out = true; } else m_cv.wait(lg); } if (m_exit) return push_result(push_error_code::Exited); } if (m_queue.size() < m_max_size) { pushed_one = true; m_queue.push_back(std::move(t)); } else if (m_finish) return push_result(push_error_code::Finished); } if (pushed_one) { m_cv.notify_all(); return success(); } return timed_out ? push_result(push_error_code::Timeout) : push_result(push_error_code::Full); } ////////////////////////////////////////////////////////////////////////// template template result::pop_error_code> blocking_queue::_pop_many_emplace(Container& dst, size_t max, bool block, chrono::system_clock::time_point* time_point) noexcept { const auto system_tp = time_point ? std::chrono::system_clock::now() + std::chrono::nanoseconds(chrono::duration_cast(*time_point - chrono::system_clock::now()).count()) : std::chrono::system_clock::now(); bool got_some = false; bool timed_out = false; { //wait for data std::unique_lock lg(m_mutex); if (block) { while (m_queue.empty() & (!m_exit) & (!timed_out)) { if (m_finish) return pop_many_result(pop_error_code::Finished); if (time_point) { m_cv.wait_until(lg, system_tp); if (chrono::system_clock::now() >= *time_point) timed_out = true; } else m_cv.wait(lg); } if (m_exit) return pop_many_result(pop_error_code::Exited); } if (m_queue.empty() && m_finish) return pop_many_result(pop_error_code::Finished); while (!m_queue.empty()) { if (dst.size() >= max) break; got_some = true; dst.push_back(std::move(m_queue.front())); m_queue.pop_front(); } } if (got_some) { m_cv.notify_all(); return pop_many_result(std::move(dst)); } return timed_out ? pop_many_result(pop_error_code::Timeout) : pop_many_result(pop_error_code::Empty); } ////////////////////////////////////////////////////////////////////////// template typename blocking_queue::pop_result blocking_queue::_pop(bool block, chrono::system_clock::time_point* time_point) noexcept { const auto system_tp = time_point ? std::chrono::system_clock::now() + std::chrono::nanoseconds(chrono::duration_cast(*time_point - chrono::system_clock::now()).count()) : std::chrono::system_clock::now(); optional opt_dst; bool timed_out = false; { //wait for data std::unique_lock lg(m_mutex); if (block) { while (m_queue.empty() & (!m_exit) & (!timed_out)) { if (m_finish) return pop_result(pop_error_code::Finished); if (time_point) { m_cv.wait_until(lg, system_tp); if (chrono::system_clock::now() >= *time_point) timed_out = true; } else m_cv.wait(lg); } if (m_exit) return pop_result(pop_error_code::Exited); } if (!m_queue.empty()) { opt_dst = std::move(m_queue.front()); m_queue.pop_front(); } else if (m_finish) return pop_result(pop_error_code::Finished); } if (opt_dst.has_value()) { m_cv.notify_all(); return std::move(opt_dst.value()); } return timed_out ? pop_result(pop_error_code::Timeout) : pop_result(pop_error_code::Empty); } }