diff --git a/definitions/objects/channel.def b/definitions/objects/channel.def index e6819bc..6f96109 100644 --- a/definitions/objects/channel.def +++ b/definitions/objects/channel.def @@ -16,5 +16,6 @@ object channel : object { method receive [cap:receive] { param data buffer [out] + param flags uint64 } } diff --git a/src/kernel/objects/channel.cpp b/src/kernel/objects/channel.cpp index 9111910..fa54572 100644 --- a/src/kernel/objects/channel.cpp +++ b/src/kernel/objects/channel.cpp @@ -3,6 +3,7 @@ #include "assert.h" #include "memory.h" #include "objects/channel.h" +#include "objects/thread.h" #include "objects/vm_area.h" extern obj::vm_area_guarded g_kernel_buffers; @@ -12,10 +13,11 @@ namespace obj { constexpr size_t buffer_bytes = mem::kernel_buffer_pages * mem::frame_size; channel::channel() : - m_len(0), - m_data(g_kernel_buffers.get_section()), - m_buffer(reinterpret_cast(m_data), buffer_bytes), - kobject(kobject::type::channel) + m_len {0}, + m_data {g_kernel_buffers.get_section()}, + m_closed {false}, + m_buffer {reinterpret_cast(m_data), buffer_bytes}, + kobject {kobject::type::channel} { } @@ -33,24 +35,23 @@ channel::enqueue(const util::buffer &data) size_t len = data.count; void *buffer = nullptr; - size_t avail = m_buffer.reserve(len, &buffer); - - len = len > avail ? avail : len; + len = m_buffer.reserve(len, &buffer); memcpy(buffer, data.pointer, len); m_buffer.commit(len); - if (len) - m_can_recv = true; + if (len) { + thread *t = m_queue.pop_next(); - if (m_buffer.free_space() == 0) - m_can_send = false; + lock.release(); + if (t) t->wake(); + } return len; } size_t -channel::dequeue(util::buffer buffer) +channel::dequeue(util::buffer buffer, bool block) { util::scoped_lock lock {m_close_lock}; @@ -58,17 +59,21 @@ channel::dequeue(util::buffer buffer) void *data = nullptr; size_t avail = m_buffer.get_block(&data); + if (!avail && block) { + thread &cur = thread::current(); + m_queue.add_thread(&cur); + + lock.release(); + cur.block(); + lock.reacquire(); + avail = m_buffer.get_block(&data); + } + size_t len = buffer.count > avail ? avail : buffer.count; - memcpy(data, buffer.pointer, len); + memcpy(buffer.pointer, data, len); m_buffer.consume(len); - if (len) - m_can_send = true; - - if (m_buffer.size() == 0) - m_can_recv = false; - return len; } diff --git a/src/kernel/objects/channel.h b/src/kernel/objects/channel.h index 8d286f7..d4af43b 100644 --- a/src/kernel/objects/channel.h +++ b/src/kernel/objects/channel.h @@ -8,6 +8,7 @@ #include #include "objects/kobject.h" +#include "wait_queue.h" namespace obj { @@ -24,12 +25,6 @@ public: static constexpr kobject::type type = kobject::type::channel; - /// Check if the channel has space for a message to be sent - inline bool can_send() const { return m_can_send; } - - /// Check if the channel has a message wiating already - inline bool can_receive() const { return m_can_recv; } - /// Put a message into the channel /// \arg data Buffer of data to write /// \returns The number of bytes successfully written @@ -37,8 +32,9 @@ public: /// Get a message from the channel, copied into a provided buffer /// \arg buffer The buffer to copy data into + /// \arg block If true, block the calling thread until there is data /// \returns The number of bytes copied into the provided buffer - size_t dequeue(util::buffer buffer); + size_t dequeue(util::buffer buffer, bool block = false); /// Mark this channel as closed, all future calls to enqueue or /// dequeue messages will fail with j6_status_closed. @@ -51,10 +47,9 @@ private: size_t m_len; uintptr_t m_data; bool m_closed; - bool m_can_send; - bool m_can_recv; util::bip_buffer m_buffer; util::spinlock m_close_lock; + wait_queue m_queue; }; } // namespace obj diff --git a/src/kernel/syscalls/channel.cpp b/src/kernel/syscalls/channel.cpp index e1c075a..34dd985 100644 --- a/src/kernel/syscalls/channel.cpp +++ b/src/kernel/syscalls/channel.cpp @@ -1,4 +1,5 @@ #include +#include #include #include @@ -29,13 +30,15 @@ channel_send(channel *self, void *data, size_t *data_len) } j6_status_t -channel_receive(channel *self, void *data, size_t *data_len) +channel_receive(channel *self, void *data, size_t *data_len, uint64_t flags) { if (self->closed()) return j6_status_closed; util::buffer buffer {data, *data_len}; - *data_len = self->dequeue(buffer); + + const bool block = flags & j6_channel_block; + *data_len = self->dequeue(buffer, block); return j6_status_ok; } diff --git a/src/libraries/j6/j6/flags.h b/src/libraries/j6/j6/flags.h index d1ff8d2..d5242f4 100644 --- a/src/libraries/j6/j6/flags.h +++ b/src/libraries/j6/j6/flags.h @@ -9,6 +9,10 @@ enum j6_vm_flags { j6_vm_flag_MAX }; +enum j6_channel_flags { + j6_channel_block = 0x01, +}; + enum j6_mailbox_flags { j6_mailbox_block = 0x01, };