- earlier capabilities are accessible from Python

- MultiStreamReceiver and SampleWriter cleaned
- AcquisitionFormat introduced
- Logger created
- MultiStreamProcessor idea introduced, MultiStreamToFile introduced
- MATLAB scripts have been modified to load new capture folder structure
- began implementing MultiStreamOscilloscope
This commit is contained in:
Wiesner András 2022-05-05 00:11:48 +02:00
parent cff343e959
commit 22233ce62d
42 changed files with 1962 additions and 495 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
*.bkup

View File

@ -1,13 +1,18 @@
cmake_minimum_required(VERSION 3.20)
cmake_minimum_required(VERSION 3.16)
SET(APP_NAME wfr)
SET(APP_NAME wfr_app)
project(${APP_NAME})
set(CMAKE_CXX_STANDARD 17)
add_executable(${APP_NAME} main.cpp src/ALSADevice.cpp src/ALSADevice.h src/MemoryPool.cpp src/MemoryPool.h utils/Semaphore.h utils/Semaphore.cpp src/SampleWriter.h src/Timestamp.h
#src/SampleReceiver.h
src/audio_types.h src/ServerBeacon.cpp src/ServerBeacon.h src/GUI.h src/globals.h src/Callback.h src/MultiStreamReceiver.cpp src/MultiStreamReceiver.h)
add_executable(${APP_NAME}
main.cpp
src/ALSADevice.cpp
src/ALSADevice.h
src/MemoryPool.cpp
src/MemoryPool.h
utils/Semaphore.h
utils/Semaphore.cpp)
find_package(Threads REQUIRED)
if (THREADS_FOUND)
@ -36,6 +41,12 @@ if (GTKMM_FOUND)
target_link_libraries(${APP_NAME} ${GTKMM_LIBRARIES})
endif(GTKMM_FOUND)
add_dependencies(${APP_NAME} wfr)
target_link_libraries(${APP_NAME} wfr)
install(TARGETS ${APP_NAME} RUNTIME DESTINATION ${PROJECT_SOURCE_DIR}/runtime ARCHIVE DESTINATION ${PROJECT_SOURCE_DIR}/runtime)
set(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} "-rdynamic")
set(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} "-rdynamic")
add_subdirectory(libwfr)
add_subdirectory(python)

View File

@ -2,14 +2,14 @@ function read_multistream_dir(d, prefix, plotrange, onlydata)
% get directories containing samples
hint_file_name = strcat('./runtime/', d, '/node_dir_hint.txt');
hint_file = fopen(hint_file_name);
node_dirs = textscan(hint_file, '%s');
node_files = textscan(hint_file, '%s');
fclose(hint_file);
node_names = strrep(node_dirs{1,1}, prefix, "");
node_names = strrep(node_files{1,1}, prefix, "");
node_dirs = strcat('./runtime/', d, '/', node_dirs{1,1});
node_files = strcat('./runtime/', d, '/', node_files{1,1});
sync_datasets(node_dirs, node_names, plotrange, onlydata);
sync_datasets(node_files, node_names, plotrange, onlydata);
print(strcat('./runtime/', d, '/figure.svg'), '-dsvg');
%print(strcat('./runtime/', d, '/figure.svg'), '-dsvg');
end

View File

@ -1,42 +1,46 @@
function ds = read_syncdata(DATASET_DIR)
function ds = read_syncdata(data_files)
% settings
%DATASET_DIR = 'test_20221';
TS_FILE = 'ts';
SAMPLE_FILE_PREFIX = 'ch_';
FILE_EXT = 'dat';
CHANNELS = 2;
TS_FILE_EXT = 'ts';
%SAMPLE_FILE_PREFIX = 'ch_';
SAMPLE_FILE_EXT = 'dat';
%CHANNELS = 2;
% computed values
FN_TS = strcat(DATASET_DIR, '/', TS_FILE, '.', FILE_EXT);
%for ch=0:1:(CHANNELS-1)
% FN_SAMPLE{ch+1} = strcat(data_files, '/', SAMPLE_FILE_PREFIX, num2str(ch), '.', SAMPLE_FILE_EXT);
%end
for ch=0:1:(CHANNELS-1)
FN_SAMPLE{ch+1} = strcat(DATASET_DIR, '/', SAMPLE_FILE_PREFIX, num2str(ch), '.', FILE_EXT);
end
[files_n,~] = size(data_files);
s = [];
ts = [];
for i=1:files_n
data_file = data_files(i,:);
% load timestamps
FN_TS = strcat(data_file, '.', TS_FILE_EXT);
f = fopen(FN_TS ,'r');
d = fread(f,[2,Inf],'uint32');
fclose(f);
% create fractional timestamp
d = d(1,:) + d(2,:) / 1E+09;
ts = [ts; d];
% load timestamps
f = fopen(FN_TS ,'r');
ts = fread(f,[2,Inf],'uint32');
fclose(f);
% get sample count
%sample_cnt = length(ts);
% take first element as t0
%ts(1,:) = ts(1,:) - 1636036145;
% create fractional timestamp
ts = ts(1,:) + ts(2,:) / 1E+09;
% get sample count
sample_cnt = length(ts);
% load samples
s = zeros(CHANNELS, sample_cnt);
for ch=1:CHANNELS
f = fopen(FN_SAMPLE{ch},'r');
% load samples
FN_SAMPLE = strcat(data_file, '.', SAMPLE_FILE_EXT);
f = fopen(FN_SAMPLE,'r');
d = fread(f, [1,Inf],'int16');
s(ch,:) = d;
s = [s; d];
fclose(f);
end
%figure(1)
%plot(ts(1,1:100), s(1,1:100), 'x')
%xlim([ts(1) ts(100)])

View File

@ -68,18 +68,18 @@ function sds = sync_datasets(dss, node_names, PLOTRANGE, ONLYDATA)
grid on
if (ONLYDATA)
xlabel("Idő [s]");
%xlabel("Idő [s]");
xlabel("Time [s]")
end
ylabel("Feszültség [V]");
%ylabel("Feszültség [V]");
ylabel("Voltage [V]");
xlim([ts_ref(PLOTRANGE(1)) ts_ref(PLOTRANGE(end))]);
legend_lines = {};
for i=1:length(node_names)
for ch=1:2
legend_lines{2 * (i - 1) + ch, 1} = strcat(node_names{i,1}, " CH", num2str(ch));
end
legend_lines{i,1} = node_names{i,1};
end
legend(legend_lines);
@ -116,9 +116,12 @@ function sds = sync_datasets(dss, node_names, PLOTRANGE, ONLYDATA)
end
legend(legend_lines);
ylabel("Erősítéshibával kompenzált különbség")
%ylabel("Erősítéshibával kompenzált különbség")
ylabel("Gain error compensated difference")
xlim([ts_ref(PLOTRANGE(1)) ts_ref(PLOTRANGE(end))]);
grid on
% plot timestamp errors
subplot(3,1,3);
@ -133,8 +136,10 @@ function sds = sync_datasets(dss, node_names, PLOTRANGE, ONLYDATA)
end
grid on
xlabel("Idő [s]");
ylabel("Időhiba [ns]");
xlabel("Time [s]")
%xlabel("Idő [s]");
ylabel("Time error [ns]")
%ylabel("Időhiba [ns]");
legend(legend_lines);
xlim([ts_ref(PLOTRANGE(1)) ts_ref(PLOTRANGE(end))]);

44
libwfr/CMakeLists.txt Normal file
View File

@ -0,0 +1,44 @@
cmake_minimum_required(VERSION 3.16)
project(libwfr)
set(CMAKE_CXX_STANDARD 20)
# Threads
find_package(Threads REQUIRED)
set(SOURCES
src/audio_types.h
src/Callback.h
src/MultiStreamReceiver.cpp
src/MultiStreamReceiver.h
src/SampleWriter.cpp
src/SampleWriter.h
src/ServerBeacon.cpp
src/ServerBeacon.h
src/Timestamp.h
src/Logger.cpp
src/Logger.h
src/AcquisitionFormat.cpp
src/AcquisitionFormat.h
src/ChannelBuffer.cpp
src/ChannelBuffer.h
src/MultiStreamProcessor.cpp
src/MultiStreamProcessor.h
src/MultiStreamToFile.cpp
src/MultiStreamToFile.h
src/ICreatable.h
src/MultiStreamOscilloscope.cpp
src/MultiStreamOscilloscope.h
src/Trigger.cpp
src/Trigger.h)
###
add_library(wfr ${SOURCES})
target_link_libraries(wfr Threads::Threads)
target_include_directories(wfr PUBLIC ${PROJECT_SOURCE_DIR}/../utils)
install(TARGETS wfr
LIBRARY DESTINATION ${PROJECT_SOURCE_DIR}/bin
ARCHIVE DESTINATION ${PROJECT_SOURCE_DIR}/bin)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC")

View File

@ -0,0 +1,101 @@
//
// Created by epagris on 2022.05.03..
//
#include <sstream>
#include <cstring>
#include <set>
#include <list>
#include <vector>
#include "AcquisitionFormat.h"
#include "Logger.h"
AcquisitionFormat::AcquisitionFormat() {
memset(this, 0, sizeof(AcquisitionFormat)); // clear every field
}
AcquisitionFormat::AcquisitionFormat(const std::string &acqFormatStr) {
std::stringstream sstream(acqFormatStr);
// fill the input
sstream >> sampling_rate_Hz;
sstream >> sample_size_b;
sstream >> channel_count;
sstream >> mch_samples_per_packet;
// fill the rest...
fillDerivedFields();
}
AcquisitionFormat::AcquisitionFormat(size_t sampling_rate_Hz, size_t sample_size_b, size_t channel_count, size_t mch_samples_per_packet) {
// fill the input
this->sampling_rate_Hz = sampling_rate_Hz;
this->sample_size_b = sample_size_b;
this->channel_count = channel_count;
this->mch_samples_per_packet = mch_samples_per_packet;
// fill the rest...
fillDerivedFields();
}
void AcquisitionFormat::fillDerivedFields() {
mSettingsValid = true;
// validate sampling rate
std::set<size_t> validSamplingRates = {8000, 8021, 32000, 44100, 48000, 88200, 96000};
if (!validSamplingRates.contains(sampling_rate_Hz)) {
Logger::logLine("INVALID sampling rate of " + std::to_string(sampling_rate_Hz) + " Hz.");
mSettingsValid &= false;
}
// validate sample size
struct SampleSizeStorageSizeAssignment {
size_t sample_size_b;
size_t storage_size_B;
};
std::vector<SampleSizeStorageSizeAssignment> validSampleSizes = {
{8, 1},
{12, 2},
{16, 2},
{24, 4},
{32, 4}
};
// look for storage size
sample_storage_size_B = 0;
for (const auto& s: validSampleSizes) {
if (s.sample_size_b == sample_size_b) {
sample_storage_size_B = s.storage_size_B;
break;
}
}
if (sample_storage_size_B == 0) {
Logger::logLine("INVALID sample size of " + std::to_string(sample_size_b) + "bits.");
mSettingsValid &= false;
}
// validate channel count
if (channel_count > 8) {
Logger::logLine("WARNING Channel count of " + std::to_string(channel_count) + " is likely too large.");
}
// validate packet size
if ((sampling_rate_Hz % mch_samples_per_packet) != 0) {
Logger::logLine("INVALID multichannel sample count (now " + std::to_string(mch_samples_per_packet) +
"), this must be an integer divisor of sampling rate (" + std::to_string(sampling_rate_Hz) + " Hz)!");
mSettingsValid &= false;
}
distinct_samples_per_packet = mch_samples_per_packet * channel_count;
sample_data_size_per_packet_B = sample_storage_size_B * distinct_samples_per_packet;
single_channel_data_size_per_packet_B = mch_samples_per_packet * sample_storage_size_B;
all_channel_sample_size_B = channel_count * sample_storage_size_B;
packet_period_ns = 1E+09 * mch_samples_per_packet / (double) sampling_rate_Hz;
}
bool AcquisitionFormat::isValid() const {
return mSettingsValid;
}

View File

@ -0,0 +1,34 @@
//
// Created by epagris on 2022.05.03..
//
#ifndef WFR_APP_ACQUISITIONFORMAT_H
#define WFR_APP_ACQUISITIONFORMAT_H
#include <string>
class AcquisitionFormat {
public:
size_t sampling_rate_Hz; // sampling rate
size_t sample_size_b; // sample size in bits
size_t sample_storage_size_B; // sample storage size in bytes
size_t channel_count; // number of channels
size_t mch_samples_per_packet; // number of all-channel samples per packet
size_t distinct_samples_per_packet; // number of samples summed across all channel
size_t sample_data_size_per_packet_B; // data size summed across the full packet
size_t single_channel_data_size_per_packet_B; // data size of a single channel in the packet
size_t all_channel_sample_size_B; // sample size of a full, all-channel sample
double packet_period_ns; // time period encapsulated by a full packet
private:
bool mSettingsValid; // indicate if current settings are valid
void fillDerivedFields(); // fill fields with values derived from the input data
public:
AcquisitionFormat(); // default contr.
explicit AcquisitionFormat(const std::string& acqFormatStr); // constr. from formatt string
AcquisitionFormat(size_t sampling_rate_Hz, size_t sample_size_b, size_t channel_count, size_t mch_samples_per_packet); // constr. from fields
bool isValid() const; // are the fields represent a valid set of settings
};
#endif //WFR_APP_ACQUISITIONFORMAT_H

View File

@ -0,0 +1,239 @@
//
// Created by epagris on 2022.05.03..
//
#include <cstring>
#include "ChannelBuffer.h"
#include "Logger.h"
ChannelBuffer::ChannelBuffer(size_t blockSize, size_t blockCount, size_t elementSize) {
// check if blockSize is divisible by 4
if ((blockSize % 4) != 0) {
throw std::runtime_error(std::string("Block size must be an integral multiple of 4! ") + __FUNCTION__);
}
// fill-in parameters
mBlockSize = blockSize;
mBlockCount = blockCount;
mFullMemSize = blockSize * blockCount;
mElementSize = elementSize;
mElementsPerBlock = blockSize / elementSize;
// allocate memory
mpMem = std::shared_ptr<uint8_t>(new uint8_t[mFullMemSize]); // exception is thrown if this fails...
// clear and fill-in memory blocks
clear();
// no reader is waiting
mReaderWaiting = false;
}
void ChannelBuffer::store(const void *pSrc) {
mStoreFetch_mtx.lock(); // MUTEX!!!
BlockDescriptor bd;
if (mFreeBlocks.empty()) { // if there're no more free blocks to allocate, then drop the eldest one and shift the queue
// release block
bd = mOccupiedBlocks.front();
mOccupiedBlocks.pop_front();
// log buffer overrun
//Logger::logLine("ChannelBuffer overrun!");
} else { // if there's at least a single free block
// acquire the free block
bd = mFreeBlocks.front();
// remove from the free queue
mFreeBlocks.pop_front();
// put into the queue of occupied blocks
//mOccupiedBlocks.push(bd);
}
// store tag
mLastTag += mElementsPerBlock;
bd.tag = mLastTag;
// push back onto the queue ("move to the end of the queue")
mOccupiedBlocks.push_back(bd);
// copy content
void *pDest = bd.p;
memcpy(pDest, pSrc, mBlockSize);
// handle waiting readers
if (mReaderWaiting) {
mReaderWaiting = false;
mStoreFetch_mtx.unlock();
mReadSem.post();
} else {
mStoreFetch_mtx.unlock();
}
}
uint64_t ChannelBuffer::fetch(void *pDst, ssize_t size) {
mStoreFetch_mtx.lock(); // MUTEX!!!
// if there're no data to copy, then block
if (mOccupiedBlocks.empty()) {
mReaderWaiting = true; // signal that reader is waiting
mStoreFetch_mtx.unlock(); // release lock
mReadSem.wait(); // wait on data to become available
mStoreFetch_mtx.lock(); // re-lock mutex
}
BlockDescriptor bd;
// if there're data to copy
if (!mOccupiedBlocks.empty()) {
// get block
bd = mOccupiedBlocks.front();
void *pSrc = bd.p;
// copy content to given memory area
if (size == 0) {
memcpy(pDst, pSrc, mBlockSize);
} else if (size > 0) { // copy from the beginning of the block
memcpy(pDst, pSrc, std::min((size_t) size, mBlockSize));
} else if (size < 0) { // copy from the end of the block
size_t copySize = std::min((size_t) (labs(size)), mBlockSize); // determine copy size
uint8_t *pSrcStart = (uint8_t *) pSrc + mBlockSize - copySize; // determine beginning of area to copy
memcpy(pDst, pSrcStart, copySize); // copy
}
// remove from the queue of occupied blocks
mOccupiedBlocks.pop_front();
// add to the queue of free blocks
mFreeBlocks.push_back(bd);
} else {
throw std::runtime_error("Error, this mustn't be hit!");
}
mStoreFetch_mtx.unlock(); // release semaphore
return bd.tag;
}
void ChannelBuffer::clear() {
std::unique_lock<std::mutex> lock(mStoreFetch_mtx); // MUTEX!
// fill-in free blocks
mOccupiedBlocks.clear();
mFreeBlocks.clear();
for (size_t i = 0; i < mBlockCount; i++) {
BlockDescriptor bd = {.p = mpMem.get() + i * mBlockSize, .tag = -1};
mFreeBlocks.push_back(bd);
}
}
std::shared_ptr<const uint8_t> ChannelBuffer::fullBuffer() const {
return mpMem;
}
int64_t ChannelBuffer::getEldestTag() {
std::unique_lock<std::mutex> lock(mStoreFetch_mtx);
if (!mOccupiedBlocks.empty()) {
return mOccupiedBlocks.front().tag;
} else {
return -1;
}
}
size_t ChannelBuffer::copyBetweenTags(void *pDst, int64_t start, int64_t end) {
std::unique_lock<std::mutex> lock(mStoreFetch_mtx);
if (mOccupiedBlocks.empty() || (end < start)) {
return 0;
}
uint8_t * pDstByte = (uint8_t *) pDst;
// get eldest and youngest tag
int64_t eldestTag = mOccupiedBlocks.front().tag;
int64_t youngestTag = mOccupiedBlocks.back().tag;
// clip tags falling out of valid range
start = std::max(start, eldestTag);
end = std::min(end, youngestTag);
// iterators
std::list<BlockDescriptor>::iterator startIter, endIter;
bool startIterFound = false, endIterFound = false;
// find starting block
for (auto iter = mOccupiedBlocks.begin(); iter != mOccupiedBlocks.end() && (!startIterFound && !endIterFound); iter++) {
// select start iter
if ((iter->tag <= start) && ((iter->tag + mElementsPerBlock - 1) >= start)) {
startIter = iter;
startIterFound = true;
}
if ((iter->tag <= end) && ((iter->tag + mElementsPerBlock - 1) >= end)) {
endIter = iter;
endIterFound = true;
}
}
uint8_t *pStart;
size_t sizeCopied = 0;
// IF range resides in a single block
if (startIter == endIter) {
pStart = startIter->p + (start - startIter->tag) * mElementSize;
sizeCopied = end - start + 1;
memcpy(pDst, pStart, sizeCopied);
} else { // IF range spans across multiple blocks
auto iterAfterEnd = endIter;
iterAfterEnd++;
size_t copySize = 0;
for (auto iter = startIter; iter != iterAfterEnd; iter++) {
if (iter == startIter) { // START
pStart = iter->p + (start - iter->tag) * mElementSize;
copySize = mBlockSize - (start - iter->tag) + 1;
memcpy(pDstByte + sizeCopied, pStart, copySize);
sizeCopied += copySize;
} else if (iter == endIter) { // END
pStart = iter->p;
copySize = (end - iter->tag) + 1;
memcpy(pDstByte + sizeCopied, pStart, copySize);
sizeCopied += copySize;
} else { // INBETWEEN
pStart = iter->p;
copySize = mBlockSize;
memcpy(pDstByte + sizeCopied, pStart, copySize);
sizeCopied += copySize;
}
}
}
return sizeCopied;
}
size_t ChannelBuffer::getFreeBlockCnt() {
std::unique_lock<std::mutex> lock(mStoreFetch_mtx);
return mFreeBlocks.size();
}
int64_t ChannelBuffer::copyBlock(size_t n, void * pDst) {
std::unique_lock<std::mutex> lock(mStoreFetch_mtx);
// block not found
if ((n + 1) > mOccupiedBlocks.size()) {
return -1;
}
// block found
auto iter = mOccupiedBlocks.begin();
std::advance(iter, n);
// copy block contents
memcpy(pDst, iter->p, mBlockSize);
return iter->tag;
}

View File

@ -0,0 +1,53 @@
//
// Created by epagris on 2022.05.03..
//
#ifndef WFR_APP_CHANNELBUFFER_H
#define WFR_APP_CHANNELBUFFER_H
#include <cstddef>
#include <queue>
#include <cstdint>
#include <memory>
#include <mutex>
#include <list>
#include <Semaphore.h>
/*
* Finite-length, automatically dropping channel buffer designed for exactly one reader
*/
class ChannelBuffer {
public:
struct BlockDescriptor {
uint8_t * p;
int64_t tag;
};
private:
size_t mBlockSize; // size of a block
size_t mBlockCount; // number of blocks
size_t mFullMemSize; // allocate memory block size derived from block size and block count
size_t mElementSize; // size of a single element
size_t mElementsPerBlock; // number of elements per block
uint64_t mLastTag; // last assigned tag
std::list<BlockDescriptor> mOccupiedBlocks; // occupied blocks
std::list<BlockDescriptor> mFreeBlocks; // free blocks
std::shared_ptr<uint8_t> mpMem; // memory holding buffers (free and occupied also)
std::mutex mStoreFetch_mtx; // mutex for concurrent fetching and storing
bool mReaderWaiting; // indicates, that a reader is waiting for new data
Semaphore mReadSem; // read semaphore for blocking reader
public:
ChannelBuffer(size_t blockSize, size_t blockCount, size_t elementSize); // constr.
void store(const void *pSrc); // store into a new data block by copying from pSrc
uint64_t fetch(void * pDst, ssize_t size = 0); // fetch eldest data block and copy to pDst
std::shared_ptr<const uint8_t> fullBuffer() const; // get full buffer
void clear(); // clear block occupancies
int64_t getEldestTag(); // get the tag residing the longest in the buffer
size_t copyBetweenTags(void *pDst, int64_t start, int64_t end); // copy data to external buffer between tags (return: number of bytes copied)
size_t getFreeBlockCnt(); // get number of free blocks
int64_t copyBlock(size_t n, void * pDst); // get block (nth eldest)
};
#endif //WFR_APP_CHANNELBUFFER_H

19
libwfr/src/ICreatable.h Normal file
View File

@ -0,0 +1,19 @@
//
// Created by epagris on 2022.05.04..
//
#ifndef WFR_APP_ICREATABLE_H
#define WFR_APP_ICREATABLE_H
#include <memory>
template <typename T, typename... ParamTypes>
class ICreatable {
public:
static std::shared_ptr<T> create(ParamTypes... params) {
return std::shared_ptr<T>((T *) new T(params...));
}
};
#endif //WFR_APP_ICREATABLE_H

50
libwfr/src/Logger.cpp Normal file
View File

@ -0,0 +1,50 @@
//
// Created by epagris on 2022.05.03..
//
#include <unistd.h>
#include <fcntl.h>
#include <iostream>
#include "Logger.h"
bool Logger::mLogEn = false;
int Logger::mLogFD = -1;
bool Logger::mCloseOnStop = false;
void Logger::log(const std::string &str) {
if (mLogEn) {
write(mLogFD, str.c_str(), str.size());
}
}
void Logger::logLine(const std::string &str) {
std::string line = str + "\n";
log(line);
}
void Logger::startLogging(int fd) {
mLogFD = fd;
mLogEn = true;
}
void Logger::startLogging(const std::string &filename) {
mLogFD = open(filename.c_str(), O_WRONLY);
if (mLogFD < 0) {
std::cerr << "Could not open log output file! (" + filename + ")" << std::endl;
return;
}
mCloseOnStop = true; // close file when abandoning logging session, since we have opened it
mLogEn = true;
}
void Logger::stopLogging() {
if (mLogEn) {
mLogEn = false;
if (mCloseOnStop) { // close file belonging to us
close(mLogFD);
}
}
}

33
libwfr/src/Logger.h Normal file
View File

@ -0,0 +1,33 @@
//
// Created by epagris on 2022.05.03..
//
#ifndef WFR_APP_LOGGER_H
#define WFR_APP_LOGGER_H
#include <string>
#include <unistd.h>
class Logger {
public:
static constexpr int LOG_FD_DEF = STDOUT_FILENO; // default log file descriptor
private:
static bool mLogEn; // enable logging
static int mLogFD; // file descriptor for logging
static bool mCloseOnStop; // close file when stopping logging
private:
Logger() = default; // constr.
public:
static void startLogging(int fd = STDOUT_FILENO); // start logging on given file descriptor
static void startLogging(const std::string &filename); // start logging using a filename
static void stopLogging(); // stop logging
// logging functions
static void log(const std::string &str);
static void logLine(const std::string &str);
};
#endif //WFR_APP_LOGGER_H

View File

@ -0,0 +1,100 @@
//
// Created by epagris on 2022.05.04..
//
#include <cmath>
#include "MultiStreamOscilloscope.h"
MultiStreamOscilloscope::MultiStreamOscilloscope() {
mCapturePeriod_ns = DRAW_WINDOW_PERIOD_NS_DEF;
verticalScale = VOLT_PER_BIN_DEF;
mTrigState = {false, false};
}
void MultiStreamOscilloscope::setup(const std::vector<in_addr_t> &nodes, const AcquisitionFormat &acqFmt) {
MultiStreamProcessor::setup(nodes, acqFmt);
// calculate buffer parameters
mCaptureLength = ceil(mCapturePeriod_ns / 1E+09 * acqFmt.sampling_rate_Hz); // calculate draw window period in samples
mFIFOBlockCnt = 2 * floor(mCaptureLength / acqFmt.mch_samples_per_packet); // TODO: 2x: pretrigger...
mFIFOBlockSize = mAcqFmt.mch_samples_per_packet * sizeof(SamplePoint);
// setup channel buffers
mpChBufs.resize(mChCnt);
for (auto chBuf: mpChBufs) {
chBuf.reset(new ChannelBuffer(mFIFOBlockSize, mFIFOBlockCnt, 0));
}
// setup sample assembly buffer
mpAssemblyBuffer.reset(new SamplePoint[mAcqFmt.mch_samples_per_packet]);
// setup trigger buffer
mpTriggerBuffer.reset(new SamplePoint[mAcqFmt.mch_samples_per_packet]);
}
bool MultiStreamOscilloscope::input(size_t ch, const std::shared_ptr<Timestamp> &pTime, const void *pData, size_t size) {
if (!MultiStreamProcessor::input(ch, pTime, pData, size)) {
return false;
}
// assemble sample points
double sample;
for (size_t i = 0; i < mAcqFmt.mch_samples_per_packet; i++) {
uint8_t *pSample = ((uint8_t *) pData) + i * mAcqFmt.sample_storage_size_B; // generate sample pointer
// fetch value
switch (mAcqFmt.sample_storage_size_B) {
case 1:
sample = *((int8_t *) pSample);
break;
case 2:
sample = *((int16_t *) pSample);
break;
case 4:
sample = *((int32_t *) pSample);
break;
}
// infer vertical gain
sample *= verticalScale;
// save sample point
mpAssemblyBuffer.get()[i] = {.t = pTime.get()[i].to_ns(), .x = sample};
}
// push new block of samples onto the FIFO
mpChBufs[ch]->store(mpAssemblyBuffer.get());
// if a sufficient collection of samples are available (the FIFO is full)
// and the trigger is armed, and the trigger channel is the curent one,
// then probe samples to trigger
if (trigSettings.ch == ch && mTrigState.armed && mpChBufs[ch]->getFreeBlockCnt() == 0) {
int64_t tag = mpChBufs[ch]->copyBlock(2, mpTriggerBuffer.get()); // TODO magic constant!...
for (size_t i = 0; i < mAcqFmt.mch_samples_per_packet; i++) {
const SamplePoint& samplePoint = mpTriggerBuffer.get()[i]; // get sample
mTrigState.trigd = trigSettings.sample(samplePoint.x); // probe if causes a trigger
// triggered?
if (mTrigState.trigd) {
mTrigState.armed = false; // disable trigger system until arming
mTrigState.triggerSampleTag = tag + i; // store the sample the trigger fired on
// copy captured data
size_t preTriggerSamples = mAcqFmt.mch_samples_per_packet;
int64_t captureStart = mTrigState.triggerSampleTag - preTriggerSamples;
int64_t captureEnd = captureStart + mCaptureLength;
mpChBufs[ch]->copyBetweenTags(mpTriggerBuffer.get(), captureStart, captureEnd);
}
}
}
return true;
}
void MultiStreamOscilloscope::triggerNow() {
}
void MultiStreamOscilloscope::armTrigger() {
}

View File

@ -0,0 +1,51 @@
//
// Created by epagris on 2022.05.04..
//
#ifndef WFR_APP_MULTISTREAMOSCILLOSCOPE_H
#define WFR_APP_MULTISTREAMOSCILLOSCOPE_H
#include "ICreatable.h"
#include "MultiStreamProcessor.h"
#include "ChannelBuffer.h"
#include "Trigger.h"
class MultiStreamOscilloscope : public MultiStreamProcessor, public ICreatable<MultiStreamOscilloscope> {
public:
static constexpr size_t DRAW_WINDOW_PERIOD_NS_DEF = 50E+06; // default window length (50ms)
static constexpr double VOLT_PER_BIN_DEF = 42.80E-06; // default vertical resolution
public:
struct SamplePoint {
int64_t t; // time
double x; // sample data
};
private:
std::vector<std::shared_ptr<ChannelBuffer>> mpChBufs; // channel buffers
size_t mCapturePeriod_ns; // drawing window period
size_t mCaptureLength; // length of drawing sliding window in SAMPLES
size_t mFIFOBlockCnt; // number of blocks the FIFO consists of
size_t mFIFOBlockSize; // FIFO block size in bytes
std::shared_ptr<SamplePoint> mpAssemblyBuffer; // buffer for assembling sample points
std::shared_ptr<SamplePoint> mpTriggerBuffer; // buffer for probing against trigger conditions
std::shared_ptr<SamplePoint> mpCaptureBuffer; // buffer for last capture
public:
TriggerSettings trigSettings; // trigger settings
double verticalScale; // vertical resolution
private:
struct {
bool armed; // indicates wheter trigger is armed or not
bool trigd; // indicates if trigger has fired
int64_t triggerSampleTag;
} mTrigState;
public:
MultiStreamOscilloscope();
void triggerNow(); // make the scope trigger regardless conditions
void armTrigger(); // arm trigger for next acquisition
void setup(const std::vector<in_addr_t> &nodes, const AcquisitionFormat &acqFmt) override;
bool input(size_t ch, const std::shared_ptr<Timestamp> &pTime, const void *pData, size_t size) override;
};
#endif //WFR_APP_MULTISTREAMOSCILLOSCOPE_H

View File

@ -0,0 +1,29 @@
//
// Created by epagris on 2022.05.04..
//
#include "MultiStreamProcessor.h"
#include "Logger.h"
void MultiStreamProcessor::setup(const std::vector<in_addr_t> &nodes, const AcquisitionFormat &acqFmt) {
// get parameters
mNodes = nodes;
mChCnt = nodes.size() * acqFmt.channel_count;
mAcqFmt = acqFmt;
}
bool MultiStreamProcessor::input(size_t ch, const std::shared_ptr<Timestamp> &pTime, const void *pData, size_t size) {
// check input validity
if (ch >= mChCnt) {
Logger::logLine("INVALID channel index (" + std::to_string(ch) + ") in " + __FUNCTION__);
return false;
}
if (size > mAcqFmt.single_channel_data_size_per_packet_B) {
Logger::logLine("INVALID data size (" + std::to_string(size) + " bytes) in " + __FUNCTION__ +
". Predefined single-channel all-data size (" +
std::to_string(mAcqFmt.single_channel_data_size_per_packet_B) + " bytes) exceeded, tail clipped!");
}
return true;
}

View File

@ -0,0 +1,28 @@
//
// Created by epagris on 2022.05.04..
//
#ifndef WFR_APP_MULTISTREAMPROCESSOR_H
#define WFR_APP_MULTISTREAMPROCESSOR_H
#include <netinet/in.h>
#include <vector>
#include <memory>
#include "AcquisitionFormat.h"
#include "Timestamp.h"
class MultiStreamProcessor {
protected:
std::vector<in_addr_t> mNodes; // client node addresses
size_t mChCnt; // channel count
AcquisitionFormat mAcqFmt; // acquisition format
protected:
MultiStreamProcessor() = default;
public:
virtual void setup(const std::vector<in_addr_t> &nodes, const AcquisitionFormat &acqFmt); // setup function for adapting to data streams
virtual bool input(size_t ch, const std::shared_ptr<Timestamp> &pTime, const void *pData, size_t size); // input data for a channel
};
#endif //WFR_APP_MULTISTREAMPROCESSOR_H

View File

@ -0,0 +1,247 @@
//
// Created by epagris on 2021. 11. 22..
//
#include <arpa/inet.h>
#include <memory>
#include <thread>
#include <unistd.h>
#include <cstring>
#include "MultiStreamReceiver.h"
#include "audio_types.h"
#include "Logger.h"
MultiStreamReceiver::MultiStreamReceiver(const std::vector<in_addr_t> &nodes, size_t samplingRate, size_t sampleSize, size_t channelCount, size_t mchSampPerPckt, unsigned short port)
: MultiStreamReceiver(nodes, AcquisitionFormat(samplingRate, sampleSize, channelCount, mchSampPerPckt), port) {
}
MultiStreamReceiver::MultiStreamReceiver(const std::vector<in_addr_t> &nodes, const std::string &acqFormat_str, unsigned short port)
: MultiStreamReceiver(nodes, AcquisitionFormat(acqFormat_str), port) {
}
MultiStreamReceiver::MultiStreamReceiver(const std::vector<in_addr_t> &nodes, const AcquisitionFormat &acqFormat, unsigned short port)
: port(port), mAcqFmt(acqFormat) {
mRunning = false;
mClientNodes = nodes;
mSoc = -1; // some dummy initial value
}
MultiStreamReceiver::MultiStreamReceiver(const std::vector<std::string> &nodes_str, const AcquisitionFormat &acqFormat, unsigned short port)
: port(port), mAcqFmt(acqFormat) {
mRunning = false;
mSoc = -1;
for (auto &node_addr: nodes_str) {
mClientNodes.push_back({ inet_addr(node_addr.c_str()) });
}
}
MultiStreamReceiver::MultiStreamReceiver(const std::vector<std::string> &nodes_str, const std::string &acqFormat_str, unsigned short port)
: MultiStreamReceiver(nodes_str, AcquisitionFormat(acqFormat_str), port) {
}
MultiStreamReceiver::~MultiStreamReceiver() {
stop();
}
void MultiStreamReceiver::startNetworking() {
// open socket
mSoc = socket(AF_INET, SOCK_DGRAM, 0);
if (mSoc == -1) {
throw std::runtime_error(std::string("Could not create UDP-socket in ") + __FUNCTION__ + "!");
}
int opt = 1;
if (setsockopt(mSoc, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)) == -1) {
throw std::runtime_error(std::string("Could not set socket options in ") + __FUNCTION__ + "!");
}
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_port = htons(port);
// bind
if (bind(mSoc, (const sockaddr *) &addr, sizeof(addr)) == -1) {
throw std::runtime_error(std::string("Could not bind socket options in ") + __FUNCTION__ + "!");
}
}
void MultiStreamReceiver::start(const std::shared_ptr<MultiStreamProcessor> &pMSP) {
if (mRunning) {
return;
}
// store multistream processor
mpMSP = pMSP;
mpMSP->setup(mClientNodes, mAcqFmt);
if (mpMSP == nullptr) {
Logger::logLine(std::string("ERROR MultiStreamProcessor is NULL in ") + __FUNCTION__ + ", cannot start MultiStreamReceiver!");
return;
}
// ----------------------------
//createSampleWriters(); // TODO
startNetworking();
// ----------------------------
mRunning = true;
mRecvThread = std::make_shared<std::thread>(fnRecv, this);
}
void MultiStreamReceiver::stop() {
if (!mRunning) {
return;
}
mRunning = false;
mRecvThread->join();
close(mSoc);
mpSampleWriters.clear();
}
// --------------------------------------
void MultiStreamReceiver::fnRecv(MultiStreamReceiver *pMSR) {
// select-related structures
fd_set read_fds;
timeval tv;
// allocate receive buffer based on sample size
size_t expected_receive_size = pMSR->mAcqFmt.sample_data_size_per_packet_B + sizeof(AudioPayloadHeader);
std::shared_ptr<uint8_t> pRecvBuf(new uint8_t[expected_receive_size]);
// allocate buffers for extracted channels
std::shared_ptr<uint8_t> pChannelBuf(new uint8_t[pMSR->mAcqFmt.sample_data_size_per_packet_B]);
// if (pMSR->mSampleSize == 16) {
// pRecvBuf.reset(new uint8_t[pMSR->mStereoBufLen * 2 + sizeof(AudioPayloadHeader)]);
// } else if (pMSR->mSampleSize == 24 || pMSR->mSampleSize == 32) {
// pRecvBuf.reset(new uint8_t[pMSR->mStereoBufLen * 4 + sizeof(AudioPayloadHeader)]);
// }
// audio payload header
AudioPayloadHeader aph;
// packet timestamps
Timestamp ts[2]; // current and previous timestamp
// limit on time difference variation
double timeDiffVariation_ns = 1E+09 / pMSR->mAcqFmt.sampling_rate_Hz * TS_VALIDITY_RANGE;
// allocate buffer for interpolated timestamps
std::shared_ptr<Timestamp> pTSBuf(new Timestamp[pMSR->mAcqFmt.mch_samples_per_packet]);
while (pMSR->mRunning) {
// fd-set
FD_ZERO(&read_fds);
FD_SET(pMSR->mSoc, &read_fds);
// timeout
tv.tv_sec = 0;
tv.tv_usec = 1E+06 / 4;
// wait for data
int fd_cnt = select(pMSR->mSoc + 1, &read_fds, nullptr, nullptr, &tv);
// if data is available on the socket
if ((fd_cnt > 0) && FD_ISSET(pMSR->mSoc, &read_fds)) {
/* // read just the header
* if (read(pMSR->mSoc, (&aph), sizeof(AudioPayloadHeader)) < sizeof(AudioPayloadHeader)) {
throw std::runtime_error("Receive error!");
}
// read the rest
if ((recv_size = read(pMSR->mSoc, pRecvBuf.get(), aph.sample_cnt * aph.channel_count * pMSR->mSampleStorageSizeByte)) <= 0) {
throw std::runtime_error("Receive error!");
// TODO check buffer size equivalence
}*/
size_t recv_size;
if ((recv_size = recv(pMSR->mSoc, pRecvBuf.get(), expected_receive_size, 0)) <= 0) {
throw std::runtime_error("Receive error!");
}
// get header
memcpy(&aph, pRecvBuf.get(), sizeof(AudioPayloadHeader));
// ----------- TIME PROCESSING ---------------
// store timestamp
ts[1] = ts[0]; // shift FIFO
ts[0] = {(int64_t) aph.timestamp_s, (int64_t) aph.timestamp_ns}; // store new timestamp
// if previous timestamp is empty, then don't store samples since time interpolation can not be done
if (ts[1].empty()) {
continue;
}
// calculate step size
Timestamp d = (ts[0] - ts[1]) / (double) pMSR->mAcqFmt.mch_samples_per_packet;
// validity check on time step size
if (llabs(d.to_ns() - ((int64_t) (1E+09 / pMSR->mAcqFmt.sampling_rate_Hz))) > timeDiffVariation_ns) {
Logger::logLine("SKIPPED packet due to large packet period variation!");
continue; // skip this packet if time step is outside the valid range
}
// prepare time interpolation
Timestamp ts_interp = ts[1]; // start from the oldest one
// ------------- SAMPLE PROCESSING -------------
// decompose data into distinct channels ("transpose matrix")
uint8_t *pSampleData = pRecvBuf.get() + sizeof(AudioPayloadHeader); // get pointer on the data block
for (size_t i = 0; i < pMSR->mAcqFmt.mch_samples_per_packet; i++) { // iterate on all samples
for (size_t ch = 0; ch < pMSR->mAcqFmt.channel_count; ch++) {
// calculate source and destination indices
size_t dstIndex = pMSR->mAcqFmt.single_channel_data_size_per_packet_B * ch + // channel offset ("select column")
pMSR->mAcqFmt.sample_storage_size_B * i; // destination sample index ("select row")
size_t srcIndex = pMSR->mAcqFmt.all_channel_sample_size_B * i + // sample offset ("select row")
ch * pMSR->mAcqFmt.sample_storage_size_B; // channel offset ("select column")
// copy sample
memcpy(pChannelBuf.get() + dstIndex, pSampleData + srcIndex, pMSR->mAcqFmt.sample_storage_size_B);
}
// TIME INTERPOLATION [optimizing loops] (interpolate time for each sample)
ts_interp += d;
pTSBuf.get()[i] = ts_interp;
}
// get sample writer for the specific address
// std::shared_ptr<SampleWriter> pSampleWriter = nullptr;
for (size_t i = 0; i < pMSR->mClientNodes.size(); i++) {
if (pMSR->mClientNodes[i] == aph.addr) {
size_t ch0 = i * pMSR->mAcqFmt.channel_count; // get "first channel" of that node
// transfer channel data
for (size_t ch = 0; ch < pMSR->mAcqFmt.channel_count; ch++) {
size_t dataSize = pMSR->mAcqFmt.single_channel_data_size_per_packet_B;
uint8_t *pChData = pChannelBuf.get() + ch * dataSize; // get channel data
// pass data
pMSR->mpMSP->input(ch + ch0, pTSBuf, pChData, dataSize);
}
// pSampleWriter = pMSR->mpSampleWriters[i];
break;
}
}
// if sample writer is found
// if (pSampleWriter != nullptr) {
// std::unique_lock<std::mutex> lock(pMSR->mSampleInsertMtx);
// pSampleWriter->addSamples(pRecvBuf.get() + sizeof(AudioPayloadHeader), {aph.timestamp_s, aph.timestamp_ns}); // get sample data
// }
}
}
}

View File

@ -0,0 +1,51 @@
//
// Created by epagris on 2021. 11. 22..
//
#ifndef WFR_MULTISTREAMRECEIVER_H
#define WFR_MULTISTREAMRECEIVER_H
#include <netinet/in.h>
#include <vector>
#include <memory>
#include <thread>
#include <mutex>
#include "SampleWriter.h"
#include "AcquisitionFormat.h"
#include "MultiStreamProcessor.h"
class MultiStreamReceiver {
public:
static constexpr unsigned short DEFAULT_PORT = 20220; // default port
static constexpr double TS_VALIDITY_RANGE = 0.1; // maximal variation of packet length considered valid
private:
std::vector<in_addr_t> mClientNodes; // client node addresses
int mSoc; // UDP-socket for receiving samples
std::shared_ptr<std::thread> mRecvThread; // thread for reception
static void fnRecv(MultiStreamReceiver *pMSR); // routine function running in separate thread
bool mRunning;
std::vector<std::shared_ptr<SampleWriter>> mpSampleWriters; // sample writers for streams
//std::mutex mSampleInsertMtx; // mutex protecting against sample insertion collision when calling pSampleWriter->addSamples(...)
std::shared_ptr<MultiStreamProcessor> mpMSP; // multistream processor INSTANCE (pointer to...)
private:
AcquisitionFormat mAcqFmt; // acquisition format
private:
void startNetworking(); // start networking
public:
const unsigned short port; // port
public:
MultiStreamReceiver(const std::vector<in_addr_t> &nodes, size_t samplingRate, size_t sampleSize, size_t channelCount, size_t mchSampPerPckt, unsigned short port = DEFAULT_PORT);
MultiStreamReceiver(const std::vector<in_addr_t> &nodes, const AcquisitionFormat &acqFormat, unsigned short port = DEFAULT_PORT);
MultiStreamReceiver(const std::vector<in_addr_t> &nodes, const std::string &acqFormat_str, unsigned short port = DEFAULT_PORT);
MultiStreamReceiver(const std::vector<std::string> &nodes_str, const AcquisitionFormat &acqFormat, unsigned short port = DEFAULT_PORT);
MultiStreamReceiver(const std::vector<std::string> &nodes_str, const std::string &acqFormat_str, unsigned short port = DEFAULT_PORT);
void start(const std::shared_ptr<MultiStreamProcessor> &pMSP); // start reception
void stop(); // stop reception
virtual ~MultiStreamReceiver();
};
#endif //WFR_MULTISTREAMRECEIVER_H

View File

@ -0,0 +1,71 @@
//
// Created by epagris on 2022.05.04..
//
#include <stdexcept>
#include <unistd.h>
#include <arpa/inet.h>
#include "MultiStreamToFile.h"
#include "Logger.h"
MultiStreamToFile::MultiStreamToFile(const std::string &targetDir) {
mTargetDir = targetDir;
}
void MultiStreamToFile::createSampleWriters() {
struct stat sb;
if (stat(mTargetDir.c_str(), &sb) == 0 && S_ISDIR(sb.st_mode)) { // if directory exists then leave it as it is
//unlink(datasetName.c_str()); FIXME
} else if (mkdir(mTargetDir.c_str(), 0777) == -1) { // else: create directory for dataset
throw std::runtime_error("Could not create directory " + mTargetDir + "!");
}
if (chdir(mTargetDir.c_str()) == -1) {
throw std::runtime_error("Could not change to target directory!");
}
std::string nodeDirHint = "";
// create sample writers, ONE-PER-CHANNEL
size_t acc_ch = 0; // accumulative channel index (across all preceding nodes)
for (unsigned int &mClientNode: mNodes) {
for (size_t ch = 0; ch < mAcqFmt.channel_count; ch++) {
// create sample writer
std::string datasetName = std::string("node_") + inet_ntoa({mClientNode}) + "ch" + std::to_string(acc_ch);
mpSampleWriters.emplace_back(std::make_shared<SampleWriter>(datasetName, mAcqFmt));
// create hint
nodeDirHint += datasetName + "\n";
// increase accumulative channel count
acc_ch++;
}
}
// save hint
std::ofstream hintFile("node_dir_hint.txt", std::ios_base::out);
hintFile << nodeDirHint;
hintFile.close();
if (chdir("..") == -1) {
throw std::runtime_error("Could not change to target directory!");
}
}
void MultiStreamToFile::setup(const std::vector<in_addr_t> &nodes, const AcquisitionFormat &acqFmt) {
MultiStreamProcessor::setup(nodes, acqFmt);
// prepare for receiving data
createSampleWriters();
}
bool MultiStreamToFile::input(size_t ch, const std::shared_ptr<Timestamp> &pTime, const void *pData, size_t size) {
if (!MultiStreamProcessor::input(ch, pTime, pData, size)) {
return false;
}
// store new data
mpSampleWriters[ch]->addSamples((const uint8_t *) pData, pTime);
return true;
}

View File

@ -0,0 +1,29 @@
//
// Created by epagris on 2022.05.04..
//
#ifndef WFR_APP_MULTISTREAMTOFILE_H
#define WFR_APP_MULTISTREAMTOFILE_H
#include <memory>
#include "MultiStreamProcessor.h"
#include "SampleWriter.h"
#include "ICreatable.h"
class MultiStreamToFile : public MultiStreamProcessor, public ICreatable<MultiStreamToFile, const std::string&> {
private:
std::string mTargetDir; // target directory
std::vector<std::shared_ptr<SampleWriter>> mpSampleWriters; // sample writers for streams
private:
void createSampleWriters(); // create sample writers
public:
MultiStreamToFile(const std::string& targetDir); // constr.
public:
void setup(const std::vector<in_addr_t> &nodes, const AcquisitionFormat &acqFmt) override;
bool input(size_t ch, const std::shared_ptr<Timestamp> &pTime, const void *pData, size_t size) override;
};
#endif //WFR_APP_MULTISTREAMTOFILE_H

136
libwfr/src/SampleWriter.cpp Normal file
View File

@ -0,0 +1,136 @@
#include "SampleWriter.h"
#include "AcquisitionFormat.h"
SampleWriter::SampleWriter(const std::string &datasetName, const AcquisitionFormat &acqFmt, bool withoutTSFile)
: mAcqFmt(acqFmt), mWithoutTSFile(withoutTSFile) { // constr.
prepareBuffers();
prepareOutputFiles(datasetName);
}
SampleWriter::~SampleWriter() {
closeFiles();
}
void SampleWriter::prepareBuffers() {
// allocate timestamp data buffer
mpTs = std::shared_ptr<TimestampRecord>(new TimestampRecord[mAcqFmt.mch_samples_per_packet]);
// allocate buffer based on sample size
mpSamples = std::shared_ptr<uint8_t>(new uint8_t[mAcqFmt.single_channel_data_size_per_packet_B]);
}
void SampleWriter::prepareOutputFiles(const std::string &datasetName) {
// struct stat sb;
// if (stat(datasetName.c_str(), &sb) == 0 && S_ISDIR(sb.st_mode)) { // if directory exists then leave it as it is
// //unlink(datasetName.c_str()); FIXME
// } else if (mkdir(datasetName.c_str(), 0777) == -1) { // else: create directory for dataset
// throw std::runtime_error("Could not create directory " + datasetName + "!");
// }
//
// std::string datadir_dirsep = datasetName + DIRSEP;
// timestamps
if (!mWithoutTSFile) {
mTsFileName = datasetName + '.' + TS_EXT; // timestamp
// open timestamp file
mTsFile.open(mTsFileName, std::ios_base::binary);
if (!mTsFile.is_open()) {
throw std::runtime_error("Could not open timestamp file " + mTsFileName + "!");
}
}
// data samples
mSampleFileName = datasetName + '.' + SAMPLE_EXT;
mSampleFile.open(mSampleFileName, std::ios_base::binary);
if (!mSampleFile.is_open()) {
throw std::runtime_error("Could not open sample file " + mSampleFileName + "!");
}
}
void SampleWriter::closeFiles() {
// close file storing timestamp files
if (!mWithoutTSFile) {
mTsFile.close();
}
// close files stroring samples
mSampleFile.close();
}
void SampleWriter::addSamples(const uint8_t *pSamples, const std::shared_ptr<Timestamp> &pTime) {
/* // shift FIFO and store new timestamp
mTs[1] = mTs[0];
mTs[0] = ts;
// if previous timestamp is empty, then don't store samples since time interpolation can not be done
if (mTs[1].empty()) {
return;
}
// calculate time step size
Timestamp d = (mTs[0] - mTs[1]) / (double) mBlockSize;
// validity check on time step size
if (d.to_ns() > mSamplingPeriod_ns_UB || d.to_ns() < mSamplingPeriod_ns_LB) {
return; // skip this packet if time step is outside the valid range
}
// timestamp for time interpolation
Timestamp ts_interp = mTs[1];
// get channel pointers
int8_t *ppCh[mChN];
for (size_t i = 0; i < mChN; i++) {
ppCh[i] = mpSamples[i].get();
}
// extract samples
for (size_t i = 0; i < mBlockSize; i++) {
// select record
auto *pRawSamples = (int8_t *) (pSamples + i * mSampleDisposition);
// store sample for a channel
for (size_t ch = 0; ch < mChN; ch++) {
if (mSampleStorageSizeByte == 2) {
*((int16_t*)(ppCh[ch] + (i * mSampleStorageSizeByte))) = *((int16_t*)(pRawSamples + (ch * mSampleStorageSizeByte)));
} else if (mSampleStorageSizeByte == 4) {
*((int32_t*)(ppCh[ch] + (i * mSampleStorageSizeByte))) = *((int32_t*)(pRawSamples + (ch * mSampleStorageSizeByte)));
}
}
// step time
ts_interp += d;
// store timestamp
mpTs.get()[i] = ts_interp;
}
// write to file newly created records
*/
// write timestamps
if (!mWithoutTSFile) {
// convert timestamp
for (size_t i = 0; i < mAcqFmt.mch_samples_per_packet; i++) {
mpTs.get()[i] = pTime.get()[i];
}
// write...
mTsFile.write((const char *) mpTs.get(), mAcqFmt.mch_samples_per_packet * sizeof(TimestampRecord));
}
// write samples
mSampleFile.write((const char *) pSamples, mAcqFmt.single_channel_data_size_per_packet_B);
// increment counter
mSamplesN += mAcqFmt.mch_samples_per_packet;
}
unsigned long long SampleWriter::getSampleCnt() const {
return mSamplesN;
}
// static fields
const std::string SampleWriter::TS_EXT = "ts";
//const std::string SampleWriter::SAMPLE_FILENAME_PREFIX = "ch_";
const std::string SampleWriter::SAMPLE_EXT = "dat";

65
libwfr/src/SampleWriter.h Normal file
View File

@ -0,0 +1,65 @@
//
// Created by epagris on 2021. 11. 01..
//
#ifndef WFR_SAMPLEWRITER_H
#define WFR_SAMPLEWRITER_H
#include <fstream>
#include <string>
#include <memory>
#include <iostream>
#include <sys/stat.h>
#include <vector>
#include <set>
#include <queue>
#include "Timestamp.h"
#include "AcquisitionFormat.h"
#define DIRSEP '/'
class SampleWriter {
public:
static const std::string TS_EXT; // filename for timestamp file
//static const std::string SAMPLE_FILENAME_PREFIX; // filename prefix for sample file
static const std::string SAMPLE_EXT; // datafile extensions
public:
// timestamp record
struct TimestampRecord {
public:
uint32_t ts_s, ts_ns; // timestamp
public:
TimestampRecord &operator=(const Timestamp &ts) {
ts_s = (uint32_t) ts.s;
ts_ns = (uint32_t) ts.ns;
return *this;
}
};
private:
AcquisitionFormat mAcqFmt;
bool mWithoutTSFile; // does sample writer omit timestamp outputting?
private:
std::shared_ptr<TimestampRecord> mpTs; // timestamp records
std::shared_ptr<uint8_t> mpSamples; // sample buffer
std::string mTsFileName; // timestamp filename
std::ofstream mTsFile; // timestamp file
std::string mSampleFileName; // sample file name
std::ofstream mSampleFile; // sample output file
private:
unsigned long long mSamplesN; // number of samples written until now
private:
void prepareBuffers(); // resize sample and record buffers
void prepareOutputFiles(const std::string &datasetName); // prepare dataset environment
void closeFiles(); // close files
public:
SampleWriter(const std::string &datasetName, const AcquisitionFormat &acqFmt, bool withoutTSFile = false);
~SampleWriter();
void addSamples(const uint8_t *pSamples, const std::shared_ptr<Timestamp> &pTime); // add n samples to file
unsigned long long getSampleCnt() const; // get number of samples written out
};
#endif //WFR_SAMPLEWRITER_H

View File

@ -6,17 +6,27 @@
#include <memory>
#include <unistd.h>
#include <cstring>
#include <cmath>
#include "ServerBeacon.h"
const in_addr ServerBeacon::DEFAULT_MULTICAST_ADDR = {inet_addr("224.0.2.21")};
ServerBeacon::ServerBeacon() : mRunning(false), mScanCallback(nullptr) {
ServerBeacon::ServerBeacon() :
mRunning(false), mScanCallback(nullptr) {
setMulticastAddr(DEFAULT_MULTICAST_ADDR);
setPort(DEFAULT_PORT);
setAnnouncePeriod(DEFAULT_ANNOUNCE_PERIOD_MS);
}
ServerBeacon::ServerBeacon(const in_addr &addr, unsigned short port, size_t announcePeriod_ms) : mRunning(false), mScanCallback(nullptr) {
ServerBeacon::ServerBeacon(const in_addr &addr, unsigned short port, size_t announcePeriod_ms) :
mRunning(false), mScanCallback(nullptr) {
setMulticastAddr(addr);
setPort(port);
setAnnouncePeriod(announcePeriod_ms);
}
ServerBeacon::ServerBeacon(const std::string &addr, unsigned short port, size_t announcePeriod_ms) :
mRunning(false), mScanCallback(nullptr) {
setMulticastAddr(addr);
setPort(port);
setAnnouncePeriod(announcePeriod_ms);
@ -26,6 +36,11 @@ void ServerBeacon::setMulticastAddr(const in_addr &addr) {
mAddr = addr;
}
void ServerBeacon::setMulticastAddr(const std::string &addr_s) {
mAddr = {inet_addr(addr_s.c_str())};
}
in_addr ServerBeacon::getMulticastAddr() const {
return mAddr;
}
@ -93,7 +108,7 @@ void ServerBeacon::startBeacon() {
}
void ServerBeacon::stopBeacon() {
if (!mRunning){
if (!mRunning) {
return;
}
@ -102,12 +117,27 @@ void ServerBeacon::stopBeacon() {
close(mBeaconSoc);
}
void ServerBeacon::singleScan() {
if (mRunning) {
return;
}
// start the beacon
startBeacon();
// wait for nodes to respond
std::this_thread::sleep_for(std::chrono::milliseconds((size_t) (mAnnPeriod_ms * 0.75)));
// stop the beacon
stopBeacon();
}
std::list<in_addr> ServerBeacon::getNodesOnNetwork() {
std::list<in_addr> nodeAddrs;
mBeaconMtx.lock();
for (auto nodeInfo : mNodes) {
for (auto nodeInfo: mNodes) {
nodeAddrs.push_back(nodeInfo.addr);
}
@ -116,10 +146,22 @@ std::list<in_addr> ServerBeacon::getNodesOnNetwork() {
return nodeAddrs;
}
std::list<std::string> ServerBeacon::getNodesOnNetwork_str() {
std::list<std::string> nodeAddrs;
for (auto nodeInfo: getNodesOnNetwork()) {
nodeAddrs.emplace_back(inet_ntoa(nodeInfo));
}
return nodeAddrs;
}
// -------------------------------------------
void ServerBeacon::fn_BeaconThread(ServerBeacon *pSB) {
// log
Logger::logLine("START beacon");
// beacon message used to send and receive information
BeaconMsg msg;
@ -163,6 +205,9 @@ void ServerBeacon::fn_BeaconThread(ServerBeacon *pSB) {
clNodes.push_back(clInfo);
// log
Logger::logLine(std::string("FOUND NODE ") + inet_ntoa(clInfo.addr));
//std::cout << inet_ntoa(clInfo.addr) << std::endl;
}
@ -189,9 +234,12 @@ void ServerBeacon::fn_BeaconThread(ServerBeacon *pSB) {
(*pSB->mScanCallback)(pSB);
}
}
// log
Logger::logLine("STOP beacon");
}
void ServerBeacon::setScanCallback(const std::shared_ptr<CallbackBase<ServerBeacon *>>& scanCB) {
void ServerBeacon::setScanCallback(const std::shared_ptr<CallbackBase<ServerBeacon *>> &scanCB) {
mScanCallback = scanCB;
}
@ -208,7 +256,7 @@ unsigned short ServerBeacon::getNodeNettermPort(in_addr addr) {
int nettermPort = -1;
for (const auto& nodeInfo : mNodes) {
for (const auto &nodeInfo: mNodes) {
if (nodeInfo.addr.s_addr == addr.s_addr) {
nettermPort = nodeInfo.terminalPort;
break;
@ -221,45 +269,142 @@ unsigned short ServerBeacon::getNodeNettermPort(in_addr addr) {
throw std::runtime_error(std::string("Node not found in database! (") + inet_ntoa(addr) + ")!");
}
return (unsigned short)nettermPort;
return (unsigned short) nettermPort;
}
void ServerBeacon::sendNettermCmd(in_addr addr, unsigned short port, const std::string &cmd) {
#define MAX_TERM_LINE_LENGTH (127)
std::string ServerBeacon::sendNettermCmd(in_addr addr, unsigned short port, const std::string &cmd, bool expectResp) {
int soc = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (soc == -1) {
throw std::runtime_error("Could not create TCP socket to execute netterm commands!");
}
// return value
std::string ret_resp;
// connect to server
sockaddr_in serverAddr;
memset(&serverAddr, 0, sizeof(sockaddr_in));
serverAddr.sin_addr = addr;
serverAddr.sin_port = htons(port);
serverAddr.sin_family = AF_INET;
if (connect(soc, (const sockaddr *)&serverAddr, sizeof(sockaddr_in)) == -1) {
if (connect(soc, (const sockaddr *) &serverAddr, sizeof(sockaddr_in)) == -1) {
throw std::runtime_error(std::string("Could not connect to netterm! (") + inet_ntoa(addr) + ":" + std::to_string(port) + ")");
}
static std::string nodeftty_cmd = "nodeftty";
send(soc, "nodeftty", nodeftty_cmd.size(), 0);
static std::string nodeftty_cmd = "nodeftty\n";
send(soc, nodeftty_cmd.c_str(), nodeftty_cmd.size(), 0);
// send command
send(soc, cmd.c_str(), cmd.size(), 0);
std::string cmd_nl = cmd + "\n";
send(soc, cmd_nl.c_str(), cmd_nl.size(), 0);
// log
Logger::logLine(std::string("CMD '") + cmd + "' ON " + inet_ntoa(addr));
if (expectResp) {
// wait for response
fd_set read_fds;
timeval tv = {1, 0}; // 1s timeout
FD_ZERO(&read_fds);
FD_SET(soc, &read_fds);
if (select(soc + 1, &read_fds, nullptr, nullptr, &tv) <= 0) { // if we have NO response
Logger::logLine(std::string(" -->NO RESPONSE"));
} else if(FD_ISSET(soc, &read_fds)) { // if we have response
char sResp[MAX_TERM_LINE_LENGTH + 1];
size_t resp_len = recv(soc, sResp, MAX_TERM_LINE_LENGTH, 0);
sResp[resp_len] = '\0';
ret_resp = std::string(sResp);
// clip trailing \r\n to [r][n]
if (ret_resp.ends_with("\r\n")) {
ret_resp = ret_resp.substr(0, ret_resp.size() - 2);
}
// log
Logger::logLine(std::string(" -->RESP '") + ret_resp + "'");
}
}
// close connection gracefully
std::string cmdExit = "exit\n";
send(soc, cmdExit.c_str(), cmdExit.size(), 0);
// wait for response
fd_set read_fds;
timeval tv = {1, 0}; // 1s timeout
FD_ZERO(&read_fds);
FD_SET(soc, &read_fds);
if (select(soc + 1, &read_fds, nullptr, nullptr, &tv) <= 0) {
Logger::logLine(std::string(" -->FORCED CLOSE")); // log
}
// close the connection finally
close(soc);
return ret_resp;
}
void ServerBeacon::execCmdOnNode(in_addr addr, const std::string &cmd) {
unsigned short port = getNodeNettermPort(addr);
sendNettermCmd(addr, port, cmd);
sendNettermCmd(addr, port, cmd, false);
}
void ServerBeacon::execCmdOnAllTerms(const std::string &cmd) {
void ServerBeacon::execCmdOnNode(const std::string &addr_str, const std::string &cmd) {
in_addr node_addr = {inet_addr(addr_str.c_str())};
execCmdOnNode(node_addr, cmd);
}
void ServerBeacon::execCmdOnAllNodes(const std::string &cmd) {
mBeaconMtx.lock();
for (const auto& nodeInfo : mNodes) {
sendNettermCmd(nodeInfo.addr, nodeInfo.terminalPort, cmd);
for (const auto &nodeInfo: mNodes) {
sendNettermCmd(nodeInfo.addr, nodeInfo.terminalPort, cmd, false);
}
mBeaconMtx.unlock();
}
}
std::string ServerBeacon::queryFromNode(in_addr addr, const std::string &cmd) {
unsigned short port = getNodeNettermPort(addr);
return sendNettermCmd(addr, port, cmd, true);
}
std::string ServerBeacon::queryFromNode(const std::string &addr_str, const std::string &cmd) {
in_addr node_addr = {inet_addr(addr_str.c_str())};
return queryFromNode(node_addr, cmd);
}
std::map<in_addr_t, std::string> ServerBeacon::queryFromAllNodes(const std::string &cmd) {
std::map<in_addr_t, std::string> res;
mBeaconMtx.lock();
for (const auto &nodeInfo: mNodes) {
res[nodeInfo.addr.s_addr] = sendNettermCmd(nodeInfo.addr, nodeInfo.terminalPort, cmd, true);
}
mBeaconMtx.unlock();
return res;
}
std::map<std::string, std::string> ServerBeacon::queryFromAllNodes_str(const std::string &cmd) {
std::map<std::string, std::string> res;
mBeaconMtx.lock();
for (const auto &nodeInfo: mNodes) {
res[inet_ntoa(nodeInfo.addr)] = sendNettermCmd(nodeInfo.addr, nodeInfo.terminalPort, cmd, true);
}
mBeaconMtx.unlock();
return res;
}

View File

@ -11,7 +11,9 @@
#include <arpa/inet.h>
#include <thread>
#include <mutex>
#include <map>
#include "Callback.h"
#include "Logger.h"
class ServerBeacon {
public:
@ -47,11 +49,13 @@ private:
uint8_t server_nClient; // server or client
};
private:
void sendNettermCmd(in_addr addr, unsigned short port, const std::string& cmd); // send netterm cmd
std::string sendNettermCmd(in_addr addr, unsigned short port, const std::string &cmd, bool expectResp); // send netterm cmd
public:
ServerBeacon(); // constr.
explicit ServerBeacon(const in_addr& addr, unsigned short port, size_t announcePeriod_ms); // constr. with beacon port and announce period
ServerBeacon(const in_addr& addr, unsigned short port, size_t announcePeriod_ms); // constr. with beacon port and announce period
ServerBeacon(const std::string& addr, unsigned short port, size_t announcePeriod_ms); // constr. with beacon port and announce period
void setMulticastAddr(const in_addr& addr); // set multicasting address
void setMulticastAddr(const std::string& addr_s); // set multicast address (string)
in_addr getMulticastAddr() const; // set multicasting address
void setPort(unsigned short port); // set port BEFORE starting beacon
unsigned short getPort() const; // get beacon port
@ -61,11 +65,18 @@ public:
std::string getInterfaceAddr() const; // get multicast interface address
void startBeacon(); // start the beacon
void stopBeacon(); // stop the beacon
void singleScan(); // initiate a single scan
std::list<in_addr> getNodesOnNetwork(); // get nodes connected to the same network
unsigned short getNodeNettermPort(in_addr addr); // get network terminal port of ndoe
std::list<std::string> getNodesOnNetwork_str(); // get nodes connected to the same network (list of string)
unsigned short getNodeNettermPort(in_addr addr); // get network terminal port of node
void setScanCallback(const std::shared_ptr<CallbackBase<ServerBeacon *>>& scanCB); // set scan callback
void execCmdOnNode(in_addr addr, const std::string& cmd); // execute command on a node through netterm
void execCmdOnAllTerms(const std::string& cmd); // multicast command to all netterms
void execCmdOnNode(in_addr addr, const std::string& cmd); // execute command on a node through netterm AND DON'T expect a response
void execCmdOnNode(const std::string& addr_str, const std::string& cmd);
std::string queryFromNode(in_addr addr, const std::string& cmd); // --- " ---- AND expect a response
std::string queryFromNode(const std::string& addr_str, const std::string& cmd);
void execCmdOnAllNodes(const std::string& cmd); // multicast command to all netterms
std::map<in_addr_t, std::string> queryFromAllNodes(const std::string& cmd); // --- " ---- AND expect a response
std::map<std::string, std::string> queryFromAllNodes_str(const std::string& cmd); // (string address)
};

55
libwfr/src/Trigger.cpp Normal file
View File

@ -0,0 +1,55 @@
//
// Created by epagris on 2022.05.04..
//
#include "Trigger.h"
TriggerSettings::TriggerSettings() {
TriggerSettings::reset();
}
void TriggerSettings::reset() {
mFirstSample = true;
ch = 0;
}
bool TriggerSettings::sample(double x) {
return false; // never trigger
}
// ------------------------
SlopeTrigger::SlopeTrigger() {
level = 0;
slope = RISING;
SlopeTrigger::reset();
}
void SlopeTrigger::reset() {
TriggerSettings::reset();
x_prev = 0;
}
bool SlopeTrigger::sample(double x) {
// do not act on first sample
if (mFirstSample) {
mFirstSample = false;
x_prev = x;
return false;
}
bool trig = false;
// on every other sample...
if (((slope == RISING) && (x_prev < level && x > level)) ||
((slope == FALLING) && (x_prev > level && x < level))) {
trig = true;
}
x_prev = x;
return trig;
}

38
libwfr/src/Trigger.h Normal file
View File

@ -0,0 +1,38 @@
//
// Created by epagris on 2022.05.04..
//
#ifndef WFR_APP_TRIGGER_H
#define WFR_APP_TRIGGER_H
#include <cstddef>
// Dummy trigger base
struct TriggerSettings {
protected:
bool mFirstSample; // indicates the latest sample was the first since the last reset
public:
size_t ch; // channel to probe
public:
TriggerSettings();
virtual void reset(); // reset trigger internal state
virtual bool sample(double x); // insert sample into the trigger
};
// Simple slope trigger
struct SlopeTrigger : public TriggerSettings {
public:
enum SlopeType { RISING, FALLING };
private:
double x_prev; // previous sample
public:
double level; // trigger level
SlopeType slope; // slope direction
public:
SlopeTrigger(); // constr.
void reset() override;
bool sample(double x) override;
};
#endif //WFR_APP_TRIGGER_H

28
libwfr/src/audio_types.h Normal file
View File

@ -0,0 +1,28 @@
//
// Created by epagris on 2021. 11. 02..
//
#ifndef WFR_AUDIO_TYPES_H
#define WFR_AUDIO_TYPES_H
#include <cstdint>
#define PERIOD_LEN_DEF (STEREO_BUF_LEN / 2)
#define MCH_SAMP_PER_PCKT_DEF (192)
#define SAMPLE_RATE_DEF (48000)
#define SAMPLE_SIZE_DEF (16)
#define CHANNEL_COUNT_DEF (2)
//typedef int16_t AudioSampleType16_DEF;
typedef struct {
uint32_t timestamp_s, timestamp_ns; // timestamp
uint32_t sample_cnt; // count of all-channel samples in packet
uint16_t sample_size; // size of a single (mono) sample
uint16_t channel_count; // number of channels
in_addr_t addr; // client node address
//AudioSampleType16_DEF pData[MCH_SAMP_PER_PCKT_DEF]; // buffer for stereo data
} AudioPayloadHeader;
#endif //WFR_AUDIO_TYPES_H

View File

@ -10,9 +10,9 @@
#include <sys/socket.h>
#include <netinet/in.h>
#include "src/ALSADevice.h"
#include "src/SampleWriter.h"
#include "src/audio_types.h"
#include "src/ServerBeacon.h"
#include "libwfr/src/SampleWriter.h"
#include "libwfr/src/audio_types.h"
#include "libwfr/src/ServerBeacon.h"
#include "src/GUI.h"
//uint8_t pRecvBuf[8096] __attribute__ ((aligned (32)));
@ -28,6 +28,10 @@ int main(int argc, char * argv[]) {
std::string mcastIFAddr = argv[2];
unsigned short beaconPort = atoi(argv[1]);
Logger::startLogging();
auto a = MultiStreamToFile::create("asd");
// -----------------
//ALSADevice dev(PERIOD_LEN, SAMPLE_RATE);

30
python/CMakeLists.txt Normal file
View File

@ -0,0 +1,30 @@
cmake_minimum_required(VERSION 3.16)
project(wfr-python-extension)
set(Python_ADDITIONAL_VERSIONS 3.8)
find_package(Python3 COMPONENTS Interpreter Development)
find_package(PythonLibs REQUIRED)
#list(APPEND CMAKE_MODULE_PATH "/home/epagris/.local/lib/python3.8/site-packages/skbuild/resources/cmake/")
#find_package(PythonExtensions REQUIRED)
find_package(pybind11 REQUIRED)
set(CMAKE_CXX_STANDARD 17)
set(SRC
src/wfsmodule.cpp)
pybind11_add_module(pywfs src/wfsmodule.cpp)
target_include_directories(pywfs PUBLIC ${PROJECT_SOURCE_DIR}/../libwfr/src)
add_dependencies(pywfs wfr)
target_compile_definitions(pywfs PRIVATE VERSION_INFO=${EXAMPLE_VERSION_INFO})
target_include_directories(pywfs PUBLIC ${PYTHON_INCLUDE_DIRS})
target_link_libraries(pywfs PUBLIC wfr)
set_target_properties(pywfs PROPERTIES SUFFIX ".so")
install(TARGETS pywfs DESTINATION ${PROJECT_SOURCE_DIR}/module)

90
python/src/wfsmodule.cpp Normal file
View File

@ -0,0 +1,90 @@
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <ServerBeacon.h>
#include "AcquisitionFormat.h"
#include "MultiStreamToFile.h"
#include "MultiStreamReceiver.h"
#define STRINGIFY(x) #x
#define MACRO_STRINGIFY(x) STRINGIFY(x)
int add(int i, int j) {
return i + j;
}
namespace py = pybind11;
using namespace py::literals;
PYBIND11_MODULE(pywfs, m) {
m.doc() = R"pbdoc(
WaveFormStream extension
-----------------------
)pbdoc";
// py::class_<in_addr>(m, "in_addr")
// .def(py::init<>())
// .def_readwrite("s_addr", &in_addr::s_addr);
// Logger
py::class_<Logger>(m, "Logger")
.def_static("start", static_cast<void (*)(int)>(&Logger::startLogging), py::arg("fd") = STDOUT_FILENO)
.def_static("stop", &Logger::stopLogging);
// ServerBeacon
py::class_<ServerBeacon>(m, "ServerBeacon")
.def(py::init<>())
.def("setMulticastAddr", static_cast<void (ServerBeacon::*)(const std::string &)>(&ServerBeacon::setMulticastAddr))
.def("setInterfaceAddr", static_cast<void (ServerBeacon::*)(const std::string &)>(&ServerBeacon::setInterfaceAddr))
.def("singleScan", &ServerBeacon::singleScan)
.def("getNodesOnNetwork", &ServerBeacon::getNodesOnNetwork_str)
.def("execCmdOnNode", static_cast<void (ServerBeacon::*)(const std::string &, const std::string &)>(&ServerBeacon::execCmdOnNode))
.def("queryFromNode", static_cast<std::string (ServerBeacon::*)(const std::string &, const std::string &)>(&ServerBeacon::queryFromNode))
.def("execCmdOnAllNodes", &ServerBeacon::execCmdOnAllNodes)
.def("queryFromAllNodes", &ServerBeacon::queryFromAllNodes_str);
// AquisitionFormat
py::class_<AcquisitionFormat>(m, "AcquisitionFormat")
.def(py::init<const std::string>())
.def(py::init<size_t, size_t, size_t, size_t>())
.def_readonly("sampling_rate_Hz", &AcquisitionFormat::sampling_rate_Hz)
.def_readonly("sample_size_b", &AcquisitionFormat::sample_size_b)
.def_readonly("sameple_storage_size_B", &AcquisitionFormat::sample_storage_size_B)
.def_readonly("channel_count", &AcquisitionFormat::channel_count)
.def_readonly("mch_sample_per_packet", &AcquisitionFormat::mch_samples_per_packet)
.def_readonly("distinct_samples_per_packet", &AcquisitionFormat::distinct_samples_per_packet)
.def_readonly("sample_data_size_per_packet", &AcquisitionFormat::sample_data_size_per_packet_B)
.def_readonly("single_channel_data_size_per_packet_B", &AcquisitionFormat::single_channel_data_size_per_packet_B)
.def_readonly("all_channel_sample_size_B", &AcquisitionFormat::all_channel_sample_size_B)
.def_readonly("packet_period_ns", &AcquisitionFormat::packet_period_ns)
.def("isValid", &AcquisitionFormat::isValid);
// MultiStreamReceiver
py::class_<MultiStreamReceiver>(m, "MultiStreamReceiver")
.def(py::init<std::vector<std::string>, const AcquisitionFormat &, unsigned short>(), "nodes_str"_a, "acqFormat"_a, "port"_a = MultiStreamReceiver::DEFAULT_PORT)
.def(py::init<std::vector<std::string>, const std::string &, unsigned short>(), "nodes_str"_a, "acqFormat_str"_a, "port"_a = MultiStreamReceiver::DEFAULT_PORT)
.def("listen", &MultiStreamReceiver::start)
.def("close", &MultiStreamReceiver::stop);
// MultiStreamProcessor (abstract class)
py::class_<MultiStreamProcessor, std::shared_ptr<MultiStreamProcessor>> MSP(m, "MultiStreamProcessor");
// MultiStreamToFile
py::class_<MultiStreamToFile, std::shared_ptr<MultiStreamToFile>, MultiStreamProcessor>(m, "MultiStreamToFile")
.def_static("create", &MultiStreamToFile::create);
//
// m.def("subtract", [](int i, int j) { return i - j; }, R"pbdoc(
// Subtract two numbers
// Some other explanation about the subtract function.
// )pbdoc");
#ifdef VERSION_INFO
m.attr("__version__") = MACRO_STRINGIFY(VERSION_INFO);
#else
m.attr("__version__") = "dev";
#endif
}

8
python/src/wfsmodule.h Normal file
View File

@ -0,0 +1,8 @@
//
// Created by epagris on 2022.05.01..
//
#ifndef WFR_APP_WFSMODULE_H
#define WFR_APP_WFSMODULE_H
#endif //WFR_APP_WFSMODULE_H

View File

@ -11,10 +11,11 @@
#include <gtk/gtk.h>
#include <gtkmm-3.0/gtkmm.h>
#include "ServerBeacon.h"
#include "../libwfr/src/ServerBeacon.h"
#include "globals.h"
#include "Callback.h"
#include "MultiStreamReceiver.h"
#include "../libwfr/src/Callback.h"
#include "../libwfr/src/MultiStreamReceiver.h"
#include "../libwfr/src/MultiStreamToFile.h"
class GUI : public Gtk::Application {
public:
@ -261,11 +262,12 @@ public:
wStartStopCaptureBtn->set_label("Stop capture");
mFlags.captureRunning = true;
mpMSR = std::make_shared<MultiStreamReceiver>(mClNodeTW->getSelectedLines(), wFolderChooser->get_filename());
mpMSR->start();
mGlobs->beacon->execCmdOnAllTerms(std::string("snd connect ") + mGlobs->beacon->getInterfaceAddr() + " " + std::to_string(mpMSR->port));
mpMSR = std::make_shared<MultiStreamReceiver>(mClNodeTW->getSelectedLines(), SAMPLE_RATE_DEF, SAMPLE_SIZE_DEF, CHANNEL_COUNT_DEF, MCH_SAMP_PER_PCKT_DEF);
mpMSR->start(std::make_shared<MultiStreamToFile>(wFolderChooser->get_filename()));
//mpMSR->start(std::shared_ptr<MultiStreamProcessor>((MultiStreamProcessor*) new MultiStreamToFile(wFolderChooser->get_filename())));
mGlobs->beacon->execCmdOnAllNodes(std::string("snd connect ") + mGlobs->beacon->getInterfaceAddr() + " " + std::to_string(mpMSR->port));
} else {
mGlobs->beacon->execCmdOnAllTerms("snd disconnect");
mGlobs->beacon->execCmdOnAllNodes("snd disconnect");
mpMSR = nullptr;
wFolderChooser->set_sensitive(true);

View File

@ -1,154 +0,0 @@
//
// Created by epagris on 2021. 11. 22..
//
#include <arpa/inet.h>
#include <memory>
#include <thread>
#include <unistd.h>
#include "MultiStreamReceiver.h"
#include "audio_types.h"
MultiStreamReceiver::MultiStreamReceiver(const std::vector<in_addr_t> &nodes, const std::string& targetDir, unsigned short port) : port(port) {
mTargetDir = targetDir;
mRunning = false;
mClientNodes = nodes;
}
MultiStreamReceiver::~MultiStreamReceiver() {
stop();
}
void MultiStreamReceiver::startNetworking() {
// open socket
mSoc = socket(AF_INET, SOCK_DGRAM, 0);
if (mSoc == -1) {
throw std::runtime_error(std::string("Could not create UDP-socket in ") + __FUNCTION__ + "!");
}
int opt = 1;
if (setsockopt(mSoc, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)) == -1) {
throw std::runtime_error(std::string("Could not set socket options in ") + __FUNCTION__ + "!");
}
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_port = htons(port);
// bind
if (bind(mSoc, (const sockaddr *)&addr, sizeof(addr)) == -1) {
throw std::runtime_error(std::string("Could not bind socket options in ") + __FUNCTION__ + "!");
}
}
void MultiStreamReceiver::createSampleWriters() {
struct stat sb;
if (stat(mTargetDir.c_str(), &sb) == 0 && S_ISDIR(sb.st_mode)) { // if directory exists then leave it as it is
//unlink(datasetName.c_str()); FIXME
} else if (mkdir(mTargetDir.c_str(), 0777) == -1) { // else: create directory for dataset
throw std::runtime_error("Could not create directory " + mTargetDir + "!");
}
if (chdir(mTargetDir.c_str()) == -1) {
throw std::runtime_error("Could not change to target directory!");
}
std::string nodeDirHint = "";
for (unsigned int & mClientNode : mClientNodes) {
std::string datasetName = std::string("node_") + inet_ntoa({ mClientNode });
mpSampleWriters.emplace_back(std::make_shared<SampleWriter<int16_t>>(datasetName, 2, STEREO_BUF_LEN / 2));
nodeDirHint += datasetName + "\n";
}
std::ofstream hintFile("node_dir_hint.txt", std::ios_base::out);
hintFile << nodeDirHint;
hintFile.close();
if (chdir("..") == -1) {
throw std::runtime_error("Could not change to target directory!");
}
}
void MultiStreamReceiver::start() {
if (mRunning) {
return;
}
// ----------------------------
createSampleWriters();
startNetworking();
// ----------------------------
mRunning = true;
mRecvThread = std::make_shared<std::thread>(fnRecv, this);
}
void MultiStreamReceiver::stop() {
if (!mRunning) {
return;
}
mRunning = false;
mRecvThread->join();
close(mSoc);
mpSampleWriters.clear();
}
// --------------------------------------
#define RECV_BUFFER_SIZE (16000)
void MultiStreamReceiver::fnRecv(MultiStreamReceiver *pMSR) {
// select-related structures
fd_set read_fds;
timeval tv;
// receive buffer
std::shared_ptr<uint32_t> pWordBuf = std::shared_ptr<uint32_t>(new uint32_t[(RECV_BUFFER_SIZE / 4) + 1]); // to have 4-byte alignment
auto * pRecvBuf = reinterpret_cast<uint8_t *>(pWordBuf.get());
auto * pAudioPayload = reinterpret_cast<AudioPayload *>(pWordBuf.get());
while (pMSR->mRunning) {
// fd-set
FD_ZERO(&read_fds);
FD_SET(pMSR->mSoc, &read_fds);
// timeout
tv.tv_sec = 0;
tv.tv_usec = 1E+06 / 4;
// wait for data
int fd_cnt = select(pMSR->mSoc + 1, &read_fds, nullptr, nullptr, &tv);
// if data is available on the socket
if ((fd_cnt > 0) && FD_ISSET(pMSR->mSoc, &read_fds)) {
ssize_t recv_size;
if ((recv_size = recv(pMSR->mSoc, pRecvBuf, RECV_BUFFER_SIZE, 0)) <= 0) {
throw std::runtime_error("Receive error!");
}
// get sample writer for the specific address
std::shared_ptr<SampleWriter<int16_t>> pSampleWriter = nullptr;
for (size_t i = 0; i < pMSR->mClientNodes.size(); i++) {
if (pMSR->mClientNodes[i] == pAudioPayload->addr) {
pSampleWriter = pMSR->mpSampleWriters[i];
break;
}
}
// if sample writer is found
if (pSampleWriter != nullptr) {
pSampleWriter->addSamples(pAudioPayload->pData, { pAudioPayload->timestamp_s, pAudioPayload->timestamp_ns });
}
}
}
}

View File

@ -1,41 +0,0 @@
//
// Created by epagris on 2021. 11. 22..
//
#ifndef WFR_MULTISTREAMRECEIVER_H
#define WFR_MULTISTREAMRECEIVER_H
#include <netinet/in.h>
#include <vector>
#include <memory>
#include <thread>
#include "SampleWriter.h"
class MultiStreamReceiver {
public:
static constexpr unsigned short DEFAULT_PORT = 20220; // default port
private:
std::vector<in_addr_t> mClientNodes; // client node addresses
int mSoc; // UDP-socket for receiving samples
std::shared_ptr<std::thread> mRecvThread; // thread for reception
static void fnRecv(MultiStreamReceiver * pMSR); // routine function running in separate thread
bool mRunning;
std::vector<std::shared_ptr<SampleWriter<int16_t>>> mpSampleWriters; // sample writers for streams
std::string mTargetDir; // target directory
private:
void startNetworking(); // start networking
void createSampleWriters(); // construct sample writers
public:
const unsigned short port; // port
public:
explicit MultiStreamReceiver(const std::vector<in_addr_t>& nodes, const std::string& targetDir, unsigned short port = DEFAULT_PORT);
void start(); // start reception
void stop();
virtual ~MultiStreamReceiver();
// stop reception
};
#endif //WFR_MULTISTREAMRECEIVER_H

View File

@ -6,7 +6,7 @@
#define WFR_SAMPLERECEIVER_H
#include "MemoryPool.h"
#include "Timestamp.h"
#include "../libwfr/src/Timestamp.h"
#include <vector>

View File

@ -1,195 +0,0 @@
//
// Created by epagris on 2021. 11. 01..
//
#ifndef WFR_SAMPLEWRITER_H
#define WFR_SAMPLEWRITER_H
#include <fstream>
#include <string>
#include <memory>
#include <iostream>
#include <sys/stat.h>
#include <vector>
#include <set>
#include <queue>
#include "Timestamp.h"
#define DIRSEP '/'
template<typename T> // T: sample datatype
class SampleWriter {
public:
static const std::string TS_FILENAME; // filename for timestamp file
static const std::string SAMPLE_FILENAME_PREFIX; // filename prefix for sample file
static const std::string FILE_EXT; // datafile extensions
public:
// timestamp record
struct TimestampRecord {
public:
uint32_t ts_s, ts_ns; // timestamp
public:
TimestampRecord &operator=(const Timestamp &ts) {
ts_s = (uint32_t) ts.s;
ts_ns = (uint32_t) ts.ns;
return *this;
}
};
private:
const size_t mChN; // number of channel PAIRS, immutable
const size_t mBlockSize; // buffer size, immutable
private:
std::shared_ptr<TimestampRecord> mpTs; // timestamp records
std::vector<std::shared_ptr<T>> mpSamples; // sample buffer array
std::string mTsFileName; // timestamp filename
std::ofstream mTsFile; // timestamp file
std::vector<std::string> mSampleFileNames; // sample file names
std::vector<std::ofstream> mSampleFiles; // sample output files
private:
Timestamp mTs[2]; // previous and current timestamp
private:
unsigned long long mSamplesN; // number of samples written until now
private:
// resize sample and record buffers
void prepareBuffers() {
// allocate timestamp data buffer
mpTs = std::shared_ptr<TimestampRecord>(new TimestampRecord[mBlockSize]);
// allocate sample data buffers
mpSamples.resize(mChN);
for (size_t i = 0; i < mChN; i++) {
mpSamples[i] = std::shared_ptr<T>(new T[mBlockSize]);
}
}
// prepare dataset environment
void prepareDatadir(const std::string &datasetName) {
struct stat sb;
if (stat(datasetName.c_str(), &sb) == 0 && S_ISDIR(sb.st_mode)) { // if directory exists then leave it as it is
//unlink(datasetName.c_str()); FIXME
} else if (mkdir(datasetName.c_str(), 0777) == -1) { // else: create directory for dataset
throw std::runtime_error("Could not create directory " + datasetName + "!");
}
std::string datadir_dirsep = datasetName + DIRSEP;
// generate file names
mTsFileName = TS_FILENAME + '.' + FILE_EXT; // timestamp
// samples
mSampleFileNames.resize(mChN);
for (size_t i = 0; i < mChN; i++) {
mSampleFileNames[i] = SAMPLE_FILENAME_PREFIX + std::to_string(i) + '.' + FILE_EXT;
}
// open timestamp file
mTsFile.open(datadir_dirsep + mTsFileName, std::ios_base::binary);
if (!mTsFile.is_open()) {
throw std::runtime_error("Could not open timestamp file " + mTsFileName + "!");
}
// open sample files
mSampleFiles.resize(mChN);
for (size_t i = 0; i < mChN; i++) {
mSampleFiles[i].open(datadir_dirsep + mSampleFileNames[i], std::ios_base::binary);
if (!mSampleFiles[i].is_open()) {
throw std::runtime_error("Could not open sample file " + mSampleFileNames[i] + "!");
}
}
}
// close files
void closeFiles() {
// close file storing timestamp files
mTsFile.close();
// close files stroring samples
for (size_t i = 0; i < mChN; i++) {
mSampleFiles[i].close();
}
}
public:
explicit SampleWriter(const std::string &datasetName, size_t chN, size_t bufSize) : mChN(chN), mBlockSize(bufSize), mSamplesN(0) { // constr.
prepareBuffers();
prepareDatadir(datasetName);
}
~SampleWriter() {
closeFiles();
}
// add n samples to file
void addSamples(const T *pSamples, const Timestamp &ts) {
// shift FIFO and store new timestamp
mTs[1] = mTs[0];
mTs[0] = ts;
// if previous timestamp is empty, then don't store samples since time interpolation can not be done
if (mTs[1].empty()) {
return;
}
// calculate time step size
Timestamp d = (mTs[0] - mTs[1]) / (double) mBlockSize;
// timestamp for time interpolation
Timestamp ts_interp = mTs[1];
// get channel pointers
T *ppCh[mChN];
for (size_t i = 0; i < mChN; i++) {
ppCh[i] = mpSamples[i].get();
}
// extract samples
for (size_t i = 0; i < mBlockSize; i++) {
// select record
T *pRawSamples = (short *) (pSamples + i * mChN);
// store sample for a channel
for (size_t ch = 0; ch < mChN; ch++) {
ppCh[ch][i] = pRawSamples[ch];
}
// step time
ts_interp += d;
// store timestamp
mpTs.get()[i] = ts_interp;
}
// write to file newly created records
// write timestamps
mTsFile.write((const char *) mpTs.get(), mBlockSize * sizeof(TimestampRecord));
// write samples
for (size_t ch = 0; ch < mChN; ch++) {
mSampleFiles[ch].write((const char *) ppCh[ch], mBlockSize * sizeof(T));
}
// increment counter
mSamplesN += mBlockSize;
}
// get number of samples written out
unsigned long long getSampleCnt() const {
return mSamplesN;
}
};
// static fields
template<typename T>
const std::string SampleWriter<T>::TS_FILENAME = "ts";
template<typename T>
const std::string SampleWriter<T>::SAMPLE_FILENAME_PREFIX = "ch_";
template<typename T>
const std::string SampleWriter<T>::FILE_EXT = "dat";
#endif //WFR_SAMPLEWRITER_H

View File

@ -1,23 +0,0 @@
//
// Created by epagris on 2021. 11. 02..
//
#ifndef WFR_AUDIO_TYPES_H
#define WFR_AUDIO_TYPES_H
#include <cstdint>
#define STEREO_BUF_LEN (2 * 192)
#define PERIOD_LEN (STEREO_BUF_LEN / 2)
#define SAMPLE_RATE (48000)
typedef int16_t AudioSampleType;
typedef struct {
uint32_t timestamp_s, timestamp_ns; // timestamp
uint32_t sample_cnt; // count of samples in packet
in_addr_t addr; // client node address
AudioSampleType pData[STEREO_BUF_LEN]; // buffer for stereo data
} AudioPayload;
#endif //WFR_AUDIO_TYPES_H

View File

@ -6,7 +6,7 @@
#define WFR_GLOBALS_H
#include <memory>
#include "ServerBeacon.h"
#include "../libwfr/src/ServerBeacon.h"
struct Globals {
std::shared_ptr<ServerBeacon> beacon; // beacon to detect client nodes

68
wfstream_analyzer.lua Normal file
View File

@ -0,0 +1,68 @@
-- @brief WaveFormStream analyzer for wireshark
-- @author Epagris
-- @date 2022.04.29.
-- 1. Create parser objects
local NAME = "WFS" --Custom protocol name
local MsgProto = Proto(NAME, "WaveFormStream over UDP")
-- MsgProto Resolution fields for defining protocols
local fields = MsgProto.fields
fields.timestamp_s = ProtoField.uint32(NAME .. "TIMESTAMP_S", "timestamp_s", base.DEC)
fields.timestamp_ns = ProtoField.uint32(NAME .. "TIMESTAMP_NS", "timestamp_ns", base.DEC)
fields.sample_cnt = ProtoField.uint32(NAME .. "SAMPLE_CNT", "sample_cnt", base.DEC)
fields.sample_size = ProtoField.uin16_t(NAME .. "SAMPLE_SIZE", "sample_size", base.DEC)
fields.channel_count = ProtoField.uin16_t(NAME .. "CHANNEL_COUNT", "channel_count", base.DEC)
fields.addr = ProtoField.ipv4(NAME .. "ADDR", "addr", base.DEC)
local data_dis = Dissector.get("data")
-- 2. Parser function dissect packet
--[[
//Next, define the main function of the foo parser, which is called by wireshark
//The first parameter is the tvb type, which represents the data that needs to be parsed by this parser
//The second parameter is the Pinfo type, which is the information on the protocol parsing tree, including the display on the UI.
//The third parameter is the TreeItem type, which represents the upper parse tree.
--]]
function MsgProto.dissector (tvb, pinfo, tree)
--Create a subtree from the root tree to print parsed message data
local subtree = tree:add(MsgProto, tvb())
subtree:append_text(", msg_no: " .. tvb(0, 1):uint())
-- The protocol name displayed on the protocol line in the packet details
pinfo.cols.protocol = MsgProto.name
tvb_length = tvb:len()
-- dissect field one by one, and add to protocol tree
--Include a header in the message and continue to create tree parsing
local msg_head_tree = subtree:add(MsgProto, tvb(0,2), "MSG_HEADER") --"MSG_HEADER"The parameter replaces the protocol name display
msg_head_tree:add(fields.msg_no, tvb(0, 1))--Represents a byte starting from 0
msg_head_tree:add(fields.msg_version, tvb(1, 1))
subtree:add(fields.msg_len, tvb(2,1))
subtree:add(fields.length, tvb_length) --Display data slice length information without fetching data from slice memory
subtree:add(fields.data_length, tvb_length-8)
-- Bit Domain Continues to Create Tree Resolution
local msg_bitx_tree = subtree:add( fields.msg_bitx, tvb(3,1) ) -- bitfield
msg_bitx_tree:add(fields.msg_bit1,tvb(3,1))
msg_bitx_tree:add(fields.msg_bit2,tvb(3,1))
msg_bitx_tree:add(fields.msg_bit3,tvb(3,1))
msg_bitx_tree:add(fields.msg_bit4,tvb(3,1))
subtree:add_le(fields.local_id,tvb(4,4))
subtree:add_le(fields.remote_id,tvb(8,4))
data_dis:call(tvb(12):tvb(), pinfo, tree) --It is noteworthy to parse the data in the data stream after the message structure. call The parameter name must be tvbI hope the big man will give me some advice.
end
-- 3 Register the parser to wireshark Analytical table register this dissector
local udp_port_table = DissectorTable.get("tcp.port")
--Adding parsed TCP Port, Identify Protocol Based on Port Number
for i,port in ipairs{8001,8002} do
udp_port_table:add(port,MsgProto)
end