From 1a04310f80070a367f088c38e39a54d3d4c2cc67 Mon Sep 17 00:00:00 2001 From: "Justin C. Miller" Date: Fri, 14 Oct 2022 01:02:56 -0700 Subject: [PATCH] [kernel] Simplify mailbox code, and messages A number of simplifications of mailboxes now that the interface is much simpler, and synchronous. * call and respond can now only transfer one handle at a time * mailbox objects got rid of the message queue, and just have wait_queues of blocked threads, and a reply_to map. * threads now have a message_data struct on them for use by mailboxes --- definitions/objects/mailbox.def | 7 +- src/kernel/objects/mailbox.cpp | 121 +++++++++++--------------- src/kernel/objects/mailbox.h | 99 ++++++--------------- src/kernel/objects/thread.h | 9 ++ src/kernel/syscalls/helpers.h | 6 ++ src/kernel/syscalls/mailbox.cpp | 118 ++++++------------------- src/kernel/wait_queue.cpp | 4 +- src/kernel/wait_queue.h | 5 +- src/libraries/util/util/deque.h | 2 +- src/user/drv.uart/main.cpp | 5 +- src/user/srv.init/service_locator.cpp | 39 ++++----- src/user/srv.logger/main.cpp | 6 +- 12 files changed, 147 insertions(+), 274 deletions(-) diff --git a/definitions/objects/mailbox.def b/definitions/objects/mailbox.def index 1250036..87f0046 100644 --- a/definitions/objects/mailbox.def +++ b/definitions/objects/mailbox.def @@ -19,7 +19,7 @@ object mailbox : object { method call [cap:send] { param tag uint64 [inout] param subtag uint64 [inout] - param handles ref object [inout list zero_ok] + param give_handle ref object [inout handle] } # Respond to a message sent using call, and wait for another @@ -29,9 +29,8 @@ object mailbox : object { method respond [cap:receive] { param tag uint64 [inout] param subtag uint64 [inout] - param handles ref object [inout list zero_ok] - param handles_in size - param reply_tag uint16 [inout] + param give_handle ref object [inout handle] + param reply_tag uint64 [inout] param flags uint64 } } diff --git a/src/kernel/objects/mailbox.cpp b/src/kernel/objects/mailbox.cpp index 464954e..71b7a0d 100644 --- a/src/kernel/objects/mailbox.cpp +++ b/src/kernel/objects/mailbox.cpp @@ -3,16 +3,8 @@ #include "objects/mailbox.h" #include "objects/thread.h" -DEFINE_SLAB_ALLOCATOR(obj::mailbox::message); - namespace obj { -static_assert(mailbox::message::slab_size % sizeof(mailbox::message) == 0, - "mailbox message size does not fit cleanly into N pages."); - -constexpr uint64_t no_message = 0; -constexpr uint64_t has_message = 1; - mailbox::mailbox() : kobject(kobject::type::mailbox), m_closed {false}, @@ -28,91 +20,80 @@ mailbox::~mailbox() void mailbox::close() { - util::scoped_lock lock {m_message_lock}; - // If this was previously closed, we're done - if (closed()) return; - m_closed = true; + bool was_closed = __atomic_exchange_n(&m_closed, true, __ATOMIC_ACQ_REL); + if (was_closed) return; - while (!m_messages.empty()) { - message *msg = m_messages.pop_front(); - delete msg; - } - - for (auto &p : m_pending) { - delete p.val.msg; - } - m_queue.clear(); + m_callers.clear(j6_status_closed); + m_responders.clear(j6_status_closed); } -bool -mailbox::call(message *msg) +j6_status_t +mailbox::call() { - uint16_t reply_tag = next_reply_tag(); + if (closed()) + return j6_status_closed; - util::scoped_lock lock {m_message_lock}; - - msg->reply_tag = reply_tag; thread ¤t = thread::current(); - m_pending.insert(reply_tag, {¤t, msg}); + m_callers.add_thread(¤t); - m_messages.push_back(msg); + thread *responder = m_responders.pop_next(); + if (responder) + responder->wake(j6_status_ok); - thread *t = m_queue.pop_next(); - - lock.release(); - if (t) t->wake(has_message); - - uint64_t result = current.block(); - return result == has_message; + return current.block(); } -bool -mailbox::receive(mailbox::message *&msg, bool block) +j6_status_t +mailbox::receive(thread::message_data &data, reply_tag_t &reply_tag, bool block) { - util::scoped_lock lock {m_message_lock}; + if (closed()) + return j6_status_closed; - // This needs to be a loop because we're re-acquiring the lock - // after waking up, and may have missed the message that woke us - while (m_messages.empty()) { - if (!block) { - msg = nullptr; - return false; - } + thread ¤t = thread::current(); + thread *caller = nullptr; - thread &cur = thread::current(); - m_queue.add_thread(&cur); + while (true) { + caller = m_callers.pop_next(); + if (caller) + break; - lock.release(); - uint64_t result = cur.block(); - if (result == no_message) - return false; - lock.reacquire(); + if (!block) + return j6_status_would_block; + + m_responders.add_thread(¤t); + j6_status_t s = current.block(); + if (s != j6_status_ok) + return s; } - msg = m_messages.pop_front(); - return true; + util::scoped_lock lock {m_reply_lock}; + reply_tag = ++m_next_reply_tag; + m_reply_map.insert({ reply_tag, caller }); + lock.release(); + + data = caller->get_message_data(); + return j6_status_ok; } -mailbox::replyer -mailbox::reply(uint16_t reply_tag) +j6_status_t +mailbox::reply(reply_tag_t reply_tag, const thread::message_data &data) { - util::scoped_lock lock {m_message_lock}; + if (closed()) + return j6_status_closed; - pending *p = m_pending.find(reply_tag); - if (!p) return {}; + util::scoped_lock lock {m_reply_lock}; + reply_to *rt = m_reply_map.find(reply_tag); + if (!rt) + return j6_err_invalid_arg; - thread *caller = p->sender; - message *msg = p->msg; - m_pending.erase(reply_tag); + thread *caller = rt->thread; + m_reply_map.erase(reply_tag); + lock.release(); - return {msg, caller, has_message}; -} - -mailbox::replyer::~replyer() -{ - if (caller) - caller->wake(status); + caller->get_message_data() = data; + caller->wake(j6_status_ok); + return j6_status_ok; } } // namespace obj diff --git a/src/kernel/objects/mailbox.h b/src/kernel/objects/mailbox.h index 5c23525..470791c 100644 --- a/src/kernel/objects/mailbox.h +++ b/src/kernel/objects/mailbox.h @@ -4,11 +4,13 @@ #include #include -#include +#include #include +#include "heap_allocator.h" #include "memory.h" #include "objects/kobject.h" +#include "objects/thread.h" #include "slab_allocated.h" #include "wait_queue.h" @@ -21,6 +23,8 @@ class mailbox : public kobject { public: + using reply_tag_t = uint64_t; + /// Capabilities on a newly constructed mailbox handle constexpr static j6_cap_t creation_caps = j6_cap_mailbox_all; @@ -29,8 +33,6 @@ public: /// Max message handle count constexpr static size_t max_handle_count = 5; - struct message; - mailbox(); virtual ~mailbox(); @@ -38,88 +40,45 @@ public: void close(); /// Check if the mailbox has been closed - inline bool closed() const { return m_closed; } + inline bool closed() const { return __atomic_load_n(&m_closed, __ATOMIC_ACQUIRE); } /// Send a message to a thread waiting to receive on this mailbox, and block the - /// current thread awaiting a response. The response will be placed in the message - /// object provided. - /// \arg msg [inout] The mailbox::message to send, will contain the response afterward - /// \returns true if a reply was recieved - bool call(message *msg); + /// current thread awaiting a response. The message contents should be in the calling + /// thread's message_data. + /// \returns j6_status_ok if a reply was received + j6_status_t call(); /// Receive the next available message, optionally blocking if no messages are available. - /// \arg msg [out] a pointer to the received message. The caller is responsible for - /// deleting the message structure when finished. + /// \arg data [out] a thread::message_data structure to fill + /// \arg reply_tag [out] the reply_tag to use when replying to this message /// \arg block True if this call should block when no messages are available. - /// \returns True if a message was received successfully. - bool receive(message *&msg, bool block); - - class replyer; + /// \returns j6_status_ok if a message was received + j6_status_t receive(thread::message_data &data, reply_tag_t &reply_tag, bool block); /// Find a given pending message to be responded to. Returns a replyer object, which will /// wake the calling thread upon destruction. /// \arg reply_tag The reply tag in the original message - /// \returns A replyer object contining the message - replyer reply(uint16_t reply_tag); + /// \arg data Message data to pass on to the caller + /// \returns j6_status_ok if the reply was successfully sent + j6_status_t reply(reply_tag_t reply_tag, const thread::message_data &data); private: - inline uint16_t next_reply_tag() { - return (__atomic_add_fetch(&m_next_reply_tag, 1, __ATOMIC_SEQ_CST) << 1) | 1; - } + wait_queue m_callers; + wait_queue m_responders; + + struct reply_to { reply_tag_t reply_tag; thread *thread; }; + using reply_map = + util::node_map; + + util::spinlock m_reply_lock; + reply_map m_reply_map; bool m_closed; - uint16_t m_next_reply_tag; + reply_tag_t m_next_reply_tag; - util::spinlock m_message_lock; - util::deque m_messages; - - struct pending { thread *sender; message *msg; }; - util::map m_pending; - wait_queue m_queue; + friend reply_tag_t & get_map_key(reply_to &rt); }; - -struct mailbox::message : - public slab_allocated -{ - uint64_t tag; - uint64_t subtag; - - uint16_t reply_tag; - uint8_t handle_count; - - j6_handle_t handles[mailbox::max_handle_count]; -}; - -class mailbox::replyer -{ -public: - replyer() : msg {nullptr}, caller {nullptr}, status {0} {} - replyer(mailbox::message *m, thread *c, uint64_t s) : msg {m}, caller {c}, status {s} {} - replyer(replyer &&o) : msg {o.msg}, caller {o.caller}, status {o.status} { - o.msg = nullptr; o.caller = nullptr; o.status = 0; - } - - replyer & operator=(replyer &&o) { - msg = o.msg; caller = o.caller; status = o.status; - o.msg = nullptr; o.caller = nullptr; - return *this; - } - - /// The replyer's dtor will wake the calling thread - ~replyer(); - - /// Check if the reply is valid - inline bool valid() const { return msg && caller; } - - /// Set an error to give to the caller - inline void error(uint64_t e) { status = e; } - - mailbox::message *msg; - -private: - thread *caller; - uint64_t status; -}; +inline mailbox::reply_tag_t & get_map_key(mailbox::reply_to &rt) { return rt.reply_tag; } } // namespace obj diff --git a/src/kernel/objects/thread.h b/src/kernel/objects/thread.h index 36b4473..e23ccc9 100644 --- a/src/kernel/objects/thread.h +++ b/src/kernel/objects/thread.h @@ -114,6 +114,14 @@ public: /// \returns The clock time at which to wake. 0 for no timeout. inline uint64_t wake_timeout() const { return m_wake_timeout; } + struct message_data + { + uint64_t tag, subtag; + j6_handle_t handle; + }; + + message_data & get_message_data() { return m_message_data; } + inline bool has_state(state s) const { return static_cast(m_state) & static_cast(s); } @@ -186,6 +194,7 @@ private: uint64_t m_wake_value; uint64_t m_wake_timeout; + message_data m_message_data; j6_handle_t m_self_handle; }; diff --git a/src/kernel/syscalls/helpers.h b/src/kernel/syscalls/helpers.h index 410ab53..95ca96a 100644 --- a/src/kernel/syscalls/helpers.h +++ b/src/kernel/syscalls/helpers.h @@ -37,6 +37,12 @@ j6_status_t get_handle(j6_handle_t id, j6_cap_t caps, T *&object) return j6_status_ok; } +template +inline j6_status_t get_handle(j6_handle_t *id, j6_cap_t caps, T *&object) +{ + return get_handle(*id, caps, object); +} + template <> inline j6_status_t get_handle(j6_handle_t id, j6_cap_t caps, obj::kobject *&object) { diff --git a/src/kernel/syscalls/mailbox.cpp b/src/kernel/syscalls/mailbox.cpp index dc25b06..9a33496 100644 --- a/src/kernel/syscalls/mailbox.cpp +++ b/src/kernel/syscalls/mailbox.cpp @@ -27,78 +27,29 @@ mailbox_close(mailbox *self) return j6_status_ok; } -j6_status_t -prep_send( - mailbox::message *msg, - uint64_t tag, - uint64_t subtag, - const j6_handle_t *handles, - size_t handle_count) -{ - if (!msg || - handle_count > mailbox::max_handle_count) - return j6_err_invalid_arg; - - msg->tag = tag; - msg->subtag = subtag; - - msg->handle_count = handle_count; - memcpy(msg->handles, handles, sizeof(j6_handle_t) * handle_count); - - return j6_status_ok; -} - -void -prep_receive( - mailbox::message *msg, - uint64_t *tag, - uint64_t *subtag, - uint16_t *reply_tag, - j6_handle_t *handles, - size_t *handle_count) -{ - if (tag) *tag = msg->tag; - if (subtag) *subtag = msg->subtag; - if (reply_tag) *reply_tag = msg->reply_tag; - - *handle_count = msg->handle_count; - process &proc = process::current(); - for (size_t i = 0; i < msg->handle_count; ++i) { - proc.add_handle(msg->handles[i]); - handles[i] = msg->handles[i]; - } -} - j6_status_t mailbox_call( mailbox *self, uint64_t *tag, uint64_t *subtag, - j6_handle_t *handles, - size_t *handle_count) + j6_handle_t *handle) { - mailbox::message *msg = new mailbox::message; + thread::message_data &data = + thread::current().get_message_data(); - j6_status_t s = prep_send(msg, - *tag, *subtag, - handles, *handle_count); + data.tag = *tag; + data.subtag = *subtag; + data.handle = *handle; - if (s != j6_status_ok) { - delete msg; + j6_status_t s = self->call(); + if (s != j6_status_ok) return s; - } - if (!self->call(msg)) { - delete msg; - return self->closed() ? j6_status_closed : - j6_err_unexpected; - } + *tag = data.tag; + *subtag = data.subtag; + *handle = data.handle; + process::current().add_handle(*handle); - prep_receive(msg, - tag, subtag, 0, - handles, handle_count); - - delete msg; return j6_status_ok; } @@ -107,46 +58,27 @@ mailbox_respond( mailbox *self, uint64_t *tag, uint64_t *subtag, - j6_handle_t *handles, - size_t *handle_count, - size_t handles_in, - uint16_t *reply_tag, + j6_handle_t *handle, + uint64_t *reply_tag, uint64_t flags) { + thread::message_data data { *tag, *subtag, *handle }; + if (*reply_tag) { - mailbox::replyer reply = self->reply(*reply_tag); - if (!reply.valid()) - return j6_err_invalid_arg; - - j6_status_t s = prep_send(reply.msg, *tag, *subtag, - handles, handles_in); - - if (s != j6_status_ok) { - reply.error(s); + j6_status_t s = self->reply(*reply_tag, data); + if (s != j6_status_ok) return s; - } } - - if (*handle_count < mailbox::max_handle_count) - return j6_err_insufficient; - - mailbox::message *msg = nullptr; - bool block = flags & j6_mailbox_block; - if (!self->receive(msg, block)) { - // No message received - return self->closed() ? j6_status_closed : - !block ? j6_status_would_block : - j6_err_unexpected; - } + j6_status_t s = self->receive(data, *reply_tag, block); + if (s != j6_status_ok) + return s; - prep_receive(msg, - tag, subtag, reply_tag, - handles, handle_count); - - if (*reply_tag == 0) - delete msg; + *tag = data.tag; + *subtag = data.subtag; + *handle = data.handle; + process::current().add_handle(*handle); return j6_status_ok; } diff --git a/src/kernel/wait_queue.cpp b/src/kernel/wait_queue.cpp index 84584e9..1092c7f 100644 --- a/src/kernel/wait_queue.cpp +++ b/src/kernel/wait_queue.cpp @@ -43,11 +43,11 @@ wait_queue::pop_next_unlocked() } void -wait_queue::clear() +wait_queue::clear(uint64_t value) { util::scoped_lock lock {m_lock}; for (auto *t : m_threads) { - if (!t->exited()) t->wake(); + if (!t->exited()) t->wake(value); t->handle_release(); } } diff --git a/src/kernel/wait_queue.h b/src/kernel/wait_queue.h index 14fe3b1..2de60ec 100644 --- a/src/kernel/wait_queue.h +++ b/src/kernel/wait_queue.h @@ -38,7 +38,8 @@ public: util::spinlock & get_lock() { return m_lock; } /// Wake and clear out all threads. - void clear(); + /// \arg value The value passed to thread::wake + void clear(uint64_t value = 0); private: /// Get rid of any exited threads that are next @@ -46,6 +47,6 @@ private: void pop_exited(); util::spinlock m_lock; - util::deque m_threads; + util::deque m_threads; }; diff --git a/src/libraries/util/util/deque.h b/src/libraries/util/util/deque.h index b4ad093..c8a3c2b 100644 --- a/src/libraries/util/util/deque.h +++ b/src/libraries/util/util/deque.h @@ -11,11 +11,11 @@ namespace util { template class deque { +public: struct node { T items[N]; }; using list_type = linked_list; using node_type = typename list_type::item_type; -public: class iterator { public: diff --git a/src/user/drv.uart/main.cpp b/src/user/drv.uart/main.cpp index 51322a0..ecf1285 100644 --- a/src/user/drv.uart/main.cpp +++ b/src/user/drv.uart/main.cpp @@ -61,10 +61,7 @@ channel_pump_loop() uint64_t tag = j6_proto_sl_register; uint64_t proto_id = "jsix.protocol.stream.ouput"_id; - size_t handle_count = 1; - result = j6_mailbox_call(slp, - &tag, &proto_id, - &cout_write, &handle_count); + result = j6_mailbox_call(slp, &tag, &proto_id, &cout_write); if (result != j6_status_ok) return 4; diff --git a/src/user/srv.init/service_locator.cpp b/src/user/srv.init/service_locator.cpp index a28f8f9..c087cf8 100644 --- a/src/user/srv.init/service_locator.cpp +++ b/src/user/srv.init/service_locator.cpp @@ -25,23 +25,17 @@ service_locator_start(j6_handle_t mb) uint64_t tag = 0; uint64_t subtag = 0; - uint16_t reply_tag = 0; - - j6_handle_t handles[10] = {0}; - const size_t handles_capacity = sizeof(handles)/sizeof(j6_handle_t); - size_t handles_count = 0; + uint64_t reply_tag = 0; + j6_handle_t give_handle = j6_handle_invalid; uint64_t proto_id; while (true) { - size_t handles_in = handles_count; - handles_count = handles_capacity; + j6_status_t s = j6_mailbox_respond(mb, &tag, &subtag, &give_handle, + &reply_tag, j6_mailbox_block); - j6_status_t s = j6_mailbox_respond(mb, - &tag, &subtag, - handles, &handles_count, - handles_in, &reply_tag, - j6_mailbox_block); + if (s != j6_status_ok) + while (1); handle_entry *found = nullptr; @@ -49,22 +43,21 @@ service_locator_start(j6_handle_t mb) case j6_proto_base_get_proto_id: tag = j6_proto_base_proto_id; subtag = j6_proto_sl_id; - handles_count = 0; + give_handle = j6_handle_invalid; break; case j6_proto_sl_register: proto_id = subtag; - if (handles_count != 1) { + if (give_handle == j6_handle_invalid) { tag = j6_proto_base_status; subtag = j6_err_invalid_arg; - handles_count = 0; break; } - services.insert( {proto_id, handles[0]} ); + services.insert( {proto_id, give_handle} ); tag = j6_proto_base_status; subtag = j6_status_ok; - handles_count = 0; + give_handle = j6_handle_invalid; break; case j6_proto_sl_find: @@ -72,18 +65,16 @@ service_locator_start(j6_handle_t mb) tag = j6_proto_sl_result; found = services.find(proto_id); - if (found) { - handles_count = 1; - handles[0] = found->handle; - } else { - handles_count = 0; - } + if (found) + give_handle = found->handle; + else + give_handle = j6_handle_invalid; break; default: tag = j6_proto_base_status; subtag = j6_err_invalid_arg; - handles_count = 0; + give_handle = j6_handle_invalid; break; } } diff --git a/src/user/srv.logger/main.cpp b/src/user/srv.logger/main.cpp index 96bfe5f..484e6e1 100644 --- a/src/user/srv.logger/main.cpp +++ b/src/user/srv.logger/main.cpp @@ -125,13 +125,11 @@ main(int argc, const char **argv) for (unsigned i = 0; i < 100; ++i) { uint64_t tag = j6_proto_sl_find; uint64_t proto_id = "jsix.protocol.stream.ouput"_id; - size_t handle_count = 0; - j6_status_t s = j6_mailbox_call(slp, &tag, - &proto_id, &cout, &handle_count); + j6_status_t s = j6_mailbox_call(slp, &tag, &proto_id, &cout); if (s == j6_status_ok && tag == j6_proto_sl_result && - handle_count == 1) + cout != j6_handle_invalid) break; cout = j6_handle_invalid;