diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5766b6e --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.bkup \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index d832193..111347d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,13 +1,18 @@ -cmake_minimum_required(VERSION 3.20) +cmake_minimum_required(VERSION 3.16) -SET(APP_NAME wfr) +SET(APP_NAME wfr_app) project(${APP_NAME}) set(CMAKE_CXX_STANDARD 17) -add_executable(${APP_NAME} main.cpp src/ALSADevice.cpp src/ALSADevice.h src/MemoryPool.cpp src/MemoryPool.h utils/Semaphore.h utils/Semaphore.cpp src/SampleWriter.h src/Timestamp.h - #src/SampleReceiver.h - src/audio_types.h src/ServerBeacon.cpp src/ServerBeacon.h src/GUI.h src/globals.h src/Callback.h src/MultiStreamReceiver.cpp src/MultiStreamReceiver.h) +add_executable(${APP_NAME} + main.cpp + src/ALSADevice.cpp + src/ALSADevice.h + src/MemoryPool.cpp + src/MemoryPool.h + utils/Semaphore.h + utils/Semaphore.cpp) find_package(Threads REQUIRED) if (THREADS_FOUND) @@ -36,6 +41,12 @@ if (GTKMM_FOUND) target_link_libraries(${APP_NAME} ${GTKMM_LIBRARIES}) endif(GTKMM_FOUND) +add_dependencies(${APP_NAME} wfr) +target_link_libraries(${APP_NAME} wfr) + install(TARGETS ${APP_NAME} RUNTIME DESTINATION ${PROJECT_SOURCE_DIR}/runtime ARCHIVE DESTINATION ${PROJECT_SOURCE_DIR}/runtime) -set(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} "-rdynamic") \ No newline at end of file +set(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} "-rdynamic") + +add_subdirectory(libwfr) +add_subdirectory(python) \ No newline at end of file diff --git a/MATLAB/read_multistream_dir.m b/MATLAB/read_multistream_dir.m index 8588bc2..7e58380 100644 --- a/MATLAB/read_multistream_dir.m +++ b/MATLAB/read_multistream_dir.m @@ -2,14 +2,14 @@ function read_multistream_dir(d, prefix, plotrange, onlydata) % get directories containing samples hint_file_name = strcat('./runtime/', d, '/node_dir_hint.txt'); hint_file = fopen(hint_file_name); - node_dirs = textscan(hint_file, '%s'); + node_files = textscan(hint_file, '%s'); fclose(hint_file); - node_names = strrep(node_dirs{1,1}, prefix, ""); + node_names = strrep(node_files{1,1}, prefix, ""); - node_dirs = strcat('./runtime/', d, '/', node_dirs{1,1}); + node_files = strcat('./runtime/', d, '/', node_files{1,1}); - sync_datasets(node_dirs, node_names, plotrange, onlydata); + sync_datasets(node_files, node_names, plotrange, onlydata); - print(strcat('./runtime/', d, '/figure.svg'), '-dsvg'); + %print(strcat('./runtime/', d, '/figure.svg'), '-dsvg'); end \ No newline at end of file diff --git a/MATLAB/read_syncdata.m b/MATLAB/read_syncdata.m index 4fc769d..264d1f5 100644 --- a/MATLAB/read_syncdata.m +++ b/MATLAB/read_syncdata.m @@ -1,42 +1,46 @@ -function ds = read_syncdata(DATASET_DIR) +function ds = read_syncdata(data_files) % settings %DATASET_DIR = 'test_20221'; - TS_FILE = 'ts'; - SAMPLE_FILE_PREFIX = 'ch_'; - FILE_EXT = 'dat'; - CHANNELS = 2; + TS_FILE_EXT = 'ts'; + %SAMPLE_FILE_PREFIX = 'ch_'; + SAMPLE_FILE_EXT = 'dat'; + %CHANNELS = 2; - % computed values - FN_TS = strcat(DATASET_DIR, '/', TS_FILE, '.', FILE_EXT); + %for ch=0:1:(CHANNELS-1) + % FN_SAMPLE{ch+1} = strcat(data_files, '/', SAMPLE_FILE_PREFIX, num2str(ch), '.', SAMPLE_FILE_EXT); + %end - for ch=0:1:(CHANNELS-1) - FN_SAMPLE{ch+1} = strcat(DATASET_DIR, '/', SAMPLE_FILE_PREFIX, num2str(ch), '.', FILE_EXT); - end + [files_n,~] = size(data_files); + + s = []; + ts = []; + + for i=1:files_n + + data_file = data_files(i,:); + + % load timestamps + FN_TS = strcat(data_file, '.', TS_FILE_EXT); + f = fopen(FN_TS ,'r'); + d = fread(f,[2,Inf],'uint32'); + fclose(f); + % create fractional timestamp + d = d(1,:) + d(2,:) / 1E+09; + + ts = [ts; d]; - % load timestamps - f = fopen(FN_TS ,'r'); - ts = fread(f,[2,Inf],'uint32'); - fclose(f); + % get sample count + %sample_cnt = length(ts); - % take first element as t0 - %ts(1,:) = ts(1,:) - 1636036145; - - % create fractional timestamp - ts = ts(1,:) + ts(2,:) / 1E+09; - - % get sample count - sample_cnt = length(ts); - - % load samples - s = zeros(CHANNELS, sample_cnt); - for ch=1:CHANNELS - f = fopen(FN_SAMPLE{ch},'r'); + % load samples + FN_SAMPLE = strcat(data_file, '.', SAMPLE_FILE_EXT); + f = fopen(FN_SAMPLE,'r'); d = fread(f, [1,Inf],'int16'); - s(ch,:) = d; + s = [s; d]; fclose(f); end - + %figure(1) %plot(ts(1,1:100), s(1,1:100), 'x') %xlim([ts(1) ts(100)]) diff --git a/MATLAB/sync_datasets.m b/MATLAB/sync_datasets.m index 4b551d7..2e8648a 100644 --- a/MATLAB/sync_datasets.m +++ b/MATLAB/sync_datasets.m @@ -68,18 +68,18 @@ function sds = sync_datasets(dss, node_names, PLOTRANGE, ONLYDATA) grid on if (ONLYDATA) - xlabel("Idő [s]"); + %xlabel("Idő [s]"); + xlabel("Time [s]") end - ylabel("Feszültség [V]"); + %ylabel("Feszültség [V]"); + ylabel("Voltage [V]"); xlim([ts_ref(PLOTRANGE(1)) ts_ref(PLOTRANGE(end))]); legend_lines = {}; for i=1:length(node_names) - for ch=1:2 - legend_lines{2 * (i - 1) + ch, 1} = strcat(node_names{i,1}, " CH", num2str(ch)); - end + legend_lines{i,1} = node_names{i,1}; end legend(legend_lines); @@ -116,9 +116,12 @@ function sds = sync_datasets(dss, node_names, PLOTRANGE, ONLYDATA) end legend(legend_lines); - ylabel("Erősítéshibával kompenzált különbség") + %ylabel("Erősítéshibával kompenzált különbség") + ylabel("Gain error compensated difference") xlim([ts_ref(PLOTRANGE(1)) ts_ref(PLOTRANGE(end))]); + grid on + % plot timestamp errors subplot(3,1,3); @@ -133,8 +136,10 @@ function sds = sync_datasets(dss, node_names, PLOTRANGE, ONLYDATA) end grid on - xlabel("Idő [s]"); - ylabel("Időhiba [ns]"); + xlabel("Time [s]") + %xlabel("Idő [s]"); + ylabel("Time error [ns]") + %ylabel("Időhiba [ns]"); legend(legend_lines); xlim([ts_ref(PLOTRANGE(1)) ts_ref(PLOTRANGE(end))]); diff --git a/libwfr/CMakeLists.txt b/libwfr/CMakeLists.txt new file mode 100644 index 0000000..d784b8c --- /dev/null +++ b/libwfr/CMakeLists.txt @@ -0,0 +1,44 @@ +cmake_minimum_required(VERSION 3.16) +project(libwfr) + +set(CMAKE_CXX_STANDARD 20) + +# Threads +find_package(Threads REQUIRED) + +set(SOURCES + src/audio_types.h + src/Callback.h + src/MultiStreamReceiver.cpp + src/MultiStreamReceiver.h + src/SampleWriter.cpp + src/SampleWriter.h + src/ServerBeacon.cpp + src/ServerBeacon.h + src/Timestamp.h + src/Logger.cpp + src/Logger.h + src/AcquisitionFormat.cpp + src/AcquisitionFormat.h + src/ChannelBuffer.cpp + src/ChannelBuffer.h + src/MultiStreamProcessor.cpp + src/MultiStreamProcessor.h + src/MultiStreamToFile.cpp + src/MultiStreamToFile.h + src/ICreatable.h + src/MultiStreamOscilloscope.cpp + src/MultiStreamOscilloscope.h + src/Trigger.cpp + src/Trigger.h) + +### + +add_library(wfr ${SOURCES}) +target_link_libraries(wfr Threads::Threads) +target_include_directories(wfr PUBLIC ${PROJECT_SOURCE_DIR}/../utils) +install(TARGETS wfr + LIBRARY DESTINATION ${PROJECT_SOURCE_DIR}/bin + ARCHIVE DESTINATION ${PROJECT_SOURCE_DIR}/bin) + +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC") \ No newline at end of file diff --git a/libwfr/src/AcquisitionFormat.cpp b/libwfr/src/AcquisitionFormat.cpp new file mode 100644 index 0000000..5d4024a --- /dev/null +++ b/libwfr/src/AcquisitionFormat.cpp @@ -0,0 +1,101 @@ +// +// Created by epagris on 2022.05.03.. +// + +#include +#include +#include +#include +#include +#include "AcquisitionFormat.h" +#include "Logger.h" + +AcquisitionFormat::AcquisitionFormat() { + memset(this, 0, sizeof(AcquisitionFormat)); // clear every field +} + + +AcquisitionFormat::AcquisitionFormat(const std::string &acqFormatStr) { + std::stringstream sstream(acqFormatStr); + + // fill the input + sstream >> sampling_rate_Hz; + sstream >> sample_size_b; + sstream >> channel_count; + sstream >> mch_samples_per_packet; + + // fill the rest... + fillDerivedFields(); +} + +AcquisitionFormat::AcquisitionFormat(size_t sampling_rate_Hz, size_t sample_size_b, size_t channel_count, size_t mch_samples_per_packet) { + // fill the input + this->sampling_rate_Hz = sampling_rate_Hz; + this->sample_size_b = sample_size_b; + this->channel_count = channel_count; + this->mch_samples_per_packet = mch_samples_per_packet; + + // fill the rest... + fillDerivedFields(); +} + +void AcquisitionFormat::fillDerivedFields() { + mSettingsValid = true; + + // validate sampling rate + std::set validSamplingRates = {8000, 8021, 32000, 44100, 48000, 88200, 96000}; + if (!validSamplingRates.contains(sampling_rate_Hz)) { + Logger::logLine("INVALID sampling rate of " + std::to_string(sampling_rate_Hz) + " Hz."); + mSettingsValid &= false; + } + + // validate sample size + struct SampleSizeStorageSizeAssignment { + size_t sample_size_b; + size_t storage_size_B; + }; + + std::vector validSampleSizes = { + {8, 1}, + {12, 2}, + {16, 2}, + {24, 4}, + {32, 4} + }; + + // look for storage size + sample_storage_size_B = 0; + for (const auto& s: validSampleSizes) { + if (s.sample_size_b == sample_size_b) { + sample_storage_size_B = s.storage_size_B; + break; + } + } + + if (sample_storage_size_B == 0) { + Logger::logLine("INVALID sample size of " + std::to_string(sample_size_b) + "bits."); + mSettingsValid &= false; + } + + // validate channel count + if (channel_count > 8) { + Logger::logLine("WARNING Channel count of " + std::to_string(channel_count) + " is likely too large."); + } + + // validate packet size + if ((sampling_rate_Hz % mch_samples_per_packet) != 0) { + Logger::logLine("INVALID multichannel sample count (now " + std::to_string(mch_samples_per_packet) + + "), this must be an integer divisor of sampling rate (" + std::to_string(sampling_rate_Hz) + " Hz)!"); + mSettingsValid &= false; + } + + distinct_samples_per_packet = mch_samples_per_packet * channel_count; + sample_data_size_per_packet_B = sample_storage_size_B * distinct_samples_per_packet; + single_channel_data_size_per_packet_B = mch_samples_per_packet * sample_storage_size_B; + all_channel_sample_size_B = channel_count * sample_storage_size_B; + packet_period_ns = 1E+09 * mch_samples_per_packet / (double) sampling_rate_Hz; +} + +bool AcquisitionFormat::isValid() const { + return mSettingsValid; +} diff --git a/libwfr/src/AcquisitionFormat.h b/libwfr/src/AcquisitionFormat.h new file mode 100644 index 0000000..0512631 --- /dev/null +++ b/libwfr/src/AcquisitionFormat.h @@ -0,0 +1,34 @@ +// +// Created by epagris on 2022.05.03.. +// + +#ifndef WFR_APP_ACQUISITIONFORMAT_H +#define WFR_APP_ACQUISITIONFORMAT_H + + +#include + +class AcquisitionFormat { +public: + size_t sampling_rate_Hz; // sampling rate + size_t sample_size_b; // sample size in bits + size_t sample_storage_size_B; // sample storage size in bytes + size_t channel_count; // number of channels + size_t mch_samples_per_packet; // number of all-channel samples per packet + size_t distinct_samples_per_packet; // number of samples summed across all channel + size_t sample_data_size_per_packet_B; // data size summed across the full packet + size_t single_channel_data_size_per_packet_B; // data size of a single channel in the packet + size_t all_channel_sample_size_B; // sample size of a full, all-channel sample + double packet_period_ns; // time period encapsulated by a full packet +private: + bool mSettingsValid; // indicate if current settings are valid + void fillDerivedFields(); // fill fields with values derived from the input data +public: + AcquisitionFormat(); // default contr. + explicit AcquisitionFormat(const std::string& acqFormatStr); // constr. from formatt string + AcquisitionFormat(size_t sampling_rate_Hz, size_t sample_size_b, size_t channel_count, size_t mch_samples_per_packet); // constr. from fields + bool isValid() const; // are the fields represent a valid set of settings +}; + + +#endif //WFR_APP_ACQUISITIONFORMAT_H diff --git a/src/Callback.h b/libwfr/src/Callback.h similarity index 100% rename from src/Callback.h rename to libwfr/src/Callback.h diff --git a/libwfr/src/ChannelBuffer.cpp b/libwfr/src/ChannelBuffer.cpp new file mode 100644 index 0000000..62a19f0 --- /dev/null +++ b/libwfr/src/ChannelBuffer.cpp @@ -0,0 +1,239 @@ +// +// Created by epagris on 2022.05.03.. +// + +#include +#include "ChannelBuffer.h" +#include "Logger.h" + +ChannelBuffer::ChannelBuffer(size_t blockSize, size_t blockCount, size_t elementSize) { + // check if blockSize is divisible by 4 + if ((blockSize % 4) != 0) { + throw std::runtime_error(std::string("Block size must be an integral multiple of 4! ") + __FUNCTION__); + } + + // fill-in parameters + mBlockSize = blockSize; + mBlockCount = blockCount; + mFullMemSize = blockSize * blockCount; + mElementSize = elementSize; + mElementsPerBlock = blockSize / elementSize; + + // allocate memory + mpMem = std::shared_ptr(new uint8_t[mFullMemSize]); // exception is thrown if this fails... + + // clear and fill-in memory blocks + clear(); + + // no reader is waiting + mReaderWaiting = false; +} + +void ChannelBuffer::store(const void *pSrc) { + mStoreFetch_mtx.lock(); // MUTEX!!! + + BlockDescriptor bd; + + if (mFreeBlocks.empty()) { // if there're no more free blocks to allocate, then drop the eldest one and shift the queue + // release block + bd = mOccupiedBlocks.front(); + mOccupiedBlocks.pop_front(); + + // log buffer overrun + //Logger::logLine("ChannelBuffer overrun!"); + + } else { // if there's at least a single free block + // acquire the free block + bd = mFreeBlocks.front(); + + // remove from the free queue + mFreeBlocks.pop_front(); + + // put into the queue of occupied blocks + //mOccupiedBlocks.push(bd); + } + + // store tag + mLastTag += mElementsPerBlock; + bd.tag = mLastTag; + + // push back onto the queue ("move to the end of the queue") + mOccupiedBlocks.push_back(bd); + + // copy content + void *pDest = bd.p; + memcpy(pDest, pSrc, mBlockSize); + + // handle waiting readers + if (mReaderWaiting) { + mReaderWaiting = false; + mStoreFetch_mtx.unlock(); + mReadSem.post(); + } else { + mStoreFetch_mtx.unlock(); + } +} + +uint64_t ChannelBuffer::fetch(void *pDst, ssize_t size) { + mStoreFetch_mtx.lock(); // MUTEX!!! + + // if there're no data to copy, then block + if (mOccupiedBlocks.empty()) { + mReaderWaiting = true; // signal that reader is waiting + mStoreFetch_mtx.unlock(); // release lock + mReadSem.wait(); // wait on data to become available + mStoreFetch_mtx.lock(); // re-lock mutex + } + + BlockDescriptor bd; + + // if there're data to copy + if (!mOccupiedBlocks.empty()) { + // get block + bd = mOccupiedBlocks.front(); + + void *pSrc = bd.p; + + // copy content to given memory area + if (size == 0) { + memcpy(pDst, pSrc, mBlockSize); + } else if (size > 0) { // copy from the beginning of the block + memcpy(pDst, pSrc, std::min((size_t) size, mBlockSize)); + } else if (size < 0) { // copy from the end of the block + size_t copySize = std::min((size_t) (labs(size)), mBlockSize); // determine copy size + uint8_t *pSrcStart = (uint8_t *) pSrc + mBlockSize - copySize; // determine beginning of area to copy + memcpy(pDst, pSrcStart, copySize); // copy + } + + // remove from the queue of occupied blocks + mOccupiedBlocks.pop_front(); + + // add to the queue of free blocks + mFreeBlocks.push_back(bd); + } else { + throw std::runtime_error("Error, this mustn't be hit!"); + } + + mStoreFetch_mtx.unlock(); // release semaphore + + return bd.tag; +} + +void ChannelBuffer::clear() { + std::unique_lock lock(mStoreFetch_mtx); // MUTEX! + + // fill-in free blocks + mOccupiedBlocks.clear(); + mFreeBlocks.clear(); + for (size_t i = 0; i < mBlockCount; i++) { + BlockDescriptor bd = {.p = mpMem.get() + i * mBlockSize, .tag = -1}; + mFreeBlocks.push_back(bd); + } +} + +std::shared_ptr ChannelBuffer::fullBuffer() const { + return mpMem; +} + +int64_t ChannelBuffer::getEldestTag() { + std::unique_lock lock(mStoreFetch_mtx); + + if (!mOccupiedBlocks.empty()) { + return mOccupiedBlocks.front().tag; + } else { + return -1; + } +} + +size_t ChannelBuffer::copyBetweenTags(void *pDst, int64_t start, int64_t end) { + std::unique_lock lock(mStoreFetch_mtx); + + if (mOccupiedBlocks.empty() || (end < start)) { + return 0; + } + + uint8_t * pDstByte = (uint8_t *) pDst; + + // get eldest and youngest tag + int64_t eldestTag = mOccupiedBlocks.front().tag; + int64_t youngestTag = mOccupiedBlocks.back().tag; + + // clip tags falling out of valid range + start = std::max(start, eldestTag); + end = std::min(end, youngestTag); + + // iterators + std::list::iterator startIter, endIter; + bool startIterFound = false, endIterFound = false; + + // find starting block + for (auto iter = mOccupiedBlocks.begin(); iter != mOccupiedBlocks.end() && (!startIterFound && !endIterFound); iter++) { + // select start iter + if ((iter->tag <= start) && ((iter->tag + mElementsPerBlock - 1) >= start)) { + startIter = iter; + startIterFound = true; + } + if ((iter->tag <= end) && ((iter->tag + mElementsPerBlock - 1) >= end)) { + endIter = iter; + endIterFound = true; + } + } + + uint8_t *pStart; + size_t sizeCopied = 0; + + // IF range resides in a single block + if (startIter == endIter) { + pStart = startIter->p + (start - startIter->tag) * mElementSize; + sizeCopied = end - start + 1; + memcpy(pDst, pStart, sizeCopied); + } else { // IF range spans across multiple blocks + auto iterAfterEnd = endIter; + iterAfterEnd++; + + size_t copySize = 0; + for (auto iter = startIter; iter != iterAfterEnd; iter++) { + if (iter == startIter) { // START + pStart = iter->p + (start - iter->tag) * mElementSize; + copySize = mBlockSize - (start - iter->tag) + 1; + memcpy(pDstByte + sizeCopied, pStart, copySize); + sizeCopied += copySize; + } else if (iter == endIter) { // END + pStart = iter->p; + copySize = (end - iter->tag) + 1; + memcpy(pDstByte + sizeCopied, pStart, copySize); + sizeCopied += copySize; + } else { // INBETWEEN + pStart = iter->p; + copySize = mBlockSize; + memcpy(pDstByte + sizeCopied, pStart, copySize); + sizeCopied += copySize; + } + } + } + + return sizeCopied; +} + +size_t ChannelBuffer::getFreeBlockCnt() { + std::unique_lock lock(mStoreFetch_mtx); + return mFreeBlocks.size(); +} + +int64_t ChannelBuffer::copyBlock(size_t n, void * pDst) { + std::unique_lock lock(mStoreFetch_mtx); + + // block not found + if ((n + 1) > mOccupiedBlocks.size()) { + return -1; + } + + // block found + auto iter = mOccupiedBlocks.begin(); + std::advance(iter, n); + + // copy block contents + memcpy(pDst, iter->p, mBlockSize); + + return iter->tag; +} diff --git a/libwfr/src/ChannelBuffer.h b/libwfr/src/ChannelBuffer.h new file mode 100644 index 0000000..8a5ffa5 --- /dev/null +++ b/libwfr/src/ChannelBuffer.h @@ -0,0 +1,53 @@ +// +// Created by epagris on 2022.05.03.. +// + +#ifndef WFR_APP_CHANNELBUFFER_H +#define WFR_APP_CHANNELBUFFER_H + + +#include +#include +#include +#include +#include +#include +#include + +/* + * Finite-length, automatically dropping channel buffer designed for exactly one reader + */ + +class ChannelBuffer { +public: + struct BlockDescriptor { + uint8_t * p; + int64_t tag; + }; +private: + size_t mBlockSize; // size of a block + size_t mBlockCount; // number of blocks + size_t mFullMemSize; // allocate memory block size derived from block size and block count + size_t mElementSize; // size of a single element + size_t mElementsPerBlock; // number of elements per block + uint64_t mLastTag; // last assigned tag + std::list mOccupiedBlocks; // occupied blocks + std::list mFreeBlocks; // free blocks + std::shared_ptr mpMem; // memory holding buffers (free and occupied also) + std::mutex mStoreFetch_mtx; // mutex for concurrent fetching and storing + bool mReaderWaiting; // indicates, that a reader is waiting for new data + Semaphore mReadSem; // read semaphore for blocking reader +public: + ChannelBuffer(size_t blockSize, size_t blockCount, size_t elementSize); // constr. + void store(const void *pSrc); // store into a new data block by copying from pSrc + uint64_t fetch(void * pDst, ssize_t size = 0); // fetch eldest data block and copy to pDst + std::shared_ptr fullBuffer() const; // get full buffer + void clear(); // clear block occupancies + int64_t getEldestTag(); // get the tag residing the longest in the buffer + size_t copyBetweenTags(void *pDst, int64_t start, int64_t end); // copy data to external buffer between tags (return: number of bytes copied) + size_t getFreeBlockCnt(); // get number of free blocks + int64_t copyBlock(size_t n, void * pDst); // get block (nth eldest) +}; + + +#endif //WFR_APP_CHANNELBUFFER_H diff --git a/libwfr/src/ICreatable.h b/libwfr/src/ICreatable.h new file mode 100644 index 0000000..802eb19 --- /dev/null +++ b/libwfr/src/ICreatable.h @@ -0,0 +1,19 @@ +// +// Created by epagris on 2022.05.04.. +// + +#ifndef WFR_APP_ICREATABLE_H +#define WFR_APP_ICREATABLE_H + +#include + +template +class ICreatable { +public: + static std::shared_ptr create(ParamTypes... params) { + return std::shared_ptr((T *) new T(params...)); + } +}; + + +#endif //WFR_APP_ICREATABLE_H diff --git a/libwfr/src/Logger.cpp b/libwfr/src/Logger.cpp new file mode 100644 index 0000000..325b74b --- /dev/null +++ b/libwfr/src/Logger.cpp @@ -0,0 +1,50 @@ +// +// Created by epagris on 2022.05.03.. +// + +#include +#include +#include +#include "Logger.h" + +bool Logger::mLogEn = false; +int Logger::mLogFD = -1; +bool Logger::mCloseOnStop = false; + +void Logger::log(const std::string &str) { + if (mLogEn) { + write(mLogFD, str.c_str(), str.size()); + } +} + +void Logger::logLine(const std::string &str) { + std::string line = str + "\n"; + log(line); +} + +void Logger::startLogging(int fd) { + mLogFD = fd; + mLogEn = true; +} + +void Logger::startLogging(const std::string &filename) { + mLogFD = open(filename.c_str(), O_WRONLY); + + if (mLogFD < 0) { + std::cerr << "Could not open log output file! (" + filename + ")" << std::endl; + return; + } + + mCloseOnStop = true; // close file when abandoning logging session, since we have opened it + mLogEn = true; +} + +void Logger::stopLogging() { + if (mLogEn) { + mLogEn = false; + + if (mCloseOnStop) { // close file belonging to us + close(mLogFD); + } + } +} diff --git a/libwfr/src/Logger.h b/libwfr/src/Logger.h new file mode 100644 index 0000000..8644007 --- /dev/null +++ b/libwfr/src/Logger.h @@ -0,0 +1,33 @@ +// +// Created by epagris on 2022.05.03.. +// + +#ifndef WFR_APP_LOGGER_H +#define WFR_APP_LOGGER_H + + +#include +#include + +class Logger { +public: + static constexpr int LOG_FD_DEF = STDOUT_FILENO; // default log file descriptor +private: + static bool mLogEn; // enable logging + static int mLogFD; // file descriptor for logging + static bool mCloseOnStop; // close file when stopping logging +private: + Logger() = default; // constr. +public: + static void startLogging(int fd = STDOUT_FILENO); // start logging on given file descriptor + static void startLogging(const std::string &filename); // start logging using a filename + static void stopLogging(); // stop logging + + // logging functions + static void log(const std::string &str); + static void logLine(const std::string &str); + +}; + + +#endif //WFR_APP_LOGGER_H diff --git a/libwfr/src/MultiStreamOscilloscope.cpp b/libwfr/src/MultiStreamOscilloscope.cpp new file mode 100644 index 0000000..429b5ff --- /dev/null +++ b/libwfr/src/MultiStreamOscilloscope.cpp @@ -0,0 +1,100 @@ +// +// Created by epagris on 2022.05.04.. +// + +#include +#include "MultiStreamOscilloscope.h" + +MultiStreamOscilloscope::MultiStreamOscilloscope() { + mCapturePeriod_ns = DRAW_WINDOW_PERIOD_NS_DEF; + verticalScale = VOLT_PER_BIN_DEF; + mTrigState = {false, false}; +} + +void MultiStreamOscilloscope::setup(const std::vector &nodes, const AcquisitionFormat &acqFmt) { + MultiStreamProcessor::setup(nodes, acqFmt); + + // calculate buffer parameters + mCaptureLength = ceil(mCapturePeriod_ns / 1E+09 * acqFmt.sampling_rate_Hz); // calculate draw window period in samples + mFIFOBlockCnt = 2 * floor(mCaptureLength / acqFmt.mch_samples_per_packet); // TODO: 2x: pretrigger... + mFIFOBlockSize = mAcqFmt.mch_samples_per_packet * sizeof(SamplePoint); + + // setup channel buffers + mpChBufs.resize(mChCnt); + for (auto chBuf: mpChBufs) { + chBuf.reset(new ChannelBuffer(mFIFOBlockSize, mFIFOBlockCnt, 0)); + } + + // setup sample assembly buffer + mpAssemblyBuffer.reset(new SamplePoint[mAcqFmt.mch_samples_per_packet]); + + // setup trigger buffer + mpTriggerBuffer.reset(new SamplePoint[mAcqFmt.mch_samples_per_packet]); +} + +bool MultiStreamOscilloscope::input(size_t ch, const std::shared_ptr &pTime, const void *pData, size_t size) { + if (!MultiStreamProcessor::input(ch, pTime, pData, size)) { + return false; + } + + // assemble sample points + double sample; + for (size_t i = 0; i < mAcqFmt.mch_samples_per_packet; i++) { + uint8_t *pSample = ((uint8_t *) pData) + i * mAcqFmt.sample_storage_size_B; // generate sample pointer + + // fetch value + switch (mAcqFmt.sample_storage_size_B) { + case 1: + sample = *((int8_t *) pSample); + break; + case 2: + sample = *((int16_t *) pSample); + break; + case 4: + sample = *((int32_t *) pSample); + break; + } + + // infer vertical gain + sample *= verticalScale; + + // save sample point + mpAssemblyBuffer.get()[i] = {.t = pTime.get()[i].to_ns(), .x = sample}; + } + + // push new block of samples onto the FIFO + mpChBufs[ch]->store(mpAssemblyBuffer.get()); + + // if a sufficient collection of samples are available (the FIFO is full) + // and the trigger is armed, and the trigger channel is the curent one, + // then probe samples to trigger + if (trigSettings.ch == ch && mTrigState.armed && mpChBufs[ch]->getFreeBlockCnt() == 0) { + int64_t tag = mpChBufs[ch]->copyBlock(2, mpTriggerBuffer.get()); // TODO magic constant!... + for (size_t i = 0; i < mAcqFmt.mch_samples_per_packet; i++) { + const SamplePoint& samplePoint = mpTriggerBuffer.get()[i]; // get sample + mTrigState.trigd = trigSettings.sample(samplePoint.x); // probe if causes a trigger + + // triggered? + if (mTrigState.trigd) { + mTrigState.armed = false; // disable trigger system until arming + mTrigState.triggerSampleTag = tag + i; // store the sample the trigger fired on + + // copy captured data + size_t preTriggerSamples = mAcqFmt.mch_samples_per_packet; + int64_t captureStart = mTrigState.triggerSampleTag - preTriggerSamples; + int64_t captureEnd = captureStart + mCaptureLength; + mpChBufs[ch]->copyBetweenTags(mpTriggerBuffer.get(), captureStart, captureEnd); + } + } + } + + return true; +} + +void MultiStreamOscilloscope::triggerNow() { + +} + +void MultiStreamOscilloscope::armTrigger() { + +} diff --git a/libwfr/src/MultiStreamOscilloscope.h b/libwfr/src/MultiStreamOscilloscope.h new file mode 100644 index 0000000..159bcfe --- /dev/null +++ b/libwfr/src/MultiStreamOscilloscope.h @@ -0,0 +1,51 @@ +// +// Created by epagris on 2022.05.04.. +// + +#ifndef WFR_APP_MULTISTREAMOSCILLOSCOPE_H +#define WFR_APP_MULTISTREAMOSCILLOSCOPE_H + + +#include "ICreatable.h" +#include "MultiStreamProcessor.h" +#include "ChannelBuffer.h" +#include "Trigger.h" + +class MultiStreamOscilloscope : public MultiStreamProcessor, public ICreatable { +public: + static constexpr size_t DRAW_WINDOW_PERIOD_NS_DEF = 50E+06; // default window length (50ms) + static constexpr double VOLT_PER_BIN_DEF = 42.80E-06; // default vertical resolution +public: + struct SamplePoint { + int64_t t; // time + double x; // sample data + }; +private: + std::vector> mpChBufs; // channel buffers + size_t mCapturePeriod_ns; // drawing window period + size_t mCaptureLength; // length of drawing sliding window in SAMPLES + size_t mFIFOBlockCnt; // number of blocks the FIFO consists of + size_t mFIFOBlockSize; // FIFO block size in bytes + std::shared_ptr mpAssemblyBuffer; // buffer for assembling sample points + std::shared_ptr mpTriggerBuffer; // buffer for probing against trigger conditions + std::shared_ptr mpCaptureBuffer; // buffer for last capture +public: + TriggerSettings trigSettings; // trigger settings + double verticalScale; // vertical resolution +private: + struct { + bool armed; // indicates wheter trigger is armed or not + bool trigd; // indicates if trigger has fired + int64_t triggerSampleTag; + } mTrigState; +public: + MultiStreamOscilloscope(); + void triggerNow(); // make the scope trigger regardless conditions + void armTrigger(); // arm trigger for next acquisition + + void setup(const std::vector &nodes, const AcquisitionFormat &acqFmt) override; + bool input(size_t ch, const std::shared_ptr &pTime, const void *pData, size_t size) override; +}; + + +#endif //WFR_APP_MULTISTREAMOSCILLOSCOPE_H diff --git a/libwfr/src/MultiStreamProcessor.cpp b/libwfr/src/MultiStreamProcessor.cpp new file mode 100644 index 0000000..4b2e2cd --- /dev/null +++ b/libwfr/src/MultiStreamProcessor.cpp @@ -0,0 +1,29 @@ +// +// Created by epagris on 2022.05.04.. +// + +#include "MultiStreamProcessor.h" +#include "Logger.h" + +void MultiStreamProcessor::setup(const std::vector &nodes, const AcquisitionFormat &acqFmt) { + // get parameters + mNodes = nodes; + mChCnt = nodes.size() * acqFmt.channel_count; + mAcqFmt = acqFmt; +} + +bool MultiStreamProcessor::input(size_t ch, const std::shared_ptr &pTime, const void *pData, size_t size) { + // check input validity + if (ch >= mChCnt) { + Logger::logLine("INVALID channel index (" + std::to_string(ch) + ") in " + __FUNCTION__); + return false; + } + + if (size > mAcqFmt.single_channel_data_size_per_packet_B) { + Logger::logLine("INVALID data size (" + std::to_string(size) + " bytes) in " + __FUNCTION__ + + ". Predefined single-channel all-data size (" + + std::to_string(mAcqFmt.single_channel_data_size_per_packet_B) + " bytes) exceeded, tail clipped!"); + } + + return true; +} diff --git a/libwfr/src/MultiStreamProcessor.h b/libwfr/src/MultiStreamProcessor.h new file mode 100644 index 0000000..380d2dd --- /dev/null +++ b/libwfr/src/MultiStreamProcessor.h @@ -0,0 +1,28 @@ +// +// Created by epagris on 2022.05.04.. +// + +#ifndef WFR_APP_MULTISTREAMPROCESSOR_H +#define WFR_APP_MULTISTREAMPROCESSOR_H + + +#include +#include +#include +#include "AcquisitionFormat.h" +#include "Timestamp.h" + +class MultiStreamProcessor { +protected: + std::vector mNodes; // client node addresses + size_t mChCnt; // channel count + AcquisitionFormat mAcqFmt; // acquisition format +protected: + MultiStreamProcessor() = default; +public: + virtual void setup(const std::vector &nodes, const AcquisitionFormat &acqFmt); // setup function for adapting to data streams + virtual bool input(size_t ch, const std::shared_ptr &pTime, const void *pData, size_t size); // input data for a channel +}; + + +#endif //WFR_APP_MULTISTREAMPROCESSOR_H diff --git a/libwfr/src/MultiStreamReceiver.cpp b/libwfr/src/MultiStreamReceiver.cpp new file mode 100644 index 0000000..11eb841 --- /dev/null +++ b/libwfr/src/MultiStreamReceiver.cpp @@ -0,0 +1,247 @@ +// +// Created by epagris on 2021. 11. 22.. +// + +#include + +#include +#include +#include +#include + +#include "MultiStreamReceiver.h" +#include "audio_types.h" +#include "Logger.h" + +MultiStreamReceiver::MultiStreamReceiver(const std::vector &nodes, size_t samplingRate, size_t sampleSize, size_t channelCount, size_t mchSampPerPckt, unsigned short port) + : MultiStreamReceiver(nodes, AcquisitionFormat(samplingRate, sampleSize, channelCount, mchSampPerPckt), port) { +} + +MultiStreamReceiver::MultiStreamReceiver(const std::vector &nodes, const std::string &acqFormat_str, unsigned short port) + : MultiStreamReceiver(nodes, AcquisitionFormat(acqFormat_str), port) { +} + +MultiStreamReceiver::MultiStreamReceiver(const std::vector &nodes, const AcquisitionFormat &acqFormat, unsigned short port) + : port(port), mAcqFmt(acqFormat) { + mRunning = false; + mClientNodes = nodes; + mSoc = -1; // some dummy initial value +} + +MultiStreamReceiver::MultiStreamReceiver(const std::vector &nodes_str, const AcquisitionFormat &acqFormat, unsigned short port) + : port(port), mAcqFmt(acqFormat) { + mRunning = false; + mSoc = -1; + + for (auto &node_addr: nodes_str) { + mClientNodes.push_back({ inet_addr(node_addr.c_str()) }); + } +} + +MultiStreamReceiver::MultiStreamReceiver(const std::vector &nodes_str, const std::string &acqFormat_str, unsigned short port) + : MultiStreamReceiver(nodes_str, AcquisitionFormat(acqFormat_str), port) { +} + +MultiStreamReceiver::~MultiStreamReceiver() { + stop(); +} + +void MultiStreamReceiver::startNetworking() { + // open socket + mSoc = socket(AF_INET, SOCK_DGRAM, 0); + if (mSoc == -1) { + throw std::runtime_error(std::string("Could not create UDP-socket in ") + __FUNCTION__ + "!"); + } + + int opt = 1; + if (setsockopt(mSoc, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)) == -1) { + throw std::runtime_error(std::string("Could not set socket options in ") + __FUNCTION__ + "!"); + } + + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = INADDR_ANY; + addr.sin_port = htons(port); + + // bind + if (bind(mSoc, (const sockaddr *) &addr, sizeof(addr)) == -1) { + throw std::runtime_error(std::string("Could not bind socket options in ") + __FUNCTION__ + "!"); + } +} + +void MultiStreamReceiver::start(const std::shared_ptr &pMSP) { + if (mRunning) { + return; + } + + // store multistream processor + mpMSP = pMSP; + mpMSP->setup(mClientNodes, mAcqFmt); + + if (mpMSP == nullptr) { + Logger::logLine(std::string("ERROR MultiStreamProcessor is NULL in ") + __FUNCTION__ + ", cannot start MultiStreamReceiver!"); + return; + } + + // ---------------------------- + + //createSampleWriters(); // TODO + startNetworking(); + + // ---------------------------- + + mRunning = true; + mRecvThread = std::make_shared(fnRecv, this); +} + +void MultiStreamReceiver::stop() { + if (!mRunning) { + return; + } + + mRunning = false; + mRecvThread->join(); + close(mSoc); + mpSampleWriters.clear(); +} + +// -------------------------------------- + +void MultiStreamReceiver::fnRecv(MultiStreamReceiver *pMSR) { + // select-related structures + fd_set read_fds; + timeval tv; + + // allocate receive buffer based on sample size + size_t expected_receive_size = pMSR->mAcqFmt.sample_data_size_per_packet_B + sizeof(AudioPayloadHeader); + std::shared_ptr pRecvBuf(new uint8_t[expected_receive_size]); + + // allocate buffers for extracted channels + std::shared_ptr pChannelBuf(new uint8_t[pMSR->mAcqFmt.sample_data_size_per_packet_B]); + +// if (pMSR->mSampleSize == 16) { +// pRecvBuf.reset(new uint8_t[pMSR->mStereoBufLen * 2 + sizeof(AudioPayloadHeader)]); +// } else if (pMSR->mSampleSize == 24 || pMSR->mSampleSize == 32) { +// pRecvBuf.reset(new uint8_t[pMSR->mStereoBufLen * 4 + sizeof(AudioPayloadHeader)]); +// } + + // audio payload header + AudioPayloadHeader aph; + + // packet timestamps + Timestamp ts[2]; // current and previous timestamp + + // limit on time difference variation + double timeDiffVariation_ns = 1E+09 / pMSR->mAcqFmt.sampling_rate_Hz * TS_VALIDITY_RANGE; + + // allocate buffer for interpolated timestamps + std::shared_ptr pTSBuf(new Timestamp[pMSR->mAcqFmt.mch_samples_per_packet]); + + while (pMSR->mRunning) { + // fd-set + FD_ZERO(&read_fds); + FD_SET(pMSR->mSoc, &read_fds); + + // timeout + tv.tv_sec = 0; + tv.tv_usec = 1E+06 / 4; + + // wait for data + int fd_cnt = select(pMSR->mSoc + 1, &read_fds, nullptr, nullptr, &tv); + + // if data is available on the socket + if ((fd_cnt > 0) && FD_ISSET(pMSR->mSoc, &read_fds)) { + + /* // read just the header + * if (read(pMSR->mSoc, (&aph), sizeof(AudioPayloadHeader)) < sizeof(AudioPayloadHeader)) { + throw std::runtime_error("Receive error!"); + } + + // read the rest + if ((recv_size = read(pMSR->mSoc, pRecvBuf.get(), aph.sample_cnt * aph.channel_count * pMSR->mSampleStorageSizeByte)) <= 0) { + throw std::runtime_error("Receive error!"); + // TODO check buffer size equivalence + }*/ + + size_t recv_size; + if ((recv_size = recv(pMSR->mSoc, pRecvBuf.get(), expected_receive_size, 0)) <= 0) { + throw std::runtime_error("Receive error!"); + } + + // get header + memcpy(&aph, pRecvBuf.get(), sizeof(AudioPayloadHeader)); + + // ----------- TIME PROCESSING --------------- + + // store timestamp + ts[1] = ts[0]; // shift FIFO + ts[0] = {(int64_t) aph.timestamp_s, (int64_t) aph.timestamp_ns}; // store new timestamp + + // if previous timestamp is empty, then don't store samples since time interpolation can not be done + if (ts[1].empty()) { + continue; + } + + // calculate step size + Timestamp d = (ts[0] - ts[1]) / (double) pMSR->mAcqFmt.mch_samples_per_packet; + + // validity check on time step size + if (llabs(d.to_ns() - ((int64_t) (1E+09 / pMSR->mAcqFmt.sampling_rate_Hz))) > timeDiffVariation_ns) { + Logger::logLine("SKIPPED packet due to large packet period variation!"); + continue; // skip this packet if time step is outside the valid range + } + + // prepare time interpolation + Timestamp ts_interp = ts[1]; // start from the oldest one + + // ------------- SAMPLE PROCESSING ------------- + + // decompose data into distinct channels ("transpose matrix") + uint8_t *pSampleData = pRecvBuf.get() + sizeof(AudioPayloadHeader); // get pointer on the data block + + for (size_t i = 0; i < pMSR->mAcqFmt.mch_samples_per_packet; i++) { // iterate on all samples + for (size_t ch = 0; ch < pMSR->mAcqFmt.channel_count; ch++) { + // calculate source and destination indices + size_t dstIndex = pMSR->mAcqFmt.single_channel_data_size_per_packet_B * ch + // channel offset ("select column") + pMSR->mAcqFmt.sample_storage_size_B * i; // destination sample index ("select row") + + size_t srcIndex = pMSR->mAcqFmt.all_channel_sample_size_B * i + // sample offset ("select row") + ch * pMSR->mAcqFmt.sample_storage_size_B; // channel offset ("select column") + + // copy sample + memcpy(pChannelBuf.get() + dstIndex, pSampleData + srcIndex, pMSR->mAcqFmt.sample_storage_size_B); + } + + // TIME INTERPOLATION [optimizing loops] (interpolate time for each sample) + ts_interp += d; + pTSBuf.get()[i] = ts_interp; + } + + // get sample writer for the specific address +// std::shared_ptr pSampleWriter = nullptr; + for (size_t i = 0; i < pMSR->mClientNodes.size(); i++) { + if (pMSR->mClientNodes[i] == aph.addr) { + size_t ch0 = i * pMSR->mAcqFmt.channel_count; // get "first channel" of that node + + // transfer channel data + for (size_t ch = 0; ch < pMSR->mAcqFmt.channel_count; ch++) { + size_t dataSize = pMSR->mAcqFmt.single_channel_data_size_per_packet_B; + uint8_t *pChData = pChannelBuf.get() + ch * dataSize; // get channel data + + // pass data + pMSR->mpMSP->input(ch + ch0, pTSBuf, pChData, dataSize); + } + +// pSampleWriter = pMSR->mpSampleWriters[i]; + break; + } + } + + // if sample writer is found +// if (pSampleWriter != nullptr) { +// std::unique_lock lock(pMSR->mSampleInsertMtx); +// pSampleWriter->addSamples(pRecvBuf.get() + sizeof(AudioPayloadHeader), {aph.timestamp_s, aph.timestamp_ns}); // get sample data +// } + } + } +} diff --git a/libwfr/src/MultiStreamReceiver.h b/libwfr/src/MultiStreamReceiver.h new file mode 100644 index 0000000..2cc7332 --- /dev/null +++ b/libwfr/src/MultiStreamReceiver.h @@ -0,0 +1,51 @@ +// +// Created by epagris on 2021. 11. 22.. +// + +#ifndef WFR_MULTISTREAMRECEIVER_H +#define WFR_MULTISTREAMRECEIVER_H + + +#include +#include +#include +#include +#include +#include "SampleWriter.h" +#include "AcquisitionFormat.h" +#include "MultiStreamProcessor.h" + +class MultiStreamReceiver { +public: + static constexpr unsigned short DEFAULT_PORT = 20220; // default port + static constexpr double TS_VALIDITY_RANGE = 0.1; // maximal variation of packet length considered valid +private: + std::vector mClientNodes; // client node addresses + int mSoc; // UDP-socket for receiving samples + std::shared_ptr mRecvThread; // thread for reception + static void fnRecv(MultiStreamReceiver *pMSR); // routine function running in separate thread + bool mRunning; + std::vector> mpSampleWriters; // sample writers for streams + //std::mutex mSampleInsertMtx; // mutex protecting against sample insertion collision when calling pSampleWriter->addSamples(...) + std::shared_ptr mpMSP; // multistream processor INSTANCE (pointer to...) +private: + AcquisitionFormat mAcqFmt; // acquisition format +private: + void startNetworking(); // start networking +public: + const unsigned short port; // port +public: + MultiStreamReceiver(const std::vector &nodes, size_t samplingRate, size_t sampleSize, size_t channelCount, size_t mchSampPerPckt, unsigned short port = DEFAULT_PORT); + MultiStreamReceiver(const std::vector &nodes, const AcquisitionFormat &acqFormat, unsigned short port = DEFAULT_PORT); + MultiStreamReceiver(const std::vector &nodes, const std::string &acqFormat_str, unsigned short port = DEFAULT_PORT); + MultiStreamReceiver(const std::vector &nodes_str, const AcquisitionFormat &acqFormat, unsigned short port = DEFAULT_PORT); + MultiStreamReceiver(const std::vector &nodes_str, const std::string &acqFormat_str, unsigned short port = DEFAULT_PORT); + + void start(const std::shared_ptr &pMSP); // start reception + void stop(); // stop reception + + virtual ~MultiStreamReceiver(); +}; + + +#endif //WFR_MULTISTREAMRECEIVER_H diff --git a/libwfr/src/MultiStreamToFile.cpp b/libwfr/src/MultiStreamToFile.cpp new file mode 100644 index 0000000..a0b687f --- /dev/null +++ b/libwfr/src/MultiStreamToFile.cpp @@ -0,0 +1,71 @@ +// +// Created by epagris on 2022.05.04.. +// + +#include +#include +#include +#include "MultiStreamToFile.h" +#include "Logger.h" + +MultiStreamToFile::MultiStreamToFile(const std::string &targetDir) { + mTargetDir = targetDir; +} + +void MultiStreamToFile::createSampleWriters() { + struct stat sb; + if (stat(mTargetDir.c_str(), &sb) == 0 && S_ISDIR(sb.st_mode)) { // if directory exists then leave it as it is + //unlink(datasetName.c_str()); FIXME + } else if (mkdir(mTargetDir.c_str(), 0777) == -1) { // else: create directory for dataset + throw std::runtime_error("Could not create directory " + mTargetDir + "!"); + } + + if (chdir(mTargetDir.c_str()) == -1) { + throw std::runtime_error("Could not change to target directory!"); + } + + std::string nodeDirHint = ""; + + // create sample writers, ONE-PER-CHANNEL + size_t acc_ch = 0; // accumulative channel index (across all preceding nodes) + for (unsigned int &mClientNode: mNodes) { + for (size_t ch = 0; ch < mAcqFmt.channel_count; ch++) { + // create sample writer + std::string datasetName = std::string("node_") + inet_ntoa({mClientNode}) + "ch" + std::to_string(acc_ch); + mpSampleWriters.emplace_back(std::make_shared(datasetName, mAcqFmt)); + + // create hint + nodeDirHint += datasetName + "\n"; + + // increase accumulative channel count + acc_ch++; + } + } + + // save hint + std::ofstream hintFile("node_dir_hint.txt", std::ios_base::out); + hintFile << nodeDirHint; + hintFile.close(); + + if (chdir("..") == -1) { + throw std::runtime_error("Could not change to target directory!"); + } +} + +void MultiStreamToFile::setup(const std::vector &nodes, const AcquisitionFormat &acqFmt) { + MultiStreamProcessor::setup(nodes, acqFmt); + + // prepare for receiving data + createSampleWriters(); +} + +bool MultiStreamToFile::input(size_t ch, const std::shared_ptr &pTime, const void *pData, size_t size) { + if (!MultiStreamProcessor::input(ch, pTime, pData, size)) { + return false; + } + + // store new data + mpSampleWriters[ch]->addSamples((const uint8_t *) pData, pTime); + + return true; +} diff --git a/libwfr/src/MultiStreamToFile.h b/libwfr/src/MultiStreamToFile.h new file mode 100644 index 0000000..1938179 --- /dev/null +++ b/libwfr/src/MultiStreamToFile.h @@ -0,0 +1,29 @@ +// +// Created by epagris on 2022.05.04.. +// + +#ifndef WFR_APP_MULTISTREAMTOFILE_H +#define WFR_APP_MULTISTREAMTOFILE_H + + +#include +#include "MultiStreamProcessor.h" +#include "SampleWriter.h" +#include "ICreatable.h" + +class MultiStreamToFile : public MultiStreamProcessor, public ICreatable { +private: + std::string mTargetDir; // target directory + std::vector> mpSampleWriters; // sample writers for streams +private: + void createSampleWriters(); // create sample writers +public: + MultiStreamToFile(const std::string& targetDir); // constr. + +public: + void setup(const std::vector &nodes, const AcquisitionFormat &acqFmt) override; + bool input(size_t ch, const std::shared_ptr &pTime, const void *pData, size_t size) override; +}; + + +#endif //WFR_APP_MULTISTREAMTOFILE_H diff --git a/libwfr/src/SampleWriter.cpp b/libwfr/src/SampleWriter.cpp new file mode 100644 index 0000000..8e2366a --- /dev/null +++ b/libwfr/src/SampleWriter.cpp @@ -0,0 +1,136 @@ +#include "SampleWriter.h" +#include "AcquisitionFormat.h" + +SampleWriter::SampleWriter(const std::string &datasetName, const AcquisitionFormat &acqFmt, bool withoutTSFile) + : mAcqFmt(acqFmt), mWithoutTSFile(withoutTSFile) { // constr. + prepareBuffers(); + prepareOutputFiles(datasetName); +} + +SampleWriter::~SampleWriter() { + closeFiles(); +} + +void SampleWriter::prepareBuffers() { + // allocate timestamp data buffer + mpTs = std::shared_ptr(new TimestampRecord[mAcqFmt.mch_samples_per_packet]); + + // allocate buffer based on sample size + mpSamples = std::shared_ptr(new uint8_t[mAcqFmt.single_channel_data_size_per_packet_B]); +} + +void SampleWriter::prepareOutputFiles(const std::string &datasetName) { +// struct stat sb; +// if (stat(datasetName.c_str(), &sb) == 0 && S_ISDIR(sb.st_mode)) { // if directory exists then leave it as it is +// //unlink(datasetName.c_str()); FIXME +// } else if (mkdir(datasetName.c_str(), 0777) == -1) { // else: create directory for dataset +// throw std::runtime_error("Could not create directory " + datasetName + "!"); +// } +// +// std::string datadir_dirsep = datasetName + DIRSEP; + + // timestamps + if (!mWithoutTSFile) { + mTsFileName = datasetName + '.' + TS_EXT; // timestamp + + // open timestamp file + mTsFile.open(mTsFileName, std::ios_base::binary); + if (!mTsFile.is_open()) { + throw std::runtime_error("Could not open timestamp file " + mTsFileName + "!"); + } + } + + // data samples + mSampleFileName = datasetName + '.' + SAMPLE_EXT; + mSampleFile.open(mSampleFileName, std::ios_base::binary); + if (!mSampleFile.is_open()) { + throw std::runtime_error("Could not open sample file " + mSampleFileName + "!"); + } +} + +void SampleWriter::closeFiles() { + // close file storing timestamp files + if (!mWithoutTSFile) { + mTsFile.close(); + } + + // close files stroring samples + mSampleFile.close(); +} + +void SampleWriter::addSamples(const uint8_t *pSamples, const std::shared_ptr &pTime) { +/* // shift FIFO and store new timestamp + mTs[1] = mTs[0]; + mTs[0] = ts; + + // if previous timestamp is empty, then don't store samples since time interpolation can not be done + if (mTs[1].empty()) { + return; + } + + // calculate time step size + Timestamp d = (mTs[0] - mTs[1]) / (double) mBlockSize; + + // validity check on time step size + if (d.to_ns() > mSamplingPeriod_ns_UB || d.to_ns() < mSamplingPeriod_ns_LB) { + return; // skip this packet if time step is outside the valid range + } + + // timestamp for time interpolation + Timestamp ts_interp = mTs[1]; + + // get channel pointers + int8_t *ppCh[mChN]; + for (size_t i = 0; i < mChN; i++) { + ppCh[i] = mpSamples[i].get(); + } + + // extract samples + for (size_t i = 0; i < mBlockSize; i++) { + // select record + auto *pRawSamples = (int8_t *) (pSamples + i * mSampleDisposition); + + // store sample for a channel + for (size_t ch = 0; ch < mChN; ch++) { + if (mSampleStorageSizeByte == 2) { + *((int16_t*)(ppCh[ch] + (i * mSampleStorageSizeByte))) = *((int16_t*)(pRawSamples + (ch * mSampleStorageSizeByte))); + } else if (mSampleStorageSizeByte == 4) { + *((int32_t*)(ppCh[ch] + (i * mSampleStorageSizeByte))) = *((int32_t*)(pRawSamples + (ch * mSampleStorageSizeByte))); + } + } + + // step time + ts_interp += d; + + // store timestamp + mpTs.get()[i] = ts_interp; + } + + // write to file newly created records + */ + // write timestamps + if (!mWithoutTSFile) { + // convert timestamp + for (size_t i = 0; i < mAcqFmt.mch_samples_per_packet; i++) { + mpTs.get()[i] = pTime.get()[i]; + } + + // write... + mTsFile.write((const char *) mpTs.get(), mAcqFmt.mch_samples_per_packet * sizeof(TimestampRecord)); + } + + // write samples + mSampleFile.write((const char *) pSamples, mAcqFmt.single_channel_data_size_per_packet_B); + + // increment counter + mSamplesN += mAcqFmt.mch_samples_per_packet; +} + +unsigned long long SampleWriter::getSampleCnt() const { + return mSamplesN; +} + +// static fields +const std::string SampleWriter::TS_EXT = "ts"; +//const std::string SampleWriter::SAMPLE_FILENAME_PREFIX = "ch_"; +const std::string SampleWriter::SAMPLE_EXT = "dat"; \ No newline at end of file diff --git a/libwfr/src/SampleWriter.h b/libwfr/src/SampleWriter.h new file mode 100644 index 0000000..52b30f1 --- /dev/null +++ b/libwfr/src/SampleWriter.h @@ -0,0 +1,65 @@ +// +// Created by epagris on 2021. 11. 01.. +// + +#ifndef WFR_SAMPLEWRITER_H +#define WFR_SAMPLEWRITER_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include "Timestamp.h" +#include "AcquisitionFormat.h" + +#define DIRSEP '/' + +class SampleWriter { +public: + static const std::string TS_EXT; // filename for timestamp file + //static const std::string SAMPLE_FILENAME_PREFIX; // filename prefix for sample file + static const std::string SAMPLE_EXT; // datafile extensions +public: + // timestamp record + struct TimestampRecord { + public: + uint32_t ts_s, ts_ns; // timestamp + public: + TimestampRecord &operator=(const Timestamp &ts) { + ts_s = (uint32_t) ts.s; + ts_ns = (uint32_t) ts.ns; + return *this; + } + }; + +private: + AcquisitionFormat mAcqFmt; + bool mWithoutTSFile; // does sample writer omit timestamp outputting? +private: + std::shared_ptr mpTs; // timestamp records + std::shared_ptr mpSamples; // sample buffer + + std::string mTsFileName; // timestamp filename + std::ofstream mTsFile; // timestamp file + std::string mSampleFileName; // sample file name + std::ofstream mSampleFile; // sample output file +private: + unsigned long long mSamplesN; // number of samples written until now +private: + void prepareBuffers(); // resize sample and record buffers + void prepareOutputFiles(const std::string &datasetName); // prepare dataset environment + void closeFiles(); // close files +public: + SampleWriter(const std::string &datasetName, const AcquisitionFormat &acqFmt, bool withoutTSFile = false); + ~SampleWriter(); + + void addSamples(const uint8_t *pSamples, const std::shared_ptr &pTime); // add n samples to file + unsigned long long getSampleCnt() const; // get number of samples written out +}; + + +#endif //WFR_SAMPLEWRITER_H diff --git a/src/ServerBeacon.cpp b/libwfr/src/ServerBeacon.cpp similarity index 57% rename from src/ServerBeacon.cpp rename to libwfr/src/ServerBeacon.cpp index eda49e6..8ed8b85 100644 --- a/src/ServerBeacon.cpp +++ b/libwfr/src/ServerBeacon.cpp @@ -6,17 +6,27 @@ #include #include #include +#include #include "ServerBeacon.h" const in_addr ServerBeacon::DEFAULT_MULTICAST_ADDR = {inet_addr("224.0.2.21")}; -ServerBeacon::ServerBeacon() : mRunning(false), mScanCallback(nullptr) { +ServerBeacon::ServerBeacon() : + mRunning(false), mScanCallback(nullptr) { setMulticastAddr(DEFAULT_MULTICAST_ADDR); setPort(DEFAULT_PORT); setAnnouncePeriod(DEFAULT_ANNOUNCE_PERIOD_MS); } -ServerBeacon::ServerBeacon(const in_addr &addr, unsigned short port, size_t announcePeriod_ms) : mRunning(false), mScanCallback(nullptr) { +ServerBeacon::ServerBeacon(const in_addr &addr, unsigned short port, size_t announcePeriod_ms) : + mRunning(false), mScanCallback(nullptr) { + setMulticastAddr(addr); + setPort(port); + setAnnouncePeriod(announcePeriod_ms); +} + +ServerBeacon::ServerBeacon(const std::string &addr, unsigned short port, size_t announcePeriod_ms) : + mRunning(false), mScanCallback(nullptr) { setMulticastAddr(addr); setPort(port); setAnnouncePeriod(announcePeriod_ms); @@ -26,6 +36,11 @@ void ServerBeacon::setMulticastAddr(const in_addr &addr) { mAddr = addr; } +void ServerBeacon::setMulticastAddr(const std::string &addr_s) { + mAddr = {inet_addr(addr_s.c_str())}; +} + + in_addr ServerBeacon::getMulticastAddr() const { return mAddr; } @@ -93,7 +108,7 @@ void ServerBeacon::startBeacon() { } void ServerBeacon::stopBeacon() { - if (!mRunning){ + if (!mRunning) { return; } @@ -102,12 +117,27 @@ void ServerBeacon::stopBeacon() { close(mBeaconSoc); } +void ServerBeacon::singleScan() { + if (mRunning) { + return; + } + + // start the beacon + startBeacon(); + + // wait for nodes to respond + std::this_thread::sleep_for(std::chrono::milliseconds((size_t) (mAnnPeriod_ms * 0.75))); + + // stop the beacon + stopBeacon(); +} + std::list ServerBeacon::getNodesOnNetwork() { std::list nodeAddrs; mBeaconMtx.lock(); - for (auto nodeInfo : mNodes) { + for (auto nodeInfo: mNodes) { nodeAddrs.push_back(nodeInfo.addr); } @@ -116,10 +146,22 @@ std::list ServerBeacon::getNodesOnNetwork() { return nodeAddrs; } +std::list ServerBeacon::getNodesOnNetwork_str() { + std::list nodeAddrs; + + for (auto nodeInfo: getNodesOnNetwork()) { + nodeAddrs.emplace_back(inet_ntoa(nodeInfo)); + } + + return nodeAddrs; +} // ------------------------------------------- void ServerBeacon::fn_BeaconThread(ServerBeacon *pSB) { + // log + Logger::logLine("START beacon"); + // beacon message used to send and receive information BeaconMsg msg; @@ -163,6 +205,9 @@ void ServerBeacon::fn_BeaconThread(ServerBeacon *pSB) { clNodes.push_back(clInfo); + // log + Logger::logLine(std::string("FOUND NODE ") + inet_ntoa(clInfo.addr)); + //std::cout << inet_ntoa(clInfo.addr) << std::endl; } @@ -189,9 +234,12 @@ void ServerBeacon::fn_BeaconThread(ServerBeacon *pSB) { (*pSB->mScanCallback)(pSB); } } + + // log + Logger::logLine("STOP beacon"); } -void ServerBeacon::setScanCallback(const std::shared_ptr>& scanCB) { +void ServerBeacon::setScanCallback(const std::shared_ptr> &scanCB) { mScanCallback = scanCB; } @@ -208,7 +256,7 @@ unsigned short ServerBeacon::getNodeNettermPort(in_addr addr) { int nettermPort = -1; - for (const auto& nodeInfo : mNodes) { + for (const auto &nodeInfo: mNodes) { if (nodeInfo.addr.s_addr == addr.s_addr) { nettermPort = nodeInfo.terminalPort; break; @@ -221,45 +269,142 @@ unsigned short ServerBeacon::getNodeNettermPort(in_addr addr) { throw std::runtime_error(std::string("Node not found in database! (") + inet_ntoa(addr) + ")!"); } - return (unsigned short)nettermPort; + return (unsigned short) nettermPort; } -void ServerBeacon::sendNettermCmd(in_addr addr, unsigned short port, const std::string &cmd) { +#define MAX_TERM_LINE_LENGTH (127) + +std::string ServerBeacon::sendNettermCmd(in_addr addr, unsigned short port, const std::string &cmd, bool expectResp) { int soc = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); if (soc == -1) { throw std::runtime_error("Could not create TCP socket to execute netterm commands!"); } + // return value + std::string ret_resp; + + // connect to server sockaddr_in serverAddr; memset(&serverAddr, 0, sizeof(sockaddr_in)); serverAddr.sin_addr = addr; serverAddr.sin_port = htons(port); serverAddr.sin_family = AF_INET; - if (connect(soc, (const sockaddr *)&serverAddr, sizeof(sockaddr_in)) == -1) { + if (connect(soc, (const sockaddr *) &serverAddr, sizeof(sockaddr_in)) == -1) { throw std::runtime_error(std::string("Could not connect to netterm! (") + inet_ntoa(addr) + ":" + std::to_string(port) + ")"); } - static std::string nodeftty_cmd = "nodeftty"; - send(soc, "nodeftty", nodeftty_cmd.size(), 0); + static std::string nodeftty_cmd = "nodeftty\n"; + send(soc, nodeftty_cmd.c_str(), nodeftty_cmd.size(), 0); // send command - send(soc, cmd.c_str(), cmd.size(), 0); + std::string cmd_nl = cmd + "\n"; + send(soc, cmd_nl.c_str(), cmd_nl.size(), 0); + // log + Logger::logLine(std::string("CMD '") + cmd + "' ON " + inet_ntoa(addr)); + + if (expectResp) { + // wait for response + fd_set read_fds; + timeval tv = {1, 0}; // 1s timeout + FD_ZERO(&read_fds); + FD_SET(soc, &read_fds); + + if (select(soc + 1, &read_fds, nullptr, nullptr, &tv) <= 0) { // if we have NO response + Logger::logLine(std::string(" -->NO RESPONSE")); + + } else if(FD_ISSET(soc, &read_fds)) { // if we have response + char sResp[MAX_TERM_LINE_LENGTH + 1]; + size_t resp_len = recv(soc, sResp, MAX_TERM_LINE_LENGTH, 0); + sResp[resp_len] = '\0'; + + ret_resp = std::string(sResp); + + // clip trailing \r\n to [r][n] + if (ret_resp.ends_with("\r\n")) { + ret_resp = ret_resp.substr(0, ret_resp.size() - 2); + } + + // log + Logger::logLine(std::string(" -->RESP '") + ret_resp + "'"); + } + } + + // close connection gracefully + std::string cmdExit = "exit\n"; + send(soc, cmdExit.c_str(), cmdExit.size(), 0); + + // wait for response + fd_set read_fds; + timeval tv = {1, 0}; // 1s timeout + FD_ZERO(&read_fds); + FD_SET(soc, &read_fds); + + if (select(soc + 1, &read_fds, nullptr, nullptr, &tv) <= 0) { + Logger::logLine(std::string(" -->FORCED CLOSE")); // log + } + + // close the connection finally close(soc); + + return ret_resp; } void ServerBeacon::execCmdOnNode(in_addr addr, const std::string &cmd) { unsigned short port = getNodeNettermPort(addr); - sendNettermCmd(addr, port, cmd); + sendNettermCmd(addr, port, cmd, false); } -void ServerBeacon::execCmdOnAllTerms(const std::string &cmd) { +void ServerBeacon::execCmdOnNode(const std::string &addr_str, const std::string &cmd) { + in_addr node_addr = {inet_addr(addr_str.c_str())}; + execCmdOnNode(node_addr, cmd); +} + +void ServerBeacon::execCmdOnAllNodes(const std::string &cmd) { mBeaconMtx.lock(); - for (const auto& nodeInfo : mNodes) { - sendNettermCmd(nodeInfo.addr, nodeInfo.terminalPort, cmd); + for (const auto &nodeInfo: mNodes) { + sendNettermCmd(nodeInfo.addr, nodeInfo.terminalPort, cmd, false); } mBeaconMtx.unlock(); -} \ No newline at end of file +} + +std::string ServerBeacon::queryFromNode(in_addr addr, const std::string &cmd) { + unsigned short port = getNodeNettermPort(addr); + return sendNettermCmd(addr, port, cmd, true); +} + +std::string ServerBeacon::queryFromNode(const std::string &addr_str, const std::string &cmd) { + in_addr node_addr = {inet_addr(addr_str.c_str())}; + return queryFromNode(node_addr, cmd); +} + +std::map ServerBeacon::queryFromAllNodes(const std::string &cmd) { + std::map res; + + mBeaconMtx.lock(); + + for (const auto &nodeInfo: mNodes) { + res[nodeInfo.addr.s_addr] = sendNettermCmd(nodeInfo.addr, nodeInfo.terminalPort, cmd, true); + } + + mBeaconMtx.unlock(); + + return res; +} + +std::map ServerBeacon::queryFromAllNodes_str(const std::string &cmd) { + std::map res; + + mBeaconMtx.lock(); + + for (const auto &nodeInfo: mNodes) { + res[inet_ntoa(nodeInfo.addr)] = sendNettermCmd(nodeInfo.addr, nodeInfo.terminalPort, cmd, true); + } + + mBeaconMtx.unlock(); + + return res; +} diff --git a/src/ServerBeacon.h b/libwfr/src/ServerBeacon.h similarity index 67% rename from src/ServerBeacon.h rename to libwfr/src/ServerBeacon.h index 898c9d3..d76daf5 100644 --- a/src/ServerBeacon.h +++ b/libwfr/src/ServerBeacon.h @@ -11,7 +11,9 @@ #include #include #include +#include #include "Callback.h" +#include "Logger.h" class ServerBeacon { public: @@ -47,11 +49,13 @@ private: uint8_t server_nClient; // server or client }; private: - void sendNettermCmd(in_addr addr, unsigned short port, const std::string& cmd); // send netterm cmd + std::string sendNettermCmd(in_addr addr, unsigned short port, const std::string &cmd, bool expectResp); // send netterm cmd public: ServerBeacon(); // constr. - explicit ServerBeacon(const in_addr& addr, unsigned short port, size_t announcePeriod_ms); // constr. with beacon port and announce period + ServerBeacon(const in_addr& addr, unsigned short port, size_t announcePeriod_ms); // constr. with beacon port and announce period + ServerBeacon(const std::string& addr, unsigned short port, size_t announcePeriod_ms); // constr. with beacon port and announce period void setMulticastAddr(const in_addr& addr); // set multicasting address + void setMulticastAddr(const std::string& addr_s); // set multicast address (string) in_addr getMulticastAddr() const; // set multicasting address void setPort(unsigned short port); // set port BEFORE starting beacon unsigned short getPort() const; // get beacon port @@ -61,11 +65,18 @@ public: std::string getInterfaceAddr() const; // get multicast interface address void startBeacon(); // start the beacon void stopBeacon(); // stop the beacon + void singleScan(); // initiate a single scan std::list getNodesOnNetwork(); // get nodes connected to the same network - unsigned short getNodeNettermPort(in_addr addr); // get network terminal port of ndoe + std::list getNodesOnNetwork_str(); // get nodes connected to the same network (list of string) + unsigned short getNodeNettermPort(in_addr addr); // get network terminal port of node void setScanCallback(const std::shared_ptr>& scanCB); // set scan callback - void execCmdOnNode(in_addr addr, const std::string& cmd); // execute command on a node through netterm - void execCmdOnAllTerms(const std::string& cmd); // multicast command to all netterms + void execCmdOnNode(in_addr addr, const std::string& cmd); // execute command on a node through netterm AND DON'T expect a response + void execCmdOnNode(const std::string& addr_str, const std::string& cmd); + std::string queryFromNode(in_addr addr, const std::string& cmd); // --- " ---- AND expect a response + std::string queryFromNode(const std::string& addr_str, const std::string& cmd); + void execCmdOnAllNodes(const std::string& cmd); // multicast command to all netterms + std::map queryFromAllNodes(const std::string& cmd); // --- " ---- AND expect a response + std::map queryFromAllNodes_str(const std::string& cmd); // (string address) }; diff --git a/src/Timestamp.h b/libwfr/src/Timestamp.h similarity index 100% rename from src/Timestamp.h rename to libwfr/src/Timestamp.h diff --git a/libwfr/src/Trigger.cpp b/libwfr/src/Trigger.cpp new file mode 100644 index 0000000..d93da7e --- /dev/null +++ b/libwfr/src/Trigger.cpp @@ -0,0 +1,55 @@ +// +// Created by epagris on 2022.05.04.. +// + +#include "Trigger.h" + +TriggerSettings::TriggerSettings() { + TriggerSettings::reset(); +} + +void TriggerSettings::reset() { + mFirstSample = true; + ch = 0; +} + +bool TriggerSettings::sample(double x) { + return false; // never trigger +} + +// ------------------------ + +SlopeTrigger::SlopeTrigger() { + level = 0; + slope = RISING; + SlopeTrigger::reset(); +} + +void SlopeTrigger::reset() { + TriggerSettings::reset(); + + x_prev = 0; +} + +bool SlopeTrigger::sample(double x) { + // do not act on first sample + if (mFirstSample) { + mFirstSample = false; + x_prev = x; + return false; + } + + bool trig = false; + + // on every other sample... + if (((slope == RISING) && (x_prev < level && x > level)) || + ((slope == FALLING) && (x_prev > level && x < level))) { + trig = true; + } + + x_prev = x; + + return trig; +} + + diff --git a/libwfr/src/Trigger.h b/libwfr/src/Trigger.h new file mode 100644 index 0000000..cbe2edb --- /dev/null +++ b/libwfr/src/Trigger.h @@ -0,0 +1,38 @@ +// +// Created by epagris on 2022.05.04.. +// + +#ifndef WFR_APP_TRIGGER_H +#define WFR_APP_TRIGGER_H + +#include + +// Dummy trigger base +struct TriggerSettings { +protected: + bool mFirstSample; // indicates the latest sample was the first since the last reset +public: + size_t ch; // channel to probe +public: + TriggerSettings(); + + virtual void reset(); // reset trigger internal state + virtual bool sample(double x); // insert sample into the trigger +}; + +// Simple slope trigger +struct SlopeTrigger : public TriggerSettings { +public: + enum SlopeType { RISING, FALLING }; +private: + double x_prev; // previous sample +public: + double level; // trigger level + SlopeType slope; // slope direction +public: + SlopeTrigger(); // constr. + void reset() override; + bool sample(double x) override; +}; + +#endif //WFR_APP_TRIGGER_H diff --git a/libwfr/src/audio_types.h b/libwfr/src/audio_types.h new file mode 100644 index 0000000..a946bc9 --- /dev/null +++ b/libwfr/src/audio_types.h @@ -0,0 +1,28 @@ +// +// Created by epagris on 2021. 11. 02.. +// + +#ifndef WFR_AUDIO_TYPES_H +#define WFR_AUDIO_TYPES_H + +#include + +#define PERIOD_LEN_DEF (STEREO_BUF_LEN / 2) + +#define MCH_SAMP_PER_PCKT_DEF (192) +#define SAMPLE_RATE_DEF (48000) +#define SAMPLE_SIZE_DEF (16) +#define CHANNEL_COUNT_DEF (2) + +//typedef int16_t AudioSampleType16_DEF; + +typedef struct { + uint32_t timestamp_s, timestamp_ns; // timestamp + uint32_t sample_cnt; // count of all-channel samples in packet + uint16_t sample_size; // size of a single (mono) sample + uint16_t channel_count; // number of channels + in_addr_t addr; // client node address + //AudioSampleType16_DEF pData[MCH_SAMP_PER_PCKT_DEF]; // buffer for stereo data +} AudioPayloadHeader; + +#endif //WFR_AUDIO_TYPES_H diff --git a/main.cpp b/main.cpp index 5654101..4a09bad 100644 --- a/main.cpp +++ b/main.cpp @@ -10,9 +10,9 @@ #include #include #include "src/ALSADevice.h" -#include "src/SampleWriter.h" -#include "src/audio_types.h" -#include "src/ServerBeacon.h" +#include "libwfr/src/SampleWriter.h" +#include "libwfr/src/audio_types.h" +#include "libwfr/src/ServerBeacon.h" #include "src/GUI.h" //uint8_t pRecvBuf[8096] __attribute__ ((aligned (32))); @@ -28,6 +28,10 @@ int main(int argc, char * argv[]) { std::string mcastIFAddr = argv[2]; unsigned short beaconPort = atoi(argv[1]); + Logger::startLogging(); + + auto a = MultiStreamToFile::create("asd"); + // ----------------- //ALSADevice dev(PERIOD_LEN, SAMPLE_RATE); diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt new file mode 100644 index 0000000..95d5fdc --- /dev/null +++ b/python/CMakeLists.txt @@ -0,0 +1,30 @@ +cmake_minimum_required(VERSION 3.16) + +project(wfr-python-extension) + +set(Python_ADDITIONAL_VERSIONS 3.8) + +find_package(Python3 COMPONENTS Interpreter Development) +find_package(PythonLibs REQUIRED) + +#list(APPEND CMAKE_MODULE_PATH "/home/epagris/.local/lib/python3.8/site-packages/skbuild/resources/cmake/") + +#find_package(PythonExtensions REQUIRED) + +find_package(pybind11 REQUIRED) + +set(CMAKE_CXX_STANDARD 17) +set(SRC + src/wfsmodule.cpp) + +pybind11_add_module(pywfs src/wfsmodule.cpp) +target_include_directories(pywfs PUBLIC ${PROJECT_SOURCE_DIR}/../libwfr/src) +add_dependencies(pywfs wfr) + +target_compile_definitions(pywfs PRIVATE VERSION_INFO=${EXAMPLE_VERSION_INFO}) +target_include_directories(pywfs PUBLIC ${PYTHON_INCLUDE_DIRS}) +target_link_libraries(pywfs PUBLIC wfr) + +set_target_properties(pywfs PROPERTIES SUFFIX ".so") + +install(TARGETS pywfs DESTINATION ${PROJECT_SOURCE_DIR}/module) \ No newline at end of file diff --git a/python/src/wfsmodule.cpp b/python/src/wfsmodule.cpp new file mode 100644 index 0000000..a51620a --- /dev/null +++ b/python/src/wfsmodule.cpp @@ -0,0 +1,90 @@ +#include +#include + +#include +#include "AcquisitionFormat.h" +#include "MultiStreamToFile.h" +#include "MultiStreamReceiver.h" + +#define STRINGIFY(x) #x +#define MACRO_STRINGIFY(x) STRINGIFY(x) + + +int add(int i, int j) { + return i + j; +} + +namespace py = pybind11; +using namespace py::literals; + +PYBIND11_MODULE(pywfs, m) { + m.doc() = R"pbdoc( + WaveFormStream extension + ----------------------- + + )pbdoc"; + +// py::class_(m, "in_addr") +// .def(py::init<>()) +// .def_readwrite("s_addr", &in_addr::s_addr); + + // Logger + py::class_(m, "Logger") + .def_static("start", static_cast(&Logger::startLogging), py::arg("fd") = STDOUT_FILENO) + .def_static("stop", &Logger::stopLogging); + + // ServerBeacon + py::class_(m, "ServerBeacon") + .def(py::init<>()) + .def("setMulticastAddr", static_cast(&ServerBeacon::setMulticastAddr)) + .def("setInterfaceAddr", static_cast(&ServerBeacon::setInterfaceAddr)) + .def("singleScan", &ServerBeacon::singleScan) + .def("getNodesOnNetwork", &ServerBeacon::getNodesOnNetwork_str) + .def("execCmdOnNode", static_cast(&ServerBeacon::execCmdOnNode)) + .def("queryFromNode", static_cast(&ServerBeacon::queryFromNode)) + .def("execCmdOnAllNodes", &ServerBeacon::execCmdOnAllNodes) + .def("queryFromAllNodes", &ServerBeacon::queryFromAllNodes_str); + + // AquisitionFormat + py::class_(m, "AcquisitionFormat") + .def(py::init()) + .def(py::init()) + .def_readonly("sampling_rate_Hz", &AcquisitionFormat::sampling_rate_Hz) + .def_readonly("sample_size_b", &AcquisitionFormat::sample_size_b) + .def_readonly("sameple_storage_size_B", &AcquisitionFormat::sample_storage_size_B) + .def_readonly("channel_count", &AcquisitionFormat::channel_count) + .def_readonly("mch_sample_per_packet", &AcquisitionFormat::mch_samples_per_packet) + .def_readonly("distinct_samples_per_packet", &AcquisitionFormat::distinct_samples_per_packet) + .def_readonly("sample_data_size_per_packet", &AcquisitionFormat::sample_data_size_per_packet_B) + .def_readonly("single_channel_data_size_per_packet_B", &AcquisitionFormat::single_channel_data_size_per_packet_B) + .def_readonly("all_channel_sample_size_B", &AcquisitionFormat::all_channel_sample_size_B) + .def_readonly("packet_period_ns", &AcquisitionFormat::packet_period_ns) + .def("isValid", &AcquisitionFormat::isValid); + + // MultiStreamReceiver + py::class_(m, "MultiStreamReceiver") + .def(py::init, const AcquisitionFormat &, unsigned short>(), "nodes_str"_a, "acqFormat"_a, "port"_a = MultiStreamReceiver::DEFAULT_PORT) + .def(py::init, const std::string &, unsigned short>(), "nodes_str"_a, "acqFormat_str"_a, "port"_a = MultiStreamReceiver::DEFAULT_PORT) + .def("listen", &MultiStreamReceiver::start) + .def("close", &MultiStreamReceiver::stop); + + // MultiStreamProcessor (abstract class) + py::class_> MSP(m, "MultiStreamProcessor"); + + // MultiStreamToFile + py::class_, MultiStreamProcessor>(m, "MultiStreamToFile") + .def_static("create", &MultiStreamToFile::create); + +// +// m.def("subtract", [](int i, int j) { return i - j; }, R"pbdoc( +// Subtract two numbers +// Some other explanation about the subtract function. +// )pbdoc"); + +#ifdef VERSION_INFO + m.attr("__version__") = MACRO_STRINGIFY(VERSION_INFO); +#else + m.attr("__version__") = "dev"; +#endif +} + diff --git a/python/src/wfsmodule.h b/python/src/wfsmodule.h new file mode 100644 index 0000000..e75cb49 --- /dev/null +++ b/python/src/wfsmodule.h @@ -0,0 +1,8 @@ +// +// Created by epagris on 2022.05.01.. +// + +#ifndef WFR_APP_WFSMODULE_H +#define WFR_APP_WFSMODULE_H + +#endif //WFR_APP_WFSMODULE_H diff --git a/src/GUI.h b/src/GUI.h index 9f5eed9..c190e03 100644 --- a/src/GUI.h +++ b/src/GUI.h @@ -11,10 +11,11 @@ #include #include -#include "ServerBeacon.h" +#include "../libwfr/src/ServerBeacon.h" #include "globals.h" -#include "Callback.h" -#include "MultiStreamReceiver.h" +#include "../libwfr/src/Callback.h" +#include "../libwfr/src/MultiStreamReceiver.h" +#include "../libwfr/src/MultiStreamToFile.h" class GUI : public Gtk::Application { public: @@ -261,11 +262,12 @@ public: wStartStopCaptureBtn->set_label("Stop capture"); mFlags.captureRunning = true; - mpMSR = std::make_shared(mClNodeTW->getSelectedLines(), wFolderChooser->get_filename()); - mpMSR->start(); - mGlobs->beacon->execCmdOnAllTerms(std::string("snd connect ") + mGlobs->beacon->getInterfaceAddr() + " " + std::to_string(mpMSR->port)); + mpMSR = std::make_shared(mClNodeTW->getSelectedLines(), SAMPLE_RATE_DEF, SAMPLE_SIZE_DEF, CHANNEL_COUNT_DEF, MCH_SAMP_PER_PCKT_DEF); + mpMSR->start(std::make_shared(wFolderChooser->get_filename())); + //mpMSR->start(std::shared_ptr((MultiStreamProcessor*) new MultiStreamToFile(wFolderChooser->get_filename()))); + mGlobs->beacon->execCmdOnAllNodes(std::string("snd connect ") + mGlobs->beacon->getInterfaceAddr() + " " + std::to_string(mpMSR->port)); } else { - mGlobs->beacon->execCmdOnAllTerms("snd disconnect"); + mGlobs->beacon->execCmdOnAllNodes("snd disconnect"); mpMSR = nullptr; wFolderChooser->set_sensitive(true); diff --git a/src/MultiStreamReceiver.cpp b/src/MultiStreamReceiver.cpp deleted file mode 100644 index dff3c61..0000000 --- a/src/MultiStreamReceiver.cpp +++ /dev/null @@ -1,154 +0,0 @@ -// -// Created by epagris on 2021. 11. 22.. -// - -#include - -#include -#include -#include - -#include "MultiStreamReceiver.h" -#include "audio_types.h" - -MultiStreamReceiver::MultiStreamReceiver(const std::vector &nodes, const std::string& targetDir, unsigned short port) : port(port) { - mTargetDir = targetDir; - mRunning = false; - mClientNodes = nodes; -} - -MultiStreamReceiver::~MultiStreamReceiver() { - stop(); -} - -void MultiStreamReceiver::startNetworking() { - // open socket - mSoc = socket(AF_INET, SOCK_DGRAM, 0); - if (mSoc == -1) { - throw std::runtime_error(std::string("Could not create UDP-socket in ") + __FUNCTION__ + "!"); - } - - int opt = 1; - if (setsockopt(mSoc, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)) == -1) { - throw std::runtime_error(std::string("Could not set socket options in ") + __FUNCTION__ + "!"); - } - - sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = INADDR_ANY; - addr.sin_port = htons(port); - - // bind - if (bind(mSoc, (const sockaddr *)&addr, sizeof(addr)) == -1) { - throw std::runtime_error(std::string("Could not bind socket options in ") + __FUNCTION__ + "!"); - } -} - -void MultiStreamReceiver::createSampleWriters() { - struct stat sb; - if (stat(mTargetDir.c_str(), &sb) == 0 && S_ISDIR(sb.st_mode)) { // if directory exists then leave it as it is - //unlink(datasetName.c_str()); FIXME - } else if (mkdir(mTargetDir.c_str(), 0777) == -1) { // else: create directory for dataset - throw std::runtime_error("Could not create directory " + mTargetDir + "!"); - } - - if (chdir(mTargetDir.c_str()) == -1) { - throw std::runtime_error("Could not change to target directory!"); - } - - std::string nodeDirHint = ""; - - for (unsigned int & mClientNode : mClientNodes) { - std::string datasetName = std::string("node_") + inet_ntoa({ mClientNode }); - mpSampleWriters.emplace_back(std::make_shared>(datasetName, 2, STEREO_BUF_LEN / 2)); - nodeDirHint += datasetName + "\n"; - } - - std::ofstream hintFile("node_dir_hint.txt", std::ios_base::out); - hintFile << nodeDirHint; - hintFile.close(); - - if (chdir("..") == -1) { - throw std::runtime_error("Could not change to target directory!"); - } -} - - -void MultiStreamReceiver::start() { - if (mRunning) { - return; - } - - // ---------------------------- - - createSampleWriters(); - startNetworking(); - - // ---------------------------- - - mRunning = true; - mRecvThread = std::make_shared(fnRecv, this); -} - -void MultiStreamReceiver::stop() { - if (!mRunning) { - return; - } - - mRunning = false; - mRecvThread->join(); - close(mSoc); - mpSampleWriters.clear(); -} - -// -------------------------------------- - -#define RECV_BUFFER_SIZE (16000) - -void MultiStreamReceiver::fnRecv(MultiStreamReceiver *pMSR) { - // select-related structures - fd_set read_fds; - timeval tv; - - // receive buffer - std::shared_ptr pWordBuf = std::shared_ptr(new uint32_t[(RECV_BUFFER_SIZE / 4) + 1]); // to have 4-byte alignment - auto * pRecvBuf = reinterpret_cast(pWordBuf.get()); - auto * pAudioPayload = reinterpret_cast(pWordBuf.get()); - - while (pMSR->mRunning) { - // fd-set - FD_ZERO(&read_fds); - FD_SET(pMSR->mSoc, &read_fds); - - // timeout - tv.tv_sec = 0; - tv.tv_usec = 1E+06 / 4; - - // wait for data - int fd_cnt = select(pMSR->mSoc + 1, &read_fds, nullptr, nullptr, &tv); - - // if data is available on the socket - if ((fd_cnt > 0) && FD_ISSET(pMSR->mSoc, &read_fds)) { - - ssize_t recv_size; - if ((recv_size = recv(pMSR->mSoc, pRecvBuf, RECV_BUFFER_SIZE, 0)) <= 0) { - throw std::runtime_error("Receive error!"); - } - - // get sample writer for the specific address - std::shared_ptr> pSampleWriter = nullptr; - for (size_t i = 0; i < pMSR->mClientNodes.size(); i++) { - if (pMSR->mClientNodes[i] == pAudioPayload->addr) { - pSampleWriter = pMSR->mpSampleWriters[i]; - break; - } - } - - // if sample writer is found - if (pSampleWriter != nullptr) { - pSampleWriter->addSamples(pAudioPayload->pData, { pAudioPayload->timestamp_s, pAudioPayload->timestamp_ns }); - } - } - } -} - diff --git a/src/MultiStreamReceiver.h b/src/MultiStreamReceiver.h deleted file mode 100644 index 9cc5d74..0000000 --- a/src/MultiStreamReceiver.h +++ /dev/null @@ -1,41 +0,0 @@ -// -// Created by epagris on 2021. 11. 22.. -// - -#ifndef WFR_MULTISTREAMRECEIVER_H -#define WFR_MULTISTREAMRECEIVER_H - - -#include -#include -#include -#include -#include "SampleWriter.h" - -class MultiStreamReceiver { -public: - static constexpr unsigned short DEFAULT_PORT = 20220; // default port -private: - std::vector mClientNodes; // client node addresses - int mSoc; // UDP-socket for receiving samples - std::shared_ptr mRecvThread; // thread for reception - static void fnRecv(MultiStreamReceiver * pMSR); // routine function running in separate thread - bool mRunning; - std::vector>> mpSampleWriters; // sample writers for streams - std::string mTargetDir; // target directory -private: - void startNetworking(); // start networking - void createSampleWriters(); // construct sample writers -public: - const unsigned short port; // port -public: - explicit MultiStreamReceiver(const std::vector& nodes, const std::string& targetDir, unsigned short port = DEFAULT_PORT); - void start(); // start reception - void stop(); - - virtual ~MultiStreamReceiver(); - // stop reception -}; - - -#endif //WFR_MULTISTREAMRECEIVER_H diff --git a/src/SampleReceiver.h b/src/SampleReceiver.h index 1c7ef09..91a3cb3 100644 --- a/src/SampleReceiver.h +++ b/src/SampleReceiver.h @@ -6,7 +6,7 @@ #define WFR_SAMPLERECEIVER_H #include "MemoryPool.h" -#include "Timestamp.h" +#include "../libwfr/src/Timestamp.h" #include diff --git a/src/SampleWriter.h b/src/SampleWriter.h deleted file mode 100644 index 09d66c7..0000000 --- a/src/SampleWriter.h +++ /dev/null @@ -1,195 +0,0 @@ -// -// Created by epagris on 2021. 11. 01.. -// - -#ifndef WFR_SAMPLEWRITER_H -#define WFR_SAMPLEWRITER_H - -#include -#include -#include -#include -#include -#include -#include -#include -#include "Timestamp.h" - -#define DIRSEP '/' - -template // T: sample datatype -class SampleWriter { -public: - static const std::string TS_FILENAME; // filename for timestamp file - static const std::string SAMPLE_FILENAME_PREFIX; // filename prefix for sample file - static const std::string FILE_EXT; // datafile extensions -public: - // timestamp record - struct TimestampRecord { - public: - uint32_t ts_s, ts_ns; // timestamp - public: - TimestampRecord &operator=(const Timestamp &ts) { - ts_s = (uint32_t) ts.s; - ts_ns = (uint32_t) ts.ns; - return *this; - } - }; - -private: - const size_t mChN; // number of channel PAIRS, immutable - const size_t mBlockSize; // buffer size, immutable -private: - std::shared_ptr mpTs; // timestamp records - std::vector> mpSamples; // sample buffer array - - std::string mTsFileName; // timestamp filename - std::ofstream mTsFile; // timestamp file - std::vector mSampleFileNames; // sample file names - std::vector mSampleFiles; // sample output files -private: - Timestamp mTs[2]; // previous and current timestamp -private: - unsigned long long mSamplesN; // number of samples written until now -private: - // resize sample and record buffers - void prepareBuffers() { - // allocate timestamp data buffer - mpTs = std::shared_ptr(new TimestampRecord[mBlockSize]); - - // allocate sample data buffers - mpSamples.resize(mChN); - for (size_t i = 0; i < mChN; i++) { - mpSamples[i] = std::shared_ptr(new T[mBlockSize]); - } - } - - // prepare dataset environment - void prepareDatadir(const std::string &datasetName) { - struct stat sb; - if (stat(datasetName.c_str(), &sb) == 0 && S_ISDIR(sb.st_mode)) { // if directory exists then leave it as it is - //unlink(datasetName.c_str()); FIXME - } else if (mkdir(datasetName.c_str(), 0777) == -1) { // else: create directory for dataset - throw std::runtime_error("Could not create directory " + datasetName + "!"); - } - - std::string datadir_dirsep = datasetName + DIRSEP; - - // generate file names - mTsFileName = TS_FILENAME + '.' + FILE_EXT; // timestamp - - // samples - mSampleFileNames.resize(mChN); - for (size_t i = 0; i < mChN; i++) { - mSampleFileNames[i] = SAMPLE_FILENAME_PREFIX + std::to_string(i) + '.' + FILE_EXT; - } - - // open timestamp file - mTsFile.open(datadir_dirsep + mTsFileName, std::ios_base::binary); - if (!mTsFile.is_open()) { - throw std::runtime_error("Could not open timestamp file " + mTsFileName + "!"); - } - - // open sample files - mSampleFiles.resize(mChN); - for (size_t i = 0; i < mChN; i++) { - mSampleFiles[i].open(datadir_dirsep + mSampleFileNames[i], std::ios_base::binary); - if (!mSampleFiles[i].is_open()) { - throw std::runtime_error("Could not open sample file " + mSampleFileNames[i] + "!"); - } - } - } - - // close files - void closeFiles() { - // close file storing timestamp files - mTsFile.close(); - - // close files stroring samples - for (size_t i = 0; i < mChN; i++) { - mSampleFiles[i].close(); - } - } - -public: - explicit SampleWriter(const std::string &datasetName, size_t chN, size_t bufSize) : mChN(chN), mBlockSize(bufSize), mSamplesN(0) { // constr. - prepareBuffers(); - prepareDatadir(datasetName); - } - - ~SampleWriter() { - closeFiles(); - } - - // add n samples to file - void addSamples(const T *pSamples, const Timestamp &ts) { - // shift FIFO and store new timestamp - mTs[1] = mTs[0]; - mTs[0] = ts; - - // if previous timestamp is empty, then don't store samples since time interpolation can not be done - if (mTs[1].empty()) { - return; - } - - // calculate time step size - Timestamp d = (mTs[0] - mTs[1]) / (double) mBlockSize; - - // timestamp for time interpolation - Timestamp ts_interp = mTs[1]; - - // get channel pointers - T *ppCh[mChN]; - for (size_t i = 0; i < mChN; i++) { - ppCh[i] = mpSamples[i].get(); - } - - // extract samples - for (size_t i = 0; i < mBlockSize; i++) { - // select record - T *pRawSamples = (short *) (pSamples + i * mChN); - - // store sample for a channel - for (size_t ch = 0; ch < mChN; ch++) { - ppCh[ch][i] = pRawSamples[ch]; - } - - // step time - ts_interp += d; - - // store timestamp - mpTs.get()[i] = ts_interp; - } - - // write to file newly created records - - // write timestamps - mTsFile.write((const char *) mpTs.get(), mBlockSize * sizeof(TimestampRecord)); - - // write samples - for (size_t ch = 0; ch < mChN; ch++) { - mSampleFiles[ch].write((const char *) ppCh[ch], mBlockSize * sizeof(T)); - } - - // increment counter - mSamplesN += mBlockSize; - } - - // get number of samples written out - unsigned long long getSampleCnt() const { - return mSamplesN; - } - -}; - -// static fields -template -const std::string SampleWriter::TS_FILENAME = "ts"; - -template -const std::string SampleWriter::SAMPLE_FILENAME_PREFIX = "ch_"; - -template -const std::string SampleWriter::FILE_EXT = "dat"; - -#endif //WFR_SAMPLEWRITER_H diff --git a/src/audio_types.h b/src/audio_types.h deleted file mode 100644 index 57b536c..0000000 --- a/src/audio_types.h +++ /dev/null @@ -1,23 +0,0 @@ -// -// Created by epagris on 2021. 11. 02.. -// - -#ifndef WFR_AUDIO_TYPES_H -#define WFR_AUDIO_TYPES_H - -#include - -#define STEREO_BUF_LEN (2 * 192) -#define PERIOD_LEN (STEREO_BUF_LEN / 2) -#define SAMPLE_RATE (48000) - -typedef int16_t AudioSampleType; - -typedef struct { - uint32_t timestamp_s, timestamp_ns; // timestamp - uint32_t sample_cnt; // count of samples in packet - in_addr_t addr; // client node address - AudioSampleType pData[STEREO_BUF_LEN]; // buffer for stereo data -} AudioPayload; - -#endif //WFR_AUDIO_TYPES_H diff --git a/src/globals.h b/src/globals.h index 2998aa0..e87e2b5 100644 --- a/src/globals.h +++ b/src/globals.h @@ -6,7 +6,7 @@ #define WFR_GLOBALS_H #include -#include "ServerBeacon.h" +#include "../libwfr/src/ServerBeacon.h" struct Globals { std::shared_ptr beacon; // beacon to detect client nodes diff --git a/wfstream_analyzer.lua b/wfstream_analyzer.lua new file mode 100644 index 0000000..68c9655 --- /dev/null +++ b/wfstream_analyzer.lua @@ -0,0 +1,68 @@ +-- @brief WaveFormStream analyzer for wireshark +-- @author Epagris +-- @date 2022.04.29. + +-- 1. Create parser objects +local NAME = "WFS" --Custom protocol name +local MsgProto = Proto(NAME, "WaveFormStream over UDP") + +-- MsgProto Resolution fields for defining protocols +local fields = MsgProto.fields +fields.timestamp_s = ProtoField.uint32(NAME .. "TIMESTAMP_S", "timestamp_s", base.DEC) +fields.timestamp_ns = ProtoField.uint32(NAME .. "TIMESTAMP_NS", "timestamp_ns", base.DEC) +fields.sample_cnt = ProtoField.uint32(NAME .. "SAMPLE_CNT", "sample_cnt", base.DEC) +fields.sample_size = ProtoField.uin16_t(NAME .. "SAMPLE_SIZE", "sample_size", base.DEC) +fields.channel_count = ProtoField.uin16_t(NAME .. "CHANNEL_COUNT", "channel_count", base.DEC) +fields.addr = ProtoField.ipv4(NAME .. "ADDR", "addr", base.DEC) + +local data_dis = Dissector.get("data") + +-- 2. Parser function dissect packet +--[[ + //Next, define the main function of the foo parser, which is called by wireshark + //The first parameter is the tvb type, which represents the data that needs to be parsed by this parser + //The second parameter is the Pinfo type, which is the information on the protocol parsing tree, including the display on the UI. + //The third parameter is the TreeItem type, which represents the upper parse tree. +--]] + +function MsgProto.dissector (tvb, pinfo, tree) + + --Create a subtree from the root tree to print parsed message data + local subtree = tree:add(MsgProto, tvb()) + subtree:append_text(", msg_no: " .. tvb(0, 1):uint()) + -- The protocol name displayed on the protocol line in the packet details + pinfo.cols.protocol = MsgProto.name + + tvb_length = tvb:len() + -- dissect field one by one, and add to protocol tree + --Include a header in the message and continue to create tree parsing + local msg_head_tree = subtree:add(MsgProto, tvb(0,2), "MSG_HEADER") --"MSG_HEADER"The parameter replaces the protocol name display + msg_head_tree:add(fields.msg_no, tvb(0, 1))--Represents a byte starting from 0 + msg_head_tree:add(fields.msg_version, tvb(1, 1)) + + subtree:add(fields.msg_len, tvb(2,1)) + + subtree:add(fields.length, tvb_length) --Display data slice length information without fetching data from slice memory + subtree:add(fields.data_length, tvb_length-8) + + -- Bit Domain Continues to Create Tree Resolution + local msg_bitx_tree = subtree:add( fields.msg_bitx, tvb(3,1) ) -- bitfield + msg_bitx_tree:add(fields.msg_bit1,tvb(3,1)) + msg_bitx_tree:add(fields.msg_bit2,tvb(3,1)) + msg_bitx_tree:add(fields.msg_bit3,tvb(3,1)) + msg_bitx_tree:add(fields.msg_bit4,tvb(3,1)) + + subtree:add_le(fields.local_id,tvb(4,4)) + subtree:add_le(fields.remote_id,tvb(8,4)) + + data_dis:call(tvb(12):tvb(), pinfo, tree) --It is noteworthy to parse the data in the data stream after the message structure. call The parameter name must be tvb,???,I hope the big man will give me some advice. + +end + +-- 3 Register the parser to wireshark Analytical table register this dissector +local udp_port_table = DissectorTable.get("tcp.port") + +--Adding parsed TCP Port, Identify Protocol Based on Port Number +for i,port in ipairs{8001,8002} do + udp_port_table:add(port,MsgProto) +end \ No newline at end of file