ksys: Start adding MessageDispatcher

This commit is contained in:
Léo Lam 2021-01-31 16:30:11 +01:00
parent 27f8a7bdda
commit 36bed57207
No known key found for this signature in database
GPG Key ID: 0DF30F9081000741
9 changed files with 325 additions and 38 deletions

View File

@ -92621,16 +92621,16 @@
0x00000071011f4394,uking::MethodTreeMgr::rtti1,288,
0x00000071011f44b4,uking::MethodTreeMgr::rtti2,92,
0x00000071011f4510,sub_71011F4510,140,
0x00000071011f459c,GlobalMessage::sb::ctor,424,
0x00000071011f4744,sub_71011F4744,268,
0x00000071011f4850,sub_71011F4850,52,
0x00000071011f4884,sub_71011F4884,20,
0x00000071011f4898,sub_71011F4898,440,
0x00000071011f4a50,nullsub_4667,4,
0x00000071011f459c,GlobalMessage::sb::ctor,424,_ZN4ksys17MessageDispatcher6QueuesC1EPNS_16MessageProcessor6LoggerE
0x00000071011f4744,sub_71011F4744,268,_ZN4ksys17MessageDispatcher6QueuesD1Ev
0x00000071011f4850,sub_71011F4850,52,_ZN4ksys17MessageDispatcher9MainQueue5clearEv
0x00000071011f4884,sub_71011F4884,20,_ZN4ksys17MessageDispatcher9MainQueueD1Ev
0x00000071011f4898,sub_71011F4898,440,_ZN4ksys17MessageDispatcher19DoubleBufferedQueueD1Ev
0x00000071011f4a50,nullsub_4667,4,_ZN4ksys17MessageDispatcher6Queues11DummyLoggerD1Ev
0x00000071011f4a54,nullsub_4668,4,
0x00000071011f4a58,sub_71011F4A58,72,
0x00000071011f4aa0,sub_71011F4AA0,112,
0x00000071011f4b10,nullsub_4669,4,
0x00000071011f4a58,sub_71011F4A58,72,_ZN4ksys17MessageDispatcher9MainQueue10addMessageERKNS_7MessageE
0x00000071011f4aa0,sub_71011F4AA0,112,_ZN4ksys17MessageDispatcher9MainQueue12processQueueERNS_16MessageProcessorE
0x00000071011f4b10,nullsub_4669,4,_ZN4ksys17MessageDispatcher6Queues11DummyLogger3logERKNS_7MessageEb
0x00000071011f4b14,nullsub_4670,4,
0x00000071011f4b18,j__ZdlPv_1220,4,
0x00000071011f4b1c,nullsub_4671,4,
@ -92650,14 +92650,14 @@
0x00000071011f5668,GlobalMessage::x_4,248,
0x00000071011f5760,GlobalMessage::rtti1,204,
0x00000071011f582c,GlobalMessage::rtti2,92,
0x00000071011f5888,j__ZdlPv_1221,4,
0x00000071011f5888,j__ZdlPv_1221,4,_ZN4ksys17MessageDispatcher6Queues11DummyLoggerD0Ev
0x00000071011f588c,sub_71011F588C,140,
0x00000071011f5918,sub_71011F5918,240,
0x00000071011f5a08,sub_71011F5A08,240,
0x00000071011f5af8,sub_71011F5AF8,460,
0x00000071011f5cc4,sub_71011F5CC4,228,
0x00000071011f5da8,sub_71011F5DA8,132,
0x00000071011f5e2c,sub_71011F5E2C,52,
0x00000071011f5918,sub_71011F5918,240,_ZN4ksys17MessageDispatcher5QueueD1Ev
0x00000071011f5a08,sub_71011F5A08,240,_ZN4ksys17MessageDispatcher5QueueD0Ev
0x00000071011f5af8,sub_71011F5AF8,460,_ZN4ksys17MessageDispatcher5Queue10addMessageERKNS_7MessageE
0x00000071011f5cc4,sub_71011F5CC4,228,_ZN4ksys17MessageDispatcher5Queue12processQueueERNS_16MessageProcessorE
0x00000071011f5da8,sub_71011F5DA8,132,_ZN4ksys17MessageDispatcher5Queue5clearEv
0x00000071011f5e2c,sub_71011F5E2C,52,_ZN4ksys17MessageDispatcher9MainQueueD0Ev
0x00000071011f5e60,j__ZdlPv_1222,4,
0x00000071011f5e64,sub_71011F5E64,116,
0x00000071011f5ed8,j__ZdlPv_1223,4,

Can't render this file because it is too large.

View File

@ -11,6 +11,8 @@ target_sources(uking PRIVATE
Thread/Message.h
Thread/MessageAck.cpp
Thread/MessageAck.h
Thread/MessageDispatcher.cpp
Thread/MessageDispatcher.h
Thread/MessageDispatcherBase.cpp
Thread/MessageDispatcherBase.h
Thread/MessageProcessor.cpp
@ -64,4 +66,5 @@ target_sources(uking PRIVATE
StrTreeMap.h
TypeTraits.h
Types.h
UniqueArrayPtr.h
)

View File

@ -1,7 +1,7 @@
#pragma once
#include <basis/seadTypes.h>
#include <tuple>
#include "KingSystem/Utils/Thread/MessageTransceiverId.h"
namespace ksys {
@ -37,6 +37,7 @@ public:
};
Message();
Message(const Message& message) { *this = message; }
Message(const MesTransceiverId& source, const MesTransceiverId& destination,
const MessageType& type, void* user_data, const DelayParams& delay_params, bool ack);
Message(const MesTransceiverId& source, const MessageType& type, void* user_data,
@ -44,6 +45,17 @@ public:
virtual ~Message();
Message& operator=(const Message& other) {
mSource = other.getSource();
mDestination = other.getDestination();
mType = other.getType();
mUserData = other.getUserData();
_48 = other.getField48();
mDelayParams = other.mDelayParams;
mShouldAck = other.shouldAck();
return *this;
}
virtual const MesTransceiverId& getSource() const;
virtual const MesTransceiverId& getDestination() const;
virtual const MessageType& getType() const;
@ -61,6 +73,35 @@ public:
--mDelayParams.delay_ticks;
}
void reset() {
mType = {};
mUserData = {};
_48 = 0xffffffff;
mDelayParams = {};
mShouldAck = {};
mSource.reset();
mDestination.reset();
}
void resetIfValid() {
if (isValid())
reset();
}
bool isValid() const { return checkTransceiver(mDestination); }
static bool checkTransceiver(const MesTransceiverId& id) {
if (!id.next)
return false;
MesTransceiverId* next = *id.next;
if (!next)
return false;
const auto& fields = [](const MesTransceiverId& i) { return std::tie(i.queue_id, i.id); };
return fields(id) == fields(*next);
}
private:
MesTransceiverId mSource{};
MesTransceiverId mDestination{};

View File

@ -0,0 +1,114 @@
#include "KingSystem/Utils/Thread/MessageDispatcher.h"
#include "KingSystem/Utils/Thread/Message.h"
namespace ksys {
SEAD_SINGLETON_DISPOSER_IMPL(MessageDispatcher)
MessageDispatcher::Queue::Queue() = default;
MessageDispatcher::Queue::~Queue() {
Queue::clear();
}
Message* MessageDispatcher::Queue::findUnusedEntry() const {
for (Message& entry : mMessages) {
if (!entry.isValid())
return &entry;
}
return nullptr;
}
bool MessageDispatcher::Queue::addMessage(const Message& message) {
if (!Message::checkTransceiver(message.getSource()))
return false;
if (!Message::checkTransceiver(message.getDestination()))
return false;
auto* entry = findUnusedEntry();
if (!entry)
return false;
*entry = message;
return true;
}
void MessageDispatcher::Queue::processQueue(MessageProcessor& processor) {
for (auto& message : mMessages) {
if (!message.isValid())
break;
if (processor.process(&message))
message.resetIfValid();
}
}
void MessageDispatcher::Queue::clear() {
for (auto it = mMessages.begin(); it != mMessages.end(); ++it)
it->resetIfValid();
}
MessageDispatcher::DoubleBufferedQueue::DoubleBufferedQueue() = default;
MessageDispatcher::DoubleBufferedQueue::~DoubleBufferedQueue() = default;
bool MessageDispatcher::DoubleBufferedQueue::addMessage(const Message& message) {
return mBuffer[mActiveIdx ^ 1].addMessage(message);
}
void MessageDispatcher::DoubleBufferedQueue::clear() {
mBuffer[0].clear();
mBuffer[1].clear();
}
void MessageDispatcher::DoubleBufferedQueue::processQueue(MessageProcessor& processor) {
mActiveIdx ^= 1;
mBuffer[mActiveIdx].processQueue(processor);
}
MessageDispatcher::Queues::DummyLogger::~DummyLogger() = default;
MessageDispatcher::MainQueue::MainQueue() = default;
MessageDispatcher::MainQueue::~MainQueue() = default;
bool MessageDispatcher::MainQueue::addMessage(const Message& message) {
const bool ret = mQueue.addMessage(message);
if (ret)
mHasMessageToProcess = true;
return ret;
}
void MessageDispatcher::MainQueue::clear() {
mQueue.clear();
}
void MessageDispatcher::MainQueue::processQueue(MessageProcessor& processor) {
for (u32 i = 0; mHasMessageToProcess && i < 1000; ++i) {
mHasMessageToProcess = false;
mQueue.processQueue(processor);
}
}
MessageDispatcher::Queues::TransceiverIdBuffer::TransceiverIdBuffer() {
for (auto it = mBuffer.begin(); it != mBuffer.end(); ++it)
*it = nullptr;
}
MessageDispatcher::Queues::TransceiverIdBuffer::~TransceiverIdBuffer() {
for (auto it = mBuffer.begin(); it != mBuffer.end(); ++it) {
if (auto* id = *it; id && Message::checkTransceiver(*id))
id->reset();
}
}
MessageDispatcher::Queues::Queues(MessageProcessor::Logger* logger)
: mProcessor(logger == nullptr ? &mDummyLogger : logger) {}
MessageDispatcher::Queues::~Queues() {
mQueue.clear();
mMainQueue.clear();
}
} // namespace ksys

View File

@ -0,0 +1,102 @@
#pragma once
#include <heap/seadDisposer.h>
#include <prim/seadRuntimeTypeInfo.h>
#include <thread/seadCriticalSection.h>
#include "KingSystem/Utils/Thread/MessageDispatcherBase.h"
#include "KingSystem/Utils/Thread/MessageProcessor.h"
#include "KingSystem/Utils/UniqueArrayPtr.h"
namespace ksys {
class Message;
class MessageProcessor;
class MessageDispatcher : public MessageDispatcherBase {
SEAD_SINGLETON_DISPOSER(MessageDispatcher)
SEAD_RTTI_OVERRIDE(MessageDispatcher, MessageDispatcherBase)
MessageDispatcher() = default;
~MessageDispatcher() override;
public:
void registerTransceiver(MessageReceiverEx& receiver) override;
void deregisterTransceiver(MessageReceiverEx& receiver) override;
bool sendMessage(const MesTransceiverId& src, const MesTransceiverId& dest,
const MessageType& type, void* user_data, bool ack) override;
bool sendMessageOnProcessingThread(const MesTransceiverId& src, const MesTransceiverId& dest,
const MessageType& type, void* user_data, bool ack) override;
void m_8() override;
void m_9() override;
void update() override;
private:
class Queue {
public:
Queue();
virtual ~Queue();
virtual bool addMessage(const Message& message);
virtual void processQueue(MessageProcessor& processor);
virtual void clear();
private:
Message* findUnusedEntry() const;
util::UniqueArrayPtr<Message, 3000> mMessages;
};
class DoubleBufferedQueue {
public:
DoubleBufferedQueue();
~DoubleBufferedQueue();
bool addMessage(const Message& message);
void clear();
void processQueue(MessageProcessor& processor);
private:
u32 mActiveIdx = 1;
Queue mBuffer[2];
};
class MainQueue {
public:
MainQueue();
virtual ~MainQueue();
virtual bool addMessage(const Message& message);
virtual void clear();
virtual void processQueue(MessageProcessor& processor);
private:
DoubleBufferedQueue mQueue;
bool mHasMessageToProcess = false;
};
class Queues {
public:
explicit Queues(MessageProcessor::Logger* logger);
~Queues();
private:
struct DummyLogger : public MessageProcessor::Logger {
~DummyLogger() override;
void log(const Message& message, bool success) override {}
};
struct TransceiverIdBuffer {
TransceiverIdBuffer();
~TransceiverIdBuffer();
util::UniqueArrayPtr<MesTransceiverId*, 10000> mBuffer;
};
sead::CriticalSection mCritSection;
u32 mId = 0xffffffff;
DummyLogger mDummyLogger;
TransceiverIdBuffer mTransceiverIds;
DoubleBufferedQueue mQueue;
MainQueue mMainQueue;
MessageProcessor mProcessor;
bool mIsProcessing = false;
};
};
} // namespace ksys

View File

@ -4,20 +4,28 @@
namespace ksys {
class MessageReceiverEx;
struct MesTransceiverId;
struct MessageType;
class MessageDispatcherBase {
SEAD_RTTI_BASE(MessageDispatcherBase)
public:
MessageDispatcherBase();
virtual ~MessageDispatcherBase();
virtual void registerTransceiver(MessageReceiverEx& receiver) = 0;
virtual void deregisterTransceiver(MessageReceiverEx& receiver) = 0;
virtual bool sendMessage(const MesTransceiverId& src, const MesTransceiverId& dest,
const MessageType& type, void* user_data, bool ack) = 0;
virtual bool sendMessageOnProcessingThread(const MesTransceiverId& src,
const MesTransceiverId& dest,
const MessageType& type, void* user_data,
bool ack) = 0;
// TODO
virtual void m_4() = 0;
virtual void m_5() = 0;
virtual void m_6() = 0;
virtual void m_7() = 0;
virtual void m_8() = 0;
virtual void m_9() = 0;
virtual void m_10() = 0;
virtual void update() = 0;
protected:
void setAsGlobalInstance();

View File

@ -1,5 +1,4 @@
#include "KingSystem/Utils/Thread/MessageProcessor.h"
#include <tuple>
#include "KingSystem/Utils/Thread/Message.h"
#include "KingSystem/Utils/Thread/MessageAck.h"
#include "KingSystem/Utils/Thread/MessageReceiver.h"
@ -10,18 +9,6 @@ MessageProcessor::MessageProcessor(Logger* logger) : mLogger(logger) {}
MessageProcessor::~MessageProcessor() = default;
static bool checkTransceiver(const MesTransceiverId& id) {
if (!id.next)
return false;
MesTransceiverId* next = *id.next;
if (!next)
return false;
const auto& fields = [](const MesTransceiverId& i) { return std::tie(i.queue_id, i.id); };
return fields(id) == fields(*next);
}
bool MessageProcessor::process(Message* message) {
message->decrementDelay();
@ -32,17 +19,17 @@ bool MessageProcessor::process(Message* message) {
bool dest_valid = false;
const auto& dest = message->getDestination();
if (checkTransceiver(dest)) {
if (Message::checkTransceiver(dest)) {
success = dest.receiver->receive(*message) & 1;
mLogger->log(*message, success);
dest_valid = true;
}
const auto& src = message->getSource();
if (!message->hasDelayer() || checkTransceiver(src)) {
if (!message->hasDelayer() || Message::checkTransceiver(src)) {
if (message->shouldAck()) {
const auto& source = message->getSource();
if (checkTransceiver(source)) {
if (Message::checkTransceiver(source)) {
auto* receiver = source.receiver;
const MessageAck ack{dest_valid, success, message->getDestination(),
message->getType(), message->getUserData()};

View File

@ -13,6 +13,7 @@ public:
virtual void log(const Message& message, bool success) = 0;
};
/// @param logger Must be non null.
explicit MessageProcessor(Logger* logger);
virtual ~MessageProcessor();
virtual bool process(Message* message);

View File

@ -0,0 +1,31 @@
#pragma once
#include <basis/seadTypes.h>
#include "KingSystem/Utils/SafeDelete.h"
namespace ksys::util {
template <typename T, std::size_t N>
class UniqueArrayPtr {
public:
UniqueArrayPtr() = default;
UniqueArrayPtr(const UniqueArrayPtr&) = delete;
// Not movable either for now.
UniqueArrayPtr(UniqueArrayPtr&&) = delete;
~UniqueArrayPtr() { safeDeleteArray(mData); }
auto& operator=(const UniqueArrayPtr&) = delete;
auto& operator=(UniqueArrayPtr&&) = delete;
T* data() const { return mData; }
std::size_t size() const { return N; }
T* begin() const { return mData; }
T* end() const { return mData + N; }
T*& operator[](std::size_t i) const { return mData[i]; }
private:
T* mData = new T[N];
};
} // namespace ksys::util