From b45e8cd81d0c29be258c419ea15f8d5ade1aa87f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wiesner=20Andr=C3=A1s?= Date: Fri, 24 Feb 2023 11:07:12 +0100 Subject: [PATCH] -RX queue added -Multiplatform thread and semaphore definitions added --- eth_interface.c | 27 ++++++++++++++++++++++++++- eth_interface.h | 3 +++ gen_queue.c | 1 - msg_queue.c | 4 +++- 4 files changed, 32 insertions(+), 3 deletions(-) diff --git a/eth_interface.c b/eth_interface.c index b6410ba..5fede89 100644 --- a/eth_interface.c +++ b/eth_interface.c @@ -18,6 +18,9 @@ static void ethintf_register(EthInterface * intf) { intf->ioDef->llRxDone = ethintf_llrecv; } +// interface processing thread +static void * ethinf_proc_thread(void * param); + EthInterface *ethintf_new(EthIODef * io) { EthInterface * ethIntf = (EthInterface *)dynmem_alloc(sizeof(EthInterface)); ASSERT_NULL(ethIntf); @@ -31,14 +34,36 @@ EthInterface *ethintf_new(EthIODef * io) { ethintf_register(ethIntf); ethIntf->txQ = mq_create(ETHLIB_DEF_MQ_SIZE); + ethIntf->rxQ = mq_create(ETHLIB_DEF_MQ_SIZE); + ETHLIB_OS_SEM_CREATE(ðIntf->rxSem); + ETHLIB_OS_THREAD_DEFINE(ethinf_proc_thread, ipp, 10, 4096, ethIntf); + ETHLIB_OS_THREAD_CREATE(ipp, ethIntf); ethIntf->ipra = ipra_new(); return ethIntf; } +static void * ethinf_proc_thread(void * param) { + EthInterface * intf = (EthInterface *) param; + while (true) { + ETHLIB_OS_SEM_WAIT(&intf->rxSem); + if (mq_avail(intf->rxQ) > 0) { + RawPckt rawPckt = mq_top(intf->rxQ); + mq_pop(intf->rxQ); + packsieve_input(&intf->sieve, &rawPckt); + } + } + return NULL; +} + void ethinf_receive(EthInterface *intf, const RawPckt *rawPckt) { - packsieve_input(&intf->sieve, rawPckt); + bool pushOK = mq_push(intf->rxQ, rawPckt); + if (pushOK) { + ETHLIB_OS_SEM_POST(&intf->rxSem); + } else { + ERROR("Input queue full, packet dropped!\n"); + } } void ethinf_transmit(EthInterface *intf, const RawPckt *rawPckt) { diff --git a/eth_interface.h b/eth_interface.h index 43d0f86..7a7ccdc 100644 --- a/eth_interface.h +++ b/eth_interface.h @@ -8,6 +8,7 @@ #include "connection_block.h" #include "msg_queue.h" #include "prefab/conn_blocks/ipv4/ip_assembler.h" +#include "etherlib_options.h" /** * Ethernet interface low level definition. @@ -36,6 +37,8 @@ typedef struct EthInterface_ { ArpCache * arpc; ///< ARP cache ConnBlock arpCb; ///< ARP connection block MsgQueue * txQ; ///< Transmit queue + MsgQueue * rxQ; ///< Receive queue + ETHLIB_OS_SEM_TYPE rxSem; ///< Receive queue semaphore IPv4Assembler * ipra; ///< IPv4 reassembler } EthInterface; diff --git a/gen_queue.c b/gen_queue.c index af9dd77..119e857 100644 --- a/gen_queue.c +++ b/gen_queue.c @@ -37,7 +37,6 @@ bool q_push(Queue * q, const void * src) { // can push memcpy(q->elements + q->writeIdx * q->elemSize, src, q->elemSize); - q->writeIdx++; // advance write pointer q->writeIdx = MQ_NEXT(q->length, q->writeIdx); diff --git a/msg_queue.c b/msg_queue.c index 5326280..ebc2060 100644 --- a/msg_queue.c +++ b/msg_queue.c @@ -7,6 +7,8 @@ #include "msg_queue.h" #include "dynmem.h" +#include + MsgQueue *mq_create(uint32_t size) { MsgQueue * mq = (MsgQueue *) dynmem_alloc(sizeof(MsgQueue) + size * sizeof(RawPckt)); mq->size = size; @@ -35,7 +37,7 @@ bool mq_push(MsgQueue * mq, const RawPckt * raw) { } // can push - mq->pckts[mq->writeIdx++] = *raw; + mq->pckts[mq->writeIdx] = *raw; // advance write pointer mq->writeIdx = MQ_NEXT(mq->size, mq->writeIdx);