diff --git a/definitions/objects/channel.def b/definitions/objects/channel.def index e019462..ad1815a 100644 --- a/definitions/objects/channel.def +++ b/definitions/objects/channel.def @@ -1,3 +1,14 @@ object channel : kobject { uid 3ea38b96aa0e54c8 + + method create [constructor] + method close [destructor] + + method send { + param data buffer [inout] + } + + method receive { + param data buffer [out] + } } diff --git a/src/kernel/objects/channel.cpp b/src/kernel/objects/channel.cpp index b413ad0..1cb2613 100644 --- a/src/kernel/objects/channel.cpp +++ b/src/kernel/objects/channel.cpp @@ -24,63 +24,59 @@ channel::~channel() if (!closed()) close(); } -j6_status_t -channel::enqueue(size_t *len, const void *data) +size_t +channel::enqueue(const util::buffer &data) { - // TODO: Make this thread safe! - if (closed()) - return j6_status_closed; + util::scoped_lock lock {m_close_lock}; - if (!len || !*len) - return j6_err_invalid_arg; - - if (m_buffer.free_space() == 0) - return j6_err_not_ready; + if (closed()) return 0; + size_t len = data.count; void *buffer = nullptr; - size_t avail = m_buffer.reserve(*len, &buffer); - *len = *len > avail ? avail : *len; + size_t avail = m_buffer.reserve(len, &buffer); - memcpy(buffer, data, *len); - m_buffer.commit(*len); + len = len > avail ? avail : len; + + memcpy(buffer, data.pointer, len); + m_buffer.commit(len); + + if (len) + assert_signal(j6_signal_channel_can_recv); - assert_signal(j6_signal_channel_can_recv); if (m_buffer.free_space() == 0) deassert_signal(j6_signal_channel_can_send); - return j6_status_ok; + return len; } -j6_status_t -channel::dequeue(size_t *len, void *data) +size_t +channel::dequeue(util::buffer buffer) { - // TODO: Make this thread safe! - if (closed()) - return j6_status_closed; + util::scoped_lock lock {m_close_lock}; - if (!len || !*len) - return j6_err_invalid_arg; + if (closed()) return 0; - if (m_buffer.size() == 0) - return j6_err_not_ready; + void *data = nullptr; + size_t avail = m_buffer.get_block(&data); + size_t len = buffer.count > avail ? avail : buffer.count; - void *buffer = nullptr; - size_t avail = m_buffer.get_block(&buffer); - *len = *len > avail ? avail : *len; + memcpy(data, buffer.pointer, len); + m_buffer.consume(len); - memcpy(data, buffer, *len); - m_buffer.consume(*len); + if (len) + assert_signal(j6_signal_channel_can_send); - assert_signal(j6_signal_channel_can_send); if (m_buffer.size() == 0) deassert_signal(j6_signal_channel_can_recv); - return j6_status_ok; + return len; } void channel::close() { + util::scoped_lock lock {m_close_lock}; + kobject::close(); g_kernel_buffers.return_section(m_data); } diff --git a/src/kernel/objects/channel.h b/src/kernel/objects/channel.h index d2d44aa..15b3077 100644 --- a/src/kernel/objects/channel.h +++ b/src/kernel/objects/channel.h @@ -4,12 +4,14 @@ #include #include +#include +#include #include "objects/kobject.h" namespace obj { -/// Channels are bi-directional means of sending messages +/// Channels are uni-directional means of sending data class channel : public kobject { @@ -29,17 +31,14 @@ public: inline bool can_receive() const { return check_signal(j6_signal_channel_can_recv); } /// Put a message into the channel - /// \arg len [in] Bytes in data buffer [out] number of bytes written - /// \arg data Pointer to the message data - /// \returns j6_status_ok on success - j6_status_t enqueue(size_t *len, const void *data); + /// \arg data Buffer of data to write + /// \returns The number of bytes successfully written + size_t enqueue(const util::buffer &data); /// Get a message from the channel, copied into a provided buffer - /// \arg len On input, the size of the provided buffer. On output, - /// the size of the message copied into the buffer. - /// \arg data Pointer to the buffer - /// \returns j6_status_ok on success - j6_status_t dequeue(size_t *len, void *data); + /// \arg buffer The buffer to copy data into + /// \returns The number of bytes copied into the provided buffer + size_t dequeue(util::buffer buffer); /// Mark this channel as closed, all future calls to enqueue or /// dequeue messages will fail with j6_status_closed. @@ -52,6 +51,7 @@ private: size_t m_len; uintptr_t m_data; util::bip_buffer m_buffer; + util::spinlock m_close_lock; }; } // namespace obj diff --git a/src/kernel/syscalls/channel.cpp b/src/kernel/syscalls/channel.cpp index a8e0555..e1c075a 100644 --- a/src/kernel/syscalls/channel.cpp +++ b/src/kernel/syscalls/channel.cpp @@ -1,5 +1,6 @@ #include #include +#include #include "objects/channel.h" #include "syscalls/helpers.h" @@ -16,15 +17,37 @@ channel_create(j6_handle_t *self) } j6_status_t -channel_send(channel *self, size_t *len, void *data) +channel_send(channel *self, void *data, size_t *data_len) { - return self->enqueue(len, data); + if (self->closed()) + return j6_status_closed; + + const util::buffer buffer {data, *data_len}; + *data_len = self->enqueue(buffer); + + return j6_status_ok; } j6_status_t -channel_receive(channel *self, size_t *len, void *data) +channel_receive(channel *self, void *data, size_t *data_len) { - return self->dequeue(len, data); + if (self->closed()) + return j6_status_closed; + + util::buffer buffer {data, *data_len}; + *data_len = self->dequeue(buffer); + + return j6_status_ok; +} + +j6_status_t +channel_close(channel *self) +{ + if (self->closed()) + return j6_status_closed; + + self->close(); + return j6_status_ok; } } // namespace syscalls