[kernel] Make mailbox non-fixed-length again

Going back to letting mailboxes use variable-length data. Note that this
requires extra copies, so shared memory channels should be used for
anything in the hot path. But this allows better RPC over mailboxes and
other flexibility.

Other changes:
- added a j6::proto::sl::client class to act as a service locator
  client, instead of duplicating that code in every program.
- moved protocol ids into j6/tables/protocols.inc so that C++ clients
  can easily have their own API
This commit is contained in:
Justin C. Miller
2023-08-07 22:59:03 -07:00
parent a0f91ed0fd
commit 8b3fa3ed01
22 changed files with 329 additions and 81 deletions

View File

@@ -0,0 +1,49 @@
#include <j6/memutils.h>
#include <util/basic_types.h>
#include "ipc_message.h"
namespace ipc {
message::message() : tag {0}, data {nullptr, 0}, handles {nullptr, 0} {}
message::message(
uint64_t in_tag,
const util::buffer &in_data,
const util::counted<j6_handle_t> &in_handles) :
tag {in_tag}, data {nullptr, in_data.count}, handles {nullptr, in_handles.count}
{
if (data.count) {
data.pointer = new uint8_t [data.count];
memcpy(data.pointer, in_data.pointer, data.count);
}
if (handles.count) {
handles.pointer = new j6_handle_t [handles.count];
memcpy(handles.pointer, in_handles.pointer, handles.count * sizeof(j6_handle_t));
}
}
message::message(message &&other) { *this = util::move(other); }
message::~message()
{
delete [] reinterpret_cast<uint8_t*>(data.pointer);
delete [] handles.pointer;
}
message &
message::operator=(message &&other)
{
tag = other.tag;
other.tag = 0;
data = other.data;
other.data = {nullptr, 0};
handles = other.handles;
other.handles = {nullptr, 0};
return *this;
}
} // namespace ipc

25
src/kernel/ipc_message.h Normal file
View File

@@ -0,0 +1,25 @@
#pragma once
/// \file ipc_message.h
/// Definition of shared message structure
#include <stdint.h>
#include <j6/types.h>
#include <util/counted.h>
namespace ipc {
struct message
{
uint64_t tag;
util::buffer data;
util::counted<j6_handle_t> handles;
message();
message(uint64_t in_tag, const util::buffer &in_data, const util::counted<j6_handle_t> &in_handles);
message(message &&other);
~message();
message & operator=(message &&other);
};
} // namespace ipc

View File

@@ -27,6 +27,7 @@ kernel = module("kernel",
"interrupts.cpp",
"interrupts.s",
"io.cpp",
"ipc_message.cpp",
"kernel_main.cpp",
"logger.cpp",
"memory.cpp",

View File

@@ -1,4 +1,6 @@
#include <util/basic_types.h>
#include <util/counted.h>
#include <j6/memutils.h>
#include "objects/mailbox.h"
#include "objects/thread.h"
@@ -49,7 +51,7 @@ mailbox::call()
}
j6_status_t
mailbox::receive(thread::message_data &data, reply_tag_t &reply_tag, bool block)
mailbox::receive(ipc::message &data, reply_tag_t &reply_tag, bool block)
{
if (closed())
return j6_status_closed;
@@ -81,7 +83,7 @@ mailbox::receive(thread::message_data &data, reply_tag_t &reply_tag, bool block)
}
j6_status_t
mailbox::reply(reply_tag_t reply_tag, const thread::message_data &data)
mailbox::reply(reply_tag_t reply_tag, ipc::message &&data)
{
if (closed())
return j6_status_closed;
@@ -95,7 +97,7 @@ mailbox::reply(reply_tag_t reply_tag, const thread::message_data &data)
m_reply_map.erase(reply_tag);
lock.release();
caller->get_message_data() = data;
caller->set_message_data(util::move(data));
caller->wake(j6_status_ok);
return j6_status_ok;
}

View File

@@ -8,10 +8,9 @@
#include <util/spinlock.h>
#include "heap_allocator.h"
#include "ipc_message.h"
#include "memory.h"
#include "objects/kobject.h"
#include "objects/thread.h"
#include "slab_allocated.h"
#include "wait_queue.h"
namespace obj {
@@ -23,6 +22,7 @@ class mailbox :
public kobject
{
public:
using reply_tag_t = uint64_t;
/// Capabilities on a newly constructed mailbox handle
@@ -43,23 +43,23 @@ public:
/// Send a message to a thread waiting to receive on this mailbox, and block the
/// current thread awaiting a response. The message contents should be in the calling
/// thread's message_data.
/// thread's message data.
/// \returns j6_status_ok if a reply was received
j6_status_t call();
/// Receive the next available message, optionally blocking if no messages are available.
/// \arg data [out] a thread::message_data structure to fill
/// \arg data [out] an ipc::message structure to fill
/// \arg reply_tag [out] the reply_tag to use when replying to this message
/// \arg block True if this call should block when no messages are available.
/// \returns j6_status_ok if a message was received
j6_status_t receive(thread::message_data &data, reply_tag_t &reply_tag, bool block);
j6_status_t receive(ipc::message &data, reply_tag_t &reply_tag, bool block);
/// Find a given pending message to be responded to. Returns a replyer object, which will
/// wake the calling thread upon destruction.
/// wake the calling read upon destruction.
/// \arg reply_tag The reply tag in the original message
/// \arg data Message data to pass on to the caller
/// \returns j6_status_ok if the reply was successfully sent
j6_status_t reply(reply_tag_t reply_tag, const thread::message_data &data);
j6_status_t reply(reply_tag_t reply_tag, ipc::message &&data);
private:
wait_queue m_callers;

View File

@@ -118,6 +118,10 @@ process::thread_exited(thread *th)
void
process::add_handle(j6_handle_t handle)
{
// Passing the invalid handle is fine, just don't add anything
if (handle == j6_handle_invalid)
return;
capability *c = g_cap_table.retain(handle);
kassert(c, "Trying to add a non-existant handle to a process!");

View File

@@ -1,3 +1,4 @@
#include <util/basic_types.h>
#include <util/pointers.h>
#include "kassert.h"
@@ -110,6 +111,9 @@ thread::wake_only()
set_state(state::ready);
}
void thread::set_message_data(ipc::message &&md) { m_message = util::move(md); }
ipc::message && thread::get_message_data() { return util::move(m_message); }
void
thread::exit()
{

View File

@@ -8,6 +8,7 @@
#include <util/spinlock.h>
#include "cpu.h"
#include "ipc_message.h"
#include "objects/kobject.h"
#include "wait_queue.h"
@@ -121,13 +122,8 @@ public:
/// \returns The clock time at which to wake. 0 for no timeout.
inline uint64_t wake_timeout() const { return m_wake_timeout; }
struct message_data
{
uint64_t tag, subtag;
j6_handle_t handle;
};
message_data & get_message_data() { return m_message_data; }
void set_message_data(ipc::message &&md);
ipc::message && get_message_data();
inline bool has_state(state s) const {
return __atomic_load_n(reinterpret_cast<const uint8_t*>(&m_state), __ATOMIC_ACQUIRE) &
@@ -200,7 +196,8 @@ private:
uint64_t m_wake_value;
uint64_t m_wake_timeout;
message_data m_message_data;
ipc::message m_message;
wait_queue m_join_queue;
};

View File

@@ -1,7 +1,9 @@
#include <j6/errors.h>
#include <j6/flags.h>
#include <util/counted.h>
#include <util/util.h>
#include "ipc_message.h"
#include "objects/mailbox.h"
#include "objects/thread.h"
#include "syscalls/helpers.h"
@@ -31,30 +33,39 @@ j6_status_t
mailbox_call(
mailbox *self,
uint64_t *tag,
uint64_t *subtag,
j6_handle_t *handle)
void *in_data,
size_t *data_len,
size_t data_in_len,
j6_handle_t *in_handles,
size_t *handles_count)
{
thread::message_data &data =
thread::current().get_message_data();
thread &cur = thread::current();
data.tag = *tag;
data.subtag = *subtag;
util::buffer data {in_data, data_in_len};
util::counted<j6_handle_t> handles {in_handles, *handles_count};
if (handle)
data.handle = *handle;
ipc::message message(*tag, data, handles);
cur.set_message_data(util::move(message));
j6_status_t s = self->call();
if (s != j6_status_ok)
return s;
*tag = data.tag;
*subtag = data.subtag;
message = cur.get_message_data();
if (handle) {
*handle = data.handle;
process::current().add_handle(*handle);
if (message.handles) {
for (unsigned i = 0; i < message.handles.count; ++i)
process::current().add_handle(message.handles[i]);
}
*tag = message.tag;
*data_len = *data_len > message.data.count ? message.data.count : *data_len;
memcpy(in_data, message.data.pointer, *data_len);
size_t handles_min = *handles_count > message.handles.count ? message.handles.count : *handles_count;
*handles_count = handles_min;
memcpy(in_handles, message.handles.pointer, handles_min * sizeof(j6_handle_t));
return j6_status_ok;
}
@@ -62,28 +73,42 @@ j6_status_t
mailbox_respond(
mailbox *self,
uint64_t *tag,
uint64_t *subtag,
j6_handle_t *handle,
void *in_data,
size_t *data_len,
size_t data_in_len,
j6_handle_t *in_handles,
size_t *handles_count,
uint64_t *reply_tag,
uint64_t flags)
{
thread::message_data data { *tag, *subtag, *handle };
util::buffer data {in_data, data_in_len};
util::counted<j6_handle_t> handles {in_handles, *handles_count};
ipc::message message(*tag, data, handles);
if (*reply_tag) {
j6_status_t s = self->reply(*reply_tag, data);
j6_status_t s = self->reply(*reply_tag, util::move(message));
if (s != j6_status_ok)
return s;
}
bool block = flags & j6_flag_block;
j6_status_t s = self->receive(data, *reply_tag, block);
j6_status_t s = self->receive(message, *reply_tag, block);
if (s != j6_status_ok)
return s;
*tag = data.tag;
*subtag = data.subtag;
*handle = data.handle;
process::current().add_handle(*handle);
if (message.handles) {
for (unsigned i = 0; i < message.handles.count; ++i)
process::current().add_handle(message.handles[i]);
}
*tag = message.tag;
*data_len = *data_len > message.data.count ? message.data.count : *data_len;
memcpy(in_data, message.data.pointer, *data_len);
size_t handles_min = *handles_count > message.handles.count ? message.handles.count : *handles_count;
*handles_count = handles_min;
memcpy(in_handles, message.handles.pointer, handles_min * sizeof(j6_handle_t));
return j6_status_ok;
}