[kernel] Add endpoint object and related syscalls

The endpoint object adds synchronous IPC. Also added the wait-type of
'object' to threads.
This commit is contained in:
2020-09-07 01:09:56 -07:00
parent 53d260cc6e
commit 8534d8d3c5
13 changed files with 368 additions and 49 deletions

View File

@@ -0,0 +1,97 @@
#include "objects/endpoint.h"
#include "objects/thread.h"
#include "page_manager.h"
#include "scheduler.h"
endpoint::endpoint() :
kobject(kobject::type::endpoint)
{}
endpoint::~endpoint()
{
if (!check_signal(j6_signal_endpoint_closed))
close();
}
void
endpoint::close()
{
assert_signal(j6_signal_endpoint_closed);
for (auto &data : m_blocked)
data.th->wake_on_result(this, j6_status_closed);
}
j6_status_t
endpoint::send(size_t len, void *data)
{
scheduler &s = scheduler::get();
TCB *tcb = s.current();
thread_data sender = { thread::from_tcb(tcb), data };
sender.len = len;
if (!check_signal(j6_signal_endpoint_can_send)) {
assert_signal(j6_signal_endpoint_can_recv);
sender.th->wait_on_object(this);
m_blocked.append(sender);
s.schedule();
// we woke up having already finished the send
// because it happened in the receiver
return sender.th->get_wait_result();
}
thread_data receiver = m_blocked.pop_front();
if (m_blocked.count() == 0)
deassert_signal(j6_signal_endpoint_can_send);
j6_status_t status = do_message_copy(sender, receiver);
receiver.th->wake_on_result(this, status);
return status;
}
j6_status_t
endpoint::receive(size_t *len, void *data)
{
scheduler &s = scheduler::get();
TCB *tcb = s.current();
thread_data receiver = { thread::from_tcb(tcb), data };
receiver.len_p = len;
if (!check_signal(j6_signal_endpoint_can_recv)) {
assert_signal(j6_signal_endpoint_can_send);
receiver.th->wait_on_object(this);
m_blocked.append(receiver);
s.schedule();
// we woke up having already finished the recv
// because it happened in the sender
return receiver.th->get_wait_result();
}
thread_data sender = m_blocked.pop_front();
if (m_blocked.count() == 0)
deassert_signal(j6_signal_endpoint_can_recv);
// TODO: don't pop sender on some errors
j6_status_t status = do_message_copy(sender, receiver);
sender.th->wake_on_result(this, status);
return status;
}
j6_status_t
endpoint::do_message_copy(const endpoint::thread_data &sender, endpoint::thread_data &receiver)
{
if (sender.len > *receiver.len_p)
return j6_err_insufficient;
page_manager *pm = page_manager::get();
void *send_data = pm->get_offset_from_mapped(sender.data, sender.th->tcb()->pml4);
void *recv_data = pm->get_offset_from_mapped(receiver.data, receiver.th->tcb()->pml4);
kutil::memcpy(recv_data, send_data, sender.len);
*receiver.len_p = sender.len;
// TODO: this will not work if non-contiguous pages are mapped!!
return j6_status_ok;
}

View File

@@ -0,0 +1,53 @@
#pragma once
/// \file endpoint.h
/// Definition of endpoint kobject types
#include "j6/signals.h"
#include "objects/kobject.h"
/// Endpoints are objects that enable synchronous message-passing IPC
class endpoint :
public kobject
{
public:
endpoint();
virtual ~endpoint();
/// Close the endpoint, waking all waiting processes with an error
void close();
/// Check if the endpoint has space for a message to be sent
inline bool can_send() const { return check_signal(j6_signal_endpoint_can_send); }
/// Check if the endpoint has a message wiating already
inline bool can_receive() const { return check_signal(j6_signal_endpoint_can_recv); }
/// Send a message to a thread waiting to receive on this endpoint. If no threads
/// are currently trying to receive, block the current thread.
/// \arg len The size in bytes of the message
/// \arg data The message data
/// \returns j6_status_ok on success
j6_status_t send(size_t len, void *data);
/// Receive a message from a thread waiting to send on this endpoint. If no threads
/// are currently trying to send, block the current thread.
/// \arg len [in] The size in bytes of the buffer [out] Number of bytes in the message
/// \arg data Buffer for copying message data into
/// \returns j6_status_ok on success
j6_status_t receive(size_t *len, void *data);
private:
struct thread_data
{
thread *th;
void *data;
union {
size_t *len_p;
size_t len;
};
};
j6_status_t do_message_copy(const thread_data &sender, thread_data &receiver);
kutil::vector<thread_data> m_blocked;
};

View File

@@ -20,6 +20,7 @@ public:
event,
eventpair,
channel,
endpoint,
vms,
vmo,

View File

@@ -56,6 +56,14 @@ thread::wait_on_time(uint64_t t)
clear_state(state::ready);
}
void
thread::wait_on_object(kobject *o)
{
m_wait_type = wait_type::object;
m_wait_data = reinterpret_cast<uint64_t>(o);
clear_state(state::ready);
}
bool
thread::wake_on_signals(kobject *obj, j6_signal_t signals)
{
@@ -86,6 +94,20 @@ thread::wake_on_time(uint64_t now)
return true;
}
bool
thread::wake_on_object(kobject *o)
{
if (m_wait_type != wait_type::object ||
reinterpret_cast<uint64_t>(o) != m_wait_data)
return false;
m_wait_type = wait_type::none;
m_wait_result = j6_status_ok;
m_wait_obj = o->koid();
set_state(state::ready);
return true;
}
void
thread::wake_on_result(kobject *obj, j6_status_t result)
{

View File

@@ -35,7 +35,7 @@ class thread :
public kobject
{
public:
enum class wait_type : uint8_t { none, signal, time };
enum class wait_type : uint8_t { none, signal, time, object };
enum class state : uint8_t {
ready = 0x01,
loading = 0x02,
@@ -78,6 +78,10 @@ public:
/// \arg t Clock value to wait for
void wait_on_time(uint64_t t);
/// Block the thread, waiting on the given object
/// \arg o The ojbect that should wake this thread
void wait_on_object(kobject *o);
/// Wake the thread if it is waiting on signals.
/// \arg obj Object that changed signals
/// \arg signals Signal state of the object
@@ -89,6 +93,11 @@ public:
/// \returns True if this action unblocked the thread
bool wake_on_time(uint64_t now);
/// Wake the thread if it is waiting on the given object.
/// \arg o Object trying to wake the thread
/// \returns True if this action unblocked the thread
bool wake_on_object(kobject *o);
/// Wake the thread with a given result code.
/// \arg obj Object that changed signals
/// \arg result Result code to return to the thread

View File

@@ -153,6 +153,30 @@ page_manager::map_offset_pointer(void **pointer, size_t length)
*pointer = kutil::offset_pointer(*pointer, page_offset);
}
void *
page_manager::get_offset_from_mapped(void *p, page_table *pml4)
{
if (!pml4) pml4 = get_pml4();
uintptr_t v = reinterpret_cast<uintptr_t>(p);
page_table_indices idx{v};
page_table *tables[4] = {pml4, nullptr, nullptr, nullptr};
for (int i = 1; i < 4; ++i) {
tables[i] = tables[i-1]->get(idx[i-1]);
if (!tables[i])
return nullptr;
}
uintptr_t a = tables[3]->entries[idx[3]];
if (!(a & 1))
return nullptr;
return offset_virt(
(a & ~0xfffull) |
(v & 0xfffull));
}
void
page_manager::dump_pml4(page_table *pml4, bool recurse)
{

View File

@@ -101,6 +101,11 @@ public:
return kutil::offset_pointer(reinterpret_cast<void *>(a), memory::page_offset);
}
/// Get the offet-mapped virtual address of a normal virtual address
/// \arg p Virtual address
/// \returns Virtual address in offset-mapped linear space
void * get_offset_from_mapped(void *p, page_table *pml4 = nullptr);
/// Dump the given or current PML4 to the console
/// \arg pml4 The page table to use, null for the current one
/// \arg recurse Whether to print sub-tables

View File

@@ -0,0 +1,99 @@
#include "j6/errors.h"
#include "j6/types.h"
#include "log.h"
#include "objects/endpoint.h"
#include "objects/process.h"
#include "scheduler.h"
namespace syscalls {
j6_status_t
endpoint_create(j6_handle_t *handle)
{
scheduler &s = scheduler::get();
TCB *tcb = s.current();
thread *parent = thread::from_tcb(tcb);
process &p = parent->parent();
endpoint *e = new endpoint;
*handle = p.add_handle(e);
return j6_status_ok;
}
j6_status_t
endpoint_close(j6_handle_t handle)
{
scheduler &s = scheduler::get();
TCB *tcb = s.current();
thread *parent = thread::from_tcb(tcb);
process &p = parent->parent();
kobject *o = p.lookup_handle(handle);
if (!o || o->get_type() != kobject::type::endpoint)
return j6_err_invalid_arg;
p.remove_handle(handle);
endpoint *e = static_cast<endpoint*>(o);
e->close();
return j6_status_ok;
}
j6_status_t
endpoint_send(j6_handle_t handle, size_t len, void *data)
{
scheduler &s = scheduler::get();
TCB *tcb = s.current();
thread *parent = thread::from_tcb(tcb);
process &p = parent->parent();
kobject *o = p.lookup_handle(handle);
if (!o || o->get_type() != kobject::type::endpoint)
return j6_err_invalid_arg;
endpoint *e = static_cast<endpoint*>(o);
j6_status_t status = e->send(len, data);
return status;
}
j6_status_t
endpoint_receive(j6_handle_t handle, size_t *len, void *data)
{
scheduler &s = scheduler::get();
TCB *tcb = s.current();
thread *parent = thread::from_tcb(tcb);
process &p = parent->parent();
kobject *o = p.lookup_handle(handle);
if (!o || o->get_type() != kobject::type::endpoint)
return j6_err_invalid_arg;
endpoint *e = static_cast<endpoint*>(o);
j6_status_t status = e->receive(len, data);
return status;
}
j6_status_t
endpoint_sendrecv(j6_handle_t handle, size_t *len, void *data)
{
scheduler &s = scheduler::get();
TCB *tcb = s.current();
thread *parent = thread::from_tcb(tcb);
process &p = parent->parent();
kobject *o = p.lookup_handle(handle);
if (!o || o->get_type() != kobject::type::endpoint)
return j6_err_invalid_arg;
endpoint *e = static_cast<endpoint*>(o);
j6_status_t status = e->send(*len, data);
if (status != j6_status_ok)
return status;
status = e->receive(len, data);
return status;
}
} // namespace syscalls