From dfef8dfff7a66851b5797f1863a4507cdad6e44c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A9o=20Lam?= Date: Mon, 1 Feb 2021 21:17:56 +0100 Subject: [PATCH] ksys: Implement several more MessageDispatcher functions --- data/uking_functions.csv | 8 +- lib/sead | 2 +- src/KingSystem/Utils/Thread/Message.h | 14 +- .../Utils/Thread/MessageDispatcher.cpp | 120 +++++++++++++++++- .../Utils/Thread/MessageDispatcher.h | 20 ++- .../Utils/Thread/MessageProcessor.cpp | 6 +- .../Utils/Thread/MessageReceiverEx.h | 1 + .../Utils/Thread/MessageTransceiverId.h | 22 +++- 8 files changed, 158 insertions(+), 35 deletions(-) diff --git a/data/uking_functions.csv b/data/uking_functions.csv index 3a2fce5e..6a6b24c3 100644 --- a/data/uking_functions.csv +++ b/data/uking_functions.csv @@ -92641,10 +92641,10 @@ 0x00000071011f4db4,GlobalMessage::dtorDelete,36,_ZN4ksys17MessageDispatcherD0Ev 0x00000071011f4dd8,GlobalMessage::init,580,_ZN4ksys17MessageDispatcher4initERKNS0_7InitArgEPN4sead4HeapE 0x00000071011f501c,GlobalMessage::currentTlsSlotIsSame,56,_ZNK4ksys17MessageDispatcher27isProcessingOnCurrentThreadEv -0x00000071011f5054,GlobalMessage::x,376, -0x00000071011f51cc,GlobalMessage::x_0,344, -0x00000071011f5324,GlobalMessage::x_1,124, -0x00000071011f53a0,GlobalMessage::x_5,236, +0x00000071011f5054,GlobalMessage::x,376,_ZN4ksys17MessageDispatcher19registerTransceiverERNS_17MessageReceiverExE +0x00000071011f51cc,GlobalMessage::x_0,344,_ZN4ksys17MessageDispatcher21deregisterTransceiverERNS_17MessageReceiverExE +0x00000071011f5324,GlobalMessage::x_1,124,_ZN4ksys17MessageDispatcher11sendMessageERKNS_16MesTransceiverIdES3_RKNS_11MessageTypeEPvb +0x00000071011f53a0,GlobalMessage::x_5,236,_ZN4ksys17MessageDispatcher29sendMessageOnProcessingThreadERKNS_16MesTransceiverIdES3_RKNS_11MessageTypeEPvb? 0x00000071011f548c,GlobalMessage::x_2,208, 0x00000071011f555c,GlobalMessage::x_3,268, 0x00000071011f5668,GlobalMessage::x_4,248, diff --git a/lib/sead b/lib/sead index 1737b022..ac3d8764 160000 --- a/lib/sead +++ b/lib/sead @@ -1 +1 @@ -Subproject commit 1737b022c7b70c04cc2471ff58d00ead1b6b6a93 +Subproject commit ac3d8764bba7dedeea62ce0d6841b0f801111321 diff --git a/src/KingSystem/Utils/Thread/Message.h b/src/KingSystem/Utils/Thread/Message.h index 07da260d..351beb57 100644 --- a/src/KingSystem/Utils/Thread/Message.h +++ b/src/KingSystem/Utils/Thread/Message.h @@ -88,19 +88,7 @@ public: reset(); } - bool isValid() const { return checkTransceiver(mDestination); } - - static bool checkTransceiver(const MesTransceiverId& id) { - if (!id.next) - return false; - - MesTransceiverId* next = *id.next; - if (!next) - return false; - - const auto& fields = [](const MesTransceiverId& i) { return std::tie(i.queue_id, i.id); }; - return fields(id) == fields(*next); - } + bool isValid() const { return mDestination.isRegistered(); } private: MesTransceiverId mSource{}; diff --git a/src/KingSystem/Utils/Thread/MessageDispatcher.cpp b/src/KingSystem/Utils/Thread/MessageDispatcher.cpp index cbd4f905..b45fe3d8 100644 --- a/src/KingSystem/Utils/Thread/MessageDispatcher.cpp +++ b/src/KingSystem/Utils/Thread/MessageDispatcher.cpp @@ -1,10 +1,13 @@ #include "KingSystem/Utils/Thread/MessageDispatcher.h" #include #include +#include #include +#include "KingSystem/Utils/Debug.h" #include "KingSystem/Utils/HeapUtil.h" #include "KingSystem/Utils/SafeDelete.h" #include "KingSystem/Utils/Thread/Message.h" +#include "KingSystem/Utils/Thread/MessageReceiverEx.h" namespace ksys { @@ -23,10 +26,10 @@ Message* MessageDispatcher::Queue::findUnusedEntry() const { } bool MessageDispatcher::Queue::addMessage(const Message& message) { - if (!Message::checkTransceiver(message.getSource())) + if (!message.getSource().isRegistered()) return false; - if (!Message::checkTransceiver(message.getDestination())) + if (!message.getDestination().isRegistered()) return false; auto* entry = findUnusedEntry(); @@ -66,7 +69,6 @@ void MessageDispatcher::DoubleBufferedQueue::clear() { } void MessageDispatcher::DoubleBufferedQueue::processQueue(MessageProcessor& processor) { - mActiveIdx ^= 1; mBuffer[mActiveIdx].processQueue(processor); } @@ -94,6 +96,7 @@ void MessageDispatcher::MainQueue::clear() { void MessageDispatcher::MainQueue::processQueue(MessageProcessor& processor) { for (u32 i = 0; mHasMessageToProcess && i < 1000; ++i) { mHasMessageToProcess = false; + mQueue.swapBuffer(); mQueue.processQueue(processor); } } @@ -105,7 +108,7 @@ MessageDispatcher::Queues::TransceiverIdBuffer::TransceiverIdBuffer() { MessageDispatcher::Queues::TransceiverIdBuffer::~TransceiverIdBuffer() { for (auto it = mBuffer.begin(); it != mBuffer.end(); ++it) { - if (auto* id = *it; id && Message::checkTransceiver(*id)) + if (auto* id = *it; id && id->isRegistered()) id->reset(); } } @@ -153,4 +156,113 @@ bool MessageDispatcher::isProcessingOnCurrentThread() const { return mProcessingThread == sead::ThreadMgr::instance()->getCurrentThread(); } +void MessageDispatcher::registerTransceiver(MessageReceiverEx& receiver) { + const auto lock = sead::makeScopedLock(mCritSection); + Queues& queues = *mQueues; + receiver.setQueueId(queues.getId()); + MesTransceiverId** ref = nullptr; + { + const auto queue_lock = sead::makeScopedLock(queues.getCritSection()); + MesTransceiverId* id = receiver.getId(); + + if (!id->isRegistered()) { + const auto& pointers = queues.getIdPointers(); + for (auto it = pointers.begin(); it != pointers.end(); ++it) { + if (*it == nullptr) { + ref = it; + break; + } + } + } + + if (ref) { + *ref = id; + id->self_ref = ref; + } + } + + if (!ref) { + sead::FormatFixedSafeString<128> msg{"↓↓↓\nエントリー数 : %d\n↑↑↑\n", mNumEntries.load()}; + util::PrintDebug(msg); + return; + } + + mNumEntries.increment(); + if (!mBools.isEmpty()) + receiver.setFlagPointer(mBools.popFront()); +} + +void MessageDispatcher::deregisterTransceiver(MessageReceiverEx& receiver) { + if (receiver.checkFlag() && receiver.checkCounter()) + mUpdateEndEvent.wait(); + + const auto lock = sead::makeScopedLock(mCritSection); + if (!receiver.getId()->isRegistered()) + return; + + auto* ptr = receiver.getFlagPointer(); + if (!ptr) + return; + + mBools.emplaceBack(ptr); + receiver.clearFlagPointer(); + + { + const auto queue_lock = sead::makeScopedLock(mQueues->getCritSection()); + auto& id = *receiver.getId(); + if (id.isRegistered()) { + auto* self = id.self_ref; + id.self_ref = nullptr; + *self = nullptr; + } + } + + mNumEntries.decrement(); +} + +bool MessageDispatcher::sendMessage(const MesTransceiverId& src, const MesTransceiverId& dest, + const MessageType& type, void* user_data, bool ack) { + auto* queues = mQueues; + const auto message = Message{src, dest, type, user_data, {}, ack}; + const auto lock = sead::makeScopedLock(queues->getCritSection()); + return queues->getQueue().addMessage(message); +} + +// NON_MATCHING: branching: deduplicated Message destructor call +bool MessageDispatcher::sendMessageOnProcessingThread(const MesTransceiverId& src, + const MesTransceiverId& dest, + const MessageType& type, void* user_data, + bool ack) { + if (!isProcessingOnCurrentThread()) + return false; + + auto* queues = mQueues; + const auto message = Message{src, dest, type, user_data, {}, ack}; + if (!queues->isProcessing()) + return false; + return queues->getMainQueue().addMessage(message); +} + +void MessageDispatcher::Queues::process() { + { + const auto lock = sead::makeScopedLock(mCritSection); + mQueue.swapBuffer(); + } + mIsProcessing = true; + mQueue.processQueue(mProcessor); + mMainQueue.processQueue(mProcessor); + mIsProcessing = false; +} + +void MessageDispatcher::update() { + mUpdateEndEvent.resetSignal(); + mProcessingThread = sead::ThreadMgr::instance()->getCurrentThread(); + + mQueues->process(); + + sead::MemUtil::fillZero(mBoolBuffer.getBufferPtr(), mBoolBuffer.getByteSize()); + mProcessingThread = nullptr; + mUpdateEndEvent.setSignal(); +} + } // namespace ksys diff --git a/src/KingSystem/Utils/Thread/MessageDispatcher.h b/src/KingSystem/Utils/Thread/MessageDispatcher.h index 20b81f43..907e917e 100644 --- a/src/KingSystem/Utils/Thread/MessageDispatcher.h +++ b/src/KingSystem/Utils/Thread/MessageDispatcher.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include "KingSystem/Utils/Thread/Event.h" #include "KingSystem/Utils/Thread/MessageDispatcherBase.h" @@ -19,6 +20,7 @@ namespace ksys { class Message; class MessageProcessor; +struct MesTransceiverId; class MessageDispatcher : public MessageDispatcherBase { SEAD_SINGLETON_DISPOSER(MessageDispatcher) @@ -71,13 +73,14 @@ private: bool addMessage(const Message& message); void clear(); void processQueue(MessageProcessor& processor); + void swapBuffer() { mActiveIdx ^= 1; } private: u32 mActiveIdx = 1; Queue mBuffer[2]; }; - class MainQueue { + class MainQueue final { public: MainQueue(); virtual ~MainQueue(); @@ -94,6 +97,13 @@ private: public: explicit Queues(MessageProcessor::Logger* logger); ~Queues(); + const u32& getId() const { return mId; } + sead::CriticalSection& getCritSection() { return mCritSection; } + const auto& getIdPointers() const { return mTransceiverIdPtrs.mBuffer; } + DoubleBufferedQueue& getQueue() { return mQueue; } + MainQueue& getMainQueue() { return mMainQueue; } + bool isProcessing() const { return mIsProcessing; } + void process(); private: struct DummyLogger : public MessageProcessor::Logger { @@ -111,7 +121,7 @@ private: sead::CriticalSection mCritSection; u32 mId = 0xffffffff; DummyLogger mDummyLogger; - TransceiverIdBuffer mTransceiverIds; + TransceiverIdBuffer mTransceiverIdPtrs; DoubleBufferedQueue mQueue; MainQueue mMainQueue; MessageProcessor mProcessor; @@ -131,11 +141,11 @@ private: Logger mLogger{}; Queues* mQueues{}; sead::TypedBitFlag mFlags; - sead::Buffer mBoolBuffer; - sead::ObjList mBools; + sead::Buffer mBoolBuffer; + sead::ObjList mBools; sead::CriticalSection mCritSection; util::Event mUpdateEndEvent; - int mNumEntries = 0; + sead::Atomic mNumEntries = 0; }; } // namespace ksys diff --git a/src/KingSystem/Utils/Thread/MessageProcessor.cpp b/src/KingSystem/Utils/Thread/MessageProcessor.cpp index 88ce1b93..779afde6 100644 --- a/src/KingSystem/Utils/Thread/MessageProcessor.cpp +++ b/src/KingSystem/Utils/Thread/MessageProcessor.cpp @@ -19,17 +19,17 @@ bool MessageProcessor::process(Message* message) { bool dest_valid = false; const auto& dest = message->getDestination(); - if (Message::checkTransceiver(dest)) { + if (dest.isRegistered()) { success = dest.receiver->receive(*message) & 1; mLogger->log(*message, success); dest_valid = true; } const auto& src = message->getSource(); - if (!message->hasDelayer() || Message::checkTransceiver(src)) { + if (!message->hasDelayer() || src.isRegistered()) { if (message->shouldAck()) { const auto& source = message->getSource(); - if (Message::checkTransceiver(source)) { + if (source.isRegistered()) { auto* receiver = source.receiver; const MessageAck ack{dest_valid, success, message->getDestination(), message->getType(), message->getUserData()}; diff --git a/src/KingSystem/Utils/Thread/MessageReceiverEx.h b/src/KingSystem/Utils/Thread/MessageReceiverEx.h index 61f0571a..80a10a15 100644 --- a/src/KingSystem/Utils/Thread/MessageReceiverEx.h +++ b/src/KingSystem/Utils/Thread/MessageReceiverEx.h @@ -17,6 +17,7 @@ public: bool checkCounter() const; void setFlag(bool update_counter); + u8* getFlagPointer() const { return mFlag; } void setFlagPointer(u8* ptr); void clearFlagPointer(); diff --git a/src/KingSystem/Utils/Thread/MessageTransceiverId.h b/src/KingSystem/Utils/Thread/MessageTransceiverId.h index 8c9371b0..63a73184 100644 --- a/src/KingSystem/Utils/Thread/MessageTransceiverId.h +++ b/src/KingSystem/Utils/Thread/MessageTransceiverId.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include "KingSystem/Utils/Types.h" @@ -9,30 +10,41 @@ namespace ksys { class MessageReceiver; struct MesTransceiverId { - MesTransceiverId() : receiver(), next() { reset(); } + MesTransceiverId() : receiver(), self_ref() { reset(); } MesTransceiverId(const MesTransceiverId& other) { *this = other; } - MesTransceiverId(u32& id, MessageReceiver* receiver) : id(++id), receiver(receiver), next() {} + MesTransceiverId(u32& id, MessageReceiver* receiver) + : id(++id), receiver(receiver), self_ref() {} ~MesTransceiverId() { reset(); } void reset() { queue_id = 0xffffffff; id = 0xffffffff; receiver = nullptr; - next = nullptr; + self_ref = nullptr; } MesTransceiverId& operator=(const MesTransceiverId& other) { queue_id = other.queue_id; id = other.id; receiver = other.receiver; - next = other.next; + self_ref = other.self_ref; return *this; } + bool isRegistered() const { + if (!self_ref || !*self_ref) + return false; + + const auto fields = [](const MesTransceiverId& i) { return std::tie(i.queue_id, i.id); }; + return fields(*this) == fields(**self_ref); + } + u32 queue_id = 0xffffffff; u32 id = 0xffffffff; MessageReceiver* receiver; - MesTransceiverId** next; + /// If registered, this points to a pointer that points to this object. + /// Otherwise, this value is nullptr. + MesTransceiverId** self_ref; }; KSYS_CHECK_SIZE_NX150(MesTransceiverId, 0x18);