Send data as it is through MQTT

This commit is contained in:
Alex Spataru 2024-11-13 17:40:55 -05:00
parent 7852779b2f
commit 4303e1a5e3
4 changed files with 19 additions and 46 deletions

View File

@ -288,11 +288,10 @@ void JSON::FrameBuilder::readData(const QByteArray &data)
{ {
// Obtain state of the app // Obtain state of the app
const bool csvPlaying = CSV::Player::instance().isOpen(); const bool csvPlaying = CSV::Player::instance().isOpen();
const bool mqttSubscribed = MQTT::Client::instance().isSubscribed();
// Real-time data, parse data & perform conversion // Real-time data, parse data & perform conversion
QStringList fields; QStringList fields;
if (!csvPlaying && !mqttSubscribed) if (!csvPlaying)
{ {
// Convert binary frame data to a string // Convert binary frame data to a string
QString frameData; QString frameData;
@ -322,7 +321,7 @@ void JSON::FrameBuilder::readData(const QByteArray &data)
fields = m_frameParser->parse(frameData, separator); fields = m_frameParser->parse(frameData, separator);
} }
// CSV/MQTT data, no need to perform conversions or use frame parser // CSV data, no need to perform conversions or use frame parser
else else
fields = QString::fromUtf8(data.simplified()).split(','); fields = QString::fromUtf8(data.simplified()).split(',');

View File

@ -26,7 +26,6 @@
#include "IO/Manager.h" #include "IO/Manager.h"
#include "MQTT/Client.h" #include "MQTT/Client.h"
#include "Misc/Utilities.h" #include "Misc/Utilities.h"
#include "JSON/FrameBuilder.h"
//---------------------------------------------------------------------------- //----------------------------------------------------------------------------
// Suppress deprecated warnings // Suppress deprecated warnings
@ -60,8 +59,8 @@ MQTT::Client::Client()
regenerateClient(); regenerateClient();
// Send data periodically & reset statistics when disconnected/connected to a // Send data periodically & reset statistics when disconnected/connected to a
connect(&JSON::FrameBuilder::instance(), &JSON::FrameBuilder::frameChanged, connect(&IO::Manager::instance(), &IO::Manager::frameReceived, this,
this, &MQTT::Client::sendFrame); &MQTT::Client::sendFrame);
connect(&IO::Manager::instance(), &IO::Manager::connectedChanged, this, connect(&IO::Manager::instance(), &IO::Manager::connectedChanged, this,
&MQTT::Client::resetStatistics); &MQTT::Client::resetStatistics);
@ -617,15 +616,11 @@ void MQTT::Client::onConnectedChanged()
} }
/** /**
* @brief Sends a JSON frame as a CSV-like MQTT message. * @brief Sends a data frame to the MQTT broker
* *
* Constructs a CSV-like message from the dataset values in the given frame, * @param frame The frame containing the data to be sent.
* ordered by dataset index, and publishes it to the MQTT topic if the client
* is connected and in publisher mode.
*
* @param frame The JSON frame containing the data to be sent.
*/ */
void MQTT::Client::sendFrame(const JSON::Frame &frame) void MQTT::Client::sendFrame(const QByteArray &frame)
{ {
Q_ASSERT(m_client); Q_ASSERT(m_client);
@ -641,31 +636,10 @@ void MQTT::Client::sendFrame(const JSON::Frame &frame)
else if (clientMode() != ClientPublisher) else if (clientMode() != ClientPublisher)
return; return;
// Write frame data in the order of the frame indexes
const auto &groups = frame.groups();
QMap<int, QString> fieldValues;
// Iterate through groups and datasets to collect field values
for (auto g = groups.constBegin(); g != groups.constEnd(); ++g)
{
const auto &datasets = g->datasets();
for (auto d = datasets.constBegin(); d != datasets.constEnd(); ++d)
fieldValues[d->index()] = d->value();
}
// Construct byte array with CSV-like frame with ordered dataset indexes
QByteArray data;
for (auto it = fieldValues.begin(); it != fieldValues.end(); ++it)
{
data.append(it.value().toUtf8());
if (std::next(it) != fieldValues.end())
data.append(',');
}
// Create & send MQTT message // Create & send MQTT message
if (!data.isEmpty()) if (!frame.isEmpty())
{ {
QMQTT::Message message(m_sentMessages, topic(), data); QMQTT::Message message(m_sentMessages, topic(), frame);
m_client->publish(message); m_client->publish(message);
++m_sentMessages; ++m_sentMessages;
} }
@ -846,10 +820,6 @@ void MQTT::Client::onMessageReceived(const QMQTT::Message &message)
if (topic() != mtopic) if (topic() != mtopic)
return; return;
// Add EOL character
if (!mpayld.endsWith('\n'))
mpayld.append('\n');
// Let IO manager process incoming data // Let IO manager process incoming data
QMetaObject::invokeMethod( QMetaObject::invokeMethod(
this, [=] { IO::Manager::instance().processPayload(mpayld); }, this, [=] { IO::Manager::instance().processPayload(mpayld); },

View File

@ -229,7 +229,7 @@ public slots:
private slots: private slots:
void resetStatistics(); void resetStatistics();
void onConnectedChanged(); void onConnectedChanged();
void sendFrame(const JSON::Frame &frame); void sendFrame(const QByteArray &frame);
void lookupFinished(const QHostInfo &info); void lookupFinished(const QHostInfo &info);
void onError(const QMQTT::ClientError error); void onError(const QMQTT::ClientError error);
void onSslErrors(const QList<QSslError> &errors); void onSslErrors(const QList<QSslError> &errors);

View File

@ -48,9 +48,13 @@ UI::Dashboard::Dashboard()
// clang-format on // clang-format on
// Reset dashboard data if MQTT client is subscribed // Reset dashboard data if MQTT client is subscribed
connect(&MQTT::Client::instance(), &MQTT::Client::connectedChanged, this, connect(
[=] { &MQTT::Client::instance(), &MQTT::Client::connectedChanged, this, [=] {
if (MQTT::Client::instance().isSubscribed()) const bool subscribed = MQTT::Client::instance().isSubscribed();
const bool wasSubscribed = !MQTT::Client::instance().isConnectedToHost()
&& MQTT::Client::instance().clientMode()
== MQTT::ClientSubscriber;
if (subscribed || wasSubscribed)
resetData(); resetData();
}); });