final degree paper state

This commit is contained in:
Wiesner András 2021-12-19 13:05:27 +01:00
parent 45c2e06bf8
commit cff343e959
13 changed files with 993 additions and 127 deletions

View File

@ -3,11 +3,11 @@ cmake_minimum_required(VERSION 3.20)
SET(APP_NAME wfr)
project(${APP_NAME})
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD 17)
add_executable(${APP_NAME} main.cpp src/ALSADevice.cpp src/ALSADevice.h src/MemoryPool.cpp src/MemoryPool.h utils/Semaphore.h utils/Semaphore.cpp src/SampleWriter.h src/Timestamp.h
#src/SampleReceiver.h
src/audio_types.h src/ServerBeacon.cpp src/ServerBeacon.h)
src/audio_types.h src/ServerBeacon.cpp src/ServerBeacon.h src/GUI.h src/globals.h src/Callback.h src/MultiStreamReceiver.cpp src/MultiStreamReceiver.h)
find_package(Threads REQUIRED)
if (THREADS_FOUND)
@ -16,6 +16,26 @@ endif (THREADS_FOUND)
find_package(ALSA REQUIRED)
if (ALSA_FOUND)
include_directories(${ALSA_INCLUDE_DIRS})
target_include_directories(${APP_NAME} PUBLIC ${ALSA_INCLUDE_DIRS})
target_link_libraries(${APP_NAME} ${ALSA_LIBRARIES})
endif (ALSA_FOUND)
endif (ALSA_FOUND)
find_package(PkgConfig REQUIRED)
pkg_check_modules(GTK3 REQUIRED gtk+-3.0)
if (GTK3_FOUND)
target_include_directories(${APP_NAME} PUBLIC ${GTK3_INCLUDE_DIRS})
target_link_directories(${APP_NAME} PUBLIC ${GTK3_LIBRARY_DIRS})
add_definitions(${GTK3_CFLAGS_OTHER})
target_link_libraries(${APP_NAME} ${GTK3_LIBRARIES})
endif(GTK3_FOUND)
pkg_check_modules(GTKMM REQUIRED gtkmm-3.0)
if (GTKMM_FOUND)
target_include_directories(${APP_NAME} PUBLIC ${GTKMM_INCLUDE_DIRS})
target_link_directories(${APP_NAME} PUBLIC ${GTKMM_LIBRARY_DIRS})
target_link_libraries(${APP_NAME} ${GTKMM_LIBRARIES})
endif(GTKMM_FOUND)
install(TARGETS ${APP_NAME} RUNTIME DESTINATION ${PROJECT_SOURCE_DIR}/runtime ARCHIVE DESTINATION ${PROJECT_SOURCE_DIR}/runtime)
set(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} "-rdynamic")

View File

@ -0,0 +1,15 @@
function read_multistream_dir(d, prefix, plotrange, onlydata)
% get directories containing samples
hint_file_name = strcat('./runtime/', d, '/node_dir_hint.txt');
hint_file = fopen(hint_file_name);
node_dirs = textscan(hint_file, '%s');
fclose(hint_file);
node_names = strrep(node_dirs{1,1}, prefix, "");
node_dirs = strcat('./runtime/', d, '/', node_dirs{1,1});
sync_datasets(node_dirs, node_names, plotrange, onlydata);
print(strcat('./runtime/', d, '/figure.svg'), '-dsvg');
end

View File

@ -1,4 +1,4 @@
function sds = sync_datasets(dss)
function sds = sync_datasets(dss, node_names, PLOTRANGE, ONLYDATA)
% load datasets
len = length(dss);
sds = {};
@ -22,15 +22,28 @@ function sds = sync_datasets(dss)
sds{i} = ds;
end
% reference dataset and timescale
ds_ref = sds{1};
ts_ref = ds_ref(:,1);
% create plot
figure(1)
if (~ONLYDATA)
figure('Position', [100 100 1000 1000])
else
figure('Position', [100 100 1000 400])
end
clf
% plot samples
subplot(2,1,1);
if (~ONLYDATA)
subplot(3,1,1);
end
marks = ['-', '-o'];
marks = ["-x", "-o"];
PLOTRANGE = 1:100;
%PLOTRANGE = 1017:1020;
VOLT_PER_BIN = 42.80E-06;
% get beginning indices
for i=1:len
@ -39,34 +52,90 @@ function sds = sync_datasets(dss)
s = size(ds);
%for k = 2:s(2);
for k = 2:2
%plot(ts(1:100),ds(1:100,k), marks(k-1));
plot(ts(PLOTRANGE),ds(PLOTRANGE,k), 'x');
for k = 2:s(2)
%for k = 2:2
if (ONLYDATA)
mark = "-";
else
mark = marks(k-1);
end
plot(ts(PLOTRANGE), ds(PLOTRANGE,k) * VOLT_PER_BIN, mark);
hold on
end
end
grid on
xlabel("Time [s]");
ylabel("Sample");
xlim([ts(PLOTRANGE(1)) ts(PLOTRANGE(end))]);
if (ONLYDATA)
xlabel("Idő [s]");
end
ylabel("Feszültség [V]");
xlim([ts_ref(PLOTRANGE(1)) ts_ref(PLOTRANGE(end))]);
legend_lines = {};
for i=1:length(node_names)
for ch=1:2
legend_lines{2 * (i - 1) + ch, 1} = strcat(node_names{i,1}, " CH", num2str(ch));
end
end
legend(legend_lines);
if (ONLYDATA)
return;
end
% plot signal difference
subplot(3,1,2);
% normalize signals
ds = sds{1};
ds_norm_range{1} = ds(PLOTRANGE,2:end) ./ max(abs(ds(PLOTRANGE,2:end)));
legend_lines = {};
for i=2:length(node_names)
ds = sds{i};
ds_norm_range{i} = ds(PLOTRANGE,2:end) ./ max(abs(ds(PLOTRANGE,2:end)));
for ch=1:2
legend_lines{2 * (i - 2) + ch, 1} = strcat(node_names{i,1}, "-", node_names{1,1}, " CH", num2str(ch));
end
ts = ds(:,1);
diff_data = ((ds_norm_range{i} - ds_norm_range{1})); % ./ ds_norm_range{1});
% for l=1:length(diff_data)
% diff_data(l,:) = diff_data(l,:) ./ ds_norm_range{1}(l,:);
% end
mark = marks(k-1);
plot(ts(PLOTRANGE), diff_data, mark);
end
legend(legend_lines);
ylabel("Erősítéshibával kompenzált különbség")
xlim([ts_ref(PLOTRANGE(1)) ts_ref(PLOTRANGE(end))]);
% plot timestamp errors
subplot(2,1,2);
ds_ref = sds{1};
ts_ref = ds_ref(:,1);
subplot(3,1,3);
legend_lines = {};
for i = 2:len
ts_err = ts(PLOTRANGE) - ts_ref(PLOTRANGE);
plot(ts_ref(PLOTRANGE), ts_err * 1E+09);
plot(ts_ref(PLOTRANGE), ts_err * 1E+09, "-o");
hold on
legend_lines{i - 1, 1} = strcat(node_names{i,1}, "-", node_names{1,1});
end
grid on
xlabel("Time [s]");
ylabel("Time error [ns]");
xlim([ts(PLOTRANGE(1)) ts(PLOTRANGE(end))]);
xlabel("Idő [s]");
ylabel("Időhiba [ns]");
legend(legend_lines);
xlim([ts_ref(PLOTRANGE(1)) ts_ref(PLOTRANGE(end))]);
endfunction
end

169
main.cpp
View File

@ -5,12 +5,75 @@
#include <alsa/asoundlib.h>
#include <math.h>
#include <cmath>
#include <memory>
#include <sys/socket.h>
#include <netinet/in.h>
#include "src/ALSADevice.h"
#include "src/SampleWriter.h"
#include "src/audio_types.h"
#include "src/ServerBeacon.h"
#include "src/GUI.h"
//uint8_t pRecvBuf[8096] __attribute__ ((aligned (32)));
std::shared_ptr<Globals> globs;
int main(int argc, char * argv[]) {
if (argc < 3) {
std::cout << "At least 2 parameters needed:" << std::endl << "wfr data_port multicast_if_addr" << std::endl;
return 0;
}
std::string mcastIFAddr = argv[2];
unsigned short beaconPort = atoi(argv[1]);
// -----------------
//ALSADevice dev(PERIOD_LEN, SAMPLE_RATE);
//MemoryPool<int16_t> pool(STEREO_BUF_LEN, 1000);
//SampleWriter<AudioSampleType> sw(std::string("test_") + argv[1], 2, STEREO_BUF_LEN / 2);
// create globals
globs = std::make_shared<Globals>();
// create beacon
auto beacon = globs->beacon = std::make_shared<ServerBeacon>();
beacon->setInterfaceAddr(mcastIFAddr);
// init and start GUI
Glib::init();
GUI gui(globs);
gui.run();
//AudioPayload * pAudioPayload = reinterpret_cast<AudioPayload *>(pRecvBuf);
// while (true) {
// //while (sw.getSampleCnt() < 100000) {
// auto p = pool.alloc();
//
// recv(soc, pRecvBuf, 8096, 0);
//
// //std::cout << pAudioPayload->timestamp_s << ' ' << pAudioPayload->timestamp_ns << std::endl;
//
// Timestamp ts = { pAudioPayload->timestamp_s, pAudioPayload->timestamp_ns };
// //sw.addSamples(pAudioPayload->pData, ts);
//
// memcpy(p->pBlock, pAudioPayload->pData, STEREO_BUF_LEN * sizeof(AudioSampleType));
//
// dev.write(p);
//
// //std::cout << pool.avail() << std::endl;
// }
//close(soc);
return 0;
}
//int main() {
// long loops;
@ -117,81 +180,37 @@
// return 0;
//}
#define Fs (48000) // sampling frequency
#define F (440) // signal frequency
#define K (0.1) // amplitude
//#define Fs (48000) // sampling frequency
//#define F (440) // signal frequency
//#define K (0.1) // amplitude
//
//void generate_sine(int16_t *pBuf, uint32_t n) {
// double y;
// static double phase = 0, dt = 1.0 / Fs;
//
// uint32_t i = 0;
// for (i = 0; i < n; i++) {
// y = K * 0.5 * (1 + sin(phase));
// phase += 2 * M_PI * dt * F;
//
// if (phase > (2 * M_PI)) {
// phase -= 2 * M_PI;
// }
//
// pBuf[2 * i + 1] = pBuf[2 * i] = ((int16_t) (y * 0x7FFF));
// }
//
//}
void generate_sine(int16_t *pBuf, uint32_t n) {
double y;
static double phase = 0, dt = 1.0 / Fs;
uint32_t i = 0;
for (i = 0; i < n; i++) {
y = K * 0.5 * (1 + sin(phase));
phase += 2 * M_PI * dt * F;
if (phase > (2 * M_PI)) {
phase -= 2 * M_PI;
}
pBuf[2 * i + 1] = pBuf[2 * i] = ((int16_t) (y * 0x7FFF));
}
//std::this_thread::sleep_for(std::chrono::seconds(1));
/*for (size_t i = 0; i < 1000; i++) {
auto p = pool.alloc();
generate_sine(p->pBlock, 324);
dev.write(p);
//std::cout << pool.avail() << std::endl;
}
uint8_t pRecvBuf[8096] __attribute__ ((aligned (32)));
int main(int argc, char * argv[]) {
ALSADevice dev(PERIOD_LEN, SAMPLE_RATE);
MemoryPool<int16_t> pool(STEREO_BUF_LEN, 1000);
//SampleWriter<AudioSampleType> sw(std::string("test_") + argv[1], 2, STEREO_BUF_LEN / 2);
//std::this_thread::sleep_for(std::chrono::seconds(1));
/*for (size_t i = 0; i < 1000; i++) {
auto p = pool.alloc();
generate_sine(p->pBlock, 324);
dev.write(p);
//std::cout << pool.avail() << std::endl;
}
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}*/
int soc = socket(AF_INET, SOCK_DGRAM, 0);
int opt = 1;
setsockopt(soc, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_port = htons(atoi(argv[1]));
bind(soc, (const sockaddr *)&addr, sizeof(addr));
AudioPayload * pAudioPayload = reinterpret_cast<AudioPayload *>(pRecvBuf);
while (true) {
//while (sw.getSampleCnt() < 100000) {
auto p = pool.alloc();
recv(soc, pRecvBuf, 8096, 0);
//std::cout << pAudioPayload->timestamp_s << ' ' << pAudioPayload->timestamp_ns << std::endl;
Timestamp ts = { pAudioPayload->timestamp_s, pAudioPayload->timestamp_ns };
//sw.addSamples(pAudioPayload->pData, ts);
memcpy(p->pBlock, pAudioPayload->pData, STEREO_BUF_LEN * sizeof(AudioSampleType));
dev.write(p);
//std::cout << pool.avail() << std::endl;
}
close(soc);
return 0;
}
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}*/

29
src/Callback.h Normal file
View File

@ -0,0 +1,29 @@
//
// Created by epagris on 2021. 11. 18..
//
#ifndef WFR_CALLBACK_H
#define WFR_CALLBACK_H
#include <functional>
template <typename... ParamTypes>
class CallbackBase {
public:
virtual void operator()(ParamTypes... params) = 0;
};
template <typename T, typename... ParamTypes>
class Callback : public CallbackBase<ParamTypes...> {
public:
T * pObj; // pointer to object to call the function on
void(T::*pFN)(ParamTypes... params); // member function pointer
Callback(T * pObj, void(T::*pFN)(ParamTypes... params)) : pObj(pObj), pFN(pFN) {}
void operator()(ParamTypes... params) override {
std::invoke(pFN, *pObj, params...);
}
};
#endif //WFR_CALLBACK_H

355
src/GUI.h Normal file
View File

@ -0,0 +1,355 @@
//
// Created by epagris on 2021. 11. 14..
//
#ifndef WFR_GUI_H
#define WFR_GUI_H
#include <memory>
#include <iostream>
#include <gtk/gtk.h>
#include <gtkmm-3.0/gtkmm.h>
#include "ServerBeacon.h"
#include "globals.h"
#include "Callback.h"
#include "MultiStreamReceiver.h"
class GUI : public Gtk::Application {
public:
template<typename T>
using RefPtr = Glib::RefPtr<T>;
// GLOBALS
private:
std::shared_ptr<Globals> mGlobs;
protected:
// tree view managing client nodes
class ClNodeTW {
private:
Gtk::TreeView *mpTW;
protected:
class ClNodeLSColumns : public Gtk::TreeModel::ColumnRecord {
public:
Gtk::TreeModelColumn<Glib::ustring> mColIP;
Gtk::TreeModelColumn<bool> mColUse;
Gtk::TreeModelColumn<in_addr_t> mColInAddr;
public:
ClNodeLSColumns() {
add(mColIP);
add(mColUse);
add(mColInAddr);
}
};
public:
ClNodeLSColumns cols; // columns
RefPtr<Gtk::ListStore> listStore; // list store object
Gtk::CellRendererText ipR; // IP-renderer
Gtk::CellRendererToggle useR; // use? renderer
std::shared_ptr<CallbackBase<ClNodeTW *>> selectChangeCB; // kiválasztás változásakor hívódik meg
private:
void onToggleUse(const Glib::ustring &path) {
auto row = *(listStore->get_iter(path));
row[cols.mColUse] = !row[cols.mColUse];
if (selectChangeCB != nullptr) {
(*selectChangeCB)(this);
}
}
/*void onRowActivate(const Gtk::TreePath &tp, Gtk::TreeViewColumn *pTVC) {
auto row = *(listStore->get_iter(tp));
auto command = "xdg-open http://" + row[cols.mColIP] + "/";
system(command.c_str());
}*/
protected:
void addRenderers() {
mpTW->append_column("IP Address", ipR);
useR.set_activatable(true);
useR.signal_toggled().connect(sigc::mem_fun(*this, &ClNodeTW::onToggleUse));
mpTW->append_column("Use?", useR);
auto twCols = mpTW->get_columns();
twCols[0]->add_attribute(ipR, "text", 0);
twCols[1]->add_attribute(useR, "active", 1);
}
public:
explicit ClNodeTW(Gtk::TreeView *pTW) : mpTW(pTW) {
listStore = Gtk::ListStore::create(cols); // construct list store
mpTW->set_model(listStore); // set model
//mpTW->set_activate_on_single_click(true);
addRenderers(); // add cell renderers
//mpTW->signal_row_activated().connect(sigc::mem_fun(*this, &ClNodeTW::onRowActivate));
}
void addRow(in_addr_t addr) {
auto row = *(listStore->append());
row[cols.mColInAddr] = addr;
row[cols.mColIP] = inet_ntoa({addr});
row[cols.mColUse] = false;
}
void setList(const std::list<in_addr_t> &addrs) {
// set of entries already in the list
std::list<in_addr_t> entriesFound;
std::list<Gtk::TreeIter> entriesToDelete;
bool atLeastOneSelectedLineDeleted = false;
// check for existing lines
listStore->foreach([&](const Gtk::TreePath &path, const Gtk::TreeIter &iter) {
in_addr_t addr = (*iter)[cols.mColInAddr];
// if entry is already in the list, then don't add it again
if (std::find(addrs.begin(), addrs.end(), addr) != addrs.end()) {
entriesFound.push_back(addr);
} else { // if value is not on the novel list, remove it
if ((*iter)[cols.mColUse]) {
atLeastOneSelectedLineDeleted |= true;
}
entriesToDelete.push_back(iter);
}
return false;
});
// delete non-existent lines
for (const auto &entry: entriesToDelete) {
listStore->erase(entry);
}
// add non-existent, novel lines
for (auto const &addr: addrs) {
if (std::find(entriesFound.begin(), entriesFound.end(), addr) == entriesFound.end()) {
addRow(addr);
}
}
// send notification if a selected row is being deleted
if (atLeastOneSelectedLineDeleted && (selectChangeCB != nullptr)) {
(*selectChangeCB)(this);
}
}
std::vector<in_addr_t> getSelectedLines() const {
std::vector<in_addr_t> selectedLines;
// check for existing lines
listStore->foreach([&](const Gtk::TreePath &path, const Gtk::TreeIter &iter) {
if ((*iter)[cols.mColUse]) {
selectedLines.push_back((*iter)[cols.mColInAddr]);
}
return false;
});
return selectedLines;
}
in_addr_t getHighlightedRow() const {
auto iter = mpTW->get_selection();
if (iter->count_selected_rows() > 0) {
return (*mpTW->get_selection()->get_selected())[cols.mColInAddr];
} else {
return 0;
}
}
};
// BUILDER-RELATED THINGS
protected:
RefPtr<Gtk::Builder> mBuilder;
template<typename T>
T *getWidget(const Glib::ustring &name) { // get widget by name
Gtk::Widget *pWidget;
mBuilder->get_widget(name, pWidget);
return reinterpret_cast<T *>(pWidget);
}
protected: // WIDGETS
Gtk::Button *wNodeRefreshBtn;
Gtk::Button *wStartStopCaptureBtn;
Gtk::FileChooserButton *wFolderChooser;
Gtk::TextView *wInfoPanel;
Gtk::TreeView *wClNodeTW;
std::shared_ptr<ClNodeTW> mClNodeTW; // instance
Gtk::Menu *wClNodeCtxMenu;
Gtk::MenuItem *wCtxEntryWebInterface;
Gtk::MenuItem *wCtxEntryTelnetTerm;
// status flags
protected:
struct {
bool clNodesSelected;
bool destinationFolderSelected;
bool captureRunning;
} mFlags;
// stream receiver
protected:
std::shared_ptr<MultiStreamReceiver> mpMSR; // pointer to a multistream receiver object
public:
void onActivateApp() {
Gtk::Window *pMainWin;
mBuilder->get_widget("main_win", pMainWin);
add_window(*pMainWin);
//mClNodeTW->addRow("10.42.0.64", true);
//mClNodeTW->addRow("10.42.0.65", false);
}
void onScanDone(ServerBeacon *pSB) {
auto nodes = pSB->getNodesOnNetwork();
std::list<in_addr_t> ipList;
// convert binary IP addresses to strings
for (auto const &node: nodes) {
ipList.push_back(node.s_addr);
}
mClNodeTW->setList(ipList);
}
void onRefreshBtnToggled() {
auto beacon = mGlobs->beacon;
wNodeRefreshBtn->set_sensitive(false);
std::shared_ptr<Callback<GUI, ServerBeacon *>> scanCB = std::make_shared<Callback<GUI, ServerBeacon *>>(this, &GUI::onScanDone);
// start beacon
beacon->stopBeacon();
beacon->setScanCallback(scanCB);
beacon->startBeacon();
std::this_thread::sleep_for(std::chrono::milliseconds(200));
mGlobs->beacon->stopBeacon();
wNodeRefreshBtn->set_sensitive(true);
}
void onNodeSelectionChanged(ClNodeTW *pTW) {
auto selectedIPs = pTW->getSelectedLines();
mFlags.clNodesSelected = !selectedIPs.empty();
manageCaptureBtnState();
}
void onFolderSelect() {
mFlags.destinationFolderSelected = !wFolderChooser->get_filename().empty();
manageCaptureBtnState();
}
void onStartStopCaptureBtnClick() {
if (!mFlags.captureRunning) { // start capture
wFolderChooser->set_sensitive(false);
wClNodeTW->set_sensitive(false);
wStartStopCaptureBtn->set_label("Stop capture");
mFlags.captureRunning = true;
mpMSR = std::make_shared<MultiStreamReceiver>(mClNodeTW->getSelectedLines(), wFolderChooser->get_filename());
mpMSR->start();
mGlobs->beacon->execCmdOnAllTerms(std::string("snd connect ") + mGlobs->beacon->getInterfaceAddr() + " " + std::to_string(mpMSR->port));
} else {
mGlobs->beacon->execCmdOnAllTerms("snd disconnect");
mpMSR = nullptr;
wFolderChooser->set_sensitive(true);
wClNodeTW->set_sensitive(true);
wStartStopCaptureBtn->set_label("Start capture");
mFlags.captureRunning = false;
}
}
bool onShowCtxPopupMenu(GdkEventButton *event) {
if (event->type == GDK_2BUTTON_PRESS) {
if (!wClNodeCtxMenu->get_attach_widget()) {
wClNodeCtxMenu->attach_to_widget(*wClNodeTW);
}
wClNodeCtxMenu->popup(event->button, event->time);
return true; //It has been handled.
} else {
return false;
}
}
void onOpenWebInterface() {
in_addr_t node_addr = mClNodeTW->getHighlightedRow();
if (node_addr != 0) {
auto command = std::string("xdg-open http://") + inet_ntoa({node_addr}) + "/";
system(command.c_str());
}
}
void onOpenTelnetClient() {
in_addr_t node_addr = mClNodeTW->getHighlightedRow();
if (node_addr != 0) {
auto port = mGlobs->beacon->getNodeNettermPort({node_addr});
auto command = std::string("putty -raw ") + inet_ntoa({node_addr}) + " " + std::to_string(port) + " &";
system(command.c_str());
}
}
protected:
void manageCaptureBtnState() {
wStartStopCaptureBtn->set_sensitive(mFlags.clNodesSelected && mFlags.destinationFolderSelected);
}
protected:
void fetchWidgets() {
wNodeRefreshBtn = getWidget<Gtk::ToggleButton>("node_refresh_btn");
wStartStopCaptureBtn = getWidget<Gtk::Button>("start_stop_capture_btn");
wFolderChooser = getWidget<Gtk::FileChooserButton>("folder_chooser_btn");
wInfoPanel = getWidget<Gtk::TextView>("info_panel");
wClNodeTW = getWidget<Gtk::TreeView>("client_node_tw");
mClNodeTW = std::make_shared<ClNodeTW>(wClNodeTW);
wClNodeCtxMenu = getWidget<Gtk::Menu>("node_tw_ctx_menu");
wCtxEntryWebInterface = getWidget<Gtk::MenuItem>("ctx_web_interface");
wCtxEntryTelnetTerm = getWidget<Gtk::MenuItem>("ctx_telnet_term");
}
void connectSignals() {
signal_activate().connect(sigc::mem_fun(*this, &GUI::onActivateApp)); // app activate
wNodeRefreshBtn->signal_clicked().connect(sigc::mem_fun(*this, &GUI::onRefreshBtnToggled)); // refresh button
mClNodeTW->selectChangeCB = std::make_shared<Callback<GUI, ClNodeTW *>>(this, &GUI::onNodeSelectionChanged); // select change callback
wClNodeTW->signal_button_press_event().connect(sigc::mem_fun(*this, &GUI::onShowCtxPopupMenu)); // right click context menu popup
wFolderChooser->signal_file_set().connect(sigc::mem_fun(*this, &GUI::onFolderSelect)); // folder selection
wStartStopCaptureBtn->signal_clicked().connect(sigc::mem_fun(*this, &GUI::onStartStopCaptureBtnClick)); // start/stop click
wCtxEntryWebInterface->signal_activate().connect(sigc::mem_fun(*this, &GUI::onOpenWebInterface));
wCtxEntryTelnetTerm->signal_activate().connect(sigc::mem_fun(*this, &GUI::onOpenTelnetClient));
}
void resetFlags() {
memset(&mFlags, 0, sizeof(mFlags));
}
public:
explicit GUI(const std::shared_ptr<Globals> &globs) : Gtk::Application("com.epagris.samrecv"), mGlobs(globs) { // constr.
resetFlags();
mBuilder = Gtk::Builder::create_from_file("gui/samprecv.glade");
fetchWidgets();
connectSignals(); // connect signals
}
};
#endif //WFR_GUI_H

154
src/MultiStreamReceiver.cpp Normal file
View File

@ -0,0 +1,154 @@
//
// Created by epagris on 2021. 11. 22..
//
#include <arpa/inet.h>
#include <memory>
#include <thread>
#include <unistd.h>
#include "MultiStreamReceiver.h"
#include "audio_types.h"
MultiStreamReceiver::MultiStreamReceiver(const std::vector<in_addr_t> &nodes, const std::string& targetDir, unsigned short port) : port(port) {
mTargetDir = targetDir;
mRunning = false;
mClientNodes = nodes;
}
MultiStreamReceiver::~MultiStreamReceiver() {
stop();
}
void MultiStreamReceiver::startNetworking() {
// open socket
mSoc = socket(AF_INET, SOCK_DGRAM, 0);
if (mSoc == -1) {
throw std::runtime_error(std::string("Could not create UDP-socket in ") + __FUNCTION__ + "!");
}
int opt = 1;
if (setsockopt(mSoc, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)) == -1) {
throw std::runtime_error(std::string("Could not set socket options in ") + __FUNCTION__ + "!");
}
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
addr.sin_port = htons(port);
// bind
if (bind(mSoc, (const sockaddr *)&addr, sizeof(addr)) == -1) {
throw std::runtime_error(std::string("Could not bind socket options in ") + __FUNCTION__ + "!");
}
}
void MultiStreamReceiver::createSampleWriters() {
struct stat sb;
if (stat(mTargetDir.c_str(), &sb) == 0 && S_ISDIR(sb.st_mode)) { // if directory exists then leave it as it is
//unlink(datasetName.c_str()); FIXME
} else if (mkdir(mTargetDir.c_str(), 0777) == -1) { // else: create directory for dataset
throw std::runtime_error("Could not create directory " + mTargetDir + "!");
}
if (chdir(mTargetDir.c_str()) == -1) {
throw std::runtime_error("Could not change to target directory!");
}
std::string nodeDirHint = "";
for (unsigned int & mClientNode : mClientNodes) {
std::string datasetName = std::string("node_") + inet_ntoa({ mClientNode });
mpSampleWriters.emplace_back(std::make_shared<SampleWriter<int16_t>>(datasetName, 2, STEREO_BUF_LEN / 2));
nodeDirHint += datasetName + "\n";
}
std::ofstream hintFile("node_dir_hint.txt", std::ios_base::out);
hintFile << nodeDirHint;
hintFile.close();
if (chdir("..") == -1) {
throw std::runtime_error("Could not change to target directory!");
}
}
void MultiStreamReceiver::start() {
if (mRunning) {
return;
}
// ----------------------------
createSampleWriters();
startNetworking();
// ----------------------------
mRunning = true;
mRecvThread = std::make_shared<std::thread>(fnRecv, this);
}
void MultiStreamReceiver::stop() {
if (!mRunning) {
return;
}
mRunning = false;
mRecvThread->join();
close(mSoc);
mpSampleWriters.clear();
}
// --------------------------------------
#define RECV_BUFFER_SIZE (16000)
void MultiStreamReceiver::fnRecv(MultiStreamReceiver *pMSR) {
// select-related structures
fd_set read_fds;
timeval tv;
// receive buffer
std::shared_ptr<uint32_t> pWordBuf = std::shared_ptr<uint32_t>(new uint32_t[(RECV_BUFFER_SIZE / 4) + 1]); // to have 4-byte alignment
auto * pRecvBuf = reinterpret_cast<uint8_t *>(pWordBuf.get());
auto * pAudioPayload = reinterpret_cast<AudioPayload *>(pWordBuf.get());
while (pMSR->mRunning) {
// fd-set
FD_ZERO(&read_fds);
FD_SET(pMSR->mSoc, &read_fds);
// timeout
tv.tv_sec = 0;
tv.tv_usec = 1E+06 / 4;
// wait for data
int fd_cnt = select(pMSR->mSoc + 1, &read_fds, nullptr, nullptr, &tv);
// if data is available on the socket
if ((fd_cnt > 0) && FD_ISSET(pMSR->mSoc, &read_fds)) {
ssize_t recv_size;
if ((recv_size = recv(pMSR->mSoc, pRecvBuf, RECV_BUFFER_SIZE, 0)) <= 0) {
throw std::runtime_error("Receive error!");
}
// get sample writer for the specific address
std::shared_ptr<SampleWriter<int16_t>> pSampleWriter = nullptr;
for (size_t i = 0; i < pMSR->mClientNodes.size(); i++) {
if (pMSR->mClientNodes[i] == pAudioPayload->addr) {
pSampleWriter = pMSR->mpSampleWriters[i];
break;
}
}
// if sample writer is found
if (pSampleWriter != nullptr) {
pSampleWriter->addSamples(pAudioPayload->pData, { pAudioPayload->timestamp_s, pAudioPayload->timestamp_ns });
}
}
}
}

41
src/MultiStreamReceiver.h Normal file
View File

@ -0,0 +1,41 @@
//
// Created by epagris on 2021. 11. 22..
//
#ifndef WFR_MULTISTREAMRECEIVER_H
#define WFR_MULTISTREAMRECEIVER_H
#include <netinet/in.h>
#include <vector>
#include <memory>
#include <thread>
#include "SampleWriter.h"
class MultiStreamReceiver {
public:
static constexpr unsigned short DEFAULT_PORT = 20220; // default port
private:
std::vector<in_addr_t> mClientNodes; // client node addresses
int mSoc; // UDP-socket for receiving samples
std::shared_ptr<std::thread> mRecvThread; // thread for reception
static void fnRecv(MultiStreamReceiver * pMSR); // routine function running in separate thread
bool mRunning;
std::vector<std::shared_ptr<SampleWriter<int16_t>>> mpSampleWriters; // sample writers for streams
std::string mTargetDir; // target directory
private:
void startNetworking(); // start networking
void createSampleWriters(); // construct sample writers
public:
const unsigned short port; // port
public:
explicit MultiStreamReceiver(const std::vector<in_addr_t>& nodes, const std::string& targetDir, unsigned short port = DEFAULT_PORT);
void start(); // start reception
void stop();
virtual ~MultiStreamReceiver();
// stop reception
};
#endif //WFR_MULTISTREAMRECEIVER_H

View File

@ -17,7 +17,7 @@
#define DIRSEP '/'
template<typename T> // ch: number of channels, T: sample datatype
template<typename T> // T: sample datatype
class SampleWriter {
public:
static const std::string TS_FILENAME; // filename for timestamp file

View File

@ -5,17 +5,18 @@
#include <iostream>
#include <memory>
#include <unistd.h>
#include <cstring>
#include "ServerBeacon.h"
const in_addr ServerBeacon::DEFAULT_MULTICAST_ADDR = {inet_addr("224.0.2.21")};
ServerBeacon::ServerBeacon() : mRunning(false) {
ServerBeacon::ServerBeacon() : mRunning(false), mScanCallback(nullptr) {
setMulticastAddr(DEFAULT_MULTICAST_ADDR);
setPort(DEFAULT_PORT);
setAnnouncePeriod(DEFAULT_ANNOUNCE_PERIOD_MS);
}
ServerBeacon::ServerBeacon(const in_addr &addr, unsigned short port, size_t announcePeriod_ms) : mRunning(false) {
ServerBeacon::ServerBeacon(const in_addr &addr, unsigned short port, size_t announcePeriod_ms) : mRunning(false), mScanCallback(nullptr) {
setMulticastAddr(addr);
setPort(port);
setAnnouncePeriod(announcePeriod_ms);
@ -62,18 +63,28 @@ void ServerBeacon::startBeacon() {
int opt = 1;
setsockopt(mBeaconSoc, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
// bind to ip-address and port
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_addr = mAddr;
addr.sin_port = htons(mBeaconPort);
// fill-in ip-address and port
mMulticastAddr.sin_family = AF_INET;
mMulticastAddr.sin_addr = mAddr;
mMulticastAddr.sin_port = htons(mBeaconPort);
if (bind(mBeaconSoc, (const sockaddr *) &addr, sizeof(addr)) == -1) {
// bind to port
sockaddr_in bindAddr = mMulticastAddr;
bindAddr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(mBeaconSoc, (const sockaddr *) &bindAddr, sizeof(sockaddr_in)) == -1) {
std::cerr << "Could not bind to IP address and/or port in " << __FUNCTION__ << "!" << std::endl;
close(mBeaconSoc);
return;
}
// join multicast group
ip_mreq mreq;
mreq.imr_multiaddr = mAddr;
mIFAddr.s_addr = mreq.imr_interface.s_addr = inet_addr(mIFAddrStr.c_str());
setsockopt(mBeaconSoc, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq));
setsockopt(mBeaconSoc, IPPROTO_IP, IP_MULTICAST_IF, &mreq.imr_interface, sizeof(in_addr));
// the beacon is running now!
mRunning = true;
@ -82,11 +93,27 @@ void ServerBeacon::startBeacon() {
}
void ServerBeacon::stopBeacon() {
if (!mRunning){
return;
}
mRunning = false;
mpBeaconThread->join();
close(mBeaconSoc);
}
std::list<in_addr> ServerBeacon::getNodesOnNetwork() {
return std::list<in_addr>();
std::list<in_addr> nodeAddrs;
mBeaconMtx.lock();
for (auto nodeInfo : mNodes) {
nodeAddrs.push_back(nodeInfo.addr);
}
mBeaconMtx.unlock();
return nodeAddrs;
}
@ -96,38 +123,143 @@ void ServerBeacon::fn_BeaconThread(ServerBeacon *pSB) {
// beacon message used to send and receive information
BeaconMsg msg;
//
// select-related structures
fd_set read_fds;
timeval tv;
// fill-in select parameters
FD_ZERO(&read_fds);
FD_SET(pSB->mBeaconSoc, &read_fds);
// fill-in timeout value
tv.tv_sec = pSB->mAnnPeriod_ms / 1000;
tv.tv_usec = (pSB->mAnnPeriod_ms % 1000) * 1000;
// list of detected nodes
std::list<ClientNodeInfo> clNodes;
while (pSB->mRunning) {
msg.senderAddr = pSB->mIFAddr; // set our address
msg.server_nClient = 1; // we are the server
msg.terminalPort = 0; // we have no inbound terminal port
send(pSB->mBeaconSoc, &msg, sizeof(BeaconMsg), 0); // send the announce message
sendto(pSB->mBeaconSoc, &msg, sizeof(BeaconMsg), 0, (sockaddr *) &pSB->mMulticastAddr, sizeof(sockaddr_in)); // send the announce message
// ------------------
// copy parameters
fd_set cur_rfds = read_fds;
timeval cur_tv = tv;
// fill-in select parameters
FD_ZERO(&read_fds);
FD_SET(pSB->mBeaconSoc, &read_fds);
while (select(1, &cur_rfds, nullptr, nullptr, &cur_tv) != 0) {
// fill-in timeout value
tv.tv_sec = pSB->mAnnPeriod_ms / 1000;
tv.tv_usec = (pSB->mAnnPeriod_ms % 1000) * 1000;
// clear list
clNodes.clear();
while (select(pSB->mBeaconSoc + 1, &read_fds, nullptr, nullptr, &tv) != 0) {
// receive response
recv(pSB->mBeaconSoc, &msg, sizeof(BeaconMsg), 0);
// store response
// consider only client responses
if (msg.server_nClient == 0) {
// store response
ClientNodeInfo clInfo;
clInfo.addr = msg.senderAddr;
clInfo.terminalPort = msg.terminalPort;
clNodes.push_back(clInfo);
//std::cout << inet_ntoa(clInfo.addr) << std::endl;
}
// -------------------------------------------
// fill-in select parameters
FD_ZERO(&read_fds);
FD_SET(pSB->mBeaconSoc, &read_fds);
// fill-in timeout value
tv.tv_sec = pSB->mAnnPeriod_ms / 1000;
tv.tv_usec = (pSB->mAnnPeriod_ms % 1000) * 1000;
}
// copy list of detected nodes
pSB->mBeaconMtx.lock();
pSB->mNodes = clNodes;
pSB->mBeaconMtx.unlock();
// invoke callback
if (pSB->mScanCallback != nullptr) {
(*pSB->mScanCallback)(pSB);
}
}
}
void ServerBeacon::setScanCallback(const std::shared_ptr<CallbackBase<ServerBeacon *>>& scanCB) {
mScanCallback = scanCB;
}
void ServerBeacon::setInterfaceAddr(const std::string &interfaceAddr) {
mIFAddrStr = interfaceAddr;
}
std::string ServerBeacon::getInterfaceAddr() const {
return mIFAddrStr;
}
unsigned short ServerBeacon::getNodeNettermPort(in_addr addr) {
mBeaconMtx.lock();
int nettermPort = -1;
for (const auto& nodeInfo : mNodes) {
if (nodeInfo.addr.s_addr == addr.s_addr) {
nettermPort = nodeInfo.terminalPort;
break;
}
}
mBeaconMtx.unlock();
if (nettermPort == -1) {
throw std::runtime_error(std::string("Node not found in database! (") + inet_ntoa(addr) + ")!");
}
return (unsigned short)nettermPort;
}
void ServerBeacon::sendNettermCmd(in_addr addr, unsigned short port, const std::string &cmd) {
int soc = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (soc == -1) {
throw std::runtime_error("Could not create TCP socket to execute netterm commands!");
}
sockaddr_in serverAddr;
memset(&serverAddr, 0, sizeof(sockaddr_in));
serverAddr.sin_addr = addr;
serverAddr.sin_port = htons(port);
serverAddr.sin_family = AF_INET;
if (connect(soc, (const sockaddr *)&serverAddr, sizeof(sockaddr_in)) == -1) {
throw std::runtime_error(std::string("Could not connect to netterm! (") + inet_ntoa(addr) + ":" + std::to_string(port) + ")");
}
static std::string nodeftty_cmd = "nodeftty";
send(soc, "nodeftty", nodeftty_cmd.size(), 0);
// send command
send(soc, cmd.c_str(), cmd.size(), 0);
close(soc);
}
void ServerBeacon::execCmdOnNode(in_addr addr, const std::string &cmd) {
unsigned short port = getNodeNettermPort(addr);
sendNettermCmd(addr, port, cmd);
}
void ServerBeacon::execCmdOnAllTerms(const std::string &cmd) {
mBeaconMtx.lock();
for (const auto& nodeInfo : mNodes) {
sendNettermCmd(nodeInfo.addr, nodeInfo.terminalPort, cmd);
}
mBeaconMtx.unlock();
}

View File

@ -10,6 +10,8 @@
#include <netinet/in.h>
#include <arpa/inet.h>
#include <thread>
#include <mutex>
#include "Callback.h"
class ServerBeacon {
public:
@ -24,20 +26,28 @@ private:
};
private:
int mBeaconSoc; // beacon socket
in_addr mAddr; // address
in_addr mAddr; // multicast address
sockaddr_in mMulticastAddr; // multicast address structure
unsigned short mBeaconPort; // port
size_t mAnnPeriod_ms; // announce period
std::list<ClientNodeInfo> mNodes; // nodes
bool mRunning; // does the beacon operate?
std::mutex mBeaconMtx; // beacon mutex
std::string mIFAddrStr; // multicast interface address STRING
in_addr mIFAddr; // multicast interface address
std::shared_ptr<CallbackBase<ServerBeacon *>> mScanCallback; // scan callback
private:
std::shared_ptr<std::thread> mpBeaconThread; // thread running the beacon
static void fn_BeaconThread(ServerBeacon * pSB); // function running in beacon thread
private:
// structure for beacon MSG
struct BeaconMsg {
uint8_t server_nClient; // server or client
in_addr senderAddr; // address of sender
uint16_t terminalPort; // port of terminal
uint8_t server_nClient; // server or client
};
private:
void sendNettermCmd(in_addr addr, unsigned short port, const std::string& cmd); // send netterm cmd
public:
ServerBeacon(); // constr.
explicit ServerBeacon(const in_addr& addr, unsigned short port, size_t announcePeriod_ms); // constr. with beacon port and announce period
@ -47,9 +57,15 @@ public:
unsigned short getPort() const; // get beacon port
void setAnnouncePeriod(size_t period_ms); // set announce period
size_t getAnnouncePeriod() const; // get announce period
void setInterfaceAddr(const std::string &interfaceAddr); // set multicast interface address
std::string getInterfaceAddr() const; // get multicast interface address
void startBeacon(); // start the beacon
void stopBeacon(); // stop the beacon
std::list<in_addr> getNodesOnNetwork(); // get nodes connected to the same network
unsigned short getNodeNettermPort(in_addr addr); // get network terminal port of ndoe
void setScanCallback(const std::shared_ptr<CallbackBase<ServerBeacon *>>& scanCB); // set scan callback
void execCmdOnNode(in_addr addr, const std::string& cmd); // execute command on a node through netterm
void execCmdOnAllTerms(const std::string& cmd); // multicast command to all netterms
};

View File

@ -16,6 +16,7 @@ typedef int16_t AudioSampleType;
typedef struct {
uint32_t timestamp_s, timestamp_ns; // timestamp
uint32_t sample_cnt; // count of samples in packet
in_addr_t addr; // client node address
AudioSampleType pData[STEREO_BUF_LEN]; // buffer for stereo data
} AudioPayload;

15
src/globals.h Normal file
View File

@ -0,0 +1,15 @@
//
// Created by epagris on 2021. 11. 16..
//
#ifndef WFR_GLOBALS_H
#define WFR_GLOBALS_H
#include <memory>
#include "ServerBeacon.h"
struct Globals {
std::shared_ptr<ServerBeacon> beacon; // beacon to detect client nodes
};
#endif //WFR_GLOBALS_H