[kernel] Add a timeout to endpoint recieve syscalls
This also required adding support for multiple wait conditions on a thread, so wait_type is an enum_bitfield now. I really need a real clock.
This commit is contained in:
@@ -18,6 +18,7 @@ object endpoint : kobject {
|
|||||||
method receive {
|
method receive {
|
||||||
param tag uint64 [out]
|
param tag uint64 [out]
|
||||||
param data buffer [out]
|
param data buffer [out]
|
||||||
|
param timeout uint64 # Receive timeout in nanoseconds
|
||||||
}
|
}
|
||||||
|
|
||||||
# Send a message on a channel and then await a new message.
|
# Send a message on a channel and then await a new message.
|
||||||
@@ -26,5 +27,6 @@ object endpoint : kobject {
|
|||||||
method sendrecv {
|
method sendrecv {
|
||||||
param tag uint64 [inout]
|
param tag uint64 [inout]
|
||||||
param data buffer [inout]
|
param data buffer [inout]
|
||||||
|
param timeout uint64 # Receive timeout in nanoseconds
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,6 +2,7 @@
|
|||||||
#include "objects/endpoint.h"
|
#include "objects/endpoint.h"
|
||||||
#include "objects/process.h"
|
#include "objects/process.h"
|
||||||
#include "objects/thread.h"
|
#include "objects/thread.h"
|
||||||
|
#include "scheduler.h"
|
||||||
#include "vm_space.h"
|
#include "vm_space.h"
|
||||||
|
|
||||||
endpoint::endpoint() :
|
endpoint::endpoint() :
|
||||||
@@ -55,16 +56,20 @@ endpoint::send(j6_tag_t tag, const void *data, size_t data_len)
|
|||||||
}
|
}
|
||||||
|
|
||||||
j6_status_t
|
j6_status_t
|
||||||
endpoint::receive(j6_tag_t *tag, void *data, size_t *data_len)
|
endpoint::receive(j6_tag_t *tag, void *data, size_t *data_len, uint64_t timeout)
|
||||||
{
|
{
|
||||||
thread_data receiver = { &thread::current(), data };
|
thread_data receiver = { &thread::current(), data };
|
||||||
receiver.tag_p = tag;
|
receiver.tag_p = tag;
|
||||||
receiver.len_p = data_len;
|
receiver.len_p = data_len;
|
||||||
|
|
||||||
|
// Timeout is a duration, but wait_on_* calls need a time
|
||||||
|
if (timeout)
|
||||||
|
timeout += scheduler::get().clock();
|
||||||
|
|
||||||
if (!check_signal(j6_signal_endpoint_can_recv)) {
|
if (!check_signal(j6_signal_endpoint_can_recv)) {
|
||||||
assert_signal(j6_signal_endpoint_can_send);
|
assert_signal(j6_signal_endpoint_can_send);
|
||||||
m_blocked.append(receiver);
|
m_blocked.append(receiver);
|
||||||
receiver.th->wait_on_object(this);
|
receiver.th->wait_on_object(this, timeout);
|
||||||
|
|
||||||
// we woke up having already finished the recv
|
// we woke up having already finished the recv
|
||||||
// because it happened in the sender
|
// because it happened in the sender
|
||||||
|
|||||||
@@ -36,11 +36,12 @@ public:
|
|||||||
|
|
||||||
/// Receive a message from a thread waiting to send on this endpoint. If no threads
|
/// Receive a message from a thread waiting to send on this endpoint. If no threads
|
||||||
/// are currently trying to send, block the current thread.
|
/// are currently trying to send, block the current thread.
|
||||||
/// \arg tag [in] The sender-specified message tag
|
/// \arg tag [in] The sender-specified message tag
|
||||||
/// \arg len [in] The size in bytes of the buffer [out] Number of bytes in the message
|
/// \arg len [in] The size in bytes of the buffer [out] Number of bytes in the message
|
||||||
/// \arg data Buffer for copying message data into
|
/// \arg data Buffer for copying message data into
|
||||||
/// \returns j6_status_ok on success
|
/// \arg timeout Receive timeout in nanoseconds
|
||||||
j6_status_t receive(j6_tag_t *tag, void *data, size_t *data_len);
|
/// \returns j6_status_ok on success
|
||||||
|
j6_status_t receive(j6_tag_t *tag, void *data, size_t *data_len, uint64_t timeout = 0);
|
||||||
|
|
||||||
/// Give the listener on the endpoint a message that a bound IRQ has been signalled
|
/// Give the listener on the endpoint a message that a bound IRQ has been signalled
|
||||||
/// \arg irq The IRQ that caused this signal
|
/// \arg irq The IRQ that caused this signal
|
||||||
|
|||||||
@@ -47,37 +47,52 @@ inline void schedule_if_current(thread *t) { if (t == current_cpu().thread) sche
|
|||||||
void
|
void
|
||||||
thread::wait_on_signals(j6_signal_t signals)
|
thread::wait_on_signals(j6_signal_t signals)
|
||||||
{
|
{
|
||||||
m_wait_type = wait_type::signal;
|
{
|
||||||
m_wait_data = signals;
|
util::scoped_lock {m_wait_lock};
|
||||||
clear_state(state::ready);
|
m_wait_type = wait_type::signal;
|
||||||
|
m_wait_data = signals;
|
||||||
|
clear_state(state::ready);
|
||||||
|
}
|
||||||
schedule_if_current(this);
|
schedule_if_current(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
thread::wait_on_time(uint64_t t)
|
thread::wait_on_time(uint64_t t)
|
||||||
{
|
{
|
||||||
m_wait_type = wait_type::time;
|
{
|
||||||
m_wait_data = t;
|
util::scoped_lock {m_wait_lock};
|
||||||
clear_state(state::ready);
|
m_wait_type = wait_type::time;
|
||||||
|
m_wait_time = t;
|
||||||
|
clear_state(state::ready);
|
||||||
|
}
|
||||||
schedule_if_current(this);
|
schedule_if_current(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
thread::wait_on_object(kobject *o)
|
thread::wait_on_object(kobject *o, uint64_t t)
|
||||||
{
|
{
|
||||||
m_wait_type = wait_type::object;
|
{
|
||||||
m_wait_data = reinterpret_cast<uint64_t>(o);
|
util::scoped_lock {m_wait_lock};
|
||||||
clear_state(state::ready);
|
|
||||||
|
|
||||||
|
m_wait_type = wait_type::object;
|
||||||
|
m_wait_data = reinterpret_cast<uint64_t>(o);
|
||||||
|
|
||||||
|
if (t) {
|
||||||
|
m_wait_type |= wait_type::time;
|
||||||
|
m_wait_time = t;
|
||||||
|
}
|
||||||
|
|
||||||
|
clear_state(state::ready);
|
||||||
|
}
|
||||||
schedule_if_current(this);
|
schedule_if_current(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool
|
bool
|
||||||
thread::wake_on_signals(kobject *obj, j6_signal_t signals)
|
thread::wake_on_signals(kobject *obj, j6_signal_t signals)
|
||||||
{
|
{
|
||||||
if (m_wait_type != wait_type::signal ||
|
util::scoped_lock {m_wait_lock};
|
||||||
|
|
||||||
|
if (!(m_wait_type & wait_type::signal) ||
|
||||||
(signals & m_wait_data) == 0)
|
(signals & m_wait_data) == 0)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
@@ -92,12 +107,18 @@ thread::wake_on_signals(kobject *obj, j6_signal_t signals)
|
|||||||
bool
|
bool
|
||||||
thread::wake_on_time(uint64_t now)
|
thread::wake_on_time(uint64_t now)
|
||||||
{
|
{
|
||||||
if (m_wait_type != wait_type::time ||
|
util::scoped_lock {m_wait_lock};
|
||||||
now < m_wait_data)
|
|
||||||
|
if (!(m_wait_type & wait_type::time) ||
|
||||||
|
now < m_wait_time)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
|
if (!(m_wait_type & ~wait_type::none))
|
||||||
|
m_wait_result = j6_status_ok;
|
||||||
|
else
|
||||||
|
m_wait_result = j6_err_timed_out;
|
||||||
|
|
||||||
m_wait_type = wait_type::none;
|
m_wait_type = wait_type::none;
|
||||||
m_wait_result = j6_status_ok;
|
|
||||||
m_wait_data = now;
|
m_wait_data = now;
|
||||||
m_wait_obj = 0;
|
m_wait_obj = 0;
|
||||||
set_state(state::ready);
|
set_state(state::ready);
|
||||||
@@ -107,7 +128,9 @@ thread::wake_on_time(uint64_t now)
|
|||||||
bool
|
bool
|
||||||
thread::wake_on_object(kobject *o)
|
thread::wake_on_object(kobject *o)
|
||||||
{
|
{
|
||||||
if (m_wait_type != wait_type::object ||
|
util::scoped_lock {m_wait_lock};
|
||||||
|
|
||||||
|
if (!(m_wait_type & wait_type::object) ||
|
||||||
reinterpret_cast<uint64_t>(o) != m_wait_data)
|
reinterpret_cast<uint64_t>(o) != m_wait_data)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
@@ -121,6 +144,8 @@ thread::wake_on_object(kobject *o)
|
|||||||
void
|
void
|
||||||
thread::wake_on_result(kobject *obj, j6_status_t result)
|
thread::wake_on_result(kobject *obj, j6_status_t result)
|
||||||
{
|
{
|
||||||
|
util::scoped_lock {m_wait_lock};
|
||||||
|
|
||||||
m_wait_type = wait_type::none;
|
m_wait_type = wait_type::none;
|
||||||
m_wait_result = result;
|
m_wait_result = result;
|
||||||
m_wait_data = 0;
|
m_wait_data = 0;
|
||||||
|
|||||||
@@ -2,7 +2,9 @@
|
|||||||
/// \file thread.h
|
/// \file thread.h
|
||||||
/// Definition of thread kobject types
|
/// Definition of thread kobject types
|
||||||
|
|
||||||
|
#include <util/enum_bitfields.h>
|
||||||
#include <util/linked_list.h>
|
#include <util/linked_list.h>
|
||||||
|
#include <util/spinlock.h>
|
||||||
|
|
||||||
#include "objects/kobject.h"
|
#include "objects/kobject.h"
|
||||||
|
|
||||||
@@ -36,11 +38,20 @@ struct TCB
|
|||||||
using tcb_list = util::linked_list<TCB>;
|
using tcb_list = util::linked_list<TCB>;
|
||||||
using tcb_node = tcb_list::item_type;
|
using tcb_node = tcb_list::item_type;
|
||||||
|
|
||||||
|
enum class wait_type : uint8_t
|
||||||
|
{
|
||||||
|
none = 0x00,
|
||||||
|
signal = 0x01,
|
||||||
|
time = 0x02,
|
||||||
|
object = 0x04,
|
||||||
|
};
|
||||||
|
is_bitfield(wait_type);
|
||||||
|
|
||||||
class thread :
|
class thread :
|
||||||
public kobject
|
public kobject
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
enum class wait_type : uint8_t { none, signal, time, object };
|
|
||||||
enum class state : uint8_t {
|
enum class state : uint8_t {
|
||||||
ready = 0x01,
|
ready = 0x01,
|
||||||
loading = 0x02,
|
loading = 0x02,
|
||||||
@@ -85,8 +96,9 @@ public:
|
|||||||
void wait_on_time(uint64_t t);
|
void wait_on_time(uint64_t t);
|
||||||
|
|
||||||
/// Block the thread, waiting on the given object
|
/// Block the thread, waiting on the given object
|
||||||
/// \arg o The ojbect that should wake this thread
|
/// \arg o The object that should wake this thread
|
||||||
void wait_on_object(kobject *o);
|
/// \arg t The timeout clock value to wait for
|
||||||
|
void wait_on_object(kobject *o, uint64_t t = 0);
|
||||||
|
|
||||||
/// Wake the thread if it is waiting on signals.
|
/// Wake the thread if it is waiting on signals.
|
||||||
/// \arg obj Object that changed signals
|
/// \arg obj Object that changed signals
|
||||||
@@ -184,8 +196,10 @@ private:
|
|||||||
int32_t m_return_code;
|
int32_t m_return_code;
|
||||||
|
|
||||||
uint64_t m_wait_data;
|
uint64_t m_wait_data;
|
||||||
|
uint64_t m_wait_time;
|
||||||
j6_status_t m_wait_result;
|
j6_status_t m_wait_result;
|
||||||
j6_koid_t m_wait_obj;
|
j6_koid_t m_wait_obj;
|
||||||
|
util::spinlock m_wait_lock;
|
||||||
|
|
||||||
j6_handle_t m_self_handle;
|
j6_handle_t m_self_handle;
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -72,6 +72,8 @@ public:
|
|||||||
/// \arg t The new thread's TCB
|
/// \arg t The new thread's TCB
|
||||||
void add_thread(TCB *t);
|
void add_thread(TCB *t);
|
||||||
|
|
||||||
|
uint64_t clock() const { return m_clock; }
|
||||||
|
|
||||||
/// Get a reference to the scheduler
|
/// Get a reference to the scheduler
|
||||||
/// \returns A reference to the global system scheduler
|
/// \returns A reference to the global system scheduler
|
||||||
static scheduler & get() { return *s_instance; }
|
static scheduler & get() { return *s_instance; }
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ endpoint_send(j6_handle_t handle, uint64_t tag, const void * data, size_t data_l
|
|||||||
}
|
}
|
||||||
|
|
||||||
j6_status_t
|
j6_status_t
|
||||||
endpoint_receive(j6_handle_t handle, uint64_t * tag, void * data, size_t * data_len)
|
endpoint_receive(j6_handle_t handle, uint64_t * tag, void * data, size_t * data_len, uint64_t timeout)
|
||||||
{
|
{
|
||||||
if (!tag || !data_len || (*data_len && !data))
|
if (!tag || !data_len || (*data_len && !data))
|
||||||
return j6_err_invalid_arg;
|
return j6_err_invalid_arg;
|
||||||
@@ -37,14 +37,14 @@ endpoint_receive(j6_handle_t handle, uint64_t * tag, void * data, size_t * data_
|
|||||||
|
|
||||||
j6_tag_t out_tag = j6_tag_invalid;
|
j6_tag_t out_tag = j6_tag_invalid;
|
||||||
size_t out_len = *data_len;
|
size_t out_len = *data_len;
|
||||||
j6_status_t s = e->receive(&out_tag, data, &out_len);
|
j6_status_t s = e->receive(&out_tag, data, &out_len, timeout);
|
||||||
*tag = out_tag;
|
*tag = out_tag;
|
||||||
*data_len = out_len;
|
*data_len = out_len;
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
j6_status_t
|
j6_status_t
|
||||||
endpoint_sendrecv(j6_handle_t handle, uint64_t * tag, void * data, size_t * data_len)
|
endpoint_sendrecv(j6_handle_t handle, uint64_t * tag, void * data, size_t * data_len, uint64_t timeout)
|
||||||
{
|
{
|
||||||
if (!tag || (*tag & j6_tag_system_flag))
|
if (!tag || (*tag & j6_tag_system_flag))
|
||||||
return j6_err_invalid_arg;
|
return j6_err_invalid_arg;
|
||||||
@@ -58,7 +58,7 @@ endpoint_sendrecv(j6_handle_t handle, uint64_t * tag, void * data, size_t * data
|
|||||||
|
|
||||||
j6_tag_t out_tag = j6_tag_invalid;
|
j6_tag_t out_tag = j6_tag_invalid;
|
||||||
size_t out_len = *data_len;
|
size_t out_len = *data_len;
|
||||||
j6_status_t s = e->receive(&out_tag, data, &out_len);
|
j6_status_t s = e->receive(&out_tag, data, &out_len, timeout);
|
||||||
*tag = out_tag;
|
*tag = out_tag;
|
||||||
*data_len = out_len;
|
*data_len = out_len;
|
||||||
return s;
|
return s;
|
||||||
|
|||||||
@@ -17,4 +17,5 @@
|
|||||||
#define j6_err_invalid_arg j6_err(0x0003)
|
#define j6_err_invalid_arg j6_err(0x0003)
|
||||||
#define j6_err_not_ready j6_err(0x0004)
|
#define j6_err_not_ready j6_err(0x0004)
|
||||||
#define j6_err_insufficient j6_err(0x0005)
|
#define j6_err_insufficient j6_err(0x0005)
|
||||||
|
#define j6_err_timed_out j6_err(0x0006)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user