[kernel] Replace endpoint with new mailbox API

The new mailbox kernel object API offers asynchronous message-based IPC
for sending data and handles between threads, as opposed to endpoint's
synchronous model.
This commit is contained in:
Justin C. Miller
2022-02-22 00:06:14 -08:00
parent f7ae2e2220
commit 30aed15090
17 changed files with 647 additions and 364 deletions

View File

@@ -32,9 +32,9 @@ kernel = module("kernel",
"memory_bootstrap.cpp",
"msr.cpp",
"objects/channel.cpp",
"objects/endpoint.cpp",
"objects/event.cpp",
"objects/kobject.cpp",
"objects/mailbox.cpp",
"objects/thread.cpp",
"objects/process.cpp",
"objects/system.cpp",
@@ -52,9 +52,9 @@ kernel = module("kernel",
"syscall_verify.cpp.cog",
"syscalls.inc.cog",
"syscalls/channel.cpp",
"syscalls/endpoint.cpp",
"syscalls/event.cpp",
"syscalls/handle.cpp",
"syscalls/mailbox.cpp",
"syscalls/object.cpp",
"syscalls/process.cpp",
"syscalls/system.cpp",

View File

@@ -1,157 +0,0 @@
#include "assert.h"
#include "clock.h"
#include "device_manager.h"
#include "objects/endpoint.h"
#include "objects/process.h"
#include "objects/thread.h"
#include "vm_space.h"
namespace obj {
endpoint::endpoint() :
kobject {kobject::type::endpoint}
{}
endpoint::~endpoint()
{
if (!check_signal(j6_signal_closed))
close();
}
void
endpoint::close()
{
kobject::close();
util::scoped_lock lock {m_lock};
for (auto &data : m_blocked) {
if (data.th)
data.th->wake_on_result(this, j6_status_closed);
}
device_manager::get().unbind_irqs(this);
}
j6_status_t
endpoint::send(j6_tag_t tag, const void *data, size_t data_len)
{
thread_data sender = { &thread::current(), data };
sender.len = data_len;
sender.tag = tag;
util::scoped_lock lock {m_lock};
if (!check_signal(j6_signal_endpoint_can_send)) {
assert_signal(j6_signal_endpoint_can_recv);
m_blocked.append(sender);
lock.release();
sender.th->wait_on_object(this);
// we woke up having already finished the send
// because it happened in the receiver
return sender.th->get_wait_result();
}
thread_data receiver = m_blocked.pop_front();
if (m_blocked.count() == 0)
deassert_signal(j6_signal_endpoint_can_send);
j6_status_t status = do_message_copy(sender, receiver);
receiver.th->wake_on_result(this, status);
return status;
}
j6_status_t
endpoint::receive(j6_tag_t *tag, void *data, size_t *data_len, uint64_t timeout)
{
thread_data receiver = { &thread::current(), data };
receiver.tag_p = tag;
receiver.len_p = data_len;
// Timeout is a duration, but wait_on_* calls need a time
if (timeout)
timeout += clock::get().value();
util::scoped_lock lock {m_lock};
if (!check_signal(j6_signal_endpoint_can_recv)) {
assert_signal(j6_signal_endpoint_can_send);
m_blocked.append(receiver);
lock.release();
receiver.th->wait_on_object(this, timeout);
// we woke up having already finished the recv
// because it happened in the sender
return receiver.th->get_wait_result();
}
thread_data sender = m_blocked.pop_front();
if (m_blocked.count() == 0)
deassert_signal(j6_signal_endpoint_can_recv);
// TODO: don't pop sender on some errors
j6_status_t status = do_message_copy(sender, receiver);
if (sender.th)
sender.th->wake_on_result(this, status);
return status;
}
void
endpoint::signal_irq(unsigned irq)
{
j6_tag_t tag = j6_tag_from_irq(irq);
util::scoped_lock lock {m_lock};
if (!check_signal(j6_signal_endpoint_can_send)) {
assert_signal(j6_signal_endpoint_can_recv);
for (auto &blocked : m_blocked)
if (blocked.tag == tag)
return;
thread_data sender = { nullptr, nullptr };
sender.tag = tag;
m_blocked.append(sender);
return;
}
thread_data receiver = m_blocked.pop_front();
kassert(receiver.len_p && receiver.tag_p,
"endpoint had can_send but m_blocked was empty");
if (m_blocked.count() == 0)
deassert_signal(j6_signal_endpoint_can_send);
*receiver.len_p = 0;
*receiver.tag_p = tag;
receiver.th->wake_on_result(this, j6_status_ok);
}
j6_status_t
endpoint::do_message_copy(const endpoint::thread_data &sender, endpoint::thread_data &receiver)
{
if (sender.len > *receiver.len_p)
return j6_err_insufficient;
if (sender.len) {
vm_space &source = sender.th->parent().space();
vm_space &dest = receiver.th->parent().space();
vm_space::copy(source, dest, sender.data, receiver.buffer, sender.len);
}
*receiver.len_p = sender.len;
*receiver.tag_p = sender.tag;
// TODO: this will not work if non-contiguous pages are mapped!!
return j6_status_ok;
}
} // namespace obj

View File

@@ -1,81 +0,0 @@
#pragma once
/// \file endpoint.h
/// Definition of endpoint kobject types
#include <j6/caps.h>
#include <j6/signals.h>
#include <util/spinlock.h>
#include <util/vector.h>
#include "objects/kobject.h"
namespace obj {
/// Endpoints are objects that enable synchronous message-passing IPC
class endpoint :
public kobject
{
public:
/// Capabilities on a newly constructed endpoint handle
constexpr static j6_cap_t creation_caps = j6_cap_endpoint_all;
endpoint();
virtual ~endpoint();
static constexpr kobject::type type = kobject::type::endpoint;
/// Close the endpoint, waking all waiting processes with an error
virtual void close() override;
/// Check if the endpoint has space for a message to be sent
inline bool can_send() const { return check_signal(j6_signal_endpoint_can_send); }
/// Check if the endpoint has a message wiating already
inline bool can_receive() const { return check_signal(j6_signal_endpoint_can_recv); }
/// Send a message to a thread waiting to receive on this endpoint. If no threads
/// are currently trying to receive, block the current thread.
/// \arg tag The application-specified message tag
/// \arg data The message data
/// \arg len The size in bytes of the message
/// \returns j6_status_ok on success
j6_status_t send(j6_tag_t tag, const void *data, size_t data_len);
/// Receive a message from a thread waiting to send on this endpoint. If no threads
/// are currently trying to send, block the current thread.
/// \arg tag [in] The sender-specified message tag
/// \arg len [in] The size in bytes of the buffer [out] Number of bytes in the message
/// \arg data Buffer for copying message data into
/// \arg timeout Receive timeout in nanoseconds
/// \returns j6_status_ok on success
j6_status_t receive(j6_tag_t *tag, void *data, size_t *data_len, uint64_t timeout = 0);
/// Give the listener on the endpoint a message that a bound IRQ has been signalled
/// \arg irq The IRQ that caused this signal
void signal_irq(unsigned irq);
private:
struct thread_data
{
thread *th;
union {
const void *data;
void *buffer;
};
union {
j6_tag_t *tag_p;
j6_tag_t tag;
};
union {
size_t *len_p;
size_t len;
};
};
j6_status_t do_message_copy(const thread_data &sender, thread_data &receiver);
util::spinlock m_lock;
util::vector<thread_data> m_blocked;
};
} // namespace obj

View File

@@ -0,0 +1,123 @@
#include <util/counted.h>
#include "objects/handle.h"
#include "objects/mailbox.h"
#include "objects/thread.h"
DEFINE_SLAB_ALLOCATOR(obj::mailbox::message);
namespace obj {
static_assert(mailbox::message::slab_size % sizeof(mailbox::message) == 0,
"mailbox message size does not fit cleanly into N pages.");
constexpr uint64_t no_message = 1;
mailbox::mailbox() :
kobject(kobject::type::mailbox),
m_closed {false},
m_next_reply_tag {0}
{
}
mailbox::~mailbox()
{
close();
}
void
mailbox::close()
{
util::scoped_lock lock {m_message_lock};
// If this was previously closed, we're done
if (closed()) return;
m_closed = true;
while (!m_messages.empty()) {
message *msg = m_messages.pop_front();
delete msg;
}
for (auto &p : m_pending) {
delete p.val.msg;
p.val.sender->wake(no_message);
}
}
void
mailbox::send(message *msg)
{
util::scoped_lock lock {m_message_lock};
m_messages.push_back(msg);
thread *t = m_queue.pop_next();
if (t) t->wake();
}
bool
mailbox::call(message *msg)
{
util::scoped_lock lock {m_message_lock};
if (!++m_next_reply_tag) ++m_next_reply_tag;
msg->reply_tag = m_next_reply_tag;
thread &current = thread::current();
m_pending.insert(m_next_reply_tag, {&current, msg});
m_messages.push_back(msg);
thread *t = m_queue.pop_next();
if (t) t->wake();
return (current.block() != no_message);
}
bool
mailbox::receive(mailbox::message *&msg, bool block)
{
util::scoped_lock lock {m_message_lock};
// This needs to be a loop because we're re-acquiring the lock
// after waking up, and may have missed the message that woke us
while (m_messages.empty()) {
if (!block) {
msg = nullptr;
return false;
}
thread &cur = thread::current();
m_queue.add_thread(&cur);
lock.release();
uint64_t result = cur.block();
if (result == no_message)
return false;
lock.reacquire();
}
msg = m_messages.pop_front();
return true;
}
mailbox::replyer
mailbox::reply(uint16_t reply_tag)
{
util::scoped_lock lock {m_message_lock};
pending *p = m_pending.find(reply_tag);
if (!p) return {};
thread *caller = p->sender;
message *msg = p->msg;
m_pending.erase(reply_tag);
return {msg, caller};
}
mailbox::replyer::~replyer()
{
}
} // namespace obj

View File

@@ -0,0 +1,135 @@
#pragma once
/// \file mailbox.h
/// Definition of mailbox kobject types
#include <j6/caps.h>
#include <util/counted.h>
#include <util/map.h>
#include <util/spinlock.h>
#include "objects/handle.h"
#include "objects/kobject.h"
#include "slab_allocated.h"
#include "wait_queue.h"
namespace obj {
class thread;
/// mailboxs are objects that enable synchronous message-passing IPC
class mailbox :
public kobject
{
public:
/// Capabilities on a newly constructed mailbox handle
constexpr static j6_cap_t creation_caps = j6_cap_mailbox_all;
static constexpr kobject::type type = kobject::type::mailbox;
/// Max message data length
constexpr static size_t max_data_length = 88;
/// Max message handle count
constexpr static size_t max_handle_count = 6;
struct message;
mailbox();
virtual ~mailbox();
/// Close the mailbox, waking all waiting processes with an error
void close();
/// Check if the mailbox has been closed
inline bool closed() const { return m_closed; }
/// Send a message to a thread waiting to receive on this mailbox. If no threads
/// are currently trying to receive, block the current thread.
/// \arg msg The mailbox::message data structure to send
void send(message *msg);
/// Send a message to a thread waiting to receive on this mailbox, and block the
/// current thread awaiting a response. The response will be placed in the message
/// object provided.
/// \arg msg [inout] The mailbox::message to send, will contain the response afterward
/// \returns true if a reply was recieved
bool call(message *msg);
/// Receive the next available message, optionally blocking if no messages are available.
/// \arg msg [out] a pointer to the received message. The caller is responsible for
/// deleting the message structure when finished.
/// \arg block True if this call should block when no messages are available.
/// \returns True if a message was received successfully.
bool receive(message *&msg, bool block);
class replyer;
/// Find a given pending message to be responded to. Returns a replyer object, which will
/// wake the calling thread upon destruction.
/// \arg reply_tag The reply tag in the original message
/// \returns A replyer object contining the message
replyer reply(uint16_t reply_tag);
private:
bool m_closed;
uint16_t m_next_reply_tag;
util::spinlock m_message_lock;
util::deque<message*> m_messages;
struct pending { thread *sender; message *msg; };
util::map<uint16_t, pending> m_pending;
wait_queue m_queue;
};
struct mailbox::message :
public slab_allocated<message, 1>
{
uint64_t tag;
uint64_t badge;
uint16_t reply_tag;
uint16_t reserved0;
uint16_t reserved1;
uint8_t handle_count;
uint8_t data_len;
handle handles[mailbox::max_handle_count];
uint8_t data[mailbox::max_data_length];
};
class mailbox::replyer
{
public:
replyer() : msg {nullptr}, caller {nullptr} {}
replyer(mailbox::message *m, thread *c) : msg {m}, caller {c} {}
replyer(replyer &&o) : msg {o.msg}, caller {o.caller} {
o.msg = nullptr; o.caller = nullptr;
}
replyer & operator=(replyer &&o) {
msg = o.msg; caller = o.caller;
o.msg = nullptr; o.caller = nullptr;
return *this;
}
/// The replyer's dtor will wake the calling thread
~replyer();
/// Check if the reply is valid
inline bool valid() const { return msg && caller; }
/// Set an error to give to the caller
inline void error(uint64_t e) { status = e; }
mailbox::message *msg;
private:
thread *caller;
uint64_t status;
};
} // namespace obj

View File

@@ -1,63 +0,0 @@
#include <j6/errors.h>
#include <j6/types.h>
#include "logger.h"
#include "objects/endpoint.h"
#include "syscalls/helpers.h"
using namespace obj;
namespace syscalls {
j6_status_t
endpoint_create(j6_handle_t *self)
{
construct_handle<endpoint>(self);
return j6_status_ok;
}
j6_status_t
endpoint_send(endpoint *self, uint64_t tag, const void * data, size_t data_len)
{
if (tag & j6_tag_system_flag)
return j6_err_invalid_arg;
return self->send(tag, data, data_len);
}
j6_status_t
endpoint_receive(endpoint *self, uint64_t * tag, void * data, size_t * data_len, uint64_t timeout)
{
// Use local variables instead of the passed-in pointers, since
// they may get filled in when the sender is running, which means
// a different user VM space would be active.
j6_tag_t out_tag = j6_tag_invalid;
size_t out_len = *data_len;
j6_status_t s = self->receive(&out_tag, data, &out_len, timeout);
*tag = out_tag;
*data_len = out_len;
return s;
}
j6_status_t
endpoint_sendrecv(endpoint *self, uint64_t * tag, void * data, size_t * data_len, uint64_t timeout)
{
if (*tag & j6_tag_system_flag)
return j6_err_invalid_arg;
j6_status_t status = self->send(*tag, data, *data_len);
if (status != j6_status_ok)
return status;
// Use local variables instead of the passed-in pointers, since
// they may get filled in when the sender is running, which means
// a different user VM space would be active.
j6_tag_t out_tag = j6_tag_invalid;
size_t out_len = *data_len;
j6_status_t s = self->receive(&out_tag, data, &out_len, timeout);
*tag = out_tag;
*data_len = out_len;
return s;
}
} // namespace syscalls

View File

@@ -0,0 +1,235 @@
#include <j6/errors.h>
#include <j6/flags.h>
#include <util/util.h>
#include "objects/mailbox.h"
#include "objects/thread.h"
#include "syscalls/helpers.h"
using namespace obj;
namespace syscalls {
j6_status_t
mailbox_create(j6_handle_t *self)
{
construct_handle<mailbox>(self);
return j6_status_ok;
}
j6_status_t
mailbox_close(mailbox *self)
{
if (self->closed())
return j6_status_closed;
self->close();
return j6_status_ok;
}
j6_status_t
prep_send(
mailbox::message *msg,
uint64_t tag,
uint64_t badge,
const void *data,
size_t data_len,
const j6_handle_t *handles,
size_t handle_count)
{
if (!msg ||
data_len > mailbox::max_data_length ||
handle_count > mailbox::max_handle_count)
return j6_err_invalid_arg;
msg->tag = tag;
msg->badge = badge;
msg->handle_count = handle_count;
for (unsigned i = 0; i < handle_count; ++i) {
handle *h = get_handle<kobject>(handles[i]);
if (!h)
return j6_err_invalid_arg;
msg->handles[i] = *h;
}
msg->data_len = data_len;
memcpy(msg->data, data, data_len);
return j6_status_ok;
}
void
prep_receive(
mailbox::message *msg,
uint64_t *tag,
uint64_t *badge,
uint16_t *reply_tag,
void *data,
size_t *data_len,
j6_handle_t *handles,
size_t *handle_count)
{
if (tag) *tag = msg->tag;
if (badge) *badge = msg->badge;
if (reply_tag) *reply_tag = msg->reply_tag;
*data_len = msg->data_len;
memcpy(data, msg->data, msg->data_len);
*handle_count = msg->handle_count;
process &proc = process::current();
for (size_t i = 0; i < msg->handle_count; ++i)
handles[i] = proc.add_handle(msg->handles[i]);
delete msg;
}
j6_status_t
mailbox_send(
handle *self_handle,
uint64_t tag,
const void * data,
size_t data_len,
j6_handle_t * handles,
size_t handle_count)
{
mailbox *self = self_handle->as<mailbox>();
mailbox::message *msg = new mailbox::message;
j6_status_t s = prep_send(msg,
tag, self_handle->badge,
data, data_len,
handles, handle_count);
if (s != j6_status_ok) {
delete msg;
return s;
}
self->send(msg);
return j6_status_ok;
}
j6_status_t
mailbox_receive(
mailbox *self,
uint64_t * tag,
void * data,
size_t * data_len,
j6_handle_t * handles,
size_t * handle_count,
uint16_t * reply_tag,
uint64_t * badge,
uint64_t flags)
{
if (*data_len < mailbox::max_data_length ||
*handle_count < mailbox::max_handle_count)
return j6_err_insufficient;
mailbox::message *msg = nullptr;
bool block = flags & j6_mailbox_block;
if (!self->receive(msg, block)) {
// No message received
return self->closed() ? j6_status_closed :
block ? j6_status_would_block :
j6_err_unexpected;
}
prep_receive(msg,
tag, badge, reply_tag,
data, data_len,
handles, handle_count);
return j6_status_ok;
}
j6_status_t
mailbox_call(
handle *self_handle,
uint64_t * tag,
void * data,
size_t * data_len,
j6_handle_t * handles,
size_t * handle_count)
{
mailbox *self = self_handle->as<mailbox>();
mailbox::message *msg = new mailbox::message;
j6_status_t s = prep_send(msg,
*tag, self_handle->badge,
data, *data_len,
handles, *handle_count);
if (s != j6_status_ok) {
delete msg;
return s;
}
if (!self->call(msg)) {
delete msg;
return self->closed() ? j6_status_closed :
j6_err_unexpected;
}
prep_receive(msg,
tag, nullptr, nullptr,
data, data_len,
handles, handle_count);
delete msg;
return j6_status_ok;
}
j6_status_t
mailbox_respond(
mailbox *self,
uint64_t tag,
const void * data,
size_t data_len,
j6_handle_t * handles,
size_t handle_count,
uint16_t reply_tag)
{
mailbox::replyer reply = self->reply(reply_tag);
if (!reply.valid())
return j6_err_invalid_arg;
j6_status_t s = prep_send(reply.msg, tag, 0,
data, data_len,
handles, handle_count);
if (s != j6_status_ok) {
reply.error(s);
return s;
}
return j6_status_ok;
}
j6_status_t
mailbox_respond_receive(
mailbox *self,
uint64_t * tag,
void * data,
size_t * data_len,
j6_handle_t * handles,
size_t * handle_count,
uint16_t * reply_tag,
uint64_t * badge,
uint64_t flags)
{
j6_status_t s = mailbox_respond(self, *tag, data, *data_len, handles, *handle_count, *reply_tag);
if (s != j6_status_ok)
return s;
return mailbox_receive(self, tag, data, data_len, handles, handle_count, reply_tag, badge, flags);
}
} // namespace syscalls