From 8a5c800fd38dbbe5ee819f469b9135c314b03b4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wiesner=20Andr=C3=A1s?= Date: Wed, 22 Nov 2023 20:55:50 +0100 Subject: [PATCH] - BlockingFifo implemented - get_interface_by_address() added - HTTP server initials added - code added to strip away padding on processing the IP layer by shrinking overall packet size - load of TCP fixes and improvements - TCP stream interface added - TCP window destroy() added and some bugs fixed --- apps/http_server.c | 28 +++++ apps/http_server.h | 19 ++++ blocking_fifo.c | 124 +++++++++++++++++++++ blocking_fifo.h | 45 ++++++++ global_state.c | 4 + global_state.h | 7 ++ packet_registry.h | 10 +- packet_sieve.c | 10 +- prefab/conn_blocks/tcp/tcp_window.c | 12 ++- prefab/conn_blocks/tcp/tcp_window.h | 12 ++- prefab/conn_blocks/tcp_connblock.c | 161 ++++++++++++++++++++++------ prefab/conn_blocks/tcp_connblock.h | 49 +++++++-- prefab/packet_parsers/ipv4_packet.c | 16 ++- tcp_connection_manager.c | 5 - tcp_connection_manager.h | 19 ---- 15 files changed, 447 insertions(+), 74 deletions(-) create mode 100644 apps/http_server.c create mode 100644 apps/http_server.h create mode 100644 blocking_fifo.c create mode 100644 blocking_fifo.h delete mode 100644 tcp_connection_manager.c delete mode 100644 tcp_connection_manager.h diff --git a/apps/http_server.c b/apps/http_server.c new file mode 100644 index 0000000..8dc3994 --- /dev/null +++ b/apps/http_server.c @@ -0,0 +1,28 @@ +// +// Created by epagris on 2023.11.22.. +// + +#include +#include "http_server.h" +#include "../dynmem.h" +#include "../utils.h" +#include "../prefab/conn_blocks/tcp_connblock.h" +#include "../global_state.h" + +HttpServer *http_new(ip4_addr addr, uint16_t port) { + // acquire interface + EthInterface * intf = get_interface_by_address(addr); + if (intf == NULL) { + ERROR("Unknown interface!\n"); + return NULL; + } + + // allocate instance + HttpServer * http = (HttpServer *) dynmem_alloc(sizeof(HttpServer)); + ASSERT_NULL(http); + + // open server connection + //http->serverConn = tcp_new_connblock(intf, addr, port, ); + + return http; +} diff --git a/apps/http_server.h b/apps/http_server.h new file mode 100644 index 0000000..601dba8 --- /dev/null +++ b/apps/http_server.h @@ -0,0 +1,19 @@ +#ifndef ETHERLIB_TEST_HTTP_SERVER_H +#define ETHERLIB_TEST_HTTP_SERVER_H + +#include "../cbd_table.h" +#include "etherlib/prefab/packet_parsers/ipv4_types.h" + +typedef struct { + cbd serverConn; ///< Server connection +} HttpServer; + +/** + * Create a new HTTP-server + * @param addr server bind address + * @param port server port + * @return pointer to newly allocated Http server OR NULL on failure + */ +HttpServer * http_new(ip4_addr addr, uint16_t port); + +#endif //ETHERLIB_TEST_HTTP_SERVER_H diff --git a/blocking_fifo.c b/blocking_fifo.c new file mode 100644 index 0000000..dc22172 --- /dev/null +++ b/blocking_fifo.c @@ -0,0 +1,124 @@ +// +// Created by epagris on 2023.11.22.. +// + +#include +#include "blocking_fifo.h" +#include "dynmem.h" +#include "utils.h" + +#define MIN(a,b) (((a) < (b)) ? (a) : (b)) + +BlockingFifo *bfifo_new(uint32_t size) { + uint32_t allocSize = sizeof(BlockingFifo) + size; // calculate full allocation size + BlockingFifo * fifo = (BlockingFifo *) dynmem_alloc(allocSize); // allocate data block at the end + ASSERT_NULL(fifo); + memset(fifo, 0, sizeof(BlockingFifo)); + fifo->size = size; + ETHLIB_OS_SEM_CREATE(&fifo->nonEmpty, 1); + ETHLIB_OS_SEM_WAIT(&fifo->nonEmpty); // FIFO is empty + ETHLIB_OS_SEM_CREATE(&fifo->notFull, 1); + return fifo; +} + +void bfifo_destroy(BlockingFifo * bfifo) { + ETHLIB_OS_SEM_DESTROY(&bfifo->nonEmpty); + ETHLIB_OS_SEM_DESTROY(&bfifo->notFull); + bfifo->size = 0; + dynmem_free(bfifo); +} + +uint32_t bfifo_get_used(const BlockingFifo *bfifo) { + return bfifo->absWriteIdx - bfifo->absReadIdx; +} + +uint32_t bfifo_get_free(const BlockingFifo * bfifo) { + return bfifo->size - bfifo_get_used(bfifo); +} + +uint32_t bfifo_push_try(BlockingFifo * bfifo, const uint8_t * data, uint32_t size) { + if (bfifo_get_free(bfifo) == 0) { + return 0; + } else { + return bfifo_push(bfifo, data, size); + } +} + +uint32_t bfifo_push(BlockingFifo * bfifo, const uint8_t * data, uint32_t size) { + ETHLIB_OS_SEM_WAIT(&bfifo->notFull); // take not full semaphore + + // calculate copy size, limit if required + uint32_t freeArea = bfifo_get_free(bfifo); + uint32_t copySize = MIN(size, freeArea); + + // determine first block size + uint32_t spaceToBufEnd = bfifo->size - bfifo->writeIdx; // space to buffer end + uint32_t firstBlockSize = MIN(spaceToBufEnd, copySize); // first block size + uint32_t secondBlockSize = (firstBlockSize < copySize) ? copySize - firstBlockSize : 0; // second block size + memcpy(bfifo->data + bfifo->writeIdx, data, firstBlockSize); // copy first block + if (secondBlockSize > 0) { // copy second block if needed + memcpy(bfifo->data, data + firstBlockSize, secondBlockSize); + } + + // adjust indices + uint32_t newWriteIdx = bfifo->writeIdx + copySize; // advance write index + if (newWriteIdx >= bfifo->size) { + newWriteIdx -= bfifo->size; + } + bfifo->writeIdx = newWriteIdx; // replace... + bfifo->absWriteIdx += copySize; + + freeArea = bfifo_get_free(bfifo); + if (freeArea > 0) { // FIFO is not full + ETHLIB_OS_SEM_POST(&bfifo->notFull); + } + if (freeArea != bfifo->size) { // FIFO is not empty + ETHLIB_OS_SEM_POST(&bfifo->nonEmpty); + } + + return copySize; +} + +uint32_t bfifo_peek(BlockingFifo * bfifo, uint8_t * dst, uint32_t size) { + // determine copy size + uint32_t usedArea = bfifo_get_used(bfifo); + uint32_t copySize = MIN(size, usedArea); + + uint32_t spaceToBufEnd = bfifo->size - bfifo->readIdx; // space to buffer end + uint32_t firstBlockSize = MIN(spaceToBufEnd, copySize); // first block size + uint32_t secondBlockSize = (firstBlockSize < copySize) ? copySize - firstBlockSize : 0; // second block size + memcpy(dst, bfifo->data + bfifo->readIdx, firstBlockSize); // copy first block + if (secondBlockSize > 0) { // copy second block if needed + memcpy(dst + firstBlockSize, bfifo->data, secondBlockSize); + } + return copySize; +} + +uint32_t bfifo_pop(BlockingFifo *bfifo, uint8_t *dst, uint32_t size) { + ETHLIB_OS_SEM_WAIT(&bfifo->nonEmpty); + + // if destination is given, then also perform a copy + if (dst != NULL) { + bfifo_peek(bfifo, dst, size); + } + + // adjust indices + uint32_t usedArea = bfifo_get_used(bfifo); + uint32_t popSize = MIN(size, usedArea); // advance next written + uint32_t newReadIdx = bfifo->readIdx + popSize; + if (newReadIdx >= bfifo->size) { + newReadIdx -= bfifo->size; + } + bfifo->readIdx = newReadIdx; // replace it instead of performing an in-place change + bfifo->absReadIdx += popSize; + + usedArea = bfifo_get_used(bfifo); + if (usedArea > 0) { // FIFO is NOT empty + ETHLIB_OS_SEM_POST(&bfifo->nonEmpty); + } + if (usedArea != bfifo->size) { // FIFO is NOT full + ETHLIB_OS_SEM_POST(&bfifo->notFull); + } + + return popSize; +} \ No newline at end of file diff --git a/blocking_fifo.h b/blocking_fifo.h new file mode 100644 index 0000000..6c14112 --- /dev/null +++ b/blocking_fifo.h @@ -0,0 +1,45 @@ +// +// Created by epagris on 2023.11.22.. +// + +#ifndef ETHERLIB_TEST_BLOCKING_FIFO_H +#define ETHERLIB_TEST_BLOCKING_FIFO_H + +#include +#include + +/** + * Blocking FIFO + */ +typedef struct { + uint32_t readIdx; ///< Read index + uint32_t absReadIdx; ///< Absolute read index + uint32_t writeIdx; ///< Write index + uint32_t absWriteIdx; ///< Absolute write index + uint32_t size; ///< Size of the storage area + ETHLIB_OS_SEM_TYPE notFull, nonEmpty; ///< Semaphore, protecting read and write operations + uint8_t data[]; ///< Window data storage +} BlockingFifo; + +/** + * Create new blocking FIFO. + * @param size size of the FIFO in bytes + * @return newly allocated FIFO + */ +BlockingFifo * bfifo_new(uint32_t size); + +void bfifo_destroy(BlockingFifo * bfifo); + +uint32_t bfifo_get_used(const BlockingFifo * bfifo); + +uint32_t bfifo_get_free(const BlockingFifo * bfifo); + +uint32_t bfifo_push(BlockingFifo * bfifo, const uint8_t * data, uint32_t size); + +uint32_t bfifo_push_try(BlockingFifo * bfifo, const uint8_t * data, uint32_t size); + +uint32_t bfifo_peek(BlockingFifo * bfifo, uint8_t * dst, uint32_t size); + +uint32_t bfifo_pop(BlockingFifo *bfifo, uint8_t *dst, uint32_t size); + +#endif //ETHERLIB_TEST_BLOCKING_FIFO_H diff --git a/global_state.c b/global_state.c index c814b3c..48ed029 100644 --- a/global_state.c +++ b/global_state.c @@ -96,3 +96,7 @@ void close_connection(cbd d) { EthInterface *get_default_interface() { return E.ethIntf; } + +EthInterface * get_interface_by_address(ip4_addr addr) { + return get_default_interface(); +} diff --git a/global_state.h b/global_state.h index 2d99e70..ece3889 100644 --- a/global_state.h +++ b/global_state.h @@ -41,4 +41,11 @@ void close_connection(cbd d); */ EthInterface * get_default_interface(); +/** + * Get interface by address. + * @param addr IP address of a specific interface + * @return pointer to interface OR NULL if not found + */ +EthInterface * get_interface_by_address(ip4_addr addr); + #endif //ETHERLIB_GLOBAL_STATE_H diff --git a/packet_registry.h b/packet_registry.h index 02d7100..0d04c3d 100644 --- a/packet_registry.h +++ b/packet_registry.h @@ -46,12 +46,18 @@ typedef struct { // special processing function returns #define PROC_FN_RET_OK (0) // OK, go ahead! -#define PROC_FN_RET_ABORT (-1) // abort further processing -#define PROC_FN_RET_REPRST (-2) // replace packet and restart packet processing +#define PROC_FN_RET_ABORT (1) // abort further processing +#define PROC_FN_RET_REPRST (2) // replace packet and restart packet processing + +#define PROC_FN_FLAG_STRIP_PAD (1 << 16) // Remove padding from the end of the packet + +#define MASK_PROC_FN_RET(a) ((a) & 0xFFFF) +#define MASK_PROC_FN_FLAGS(a) ((a) & 0xFFFF0000) typedef struct { void * p; uint32_t u; + int32_t i; bool8_t b; } PcktProcFnPassbackData; diff --git a/packet_sieve.c b/packet_sieve.c index 9fcbf51..2ec171a 100644 --- a/packet_sieve.c +++ b/packet_sieve.c @@ -86,7 +86,15 @@ void packsieve_input(PcktSieve *sieve, const RawPckt *rawPckt) { // [3.] call parsing function PcktProcFnPassbackData pb; procRet = cdesc->procFun(data + offset, size - offset, header, sieve->intf, &pb); - switch (procRet) { // execute further action based on the return value of the parsing function + + // process flags + uint32_t flags = MASK_PROC_FN_FLAGS(procRet); + if (flags & PROC_FN_FLAG_STRIP_PAD) { // strip padding + size -= pb.i; + } + + // act according to return value + switch (MASK_PROC_FN_RET(procRet)) { // execute further action based on the return value of the parsing function case PROC_FN_RET_REPRST: // replace packet and restart processing dynmem_free(data); // release previous packet data data = pb.p; // store new packet data passed back in pb diff --git a/prefab/conn_blocks/tcp/tcp_window.c b/prefab/conn_blocks/tcp/tcp_window.c index b03a1c3..d95f73f 100644 --- a/prefab/conn_blocks/tcp/tcp_window.c +++ b/prefab/conn_blocks/tcp/tcp_window.c @@ -17,11 +17,15 @@ TcpWindow *tcpw_create(uint32_t size) { return tcpw; } -void tcpw_set_seqnum_offset(TcpWindow * tcpw, uint32_t offset) { - tcpw->firstByteOffset = offset; +void tcpw_destroy(TcpWindow * tcpw) { + dynmem_free(tcpw); } -#define TCP_WINDOW_MAX_WINSIZE_PADDING (16) // keep-out zone, this way allocated blocks will not block registry table growth TODO: not the best solution +void tcpw_set_seqnum_offset(TcpWindow * tcpw, uint32_t offset) { + tcpw->firstByteOffset = offset; + tcpw->nextWritten = offset; + tcpw->firstUnACK = offset; +} static inline uint32_t tcpw_get_occupied_size(TcpWindow * tcpw) { return tcpw->nextWritten - tcpw->firstUnACK; @@ -31,7 +35,7 @@ uint32_t tcpw_get_max_window_size(TcpWindow * tcpw) { return tcpw->size - tcpw_get_occupied_size(tcpw); } -uint32_t tcpw_store_segment(TcpWindow * tcpw, const uint8_t * data, uint32_t size, uint32_t seqNum) { +uint32_t tcpw_store(TcpWindow * tcpw, const uint8_t * data, uint32_t size, uint32_t seqNum) { // check that seqNum is the continuation of the previous content if (seqNum != (tcpw->nextWritten)) { // if not, then cannot store return 0; diff --git a/prefab/conn_blocks/tcp/tcp_window.h b/prefab/conn_blocks/tcp/tcp_window.h index 5ae9e65..5f3cda2 100644 --- a/prefab/conn_blocks/tcp/tcp_window.h +++ b/prefab/conn_blocks/tcp/tcp_window.h @@ -2,6 +2,8 @@ #define ETHERLIB_TCP_WINDOW_H #include +#include + //#include "../../../memory_pool.h" /** @@ -22,6 +24,12 @@ typedef struct { */ TcpWindow * tcpw_create(uint32_t size); +/** + * Destruct TCP Window object. + * @param tcpw pointer to previously allocated TCP Window object + */ +void tcpw_destroy(TcpWindow * tcpw); + /** * Set sequence number offset * @param tcpw @@ -42,9 +50,9 @@ uint32_t tcpw_get_max_window_size(TcpWindow * tcpw); * @param data data to store * @param size data size in bytes * @param seqNum sequence number of first octet of the block - * @return pointer to TcpWindowSegment allocated in the window OR NULL on failure + * @return number of stored bytes */ -uint32_t tcpw_store_segment(TcpWindow * tcpw, const uint8_t * data, uint32_t size, uint32_t seqNum); +uint32_t tcpw_store(TcpWindow * tcpw, const uint8_t * data, uint32_t size, uint32_t seqNum); /** * Acknowledge segment. diff --git a/prefab/conn_blocks/tcp_connblock.c b/prefab/conn_blocks/tcp_connblock.c index db301cd..c46ec1d 100644 --- a/prefab/conn_blocks/tcp_connblock.c +++ b/prefab/conn_blocks/tcp_connblock.c @@ -19,11 +19,12 @@ // TODO: - ne próbáljunk két kapcsolatot nyitni ugyanarra a portra... // TODO: - ez az "ok" változó nem túl jó... // TODO: - retry connection nincs kész +// TODO: - server socket egy csomó erőforrást felszabadíthat // ---------- -static inline cbd tcp_new_connblock_filtcond(EthInterface *intf, ip4_addr ipAddr, uint16_t port, SieveCallBackFn cbFn, - const PcktSieveFilterCondition *filtCond); +static inline cbd tcp_new_connblock_filtcond(EthInterface *intf, ip4_addr ipAddr, uint16_t port, StreamCallBackFn cbFn, + const PcktSieveFilterCondition *filtCond); // ---------- @@ -48,19 +49,34 @@ static bool filtTcp(const PcktSieveFilterCondition *filtCond, const PcktProps *c return ipProps->Protocol == ETH_TCP_PACKET_CLASS && references_us; } -static inline TcpState * tcps_create() { - TcpState * tcps = dynmem_alloc(sizeof(TcpState)); - tcps->txWin = tcpw_create(ETHLIB_DEF_TCP_WINDOW_SIZE); +#define MAX_TCP_WINDOW_SIZE (1200) + +static inline TcpState *tcps_create() { + TcpState *tcps = dynmem_alloc(sizeof(TcpState)); + uint32_t winSize = MAX(ETHLIB_DEF_TCP_WINDOW_SIZE, MAX_TCP_WINDOW_SIZE); + tcps->txWin = tcpw_create(winSize); // create transmit window + ETHLIB_OS_SEM_CREATE(&tcps->txBufNotFull, 1); // create transmit buffer not full semaphore + ETHLIB_OS_SEM_CREATE(&tcps->txInProgress, 1); // create transmission in progress semaphore return tcps; } -static inline void tcps_remove_and_cleanup(const TcpState * tcps) { +static inline void tcps_release_client_related_objects(TcpState *tcps) { + tcpw_destroy(tcps->txWin); + ETHLIB_OS_SEM_DESTROY(&tcps->txBufNotFull); + ETHLIB_OS_SEM_DESTROY(&tcps->txInProgress); + bfifo_destroy(tcps->rxFifo); +} + +static inline void tcps_remove_and_cleanup(TcpState *tcps) { if (tcps == NULL) { return; } - dynmem_free(tcps->txWin); + if (!tcps->isServer) { + tcps_release_client_related_objects(tcps); + } + dynmem_free(tcps); } @@ -78,6 +94,7 @@ void tcps_init(TcpState *tcps, uint16_t localPort) { tcps->txSeqNum = (uint32_t) rand(); tcps->rxAckNum = 0; + tcps->lastTxSeqNumAcked = tcps->txSeqNum; tcps->localWindow = ETHLIB_DEF_TCP_WINDOW_SIZE; tcpw_set_seqnum_offset(tcps->txWin, tcps->txSeqNum); @@ -87,10 +104,12 @@ void tcps_init(TcpState *tcps, uint16_t localPort) { tcps->retries = 0; tcps->debug = false; - tcps->userCb = NULL; -} + tcps->streamCb = NULL; + tcps->acceptCb = NULL; + tcps->isServer = false; -#define TCP_FETCH_STATE_FROM_CONNBLOCK(connBlock) ((TcpState *) (connBlock)->sieveLayer->tag.p) + tcps->rxFifo = bfifo_new(ETHLIB_DEF_FIFO_SIZE); +} void tcp_init_connection(ConnBlock *connBlock); @@ -109,7 +128,7 @@ static inline cbd tcps_create_based_on_listening(const TcpState *source, uint16_ // create connblock EthInterface *intf = source->connBlock.sieve->intf; // retrieve interface - cbd d = tcp_new_connblock_filtcond(intf, IPv4_IF_ADDR, source->localPort, source->userCb, &filtCond); + cbd d = tcp_new_connblock_filtcond(intf, IPv4_IF_ADDR, source->localPort, source->streamCb, &filtCond); if (!cbdt_get_connection_block(E.cbdt, d, connBlock)) { ERROR("Invalid CBD descriptor: '%d'!\n", d); return CBDT_ERR; @@ -175,6 +194,7 @@ int tcp_receive_segment_cb(const Pckt *pckt, PcktSieveLayerTag tag) { // send ACK int flags = TCP_FLAG_ACK; tcps->txSeqNum++; // advance sequence number // TODO... + tcpw_set_seqnum_offset(tcps->txWin, tcps->txSeqNum); tcp_send_segment(&tcps->connBlock, flags, NULL, NULL, 0); // save filter values @@ -214,11 +234,16 @@ int tcp_receive_segment_cb(const Pckt *pckt, PcktSieveLayerTag tag) { // create connection based on the listening connection ConnBlock connB; - tcps_create_based_on_listening(tcps, tcpProps->SourcePort, ipProps->SourceIPAddr, &connB); + cbd client_d = tcps_create_based_on_listening(tcps, tcpProps->SourcePort, ipProps->SourceIPAddr, &connB); tcp_send_segment(&connB, flags, NULL, NULL, 0); TcpState *target = TCP_FETCH_STATE_FROM_CONNBLOCK(&connB); target->txSeqNum++; + tcpw_set_seqnum_offset(target->txWin, target->txSeqNum); + + if ((client_d > 0) && (tcps->acceptCb != NULL)) { + tcps->acceptCb(client_d); + } } } @@ -237,24 +262,43 @@ int tcp_receive_segment_cb(const Pckt *pckt, PcktSieveLayerTag tag) { // step into next state tcps->connState = TCP_STATE_LAST_ACK; } else if ((dataSize > 0) && (tcps->rxAckNum == tcpProps->SequenceNumber)) { // incoming data segment with integrity in ack/seq + // try to store packet content into the FIFO + uint32_t pushSize = bfifo_push_try(tcps->rxFifo, pckt->payload, pckt->payloadSize); + + // adjust receive window size + tcps->localWindow = bfifo_get_free(tcps->rxFifo); + + // send acknowledge + tcps->rxAckNum += pushSize; + // send acknowledge - tcps->rxAckNum += dataSize; tcp_send_segment(&tcps->connBlock, TCP_FLAG_ACK, NULL, NULL, 0); // process data - if (tcps->userCb != NULL) { - PcktSieveLayerTag userTag = {0}; // TODO... - tcps->userCb(pckt, userTag); + if (tcps->streamCb != NULL) { + tcps->streamCb(tcps->d); + } + } else if ((dataSize == 0) && (tcps->txSeqNum == tcpProps->AcknowledgementNumber)) { // it is a valid ACK segment + if (tcps->lastTxSeqNumAcked < tcps->txSeqNum) { // outgoing segment was acknowledged by peer + // release segment from retransmit window + tcpw_acknowledge(tcps->txWin, tcpProps->AcknowledgementNumber); + + // release the transmit semaphore + ETHLIB_OS_SEM_POST(&tcps->txBufNotFull); + + // store last TX sequence number transmitted in ACK + tcps->lastTxSeqNumAcked = tcps->txSeqNum; + } else if (tcps->lastTxSeqNumAcked == tcps->txSeqNum) { // Keep-Alive, respond with some ACK + tcp_send_segment(&tcps->connBlock, TCP_FLAG_ACK, NULL, NULL, 0); } - } else if ((dataSize == 0) && (tcps->txSeqNum == tcpProps->AcknowledgementNumber)) { // outgoing segment was acknowledged by peer - // release segment from retransmit window - tcpw_acknowledge(tcps->txWin, tcpProps->AcknowledgementNumber); } break; case TCP_STATE_LAST_ACK: /* last ACK received */ if (tcpProps->Flags & TCP_FLAG_ACK) { tcps->connState = TCP_STATE_CLOSED; + ETHLIB_OS_SEM_POST(&tcps->txBufNotFull); + ETHLIB_OS_SEM_WAIT(&tcps->txInProgress); ret = SIEVE_LAYER_REMOVE_THIS; // if peer closed the connection, remove sieve layer } break; @@ -326,8 +370,10 @@ void tcp_listen(cbd d) { } TcpState *tcps = TCP_FETCH_STATE_FROM_CONNBLOCK(&connBlock); - if ((tcps->userCb != NULL) && (tcps->connState == TCP_STATE_CLOSED)) { + if ((tcps->streamCb != NULL) && (tcps->connState == TCP_STATE_CLOSED)) { // step into listen state tcps->connState = TCP_STATE_LISTEN; + tcps->isServer = true; + tcps_release_client_related_objects(tcps); } } @@ -344,7 +390,7 @@ void tcp_debug(cbd d, bool debug) { // --------------------- -static inline cbd tcp_new_connblock_filtcond(EthInterface *intf, ip4_addr ipAddr, uint16_t port, SieveCallBackFn cbFn, const PcktSieveFilterCondition *filtCond) { +static inline cbd tcp_new_connblock_filtcond(EthInterface *intf, ip4_addr ipAddr, uint16_t port, StreamCallBackFn cbFn, const PcktSieveFilterCondition *filtCond) { ConnBlock tcpConnB; connb_init_defaults(&tcpConnB); @@ -353,7 +399,7 @@ static inline cbd tcp_new_connblock_filtcond(EthInterface *intf, ip4_addr ipAddr PcktSieveLayerTag tag; // store TCP state into sieve layer's tag TcpState *tcpState = tcps_create(); tcps_init(tcpState, port); - tcpState->userCb = cbFn; + tcpState->streamCb = cbFn; tag.p = tcpState; tcpConnB.sieveLayer = packsieve_new_layer(ipConnB.sieveLayer, filtCond, false, filtTcp, tcp_receive_segment_cb, tag, ETH_TCP_PACKET_CLASS); @@ -372,7 +418,7 @@ static inline cbd tcp_new_connblock_filtcond(EthInterface *intf, ip4_addr ipAddr return d; } -cbd tcp_new_connblock(EthInterface *intf, ip4_addr ipAddr, uint16_t port, SieveCallBackFn cbFn) { +cbd tcp_new_connblock(EthInterface *intf, ip4_addr ipAddr, uint16_t port, StreamCallBackFn cbFn) { PcktSieveFilterCondition filtCond; packfiltcond_zero(&filtCond); TCP_LOCAL_PORT_TO_FILTCOND(&filtCond, port); @@ -479,16 +525,69 @@ uint32_t tcp_send(cbd d, const uint8_t *data, uint32_t size) { return 0; } + // acquire TCP state TcpState *tcps = TCP_FETCH_STATE_FROM_CONNBLOCK(&connBlock); - if (tcps->connState == TCP_STATE_ESTAB) { // can send only data if connection is ESTABLISHED - uint32_t maxWinSize = tcpw_get_max_window_size(tcps->txWin); - uint32_t txSize = MIN(tcps->remoteWindow, MIN(maxWinSize, size)); // limit size to the allocatable size of the retransmit buffer - tcpw_store_segment(tcps->txWin, data, txSize, tcps->txSeqNum); // store segment for possible retransmission - tcp_send_segment(&connBlock, TCP_FLAG_ACK, NULL, data, txSize); - return txSize; - } else { + + // can send only data if connection is ESTABLISHED + if (tcps->connState != TCP_STATE_ESTAB) { return 0; } + + // connection state is established... + + uint32_t sizeLeft = size; + + // indicate that transmission is in progress + ETHLIB_OS_SEM_WAIT(&tcps->txInProgress); + + // acquire resources + while ((sizeLeft > 0) && (tcps->connState == TCP_STATE_ESTAB)) { + ETHLIB_OS_SEM_WAIT(&tcps->txBufNotFull); + + // TODO: lehet, hogy ide kell egy mutex.... + // TODO: lezáráskor jelezni kell, hogy még egy blokkoló hívásban vagyunk + + uint32_t freeSize = tcpw_get_max_window_size(tcps->txWin); + + // check that data fits the transmit window + if (freeSize > 0) { + + // store segment for possible retransmission + uint32_t txSize = tcpw_store(tcps->txWin, data, sizeLeft, tcps->txSeqNum); + + // send segment with data (no options) + tcp_send_segment(&connBlock, TCP_FLAG_ACK, NULL, data, txSize); + + // decrement sizeLeft with the amount of transferred data + sizeLeft -= txSize; + + // increment data pointer + data += txSize; + + // release semaphore if there's free space remaining TODO: ide szinte biztos, hogy kell mutex + freeSize = tcpw_get_max_window_size(tcps->txWin); + if (freeSize > 0) { + ETHLIB_OS_SEM_POST(&tcps->txBufNotFull); + } + } + } + + ETHLIB_OS_SEM_POST(&tcps->txInProgress); + + return size - sizeLeft; +} + +uint32_t tcp_recv(unsigned char d, uint8_t *data, uint32_t size) { + ConnBlock connBlock; + if (!cbdt_get_connection_block(E.cbdt, d, &connBlock)) { + ERROR("Invalid CBD descriptor: '%d'!\n", d); + return 0; + } + + // acquire TCP state + TcpState *tcps = TCP_FETCH_STATE_FROM_CONNBLOCK(&connBlock); + + return bfifo_pop(tcps->rxFifo, data, size); } void tcp_print_report(const ConnBlock *connBlock) { @@ -499,4 +598,4 @@ void tcp_print_report(const ConnBlock *connBlock) { } else { INFO("TCP :%d -- %u.%u.%u.%u:%d - %s", tcps->localPort, EXPLODE_IPv4(tcps->remoteAddr), tcps->remotePort, TCP_STATE_NAMES[tcps->connState]); } -} \ No newline at end of file +} diff --git a/prefab/conn_blocks/tcp_connblock.h b/prefab/conn_blocks/tcp_connblock.h index 126deaf..37d0677 100644 --- a/prefab/conn_blocks/tcp_connblock.h +++ b/prefab/conn_blocks/tcp_connblock.h @@ -2,9 +2,9 @@ #define ETHERLIB_TCP_CONNBLOCK_H #define TCP_EXTRACT_FILTCOND(fc) uint16_t local_port = ((fc)->uw[4]); uint16_t remote_port = ((fc)->uw[5]); ip4_addr remote_addr = ((fc)->u[0]) -#define TCP_LOCAL_PORT_TO_FILTCOND(fc,local_port) ((fc)->uw[4]) = (local_port) -#define TCP_REMOTE_PORT_TO_FILTCOND(fc,remote_port) ((fc)->uw[5]) = (remote_port) -#define TCP_REMOTE_ADDR_TO_FILTCOND(fc,remote_addr) ((fc)->u[0] = (remote_addr)) +#define TCP_LOCAL_PORT_TO_FILTCOND(fc, local_port) ((fc)->uw[4]) = (local_port) +#define TCP_REMOTE_PORT_TO_FILTCOND(fc, remote_port) ((fc)->uw[5]) = (remote_port) +#define TCP_REMOTE_ADDR_TO_FILTCOND(fc, remote_addr) ((fc)->u[0] = (remote_addr)) #include #include "../packet_parsers/ipv4_types.h" @@ -12,6 +12,8 @@ #include "../packet_parsers/tcp_segment.h" #include "../../cbd_table.h" #include "../conn_blocks/tcp/tcp_window.h" +#include "../../blocking_fifo.h" +#include struct EthInterface_; @@ -47,6 +49,9 @@ typedef enum { TCP_STATE_TIME_WAIT } TcpConnectionState; +typedef int (*StreamCallBackFn)(cbd d); +typedef int (*TcpAcceptCbFn)(cbd d); + /** * TCP state. */ @@ -65,6 +70,7 @@ typedef struct { uint16_t localWindow; ///< Maximum number of bytes we are willing (able) to accept uint16_t remoteWindow; ///< Maximum number of bytes the peer is willing (able) to accept TcpWindow *txWin; ///< Transmit window OBJECT + uint32_t lastTxSeqNumAcked; ///< Last txSeqNum that has received an ACK (used for differentiation between ACK and Keep-Alive segment) // --------------- @@ -73,6 +79,10 @@ typedef struct { // --------------- + bool8_t isServer; ///< this connection is a server connection TODO: nem a legjobb elnevezés + + // --------------- + uint8_t retryLimit; ///< Maximum number of retransmission reties uint8_t retries; ///< Current attempts number uint8_t retryTO; ///< Retry timeout @@ -80,9 +90,25 @@ typedef struct { // --------------- bool8_t debug; ///< Turns on/off debug mode - SieveCallBackFn userCb; ///< User callback function + + // --------------- + + StreamCallBackFn streamCb; ///< Callback invoked on stream actions + TcpAcceptCbFn acceptCb; ///< Callback invoked on detecting an incoming connection + + // --------------- + + ETHLIB_OS_SEM_TYPE txBufNotFull; ///< Semaphore protecting the transmit window + ETHLIB_OS_SEM_TYPE txInProgress; ///< Transmission is in progress, transmit thread is blocked + + // --------------- + + BlockingFifo * rxFifo; ///< Receive FIFO + } TcpState; +#define TCP_FETCH_STATE_FROM_CONNBLOCK(connBlock) ((TcpState *) (connBlock)->sieveLayer->tag.p) + /** * Create new TCP connection block * @param intf associated Ethernet interface @@ -91,7 +117,7 @@ typedef struct { * @param cbFn receive callback function * @return TCP connection block */ -cbd tcp_new_connblock(struct EthInterface_ *intf, ip4_addr ipAddr, uint16_t port, SieveCallBackFn cbFn); +cbd tcp_new_connblock(struct EthInterface_ *intf, ip4_addr ipAddr, uint16_t port, StreamCallBackFn cbFn); /** * Bind TCP connection to remote socket. @@ -107,6 +133,13 @@ void tcp_bind(cbd d, ip4_addr remoteAddr, uint16_t remotePort); */ void tcp_listen(cbd d); +/** + * Set callback invoked on detecting an incoming connection on a listening conncetion. + * @param d cbd of the listening connection + * @param acb function pointer of the callback + */ +void tcp_set_accept_callback(cbd d, TcpAcceptCbFn acb); + /** * Send data over an existing TCP connection. * @param d pointer to existing connblock @@ -114,7 +147,9 @@ void tcp_listen(cbd d); * @param size data size in bytes * @return number of bytes sent */ -uint32_t tcp_send(unsigned char d, const uint8_t * data, uint32_t size); +uint32_t tcp_send(unsigned char d, const uint8_t *data, uint32_t size); + +uint32_t tcp_recv(unsigned char d, uint8_t * data, uint32_t size); /** * Turn TCP debugging ON/OFF @@ -127,7 +162,7 @@ void tcp_debug(cbd d, bool debug); * Print TCP connblock report. * @param connBlock TCP connblock */ -void tcp_print_report(const ConnBlock* connBlock); +void tcp_print_report(const ConnBlock *connBlock); //int tcp_send_segment(const struct ConnBlock_ *connBlock, TcpFlag flags, TcpOption * opts, const uint8_t *data, uint32_t size); diff --git a/prefab/packet_parsers/ipv4_packet.c b/prefab/packet_parsers/ipv4_packet.c index 217d7a9..b32a135 100644 --- a/prefab/packet_parsers/ipv4_packet.c +++ b/prefab/packet_parsers/ipv4_packet.c @@ -54,6 +54,13 @@ int parse_ipv4(const uint8_t *hdr, uint32_t size, PcktHeaderElement *pcktHdrLe, arpc_learn(intf->arpc, &entry); // learn new assignment } + // check if packet has padding at the end, then strip it + int ret = 0; + if (size > ipProps->TotalLength) { + pb->i = (int32_t)size - (int32_t)ipProps->TotalLength; // store the stripping length + ret |= PROC_FN_FLAG_STRIP_PAD; + } + // check whether it is a fragmented packet bool isAFragment = (ipProps->FragmentOffset != 0) || (ipProps->Flags & IP_FLAG_MF); if (isAFragment) { @@ -66,14 +73,17 @@ int parse_ipv4(const uint8_t *hdr, uint32_t size, PcktHeaderElement *pcktHdrLe, pb->p = raPload; pb->u = raSize; pb->b = true; // MRD! - return PROC_FN_RET_REPRST; // replace buffer and restart processing (with the assembled packet) + ret |= PROC_FN_RET_REPRST; + return ret; // replace buffer and restart processing (with the assembled packet) } else { ipProps->containedPacketClass = 0; // no contained packet if not assembled yet - return PROC_FN_RET_ABORT; // stop further processing + ret |= PROC_FN_RET_ABORT; + return ret; // stop further processing } } - return ipProps->validityOK ? PROC_FN_RET_OK : PROC_FN_RET_ABORT; + ret |= ipProps->validityOK ? PROC_FN_RET_OK : PROC_FN_RET_ABORT; + return ret; } static uint16_t nextIpIdentification = 0; diff --git a/tcp_connection_manager.c b/tcp_connection_manager.c deleted file mode 100644 index ea48189..0000000 --- a/tcp_connection_manager.c +++ /dev/null @@ -1,5 +0,0 @@ -#include "tcp_connection_manager.h" - -void tcpconmgr_add_connection(TcpConMgrCtrl * mgr) { - -} diff --git a/tcp_connection_manager.h b/tcp_connection_manager.h deleted file mode 100644 index a1c00c8..0000000 --- a/tcp_connection_manager.h +++ /dev/null @@ -1,19 +0,0 @@ -#ifndef CORE_ETHERLIB_TCP_CONNECTION_MANAGER -#define CORE_ETHERLIB_TCP_CONNECTION_MANAGER - -#include - -#include - -#include "cbd_table.h" - -typedef struct { - uint16_t activeConns; ///< Number of active connections - ConnBlock * cb[] -} TcpConMgrCtrl; - -void tcpconmgr_new(); - -void tcpconmgr_add_connection(TcpConMgrCtrl * mgr); - -#endif /* CORE_ETHERLIB_TCP_CONNECTION_MANAGER */