mirror of
https://github.com/justinian/jsix.git
synced 2025-12-10 00:14:32 -08:00
[kernel] Add channel objects
Add the channel object for sending messages between threads. Currently no good of passing channels to other threads, but global variables in a single process work. Currently channels are slow and do double copies, need to refine more. Tags: ipc
This commit is contained in:
@@ -5,6 +5,10 @@
|
||||
#include "j6/errors.h"
|
||||
#include "j6/signals.h"
|
||||
|
||||
const char message[] = "Hello! This is a message being sent over a channel!";
|
||||
char inbuf[1024];
|
||||
j6_handle_t chan = j6_handle_invalid;
|
||||
|
||||
extern "C" {
|
||||
j6_status_t system_log(const char *msg);
|
||||
|
||||
@@ -17,6 +21,11 @@ extern "C" {
|
||||
j6_status_t thread_sleep(uint64_t til);
|
||||
j6_status_t thread_exit(int64_t status);
|
||||
|
||||
j6_status_t channel_create(j6_handle_t *handle);
|
||||
j6_status_t channel_close(j6_handle_t handle);
|
||||
j6_status_t channel_send(j6_handle_t handle, size_t len, void *data);
|
||||
j6_status_t channel_receive(j6_handle_t handle, size_t *len, void *data);
|
||||
|
||||
int main(int, const char **);
|
||||
}
|
||||
|
||||
@@ -24,6 +33,13 @@ void
|
||||
thread_proc()
|
||||
{
|
||||
system_log("sub thread starting");
|
||||
|
||||
j6_status_t result = channel_send(chan, sizeof(message), (void*)message);
|
||||
if (result != j6_status_ok)
|
||||
thread_exit(result);
|
||||
|
||||
system_log("sub thread sent on channel");
|
||||
|
||||
for (int i = 1; i < 5; ++i)
|
||||
thread_sleep(i*10);
|
||||
|
||||
@@ -34,21 +50,49 @@ thread_proc()
|
||||
int
|
||||
main(int argc, const char **argv)
|
||||
{
|
||||
j6_handle_t child = 0;
|
||||
j6_handle_t child = j6_handle_invalid;
|
||||
j6_signal_t out = 0;
|
||||
|
||||
system_log("main thread starting");
|
||||
|
||||
j6_status_t result = thread_create(&thread_proc, &child);
|
||||
j6_status_t result = channel_create(&chan);
|
||||
if (result != j6_status_ok)
|
||||
return result;
|
||||
|
||||
system_log("main thread created channel");
|
||||
|
||||
result = thread_create(&thread_proc, &child);
|
||||
if (result != j6_status_ok)
|
||||
return result;
|
||||
|
||||
result = object_wait(chan, j6_signal_channel_can_recv, &out);
|
||||
if (result != j6_status_ok)
|
||||
return result;
|
||||
|
||||
size_t len = sizeof(inbuf);
|
||||
result = channel_receive(chan, &len, (void*)inbuf);
|
||||
if (result != j6_status_ok)
|
||||
return result;
|
||||
|
||||
for (int i = 0; i < sizeof(message); ++i) {
|
||||
if (inbuf[i] != message[i])
|
||||
return 127;
|
||||
}
|
||||
|
||||
system_log("main thread received on channel");
|
||||
|
||||
system_log("main thread waiting on child");
|
||||
|
||||
result = object_wait(child, -1ull, &out);
|
||||
if (result != j6_status_ok)
|
||||
return result;
|
||||
|
||||
result = channel_close(chan);
|
||||
if (result != j6_status_ok)
|
||||
return result;
|
||||
|
||||
system_log("main thread closed channel");
|
||||
|
||||
system_log("main thread done, exiting");
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -30,6 +30,10 @@ SYSCALL thread_create, 0x19
|
||||
SYSCALL thread_exit, 0x1a
|
||||
SYSCALL thread_pause, 0x1b
|
||||
SYSCALL thread_sleep, 0x1c
|
||||
SYSCALL channel_create, 0x20
|
||||
SYSCALL channel_close, 0x21
|
||||
SYSCALL channel_send, 0x22
|
||||
SYSCALL channel_receive, 0x23
|
||||
|
||||
global _start
|
||||
_start:
|
||||
|
||||
@@ -8,9 +8,12 @@
|
||||
|
||||
#define j6_status_ok 0x0000
|
||||
|
||||
#define j6_status_closed 0x1000
|
||||
#define j6_status_destroyed 0x1001
|
||||
|
||||
#define j6_err_nyi j6_err(0x0001)
|
||||
#define j6_err_unexpected j6_err(0x0002)
|
||||
#define j6_err_invalid_arg j6_err(0x0003)
|
||||
#define j6_err_not_ready j6_err(0x0004)
|
||||
#define j6_err_insufficient j6_err(0x0005)
|
||||
|
||||
|
||||
@@ -13,6 +13,11 @@
|
||||
// Thread signals
|
||||
#define j6_signal_thread_exit (1ull << 16)
|
||||
|
||||
// Channel signals
|
||||
#define j6_signal_channel_closed (1ull << 16)
|
||||
#define j6_signal_channel_can_send (1ull << 17)
|
||||
#define j6_signal_channel_can_recv (1ull << 18)
|
||||
|
||||
// Signals 48-63 are user-defined signals
|
||||
#define j6_signal_user0 (1ull << 48)
|
||||
#define j6_signal_user1 (1ull << 49)
|
||||
|
||||
70
src/kernel/objects/channel.cpp
Normal file
70
src/kernel/objects/channel.cpp
Normal file
@@ -0,0 +1,70 @@
|
||||
#include "kutil/assert.h"
|
||||
|
||||
#include "objects/channel.h"
|
||||
|
||||
channel::channel() :
|
||||
m_len(0),
|
||||
m_capacity(0),
|
||||
m_data(nullptr),
|
||||
kobject(kobject::type::channel, j6_signal_channel_can_send)
|
||||
{
|
||||
}
|
||||
|
||||
channel::~channel()
|
||||
{
|
||||
kutil::kfree(m_data);
|
||||
}
|
||||
|
||||
j6_status_t
|
||||
channel::enqueue(size_t len, void *data)
|
||||
{
|
||||
// TODO: Make this thread safe!
|
||||
if (closed())
|
||||
return j6_status_closed;
|
||||
|
||||
if (!can_send())
|
||||
return j6_err_not_ready;
|
||||
|
||||
if (m_capacity < len) {
|
||||
kutil::kfree(m_data);
|
||||
m_data = kutil::kalloc(len);
|
||||
m_capacity = len;
|
||||
kassert(m_data, "Failed to allocate memory to copy channel message");
|
||||
}
|
||||
|
||||
m_len = len;
|
||||
kutil::memcpy(m_data, data, len);
|
||||
assert_signal(j6_signal_channel_can_recv);
|
||||
|
||||
return j6_status_ok;
|
||||
}
|
||||
|
||||
j6_status_t
|
||||
channel::dequeue(size_t *len, void *data)
|
||||
{
|
||||
// TODO: Make this thread safe!
|
||||
if (closed())
|
||||
return j6_status_closed;
|
||||
|
||||
if (!can_receive())
|
||||
return j6_err_not_ready;
|
||||
|
||||
if (!len)
|
||||
return j6_err_invalid_arg;
|
||||
|
||||
if (*len < m_len)
|
||||
return j6_err_insufficient;
|
||||
|
||||
kutil::memcpy(data, m_data, m_len);
|
||||
*len = m_len;
|
||||
assert_signal(j6_signal_channel_can_send);
|
||||
|
||||
return j6_status_ok;
|
||||
}
|
||||
|
||||
void
|
||||
channel::on_no_handles()
|
||||
{
|
||||
kobject::on_no_handles();
|
||||
delete this;
|
||||
}
|
||||
49
src/kernel/objects/channel.h
Normal file
49
src/kernel/objects/channel.h
Normal file
@@ -0,0 +1,49 @@
|
||||
#pragma once
|
||||
/// \file channel.h
|
||||
/// Definition of channel objects and related functions
|
||||
|
||||
#include "j6/signals.h"
|
||||
#include "objects/kobject.h"
|
||||
|
||||
/// Channels are bi-directional means of sending messages
|
||||
class channel :
|
||||
public kobject
|
||||
{
|
||||
public:
|
||||
channel();
|
||||
virtual ~channel();
|
||||
|
||||
/// Check if the channel has space for a message to be sent
|
||||
inline bool can_send() const { return check_signal(j6_signal_channel_can_send); }
|
||||
|
||||
/// Check if the channel has a message wiating already
|
||||
inline bool can_receive() const { return check_signal(j6_signal_channel_can_recv); }
|
||||
|
||||
/// Put a message into the channel
|
||||
/// \arg len Length of data, in bytes
|
||||
/// \arg data Pointer to the message data
|
||||
/// \returns j6_status_ok on success
|
||||
j6_status_t enqueue(size_t len, void *data);
|
||||
|
||||
/// Get a message from the channel, copied into a provided buffer
|
||||
/// \arg len On input, the size of the provided buffer. On output,
|
||||
/// the size of the message copied into the buffer.
|
||||
/// \arg data Pointer to the buffer
|
||||
/// \returns j6_status_ok on success
|
||||
j6_status_t dequeue(size_t *len, void *data);
|
||||
|
||||
/// Mark this channel as closed, all future calls to enqueue or
|
||||
/// dequeue messages will fail with j6_status_closed.
|
||||
inline void close() { assert_signal(j6_signal_channel_closed); }
|
||||
|
||||
/// Check if this channel has been closed
|
||||
inline bool closed() { return check_signal(j6_signal_channel_closed); }
|
||||
|
||||
protected:
|
||||
virtual void on_no_handles() override;
|
||||
|
||||
private:
|
||||
size_t m_len;
|
||||
size_t m_capacity;
|
||||
void *m_data;
|
||||
};
|
||||
@@ -19,6 +19,7 @@ public:
|
||||
|
||||
event,
|
||||
eventpair,
|
||||
channel,
|
||||
|
||||
vms,
|
||||
vmo,
|
||||
@@ -55,6 +56,10 @@ public:
|
||||
/// \arg s The set of signals to deassert
|
||||
void deassert_signal(j6_signal_t s);
|
||||
|
||||
/// Check if the given signals are set on this object
|
||||
/// \arg s The set of signals to check
|
||||
inline bool check_signal(j6_signal_t s) const { return (m_signals & s) == s; }
|
||||
|
||||
/// Increment the handle refcount
|
||||
inline void handle_retain() { ++m_handle_count; }
|
||||
|
||||
|
||||
@@ -12,3 +12,7 @@ SYSCALL(0x1a, thread_exit, int64_t)
|
||||
SYSCALL(0x1b, thread_pause, void)
|
||||
SYSCALL(0x1c, thread_sleep, uint64_t)
|
||||
|
||||
SYSCALL(0x20, channel_create, j6_handle_t *)
|
||||
SYSCALL(0x21, channel_close, j6_handle_t)
|
||||
SYSCALL(0x22, channel_send, j6_handle_t, size_t, void *)
|
||||
SYSCALL(0x23, channel_receive, j6_handle_t, size_t *, void *)
|
||||
|
||||
76
src/kernel/syscalls/channel.cpp
Normal file
76
src/kernel/syscalls/channel.cpp
Normal file
@@ -0,0 +1,76 @@
|
||||
#include "j6/errors.h"
|
||||
#include "j6/types.h"
|
||||
|
||||
#include "objects/channel.h"
|
||||
#include "objects/thread.h"
|
||||
#include "objects/process.h"
|
||||
#include "scheduler.h"
|
||||
|
||||
namespace syscalls {
|
||||
|
||||
j6_status_t
|
||||
channel_create(j6_handle_t *handle)
|
||||
{
|
||||
scheduler &s = scheduler::get();
|
||||
TCB *tcb = s.current();
|
||||
thread *parent = thread::from_tcb(tcb);
|
||||
process &p = parent->parent();
|
||||
|
||||
channel *c = new channel;
|
||||
*handle = p.add_handle(c);
|
||||
|
||||
return j6_status_ok;
|
||||
}
|
||||
|
||||
j6_status_t
|
||||
channel_close(j6_handle_t handle)
|
||||
{
|
||||
scheduler &s = scheduler::get();
|
||||
TCB *tcb = s.current();
|
||||
thread *parent = thread::from_tcb(tcb);
|
||||
process &p = parent->parent();
|
||||
|
||||
kobject *o = p.lookup_handle(handle);
|
||||
if (!o || o->get_type() != kobject::type::channel)
|
||||
return j6_err_invalid_arg;
|
||||
|
||||
p.remove_handle(handle);
|
||||
channel *c = static_cast<channel*>(o);
|
||||
c->close();
|
||||
|
||||
return j6_status_ok;
|
||||
}
|
||||
|
||||
j6_status_t
|
||||
channel_send(j6_handle_t handle, size_t len, void *data)
|
||||
{
|
||||
scheduler &s = scheduler::get();
|
||||
TCB *tcb = s.current();
|
||||
thread *parent = thread::from_tcb(tcb);
|
||||
process &p = parent->parent();
|
||||
|
||||
kobject *o = p.lookup_handle(handle);
|
||||
if (!o || o->get_type() != kobject::type::channel)
|
||||
return j6_err_invalid_arg;
|
||||
|
||||
channel *c = static_cast<channel*>(o);
|
||||
return c->enqueue(len, data);
|
||||
}
|
||||
|
||||
j6_status_t
|
||||
channel_receive(j6_handle_t handle, size_t *len, void *data)
|
||||
{
|
||||
scheduler &s = scheduler::get();
|
||||
TCB *tcb = s.current();
|
||||
thread *parent = thread::from_tcb(tcb);
|
||||
process &p = parent->parent();
|
||||
|
||||
kobject *o = p.lookup_handle(handle);
|
||||
if (!o || o->get_type() != kobject::type::channel)
|
||||
return j6_err_invalid_arg;
|
||||
|
||||
channel *c = static_cast<channel*>(o);
|
||||
return c->dequeue(len, data);
|
||||
}
|
||||
|
||||
} // namespace syscalls
|
||||
Reference in New Issue
Block a user