[kernel] First steps at removing channel objects

This commit does a number of things to start the transition of channels
from kernel to user space:

- Remove channel objects / syscalls from the kernel
- Add mutex type in libj6
- Add condition type in libj6
- Add a `ring` type flag for VMA syscalls to create ring buffers
- Implement a rudimentary shared memory channel using all of the above
This commit is contained in:
Justin C. Miller
2023-03-16 19:32:52 -07:00
parent ed95574c24
commit 9fa588566f
20 changed files with 323 additions and 251 deletions

View File

@@ -1,21 +0,0 @@
# Interface definition for the kernel channel object (removed by this commit).
object channel : object {
uid 3ea38b96aa0e54c8
# Capabilities a channel handle can carry; the methods below name the
# capability they require via `cap:`.
capabilities [
send
receive
close
]
method create [constructor]
method close [destructor cap:close]
# Send the contents of `data` into the channel.
method send [cap:send] {
param data buffer [inout]
}
# Receive channel data into `data`; `flags` is a bit field of receive options.
method receive [cap:receive] {
param data buffer [out]
param flags uint64
}
}

View File

@@ -1,6 +1,5 @@
import "objects/object.def" import "objects/object.def"
import "objects/channel.def"
import "objects/event.def" import "objects/event.def"
import "objects/mailbox.def" import "objects/mailbox.def"
import "objects/process.def" import "objects/process.def"
@@ -12,12 +11,12 @@ interface syscalls [syscall] {
uid 01d9b6a948961097 uid 01d9b6a948961097
expose ref object expose ref object
expose ref system
expose ref event expose ref event
expose ref process
expose ref thread
expose ref mailbox expose ref mailbox
expose ref channel expose ref process
expose ref system
expose ref thread
expose ref vma expose ref vma
# Simple no-op syscall for testing # Simple no-op syscall for testing

View File

@@ -33,7 +33,6 @@ kernel = module("kernel",
"memory.h.cog", "memory.h.cog",
"memory_bootstrap.cpp", "memory_bootstrap.cpp",
"msr.cpp", "msr.cpp",
"objects/channel.cpp",
"objects/event.cpp", "objects/event.cpp",
"objects/kobject.cpp", "objects/kobject.cpp",
"objects/mailbox.cpp", "objects/mailbox.cpp",
@@ -51,7 +50,6 @@ kernel = module("kernel",
"syscall.s", "syscall.s",
"syscall_verify.cpp.cog", "syscall_verify.cpp.cog",
"syscalls.inc.cog", "syscalls.inc.cog",
"syscalls/channel.cpp",
"syscalls/event.cpp", "syscalls/event.cpp",
"syscalls/handle.cpp", "syscalls/handle.cpp",
"syscalls/mailbox.cpp", "syscalls/mailbox.cpp",

View File

@@ -1,89 +0,0 @@
#include <string.h>
#include "assert.h"
#include "memory.h"
#include "objects/channel.h"
#include "objects/thread.h"
#include "objects/vm_area.h"
extern obj::vm_area_guarded g_kernel_buffers;
namespace obj {
constexpr size_t buffer_bytes = mem::kernel_buffer_pages * mem::frame_size;
/// Construct an open, empty channel whose storage is one section taken
/// from the global kernel buffer area.
/// The initializers are listed in the order C++ actually runs them: the
/// kobject base first, then the members in declaration order. m_buffer
/// must come after m_data because it is built on top of that address.
channel::channel() :
    kobject {kobject::type::channel},
    m_len {0},
    m_data {g_kernel_buffers.get_section()},
    m_closed {false},
    m_buffer {reinterpret_cast<uint8_t*>(m_data), buffer_bytes}
{
}
/// Destroy the channel, closing it first if nobody has done so yet.
channel::~channel()
{
    if (closed())
        return;

    close();
}
/// Copy as much of `data` as the channel buffer will accept.
/// \arg data  Bytes to write into the channel
/// \returns   The number of bytes actually written; 0 if the channel is closed
size_t
channel::enqueue(const util::buffer &data)
{
    util::scoped_lock guard {m_close_lock};
    if (closed())
        return 0;

    // Ask the buffer for up to data.count bytes; it reports how many it
    // could actually reserve and where they start.
    void *dest = nullptr;
    const size_t written = m_buffer.reserve(data.count, &dest);
    memcpy(dest, data.pointer, written);
    m_buffer.commit(written);

    if (written) {
        // Take one waiter off the queue while still locked, but only wake
        // it after the lock has been dropped.
        thread *waiter = m_queue.pop_next();
        guard.release();
        if (waiter)
            waiter->wake();
    }

    return written;
}
/// Copy pending channel data into `buffer`.
/// \arg buffer  Destination for the data; at most buffer.count bytes are copied
/// \arg block   If true and no data is pending, sleep until woken
/// \returns     The number of bytes copied; 0 if the channel is closed
///              (including when it is closed while this thread was blocked)
///              or no data was available
size_t
channel::dequeue(util::buffer buffer, bool block)
{
    util::scoped_lock lock {m_close_lock};
    if (closed()) return 0;

    void *data = nullptr;
    size_t avail = m_buffer.get_block(&data);
    if (!avail && block) {
        thread &cur = thread::current();
        m_queue.add_thread(&cur);

        // Sleep without holding the lock so a sender can enqueue and wake us.
        lock.release();
        cur.block();
        lock.reacquire();

        // close() clears the wait queue and hands the backing section back
        // to g_kernel_buffers; if that is why we woke up, the buffer must
        // not be touched any more.
        if (closed()) return 0;

        avail = m_buffer.get_block(&data);
    }

    const size_t len = buffer.count > avail ? avail : buffer.count;

    // `data` may still be null when nothing is available; skip the copy
    // rather than hand memcpy a null source.
    if (len)
        memcpy(buffer.pointer, data, len);
    m_buffer.consume(len);
    return len;
}
/// Close the channel: drop all queued waiters and return the backing
/// section to the kernel buffer area.
/// Safe to call more than once. Callers test closed() without holding the
/// lock, so two closers can both get here; the check under the lock makes
/// sure the section is returned exactly once.
void
channel::close()
{
    util::scoped_lock lock {m_close_lock};
    if (m_closed)
        return;

    m_queue.clear();
    g_kernel_buffers.return_section(m_data);
    m_closed = true;
}
} // namespace obj

View File

@@ -1,54 +0,0 @@
#pragma once
/// \file channel.h
/// Definition of channel objects and related functions
#include <j6/cap_flags.h>
#include <util/bip_buffer.h>
#include <util/counted.h>
#include <util/spinlock.h>
#include "objects/kobject.h"
#include "wait_queue.h"
namespace obj {
/// Channels are uni-directional means of sending data
/// Data is staged in a util::bip_buffer; readers with nothing to read can
/// park themselves on a wait queue until a writer wakes them.
class channel :
public kobject
{
public:
/// Capabilities on a newly constructed channel handle
static constexpr j6_cap_t creation_caps = j6_cap_channel_all;
/// The kobject type tag for channels
static constexpr kobject::type type = kobject::type::channel;
channel();
virtual ~channel();
/// Put a message into the channel
/// \arg data Buffer of data to write
/// \returns The number of bytes successfully written
size_t enqueue(const util::buffer &data);
/// Get a message from the channel, copied into a provided buffer
/// \arg buffer The buffer to copy data into
/// \arg block If true, block the calling thread until there is data
/// \returns The number of bytes copied into the provided buffer
size_t dequeue(util::buffer buffer, bool block = false);
/// Mark this channel as closed. After this, enqueue and dequeue
/// return 0; the channel syscalls check closed() themselves and
/// report j6_status_closed.
void close();
/// Check if this channel has been closed.
inline bool closed() const { return m_closed; }
private:
/// Initialized to 0 by the constructor; not otherwise used in channel.cpp.
size_t m_len;
/// Address of the kernel buffer section backing m_buffer
uintptr_t m_data;
/// Set once close() has run
bool m_closed;
/// Buffer holding the channel's pending bytes, built over m_data
util::bip_buffer m_buffer;
/// Held by enqueue, dequeue and close while they touch the channel state
util::spinlock m_close_lock;
/// Threads blocked in dequeue waiting for data
wait_queue m_queue;
};
} // namespace obj

View File

@@ -13,7 +13,6 @@
#include "io.h" #include "io.h"
#include "logger.h" #include "logger.h"
#include "msr.h" #include "msr.h"
#include "objects/channel.h"
#include "objects/process.h" #include "objects/process.h"
#include "objects/system.h" #include "objects/system.h"
#include "objects/thread.h" #include "objects/thread.h"

View File

@@ -1,56 +0,0 @@
#include <j6/errors.h>
#include <j6/flags.h>
#include <j6/types.h>
#include <util/counted.h>
#include "objects/channel.h"
#include "syscalls/helpers.h"
using namespace obj;
namespace syscalls {

/// Create a channel object and store a handle to it in `self`.
j6_status_t
channel_create(j6_handle_t *self)
{
    construct_handle<channel>(self);
    return j6_status_ok;
}

/// Write up to `*data_len` bytes from `data` into the channel.
/// On success `*data_len` is replaced with the number of bytes accepted.
/// \returns j6_status_closed if the channel is already closed
j6_status_t
channel_send(channel *self, void *data, size_t *data_len)
{
    if (self->closed())
        return j6_status_closed;

    const util::buffer outgoing {data, *data_len};
    const size_t sent = self->enqueue(outgoing);
    *data_len = sent;
    return j6_status_ok;
}

/// Read up to `*data_len` bytes from the channel into `data`.
/// On success `*data_len` is replaced with the number of bytes copied.
/// \arg flags  If j6_channel_block is set, block until data is available
/// \returns j6_status_closed if the channel is already closed
j6_status_t
channel_receive(channel *self, void *data, size_t *data_len, uint64_t flags)
{
    if (self->closed())
        return j6_status_closed;

    const bool should_block = (flags & j6_channel_block) != 0;
    util::buffer incoming {data, *data_len};
    const size_t received = self->dequeue(incoming, should_block);
    *data_len = received;
    return j6_status_ok;
}

/// Close the channel.
/// \returns j6_status_closed if it was already closed
j6_status_t
channel_close(channel *self)
{
    if (!self->closed()) {
        self->close();
        return j6_status_ok;
    }

    return j6_status_closed;
}

} // namespace syscalls

View File

@@ -3,6 +3,7 @@
#include <util/spinlock.h> #include <util/spinlock.h>
#include "clock.h" #include "clock.h"
#include "logger.h"
#include "objects/process.h" #include "objects/process.h"
#include "objects/thread.h" #include "objects/thread.h"
#include "vm_space.h" #include "vm_space.h"
@@ -36,8 +37,13 @@ util::spinlock g_futexes_lock;
j6_status_t j6_status_t
futex_wait(const uint32_t *value, uint32_t expected, uint64_t timeout) futex_wait(const uint32_t *value, uint32_t expected, uint64_t timeout)
{ {
if (*value != expected) thread& t = thread::current();
return j6_status_would_block; process &p = t.parent();
if (*value != expected) {
log::spam(logs::syscall, "<%02x:%02x> futex %lx: %x != %x", p.obj_id(), t.obj_id(), value, *value, expected);
return j6_status_futex_changed;
}
uintptr_t address = reinterpret_cast<uintptr_t>(value); uintptr_t address = reinterpret_cast<uintptr_t>(value);
vm_space &space = process::current().space(); vm_space &space = process::current().space();
@@ -46,15 +52,18 @@ futex_wait(const uint32_t *value, uint32_t expected, uint64_t timeout)
util::scoped_lock lock {g_futexes_lock}; util::scoped_lock lock {g_futexes_lock};
futex &f = g_futexes[phys]; futex &f = g_futexes[phys];
thread& t = thread::current();
if (timeout) { if (timeout) {
timeout += clock::get().value(); timeout += clock::get().value();
t.set_wake_timeout(timeout); t.set_wake_timeout(timeout);
} }
log::spam(logs::syscall, "<%02x:%02x> blocking on futex %lx", p.obj_id(), t.obj_id(), value);
lock.release(); lock.release();
f.queue.wait(); f.queue.wait();
log::spam(logs::syscall, "<%02x:%02x> woke on futex %lx", p.obj_id(), t.obj_id(), value);
return j6_status_ok; return j6_status_ok;
} }
@@ -69,10 +78,15 @@ futex_wake(const uint32_t *value, size_t count)
futex *f = g_futexes.find(phys); futex *f = g_futexes.find(phys);
if (f) { if (f) {
if (count) {
for (unsigned i = 0; i < count; ++i) { for (unsigned i = 0; i < count; ++i) {
obj::thread *t = f->queue.pop_next(); obj::thread *t = f->queue.pop_next();
if (!t) break;
t->wake(); t->wake();
} }
} else {
f->queue.clear();
}
if (f->queue.empty()) if (f->queue.empty())
g_futexes.erase(phys); g_futexes.erase(phys);

View File

@@ -70,7 +70,7 @@ mailbox_respond(
return s; return s;
} }
bool block = flags & j6_mailbox_block; bool block = flags & j6_flag_block;
j6_status_t s = self->receive(data, *reply_tag, block); j6_status_t s = self->receive(data, *reply_tag, block);
if (s != j6_status_ok) if (s != j6_status_ok)
return s; return s;

View File

@@ -15,6 +15,9 @@ j6_status_t
vma_create(j6_handle_t *self, size_t size, uint32_t flags) vma_create(j6_handle_t *self, size_t size, uint32_t flags)
{ {
vm_flags f = vm_flags::user_mask & flags; vm_flags f = vm_flags::user_mask & flags;
if (util::bits::has(f, vm_flags::ring))
construct_handle<vm_area_ring>(self, size, f);
else
construct_handle<vm_area_open>(self, size, f); construct_handle<vm_area_open>(self, size, f);
return j6_status_ok; return j6_status_ok;
} }
@@ -22,8 +25,13 @@ vma_create(j6_handle_t *self, size_t size, uint32_t flags)
j6_status_t j6_status_t
vma_create_map(j6_handle_t *self, size_t size, uintptr_t base, uint32_t flags) vma_create_map(j6_handle_t *self, size_t size, uintptr_t base, uint32_t flags)
{ {
vm_area *a = nullptr;
vm_flags f = vm_flags::user_mask & flags; vm_flags f = vm_flags::user_mask & flags;
vm_area *a = construct_handle<vm_area_open>(self, size, f); if (util::bits::has(f, vm_flags::ring))
a = construct_handle<vm_area_ring>(self, size, f);
else
a = construct_handle<vm_area_open>(self, size, f);
process::current().space().add(base, a); process::current().space().add(base, a);
return j6_status_ok; return j6_status_ok;
} }

View File

@@ -0,0 +1,150 @@
// The kernel depends on libj6 for some shared code,
// but should not include the user-specific code.
#ifndef __j6kernel
#include <string.h>
#include <arch/memory.h>
#include <j6/channel.hh>
#include <j6/condition.hh>
#include <j6/errors.h>
#include <j6/flags.h>
#include <j6/mutex.hh>
#include <j6/syscalls.h>
#include <j6/syslog.hh>
#include <util/spinlock.h>
namespace j6 {
static uintptr_t channel_addr = 0x6000'0000;
static util::spinlock addr_spinlock;
/// Control block for a shared-memory channel. create() and open() both
/// reinterpret the base address of the mapped VMA as this struct, so it is
/// the state shared by every user of the channel.
struct channel::header
{
// Size in bytes that create() passed to j6_vma_create_map. create() only
// accepts powers of two, which the `& (size - 1)` masking below relies on.
size_t size;
// Running totals of bytes consumed / committed. They only ever grow;
// the position inside `data` is obtained by masking with size - 1.
size_t read_index;
size_t write_index;
// Taken by send() and receive() around all index updates.
mutex mutex;
// Receivers block here when no data is available; send() wakes it.
condition read_waiting;
// Senders block here when there is not enough free space; receive() wakes it.
condition write_waiting;
// Channel payload starts immediately after the fields above.
// NOTE(review): `size` counts the whole VMA, but the payload starts after
// this header, so offsets close to `size` reach past the end of the
// mapping. Presumably the `ring` VMA flag makes that wrap around — confirm
// the wrapped range cannot alias this header.
uint8_t data[0];
// Address of the next byte to read.
inline const void* read_at() const { return &data[read_index & (size - 1)]; }
// Address of the next byte to write.
inline void* write_at() { return &data[write_index & (size - 1)]; }
// Bytes written but not yet consumed.
inline size_t read_avail() const { return write_index - read_index; }
// Bytes that can still be written before the buffer is full.
inline size_t write_avail() const { return size - read_avail(); }
// Mark n bytes as read.
inline void consume(size_t n) { read_index += n; }
// Mark n bytes as written.
inline void commit(size_t n) { write_index += n; }
};
/// Create a new channel backed by a freshly created ring VMA.
/// \arg size  Size in bytes of the VMA; must be a power of two and at
///            least arch::frame_size
/// \returns   The new channel, or nullptr if `size` is rejected or the
///            VMA could not be created
channel *
channel::create(size_t size)
{
    const bool power_of_two = (size & (size - 1)) == 0;
    if (size < arch::frame_size || !power_of_two) {
        syslog("Bad channel size: %lx", size);
        return nullptr;
    }

    // Claim the next slot in this process' channel address range. The
    // lock only covers the bump of the shared cursor.
    uintptr_t base = 0;
    {
        util::scoped_lock guard {addr_spinlock};
        base = channel_addr;
        channel_addr += size;
    }

    j6_handle_t vma = j6_handle_invalid;
    const j6_status_t status =
        j6_vma_create_map(&vma, size, base, j6_vm_flag_write|j6_vm_flag_ring);
    if (status != j6_status_ok) {
        syslog("Failed to create channel VMA. Error: %lx", status);
        return nullptr;
    }

    // The header lives at the very start of the mapping; start it out
    // zeroed and record the size so open() can read it back.
    header *hdr = reinterpret_cast<header*>(base);
    memset(hdr, 0, sizeof(*hdr));
    hdr->size = size;

    return new channel {vma, hdr};
}
/// Attach to an existing channel through a handle to its VMA.
/// \arg vma  Handle to the VMA backing the channel
/// \returns  The channel, or nullptr if the VMA could not be mapped
channel *
channel::open(j6_handle_t vma)
{
    // The size is only known once the header is readable, so the address
    // cursor stays locked across the map call.
    util::scoped_lock guard {addr_spinlock};

    const uintptr_t base = channel_addr;
    const j6_status_t status = j6_vma_map(vma, 0, base);
    if (status != j6_status_ok) {
        syslog("Failed to map channel VMA. Error: %lx", status);
        return nullptr;
    }

    // NOTE(review): the size is read from shared memory without any
    // validation — confirm callers only open VMAs set up by create().
    header *hdr = reinterpret_cast<header*>(base);
    channel_addr += hdr->size;
    guard.release();

    return new channel {vma, hdr};
}
/// Wrap an already-mapped channel.
/// \arg vma  Handle to the VMA backing the channel
/// \arg h    The channel header at the start of that mapping; its size
///           is cached in m_size
channel::channel(j6_handle_t vma, header *h) :
    m_vma(vma),
    m_size(h->size),
    m_header(h)
{
}
/// Copy `len` bytes from `buffer` into the channel.
/// \returns j6_err_insufficient if `len` is larger than the channel size,
///          j6_status_would_block if there is not enough free space and
///          `block` is false, otherwise j6_status_ok once all `len` bytes
///          have been committed.
j6_status_t
channel::send(const void *buffer, size_t len, bool block)
{
// A message bigger than the whole channel could never fit.
if (len > m_header->size)
return j6_err_insufficient;
j6::scoped_lock lock {m_header->mutex};
while (m_header->write_avail() < len) {
if (!block)
return j6_status_would_block;
// Give up the mutex while sleeping so a receiver can free up space.
// NOTE(review): the mutex is dropped before wait() registers this thread
// as a waiter, and condition::wake() does nothing when no waiter is
// recorded. A receive() finishing inside that window looks like it could
// be missed (lost wakeup) — confirm.
lock.release();
m_header->write_waiting.wait();
lock.acquire();
}
// NOTE(review): one memcpy with no explicit wrap-around handling;
// presumably relies on the ring-flagged VMA mapping — confirm.
memcpy(m_header->write_at(), buffer, len);
m_header->commit(len);
// Let any blocked receiver know there is data now.
m_header->read_waiting.wake();
return j6_status_ok;
}
/// Copy pending channel data into `buffer`.
/// \arg size  [in] capacity of `buffer`; [out] number of bytes copied.
///            Left untouched when j6_status_would_block is returned.
/// \returns j6_status_would_block if no data is pending and `block` is
///          false, otherwise j6_status_ok.
j6_status_t
channel::receive(void *buffer, size_t *size, bool block)
{
j6::scoped_lock lock {m_header->mutex};
while (!m_header->read_avail()) {
if (!block)
return j6_status_would_block;
// Give up the mutex while sleeping so a sender can add data.
// NOTE(review): same release-then-wait window as in send(); a send()
// that completes between release() and wait() may not wake this thread
// — confirm.
lock.release();
m_header->read_waiting.wait();
lock.acquire();
}
// Hand back whatever is pending, capped at the caller's buffer size.
size_t avail = m_header->read_avail();
size_t read = *size > avail ? avail : *size;
// NOTE(review): one memcpy with no explicit wrap-around handling;
// presumably relies on the ring-flagged VMA mapping — confirm.
memcpy(buffer, m_header->read_at(), read);
m_header->consume(read);
// Space was freed; let any blocked sender retry.
m_header->write_waiting.wake();
*size = read;
return j6_status_ok;
}
} // namespace j6
#endif // __j6kernel

View File

@@ -0,0 +1,37 @@
// The kernel depends on libj6 for some shared code,
// but should not include the user-specific code.
#ifndef __j6kernel
#include <j6/condition.hh>
#include <j6/errors.h>
#include <j6/syscalls.h>
#include <j6/syslog.hh>
namespace j6 {
/// Block the calling thread on this condition's futex word.
/// The thread announces itself by incrementing m_state, then sleeps on
/// the futex. If the kernel reports that the word changed before the
/// thread could sleep, it retries with the current value, and gives up
/// once the value reads zero, which is what wake() resets it to.
void
condition::wait()
{
    j6::syslog("Waiting on condition %lx", this);

    uint32_t v = __atomic_add_fetch(&m_state, 1, __ATOMIC_ACQ_REL);
    j6_status_t s = j6_futex_wait(&m_state, v, 0);
    while (s == j6_status_futex_changed) {
        // Other threads modify m_state with atomic operations, so it must
        // be read atomically here as well; a plain load is a data race.
        v = __atomic_load_n(&m_state, __ATOMIC_ACQUIRE);
        if (v == 0) break;
        s = j6_futex_wait(&m_state, v, 0);
    }

    j6::syslog("Woke on condition %lx", this);
}
/// Wake threads blocked in wait().
/// The state word is atomically reset to zero; the futex wake syscall is
/// only made if the previous value was non-zero.
void
condition::wake()
{
    const uint32_t previous = __atomic_exchange_n(&m_state, 0, __ATOMIC_ACQ_REL);
    if (previous == 0)
        return;

    // A count of 0 asks the kernel to release every waiter on this futex.
    j6_futex_wake(&m_state, 0);
}
} // namespace j6
#endif // __j6kernel

View File

@@ -4,6 +4,8 @@ j6 = module("j6",
kind = "lib", kind = "lib",
deps = [ "util" ], deps = [ "util" ],
sources = [ sources = [
"channel.cpp",
"condition.cpp",
"init.cpp", "init.cpp",
"init.s", "init.s",
"mutex.cpp", "mutex.cpp",
@@ -15,6 +17,8 @@ j6 = module("j6",
], ],
public_headers = [ public_headers = [
"j6/cap_flags.h.cog", "j6/cap_flags.h.cog",
"j6/channel.hh",
"j6/condition.hh",
"j6/errors.h", "j6/errors.h",
"j6/flags.h", "j6/flags.h",
"j6/init.h", "j6/init.h",

View File

@@ -0,0 +1,51 @@
#pragma once
/// \file channel.hh
/// High level channel interface
// The kernel depends on libj6 for some shared code,
// but should not include the user-specific code.
#ifndef __j6kernel
#include <stddef.h>
#include <j6/types.h>
namespace j6 {
/// A byte channel implemented in user space on top of a shared VMA.
/// Instances are only obtained from create() or open(); the constructor
/// is private.
/// NOTE(review): both factories return a pointer allocated with `new` and
/// the class declares no destructor — presumably the caller owns the
/// object and the VMA handle; confirm.
class channel
{
public:
/// Create a new channel of the given size.
/// \arg size  Size in bytes; the implementation rejects values that are
///            not a power of two or are smaller than arch::frame_size
/// \returns   The new channel, or nullptr on failure
static channel * create(size_t size);
/// Open an existing channel for which we have a VMA handle
/// \returns   The channel, or nullptr if the VMA could not be mapped
static channel * open(j6_handle_t vma);
/// Send data into the channel.
/// \arg buffer The buffer from which to read data
/// \arg len The number of bytes to read from `buffer`
/// \arg block If true, block this thread if there aren't `len` bytes of space available
/// \returns j6_status_ok on success, j6_status_would_block if `block` is
///          false and there is not enough space, j6_err_insufficient if
///          `len` exceeds the channel size
j6_status_t send(const void *buffer, size_t len, bool block = true);
/// Read data out of the channel.
/// \arg buffer The buffer to receive channel data
/// \arg size [in] The size of `buffer` [out] the amount read into `buffer`
/// \arg block If true, block this thread if there is no data to read yet
/// \returns j6_status_ok on success, j6_status_would_block if `block` is
///          false and no data is available
j6_status_t receive(void *buffer, size_t *size, bool block = true);
/// Get the VMA handle for sharing with other processes
j6_handle_t handle() const { return m_vma; }
private:
/// Layout of the control block at the start of the mapping (defined in channel.cpp)
struct header;
channel(j6_handle_t vma, header *h);
/// Handle to the VMA backing this channel
j6_handle_t m_vma;
/// Copy of the header's size taken at construction
size_t m_size;
/// The channel header inside the mapped VMA
header *m_header;
};
} // namespace j6
#endif // __j6kernel

View File

@@ -0,0 +1,28 @@
#pragma once
/// \file condition.hh
/// High level condition interface based on futexes
// The kernel depends on libj6 for some shared code,
// but should not include the user-specific code.
#ifndef __j6kernel
#include <stddef.h>
#include <stdint.h>
namespace j6 {
/// A minimal condition object built on a single futex word.
class condition
{
public:
    /// Start out with no waiters recorded.
    condition() : m_state(0) {}

    /// Block the calling thread on the futex word until it is woken or
    /// the word is observed to have been reset.
    void wait();

    /// Reset the futex word and wake the threads blocked in wait().
    void wake();

private:
    /// The futex word: wait() increments it, wake() resets it to zero.
    uint32_t m_state;
};
} // namespace j6
#endif // __j6kernel

View File

@@ -15,6 +15,7 @@
#define j6_status_destroyed 0x1001 #define j6_status_destroyed 0x1001
#define j6_status_exists 0x1002 #define j6_status_exists 0x1002
#define j6_status_would_block 0x1003 #define j6_status_would_block 0x1003
#define j6_status_futex_changed 0x1004
#define j6_err_nyi j6_err(0x0001) #define j6_err_nyi j6_err(0x0001)
#define j6_err_unexpected j6_err(0x0002) #define j6_err_unexpected j6_err(0x0002)

View File

@@ -9,10 +9,8 @@ enum j6_vm_flags {
j6_vm_flag_MAX j6_vm_flag_MAX
}; };
enum j6_channel_flags { enum j6_flags {
j6_channel_block = 0x01, j6_flag_block = 0x01,
};
enum j6_mailbox_flags { j6_flags_COUNT // custom per-type flags should start here
j6_mailbox_block = 0x01,
}; };

View File

@@ -33,6 +33,9 @@ public:
m_mutex.unlock(); m_mutex.unlock();
} }
inline void acquire() { m_mutex.lock(); }
inline void release() { m_mutex.unlock(); }
private: private:
mutex &m_mutex; mutex &m_mutex;
}; };

View File

@@ -3,7 +3,7 @@ OBJECT_TYPE( none, 0x00 )
OBJECT_TYPE( system, 0x01 ) OBJECT_TYPE( system, 0x01 )
OBJECT_TYPE( event, 0x02 ) OBJECT_TYPE( event, 0x02 )
OBJECT_TYPE( channel, 0x03 )
OBJECT_TYPE( endpoint, 0x04 ) OBJECT_TYPE( endpoint, 0x04 )
OBJECT_TYPE( mailbox, 0x05 ) OBJECT_TYPE( mailbox, 0x05 )

View File

@@ -1,8 +1,10 @@
VM_FLAG( none, 0x00000000) VM_FLAG( none, 0x00000000 )
VM_FLAG( write, 0x00000001) VM_FLAG( write, 0x00000001 )
VM_FLAG( exec, 0x00000002) VM_FLAG( exec, 0x00000002 )
VM_FLAG( contiguous, 0x00000020) VM_FLAG( contiguous, 0x00000020 )
VM_FLAG( large_pages, 0x00000100) VM_FLAG( large_pages, 0x00000100 )
VM_FLAG( huge_pages, 0x00000200) VM_FLAG( huge_pages, 0x00000200 )
VM_FLAG( write_combine, 0x00001000) VM_FLAG( write_combine, 0x00001000 )
VM_FLAG( mmio, 0x00010000) VM_FLAG( ring, 0x00002000 )
VM_FLAG( mmio, 0x00010000 )