mirror of
https://github.com/justinian/jsix.git
synced 2025-12-10 00:14:32 -08:00
[kernel] Make channels stream based
Multiple changes regarding channels. Mainly channels are now stream based and can handle partial reads or writes. Channels now use the kernel buffers area with the related buffer_cache. Added a fake stdout stream channel and kernel task to read its contents to the screen in preparation for handing channels as stdin/stdout to processes.
This commit is contained in:
@@ -1,40 +1,49 @@
|
||||
#include "kutil/assert.h"
|
||||
|
||||
#include "buffer_cache.h"
|
||||
#include "kernel_memory.h"
|
||||
#include "objects/channel.h"
|
||||
|
||||
using memory::frame_size;
|
||||
using memory::kernel_buffer_pages;
|
||||
static constexpr size_t buffer_bytes = kernel_buffer_pages * frame_size;
|
||||
|
||||
channel::channel() :
|
||||
m_len(0),
|
||||
m_capacity(0),
|
||||
m_data(nullptr),
|
||||
m_data(g_kbuffer_cache.get_buffer()),
|
||||
m_buffer(reinterpret_cast<uint8_t*>(m_data), buffer_bytes),
|
||||
kobject(kobject::type::channel, j6_signal_channel_can_send)
|
||||
{
|
||||
}
|
||||
|
||||
channel::~channel()
|
||||
{
|
||||
kutil::kfree(m_data);
|
||||
if (!closed()) close();
|
||||
}
|
||||
|
||||
j6_status_t
|
||||
channel::enqueue(size_t len, void *data)
|
||||
channel::enqueue(size_t *len, const void *data)
|
||||
{
|
||||
// TODO: Make this thread safe!
|
||||
if (closed())
|
||||
return j6_status_closed;
|
||||
|
||||
if (!can_send())
|
||||
if (!len || !*len)
|
||||
return j6_err_invalid_arg;
|
||||
|
||||
if (m_buffer.free_space() == 0)
|
||||
return j6_err_not_ready;
|
||||
|
||||
if (m_capacity < len) {
|
||||
kutil::kfree(m_data);
|
||||
m_data = kutil::kalloc(len);
|
||||
m_capacity = len;
|
||||
kassert(m_data, "Failed to allocate memory to copy channel message");
|
||||
}
|
||||
void *buffer = nullptr;
|
||||
size_t avail = m_buffer.reserve(*len, &buffer);
|
||||
*len = *len > avail ? avail : *len;
|
||||
|
||||
kutil::memcpy(buffer, data, *len);
|
||||
m_buffer.commit(*len);
|
||||
|
||||
m_len = len;
|
||||
kutil::memcpy(m_data, data, len);
|
||||
assert_signal(j6_signal_channel_can_recv);
|
||||
if (m_buffer.free_space() == 0)
|
||||
deassert_signal(j6_signal_channel_can_send);
|
||||
|
||||
return j6_status_ok;
|
||||
}
|
||||
@@ -46,22 +55,33 @@ channel::dequeue(size_t *len, void *data)
|
||||
if (closed())
|
||||
return j6_status_closed;
|
||||
|
||||
if (!can_receive())
|
||||
return j6_err_not_ready;
|
||||
|
||||
if (!len)
|
||||
if (!len || !*len)
|
||||
return j6_err_invalid_arg;
|
||||
|
||||
if (*len < m_len)
|
||||
return j6_err_insufficient;
|
||||
if (m_buffer.size() == 0)
|
||||
return j6_err_not_ready;
|
||||
|
||||
void *buffer = nullptr;
|
||||
size_t avail = m_buffer.get_block(&buffer);
|
||||
*len = *len > avail ? avail : *len;
|
||||
|
||||
kutil::memcpy(data, buffer, *len);
|
||||
m_buffer.consume(*len);
|
||||
|
||||
kutil::memcpy(data, m_data, m_len);
|
||||
*len = m_len;
|
||||
assert_signal(j6_signal_channel_can_send);
|
||||
if (m_buffer.size() == 0)
|
||||
deassert_signal(j6_signal_channel_can_recv);
|
||||
|
||||
return j6_status_ok;
|
||||
}
|
||||
|
||||
void
|
||||
channel::close()
|
||||
{
|
||||
g_kbuffer_cache.return_buffer(m_data);
|
||||
assert_signal(j6_signal_channel_closed);
|
||||
}
|
||||
|
||||
void
|
||||
channel::on_no_handles()
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user