Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions sources/kernel/include/fs/drivers/broadcast_fs.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#pragma once

#include <fs/filesystem.h>
#include <process/resource_manager.h>
#include <stdstring.h>

class CBroadcast_FS_Driver : public IFilesystem_Driver {
public:
virtual void On_Register() override {
//
}

virtual IFile *Open_File(const char *path, NFile_Open_Mode mode) override {
char bcastname[Max_Broadcast_Channel_Name_Length];
strncpy(bcastname, path, Max_Broadcast_Channel_Name_Length);

const int len = strlen(bcastname);
uint32_t bcastsize = Broadcast_Channel_Byte_Count_Unknown;
for (int i = 1; i < len - 1; i++) {
if (bcastname[i] == '#') {
bcastname[i] = '\0';
// explicitne receno, ze nevime, jak velky ma broadcast channel byt
if (bcastname[i + 1] == '?')
break;

bcastsize = atoi(&bcastname[i + 1]);
break;
}
}

return sProcess_Resource_Manager.Alloc_Broadcast_Channel(bcastname, bcastsize);
}
};

CBroadcast_FS_Driver fsBroadcast_FS_Driver;
1 change: 1 addition & 0 deletions sources/kernel/include/fs/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ enum class NFile_Type_Major
Semaphore = 3, // semaphore virtual file
Condition_Var = 4, // podminkova promenna
Pipe = 5, // roura
Broadcast = 6, // broadcast channel
};

enum class NFile_Open_Mode
Expand Down
36 changes: 36 additions & 0 deletions sources/kernel/include/process/broadcast_channel.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#pragma once

#include <fs/filesystem.h>
#include "mutex.h"

constexpr uint32_t Max_Broadcast_Readers = 16;

class CBroadcast_Channel : public IFile
{
private:
struct TReader_Info
{
uint32_t pid;
uint32_t last_read_generation;
};

char *mBuffer;
uint32_t mSize;
uint32_t mMessage_Length;
uint32_t mGeneration;

TReader_Info mReaders[Max_Broadcast_Readers];
uint32_t mReader_Count;

CMutex mMutex;

public:
CBroadcast_Channel();
~CBroadcast_Channel();

void Reset(uint32_t size);

virtual uint32_t Read(char *buffer, uint32_t len) override;
virtual uint32_t Write(const char *buffer, uint32_t len) override;
virtual bool Close() override;
};
19 changes: 19 additions & 0 deletions sources/kernel/include/process/resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "semaphore.h"
#include "condvar.h"
#include "pipe.h"
#include "broadcast_channel.h"

// pocet predalokovanych mutexu (a zaroven max. pocet)
constexpr uint32_t Mutex_Count = 32;
Expand All @@ -27,10 +28,17 @@ constexpr uint32_t Pipe_Count = 16;
// maximalni delka jmena pipe
constexpr uint32_t Max_Pipe_Name_Length = 16;

// pocet predalokovanych broadcast channelu (a zaroven max. pocet)
constexpr uint32_t Broadcast_Channel_Count = 16;
// maximalni delka jmena broadcast channelu
constexpr uint32_t Max_Broadcast_Channel_Name_Length = 16;

// pri otevirani semaforu = pokud uz musel byt semafor otevreny
constexpr uint32_t Semaphore_Initial_Res_Count_Unknown = static_cast<uint32_t>(-1);
// pokud je pri otevirani pipe velikost neznama
constexpr uint32_t Pipe_Byte_Count_Unknown = static_cast<uint32_t>(-1);
// pokud je pri otevirani broadcast channelu velikost neznama
constexpr uint32_t Broadcast_Channel_Byte_Count_Unknown = static_cast<uint32_t>(-1);

class CProcess_Resource_Manager
{
Expand Down Expand Up @@ -63,10 +71,18 @@ class CProcess_Resource_Manager
unsigned int alloc_count;
};

struct TBroadcast_Channel_Record
{
CBroadcast_Channel bcast;
char name[Max_Broadcast_Channel_Name_Length];
unsigned int alloc_count;
};

TMutex_Record mMutexes[Mutex_Count];
TSemaphore_Record mSemaphores[Semaphore_Count];
TCond_Var_Record mCondVars[Cond_Var_Count];
TPipe_Record mPipes[Pipe_Count];
TBroadcast_Channel_Record mBroadcast_Channels[Broadcast_Channel_Count];

public:
CProcess_Resource_Manager();
Expand All @@ -83,6 +99,9 @@ class CProcess_Resource_Manager

CPipe* Alloc_Pipe(const char* name, uint32_t pipe_size);
void Free_Pipe(CPipe* pipe);

CBroadcast_Channel* Alloc_Broadcast_Channel(const char* name, uint32_t bcast_size);
void Free_Broadcast_Channel(CBroadcast_Channel* bcast);
};

extern CProcess_Resource_Manager sProcess_Resource_Manager;
2 changes: 2 additions & 0 deletions sources/kernel/src/fs/filesystem_drivers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <fs/drivers/mutex_fs.h>
#include <fs/drivers/condvar_fs.h>
#include <fs/drivers/pipe_fs.h>
#include <fs/drivers/broadcast_fs.h>

// pole driveru - tady uvedeme vsechny, ktere jsou v systemu dostupne a ktere je zadouci pro tuto instanci naseho OS pripojit
const CFilesystem::TFS_Driver CFilesystem::gFS_Drivers[] = {
Expand All @@ -27,6 +28,7 @@ const CFilesystem::TFS_Driver CFilesystem::gFS_Drivers[] = {
{ "Semaphore", "SYS:sem", &fsSemaphore_FS_Driver },
{ "CondVar", "SYS:cv", &fsCond_Var_FS_Driver },
{ "Pipe", "SYS:pipe", &fsPipe_FS_Driver },
{ "Broadcast", "SYS:bcast", &fsBroadcast_FS_Driver },
};

// pocet FS driveru - je staticky spocitan z velikosti vyse uvedeneho pole
Expand Down
141 changes: 141 additions & 0 deletions sources/kernel/src/process/broadcast_channel.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
#include <process/broadcast_channel.h>
#include <memory/kernel_heap.h>
#include <process/process_manager.h>
#include <process/resource_manager.h>

#include "stdfile.h"

CBroadcast_Channel::CBroadcast_Channel()
: IFile(NFile_Type_Major::Broadcast)
{
//
}

CBroadcast_Channel::~CBroadcast_Channel()
{
if (mBuffer)
sKernelMem.Free(mBuffer);
}

void CBroadcast_Channel::Reset(uint32_t size)
{
if (mBuffer)
sKernelMem.Free(mBuffer);

mSize = size;
if (mSize > 0)
mBuffer = reinterpret_cast<char*>(sKernelMem.Alloc(size));
else
mBuffer = nullptr;

mMessage_Length = 0;
mGeneration = 0;
mReader_Count = 0;
}

uint32_t CBroadcast_Channel::Read(char *buffer, uint32_t len)
{
uint32_t current_pid = sProcessMgr.Get_Current_Process()->pid;
TReader_Info* reader_info = nullptr;

mMutex.Lock();

// Najdeme zaznam pro tento proces, nebo vytvorime novy
for (uint32_t i = 0; i < mReader_Count; i++)
{
if (mReaders[i].pid == current_pid)
{
reader_info = &mReaders[i];
break;
}
}
if (!reader_info && mReader_Count < Max_Broadcast_Readers)
{
reader_info = &mReaders[mReader_Count++];
reader_info->pid = current_pid;
reader_info->last_read_generation = 0; // Nikdy nic necetl
}

// Pokud jsme nenasli/nevytvorili zaznam, je chyba (malo mista)
if (!reader_info)
{
mMutex.Unlock();
return 0;
}

// Cekame, dokud neni k dispozici nova generace zpravy
while (mGeneration == 0 || reader_info->last_read_generation == mGeneration)
{
Wait_Enqueue_Current();
mMutex.Unlock();
sProcessMgr.Block_Current_Process();
mMutex.Lock();
}

// Precteme zpravu
uint32_t read_len = (mMessage_Length < len) ? mMessage_Length : len;
for (uint32_t i = 0; i < read_len; i++)
{
buffer[i] = mBuffer[i];
}

// Aktualizujeme generaci, kterou jsme cetli
reader_info->last_read_generation = mGeneration;

mMutex.Unlock();

return read_len;
}

uint32_t CBroadcast_Channel::Write(const char *buffer, uint32_t len)
{
mMutex.Lock();

// Zapiseme zpravu (prepiseme starou)
uint32_t write_len = (len < mSize) ? len : mSize;
for (uint32_t i = 0; i < write_len; i++)
{
mBuffer[i] = buffer[i];
}
mMessage_Length = write_len;

// Zvysime generaci
mGeneration++;
if (mGeneration == 0) // preteceni, 0 je specialni hodnota "zadna zprava"
mGeneration = 1;

// Probudime vsechny cekajici ctenare
Notify(NotifyAll);

mMutex.Unlock();

return write_len;
}

bool CBroadcast_Channel::Close()
{
mMutex.Lock();

// Odstranime zaznam o ctenari pro tento proces
uint32_t current_pid = sProcessMgr.Get_Current_Process()->pid;
for (uint32_t i = 0; i < mReader_Count; i++)
{
if (mReaders[i].pid == current_pid)
{
// Presuneme posledni prvek na misto mazaneho (aby pole zustalo spojite)
if (i != mReader_Count - 1)
{
mReaders[i] = mReaders[mReader_Count - 1];
}
mReader_Count--;
break;
}
}

mMutex.Unlock();

// Uvolnime zdroj ve spravci zdroju (snizi pocitadlo referenci)
sProcess_Resource_Manager.Free_Broadcast_Channel(this);

return true;
}
45 changes: 45 additions & 0 deletions sources/kernel/src/process/resource_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,48 @@ void CProcess_Resource_Manager::Free_Pipe(CPipe* pipe)
}
}
}

CBroadcast_Channel* CProcess_Resource_Manager::Alloc_Broadcast_Channel(const char* name, uint32_t bcast_size)
{
for (uint32_t i = 0; i < Broadcast_Channel_Count; i++)
{
if (mBroadcast_Channels[i].alloc_count > 0)
{
if (strncmp(mBroadcast_Channels[i].name, name, Max_Broadcast_Channel_Name_Length) == 0)
{
mBroadcast_Channels[i].alloc_count++;
return &mBroadcast_Channels[i].bcast;
}
}
}

if (bcast_size == Broadcast_Channel_Byte_Count_Unknown)
return nullptr;

for (uint32_t i = 0; i < Broadcast_Channel_Count; i++)
{
if (mBroadcast_Channels[i].alloc_count == 0)
{
mBroadcast_Channels[i].bcast.Reset(bcast_size);

strncpy(mBroadcast_Channels[i].name, name, Max_Broadcast_Channel_Name_Length);
mBroadcast_Channels[i].alloc_count++;
return &mBroadcast_Channels[i].bcast;
}
}

return nullptr;
}

void CProcess_Resource_Manager::Free_Broadcast_Channel(CBroadcast_Channel* bcast)
{
for (uint32_t i = 0; i < Broadcast_Channel_Count; i++)
{
if (&mBroadcast_Channels[i].bcast == bcast && mBroadcast_Channels[i].alloc_count > 0)
{
if ((--mBroadcast_Channels[i].alloc_count) == 0)
mBroadcast_Channels[i].bcast.Reset(0);
return;
}
}
}