Files
TL/include/tl/compact_signal.h
T
jeanlemotan 8297b0b45f First
2024-07-02 18:06:33 +02:00

542 lines
16 KiB
C++

#pragma once
#include <type_traits> // std::enable_if, std::is_constructible, etc
#include <mutex>
#include "tl/unordered_map.h"
#include "tl/memory_buffer.h"
#include "tl/detail/internal_assert.h"
namespace tl
{
template<typename ThreadPolicy>
class signal_system
{
public:
class scoped_connection;
class connection
{
public:
connection() noexcept = default;
connection(const connection& other) noexcept = default;
connection(connection&& other) noexcept = default;
connection& operator=(const connection& other) noexcept = default;
connection& operator=(connection&& other) noexcept = default;
void disconnect() noexcept;
void lock() noexcept;
void unlock() noexcept;
[[nodiscard]] bool is_connected() const noexcept;
protected:
friend class signal_system;
friend class scoped_connection;
inline explicit connection(uint32_t id) noexcept;
uint32_t id = 0;
};
class [[nodiscard]] scoped_connection : public connection
{
public:
scoped_connection() noexcept = default;
scoped_connection(connection c) noexcept
: connection(c) {}
~scoped_connection() noexcept;
scoped_connection(const scoped_connection& other) = delete;
scoped_connection(scoped_connection&& other) noexcept
{
this->id = other.id;
other.id = 0;
}
scoped_connection& operator=(const scoped_connection& other) = delete;
scoped_connection& operator=(scoped_connection&& other) noexcept
{
if (this->id != 0)
this->disconnect();
this->id = other.id;
other.id = 0;
return *this;
}
void detach() noexcept
{
this->id = 0;
}
};
//static inline void create(intptr_t owner, uint8_t signal_count, size_t delegate_count_hint = 0);
template<typename... Args>
static connection add(intptr_t owner, uint8_t signal_index, tl::function<void(Args...)>&& delegate) noexcept;
template<typename... Args>
static void invoke(intptr_t owner, uint8_t signal_index, Args&&... args) noexcept;
static void disconnect(const connection& c) noexcept;
static void lock(const connection& c) noexcept;
static void unlock(const connection& c) noexcept;
static void remove_all() noexcept;
static void remove_all(intptr_t owner) noexcept;
static void remove_all(intptr_t owner, uint8_t signal_index) noexcept;
static size_t get_slot_count(intptr_t owner, uint8_t signal_index) noexcept;
private:
//static inline uint32_t compute_owner_hash(void* owner);
static inline constexpr size_t k_offset_bits = 25;
static inline constexpr size_t k_offset_mask = (1 << k_offset_bits) - 1;
static inline constexpr size_t k_delegate_size_bits = 7;
static inline constexpr size_t k_delegate_size_mask = (1 << k_delegate_size_bits) - 1;
static inline constexpr size_t k_signal_index_bits = 5;
static inline constexpr size_t k_signal_index_disconnected = (1 << k_signal_index_bits) - 1; //this is used to mark disconnected slots
struct OwnerData
{
union
{
struct
{
uint32_t offset : k_offset_bits; //div 1 << k_offset_shift
uint32_t initialized : 1;
};
uint32_t all = 0;
};
};
static_assert(sizeof(OwnerData) == 4);
struct SlotHeader
{
union
{
struct
{
uint32_t delegate_size : k_delegate_size_bits; //div 1 << k_offset_shift
uint32_t prev_offset : k_offset_bits; //div 1 << k_offset_shift
uint32_t next_offset : k_offset_bits; //div 1 << k_offset_shift
uint32_t signal_index : k_signal_index_bits; //a certain value is used to signal disconnection (k_signal_index_disconnected)
uint32_t locked : 1; //if locked, don't emit
uint32_t __unused : 1;
};
uint64_t all = 0; //8 bytes, to make sure alignment is fine
};
void (*destructor)(void*) = nullptr;
};
static_assert(sizeof(SlotHeader) == 16);
struct ConnectionData
{
uint32_t offset : k_offset_bits; //div 1 << k_offset_shift
};
static typename ThreadPolicy::mutex_t m_mutex;
static tl::unordered_map<intptr_t, OwnerData> m_owners;
static tl::unordered_map<uint32_t, ConnectionData> m_connections;
static typename ThreadPolicy::connection_id_counter_t m_last_connection_id;
static tl::memory_buffer m_slots;
static typename ThreadPolicy::data_ptr_t m_slots_data_ptr;
static size_t m_garbage;
};
//////////////////////////////////////////////////////////////////////////
template<typename ThreadPolicy>
signal_system<ThreadPolicy>::connection::connection(uint32_t id) noexcept
: id(id)
{
}
//////////////////////////////////////////////////////////////////////////
namespace detail
{
inline static constexpr size_t k_offset_shift = 3;
}
template<typename ThreadPolicy>
template<typename... Args>
auto signal_system<ThreadPolicy>::add(intptr_t owner, uint8_t signal_index, tl::function<void(Args...)>&& delegate) noexcept -> connection
{
std::lock_guard<typename ThreadPolicy::mutex_t> lg(m_mutex);
if constexpr ((sizeof(delegate) & ((1 << detail::k_offset_shift) - 1)) != 0)
TL_PLAIN_CRASH("delegate_size not property aligned");
if (signal_index >= k_signal_index_disconnected)
TL_PLAIN_CRASH("signal_index out of range");
using delegate_t = tl::function<void(Args...)>;
OwnerData& od = m_owners[owner];
const uint32_t delegate_size = static_cast<uint32_t>(sizeof(delegate));
if ((delegate_size >> detail::k_offset_shift) > k_delegate_size_mask)
TL_PLAIN_CRASH("delegate_size out of range");
TL_PLAIN_ASSERT((delegate_size & ((1 << detail::k_offset_shift) - 1)) == 0);
TL_PLAIN_ASSERT(delegate_size < 128 * (1 << detail::k_offset_shift));
const uint32_t offset = static_cast<uint32_t>(m_slots.size()) >> detail::k_offset_shift;
if (offset > k_offset_mask)
TL_PLAIN_CRASH("offset out of range");
//TODO: this is not safe!!!! while resizing, the atomic still points to the old deleted data before it's updated.
// Here I have to double buffer stuff, so that the atomic either points to old, but valid memory or to new memory. Never to old but deleted memory
// Effectively this is what should happen:
// auto new_mem = allocate(new_size);
// memcpy(new_mem, old_mem, old_size);
// m_slots_data_ptr = new_mem;
// delete old_mem;
m_slots.resize_uninitialized(m_slots.size() + (size_t)delegate_size + sizeof(SlotHeader));
m_slots_data_ptr = reinterpret_cast<uint64_t*>(m_slots.data());
uint64_t* data = m_slots_data_ptr;
auto& header = *reinterpret_cast<SlotHeader*>(data + offset);
header = {};
header.delegate_size = delegate_size >> detail::k_offset_shift;
header.signal_index = signal_index;
header.destructor = [](void* memory) { static_cast<delegate_t*>(memory)->~delegate_t(); };
if (od.initialized == 0) //store the index
{
od.initialized = 1;
od.offset = offset;
header.prev_offset = offset;
}
else //here, we already have one slot (could be marked as deleted though!)
{
SlotHeader& first_header = *reinterpret_cast<SlotHeader*>(data + od.offset);
const uint32_t last_offset = first_header.prev_offset;//first_header->prev_offset points to the last header
SlotHeader& last_header = *reinterpret_cast<SlotHeader*>(data + last_offset);
last_header.next_offset = offset; //link our header after the last one
header.prev_offset = last_offset;
first_header.prev_offset = offset; //this is used to point to the last header, always
}
//note - data is uint64_t, so any byte offset has to be divided by 8 (or shifted by k_offset_shift)
new(data + offset + (sizeof(SlotHeader) >> detail::k_offset_shift)) delegate_t(std::move(delegate));
const uint32_t connection_id = ++m_last_connection_id;
m_connections[connection_id] = { offset };
return connection(connection_id);
}
template<typename ThreadPolicy>
template<typename... Args>
void signal_system<ThreadPolicy>::invoke(intptr_t owner, uint8_t signal_index, Args&&... args) noexcept
{
//we're locked from here ---
std::unique_lock<typename ThreadPolicy::mutex_t> lg(m_mutex);
auto it = m_owners.find(owner);
if (it == m_owners.end())
return;
OwnerData od = it->second;
lg.unlock();
// --- to here
// UNLOCKED ACCESS FOLLOWING THIS
// The only things safe are local data (copies) and atomics
// Any member should be atomic and not cached across the invoke, as the user might connect/disconnect stuff and invalidate the slots
using delegate_t = tl::function<void(Args...)>;
size_t next_offset = od.offset;
const size_t last_offset = (reinterpret_cast<const SlotHeader*>(m_slots_data_ptr + od.offset))->prev_offset;
bool is_last;
do
{
const SlotHeader* header = reinterpret_cast<const SlotHeader*>(m_slots_data_ptr + next_offset);
next_offset = header->next_offset;
is_last = (next_offset == 0) | (next_offset > last_offset);
if ((header->signal_index == signal_index) & (header->locked == 0))
{
auto delegate = reinterpret_cast<delegate_t*>(const_cast<SlotHeader*>(header) + 1);
if (is_last) //last slot? we can move potentially
(*delegate)(std::forward<Args>(args)...);
else
(*delegate)(args...);
}
} while (!is_last);
}
template<typename ThreadPolicy>
typename ThreadPolicy::mutex_t signal_system<ThreadPolicy>::m_mutex;
template<typename ThreadPolicy>
tl::unordered_map<intptr_t, typename signal_system<ThreadPolicy>::OwnerData> signal_system<ThreadPolicy>::m_owners;
template<typename ThreadPolicy>
tl::unordered_map<uint32_t, typename signal_system<ThreadPolicy>::ConnectionData> signal_system<ThreadPolicy>::m_connections;
template<typename ThreadPolicy>
typename ThreadPolicy::connection_id_counter_t signal_system<ThreadPolicy>::m_last_connection_id = 0;
template<typename ThreadPolicy>
tl::memory_buffer signal_system<ThreadPolicy>::m_slots = tl::memory_buffer(1024);
template<typename ThreadPolicy>
typename ThreadPolicy::data_ptr_t signal_system<ThreadPolicy>::m_slots_data_ptr = nullptr;
template<typename ThreadPolicy>
size_t signal_system<ThreadPolicy>::m_garbage = 0;
template<typename ThreadPolicy>
void signal_system<ThreadPolicy>::connection::disconnect() noexcept
{
signal_system<ThreadPolicy>::disconnect(*this);
this->id = 0;
}
template<typename ThreadPolicy>
void signal_system<ThreadPolicy>::connection::lock() noexcept
{
signal_system<ThreadPolicy>::lock(*this);
}
template<typename ThreadPolicy>
void signal_system<ThreadPolicy>::connection::unlock() noexcept
{
signal_system<ThreadPolicy>::unlock(*this);
}
template<typename ThreadPolicy>
bool signal_system<ThreadPolicy>::connection::is_connected() const noexcept
{
return this->id != 0;
}
template<typename ThreadPolicy>
signal_system<ThreadPolicy>::scoped_connection::~scoped_connection() noexcept
{
if (this->id > 0)
this->disconnect();
}
template<typename ThreadPolicy>
void signal_system<ThreadPolicy>::disconnect(const connection& c) noexcept
{
std::lock_guard<typename ThreadPolicy::mutex_t> lg(m_mutex);
auto it = m_connections.find(c.id);
if (it == m_connections.end())
return;
ConnectionData cd = it->second;
m_connections.erase(it);
uint64_t* data = m_slots_data_ptr;
SlotHeader* header = reinterpret_cast<SlotHeader*>(data + cd.offset);
if (header->signal_index == k_signal_index_disconnected)
return; //already removed
header->destructor(header + 1);
header->signal_index = k_signal_index_disconnected; //mark it so that traversal will not call it
if (header->prev_offset > cd.offset)//first slot
{
//we cannot remove the head slot!!! The OwnerData points to it
}
else if (header->next_offset == 0) //last slot?
{
//we cannot remove the tail slot!!! The OwnerData points to it (using the prev_offset)
}
else //middle slot
{
auto* prev_header = reinterpret_cast<SlotHeader*>(data + header->prev_offset);
auto* next_header = reinterpret_cast<SlotHeader*>(data + header->next_offset);
prev_header->next_offset = header->next_offset;
next_header->prev_offset = header->prev_offset;
}
m_garbage += sizeof(SlotHeader) + (static_cast<size_t>(header->delegate_size) << detail::k_offset_shift);
}
template<typename ThreadPolicy>
void signal_system<ThreadPolicy>::lock(const connection& c) noexcept
{
std::lock_guard<typename ThreadPolicy::mutex_t> lg(m_mutex);
auto it = m_connections.find(c.id);
if (it == m_connections.end())
return;
ConnectionData cd = it->second;
uint64_t* data = m_slots_data_ptr;
SlotHeader* header = reinterpret_cast<SlotHeader*>(data + cd.offset);
TL_PLAIN_ASSERT(header->locked == 0);
header->locked = 1;
}
template<typename ThreadPolicy>
void signal_system<ThreadPolicy>::unlock(const connection& c) noexcept
{
std::lock_guard<typename ThreadPolicy::mutex_t> lg(m_mutex);
auto it = m_connections.find(c.id);
if (it == m_connections.end())
return;
ConnectionData cd = it->second;
uint64_t* data = m_slots_data_ptr;
SlotHeader* header = reinterpret_cast<SlotHeader*>(data + cd.offset);
TL_PLAIN_ASSERT(header->locked != 0);
header->locked = 0;
}
template<typename ThreadPolicy>
void signal_system<ThreadPolicy>::remove_all() noexcept
{
std::lock_guard<typename ThreadPolicy::mutex_t> lg(m_mutex);
uint64_t* data = m_slots_data_ptr;
for (auto& p : m_owners)
{
OwnerData& od = p.second;
if (od.initialized == 0)
continue;
size_t next_offset = od.offset;
do
{
SlotHeader* header = reinterpret_cast<SlotHeader*>(data + next_offset);
next_offset = header->next_offset;
if (header->signal_index != k_signal_index_disconnected)
{
header->destructor(header + 1);
header->signal_index = k_signal_index_disconnected;
m_garbage += sizeof(SlotHeader) + (static_cast<size_t>(header->delegate_size) << detail::k_offset_shift);
}
} while (next_offset != 0);
od.all = {};
}
}
template<typename ThreadPolicy>
void signal_system<ThreadPolicy>::remove_all(intptr_t owner) noexcept
{
std::lock_guard<typename ThreadPolicy::mutex_t> lg(m_mutex);
auto it = m_owners.find(owner);
if (it == m_owners.end())
return;
OwnerData& od = it->second;
uint64_t* data = m_slots_data_ptr;
if (od.initialized != 0)
{
size_t next_offset = od.offset;
do
{
SlotHeader* header = reinterpret_cast<SlotHeader*>(data + next_offset);
next_offset = header->next_offset;
if (header->signal_index != k_signal_index_disconnected)
{
header->destructor(header + 1);
header->signal_index = k_signal_index_disconnected;
m_garbage += sizeof(SlotHeader) + (static_cast<size_t>(header->delegate_size) << detail::k_offset_shift);
}
} while (next_offset != 0);
}
m_owners.erase(it);
}
template<typename ThreadPolicy>
void signal_system<ThreadPolicy>::remove_all(intptr_t owner, uint8_t signal_index) noexcept
{
std::lock_guard<typename ThreadPolicy::mutex_t> lg(m_mutex);
if (signal_index >= k_signal_index_disconnected)
{
TL_PLAIN_FAIL();
return;
}
auto it = m_owners.find(owner);
if (it == m_owners.end())
return;
OwnerData& od = it->second;
uint64_t* data = m_slots_data_ptr;
if (od.initialized == 0)
{
m_owners.erase(it);
return;
}
bool has_other_signals = false;
size_t next_offset = od.offset;
do
{
SlotHeader* header = reinterpret_cast<SlotHeader*>(data + next_offset);
next_offset = header->next_offset;
if (header->signal_index == signal_index)
{
header->destructor(header + 1);
header->signal_index = k_signal_index_disconnected;
m_garbage += sizeof(SlotHeader) + (static_cast<size_t>(header->delegate_size) << detail::k_offset_shift);
}
else
has_other_signals = true;
} while (next_offset != 0);
if (!has_other_signals)
m_owners.erase(it);
}
template<typename ThreadPolicy>
size_t signal_system<ThreadPolicy>::get_slot_count(intptr_t owner, uint8_t signal_index) noexcept
{
std::lock_guard<typename ThreadPolicy::mutex_t> lg(m_mutex);
if (signal_index >= k_signal_index_disconnected)
{
TL_PLAIN_FAIL();
return 0;
}
auto it = m_owners.find(owner);
if (it == m_owners.end())
return 0;
OwnerData& od = it->second;
uint64_t* data = m_slots_data_ptr;
if (od.initialized == 0)
{
m_owners.erase(it);
return 0;
}
size_t count = 0;
size_t next_offset = od.offset;
do
{
SlotHeader* header = reinterpret_cast<SlotHeader*>(data + next_offset);
next_offset = header->next_offset;
if (header->signal_index == signal_index)
count++;
} while (next_offset != 0);
return count;
}
}