mirror of
https://github.com/justinian/jsix.git
synced 2025-12-10 00:14:32 -08:00
[kernel] Allow blocking on empty channels
This commit adds a new flag, j6_channel_block, and a new flags param to the channel_receive syscall. When the block flag is specified, the caller will block waiting for data on the channel if the channel is empty.
This commit is contained in:
@@ -16,5 +16,6 @@ object channel : object {
|
|||||||
|
|
||||||
method receive [cap:receive] {
|
method receive [cap:receive] {
|
||||||
param data buffer [out]
|
param data buffer [out]
|
||||||
|
param flags uint64
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,7 @@
|
|||||||
#include "assert.h"
|
#include "assert.h"
|
||||||
#include "memory.h"
|
#include "memory.h"
|
||||||
#include "objects/channel.h"
|
#include "objects/channel.h"
|
||||||
|
#include "objects/thread.h"
|
||||||
#include "objects/vm_area.h"
|
#include "objects/vm_area.h"
|
||||||
|
|
||||||
extern obj::vm_area_guarded g_kernel_buffers;
|
extern obj::vm_area_guarded g_kernel_buffers;
|
||||||
@@ -12,10 +13,11 @@ namespace obj {
|
|||||||
constexpr size_t buffer_bytes = mem::kernel_buffer_pages * mem::frame_size;
|
constexpr size_t buffer_bytes = mem::kernel_buffer_pages * mem::frame_size;
|
||||||
|
|
||||||
channel::channel() :
|
channel::channel() :
|
||||||
m_len(0),
|
m_len {0},
|
||||||
m_data(g_kernel_buffers.get_section()),
|
m_data {g_kernel_buffers.get_section()},
|
||||||
m_buffer(reinterpret_cast<uint8_t*>(m_data), buffer_bytes),
|
m_closed {false},
|
||||||
kobject(kobject::type::channel)
|
m_buffer {reinterpret_cast<uint8_t*>(m_data), buffer_bytes},
|
||||||
|
kobject {kobject::type::channel}
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -33,24 +35,23 @@ channel::enqueue(const util::buffer &data)
|
|||||||
|
|
||||||
size_t len = data.count;
|
size_t len = data.count;
|
||||||
void *buffer = nullptr;
|
void *buffer = nullptr;
|
||||||
size_t avail = m_buffer.reserve(len, &buffer);
|
len = m_buffer.reserve(len, &buffer);
|
||||||
|
|
||||||
len = len > avail ? avail : len;
|
|
||||||
|
|
||||||
memcpy(buffer, data.pointer, len);
|
memcpy(buffer, data.pointer, len);
|
||||||
m_buffer.commit(len);
|
m_buffer.commit(len);
|
||||||
|
|
||||||
if (len)
|
if (len) {
|
||||||
m_can_recv = true;
|
thread *t = m_queue.pop_next();
|
||||||
|
|
||||||
if (m_buffer.free_space() == 0)
|
lock.release();
|
||||||
m_can_send = false;
|
if (t) t->wake();
|
||||||
|
}
|
||||||
|
|
||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t
|
size_t
|
||||||
channel::dequeue(util::buffer buffer)
|
channel::dequeue(util::buffer buffer, bool block)
|
||||||
{
|
{
|
||||||
util::scoped_lock lock {m_close_lock};
|
util::scoped_lock lock {m_close_lock};
|
||||||
|
|
||||||
@@ -58,17 +59,21 @@ channel::dequeue(util::buffer buffer)
|
|||||||
|
|
||||||
void *data = nullptr;
|
void *data = nullptr;
|
||||||
size_t avail = m_buffer.get_block(&data);
|
size_t avail = m_buffer.get_block(&data);
|
||||||
|
if (!avail && block) {
|
||||||
|
thread &cur = thread::current();
|
||||||
|
m_queue.add_thread(&cur);
|
||||||
|
|
||||||
|
lock.release();
|
||||||
|
cur.block();
|
||||||
|
lock.reacquire();
|
||||||
|
avail = m_buffer.get_block(&data);
|
||||||
|
}
|
||||||
|
|
||||||
size_t len = buffer.count > avail ? avail : buffer.count;
|
size_t len = buffer.count > avail ? avail : buffer.count;
|
||||||
|
|
||||||
memcpy(data, buffer.pointer, len);
|
memcpy(buffer.pointer, data, len);
|
||||||
m_buffer.consume(len);
|
m_buffer.consume(len);
|
||||||
|
|
||||||
if (len)
|
|
||||||
m_can_send = true;
|
|
||||||
|
|
||||||
if (m_buffer.size() == 0)
|
|
||||||
m_can_recv = false;
|
|
||||||
|
|
||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -8,6 +8,7 @@
|
|||||||
#include <util/spinlock.h>
|
#include <util/spinlock.h>
|
||||||
|
|
||||||
#include "objects/kobject.h"
|
#include "objects/kobject.h"
|
||||||
|
#include "wait_queue.h"
|
||||||
|
|
||||||
namespace obj {
|
namespace obj {
|
||||||
|
|
||||||
@@ -24,12 +25,6 @@ public:
|
|||||||
|
|
||||||
static constexpr kobject::type type = kobject::type::channel;
|
static constexpr kobject::type type = kobject::type::channel;
|
||||||
|
|
||||||
/// Check if the channel has space for a message to be sent
|
|
||||||
inline bool can_send() const { return m_can_send; }
|
|
||||||
|
|
||||||
/// Check if the channel has a message wiating already
|
|
||||||
inline bool can_receive() const { return m_can_recv; }
|
|
||||||
|
|
||||||
/// Put a message into the channel
|
/// Put a message into the channel
|
||||||
/// \arg data Buffer of data to write
|
/// \arg data Buffer of data to write
|
||||||
/// \returns The number of bytes successfully written
|
/// \returns The number of bytes successfully written
|
||||||
@@ -37,8 +32,9 @@ public:
|
|||||||
|
|
||||||
/// Get a message from the channel, copied into a provided buffer
|
/// Get a message from the channel, copied into a provided buffer
|
||||||
/// \arg buffer The buffer to copy data into
|
/// \arg buffer The buffer to copy data into
|
||||||
|
/// \arg block If true, block the calling thread until there is data
|
||||||
/// \returns The number of bytes copied into the provided buffer
|
/// \returns The number of bytes copied into the provided buffer
|
||||||
size_t dequeue(util::buffer buffer);
|
size_t dequeue(util::buffer buffer, bool block = false);
|
||||||
|
|
||||||
/// Mark this channel as closed, all future calls to enqueue or
|
/// Mark this channel as closed, all future calls to enqueue or
|
||||||
/// dequeue messages will fail with j6_status_closed.
|
/// dequeue messages will fail with j6_status_closed.
|
||||||
@@ -51,10 +47,9 @@ private:
|
|||||||
size_t m_len;
|
size_t m_len;
|
||||||
uintptr_t m_data;
|
uintptr_t m_data;
|
||||||
bool m_closed;
|
bool m_closed;
|
||||||
bool m_can_send;
|
|
||||||
bool m_can_recv;
|
|
||||||
util::bip_buffer m_buffer;
|
util::bip_buffer m_buffer;
|
||||||
util::spinlock m_close_lock;
|
util::spinlock m_close_lock;
|
||||||
|
wait_queue m_queue;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace obj
|
} // namespace obj
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
#include <j6/errors.h>
|
#include <j6/errors.h>
|
||||||
|
#include <j6/flags.h>
|
||||||
#include <j6/types.h>
|
#include <j6/types.h>
|
||||||
#include <util/counted.h>
|
#include <util/counted.h>
|
||||||
|
|
||||||
@@ -29,13 +30,15 @@ channel_send(channel *self, void *data, size_t *data_len)
|
|||||||
}
|
}
|
||||||
|
|
||||||
j6_status_t
|
j6_status_t
|
||||||
channel_receive(channel *self, void *data, size_t *data_len)
|
channel_receive(channel *self, void *data, size_t *data_len, uint64_t flags)
|
||||||
{
|
{
|
||||||
if (self->closed())
|
if (self->closed())
|
||||||
return j6_status_closed;
|
return j6_status_closed;
|
||||||
|
|
||||||
util::buffer buffer {data, *data_len};
|
util::buffer buffer {data, *data_len};
|
||||||
*data_len = self->dequeue(buffer);
|
|
||||||
|
const bool block = flags & j6_channel_block;
|
||||||
|
*data_len = self->dequeue(buffer, block);
|
||||||
|
|
||||||
return j6_status_ok;
|
return j6_status_ok;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,6 +9,10 @@ enum j6_vm_flags {
|
|||||||
j6_vm_flag_MAX
|
j6_vm_flag_MAX
|
||||||
};
|
};
|
||||||
|
|
||||||
|
enum j6_channel_flags {
|
||||||
|
j6_channel_block = 0x01,
|
||||||
|
};
|
||||||
|
|
||||||
enum j6_mailbox_flags {
|
enum j6_mailbox_flags {
|
||||||
j6_mailbox_block = 0x01,
|
j6_mailbox_block = 0x01,
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user