From 4303e1a5e36b048bd006a0f5d800ff8b42d51139 Mon Sep 17 00:00:00 2001 From: Alex Spataru Date: Wed, 13 Nov 2024 17:40:55 -0500 Subject: [PATCH] Send data as it is through MQTT --- app/src/JSON/FrameBuilder.cpp | 5 ++-- app/src/MQTT/Client.cpp | 44 ++++++----------------------------- app/src/MQTT/Client.h | 2 +- app/src/UI/Dashboard.cpp | 14 +++++++---- 4 files changed, 19 insertions(+), 46 deletions(-) diff --git a/app/src/JSON/FrameBuilder.cpp b/app/src/JSON/FrameBuilder.cpp index ac89c84c..26816020 100644 --- a/app/src/JSON/FrameBuilder.cpp +++ b/app/src/JSON/FrameBuilder.cpp @@ -288,11 +288,10 @@ void JSON::FrameBuilder::readData(const QByteArray &data) { // Obtain state of the app const bool csvPlaying = CSV::Player::instance().isOpen(); - const bool mqttSubscribed = MQTT::Client::instance().isSubscribed(); // Real-time data, parse data & perform conversion QStringList fields; - if (!csvPlaying && !mqttSubscribed) + if (!csvPlaying) { // Convert binary frame data to a string QString frameData; @@ -322,7 +321,7 @@ void JSON::FrameBuilder::readData(const QByteArray &data) fields = m_frameParser->parse(frameData, separator); } - // CSV/MQTT data, no need to perform conversions or use frame parser + // CSV data, no need to perform conversions or use frame parser else fields = QString::fromUtf8(data.simplified()).split(','); diff --git a/app/src/MQTT/Client.cpp b/app/src/MQTT/Client.cpp index 1405ddad..d1229b7b 100644 --- a/app/src/MQTT/Client.cpp +++ b/app/src/MQTT/Client.cpp @@ -26,7 +26,6 @@ #include "IO/Manager.h" #include "MQTT/Client.h" #include "Misc/Utilities.h" -#include "JSON/FrameBuilder.h" //---------------------------------------------------------------------------- // Suppress deprecated warnings @@ -60,8 +59,8 @@ MQTT::Client::Client() regenerateClient(); // Send data periodically & reset statistics when disconnected/connected to a - connect(&JSON::FrameBuilder::instance(), &JSON::FrameBuilder::frameChanged, - this, &MQTT::Client::sendFrame); + connect(&IO::Manager::instance(), &IO::Manager::frameReceived, this, + &MQTT::Client::sendFrame); connect(&IO::Manager::instance(), &IO::Manager::connectedChanged, this, &MQTT::Client::resetStatistics); @@ -617,15 +616,11 @@ void MQTT::Client::onConnectedChanged() } /** - * @brief Sends a JSON frame as a CSV-like MQTT message. + * @brief Sends a data frame to the MQTT broker * - * Constructs a CSV-like message from the dataset values in the given frame, - * ordered by dataset index, and publishes it to the MQTT topic if the client - * is connected and in publisher mode. - * - * @param frame The JSON frame containing the data to be sent. + * @param frame The frame containing the data to be sent. */ -void MQTT::Client::sendFrame(const JSON::Frame &frame) +void MQTT::Client::sendFrame(const QByteArray &frame) { Q_ASSERT(m_client); @@ -641,31 +636,10 @@ void MQTT::Client::sendFrame(const JSON::Frame &frame) else if (clientMode() != ClientPublisher) return; - // Write frame data in the order of the frame indexes - const auto &groups = frame.groups(); - QMap fieldValues; - - // Iterate through groups and datasets to collect field values - for (auto g = groups.constBegin(); g != groups.constEnd(); ++g) - { - const auto &datasets = g->datasets(); - for (auto d = datasets.constBegin(); d != datasets.constEnd(); ++d) - fieldValues[d->index()] = d->value(); - } - - // Construct byte array with CSV-like frame with ordered dataset indexes - QByteArray data; - for (auto it = fieldValues.begin(); it != fieldValues.end(); ++it) - { - data.append(it.value().toUtf8()); - if (std::next(it) != fieldValues.end()) - data.append(','); - } - // Create & send MQTT message - if (!data.isEmpty()) + if (!frame.isEmpty()) { - QMQTT::Message message(m_sentMessages, topic(), data); + QMQTT::Message message(m_sentMessages, topic(), frame); m_client->publish(message); ++m_sentMessages; } @@ -846,10 +820,6 @@ void MQTT::Client::onMessageReceived(const QMQTT::Message &message) if (topic() != mtopic) return; - // Add EOL character - if (!mpayld.endsWith('\n')) - mpayld.append('\n'); - // Let IO manager process incoming data QMetaObject::invokeMethod( this, [=] { IO::Manager::instance().processPayload(mpayld); }, diff --git a/app/src/MQTT/Client.h b/app/src/MQTT/Client.h index a49efa97..5a8bab46 100644 --- a/app/src/MQTT/Client.h +++ b/app/src/MQTT/Client.h @@ -229,7 +229,7 @@ public slots: private slots: void resetStatistics(); void onConnectedChanged(); - void sendFrame(const JSON::Frame &frame); + void sendFrame(const QByteArray &frame); void lookupFinished(const QHostInfo &info); void onError(const QMQTT::ClientError error); void onSslErrors(const QList &errors); diff --git a/app/src/UI/Dashboard.cpp b/app/src/UI/Dashboard.cpp index 24b8cffc..3656fdbe 100644 --- a/app/src/UI/Dashboard.cpp +++ b/app/src/UI/Dashboard.cpp @@ -48,11 +48,15 @@ UI::Dashboard::Dashboard() // clang-format on // Reset dashboard data if MQTT client is subscribed - connect(&MQTT::Client::instance(), &MQTT::Client::connectedChanged, this, - [=] { - if (MQTT::Client::instance().isSubscribed()) - resetData(); - }); + connect( + &MQTT::Client::instance(), &MQTT::Client::connectedChanged, this, [=] { + const bool subscribed = MQTT::Client::instance().isSubscribed(); + const bool wasSubscribed = !MQTT::Client::instance().isConnectedToHost() + && MQTT::Client::instance().clientMode() + == MQTT::ClientSubscriber; + if (subscribed || wasSubscribed) + resetData(); + }); // Update the dashboard widgets at 24 Hz connect(&Misc::TimerEvents::instance(), &Misc::TimerEvents::timeout24Hz, this,