From cff343e959609adae34acf2335410c22cc1975a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wiesner=20Andr=C3=A1s?= Date: Sun, 19 Dec 2021 13:05:27 +0100 Subject: [PATCH] final degree paper state --- CMakeLists.txt | 28 ++- MATLAB/read_multistream_dir.m | 15 ++ MATLAB/sync_datasets.m | 111 +++++++++-- main.cpp | 169 +++++++++------- src/Callback.h | 29 +++ src/GUI.h | 355 ++++++++++++++++++++++++++++++++++ src/MultiStreamReceiver.cpp | 154 +++++++++++++++ src/MultiStreamReceiver.h | 41 ++++ src/SampleWriter.h | 2 +- src/ServerBeacon.cpp | 180 ++++++++++++++--- src/ServerBeacon.h | 20 +- src/audio_types.h | 1 + src/globals.h | 15 ++ 13 files changed, 993 insertions(+), 127 deletions(-) create mode 100644 MATLAB/read_multistream_dir.m create mode 100644 src/Callback.h create mode 100644 src/GUI.h create mode 100644 src/MultiStreamReceiver.cpp create mode 100644 src/MultiStreamReceiver.h create mode 100644 src/globals.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 459d152..d832193 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,11 +3,11 @@ cmake_minimum_required(VERSION 3.20) SET(APP_NAME wfr) project(${APP_NAME}) -set(CMAKE_CXX_STANDARD 14) +set(CMAKE_CXX_STANDARD 17) add_executable(${APP_NAME} main.cpp src/ALSADevice.cpp src/ALSADevice.h src/MemoryPool.cpp src/MemoryPool.h utils/Semaphore.h utils/Semaphore.cpp src/SampleWriter.h src/Timestamp.h #src/SampleReceiver.h - src/audio_types.h src/ServerBeacon.cpp src/ServerBeacon.h) + src/audio_types.h src/ServerBeacon.cpp src/ServerBeacon.h src/GUI.h src/globals.h src/Callback.h src/MultiStreamReceiver.cpp src/MultiStreamReceiver.h) find_package(Threads REQUIRED) if (THREADS_FOUND) @@ -16,6 +16,26 @@ endif (THREADS_FOUND) find_package(ALSA REQUIRED) if (ALSA_FOUND) - include_directories(${ALSA_INCLUDE_DIRS}) + target_include_directories(${APP_NAME} PUBLIC ${ALSA_INCLUDE_DIRS}) target_link_libraries(${APP_NAME} ${ALSA_LIBRARIES}) -endif (ALSA_FOUND) \ No newline at end of file +endif (ALSA_FOUND) + +find_package(PkgConfig REQUIRED) +pkg_check_modules(GTK3 REQUIRED gtk+-3.0) +if (GTK3_FOUND) + target_include_directories(${APP_NAME} PUBLIC ${GTK3_INCLUDE_DIRS}) + target_link_directories(${APP_NAME} PUBLIC ${GTK3_LIBRARY_DIRS}) + add_definitions(${GTK3_CFLAGS_OTHER}) + target_link_libraries(${APP_NAME} ${GTK3_LIBRARIES}) +endif(GTK3_FOUND) + +pkg_check_modules(GTKMM REQUIRED gtkmm-3.0) +if (GTKMM_FOUND) + target_include_directories(${APP_NAME} PUBLIC ${GTKMM_INCLUDE_DIRS}) + target_link_directories(${APP_NAME} PUBLIC ${GTKMM_LIBRARY_DIRS}) + target_link_libraries(${APP_NAME} ${GTKMM_LIBRARIES}) +endif(GTKMM_FOUND) + +install(TARGETS ${APP_NAME} RUNTIME DESTINATION ${PROJECT_SOURCE_DIR}/runtime ARCHIVE DESTINATION ${PROJECT_SOURCE_DIR}/runtime) + +set(CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS} "-rdynamic") \ No newline at end of file diff --git a/MATLAB/read_multistream_dir.m b/MATLAB/read_multistream_dir.m new file mode 100644 index 0000000..8588bc2 --- /dev/null +++ b/MATLAB/read_multistream_dir.m @@ -0,0 +1,15 @@ +function read_multistream_dir(d, prefix, plotrange, onlydata) + % get directories containing samples + hint_file_name = strcat('./runtime/', d, '/node_dir_hint.txt'); + hint_file = fopen(hint_file_name); + node_dirs = textscan(hint_file, '%s'); + fclose(hint_file); + + node_names = strrep(node_dirs{1,1}, prefix, ""); + + node_dirs = strcat('./runtime/', d, '/', node_dirs{1,1}); + + sync_datasets(node_dirs, node_names, plotrange, onlydata); + + print(strcat('./runtime/', d, '/figure.svg'), '-dsvg'); +end \ No newline at end of file diff --git a/MATLAB/sync_datasets.m b/MATLAB/sync_datasets.m index 71319f8..4b551d7 100644 --- a/MATLAB/sync_datasets.m +++ b/MATLAB/sync_datasets.m @@ -1,4 +1,4 @@ -function sds = sync_datasets(dss) +function sds = sync_datasets(dss, node_names, PLOTRANGE, ONLYDATA) % load datasets len = length(dss); sds = {}; @@ -22,15 +22,28 @@ function sds = sync_datasets(dss) sds{i} = ds; end + % reference dataset and timescale + ds_ref = sds{1}; + ts_ref = ds_ref(:,1); + % create plot - figure(1) + if (~ONLYDATA) + figure('Position', [100 100 1000 1000]) + else + figure('Position', [100 100 1000 400]) + end + clf % plot samples - subplot(2,1,1); + if (~ONLYDATA) + subplot(3,1,1); + end - marks = ['-', '-o']; + marks = ["-x", "-o"]; - PLOTRANGE = 1:100; + %PLOTRANGE = 1017:1020; + + VOLT_PER_BIN = 42.80E-06; % get beginning indices for i=1:len @@ -39,34 +52,90 @@ function sds = sync_datasets(dss) s = size(ds); - %for k = 2:s(2); - for k = 2:2 - %plot(ts(1:100),ds(1:100,k), marks(k-1)); - plot(ts(PLOTRANGE),ds(PLOTRANGE,k), 'x'); + for k = 2:s(2) + %for k = 2:2 + if (ONLYDATA) + mark = "-"; + else + mark = marks(k-1); + end + + plot(ts(PLOTRANGE), ds(PLOTRANGE,k) * VOLT_PER_BIN, mark); hold on end end grid on - xlabel("Time [s]"); - ylabel("Sample"); - xlim([ts(PLOTRANGE(1)) ts(PLOTRANGE(end))]); + + if (ONLYDATA) + xlabel("Idő [s]"); + end + + ylabel("Feszültség [V]"); + xlim([ts_ref(PLOTRANGE(1)) ts_ref(PLOTRANGE(end))]); + + legend_lines = {}; + + for i=1:length(node_names) + for ch=1:2 + legend_lines{2 * (i - 1) + ch, 1} = strcat(node_names{i,1}, " CH", num2str(ch)); + end + end + + legend(legend_lines); + + if (ONLYDATA) + return; + end + + % plot signal difference + subplot(3,1,2); + + % normalize signals + ds = sds{1}; + ds_norm_range{1} = ds(PLOTRANGE,2:end) ./ max(abs(ds(PLOTRANGE,2:end))); + legend_lines = {}; + + for i=2:length(node_names) + ds = sds{i}; + ds_norm_range{i} = ds(PLOTRANGE,2:end) ./ max(abs(ds(PLOTRANGE,2:end))); + + for ch=1:2 + legend_lines{2 * (i - 2) + ch, 1} = strcat(node_names{i,1}, "-", node_names{1,1}, " CH", num2str(ch)); + end + + ts = ds(:,1); + diff_data = ((ds_norm_range{i} - ds_norm_range{1})); % ./ ds_norm_range{1}); + +% for l=1:length(diff_data) +% diff_data(l,:) = diff_data(l,:) ./ ds_norm_range{1}(l,:); +% end + + mark = marks(k-1); + plot(ts(PLOTRANGE), diff_data, mark); + end + + legend(legend_lines); + ylabel("Erősítéshibával kompenzált különbség") + xlim([ts_ref(PLOTRANGE(1)) ts_ref(PLOTRANGE(end))]); % plot timestamp errors - subplot(2,1,2); - - ds_ref = sds{1}; - ts_ref = ds_ref(:,1); + subplot(3,1,3); + + legend_lines = {}; for i = 2:len ts_err = ts(PLOTRANGE) - ts_ref(PLOTRANGE); - plot(ts_ref(PLOTRANGE), ts_err * 1E+09); + plot(ts_ref(PLOTRANGE), ts_err * 1E+09, "-o"); hold on + + legend_lines{i - 1, 1} = strcat(node_names{i,1}, "-", node_names{1,1}); end grid on - xlabel("Time [s]"); - ylabel("Time error [ns]"); - xlim([ts(PLOTRANGE(1)) ts(PLOTRANGE(end))]); + xlabel("Idő [s]"); + ylabel("Időhiba [ns]"); + legend(legend_lines); + xlim([ts_ref(PLOTRANGE(1)) ts_ref(PLOTRANGE(end))]); -endfunction +end diff --git a/main.cpp b/main.cpp index 87a72eb..5654101 100644 --- a/main.cpp +++ b/main.cpp @@ -5,12 +5,75 @@ #include -#include +#include +#include #include #include #include "src/ALSADevice.h" #include "src/SampleWriter.h" #include "src/audio_types.h" +#include "src/ServerBeacon.h" +#include "src/GUI.h" + +//uint8_t pRecvBuf[8096] __attribute__ ((aligned (32))); + +std::shared_ptr globs; + +int main(int argc, char * argv[]) { + if (argc < 3) { + std::cout << "At least 2 parameters needed:" << std::endl << "wfr data_port multicast_if_addr" << std::endl; + return 0; + } + + std::string mcastIFAddr = argv[2]; + unsigned short beaconPort = atoi(argv[1]); + + // ----------------- + + //ALSADevice dev(PERIOD_LEN, SAMPLE_RATE); + //MemoryPool pool(STEREO_BUF_LEN, 1000); + //SampleWriter sw(std::string("test_") + argv[1], 2, STEREO_BUF_LEN / 2); + + // create globals + globs = std::make_shared(); + + // create beacon + auto beacon = globs->beacon = std::make_shared(); + beacon->setInterfaceAddr(mcastIFAddr); + + // init and start GUI + Glib::init(); + GUI gui(globs); + gui.run(); + + + + //AudioPayload * pAudioPayload = reinterpret_cast(pRecvBuf); + +// while (true) { +// //while (sw.getSampleCnt() < 100000) { +// auto p = pool.alloc(); +// +// recv(soc, pRecvBuf, 8096, 0); +// +// //std::cout << pAudioPayload->timestamp_s << ' ' << pAudioPayload->timestamp_ns << std::endl; +// +// Timestamp ts = { pAudioPayload->timestamp_s, pAudioPayload->timestamp_ns }; +// //sw.addSamples(pAudioPayload->pData, ts); +// +// memcpy(p->pBlock, pAudioPayload->pData, STEREO_BUF_LEN * sizeof(AudioSampleType)); +// +// dev.write(p); +// +// //std::cout << pool.avail() << std::endl; +// } + + //close(soc); + + return 0; +} + + //int main() { // long loops; @@ -117,81 +180,37 @@ // return 0; //} -#define Fs (48000) // sampling frequency -#define F (440) // signal frequency -#define K (0.1) // amplitude +//#define Fs (48000) // sampling frequency +//#define F (440) // signal frequency +//#define K (0.1) // amplitude +// +//void generate_sine(int16_t *pBuf, uint32_t n) { +// double y; +// static double phase = 0, dt = 1.0 / Fs; +// +// uint32_t i = 0; +// for (i = 0; i < n; i++) { +// y = K * 0.5 * (1 + sin(phase)); +// phase += 2 * M_PI * dt * F; +// +// if (phase > (2 * M_PI)) { +// phase -= 2 * M_PI; +// } +// +// pBuf[2 * i + 1] = pBuf[2 * i] = ((int16_t) (y * 0x7FFF)); +// } +// +//} -void generate_sine(int16_t *pBuf, uint32_t n) { - double y; - static double phase = 0, dt = 1.0 / Fs; - - uint32_t i = 0; - for (i = 0; i < n; i++) { - y = K * 0.5 * (1 + sin(phase)); - phase += 2 * M_PI * dt * F; - - if (phase > (2 * M_PI)) { - phase -= 2 * M_PI; - } - - pBuf[2 * i + 1] = pBuf[2 * i] = ((int16_t) (y * 0x7FFF)); - } +//std::this_thread::sleep_for(std::chrono::seconds(1)); +/*for (size_t i = 0; i < 1000; i++) { + auto p = pool.alloc(); + generate_sine(p->pBlock, 324); + dev.write(p); + //std::cout << pool.avail() << std::endl; } -uint8_t pRecvBuf[8096] __attribute__ ((aligned (32))); - -int main(int argc, char * argv[]) { - ALSADevice dev(PERIOD_LEN, SAMPLE_RATE); - MemoryPool pool(STEREO_BUF_LEN, 1000); - //SampleWriter sw(std::string("test_") + argv[1], 2, STEREO_BUF_LEN / 2); - - //std::this_thread::sleep_for(std::chrono::seconds(1)); - - /*for (size_t i = 0; i < 1000; i++) { - auto p = pool.alloc(); - generate_sine(p->pBlock, 324); - dev.write(p); - //std::cout << pool.avail() << std::endl; - } - - while (true) { - std::this_thread::sleep_for(std::chrono::seconds(1)); - }*/ - - int soc = socket(AF_INET, SOCK_DGRAM, 0); - - int opt = 1; - setsockopt(soc, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)); - - sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = INADDR_ANY; - addr.sin_port = htons(atoi(argv[1])); - - bind(soc, (const sockaddr *)&addr, sizeof(addr)); - - AudioPayload * pAudioPayload = reinterpret_cast(pRecvBuf); - - while (true) { - //while (sw.getSampleCnt() < 100000) { - auto p = pool.alloc(); - - recv(soc, pRecvBuf, 8096, 0); - - //std::cout << pAudioPayload->timestamp_s << ' ' << pAudioPayload->timestamp_ns << std::endl; - - Timestamp ts = { pAudioPayload->timestamp_s, pAudioPayload->timestamp_ns }; - //sw.addSamples(pAudioPayload->pData, ts); - - memcpy(p->pBlock, pAudioPayload->pData, STEREO_BUF_LEN * sizeof(AudioSampleType)); - - dev.write(p); - - //std::cout << pool.avail() << std::endl; - } - - close(soc); - - return 0; -} \ No newline at end of file +while (true) { + std::this_thread::sleep_for(std::chrono::seconds(1)); +}*/ \ No newline at end of file diff --git a/src/Callback.h b/src/Callback.h new file mode 100644 index 0000000..fe40108 --- /dev/null +++ b/src/Callback.h @@ -0,0 +1,29 @@ +// +// Created by epagris on 2021. 11. 18.. +// + +#ifndef WFR_CALLBACK_H +#define WFR_CALLBACK_H + +#include + +template +class CallbackBase { +public: + virtual void operator()(ParamTypes... params) = 0; +}; + +template +class Callback : public CallbackBase { +public: + T * pObj; // pointer to object to call the function on + void(T::*pFN)(ParamTypes... params); // member function pointer + + Callback(T * pObj, void(T::*pFN)(ParamTypes... params)) : pObj(pObj), pFN(pFN) {} + + void operator()(ParamTypes... params) override { + std::invoke(pFN, *pObj, params...); + } +}; + +#endif //WFR_CALLBACK_H diff --git a/src/GUI.h b/src/GUI.h new file mode 100644 index 0000000..9f5eed9 --- /dev/null +++ b/src/GUI.h @@ -0,0 +1,355 @@ +// +// Created by epagris on 2021. 11. 14.. +// + +#ifndef WFR_GUI_H +#define WFR_GUI_H + +#include +#include + +#include +#include + +#include "ServerBeacon.h" +#include "globals.h" +#include "Callback.h" +#include "MultiStreamReceiver.h" + +class GUI : public Gtk::Application { +public: + template + using RefPtr = Glib::RefPtr; + + // GLOBALS +private: + std::shared_ptr mGlobs; +protected: + // tree view managing client nodes + class ClNodeTW { + private: + Gtk::TreeView *mpTW; + protected: + class ClNodeLSColumns : public Gtk::TreeModel::ColumnRecord { + public: + Gtk::TreeModelColumn mColIP; + Gtk::TreeModelColumn mColUse; + Gtk::TreeModelColumn mColInAddr; + public: + ClNodeLSColumns() { + add(mColIP); + add(mColUse); + add(mColInAddr); + } + }; + + public: + ClNodeLSColumns cols; // columns + RefPtr listStore; // list store object + Gtk::CellRendererText ipR; // IP-renderer + Gtk::CellRendererToggle useR; // use? renderer + std::shared_ptr> selectChangeCB; // kiválasztás változásakor hívódik meg + private: + void onToggleUse(const Glib::ustring &path) { + auto row = *(listStore->get_iter(path)); + row[cols.mColUse] = !row[cols.mColUse]; + + if (selectChangeCB != nullptr) { + (*selectChangeCB)(this); + } + } + + /*void onRowActivate(const Gtk::TreePath &tp, Gtk::TreeViewColumn *pTVC) { + auto row = *(listStore->get_iter(tp)); + auto command = "xdg-open http://" + row[cols.mColIP] + "/"; + system(command.c_str()); + }*/ + + protected: + void addRenderers() { + mpTW->append_column("IP Address", ipR); + + useR.set_activatable(true); + useR.signal_toggled().connect(sigc::mem_fun(*this, &ClNodeTW::onToggleUse)); + + mpTW->append_column("Use?", useR); + + auto twCols = mpTW->get_columns(); + twCols[0]->add_attribute(ipR, "text", 0); + twCols[1]->add_attribute(useR, "active", 1); + } + + public: + explicit ClNodeTW(Gtk::TreeView *pTW) : mpTW(pTW) { + listStore = Gtk::ListStore::create(cols); // construct list store + mpTW->set_model(listStore); // set model + //mpTW->set_activate_on_single_click(true); + addRenderers(); // add cell renderers + + //mpTW->signal_row_activated().connect(sigc::mem_fun(*this, &ClNodeTW::onRowActivate)); + } + + void addRow(in_addr_t addr) { + auto row = *(listStore->append()); + row[cols.mColInAddr] = addr; + row[cols.mColIP] = inet_ntoa({addr}); + row[cols.mColUse] = false; + } + + void setList(const std::list &addrs) { + // set of entries already in the list + std::list entriesFound; + std::list entriesToDelete; + + bool atLeastOneSelectedLineDeleted = false; + + // check for existing lines + listStore->foreach([&](const Gtk::TreePath &path, const Gtk::TreeIter &iter) { + in_addr_t addr = (*iter)[cols.mColInAddr]; + + // if entry is already in the list, then don't add it again + if (std::find(addrs.begin(), addrs.end(), addr) != addrs.end()) { + entriesFound.push_back(addr); + } else { // if value is not on the novel list, remove it + if ((*iter)[cols.mColUse]) { + atLeastOneSelectedLineDeleted |= true; + } + + entriesToDelete.push_back(iter); + } + + return false; + }); + + // delete non-existent lines + for (const auto &entry: entriesToDelete) { + listStore->erase(entry); + } + + // add non-existent, novel lines + for (auto const &addr: addrs) { + if (std::find(entriesFound.begin(), entriesFound.end(), addr) == entriesFound.end()) { + addRow(addr); + } + } + + // send notification if a selected row is being deleted + if (atLeastOneSelectedLineDeleted && (selectChangeCB != nullptr)) { + (*selectChangeCB)(this); + } + } + + std::vector getSelectedLines() const { + std::vector selectedLines; + + // check for existing lines + listStore->foreach([&](const Gtk::TreePath &path, const Gtk::TreeIter &iter) { + if ((*iter)[cols.mColUse]) { + selectedLines.push_back((*iter)[cols.mColInAddr]); + } + + return false; + }); + + return selectedLines; + } + + in_addr_t getHighlightedRow() const { + auto iter = mpTW->get_selection(); + + if (iter->count_selected_rows() > 0) { + return (*mpTW->get_selection()->get_selected())[cols.mColInAddr]; + } else { + return 0; + } + } + }; + + // BUILDER-RELATED THINGS +protected: + RefPtr mBuilder; + + template + T *getWidget(const Glib::ustring &name) { // get widget by name + Gtk::Widget *pWidget; + mBuilder->get_widget(name, pWidget); + return reinterpret_cast(pWidget); + } + +protected: // WIDGETS + Gtk::Button *wNodeRefreshBtn; + Gtk::Button *wStartStopCaptureBtn; + Gtk::FileChooserButton *wFolderChooser; + Gtk::TextView *wInfoPanel; + Gtk::TreeView *wClNodeTW; + std::shared_ptr mClNodeTW; // instance + Gtk::Menu *wClNodeCtxMenu; + Gtk::MenuItem *wCtxEntryWebInterface; + Gtk::MenuItem *wCtxEntryTelnetTerm; + + // status flags +protected: + struct { + bool clNodesSelected; + bool destinationFolderSelected; + bool captureRunning; + } mFlags; + + // stream receiver +protected: + std::shared_ptr mpMSR; // pointer to a multistream receiver object + +public: + void onActivateApp() { + Gtk::Window *pMainWin; + mBuilder->get_widget("main_win", pMainWin); + + add_window(*pMainWin); + + //mClNodeTW->addRow("10.42.0.64", true); + //mClNodeTW->addRow("10.42.0.65", false); + } + + void onScanDone(ServerBeacon *pSB) { + auto nodes = pSB->getNodesOnNetwork(); + std::list ipList; + + // convert binary IP addresses to strings + for (auto const &node: nodes) { + ipList.push_back(node.s_addr); + } + + mClNodeTW->setList(ipList); + } + + void onRefreshBtnToggled() { + auto beacon = mGlobs->beacon; + + wNodeRefreshBtn->set_sensitive(false); + + std::shared_ptr> scanCB = std::make_shared>(this, &GUI::onScanDone); + + // start beacon + beacon->stopBeacon(); + beacon->setScanCallback(scanCB); + beacon->startBeacon(); + + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + + mGlobs->beacon->stopBeacon(); + + wNodeRefreshBtn->set_sensitive(true); + } + + void onNodeSelectionChanged(ClNodeTW *pTW) { + auto selectedIPs = pTW->getSelectedLines(); + mFlags.clNodesSelected = !selectedIPs.empty(); + + manageCaptureBtnState(); + } + + void onFolderSelect() { + mFlags.destinationFolderSelected = !wFolderChooser->get_filename().empty(); + + manageCaptureBtnState(); + } + + void onStartStopCaptureBtnClick() { + if (!mFlags.captureRunning) { // start capture + wFolderChooser->set_sensitive(false); + wClNodeTW->set_sensitive(false); + wStartStopCaptureBtn->set_label("Stop capture"); + mFlags.captureRunning = true; + + mpMSR = std::make_shared(mClNodeTW->getSelectedLines(), wFolderChooser->get_filename()); + mpMSR->start(); + mGlobs->beacon->execCmdOnAllTerms(std::string("snd connect ") + mGlobs->beacon->getInterfaceAddr() + " " + std::to_string(mpMSR->port)); + } else { + mGlobs->beacon->execCmdOnAllTerms("snd disconnect"); + mpMSR = nullptr; + + wFolderChooser->set_sensitive(true); + wClNodeTW->set_sensitive(true); + + wStartStopCaptureBtn->set_label("Start capture"); + mFlags.captureRunning = false; + } + } + + bool onShowCtxPopupMenu(GdkEventButton *event) { + if (event->type == GDK_2BUTTON_PRESS) { + if (!wClNodeCtxMenu->get_attach_widget()) { + wClNodeCtxMenu->attach_to_widget(*wClNodeTW); + } + + wClNodeCtxMenu->popup(event->button, event->time); + return true; //It has been handled. + } else { + return false; + } + } + + void onOpenWebInterface() { + in_addr_t node_addr = mClNodeTW->getHighlightedRow(); + if (node_addr != 0) { + auto command = std::string("xdg-open http://") + inet_ntoa({node_addr}) + "/"; + system(command.c_str()); + } + } + + void onOpenTelnetClient() { + in_addr_t node_addr = mClNodeTW->getHighlightedRow(); + if (node_addr != 0) { + auto port = mGlobs->beacon->getNodeNettermPort({node_addr}); + auto command = std::string("putty -raw ") + inet_ntoa({node_addr}) + " " + std::to_string(port) + " &"; + system(command.c_str()); + } + } + +protected: + void manageCaptureBtnState() { + wStartStopCaptureBtn->set_sensitive(mFlags.clNodesSelected && mFlags.destinationFolderSelected); + } + +protected: + + void fetchWidgets() { + wNodeRefreshBtn = getWidget("node_refresh_btn"); + wStartStopCaptureBtn = getWidget("start_stop_capture_btn"); + wFolderChooser = getWidget("folder_chooser_btn"); + wInfoPanel = getWidget("info_panel"); + wClNodeTW = getWidget("client_node_tw"); + mClNodeTW = std::make_shared(wClNodeTW); + + wClNodeCtxMenu = getWidget("node_tw_ctx_menu"); + wCtxEntryWebInterface = getWidget("ctx_web_interface"); + wCtxEntryTelnetTerm = getWidget("ctx_telnet_term"); + } + + void connectSignals() { + signal_activate().connect(sigc::mem_fun(*this, &GUI::onActivateApp)); // app activate + wNodeRefreshBtn->signal_clicked().connect(sigc::mem_fun(*this, &GUI::onRefreshBtnToggled)); // refresh button + mClNodeTW->selectChangeCB = std::make_shared>(this, &GUI::onNodeSelectionChanged); // select change callback + wClNodeTW->signal_button_press_event().connect(sigc::mem_fun(*this, &GUI::onShowCtxPopupMenu)); // right click context menu popup + wFolderChooser->signal_file_set().connect(sigc::mem_fun(*this, &GUI::onFolderSelect)); // folder selection + wStartStopCaptureBtn->signal_clicked().connect(sigc::mem_fun(*this, &GUI::onStartStopCaptureBtnClick)); // start/stop click + + wCtxEntryWebInterface->signal_activate().connect(sigc::mem_fun(*this, &GUI::onOpenWebInterface)); + wCtxEntryTelnetTerm->signal_activate().connect(sigc::mem_fun(*this, &GUI::onOpenTelnetClient)); + } + + void resetFlags() { + memset(&mFlags, 0, sizeof(mFlags)); + } + +public: + explicit GUI(const std::shared_ptr &globs) : Gtk::Application("com.epagris.samrecv"), mGlobs(globs) { // constr. + resetFlags(); + mBuilder = Gtk::Builder::create_from_file("gui/samprecv.glade"); + fetchWidgets(); + connectSignals(); // connect signals + } +}; + + +#endif //WFR_GUI_H diff --git a/src/MultiStreamReceiver.cpp b/src/MultiStreamReceiver.cpp new file mode 100644 index 0000000..dff3c61 --- /dev/null +++ b/src/MultiStreamReceiver.cpp @@ -0,0 +1,154 @@ +// +// Created by epagris on 2021. 11. 22.. +// + +#include + +#include +#include +#include + +#include "MultiStreamReceiver.h" +#include "audio_types.h" + +MultiStreamReceiver::MultiStreamReceiver(const std::vector &nodes, const std::string& targetDir, unsigned short port) : port(port) { + mTargetDir = targetDir; + mRunning = false; + mClientNodes = nodes; +} + +MultiStreamReceiver::~MultiStreamReceiver() { + stop(); +} + +void MultiStreamReceiver::startNetworking() { + // open socket + mSoc = socket(AF_INET, SOCK_DGRAM, 0); + if (mSoc == -1) { + throw std::runtime_error(std::string("Could not create UDP-socket in ") + __FUNCTION__ + "!"); + } + + int opt = 1; + if (setsockopt(mSoc, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)) == -1) { + throw std::runtime_error(std::string("Could not set socket options in ") + __FUNCTION__ + "!"); + } + + sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = INADDR_ANY; + addr.sin_port = htons(port); + + // bind + if (bind(mSoc, (const sockaddr *)&addr, sizeof(addr)) == -1) { + throw std::runtime_error(std::string("Could not bind socket options in ") + __FUNCTION__ + "!"); + } +} + +void MultiStreamReceiver::createSampleWriters() { + struct stat sb; + if (stat(mTargetDir.c_str(), &sb) == 0 && S_ISDIR(sb.st_mode)) { // if directory exists then leave it as it is + //unlink(datasetName.c_str()); FIXME + } else if (mkdir(mTargetDir.c_str(), 0777) == -1) { // else: create directory for dataset + throw std::runtime_error("Could not create directory " + mTargetDir + "!"); + } + + if (chdir(mTargetDir.c_str()) == -1) { + throw std::runtime_error("Could not change to target directory!"); + } + + std::string nodeDirHint = ""; + + for (unsigned int & mClientNode : mClientNodes) { + std::string datasetName = std::string("node_") + inet_ntoa({ mClientNode }); + mpSampleWriters.emplace_back(std::make_shared>(datasetName, 2, STEREO_BUF_LEN / 2)); + nodeDirHint += datasetName + "\n"; + } + + std::ofstream hintFile("node_dir_hint.txt", std::ios_base::out); + hintFile << nodeDirHint; + hintFile.close(); + + if (chdir("..") == -1) { + throw std::runtime_error("Could not change to target directory!"); + } +} + + +void MultiStreamReceiver::start() { + if (mRunning) { + return; + } + + // ---------------------------- + + createSampleWriters(); + startNetworking(); + + // ---------------------------- + + mRunning = true; + mRecvThread = std::make_shared(fnRecv, this); +} + +void MultiStreamReceiver::stop() { + if (!mRunning) { + return; + } + + mRunning = false; + mRecvThread->join(); + close(mSoc); + mpSampleWriters.clear(); +} + +// -------------------------------------- + +#define RECV_BUFFER_SIZE (16000) + +void MultiStreamReceiver::fnRecv(MultiStreamReceiver *pMSR) { + // select-related structures + fd_set read_fds; + timeval tv; + + // receive buffer + std::shared_ptr pWordBuf = std::shared_ptr(new uint32_t[(RECV_BUFFER_SIZE / 4) + 1]); // to have 4-byte alignment + auto * pRecvBuf = reinterpret_cast(pWordBuf.get()); + auto * pAudioPayload = reinterpret_cast(pWordBuf.get()); + + while (pMSR->mRunning) { + // fd-set + FD_ZERO(&read_fds); + FD_SET(pMSR->mSoc, &read_fds); + + // timeout + tv.tv_sec = 0; + tv.tv_usec = 1E+06 / 4; + + // wait for data + int fd_cnt = select(pMSR->mSoc + 1, &read_fds, nullptr, nullptr, &tv); + + // if data is available on the socket + if ((fd_cnt > 0) && FD_ISSET(pMSR->mSoc, &read_fds)) { + + ssize_t recv_size; + if ((recv_size = recv(pMSR->mSoc, pRecvBuf, RECV_BUFFER_SIZE, 0)) <= 0) { + throw std::runtime_error("Receive error!"); + } + + // get sample writer for the specific address + std::shared_ptr> pSampleWriter = nullptr; + for (size_t i = 0; i < pMSR->mClientNodes.size(); i++) { + if (pMSR->mClientNodes[i] == pAudioPayload->addr) { + pSampleWriter = pMSR->mpSampleWriters[i]; + break; + } + } + + // if sample writer is found + if (pSampleWriter != nullptr) { + pSampleWriter->addSamples(pAudioPayload->pData, { pAudioPayload->timestamp_s, pAudioPayload->timestamp_ns }); + } + } + } +} + diff --git a/src/MultiStreamReceiver.h b/src/MultiStreamReceiver.h new file mode 100644 index 0000000..9cc5d74 --- /dev/null +++ b/src/MultiStreamReceiver.h @@ -0,0 +1,41 @@ +// +// Created by epagris on 2021. 11. 22.. +// + +#ifndef WFR_MULTISTREAMRECEIVER_H +#define WFR_MULTISTREAMRECEIVER_H + + +#include +#include +#include +#include +#include "SampleWriter.h" + +class MultiStreamReceiver { +public: + static constexpr unsigned short DEFAULT_PORT = 20220; // default port +private: + std::vector mClientNodes; // client node addresses + int mSoc; // UDP-socket for receiving samples + std::shared_ptr mRecvThread; // thread for reception + static void fnRecv(MultiStreamReceiver * pMSR); // routine function running in separate thread + bool mRunning; + std::vector>> mpSampleWriters; // sample writers for streams + std::string mTargetDir; // target directory +private: + void startNetworking(); // start networking + void createSampleWriters(); // construct sample writers +public: + const unsigned short port; // port +public: + explicit MultiStreamReceiver(const std::vector& nodes, const std::string& targetDir, unsigned short port = DEFAULT_PORT); + void start(); // start reception + void stop(); + + virtual ~MultiStreamReceiver(); + // stop reception +}; + + +#endif //WFR_MULTISTREAMRECEIVER_H diff --git a/src/SampleWriter.h b/src/SampleWriter.h index 4c36be3..09d66c7 100644 --- a/src/SampleWriter.h +++ b/src/SampleWriter.h @@ -17,7 +17,7 @@ #define DIRSEP '/' -template // ch: number of channels, T: sample datatype +template // T: sample datatype class SampleWriter { public: static const std::string TS_FILENAME; // filename for timestamp file diff --git a/src/ServerBeacon.cpp b/src/ServerBeacon.cpp index c098f34..eda49e6 100644 --- a/src/ServerBeacon.cpp +++ b/src/ServerBeacon.cpp @@ -5,17 +5,18 @@ #include #include #include +#include #include "ServerBeacon.h" const in_addr ServerBeacon::DEFAULT_MULTICAST_ADDR = {inet_addr("224.0.2.21")}; -ServerBeacon::ServerBeacon() : mRunning(false) { +ServerBeacon::ServerBeacon() : mRunning(false), mScanCallback(nullptr) { setMulticastAddr(DEFAULT_MULTICAST_ADDR); setPort(DEFAULT_PORT); setAnnouncePeriod(DEFAULT_ANNOUNCE_PERIOD_MS); } -ServerBeacon::ServerBeacon(const in_addr &addr, unsigned short port, size_t announcePeriod_ms) : mRunning(false) { +ServerBeacon::ServerBeacon(const in_addr &addr, unsigned short port, size_t announcePeriod_ms) : mRunning(false), mScanCallback(nullptr) { setMulticastAddr(addr); setPort(port); setAnnouncePeriod(announcePeriod_ms); @@ -62,18 +63,28 @@ void ServerBeacon::startBeacon() { int opt = 1; setsockopt(mBeaconSoc, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)); - // bind to ip-address and port - sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_addr = mAddr; - addr.sin_port = htons(mBeaconPort); + // fill-in ip-address and port + mMulticastAddr.sin_family = AF_INET; + mMulticastAddr.sin_addr = mAddr; + mMulticastAddr.sin_port = htons(mBeaconPort); - if (bind(mBeaconSoc, (const sockaddr *) &addr, sizeof(addr)) == -1) { + // bind to port + sockaddr_in bindAddr = mMulticastAddr; + bindAddr.sin_addr.s_addr = htonl(INADDR_ANY); + + if (bind(mBeaconSoc, (const sockaddr *) &bindAddr, sizeof(sockaddr_in)) == -1) { std::cerr << "Could not bind to IP address and/or port in " << __FUNCTION__ << "!" << std::endl; close(mBeaconSoc); return; } + // join multicast group + ip_mreq mreq; + mreq.imr_multiaddr = mAddr; + mIFAddr.s_addr = mreq.imr_interface.s_addr = inet_addr(mIFAddrStr.c_str()); + setsockopt(mBeaconSoc, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); + setsockopt(mBeaconSoc, IPPROTO_IP, IP_MULTICAST_IF, &mreq.imr_interface, sizeof(in_addr)); + // the beacon is running now! mRunning = true; @@ -82,11 +93,27 @@ void ServerBeacon::startBeacon() { } void ServerBeacon::stopBeacon() { + if (!mRunning){ + return; + } + mRunning = false; + mpBeaconThread->join(); + close(mBeaconSoc); } std::list ServerBeacon::getNodesOnNetwork() { - return std::list(); + std::list nodeAddrs; + + mBeaconMtx.lock(); + + for (auto nodeInfo : mNodes) { + nodeAddrs.push_back(nodeInfo.addr); + } + + mBeaconMtx.unlock(); + + return nodeAddrs; } @@ -96,38 +123,143 @@ void ServerBeacon::fn_BeaconThread(ServerBeacon *pSB) { // beacon message used to send and receive information BeaconMsg msg; - // - // select-related structures fd_set read_fds; timeval tv; - // fill-in select parameters - FD_ZERO(&read_fds); - FD_SET(pSB->mBeaconSoc, &read_fds); - - // fill-in timeout value - tv.tv_sec = pSB->mAnnPeriod_ms / 1000; - tv.tv_usec = (pSB->mAnnPeriod_ms % 1000) * 1000; + // list of detected nodes + std::list clNodes; while (pSB->mRunning) { + msg.senderAddr = pSB->mIFAddr; // set our address msg.server_nClient = 1; // we are the server msg.terminalPort = 0; // we have no inbound terminal port - send(pSB->mBeaconSoc, &msg, sizeof(BeaconMsg), 0); // send the announce message + sendto(pSB->mBeaconSoc, &msg, sizeof(BeaconMsg), 0, (sockaddr *) &pSB->mMulticastAddr, sizeof(sockaddr_in)); // send the announce message // ------------------ - // copy parameters - fd_set cur_rfds = read_fds; - timeval cur_tv = tv; + // fill-in select parameters + FD_ZERO(&read_fds); + FD_SET(pSB->mBeaconSoc, &read_fds); - while (select(1, &cur_rfds, nullptr, nullptr, &cur_tv) != 0) { + // fill-in timeout value + tv.tv_sec = pSB->mAnnPeriod_ms / 1000; + tv.tv_usec = (pSB->mAnnPeriod_ms % 1000) * 1000; + + // clear list + clNodes.clear(); + + while (select(pSB->mBeaconSoc + 1, &read_fds, nullptr, nullptr, &tv) != 0) { // receive response recv(pSB->mBeaconSoc, &msg, sizeof(BeaconMsg), 0); - // store response + // consider only client responses + if (msg.server_nClient == 0) { + // store response + ClientNodeInfo clInfo; + clInfo.addr = msg.senderAddr; + clInfo.terminalPort = msg.terminalPort; + + clNodes.push_back(clInfo); + + //std::cout << inet_ntoa(clInfo.addr) << std::endl; + } + + // ------------------------------------------- + + // fill-in select parameters + FD_ZERO(&read_fds); + FD_SET(pSB->mBeaconSoc, &read_fds); + + // fill-in timeout value + tv.tv_sec = pSB->mAnnPeriod_ms / 1000; + tv.tv_usec = (pSB->mAnnPeriod_ms % 1000) * 1000; } + // copy list of detected nodes + pSB->mBeaconMtx.lock(); + + pSB->mNodes = clNodes; + + pSB->mBeaconMtx.unlock(); + + // invoke callback + if (pSB->mScanCallback != nullptr) { + (*pSB->mScanCallback)(pSB); + } } } + +void ServerBeacon::setScanCallback(const std::shared_ptr>& scanCB) { + mScanCallback = scanCB; +} + +void ServerBeacon::setInterfaceAddr(const std::string &interfaceAddr) { + mIFAddrStr = interfaceAddr; +} + +std::string ServerBeacon::getInterfaceAddr() const { + return mIFAddrStr; +} + +unsigned short ServerBeacon::getNodeNettermPort(in_addr addr) { + mBeaconMtx.lock(); + + int nettermPort = -1; + + for (const auto& nodeInfo : mNodes) { + if (nodeInfo.addr.s_addr == addr.s_addr) { + nettermPort = nodeInfo.terminalPort; + break; + } + } + + mBeaconMtx.unlock(); + + if (nettermPort == -1) { + throw std::runtime_error(std::string("Node not found in database! (") + inet_ntoa(addr) + ")!"); + } + + return (unsigned short)nettermPort; +} + +void ServerBeacon::sendNettermCmd(in_addr addr, unsigned short port, const std::string &cmd) { + int soc = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + if (soc == -1) { + throw std::runtime_error("Could not create TCP socket to execute netterm commands!"); + } + + sockaddr_in serverAddr; + memset(&serverAddr, 0, sizeof(sockaddr_in)); + serverAddr.sin_addr = addr; + serverAddr.sin_port = htons(port); + serverAddr.sin_family = AF_INET; + + if (connect(soc, (const sockaddr *)&serverAddr, sizeof(sockaddr_in)) == -1) { + throw std::runtime_error(std::string("Could not connect to netterm! (") + inet_ntoa(addr) + ":" + std::to_string(port) + ")"); + } + + static std::string nodeftty_cmd = "nodeftty"; + send(soc, "nodeftty", nodeftty_cmd.size(), 0); + + // send command + send(soc, cmd.c_str(), cmd.size(), 0); + + close(soc); +} + +void ServerBeacon::execCmdOnNode(in_addr addr, const std::string &cmd) { + unsigned short port = getNodeNettermPort(addr); + sendNettermCmd(addr, port, cmd); +} + +void ServerBeacon::execCmdOnAllTerms(const std::string &cmd) { + mBeaconMtx.lock(); + + for (const auto& nodeInfo : mNodes) { + sendNettermCmd(nodeInfo.addr, nodeInfo.terminalPort, cmd); + } + + mBeaconMtx.unlock(); +} \ No newline at end of file diff --git a/src/ServerBeacon.h b/src/ServerBeacon.h index a1279d2..898c9d3 100644 --- a/src/ServerBeacon.h +++ b/src/ServerBeacon.h @@ -10,6 +10,8 @@ #include #include #include +#include +#include "Callback.h" class ServerBeacon { public: @@ -24,20 +26,28 @@ private: }; private: int mBeaconSoc; // beacon socket - in_addr mAddr; // address + in_addr mAddr; // multicast address + sockaddr_in mMulticastAddr; // multicast address structure unsigned short mBeaconPort; // port size_t mAnnPeriod_ms; // announce period std::list mNodes; // nodes bool mRunning; // does the beacon operate? + std::mutex mBeaconMtx; // beacon mutex + std::string mIFAddrStr; // multicast interface address STRING + in_addr mIFAddr; // multicast interface address + std::shared_ptr> mScanCallback; // scan callback private: std::shared_ptr mpBeaconThread; // thread running the beacon static void fn_BeaconThread(ServerBeacon * pSB); // function running in beacon thread private: // structure for beacon MSG struct BeaconMsg { - uint8_t server_nClient; // server or client + in_addr senderAddr; // address of sender uint16_t terminalPort; // port of terminal + uint8_t server_nClient; // server or client }; +private: + void sendNettermCmd(in_addr addr, unsigned short port, const std::string& cmd); // send netterm cmd public: ServerBeacon(); // constr. explicit ServerBeacon(const in_addr& addr, unsigned short port, size_t announcePeriod_ms); // constr. with beacon port and announce period @@ -47,9 +57,15 @@ public: unsigned short getPort() const; // get beacon port void setAnnouncePeriod(size_t period_ms); // set announce period size_t getAnnouncePeriod() const; // get announce period + void setInterfaceAddr(const std::string &interfaceAddr); // set multicast interface address + std::string getInterfaceAddr() const; // get multicast interface address void startBeacon(); // start the beacon void stopBeacon(); // stop the beacon std::list getNodesOnNetwork(); // get nodes connected to the same network + unsigned short getNodeNettermPort(in_addr addr); // get network terminal port of ndoe + void setScanCallback(const std::shared_ptr>& scanCB); // set scan callback + void execCmdOnNode(in_addr addr, const std::string& cmd); // execute command on a node through netterm + void execCmdOnAllTerms(const std::string& cmd); // multicast command to all netterms }; diff --git a/src/audio_types.h b/src/audio_types.h index 09cc7df..57b536c 100644 --- a/src/audio_types.h +++ b/src/audio_types.h @@ -16,6 +16,7 @@ typedef int16_t AudioSampleType; typedef struct { uint32_t timestamp_s, timestamp_ns; // timestamp uint32_t sample_cnt; // count of samples in packet + in_addr_t addr; // client node address AudioSampleType pData[STEREO_BUF_LEN]; // buffer for stereo data } AudioPayload; diff --git a/src/globals.h b/src/globals.h new file mode 100644 index 0000000..2998aa0 --- /dev/null +++ b/src/globals.h @@ -0,0 +1,15 @@ +// +// Created by epagris on 2021. 11. 16.. +// + +#ifndef WFR_GLOBALS_H +#define WFR_GLOBALS_H + +#include +#include "ServerBeacon.h" + +struct Globals { + std::shared_ptr beacon; // beacon to detect client nodes +}; + +#endif //WFR_GLOBALS_H