[kernel] Simplify mailbox code, and messages

A number of simplifications of mailboxes now that the interface is much
simpler, and synchronous.

* call and respond can now only transfer one handle at a time
* mailbox objects got rid of the message queue, and just have
  wait_queues of blocked threads, and a reply_to map.
* threads now have a message_data struct on them for use by mailboxes
This commit is contained in:
Justin C. Miller
2022-10-14 01:02:56 -07:00
parent e830a3d37b
commit 1a04310f80
12 changed files with 147 additions and 274 deletions

View File

@@ -3,16 +3,8 @@
#include "objects/mailbox.h"
#include "objects/thread.h"
DEFINE_SLAB_ALLOCATOR(obj::mailbox::message);
namespace obj {
static_assert(mailbox::message::slab_size % sizeof(mailbox::message) == 0,
"mailbox message size does not fit cleanly into N pages.");
constexpr uint64_t no_message = 0;
constexpr uint64_t has_message = 1;
mailbox::mailbox() :
kobject(kobject::type::mailbox),
m_closed {false},
@@ -28,91 +20,80 @@ mailbox::~mailbox()
void
mailbox::close()
{
util::scoped_lock lock {m_message_lock};
// If this was previously closed, we're done
if (closed()) return;
m_closed = true;
bool was_closed = __atomic_exchange_n(&m_closed, true, __ATOMIC_ACQ_REL);
if (was_closed) return;
while (!m_messages.empty()) {
message *msg = m_messages.pop_front();
delete msg;
}
for (auto &p : m_pending) {
delete p.val.msg;
}
m_queue.clear();
m_callers.clear(j6_status_closed);
m_responders.clear(j6_status_closed);
}
bool
mailbox::call(message *msg)
j6_status_t
mailbox::call()
{
uint16_t reply_tag = next_reply_tag();
if (closed())
return j6_status_closed;
util::scoped_lock lock {m_message_lock};
msg->reply_tag = reply_tag;
thread &current = thread::current();
m_pending.insert(reply_tag, {&current, msg});
m_callers.add_thread(&current);
m_messages.push_back(msg);
thread *responder = m_responders.pop_next();
if (responder)
responder->wake(j6_status_ok);
thread *t = m_queue.pop_next();
lock.release();
if (t) t->wake(has_message);
uint64_t result = current.block();
return result == has_message;
return current.block();
}
bool
mailbox::receive(mailbox::message *&msg, bool block)
j6_status_t
mailbox::receive(thread::message_data &data, reply_tag_t &reply_tag, bool block)
{
util::scoped_lock lock {m_message_lock};
if (closed())
return j6_status_closed;
// This needs to be a loop because we're re-acquiring the lock
// after waking up, and may have missed the message that woke us
while (m_messages.empty()) {
if (!block) {
msg = nullptr;
return false;
}
thread &current = thread::current();
thread *caller = nullptr;
thread &cur = thread::current();
m_queue.add_thread(&cur);
while (true) {
caller = m_callers.pop_next();
if (caller)
break;
lock.release();
uint64_t result = cur.block();
if (result == no_message)
return false;
lock.reacquire();
if (!block)
return j6_status_would_block;
m_responders.add_thread(&current);
j6_status_t s = current.block();
if (s != j6_status_ok)
return s;
}
msg = m_messages.pop_front();
return true;
util::scoped_lock lock {m_reply_lock};
reply_tag = ++m_next_reply_tag;
m_reply_map.insert({ reply_tag, caller });
lock.release();
data = caller->get_message_data();
return j6_status_ok;
}
mailbox::replyer
mailbox::reply(uint16_t reply_tag)
j6_status_t
mailbox::reply(reply_tag_t reply_tag, const thread::message_data &data)
{
util::scoped_lock lock {m_message_lock};
if (closed())
return j6_status_closed;
pending *p = m_pending.find(reply_tag);
if (!p) return {};
util::scoped_lock lock {m_reply_lock};
reply_to *rt = m_reply_map.find(reply_tag);
if (!rt)
return j6_err_invalid_arg;
thread *caller = p->sender;
message *msg = p->msg;
m_pending.erase(reply_tag);
thread *caller = rt->thread;
m_reply_map.erase(reply_tag);
lock.release();
return {msg, caller, has_message};
}
mailbox::replyer::~replyer()
{
if (caller)
caller->wake(status);
caller->get_message_data() = data;
caller->wake(j6_status_ok);
return j6_status_ok;
}
} // namespace obj

View File

@@ -4,11 +4,13 @@
#include <j6/cap_flags.h>
#include <util/counted.h>
#include <util/map.h>
#include <util/node_map.h>
#include <util/spinlock.h>
#include "heap_allocator.h"
#include "memory.h"
#include "objects/kobject.h"
#include "objects/thread.h"
#include "slab_allocated.h"
#include "wait_queue.h"
@@ -21,6 +23,8 @@ class mailbox :
public kobject
{
public:
using reply_tag_t = uint64_t;
/// Capabilities on a newly constructed mailbox handle
constexpr static j6_cap_t creation_caps = j6_cap_mailbox_all;
@@ -29,8 +33,6 @@ public:
/// Max message handle count
constexpr static size_t max_handle_count = 5;
struct message;
mailbox();
virtual ~mailbox();
@@ -38,88 +40,45 @@ public:
void close();
/// Check if the mailbox has been closed
inline bool closed() const { return m_closed; }
inline bool closed() const { return __atomic_load_n(&m_closed, __ATOMIC_ACQUIRE); }
/// Send a message to a thread waiting to receive on this mailbox, and block the
/// current thread awaiting a response. The response will be placed in the message
/// object provided.
/// \arg msg [inout] The mailbox::message to send, will contain the response afterward
/// \returns true if a reply was recieved
bool call(message *msg);
/// current thread awaiting a response. The message contents should be in the calling
/// thread's message_data.
/// \returns j6_status_ok if a reply was received
j6_status_t call();
/// Receive the next available message, optionally blocking if no messages are available.
/// \arg msg [out] a pointer to the received message. The caller is responsible for
/// deleting the message structure when finished.
/// \arg data [out] a thread::message_data structure to fill
/// \arg reply_tag [out] the reply_tag to use when replying to this message
/// \arg block True if this call should block when no messages are available.
/// \returns True if a message was received successfully.
bool receive(message *&msg, bool block);
class replyer;
/// \returns j6_status_ok if a message was received
j6_status_t receive(thread::message_data &data, reply_tag_t &reply_tag, bool block);
/// Find a given pending message to be responded to. Returns a replyer object, which will
/// wake the calling thread upon destruction.
/// \arg reply_tag The reply tag in the original message
/// \returns A replyer object contining the message
replyer reply(uint16_t reply_tag);
/// \arg data Message data to pass on to the caller
/// \returns j6_status_ok if the reply was successfully sent
j6_status_t reply(reply_tag_t reply_tag, const thread::message_data &data);
private:
inline uint16_t next_reply_tag() {
return (__atomic_add_fetch(&m_next_reply_tag, 1, __ATOMIC_SEQ_CST) << 1) | 1;
}
wait_queue m_callers;
wait_queue m_responders;
struct reply_to { reply_tag_t reply_tag; thread *thread; };
using reply_map =
util::node_map<uint64_t, reply_to, 0, heap_allocated>;
util::spinlock m_reply_lock;
reply_map m_reply_map;
bool m_closed;
uint16_t m_next_reply_tag;
reply_tag_t m_next_reply_tag;
util::spinlock m_message_lock;
util::deque<message*> m_messages;
struct pending { thread *sender; message *msg; };
util::map<uint16_t, pending> m_pending;
wait_queue m_queue;
friend reply_tag_t & get_map_key(reply_to &rt);
};
struct mailbox::message :
public slab_allocated<message, mem::frame_size>
{
uint64_t tag;
uint64_t subtag;
uint16_t reply_tag;
uint8_t handle_count;
j6_handle_t handles[mailbox::max_handle_count];
};
class mailbox::replyer
{
public:
replyer() : msg {nullptr}, caller {nullptr}, status {0} {}
replyer(mailbox::message *m, thread *c, uint64_t s) : msg {m}, caller {c}, status {s} {}
replyer(replyer &&o) : msg {o.msg}, caller {o.caller}, status {o.status} {
o.msg = nullptr; o.caller = nullptr; o.status = 0;
}
replyer & operator=(replyer &&o) {
msg = o.msg; caller = o.caller; status = o.status;
o.msg = nullptr; o.caller = nullptr;
return *this;
}
/// The replyer's dtor will wake the calling thread
~replyer();
/// Check if the reply is valid
inline bool valid() const { return msg && caller; }
/// Set an error to give to the caller
inline void error(uint64_t e) { status = e; }
mailbox::message *msg;
private:
thread *caller;
uint64_t status;
};
inline mailbox::reply_tag_t & get_map_key(mailbox::reply_to &rt) { return rt.reply_tag; }
} // namespace obj

View File

@@ -114,6 +114,14 @@ public:
/// \returns The clock time at which to wake. 0 for no timeout.
inline uint64_t wake_timeout() const { return m_wake_timeout; }
struct message_data
{
uint64_t tag, subtag;
j6_handle_t handle;
};
message_data & get_message_data() { return m_message_data; }
inline bool has_state(state s) const {
return static_cast<uint8_t>(m_state) & static_cast<uint8_t>(s);
}
@@ -186,6 +194,7 @@ private:
uint64_t m_wake_value;
uint64_t m_wake_timeout;
message_data m_message_data;
j6_handle_t m_self_handle;
};