- MultiStreamOscilloscope first implementation done

- SlopeTrigger -> EdgeTrigger
- working Python example included
- libwfr.h introduced for easy including all libwfr headers
- ChannelBuffer is now template
- Semaphore moved
This commit is contained in:
Wiesner András 2022-05-05 22:19:52 +02:00
parent 22233ce62d
commit a6be70fb7b
23 changed files with 632 additions and 321 deletions

View File

@ -11,8 +11,8 @@ add_executable(${APP_NAME}
src/ALSADevice.h
src/MemoryPool.cpp
src/MemoryPool.h
utils/Semaphore.h
utils/Semaphore.cpp)
libwfr/src/utils/Semaphore.h
libwfr/src/utils/Semaphore.cpp)
find_package(Threads REQUIRED)
if (THREADS_FOUND)

View File

@ -30,7 +30,10 @@ set(SOURCES
src/MultiStreamOscilloscope.cpp
src/MultiStreamOscilloscope.h
src/Trigger.cpp
src/Trigger.h)
src/Trigger.h
src/libwfr.h
src/utils/Semaphore.cpp
src/utils/Semaphore.h)
###

View File

@ -1,239 +1,3 @@
//
// Created by epagris on 2022.05.03..
//
#include <cstring>
#include "ChannelBuffer.h"
#include "Logger.h"
ChannelBuffer::ChannelBuffer(size_t blockSize, size_t blockCount, size_t elementSize) {
// check if blockSize is divisible by 4
if ((blockSize % 4) != 0) {
throw std::runtime_error(std::string("Block size must be an integral multiple of 4! ") + __FUNCTION__);
}
// fill-in parameters
mBlockSize = blockSize;
mBlockCount = blockCount;
mFullMemSize = blockSize * blockCount;
mElementSize = elementSize;
mElementsPerBlock = blockSize / elementSize;
// allocate memory
mpMem = std::shared_ptr<uint8_t>(new uint8_t[mFullMemSize]); // exception is thrown if this fails...
// clear and fill-in memory blocks
clear();
// no reader is waiting
mReaderWaiting = false;
}
void ChannelBuffer::store(const void *pSrc) {
mStoreFetch_mtx.lock(); // MUTEX!!!
BlockDescriptor bd;
if (mFreeBlocks.empty()) { // if there're no more free blocks to allocate, then drop the eldest one and shift the queue
// release block
bd = mOccupiedBlocks.front();
mOccupiedBlocks.pop_front();
// log buffer overrun
//Logger::logLine("ChannelBuffer overrun!");
} else { // if there's at least a single free block
// acquire the free block
bd = mFreeBlocks.front();
// remove from the free queue
mFreeBlocks.pop_front();
// put into the queue of occupied blocks
//mOccupiedBlocks.push(bd);
}
// store tag
mLastTag += mElementsPerBlock;
bd.tag = mLastTag;
// push back onto the queue ("move to the end of the queue")
mOccupiedBlocks.push_back(bd);
// copy content
void *pDest = bd.p;
memcpy(pDest, pSrc, mBlockSize);
// handle waiting readers
if (mReaderWaiting) {
mReaderWaiting = false;
mStoreFetch_mtx.unlock();
mReadSem.post();
} else {
mStoreFetch_mtx.unlock();
}
}
uint64_t ChannelBuffer::fetch(void *pDst, ssize_t size) {
mStoreFetch_mtx.lock(); // MUTEX!!!
// if there're no data to copy, then block
if (mOccupiedBlocks.empty()) {
mReaderWaiting = true; // signal that reader is waiting
mStoreFetch_mtx.unlock(); // release lock
mReadSem.wait(); // wait on data to become available
mStoreFetch_mtx.lock(); // re-lock mutex
}
BlockDescriptor bd;
// if there're data to copy
if (!mOccupiedBlocks.empty()) {
// get block
bd = mOccupiedBlocks.front();
void *pSrc = bd.p;
// copy content to given memory area
if (size == 0) {
memcpy(pDst, pSrc, mBlockSize);
} else if (size > 0) { // copy from the beginning of the block
memcpy(pDst, pSrc, std::min((size_t) size, mBlockSize));
} else if (size < 0) { // copy from the end of the block
size_t copySize = std::min((size_t) (labs(size)), mBlockSize); // determine copy size
uint8_t *pSrcStart = (uint8_t *) pSrc + mBlockSize - copySize; // determine beginning of area to copy
memcpy(pDst, pSrcStart, copySize); // copy
}
// remove from the queue of occupied blocks
mOccupiedBlocks.pop_front();
// add to the queue of free blocks
mFreeBlocks.push_back(bd);
} else {
throw std::runtime_error("Error, this mustn't be hit!");
}
mStoreFetch_mtx.unlock(); // release semaphore
return bd.tag;
}
void ChannelBuffer::clear() {
std::unique_lock<std::mutex> lock(mStoreFetch_mtx); // MUTEX!
// fill-in free blocks
mOccupiedBlocks.clear();
mFreeBlocks.clear();
for (size_t i = 0; i < mBlockCount; i++) {
BlockDescriptor bd = {.p = mpMem.get() + i * mBlockSize, .tag = -1};
mFreeBlocks.push_back(bd);
}
}
std::shared_ptr<const uint8_t> ChannelBuffer::fullBuffer() const {
return mpMem;
}
int64_t ChannelBuffer::getEldestTag() {
std::unique_lock<std::mutex> lock(mStoreFetch_mtx);
if (!mOccupiedBlocks.empty()) {
return mOccupiedBlocks.front().tag;
} else {
return -1;
}
}
size_t ChannelBuffer::copyBetweenTags(void *pDst, int64_t start, int64_t end) {
std::unique_lock<std::mutex> lock(mStoreFetch_mtx);
if (mOccupiedBlocks.empty() || (end < start)) {
return 0;
}
uint8_t * pDstByte = (uint8_t *) pDst;
// get eldest and youngest tag
int64_t eldestTag = mOccupiedBlocks.front().tag;
int64_t youngestTag = mOccupiedBlocks.back().tag;
// clip tags falling out of valid range
start = std::max(start, eldestTag);
end = std::min(end, youngestTag);
// iterators
std::list<BlockDescriptor>::iterator startIter, endIter;
bool startIterFound = false, endIterFound = false;
// find starting block
for (auto iter = mOccupiedBlocks.begin(); iter != mOccupiedBlocks.end() && (!startIterFound && !endIterFound); iter++) {
// select start iter
if ((iter->tag <= start) && ((iter->tag + mElementsPerBlock - 1) >= start)) {
startIter = iter;
startIterFound = true;
}
if ((iter->tag <= end) && ((iter->tag + mElementsPerBlock - 1) >= end)) {
endIter = iter;
endIterFound = true;
}
}
uint8_t *pStart;
size_t sizeCopied = 0;
// IF range resides in a single block
if (startIter == endIter) {
pStart = startIter->p + (start - startIter->tag) * mElementSize;
sizeCopied = end - start + 1;
memcpy(pDst, pStart, sizeCopied);
} else { // IF range spans across multiple blocks
auto iterAfterEnd = endIter;
iterAfterEnd++;
size_t copySize = 0;
for (auto iter = startIter; iter != iterAfterEnd; iter++) {
if (iter == startIter) { // START
pStart = iter->p + (start - iter->tag) * mElementSize;
copySize = mBlockSize - (start - iter->tag) + 1;
memcpy(pDstByte + sizeCopied, pStart, copySize);
sizeCopied += copySize;
} else if (iter == endIter) { // END
pStart = iter->p;
copySize = (end - iter->tag) + 1;
memcpy(pDstByte + sizeCopied, pStart, copySize);
sizeCopied += copySize;
} else { // INBETWEEN
pStart = iter->p;
copySize = mBlockSize;
memcpy(pDstByte + sizeCopied, pStart, copySize);
sizeCopied += copySize;
}
}
}
return sizeCopied;
}
size_t ChannelBuffer::getFreeBlockCnt() {
std::unique_lock<std::mutex> lock(mStoreFetch_mtx);
return mFreeBlocks.size();
}
int64_t ChannelBuffer::copyBlock(size_t n, void * pDst) {
std::unique_lock<std::mutex> lock(mStoreFetch_mtx);
// block not found
if ((n + 1) > mOccupiedBlocks.size()) {
return -1;
}
// block found
auto iter = mOccupiedBlocks.begin();
std::advance(iter, n);
// copy block contents
memcpy(pDst, iter->p, mBlockSize);
return iter->tag;
}

View File

@ -12,41 +12,295 @@
#include <memory>
#include <mutex>
#include <list>
#include <Semaphore.h>
#include <cstring>
#include "utils/Semaphore.h"
#include "Logger.h"
/*
* Finite-length, automatically dropping channel buffer designed for exactly one reader
*/
template<typename T>
class ChannelBuffer {
public:
struct BlockDescriptor {
uint8_t * p;
T *p;
int64_t tag;
};
private:
size_t mBlockSize; // size of a block
size_t mElementsPerBlock; // size of a block
size_t mBlockCount; // number of blocks
size_t mFullMemSize; // allocate memory block size derived from block size and block count
size_t mElementSize; // size of a single element
size_t mElementsPerBlock; // number of elements per block
size_t mFullElementCnt; // allocate memory block size derived from block size and block count
//size_t mElementSize; // size of a single element
//size_t mElementsPerBlock; // number of elements per block
uint64_t mLastTag; // last assigned tag
std::list<BlockDescriptor> mOccupiedBlocks; // occupied blocks
std::list<BlockDescriptor> mFreeBlocks; // free blocks
std::shared_ptr<uint8_t> mpMem; // memory holding buffers (free and occupied also)
std::shared_ptr<T> mpMem; // memory holding buffers (free and occupied also)
std::mutex mStoreFetch_mtx; // mutex for concurrent fetching and storing
bool mReaderWaiting; // indicates, that a reader is waiting for new data
Semaphore mReadSem; // read semaphore for blocking reader
public:
ChannelBuffer(size_t blockSize, size_t blockCount, size_t elementSize); // constr.
void store(const void *pSrc); // store into a new data block by copying from pSrc
uint64_t fetch(void * pDst, ssize_t size = 0); // fetch eldest data block and copy to pDst
std::shared_ptr<const uint8_t> fullBuffer() const; // get full buffer
void clear(); // clear block occupancies
int64_t getEldestTag(); // get the tag residing the longest in the buffer
size_t copyBetweenTags(void *pDst, int64_t start, int64_t end); // copy data to external buffer between tags (return: number of bytes copied)
size_t getFreeBlockCnt(); // get number of free blocks
int64_t copyBlock(size_t n, void * pDst); // get block (nth eldest)
// constr.
ChannelBuffer(size_t elementsPerBlock, size_t blockCount) {
// fill-in parameters
mElementsPerBlock = elementsPerBlock;
mBlockCount = blockCount;
mFullElementCnt = elementsPerBlock * blockCount;
// allocate memory
mpMem = std::shared_ptr<T>(new T[mFullElementCnt]); // exception is thrown if this fails...
// clear and fill-in memory block queues
clear();
// no reader is waiting
mReaderWaiting = false;
// clear last tag
mLastTag = 0;
}
// store into a new data block by copying from pSrc
void store(const T *pSrc) {
mStoreFetch_mtx.lock(); // MUTEX!!!
BlockDescriptor bd;
if (mFreeBlocks.empty()) { // if there're no more free blocks to allocate, then drop the eldest one and shift the queue
// release block
bd = mOccupiedBlocks.front();
mOccupiedBlocks.pop_front();
// log buffer overrun
//Logger::logLine("ChannelBuffer overrun!");
} else { // if there's at least a single free block
// acquire the free block
bd = mFreeBlocks.front();
// remove from the free queue
mFreeBlocks.pop_front();
// put into the queue of occupied blocks
//mOccupiedBlocks.push(bd);
}
// store tag
mLastTag += mElementsPerBlock;
bd.tag = mLastTag;
// push back onto the queue ("move to the end of the queue")
mOccupiedBlocks.push_back(bd);
// copy content
memcpy(bd.p, pSrc, mElementsPerBlock * sizeof(T));
// handle waiting readers
if (mReaderWaiting) {
mReaderWaiting = false;
mStoreFetch_mtx.unlock();
mReadSem.post();
} else {
mStoreFetch_mtx.unlock();
}
}
// fetch eldest data block and copy to pDst
int64_t fetch(T *pDst, ssize_t n = 0) {
mStoreFetch_mtx.lock(); // MUTEX!!!
// if there're no data to copy, then block
if (mOccupiedBlocks.empty()) {
mReaderWaiting = true; // signal that reader is waiting
mStoreFetch_mtx.unlock(); // release lock
mReadSem.wait(); // wait on data to become available
mStoreFetch_mtx.lock(); // re-lock mutex
}
BlockDescriptor bd;
// if there're data to copy
if (!mOccupiedBlocks.empty()) {
// get block
bd = mOccupiedBlocks.front();
T *pSrc = bd.p;
// copy content to given memory area
if (n == 0) {
memcpy(pDst, pSrc, mElementsPerBlock * sizeof(T));
} else if (n > 0) { // copy from the beginning of the block
memcpy(pDst, pSrc, std::min((size_t) n, mElementsPerBlock * sizeof(T)));
} else if (n < 0) { // copy from the end of the block
size_t copyN = std::min((size_t) (labs(n)), mElementsPerBlock) * sizeof(T); // determine copy n
T *pSrcStart = pSrc + mElementsPerBlock - copyN; // determine beginning of area to copy
memcpy(pDst, pSrcStart, copyN * sizeof(T)); // copy
}
// remove from the queue of occupied blocks
mOccupiedBlocks.pop_front();
// add to the queue of free blocks
mFreeBlocks.push_back(bd);
} else {
throw std::runtime_error("Error, this mustn't be hit!");
}
mStoreFetch_mtx.unlock(); // release semaphore
return bd.tag;
}
// get full buffer
std::shared_ptr<const T> fullBuffer() const {
return mpMem;
}
// clear block occupancies
void clear() {
std::lock_guard<std::mutex> lock(mStoreFetch_mtx); // MUTEX!
// fill-in free blocks
mOccupiedBlocks.clear();
mFreeBlocks.clear();
for (size_t i = 0; i < mBlockCount; i++) {
BlockDescriptor bd = {.p = mpMem.get() + i * mElementsPerBlock, .tag = -1};
mFreeBlocks.push_back(bd);
}
}
// get the tag residing the longest in the buffer
int64_t getEldestTag() {
std::lock_guard<std::mutex> lock(mStoreFetch_mtx);
if (!mOccupiedBlocks.empty()) {
return mOccupiedBlocks.front().tag;
} else {
return -1;
}
}
// copy data to external buffer between tags (return: number of ELEMENTS copied)
size_t copyBetweenTags(T *pDst, int64_t start, int64_t end) {
std::lock_guard<std::mutex> lock(mStoreFetch_mtx);
if (mOccupiedBlocks.empty() || (end < start)) {
return 0;
}
// get eldest and youngest tag
int64_t eldestTag = mOccupiedBlocks.front().tag;
int64_t youngestTag = mOccupiedBlocks.back().tag;
// clip tags falling out of valid range
start = std::max(start, eldestTag);
end = std::min(end, youngestTag);
// iterators
typename std::list<BlockDescriptor>::iterator startIter, endIter;
bool startIterFound = false, endIterFound = false;
// find starting block
for (auto iter = mOccupiedBlocks.begin(); iter != mOccupiedBlocks.end() && !(startIterFound && endIterFound); iter++) {
// select start iter
if ((iter->tag <= start) && ((iter->tag + mElementsPerBlock - 1) >= start)) {
startIter = iter;
startIterFound = true;
}
if ((iter->tag <= end) && ((iter->tag + mElementsPerBlock - 1) >= end)) {
endIter = iter;
endIterFound = true;
}
}
T *pStart;
size_t elementsCopied = 0;
// IF range resides in a single block
if (startIter == endIter) {
pStart = startIter->p + (start - startIter->tag);
elementsCopied = end - start;
memcpy(pDst, pStart, elementsCopied * sizeof(T));
} else { // IF range spans across multiple blocks
auto iterAfterEnd = endIter;
iterAfterEnd++;
size_t copyN = 0;
for (auto iter = startIter; iter != iterAfterEnd; iter++) {
if (iter == startIter) { // START
pStart = iter->p + (start - iter->tag);
copyN = mElementsPerBlock - (start - iter->tag);
memcpy(pDst + elementsCopied, pStart, copyN * sizeof(T));
elementsCopied += copyN;
} else if (iter == endIter) { // END
pStart = iter->p;
copyN = (end - iter->tag);
memcpy(pDst + elementsCopied, pStart, copyN * sizeof(T));
elementsCopied += copyN;
} else { // INBETWEEN
pStart = iter->p;
copyN = mElementsPerBlock;
memcpy(pDst + elementsCopied, pStart, copyN * sizeof(T));
elementsCopied += copyN;
}
}
}
return elementsCopied;
}
// get number of free blocks
size_t getFreeBlockCnt() {
std::lock_guard<std::mutex> lock(mStoreFetch_mtx);
return mFreeBlocks.size();
}
// get block (nth eldest)
int64_t copyBlock(size_t n, T *pDst) {
std::lock_guard<std::mutex> lock(mStoreFetch_mtx);
// block not found
if ((n + 1) > mOccupiedBlocks.size()) {
return -1;
}
// block found
auto iter = mOccupiedBlocks.begin();
std::advance(iter, n);
// copy block contents
memcpy(pDst, iter->p, mElementsPerBlock * sizeof(T));
return iter->tag;
}
// get sample with given tag
T getElementByTag(int64_t tag) {
std::lock_guard<std::mutex> lock(mStoreFetch_mtx);
for (auto iter = mOccupiedBlocks.begin(); iter != mOccupiedBlocks.end(); iter++) {
if ((iter->tag <= tag) && ((iter->tag + mElementsPerBlock) > tag)) {
return *(iter->p + (tag - iter->tag));
}
}
// log invalid tag
Logger::logLine("INVALID tag (" + std::to_string(tag) + "), must be between " +
std::to_string(mOccupiedBlocks.front().tag) + " and " +
std::to_string(mOccupiedBlocks.back().tag + mElementsPerBlock - 1) + "!");
T invalid;
memset(&invalid, 0, sizeof(T));
return invalid;
}
// get eldest sample
T getEldestSample() {
std::lock_guard<std::mutex> lock(mStoreFetch_mtx);
return *(mOccupiedBlocks.front().p);
}
};

View File

@ -3,12 +3,19 @@
//
#include <cmath>
#include <cstring>
#include "MultiStreamOscilloscope.h"
MultiStreamOscilloscope::MultiStreamOscilloscope() {
trigger = std::make_shared<TriggerSettings>(); // default, dummy trigger
mTriggerPosition_percent = TRIGGER_POS_PERCENT_DEF;
mCapturePeriod_ns = DRAW_WINDOW_PERIOD_NS_DEF;
verticalScale = VOLT_PER_BIN_DEF;
mTrigState = {false, false};
mTrigState = {false, false, -1, false };
mFIFOBlockCnt = 0;
mCaptureLength = 0;
mPreTriggerSamples = 0;
}
void MultiStreamOscilloscope::setup(const std::vector<in_addr_t> &nodes, const AcquisitionFormat &acqFmt) {
@ -16,20 +23,36 @@ void MultiStreamOscilloscope::setup(const std::vector<in_addr_t> &nodes, const A
// calculate buffer parameters
mCaptureLength = ceil(mCapturePeriod_ns / 1E+09 * acqFmt.sampling_rate_Hz); // calculate draw window period in samples
mFIFOBlockCnt = 2 * floor(mCaptureLength / acqFmt.mch_samples_per_packet); // TODO: 2x: pretrigger...
mFIFOBlockSize = mAcqFmt.mch_samples_per_packet * sizeof(SamplePoint);
mFIFOBlockCnt = 4 * ceil(mCaptureLength / acqFmt.mch_samples_per_packet); // TODO magic...
// setup channel buffers
mpChBufs.resize(mChCnt);
for (auto chBuf: mpChBufs) {
chBuf.reset(new ChannelBuffer(mFIFOBlockSize, mFIFOBlockCnt, 0));
for (auto& chBuf: mpChBufs) {
chBuf.reset(new ChannelBuffer<SamplePoint>(mAcqFmt.mch_samples_per_packet, mFIFOBlockCnt));
}
// setup fullness bits
mFIFOFull.resize(mChCnt);
clearFIFOFullnessBits();
// setup sample assembly buffer
mpAssemblyBuffer.reset(new SamplePoint[mAcqFmt.mch_samples_per_packet]);
// setup trigger buffer
mpTriggerBuffer.reset(new SamplePoint[mAcqFmt.mch_samples_per_packet]);
// determine trigger block
double propTrigPos = mTriggerPosition_percent / 100.0;
mTriggerProbeBlock_idx = ceil((mFIFOBlockCnt - 2) * propTrigPos) + 1; // TODO it's too difficult to explain...
// determine number of pretrigger samples
mPreTriggerSamples = ceil(mCaptureLength * propTrigPos);
// setup capture buffers for every channel
mCaptureBuffers.resize(mChCnt);
for (size_t ch = 0; ch < mChCnt; ch++) {
mCaptureBuffers[ch].resize(mCaptureLength);
}
}
bool MultiStreamOscilloscope::input(size_t ch, const std::shared_ptr<Timestamp> &pTime, const void *pData, size_t size) {
@ -59,20 +82,35 @@ bool MultiStreamOscilloscope::input(size_t ch, const std::shared_ptr<Timestamp>
sample *= verticalScale;
// save sample point
mpAssemblyBuffer.get()[i] = {.t = pTime.get()[i].to_ns(), .x = sample};
SamplePoint& sp = mpAssemblyBuffer.get()[i];
sp.t = pTime.get()[i].to_ns();
sp.x = sample;
}
// push new block of samples onto the FIFO
mpChBufs[ch]->store(mpAssemblyBuffer.get());
// store FIFO-fullness bit
mFIFOFull[ch] = (mpChBufs[ch]->getFreeBlockCnt() == 0);
// if a sufficient collection of samples are available (the FIFO is full)
// and the trigger is armed, and the trigger channel is the curent one,
// then probe samples to trigger
if (trigSettings.ch == ch && mTrigState.armed && mpChBufs[ch]->getFreeBlockCnt() == 0) {
int64_t tag = mpChBufs[ch]->copyBlock(2, mpTriggerBuffer.get()); // TODO magic constant!...
for (size_t i = 0; i < mAcqFmt.mch_samples_per_packet; i++) {
const SamplePoint& samplePoint = mpTriggerBuffer.get()[i]; // get sample
mTrigState.trigd = trigSettings.sample(samplePoint.x); // probe if causes a trigger
if (trigger->ch == ch && mTrigState.armed && mpChBufs[ch]->getFreeBlockCnt() == 0 && isEveryFIFOFull()) {
std::unique_lock<std::mutex> lock(mCapture_mtx);
int64_t tag = mpChBufs[ch]->copyBlock(mTriggerProbeBlock_idx, mpTriggerBuffer.get()); // TODO magic constant!...
size_t i = 0;
if (!mTrigState.trigd) { // if not triggered externally
while (i < mAcqFmt.mch_samples_per_packet) {
const SamplePoint &samplePoint = mpTriggerBuffer.get()[i]; // get sample
mTrigState.trigd = trigger->sample(samplePoint.x); // probe if causes a trigger
if (mTrigState.trigd) { // break if trigger is successful
break;
}
i++; // increase index
}
}
// triggered?
if (mTrigState.trigd) {
@ -80,21 +118,107 @@ bool MultiStreamOscilloscope::input(size_t ch, const std::shared_ptr<Timestamp>
mTrigState.triggerSampleTag = tag + i; // store the sample the trigger fired on
// copy captured data
size_t preTriggerSamples = mAcqFmt.mch_samples_per_packet;
int64_t captureStart = mTrigState.triggerSampleTag - preTriggerSamples;
int64_t captureStart = mTrigState.triggerSampleTag - mPreTriggerSamples;
int64_t captureEnd = captureStart + mCaptureLength;
mpChBufs[ch]->copyBetweenTags(mpTriggerBuffer.get(), captureStart, captureEnd);
// get time of trigger occurred
SamplePoint trigSP = mpChBufs[ch]->getElementByTag(mTrigState.triggerSampleTag);
int64_t t0 = trigSP.t;
// get eldest sample's timestamp on the trigger channel
int64_t t_eldest_trigger = mpChBufs[ch]->getEldestSample().t;
// TODO now full synchronicity is assumed
// copy samples from each channel
for (size_t k = 0; k < mChCnt; k++) {
auto& chBuf = *(mpChBufs[k]); // acquire channel buffer
int64_t t_offset, offset_index = 0;
if (k != ch) { // we are NOT processing the trigger channel, since it's the reference
int64_t t_eldest = chBuf.getEldestSample().t;
t_offset = t_eldest - t_eldest_trigger; // k-th channel is t_offset time before the trigger channel
offset_index = round(t_offset * 1E-09 / mAcqFmt.sampling_rate_Hz); // determine offset in sample index
}
chBuf.copyBetweenTags(mCaptureBuffers[k].data(), captureStart - offset_index, captureEnd - offset_index);
for (size_t l = 0; l < mCaptureLength; l++) {
mCaptureBuffers[k][l].t -= t0; // substract t0 from each sample point's time
}
}
// clear each channel's FIFO (since we don't want the data to be displayed more than once)
for (size_t k = 0; k < mChCnt; k++) {
mpChBufs[k]->clear();
}
// notify thread waiting on data
mTrigState.samplesReady = true;
lock.unlock(); // release mutex
mCapture_cv.notify_all(); // notify
}
// automatically unlocks here
}
return true;
}
void MultiStreamOscilloscope::triggerNow() {
void MultiStreamOscilloscope::clearFIFOFullnessBits() {
for (size_t i = 0; i < mChCnt; i++) {
mFIFOFull[i] = false;
}
}
bool MultiStreamOscilloscope::isEveryFIFOFull() const {
for (size_t i = 0; i < mChCnt; i++) {
if (!mFIFOFull[i]) {
return false;
}
}
return true;
}
void MultiStreamOscilloscope::triggerNow() {
armTrigger();
mTrigState.trigd = true;
}
void MultiStreamOscilloscope::armTrigger() {
trigger->reset();
mTrigState.trigd = false;
mTrigState.samplesReady = false;
mTrigState.armed = true;
}
std::vector<std::vector<MultiStreamOscilloscope::SamplePoint>> MultiStreamOscilloscope::capture() {
std::unique_lock<std::mutex> lock(mCapture_mtx);
mCapture_cv.wait(lock, [this] { return mTrigState.samplesReady; }); // wait for data to become available
auto samples = mCaptureBuffers; // copy buffer
mTrigState.samplesReady = false; // invalidate samples
return samples;
}
// ----------------------------------------
void MultiStreamOscilloscope::setScreenPeriod(size_t ns) {
mCapturePeriod_ns = ns;
}
size_t MultiStreamOscilloscope::getScreenPeriod() const {
return mCapturePeriod_ns;
}
void MultiStreamOscilloscope::setTriggerPosition(size_t percent) {
mTriggerPosition_percent = std::min(percent, (size_t) 100);
}
size_t MultiStreamOscilloscope::getTriggerPosition() const {
return mTriggerPosition_percent;
}
std::vector<int64_t> MultiStreamOscilloscope::getScreenTimeLimits() const {
int64_t lowerLimit, upperLimit;
lowerLimit = -ceil(mTriggerPosition_percent / 100.0 * mCapturePeriod_ns);
upperLimit = mCapturePeriod_ns + lowerLimit;
return std::vector<int64_t>({ lowerLimit, upperLimit });
}

View File

@ -6,6 +6,7 @@
#define WFR_APP_MULTISTREAMOSCILLOSCOPE_H
#include <condition_variable>
#include "ICreatable.h"
#include "MultiStreamProcessor.h"
#include "ChannelBuffer.h"
@ -15,36 +16,65 @@ class MultiStreamOscilloscope : public MultiStreamProcessor, public ICreatable<M
public:
static constexpr size_t DRAW_WINDOW_PERIOD_NS_DEF = 50E+06; // default window length (50ms)
static constexpr double VOLT_PER_BIN_DEF = 42.80E-06; // default vertical resolution
static constexpr size_t TRIGGER_POS_PERCENT_DEF = 50; // trigger position is defaulted to the screen center
public:
struct SamplePoint {
int64_t t; // time
double x; // sample data
SamplePoint() {
t = 0;
x = 0;
}
};
private:
std::vector<std::shared_ptr<ChannelBuffer>> mpChBufs; // channel buffers
size_t mCapturePeriod_ns; // drawing window period
private: // data acquisition related things
std::vector<std::shared_ptr<ChannelBuffer<SamplePoint>>> mpChBufs; // channel buffers
size_t mCaptureLength; // length of drawing sliding window in SAMPLES
size_t mFIFOBlockCnt; // number of blocks the FIFO consists of
size_t mFIFOBlockSize; // FIFO block size in bytes
//size_t mFIFOBlockSize; // FIFO block size in bytes
std::shared_ptr<SamplePoint> mpAssemblyBuffer; // buffer for assembling sample points
std::shared_ptr<SamplePoint> mpTriggerBuffer; // buffer for probing against trigger conditions
std::shared_ptr<SamplePoint> mpCaptureBuffer; // buffer for last capture
std::vector<std::vector<SamplePoint>> mCaptureBuffers; // buffer for last capture
std::mutex mCapture_mtx; // mutex for capture cv
std::condition_variable mCapture_cv; // condition variable on data capture
std::vector<bool> mFIFOFull; // stores FIFO fullness indicators, needed for proper triggering and capture
private:
void clearFIFOFullnessBits(); // clear fullness bits
bool isEveryFIFOFull() const; // are the FIFOs full?
public: // graphical displaying related things
size_t mTriggerPosition_percent; // trigger position on the screen
size_t mTriggerProbeBlock_idx; // index of block on which trigger will run
size_t mPreTriggerSamples; // samples displayed before the trigger point
size_t mCapturePeriod_ns; // (drawing window) screen period
public:
TriggerSettings trigSettings; // trigger settings
std::shared_ptr<TriggerSettings> trigger; // trigger settings
double verticalScale; // vertical resolution
private:
struct {
bool armed; // indicates wheter trigger is armed or not
bool trigd; // indicates if trigger has fired
int64_t triggerSampleTag;
bool samplesReady; // samples have been transferred to the output buffer
} mTrigState;
public:
void setup(const std::vector<in_addr_t> &nodes, const AcquisitionFormat &acqFmt) override;
bool input(size_t ch, const std::shared_ptr<Timestamp> &pTime, const void *pData, size_t size) override;
public:
MultiStreamOscilloscope();
void setScreenPeriod(size_t ns); // set screen (visible waveform window) time length
size_t getScreenPeriod() const; // get --- " ----
void setTriggerPosition(size_t percent); // set trigger position
size_t getTriggerPosition() const; // get trigger position
std::vector<int64_t> getScreenTimeLimits() const; // get time limits of the screen's edges
void triggerNow(); // make the scope trigger regardless conditions
void armTrigger(); // arm trigger for next acquisition
std::vector<std::vector<MultiStreamOscilloscope::SamplePoint>> capture(); // wait for captured data TODO to be renamed
void setup(const std::vector<in_addr_t> &nodes, const AcquisitionFormat &acqFmt) override;
bool input(size_t ch, const std::shared_ptr<Timestamp> &pTime, const void *pData, size_t size) override;
};

View File

@ -27,3 +27,7 @@ bool MultiStreamProcessor::input(size_t ch, const std::shared_ptr<Timestamp> &pT
return true;
}
size_t MultiStreamProcessor::getChannelCount() const {
return mChCnt;
}

View File

@ -22,6 +22,7 @@ protected:
public:
virtual void setup(const std::vector<in_addr_t> &nodes, const AcquisitionFormat &acqFmt); // setup function for adapting to data streams
virtual bool input(size_t ch, const std::shared_ptr<Timestamp> &pTime, const void *pData, size_t size); // input data for a channel
size_t getChannelCount() const; // get number of channels
};

View File

@ -8,6 +8,7 @@
#include <thread>
#include <unistd.h>
#include <cstring>
#include <map>
#include "MultiStreamReceiver.h"
#include "audio_types.h"
@ -34,7 +35,7 @@ MultiStreamReceiver::MultiStreamReceiver(const std::vector<std::string> &nodes_s
mSoc = -1;
for (auto &node_addr: nodes_str) {
mClientNodes.push_back({ inet_addr(node_addr.c_str()) });
mClientNodes.push_back({inet_addr(node_addr.c_str())});
}
}
@ -102,7 +103,6 @@ void MultiStreamReceiver::stop() {
mRunning = false;
mRecvThread->join();
close(mSoc);
mpSampleWriters.clear();
}
// --------------------------------------
@ -129,10 +129,20 @@ void MultiStreamReceiver::fnRecv(MultiStreamReceiver *pMSR) {
AudioPayloadHeader aph;
// packet timestamps
Timestamp ts[2]; // current and previous timestamp
typedef Timestamp NodePacketTimestamps[2]; // current and previous timestamp
// a pair for each node (since each node may have a unique timescale)
std::map<in_addr_t, NodePacketTimestamps> allNodeTS;
for (auto node_ip: pMSR->mClientNodes) { // prepare clear timestamps
allNodeTS[node_ip][0] = Timestamp(0);
allNodeTS[node_ip][1] = Timestamp(0);
}
//Timestamp ts[2];
// limit on time difference variation
double timeDiffVariation_ns = 1E+09 / pMSR->mAcqFmt.sampling_rate_Hz * TS_VALIDITY_RANGE;
uint32_t lastIndex = 0;
// allocate buffer for interpolated timestamps
std::shared_ptr<Timestamp> pTSBuf(new Timestamp[pMSR->mAcqFmt.mch_samples_per_packet]);
@ -171,8 +181,33 @@ void MultiStreamReceiver::fnRecv(MultiStreamReceiver *pMSR) {
// get header
memcpy(&aph, pRecvBuf.get(), sizeof(AudioPayloadHeader));
// examine if the node belongs to us
bool nodeFound = false;
for (auto node_ip: pMSR->mClientNodes) {
if (node_ip == aph.addr) {
nodeFound = true;
break;
}
}
if (!nodeFound) {
Logger::logLine("UNKNOWN remote node is talking to us, skipping this packet! (" + std::string(inet_ntoa({aph.addr})) + ")");
continue;
}
// check index continuity
if (lastIndex > 0 && (aph.index) != (lastIndex + 1)) {
Logger::logLine("NON-CONTINUOUS packet indices! (" + std::to_string(lastIndex) + " -> " + std::to_string(aph.index) + ")");
// TODO Should skip here?
}
lastIndex = aph.index;
// ----------- TIME PROCESSING ---------------
// fetch timestamp array
NodePacketTimestamps &ts = allNodeTS[aph.addr];
// store timestamp
ts[1] = ts[0]; // shift FIFO
ts[0] = {(int64_t) aph.timestamp_s, (int64_t) aph.timestamp_ns}; // store new timestamp
@ -186,8 +221,10 @@ void MultiStreamReceiver::fnRecv(MultiStreamReceiver *pMSR) {
Timestamp d = (ts[0] - ts[1]) / (double) pMSR->mAcqFmt.mch_samples_per_packet;
// validity check on time step size
if (llabs(d.to_ns() - ((int64_t) (1E+09 / pMSR->mAcqFmt.sampling_rate_Hz))) > timeDiffVariation_ns) {
Logger::logLine("SKIPPED packet due to large packet period variation!");
uint64_t diff = llabs(d.to_ns() - ((int64_t) (1E+09 / pMSR->mAcqFmt.sampling_rate_Hz)));
if (diff > timeDiffVariation_ns) {
Logger::logLine("SKIPPED packet due to large packet period variation! (" + std::to_string(diff) + " ns)");
Logger::logLine("DEBUG " + std::to_string(aph.debug));
continue; // skip this packet if time step is outside the valid range
}
@ -245,3 +282,7 @@ void MultiStreamReceiver::fnRecv(MultiStreamReceiver *pMSR) {
}
}
}
size_t MultiStreamReceiver::getChannelCount() const {
return mAcqFmt.channel_count * mClientNodes.size();
}

View File

@ -25,8 +25,6 @@ private:
std::shared_ptr<std::thread> mRecvThread; // thread for reception
static void fnRecv(MultiStreamReceiver *pMSR); // routine function running in separate thread
bool mRunning;
std::vector<std::shared_ptr<SampleWriter>> mpSampleWriters; // sample writers for streams
//std::mutex mSampleInsertMtx; // mutex protecting against sample insertion collision when calling pSampleWriter->addSamples(...)
std::shared_ptr<MultiStreamProcessor> mpMSP; // multistream processor INSTANCE (pointer to...)
private:
AcquisitionFormat mAcqFmt; // acquisition format
@ -43,6 +41,7 @@ public:
void start(const std::shared_ptr<MultiStreamProcessor> &pMSP); // start reception
void stop(); // stop reception
size_t getChannelCount() const; // get number of data channels
virtual ~MultiStreamReceiver();
};

View File

@ -31,7 +31,7 @@ void MultiStreamToFile::createSampleWriters() {
for (unsigned int &mClientNode: mNodes) {
for (size_t ch = 0; ch < mAcqFmt.channel_count; ch++) {
// create sample writer
std::string datasetName = std::string("node_") + inet_ntoa({mClientNode}) + "ch" + std::to_string(acc_ch);
std::string datasetName = std::string("node_") + inet_ntoa({mClientNode}) + "_CH" + std::to_string(acc_ch);
mpSampleWriters.emplace_back(std::make_shared<SampleWriter>(datasetName, mAcqFmt));
// create hint

View File

@ -69,7 +69,7 @@ void SampleWriter::addSamples(const uint8_t *pSamples, const std::shared_ptr<Tim
}
// calculate time step size
Timestamp d = (mTs[0] - mTs[1]) / (double) mBlockSize;
Timestamp d = (mTs[0] - mTs[1]) / (double) mElementsPerBlock;
// validity check on time step size
if (d.to_ns() > mSamplingPeriod_ns_UB || d.to_ns() < mSamplingPeriod_ns_LB) {
@ -86,7 +86,7 @@ void SampleWriter::addSamples(const uint8_t *pSamples, const std::shared_ptr<Tim
}
// extract samples
for (size_t i = 0; i < mBlockSize; i++) {
for (size_t i = 0; i < mElementsPerBlock; i++) {
// select record
auto *pRawSamples = (int8_t *) (pSamples + i * mSampleDisposition);

View File

@ -146,8 +146,8 @@ std::list<in_addr> ServerBeacon::getNodesOnNetwork() {
return nodeAddrs;
}
std::list<std::string> ServerBeacon::getNodesOnNetwork_str() {
std::list<std::string> nodeAddrs;
std::vector<std::string> ServerBeacon::getNodesOnNetwork_str() {
std::vector<std::string> nodeAddrs;
for (auto nodeInfo: getNodesOnNetwork()) {
nodeAddrs.emplace_back(inet_ntoa(nodeInfo));

View File

@ -67,7 +67,7 @@ public:
void stopBeacon(); // stop the beacon
void singleScan(); // initiate a single scan
std::list<in_addr> getNodesOnNetwork(); // get nodes connected to the same network
std::list<std::string> getNodesOnNetwork_str(); // get nodes connected to the same network (list of string)
std::vector<std::string> getNodesOnNetwork_str(); // get nodes connected to the same network (list of string)
unsigned short getNodeNettermPort(in_addr addr); // get network terminal port of node
void setScanCallback(const std::shared_ptr<CallbackBase<ServerBeacon *>>& scanCB); // set scan callback
void execCmdOnNode(in_addr addr, const std::string& cmd); // execute command on a node through netterm AND DON'T expect a response

View File

@ -2,6 +2,7 @@
// Created by epagris on 2022.05.04..
//
#include <iostream>
#include "Trigger.h"
TriggerSettings::TriggerSettings() {
@ -19,19 +20,19 @@ bool TriggerSettings::sample(double x) {
// ------------------------
SlopeTrigger::SlopeTrigger() {
EdgeTrigger::EdgeTrigger() {
level = 0;
slope = RISING;
SlopeTrigger::reset();
edge = RISING;
EdgeTrigger::reset();
}
void SlopeTrigger::reset() {
void EdgeTrigger::reset() {
TriggerSettings::reset();
x_prev = 0;
}
bool SlopeTrigger::sample(double x) {
bool EdgeTrigger::sample(double x) {
// do not act on first sample
if (mFirstSample) {
mFirstSample = false;
@ -42,11 +43,13 @@ bool SlopeTrigger::sample(double x) {
bool trig = false;
// on every other sample...
if (((slope == RISING) && (x_prev < level && x > level)) ||
((slope == FALLING) && (x_prev > level && x < level))) {
if (((edge == RISING) && (x_prev < level && x > level)) ||
((edge == FALLING) && (x_prev > level && x < level))) {
trig = true;
}
//std::cout << x << " - " << level << std::endl;
x_prev = x;
return trig;

View File

@ -6,6 +6,7 @@
#define WFR_APP_TRIGGER_H
#include <cstddef>
#include "ICreatable.h"
// Dummy trigger base
struct TriggerSettings {
@ -20,17 +21,17 @@ public:
virtual bool sample(double x); // insert sample into the trigger
};
// Simple slope trigger
struct SlopeTrigger : public TriggerSettings {
// Simple edge trigger
struct EdgeTrigger : public TriggerSettings, public ICreatable<EdgeTrigger> {
public:
enum SlopeType { RISING, FALLING };
enum EdgeType { RISING, FALLING };
private:
double x_prev; // previous sample
public:
double level; // trigger level
SlopeType slope; // slope direction
EdgeType edge; // edge direction
public:
SlopeTrigger(); // constr.
EdgeTrigger(); // constr.
void reset() override;
bool sample(double x) override;
};

View File

@ -6,6 +6,7 @@
#define WFR_AUDIO_TYPES_H
#include <cstdint>
#include <netinet/in.h>
#define PERIOD_LEN_DEF (STEREO_BUF_LEN / 2)
@ -18,9 +19,11 @@
typedef struct {
uint32_t timestamp_s, timestamp_ns; // timestamp
uint32_t sample_cnt; // count of all-channel samples in packet
uint32_t index;
uint32_t debug;
/*uint32_t sample_cnt; // count of all-channel samples in packet
uint16_t sample_size; // size of a single (mono) sample
uint16_t channel_count; // number of channels
uint16_t channel_count; // number of channels*/
in_addr_t addr; // client node address
//AudioSampleType16_DEF pData[MCH_SAMP_PER_PCKT_DEF]; // buffer for stereo data
} AudioPayloadHeader;

22
libwfr/src/libwfr.h Normal file
View File

@ -0,0 +1,22 @@
//
// Created by epagris on 2022.05.05..
//
#ifndef WFR_APP_LIBWFR_H
#define WFR_APP_LIBWFR_H
#include "AcquisitionFormat.h"
#include "audio_types.h"
#include "Callback.h"
#include "ChannelBuffer.h"
#include "ICreatable.h"
#include "Logger.h"
#include "MultiStreamOscilloscope.h"
#include "MultiStreamProcessor.h"
#include "MultiStreamReceiver.h"
#include "MultiStreamToFile.h"
#include "SampleWriter.h"
#include "ServerBeacon.h"
#include "Trigger.h"
#endif //WFR_APP_LIBWFR_H

View File

@ -15,6 +15,8 @@
#include "libwfr/src/ServerBeacon.h"
#include "src/GUI.h"
#include "libwfr/src/libwfr.h"
//uint8_t pRecvBuf[8096] __attribute__ ((aligned (32)));
std::shared_ptr<Globals> globs;
@ -30,7 +32,7 @@ int main(int argc, char * argv[]) {
Logger::startLogging();
auto a = MultiStreamToFile::create("asd");
// -----------------
@ -44,6 +46,27 @@ int main(int argc, char * argv[]) {
// create beacon
auto beacon = globs->beacon = std::make_shared<ServerBeacon>();
beacon->setInterfaceAddr(mcastIFAddr);
beacon->singleScan();
auto nodes = beacon->getNodesOnNetwork_str();
AcquisitionFormat fmt(beacon->queryFromNode(nodes.front(), "snd acqformat mr"));
MultiStreamReceiver msr(nodes, fmt);
auto osc = MultiStreamOscilloscope::create();
auto et = EdgeTrigger::create();
et->level = 0.5;
et->edge = EdgeTrigger::RISING;
osc->trigger = et;
msr.start(osc);
beacon->execCmdOnAllNodes("snd connect 10.42.0.1 20220");
osc->armTrigger();
auto samples = osc->capture();
beacon->execCmdOnAllNodes("snd disconnect");
msr.stop();
return 0;
// init and start GUI
Glib::init();

View File

@ -1,10 +1,7 @@
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <ServerBeacon.h>
#include "AcquisitionFormat.h"
#include "MultiStreamToFile.h"
#include "MultiStreamReceiver.h"
#include <libwfr.h>
#define STRINGIFY(x) #x
#define MACRO_STRINGIFY(x) STRINGIFY(x)
@ -66,7 +63,8 @@ PYBIND11_MODULE(pywfs, m) {
.def(py::init<std::vector<std::string>, const AcquisitionFormat &, unsigned short>(), "nodes_str"_a, "acqFormat"_a, "port"_a = MultiStreamReceiver::DEFAULT_PORT)
.def(py::init<std::vector<std::string>, const std::string &, unsigned short>(), "nodes_str"_a, "acqFormat_str"_a, "port"_a = MultiStreamReceiver::DEFAULT_PORT)
.def("listen", &MultiStreamReceiver::start)
.def("close", &MultiStreamReceiver::stop);
.def("close", &MultiStreamReceiver::stop)
.def("getChannelCount", &MultiStreamReceiver::getChannelCount);
// MultiStreamProcessor (abstract class)
py::class_<MultiStreamProcessor, std::shared_ptr<MultiStreamProcessor>> MSP(m, "MultiStreamProcessor");
@ -75,6 +73,47 @@ PYBIND11_MODULE(pywfs, m) {
py::class_<MultiStreamToFile, std::shared_ptr<MultiStreamToFile>, MultiStreamProcessor>(m, "MultiStreamToFile")
.def_static("create", &MultiStreamToFile::create);
// MultiStreamOscilloscope
py::class_<MultiStreamOscilloscope, std::shared_ptr<MultiStreamOscilloscope>, MultiStreamProcessor>(m, "MultiStreamOscilloscope")
.def(py::init<>())
.def_static("create", &MultiStreamOscilloscope::create)
.def_readwrite("trigger", &MultiStreamOscilloscope::trigger)
.def_readwrite("verticalScale", &MultiStreamOscilloscope::verticalScale)
.def("armTrigger", &MultiStreamOscilloscope::armTrigger)
.def("triggerNow", &MultiStreamOscilloscope::triggerNow)
.def("capture", &MultiStreamOscilloscope::capture)
.def("getChannelCount", &MultiStreamOscilloscope::getChannelCount)
.def("setScreenPeriod", &MultiStreamOscilloscope::setScreenPeriod)
.def("getScreenPeriod", &MultiStreamOscilloscope::getScreenPeriod)
.def("setTriggerPosition", &MultiStreamOscilloscope::setTriggerPosition)
.def("getTriggerPosition", &MultiStreamOscilloscope::getTriggerPosition)
.def("getScreenTimeLimits", &MultiStreamOscilloscope::getScreenTimeLimits)
;
// SamplePoint
py::class_<MultiStreamOscilloscope::SamplePoint>(m, "SamplePoint")
.def(py::init<>())
.def_readwrite("t", &MultiStreamOscilloscope::SamplePoint::t)
.def_readwrite("x", &MultiStreamOscilloscope::SamplePoint::x);
// TriggerSettings
py::class_<TriggerSettings, std::shared_ptr<TriggerSettings>>(m, "TriggerSettings")
.def(py::init<>())
.def_readwrite("ch", &TriggerSettings::ch);
// EdgeTrigger
py::class_<EdgeTrigger, std::shared_ptr<EdgeTrigger>, TriggerSettings>(m, "EdgeTrigger")
.def(py::init<>())
.def("create", &EdgeTrigger::create)
.def_readwrite("level", &EdgeTrigger::level)
.def_readwrite("edge", &EdgeTrigger::edge);
// EdgeType
py::enum_<EdgeTrigger::EdgeType>(m, "EdgeType")
.value("RISING", EdgeTrigger::EdgeType::RISING)
.value("FALLING", EdgeTrigger::EdgeType::FALLING)
.export_values();
//
// m.def("subtract", [](int i, int j) { return i - j; }, R"pbdoc(
// Subtract two numbers

View File

@ -9,7 +9,7 @@
#include <thread>
#include <mutex>
#include "MemoryPool.h"
#include "../utils/Semaphore.h"
#include "../libwfr/src/utils/Semaphore.h"
class ALSADevice {
private: