-RX queue added

-Multiplatform thread and semaphore definitions added
This commit is contained in:
Wiesner András 2023-02-24 11:07:12 +01:00
parent 704fef911f
commit b45e8cd81d
4 changed files with 32 additions and 3 deletions

View File

@ -18,6 +18,9 @@ static void ethintf_register(EthInterface * intf) {
intf->ioDef->llRxDone = ethintf_llrecv;
}
// interface processing thread
static void * ethinf_proc_thread(void * param);
EthInterface *ethintf_new(EthIODef * io) {
EthInterface * ethIntf = (EthInterface *)dynmem_alloc(sizeof(EthInterface));
ASSERT_NULL(ethIntf);
@ -31,14 +34,36 @@ EthInterface *ethintf_new(EthIODef * io) {
ethintf_register(ethIntf);
ethIntf->txQ = mq_create(ETHLIB_DEF_MQ_SIZE);
ethIntf->rxQ = mq_create(ETHLIB_DEF_MQ_SIZE);
ETHLIB_OS_SEM_CREATE(&ethIntf->rxSem);
ETHLIB_OS_THREAD_DEFINE(ethinf_proc_thread, ipp, 10, 4096, ethIntf);
ETHLIB_OS_THREAD_CREATE(ipp, ethIntf);
ethIntf->ipra = ipra_new();
return ethIntf;
}
static void * ethinf_proc_thread(void * param) {
EthInterface * intf = (EthInterface *) param;
while (true) {
ETHLIB_OS_SEM_WAIT(&intf->rxSem);
if (mq_avail(intf->rxQ) > 0) {
RawPckt rawPckt = mq_top(intf->rxQ);
mq_pop(intf->rxQ);
packsieve_input(&intf->sieve, &rawPckt);
}
}
return NULL;
}
void ethinf_receive(EthInterface *intf, const RawPckt *rawPckt) {
packsieve_input(&intf->sieve, rawPckt);
bool pushOK = mq_push(intf->rxQ, rawPckt);
if (pushOK) {
ETHLIB_OS_SEM_POST(&intf->rxSem);
} else {
ERROR("Input queue full, packet dropped!\n");
}
}
void ethinf_transmit(EthInterface *intf, const RawPckt *rawPckt) {

View File

@ -8,6 +8,7 @@
#include "connection_block.h"
#include "msg_queue.h"
#include "prefab/conn_blocks/ipv4/ip_assembler.h"
#include "etherlib_options.h"
/**
* Ethernet interface low level definition.
@ -36,6 +37,8 @@ typedef struct EthInterface_ {
ArpCache * arpc; ///< ARP cache
ConnBlock arpCb; ///< ARP connection block
MsgQueue * txQ; ///< Transmit queue
MsgQueue * rxQ; ///< Receive queue
ETHLIB_OS_SEM_TYPE rxSem; ///< Receive queue semaphore
IPv4Assembler * ipra; ///< IPv4 reassembler
} EthInterface;

View File

@ -37,7 +37,6 @@ bool q_push(Queue * q, const void * src) {
// can push
memcpy(q->elements + q->writeIdx * q->elemSize, src, q->elemSize);
q->writeIdx++;
// advance write pointer
q->writeIdx = MQ_NEXT(q->length, q->writeIdx);

View File

@ -7,6 +7,8 @@
#include "msg_queue.h"
#include "dynmem.h"
#include <stdio.h>
MsgQueue *mq_create(uint32_t size) {
MsgQueue * mq = (MsgQueue *) dynmem_alloc(sizeof(MsgQueue) + size * sizeof(RawPckt));
mq->size = size;
@ -35,7 +37,7 @@ bool mq_push(MsgQueue * mq, const RawPckt * raw) {
}
// can push
mq->pckts[mq->writeIdx++] = *raw;
mq->pckts[mq->writeIdx] = *raw;
// advance write pointer
mq->writeIdx = MQ_NEXT(mq->size, mq->writeIdx);