- BlockingFifo implemented

- get_interface_by_address() added
- HTTP server initials added
- code added to strip away padding on processing the IP layer by shrinking overall packet size
- load of TCP fixes and improvements
- TCP stream interface added
- TCP window destroy() added and some bugs fixed
This commit is contained in:
Wiesner András 2023-11-22 20:55:50 +01:00
parent 038678a597
commit 8a5c800fd3
15 changed files with 447 additions and 74 deletions

28
apps/http_server.c Normal file
View File

@ -0,0 +1,28 @@
//
// Created by epagris on 2023.11.22..
//
#include <stddef.h>
#include "http_server.h"
#include "../dynmem.h"
#include "../utils.h"
#include "../prefab/conn_blocks/tcp_connblock.h"
#include "../global_state.h"
HttpServer *http_new(ip4_addr addr, uint16_t port) {
// acquire interface
EthInterface * intf = get_interface_by_address(addr);
if (intf == NULL) {
ERROR("Unknown interface!\n");
return NULL;
}
// allocate instance
HttpServer * http = (HttpServer *) dynmem_alloc(sizeof(HttpServer));
ASSERT_NULL(http);
// open server connection
//http->serverConn = tcp_new_connblock(intf, addr, port, );
return http;
}

19
apps/http_server.h Normal file
View File

@ -0,0 +1,19 @@
#ifndef ETHERLIB_TEST_HTTP_SERVER_H
#define ETHERLIB_TEST_HTTP_SERVER_H
#include "../cbd_table.h"
#include "etherlib/prefab/packet_parsers/ipv4_types.h"
typedef struct {
cbd serverConn; ///< Server connection
} HttpServer;
/**
* Create a new HTTP-server
* @param addr server bind address
* @param port server port
* @return pointer to newly allocated Http server OR NULL on failure
*/
HttpServer * http_new(ip4_addr addr, uint16_t port);
#endif //ETHERLIB_TEST_HTTP_SERVER_H

124
blocking_fifo.c Normal file
View File

@ -0,0 +1,124 @@
//
// Created by epagris on 2023.11.22..
//
#include <memory.h>
#include "blocking_fifo.h"
#include "dynmem.h"
#include "utils.h"
#define MIN(a,b) (((a) < (b)) ? (a) : (b))
BlockingFifo *bfifo_new(uint32_t size) {
uint32_t allocSize = sizeof(BlockingFifo) + size; // calculate full allocation size
BlockingFifo * fifo = (BlockingFifo *) dynmem_alloc(allocSize); // allocate data block at the end
ASSERT_NULL(fifo);
memset(fifo, 0, sizeof(BlockingFifo));
fifo->size = size;
ETHLIB_OS_SEM_CREATE(&fifo->nonEmpty, 1);
ETHLIB_OS_SEM_WAIT(&fifo->nonEmpty); // FIFO is empty
ETHLIB_OS_SEM_CREATE(&fifo->notFull, 1);
return fifo;
}
void bfifo_destroy(BlockingFifo * bfifo) {
ETHLIB_OS_SEM_DESTROY(&bfifo->nonEmpty);
ETHLIB_OS_SEM_DESTROY(&bfifo->notFull);
bfifo->size = 0;
dynmem_free(bfifo);
}
uint32_t bfifo_get_used(const BlockingFifo *bfifo) {
return bfifo->absWriteIdx - bfifo->absReadIdx;
}
uint32_t bfifo_get_free(const BlockingFifo * bfifo) {
return bfifo->size - bfifo_get_used(bfifo);
}
uint32_t bfifo_push_try(BlockingFifo * bfifo, const uint8_t * data, uint32_t size) {
if (bfifo_get_free(bfifo) == 0) {
return 0;
} else {
return bfifo_push(bfifo, data, size);
}
}
uint32_t bfifo_push(BlockingFifo * bfifo, const uint8_t * data, uint32_t size) {
ETHLIB_OS_SEM_WAIT(&bfifo->notFull); // take not full semaphore
// calculate copy size, limit if required
uint32_t freeArea = bfifo_get_free(bfifo);
uint32_t copySize = MIN(size, freeArea);
// determine first block size
uint32_t spaceToBufEnd = bfifo->size - bfifo->writeIdx; // space to buffer end
uint32_t firstBlockSize = MIN(spaceToBufEnd, copySize); // first block size
uint32_t secondBlockSize = (firstBlockSize < copySize) ? copySize - firstBlockSize : 0; // second block size
memcpy(bfifo->data + bfifo->writeIdx, data, firstBlockSize); // copy first block
if (secondBlockSize > 0) { // copy second block if needed
memcpy(bfifo->data, data + firstBlockSize, secondBlockSize);
}
// adjust indices
uint32_t newWriteIdx = bfifo->writeIdx + copySize; // advance write index
if (newWriteIdx >= bfifo->size) {
newWriteIdx -= bfifo->size;
}
bfifo->writeIdx = newWriteIdx; // replace...
bfifo->absWriteIdx += copySize;
freeArea = bfifo_get_free(bfifo);
if (freeArea > 0) { // FIFO is not full
ETHLIB_OS_SEM_POST(&bfifo->notFull);
}
if (freeArea != bfifo->size) { // FIFO is not empty
ETHLIB_OS_SEM_POST(&bfifo->nonEmpty);
}
return copySize;
}
uint32_t bfifo_peek(BlockingFifo * bfifo, uint8_t * dst, uint32_t size) {
// determine copy size
uint32_t usedArea = bfifo_get_used(bfifo);
uint32_t copySize = MIN(size, usedArea);
uint32_t spaceToBufEnd = bfifo->size - bfifo->readIdx; // space to buffer end
uint32_t firstBlockSize = MIN(spaceToBufEnd, copySize); // first block size
uint32_t secondBlockSize = (firstBlockSize < copySize) ? copySize - firstBlockSize : 0; // second block size
memcpy(dst, bfifo->data + bfifo->readIdx, firstBlockSize); // copy first block
if (secondBlockSize > 0) { // copy second block if needed
memcpy(dst + firstBlockSize, bfifo->data, secondBlockSize);
}
return copySize;
}
uint32_t bfifo_pop(BlockingFifo *bfifo, uint8_t *dst, uint32_t size) {
ETHLIB_OS_SEM_WAIT(&bfifo->nonEmpty);
// if destination is given, then also perform a copy
if (dst != NULL) {
bfifo_peek(bfifo, dst, size);
}
// adjust indices
uint32_t usedArea = bfifo_get_used(bfifo);
uint32_t popSize = MIN(size, usedArea); // advance next written
uint32_t newReadIdx = bfifo->readIdx + popSize;
if (newReadIdx >= bfifo->size) {
newReadIdx -= bfifo->size;
}
bfifo->readIdx = newReadIdx; // replace it instead of performing an in-place change
bfifo->absReadIdx += popSize;
usedArea = bfifo_get_used(bfifo);
if (usedArea > 0) { // FIFO is NOT empty
ETHLIB_OS_SEM_POST(&bfifo->nonEmpty);
}
if (usedArea != bfifo->size) { // FIFO is NOT full
ETHLIB_OS_SEM_POST(&bfifo->notFull);
}
return popSize;
}

45
blocking_fifo.h Normal file
View File

@ -0,0 +1,45 @@
//
// Created by epagris on 2023.11.22..
//
#ifndef ETHERLIB_TEST_BLOCKING_FIFO_H
#define ETHERLIB_TEST_BLOCKING_FIFO_H
#include <stdint.h>
#include <etherlib_options.h>
/**
* Blocking FIFO
*/
typedef struct {
uint32_t readIdx; ///< Read index
uint32_t absReadIdx; ///< Absolute read index
uint32_t writeIdx; ///< Write index
uint32_t absWriteIdx; ///< Absolute write index
uint32_t size; ///< Size of the storage area
ETHLIB_OS_SEM_TYPE notFull, nonEmpty; ///< Semaphore, protecting read and write operations
uint8_t data[]; ///< Window data storage
} BlockingFifo;
/**
* Create new blocking FIFO.
* @param size size of the FIFO in bytes
* @return newly allocated FIFO
*/
BlockingFifo * bfifo_new(uint32_t size);
void bfifo_destroy(BlockingFifo * bfifo);
uint32_t bfifo_get_used(const BlockingFifo * bfifo);
uint32_t bfifo_get_free(const BlockingFifo * bfifo);
uint32_t bfifo_push(BlockingFifo * bfifo, const uint8_t * data, uint32_t size);
uint32_t bfifo_push_try(BlockingFifo * bfifo, const uint8_t * data, uint32_t size);
uint32_t bfifo_peek(BlockingFifo * bfifo, uint8_t * dst, uint32_t size);
uint32_t bfifo_pop(BlockingFifo *bfifo, uint8_t *dst, uint32_t size);
#endif //ETHERLIB_TEST_BLOCKING_FIFO_H

View File

@ -96,3 +96,7 @@ void close_connection(cbd d) {
EthInterface *get_default_interface() { EthInterface *get_default_interface() {
return E.ethIntf; return E.ethIntf;
} }
EthInterface * get_interface_by_address(ip4_addr addr) {
return get_default_interface();
}

View File

@ -41,4 +41,11 @@ void close_connection(cbd d);
*/ */
EthInterface * get_default_interface(); EthInterface * get_default_interface();
/**
* Get interface by address.
* @param addr IP address of a specific interface
* @return pointer to interface OR NULL if not found
*/
EthInterface * get_interface_by_address(ip4_addr addr);
#endif //ETHERLIB_GLOBAL_STATE_H #endif //ETHERLIB_GLOBAL_STATE_H

View File

@ -46,12 +46,18 @@ typedef struct {
// special processing function returns // special processing function returns
#define PROC_FN_RET_OK (0) // OK, go ahead! #define PROC_FN_RET_OK (0) // OK, go ahead!
#define PROC_FN_RET_ABORT (-1) // abort further processing #define PROC_FN_RET_ABORT (1) // abort further processing
#define PROC_FN_RET_REPRST (-2) // replace packet and restart packet processing #define PROC_FN_RET_REPRST (2) // replace packet and restart packet processing
#define PROC_FN_FLAG_STRIP_PAD (1 << 16) // Remove padding from the end of the packet
#define MASK_PROC_FN_RET(a) ((a) & 0xFFFF)
#define MASK_PROC_FN_FLAGS(a) ((a) & 0xFFFF0000)
typedef struct { typedef struct {
void * p; void * p;
uint32_t u; uint32_t u;
int32_t i;
bool8_t b; bool8_t b;
} PcktProcFnPassbackData; } PcktProcFnPassbackData;

View File

@ -86,7 +86,15 @@ void packsieve_input(PcktSieve *sieve, const RawPckt *rawPckt) {
// [3.] call parsing function // [3.] call parsing function
PcktProcFnPassbackData pb; PcktProcFnPassbackData pb;
procRet = cdesc->procFun(data + offset, size - offset, header, sieve->intf, &pb); procRet = cdesc->procFun(data + offset, size - offset, header, sieve->intf, &pb);
switch (procRet) { // execute further action based on the return value of the parsing function
// process flags
uint32_t flags = MASK_PROC_FN_FLAGS(procRet);
if (flags & PROC_FN_FLAG_STRIP_PAD) { // strip padding
size -= pb.i;
}
// act according to return value
switch (MASK_PROC_FN_RET(procRet)) { // execute further action based on the return value of the parsing function
case PROC_FN_RET_REPRST: // replace packet and restart processing case PROC_FN_RET_REPRST: // replace packet and restart processing
dynmem_free(data); // release previous packet data dynmem_free(data); // release previous packet data
data = pb.p; // store new packet data passed back in pb data = pb.p; // store new packet data passed back in pb

View File

@ -17,11 +17,15 @@ TcpWindow *tcpw_create(uint32_t size) {
return tcpw; return tcpw;
} }
void tcpw_set_seqnum_offset(TcpWindow * tcpw, uint32_t offset) { void tcpw_destroy(TcpWindow * tcpw) {
tcpw->firstByteOffset = offset; dynmem_free(tcpw);
} }
#define TCP_WINDOW_MAX_WINSIZE_PADDING (16) // keep-out zone, this way allocated blocks will not block registry table growth TODO: not the best solution void tcpw_set_seqnum_offset(TcpWindow * tcpw, uint32_t offset) {
tcpw->firstByteOffset = offset;
tcpw->nextWritten = offset;
tcpw->firstUnACK = offset;
}
static inline uint32_t tcpw_get_occupied_size(TcpWindow * tcpw) { static inline uint32_t tcpw_get_occupied_size(TcpWindow * tcpw) {
return tcpw->nextWritten - tcpw->firstUnACK; return tcpw->nextWritten - tcpw->firstUnACK;
@ -31,7 +35,7 @@ uint32_t tcpw_get_max_window_size(TcpWindow * tcpw) {
return tcpw->size - tcpw_get_occupied_size(tcpw); return tcpw->size - tcpw_get_occupied_size(tcpw);
} }
uint32_t tcpw_store_segment(TcpWindow * tcpw, const uint8_t * data, uint32_t size, uint32_t seqNum) { uint32_t tcpw_store(TcpWindow * tcpw, const uint8_t * data, uint32_t size, uint32_t seqNum) {
// check that seqNum is the continuation of the previous content // check that seqNum is the continuation of the previous content
if (seqNum != (tcpw->nextWritten)) { // if not, then cannot store if (seqNum != (tcpw->nextWritten)) { // if not, then cannot store
return 0; return 0;

View File

@ -2,6 +2,8 @@
#define ETHERLIB_TCP_WINDOW_H #define ETHERLIB_TCP_WINDOW_H
#include <stdint.h> #include <stdint.h>
#include <etherlib_options.h>
//#include "../../../memory_pool.h" //#include "../../../memory_pool.h"
/** /**
@ -22,6 +24,12 @@ typedef struct {
*/ */
TcpWindow * tcpw_create(uint32_t size); TcpWindow * tcpw_create(uint32_t size);
/**
* Destruct TCP Window object.
* @param tcpw pointer to previously allocated TCP Window object
*/
void tcpw_destroy(TcpWindow * tcpw);
/** /**
* Set sequence number offset * Set sequence number offset
* @param tcpw * @param tcpw
@ -42,9 +50,9 @@ uint32_t tcpw_get_max_window_size(TcpWindow * tcpw);
* @param data data to store * @param data data to store
* @param size data size in bytes * @param size data size in bytes
* @param seqNum sequence number of first octet of the block * @param seqNum sequence number of first octet of the block
* @return pointer to TcpWindowSegment allocated in the window OR NULL on failure * @return number of stored bytes
*/ */
uint32_t tcpw_store_segment(TcpWindow * tcpw, const uint8_t * data, uint32_t size, uint32_t seqNum); uint32_t tcpw_store(TcpWindow * tcpw, const uint8_t * data, uint32_t size, uint32_t seqNum);
/** /**
* Acknowledge segment. * Acknowledge segment.

View File

@ -19,10 +19,11 @@
// TODO: - ne próbáljunk két kapcsolatot nyitni ugyanarra a portra... // TODO: - ne próbáljunk két kapcsolatot nyitni ugyanarra a portra...
// TODO: - ez az "ok" változó nem túl jó... // TODO: - ez az "ok" változó nem túl jó...
// TODO: - retry connection nincs kész // TODO: - retry connection nincs kész
// TODO: - server socket egy csomó erőforrást felszabadíthat
// ---------- // ----------
static inline cbd tcp_new_connblock_filtcond(EthInterface *intf, ip4_addr ipAddr, uint16_t port, SieveCallBackFn cbFn, static inline cbd tcp_new_connblock_filtcond(EthInterface *intf, ip4_addr ipAddr, uint16_t port, StreamCallBackFn cbFn,
const PcktSieveFilterCondition *filtCond); const PcktSieveFilterCondition *filtCond);
// ---------- // ----------
@ -48,19 +49,34 @@ static bool filtTcp(const PcktSieveFilterCondition *filtCond, const PcktProps *c
return ipProps->Protocol == ETH_TCP_PACKET_CLASS && references_us; return ipProps->Protocol == ETH_TCP_PACKET_CLASS && references_us;
} }
static inline TcpState * tcps_create() { #define MAX_TCP_WINDOW_SIZE (1200)
TcpState * tcps = dynmem_alloc(sizeof(TcpState));
tcps->txWin = tcpw_create(ETHLIB_DEF_TCP_WINDOW_SIZE); static inline TcpState *tcps_create() {
TcpState *tcps = dynmem_alloc(sizeof(TcpState));
uint32_t winSize = MAX(ETHLIB_DEF_TCP_WINDOW_SIZE, MAX_TCP_WINDOW_SIZE);
tcps->txWin = tcpw_create(winSize); // create transmit window
ETHLIB_OS_SEM_CREATE(&tcps->txBufNotFull, 1); // create transmit buffer not full semaphore
ETHLIB_OS_SEM_CREATE(&tcps->txInProgress, 1); // create transmission in progress semaphore
return tcps; return tcps;
} }
static inline void tcps_remove_and_cleanup(const TcpState * tcps) { static inline void tcps_release_client_related_objects(TcpState *tcps) {
tcpw_destroy(tcps->txWin);
ETHLIB_OS_SEM_DESTROY(&tcps->txBufNotFull);
ETHLIB_OS_SEM_DESTROY(&tcps->txInProgress);
bfifo_destroy(tcps->rxFifo);
}
static inline void tcps_remove_and_cleanup(TcpState *tcps) {
if (tcps == NULL) { if (tcps == NULL) {
return; return;
} }
dynmem_free(tcps->txWin); if (!tcps->isServer) {
tcps_release_client_related_objects(tcps);
}
dynmem_free(tcps); dynmem_free(tcps);
} }
@ -78,6 +94,7 @@ void tcps_init(TcpState *tcps, uint16_t localPort) {
tcps->txSeqNum = (uint32_t) rand(); tcps->txSeqNum = (uint32_t) rand();
tcps->rxAckNum = 0; tcps->rxAckNum = 0;
tcps->lastTxSeqNumAcked = tcps->txSeqNum;
tcps->localWindow = ETHLIB_DEF_TCP_WINDOW_SIZE; tcps->localWindow = ETHLIB_DEF_TCP_WINDOW_SIZE;
tcpw_set_seqnum_offset(tcps->txWin, tcps->txSeqNum); tcpw_set_seqnum_offset(tcps->txWin, tcps->txSeqNum);
@ -87,10 +104,12 @@ void tcps_init(TcpState *tcps, uint16_t localPort) {
tcps->retries = 0; tcps->retries = 0;
tcps->debug = false; tcps->debug = false;
tcps->userCb = NULL; tcps->streamCb = NULL;
} tcps->acceptCb = NULL;
tcps->isServer = false;
#define TCP_FETCH_STATE_FROM_CONNBLOCK(connBlock) ((TcpState *) (connBlock)->sieveLayer->tag.p) tcps->rxFifo = bfifo_new(ETHLIB_DEF_FIFO_SIZE);
}
void tcp_init_connection(ConnBlock *connBlock); void tcp_init_connection(ConnBlock *connBlock);
@ -109,7 +128,7 @@ static inline cbd tcps_create_based_on_listening(const TcpState *source, uint16_
// create connblock // create connblock
EthInterface *intf = source->connBlock.sieve->intf; // retrieve interface EthInterface *intf = source->connBlock.sieve->intf; // retrieve interface
cbd d = tcp_new_connblock_filtcond(intf, IPv4_IF_ADDR, source->localPort, source->userCb, &filtCond); cbd d = tcp_new_connblock_filtcond(intf, IPv4_IF_ADDR, source->localPort, source->streamCb, &filtCond);
if (!cbdt_get_connection_block(E.cbdt, d, connBlock)) { if (!cbdt_get_connection_block(E.cbdt, d, connBlock)) {
ERROR("Invalid CBD descriptor: '%d'!\n", d); ERROR("Invalid CBD descriptor: '%d'!\n", d);
return CBDT_ERR; return CBDT_ERR;
@ -175,6 +194,7 @@ int tcp_receive_segment_cb(const Pckt *pckt, PcktSieveLayerTag tag) {
// send ACK // send ACK
int flags = TCP_FLAG_ACK; int flags = TCP_FLAG_ACK;
tcps->txSeqNum++; // advance sequence number // TODO... tcps->txSeqNum++; // advance sequence number // TODO...
tcpw_set_seqnum_offset(tcps->txWin, tcps->txSeqNum);
tcp_send_segment(&tcps->connBlock, flags, NULL, NULL, 0); tcp_send_segment(&tcps->connBlock, flags, NULL, NULL, 0);
// save filter values // save filter values
@ -214,11 +234,16 @@ int tcp_receive_segment_cb(const Pckt *pckt, PcktSieveLayerTag tag) {
// create connection based on the listening connection // create connection based on the listening connection
ConnBlock connB; ConnBlock connB;
tcps_create_based_on_listening(tcps, tcpProps->SourcePort, ipProps->SourceIPAddr, &connB); cbd client_d = tcps_create_based_on_listening(tcps, tcpProps->SourcePort, ipProps->SourceIPAddr, &connB);
tcp_send_segment(&connB, flags, NULL, NULL, 0); tcp_send_segment(&connB, flags, NULL, NULL, 0);
TcpState *target = TCP_FETCH_STATE_FROM_CONNBLOCK(&connB); TcpState *target = TCP_FETCH_STATE_FROM_CONNBLOCK(&connB);
target->txSeqNum++; target->txSeqNum++;
tcpw_set_seqnum_offset(target->txWin, target->txSeqNum);
if ((client_d > 0) && (tcps->acceptCb != NULL)) {
tcps->acceptCb(client_d);
}
} }
} }
@ -237,24 +262,43 @@ int tcp_receive_segment_cb(const Pckt *pckt, PcktSieveLayerTag tag) {
// step into next state // step into next state
tcps->connState = TCP_STATE_LAST_ACK; tcps->connState = TCP_STATE_LAST_ACK;
} else if ((dataSize > 0) && (tcps->rxAckNum == tcpProps->SequenceNumber)) { // incoming data segment with integrity in ack/seq } else if ((dataSize > 0) && (tcps->rxAckNum == tcpProps->SequenceNumber)) { // incoming data segment with integrity in ack/seq
// try to store packet content into the FIFO
uint32_t pushSize = bfifo_push_try(tcps->rxFifo, pckt->payload, pckt->payloadSize);
// adjust receive window size
tcps->localWindow = bfifo_get_free(tcps->rxFifo);
// send acknowledge
tcps->rxAckNum += pushSize;
// send acknowledge // send acknowledge
tcps->rxAckNum += dataSize;
tcp_send_segment(&tcps->connBlock, TCP_FLAG_ACK, NULL, NULL, 0); tcp_send_segment(&tcps->connBlock, TCP_FLAG_ACK, NULL, NULL, 0);
// process data // process data
if (tcps->userCb != NULL) { if (tcps->streamCb != NULL) {
PcktSieveLayerTag userTag = {0}; // TODO... tcps->streamCb(tcps->d);
tcps->userCb(pckt, userTag);
} }
} else if ((dataSize == 0) && (tcps->txSeqNum == tcpProps->AcknowledgementNumber)) { // outgoing segment was acknowledged by peer } else if ((dataSize == 0) && (tcps->txSeqNum == tcpProps->AcknowledgementNumber)) { // it is a valid ACK segment
if (tcps->lastTxSeqNumAcked < tcps->txSeqNum) { // outgoing segment was acknowledged by peer
// release segment from retransmit window // release segment from retransmit window
tcpw_acknowledge(tcps->txWin, tcpProps->AcknowledgementNumber); tcpw_acknowledge(tcps->txWin, tcpProps->AcknowledgementNumber);
// release the transmit semaphore
ETHLIB_OS_SEM_POST(&tcps->txBufNotFull);
// store last TX sequence number transmitted in ACK
tcps->lastTxSeqNumAcked = tcps->txSeqNum;
} else if (tcps->lastTxSeqNumAcked == tcps->txSeqNum) { // Keep-Alive, respond with some ACK
tcp_send_segment(&tcps->connBlock, TCP_FLAG_ACK, NULL, NULL, 0);
}
} }
break; break;
case TCP_STATE_LAST_ACK: /* last ACK received */ case TCP_STATE_LAST_ACK: /* last ACK received */
if (tcpProps->Flags & TCP_FLAG_ACK) { if (tcpProps->Flags & TCP_FLAG_ACK) {
tcps->connState = TCP_STATE_CLOSED; tcps->connState = TCP_STATE_CLOSED;
ETHLIB_OS_SEM_POST(&tcps->txBufNotFull);
ETHLIB_OS_SEM_WAIT(&tcps->txInProgress);
ret = SIEVE_LAYER_REMOVE_THIS; // if peer closed the connection, remove sieve layer ret = SIEVE_LAYER_REMOVE_THIS; // if peer closed the connection, remove sieve layer
} }
break; break;
@ -326,8 +370,10 @@ void tcp_listen(cbd d) {
} }
TcpState *tcps = TCP_FETCH_STATE_FROM_CONNBLOCK(&connBlock); TcpState *tcps = TCP_FETCH_STATE_FROM_CONNBLOCK(&connBlock);
if ((tcps->userCb != NULL) && (tcps->connState == TCP_STATE_CLOSED)) { if ((tcps->streamCb != NULL) && (tcps->connState == TCP_STATE_CLOSED)) { // step into listen state
tcps->connState = TCP_STATE_LISTEN; tcps->connState = TCP_STATE_LISTEN;
tcps->isServer = true;
tcps_release_client_related_objects(tcps);
} }
} }
@ -344,7 +390,7 @@ void tcp_debug(cbd d, bool debug) {
// --------------------- // ---------------------
static inline cbd tcp_new_connblock_filtcond(EthInterface *intf, ip4_addr ipAddr, uint16_t port, SieveCallBackFn cbFn, const PcktSieveFilterCondition *filtCond) { static inline cbd tcp_new_connblock_filtcond(EthInterface *intf, ip4_addr ipAddr, uint16_t port, StreamCallBackFn cbFn, const PcktSieveFilterCondition *filtCond) {
ConnBlock tcpConnB; ConnBlock tcpConnB;
connb_init_defaults(&tcpConnB); connb_init_defaults(&tcpConnB);
@ -353,7 +399,7 @@ static inline cbd tcp_new_connblock_filtcond(EthInterface *intf, ip4_addr ipAddr
PcktSieveLayerTag tag; // store TCP state into sieve layer's tag PcktSieveLayerTag tag; // store TCP state into sieve layer's tag
TcpState *tcpState = tcps_create(); TcpState *tcpState = tcps_create();
tcps_init(tcpState, port); tcps_init(tcpState, port);
tcpState->userCb = cbFn; tcpState->streamCb = cbFn;
tag.p = tcpState; tag.p = tcpState;
tcpConnB.sieveLayer = packsieve_new_layer(ipConnB.sieveLayer, filtCond, false, filtTcp, tcp_receive_segment_cb, tag, ETH_TCP_PACKET_CLASS); tcpConnB.sieveLayer = packsieve_new_layer(ipConnB.sieveLayer, filtCond, false, filtTcp, tcp_receive_segment_cb, tag, ETH_TCP_PACKET_CLASS);
@ -372,7 +418,7 @@ static inline cbd tcp_new_connblock_filtcond(EthInterface *intf, ip4_addr ipAddr
return d; return d;
} }
cbd tcp_new_connblock(EthInterface *intf, ip4_addr ipAddr, uint16_t port, SieveCallBackFn cbFn) { cbd tcp_new_connblock(EthInterface *intf, ip4_addr ipAddr, uint16_t port, StreamCallBackFn cbFn) {
PcktSieveFilterCondition filtCond; PcktSieveFilterCondition filtCond;
packfiltcond_zero(&filtCond); packfiltcond_zero(&filtCond);
TCP_LOCAL_PORT_TO_FILTCOND(&filtCond, port); TCP_LOCAL_PORT_TO_FILTCOND(&filtCond, port);
@ -479,16 +525,69 @@ uint32_t tcp_send(cbd d, const uint8_t *data, uint32_t size) {
return 0; return 0;
} }
// acquire TCP state
TcpState *tcps = TCP_FETCH_STATE_FROM_CONNBLOCK(&connBlock); TcpState *tcps = TCP_FETCH_STATE_FROM_CONNBLOCK(&connBlock);
if (tcps->connState == TCP_STATE_ESTAB) { // can send only data if connection is ESTABLISHED
uint32_t maxWinSize = tcpw_get_max_window_size(tcps->txWin); // can send only data if connection is ESTABLISHED
uint32_t txSize = MIN(tcps->remoteWindow, MIN(maxWinSize, size)); // limit size to the allocatable size of the retransmit buffer if (tcps->connState != TCP_STATE_ESTAB) {
tcpw_store_segment(tcps->txWin, data, txSize, tcps->txSeqNum); // store segment for possible retransmission
tcp_send_segment(&connBlock, TCP_FLAG_ACK, NULL, data, txSize);
return txSize;
} else {
return 0; return 0;
} }
// connection state is established...
uint32_t sizeLeft = size;
// indicate that transmission is in progress
ETHLIB_OS_SEM_WAIT(&tcps->txInProgress);
// acquire resources
while ((sizeLeft > 0) && (tcps->connState == TCP_STATE_ESTAB)) {
ETHLIB_OS_SEM_WAIT(&tcps->txBufNotFull);
// TODO: lehet, hogy ide kell egy mutex....
// TODO: lezáráskor jelezni kell, hogy még egy blokkoló hívásban vagyunk
uint32_t freeSize = tcpw_get_max_window_size(tcps->txWin);
// check that data fits the transmit window
if (freeSize > 0) {
// store segment for possible retransmission
uint32_t txSize = tcpw_store(tcps->txWin, data, sizeLeft, tcps->txSeqNum);
// send segment with data (no options)
tcp_send_segment(&connBlock, TCP_FLAG_ACK, NULL, data, txSize);
// decrement sizeLeft with the amount of transferred data
sizeLeft -= txSize;
// increment data pointer
data += txSize;
// release semaphore if there's free space remaining TODO: ide szinte biztos, hogy kell mutex
freeSize = tcpw_get_max_window_size(tcps->txWin);
if (freeSize > 0) {
ETHLIB_OS_SEM_POST(&tcps->txBufNotFull);
}
}
}
ETHLIB_OS_SEM_POST(&tcps->txInProgress);
return size - sizeLeft;
}
uint32_t tcp_recv(unsigned char d, uint8_t *data, uint32_t size) {
ConnBlock connBlock;
if (!cbdt_get_connection_block(E.cbdt, d, &connBlock)) {
ERROR("Invalid CBD descriptor: '%d'!\n", d);
return 0;
}
// acquire TCP state
TcpState *tcps = TCP_FETCH_STATE_FROM_CONNBLOCK(&connBlock);
return bfifo_pop(tcps->rxFifo, data, size);
} }
void tcp_print_report(const ConnBlock *connBlock) { void tcp_print_report(const ConnBlock *connBlock) {

View File

@ -2,9 +2,9 @@
#define ETHERLIB_TCP_CONNBLOCK_H #define ETHERLIB_TCP_CONNBLOCK_H
#define TCP_EXTRACT_FILTCOND(fc) uint16_t local_port = ((fc)->uw[4]); uint16_t remote_port = ((fc)->uw[5]); ip4_addr remote_addr = ((fc)->u[0]) #define TCP_EXTRACT_FILTCOND(fc) uint16_t local_port = ((fc)->uw[4]); uint16_t remote_port = ((fc)->uw[5]); ip4_addr remote_addr = ((fc)->u[0])
#define TCP_LOCAL_PORT_TO_FILTCOND(fc,local_port) ((fc)->uw[4]) = (local_port) #define TCP_LOCAL_PORT_TO_FILTCOND(fc, local_port) ((fc)->uw[4]) = (local_port)
#define TCP_REMOTE_PORT_TO_FILTCOND(fc,remote_port) ((fc)->uw[5]) = (remote_port) #define TCP_REMOTE_PORT_TO_FILTCOND(fc, remote_port) ((fc)->uw[5]) = (remote_port)
#define TCP_REMOTE_ADDR_TO_FILTCOND(fc,remote_addr) ((fc)->u[0] = (remote_addr)) #define TCP_REMOTE_ADDR_TO_FILTCOND(fc, remote_addr) ((fc)->u[0] = (remote_addr))
#include <stdint.h> #include <stdint.h>
#include "../packet_parsers/ipv4_types.h" #include "../packet_parsers/ipv4_types.h"
@ -12,6 +12,8 @@
#include "../packet_parsers/tcp_segment.h" #include "../packet_parsers/tcp_segment.h"
#include "../../cbd_table.h" #include "../../cbd_table.h"
#include "../conn_blocks/tcp/tcp_window.h" #include "../conn_blocks/tcp/tcp_window.h"
#include "../../blocking_fifo.h"
#include <etherlib_options.h>
struct EthInterface_; struct EthInterface_;
@ -47,6 +49,9 @@ typedef enum {
TCP_STATE_TIME_WAIT TCP_STATE_TIME_WAIT
} TcpConnectionState; } TcpConnectionState;
typedef int (*StreamCallBackFn)(cbd d);
typedef int (*TcpAcceptCbFn)(cbd d);
/** /**
* TCP state. * TCP state.
*/ */
@ -65,6 +70,7 @@ typedef struct {
uint16_t localWindow; ///< Maximum number of bytes we are willing (able) to accept uint16_t localWindow; ///< Maximum number of bytes we are willing (able) to accept
uint16_t remoteWindow; ///< Maximum number of bytes the peer is willing (able) to accept uint16_t remoteWindow; ///< Maximum number of bytes the peer is willing (able) to accept
TcpWindow *txWin; ///< Transmit window OBJECT TcpWindow *txWin; ///< Transmit window OBJECT
uint32_t lastTxSeqNumAcked; ///< Last txSeqNum that has received an ACK (used for differentiation between ACK and Keep-Alive segment)
// --------------- // ---------------
@ -73,6 +79,10 @@ typedef struct {
// --------------- // ---------------
bool8_t isServer; ///< this connection is a server connection TODO: nem a legjobb elnevezés
// ---------------
uint8_t retryLimit; ///< Maximum number of retransmission reties uint8_t retryLimit; ///< Maximum number of retransmission reties
uint8_t retries; ///< Current attempts number uint8_t retries; ///< Current attempts number
uint8_t retryTO; ///< Retry timeout uint8_t retryTO; ///< Retry timeout
@ -80,9 +90,25 @@ typedef struct {
// --------------- // ---------------
bool8_t debug; ///< Turns on/off debug mode bool8_t debug; ///< Turns on/off debug mode
SieveCallBackFn userCb; ///< User callback function
// ---------------
StreamCallBackFn streamCb; ///< Callback invoked on stream actions
TcpAcceptCbFn acceptCb; ///< Callback invoked on detecting an incoming connection
// ---------------
ETHLIB_OS_SEM_TYPE txBufNotFull; ///< Semaphore protecting the transmit window
ETHLIB_OS_SEM_TYPE txInProgress; ///< Transmission is in progress, transmit thread is blocked
// ---------------
BlockingFifo * rxFifo; ///< Receive FIFO
} TcpState; } TcpState;
#define TCP_FETCH_STATE_FROM_CONNBLOCK(connBlock) ((TcpState *) (connBlock)->sieveLayer->tag.p)
/** /**
* Create new TCP connection block * Create new TCP connection block
* @param intf associated Ethernet interface * @param intf associated Ethernet interface
@ -91,7 +117,7 @@ typedef struct {
* @param cbFn receive callback function * @param cbFn receive callback function
* @return TCP connection block * @return TCP connection block
*/ */
cbd tcp_new_connblock(struct EthInterface_ *intf, ip4_addr ipAddr, uint16_t port, SieveCallBackFn cbFn); cbd tcp_new_connblock(struct EthInterface_ *intf, ip4_addr ipAddr, uint16_t port, StreamCallBackFn cbFn);
/** /**
* Bind TCP connection to remote socket. * Bind TCP connection to remote socket.
@ -107,6 +133,13 @@ void tcp_bind(cbd d, ip4_addr remoteAddr, uint16_t remotePort);
*/ */
void tcp_listen(cbd d); void tcp_listen(cbd d);
/**
* Set callback invoked on detecting an incoming connection on a listening conncetion.
* @param d cbd of the listening connection
* @param acb function pointer of the callback
*/
void tcp_set_accept_callback(cbd d, TcpAcceptCbFn acb);
/** /**
* Send data over an existing TCP connection. * Send data over an existing TCP connection.
* @param d pointer to existing connblock * @param d pointer to existing connblock
@ -114,7 +147,9 @@ void tcp_listen(cbd d);
* @param size data size in bytes * @param size data size in bytes
* @return number of bytes sent * @return number of bytes sent
*/ */
uint32_t tcp_send(unsigned char d, const uint8_t * data, uint32_t size); uint32_t tcp_send(unsigned char d, const uint8_t *data, uint32_t size);
uint32_t tcp_recv(unsigned char d, uint8_t * data, uint32_t size);
/** /**
* Turn TCP debugging ON/OFF * Turn TCP debugging ON/OFF
@ -127,7 +162,7 @@ void tcp_debug(cbd d, bool debug);
* Print TCP connblock report. * Print TCP connblock report.
* @param connBlock TCP connblock * @param connBlock TCP connblock
*/ */
void tcp_print_report(const ConnBlock* connBlock); void tcp_print_report(const ConnBlock *connBlock);
//int tcp_send_segment(const struct ConnBlock_ *connBlock, TcpFlag flags, TcpOption * opts, const uint8_t *data, uint32_t size); //int tcp_send_segment(const struct ConnBlock_ *connBlock, TcpFlag flags, TcpOption * opts, const uint8_t *data, uint32_t size);

View File

@ -54,6 +54,13 @@ int parse_ipv4(const uint8_t *hdr, uint32_t size, PcktHeaderElement *pcktHdrLe,
arpc_learn(intf->arpc, &entry); // learn new assignment arpc_learn(intf->arpc, &entry); // learn new assignment
} }
// check if packet has padding at the end, then strip it
int ret = 0;
if (size > ipProps->TotalLength) {
pb->i = (int32_t)size - (int32_t)ipProps->TotalLength; // store the stripping length
ret |= PROC_FN_FLAG_STRIP_PAD;
}
// check whether it is a fragmented packet // check whether it is a fragmented packet
bool isAFragment = (ipProps->FragmentOffset != 0) || (ipProps->Flags & IP_FLAG_MF); bool isAFragment = (ipProps->FragmentOffset != 0) || (ipProps->Flags & IP_FLAG_MF);
if (isAFragment) { if (isAFragment) {
@ -66,14 +73,17 @@ int parse_ipv4(const uint8_t *hdr, uint32_t size, PcktHeaderElement *pcktHdrLe,
pb->p = raPload; pb->p = raPload;
pb->u = raSize; pb->u = raSize;
pb->b = true; // MRD! pb->b = true; // MRD!
return PROC_FN_RET_REPRST; // replace buffer and restart processing (with the assembled packet) ret |= PROC_FN_RET_REPRST;
return ret; // replace buffer and restart processing (with the assembled packet)
} else { } else {
ipProps->containedPacketClass = 0; // no contained packet if not assembled yet ipProps->containedPacketClass = 0; // no contained packet if not assembled yet
return PROC_FN_RET_ABORT; // stop further processing ret |= PROC_FN_RET_ABORT;
return ret; // stop further processing
} }
} }
return ipProps->validityOK ? PROC_FN_RET_OK : PROC_FN_RET_ABORT; ret |= ipProps->validityOK ? PROC_FN_RET_OK : PROC_FN_RET_ABORT;
return ret;
} }
static uint16_t nextIpIdentification = 0; static uint16_t nextIpIdentification = 0;

View File

@ -1,5 +0,0 @@
#include "tcp_connection_manager.h"
void tcpconmgr_add_connection(TcpConMgrCtrl * mgr) {
}

View File

@ -1,19 +0,0 @@
#ifndef CORE_ETHERLIB_TCP_CONNECTION_MANAGER
#define CORE_ETHERLIB_TCP_CONNECTION_MANAGER
#include <stdint.h>
#include <etherlib_options.h>
#include "cbd_table.h"
typedef struct {
uint16_t activeConns; ///< Number of active connections
ConnBlock * cb[]
} TcpConMgrCtrl;
void tcpconmgr_new();
void tcpconmgr_add_connection(TcpConMgrCtrl * mgr);
#endif /* CORE_ETHERLIB_TCP_CONNECTION_MANAGER */