support evt.topic, evt.qos, evt.msg for mqtt cb

This commit is contained in:
lyon 2022-12-27 22:53:13 +08:00
parent ed04dc2657
commit 7d476ff971
4 changed files with 69 additions and 118 deletions

View File

@ -54,7 +54,7 @@ class _MQTT:
pass
"""listSubscribeTopic """
def publish(self,topic: str, payload: str, qos:int) -> int:
def publish(self, topic: str, payload: str, qos: int) -> int:
pass
"""publish to the mqtt-server."""
@ -64,19 +64,7 @@ class _MQTT:
def setHost(self, host_url: str) -> int:
"""Set the host_url of the MQTTClient."""
def getMsg(self,signal:int) -> str:
pass
"""callback fun get msg"""
def getTopic(self,signal:int) -> str:
pass
"""callback fun get topic"""
def getQos(self,signal:int) -> int:
pass
"""callback fun get qos"""
def setDisconnectHandler(self,cb: any) -> int:
def setDisconnectHandler(self, cb: any) -> int:
pass
"""set disconnect callback fun."""

View File

@ -1,8 +1,8 @@
#include "_mqtt__MQTT.h"
#include "PikaObj.h"
#include "PikaStdData_List.h"
#include "TinyObj.h"
#include "mqttclient.h"
#include "PikaObj.h"
PikaEventListener* g_mqtt_event_listener = NULL;
void Subscribe_Handler(void* client, message_data_t* msg);
@ -187,7 +187,8 @@ PikaObj* _mqtt__MQTT_listSubscribeTopic(PikaObj* self) {
if (NULL != msg_handler->topic_filter) {
MQTT_LOG_I("%s:%d %s()...[%d] subscribe topic: %s", __FILE__,
__LINE__, __FUNCTION__, ++i, msg_handler->topic_filter);
// __platform_printf("[%d]subscribe topic: %s\n",++i, msg_handler->topic_filter);
// __platform_printf("[%d]subscribe topic: %s\n",++i,
// msg_handler->topic_filter);
/* 用 arg_new<type> 的 api 创建 arg */
Arg* str_arg1 = arg_newStr((char*)msg_handler->topic_filter);
/* 添加到 list 对象 */
@ -204,7 +205,7 @@ PikaObj* _mqtt__MQTT_listSubscribeTopic(PikaObj* self) {
// 输入参数:主题名称,有效数据
// 返 回 值0=成功非0=错误码
///////////////////////////////////////////////////////////////////
int _mqtt__MQTT_publish(PikaObj *self, char* topic, char* payload, int qos) {
int _mqtt__MQTT_publish(PikaObj* self, char* topic, char* payload, int qos) {
int ret;
mqtt_message_t msg;
@ -428,7 +429,7 @@ int _mqtt__MQTT_setVersion(PikaObj* self, char* version) {
return -1;
}
if ((strcmp(version, "3.1") == 0) || (strcmp(version, "3.1.1") == 0)) {
if(strcmp(version, "3.1.1") == 0)
if (strcmp(version, "3.1.1") == 0)
mqtt_set_version(_client, 4);
else
mqtt_set_version(_client, 3);
@ -482,17 +483,18 @@ int _mqtt__MQTT_setWill(PikaObj* self,
return -1;
}
// 必须转换成python环境的变量否则函数退出后topic里的是个空指针
memset(topic_str, 0, sizeof(topic_str));
sprintf(topic_str, "%s", topic);
obj_setStr(self, topic_str, topic);
obj_setStr(self, "Will_payload", payload);
// __platform_printf("obj_getStr(self, topic_str) :%s\r\n", obj_getStr(self, topic_str));
// __platform_printf("obj_getStr(self, topic_str) :%s\r\n", obj_getStr(self,
// topic_str));
// __platform_printf("iqos :%d\r\n", qos);
// __platform_printf("retain :%d\r\n", retain);
// __platform_printf("obj_getStr(self, \"Will_payload\") :%s\r\n", obj_getStr(self, "Will_payload"));
// __platform_printf("obj_getStr(self, \"Will_payload\") :%s\r\n",
// obj_getStr(self, "Will_payload"));
ret = mqtt_set_will_options(_client, obj_getStr(self, topic_str), qos,
(uint8_t)retain,
@ -512,7 +514,7 @@ int _mqtt__MQTT_setWill(PikaObj* self,
// 输入参数:
// 返 回 值0=成功非0=错误码
///////////////////////////////////////////////////////////////////
int _mqtt__MQTT_subscribe(PikaObj *self, char* topic, Arg* cb, int qos) {
int _mqtt__MQTT_subscribe(PikaObj* self, char* topic, Arg* cb, int qos) {
mqtt_client_t* _client = obj_getPtr(self, "_client");
int ret;
char topic_str[MQTT_TOPIC_LEN_MAX + 24];
@ -542,22 +544,26 @@ int _mqtt__MQTT_subscribe(PikaObj *self, char* topic, Arg* cb, int qos) {
ret = mqtt_subscribe(_client, obj_getStr(self, topic_str), qos,
Subscribe_Handler);
if (ret == 0) {
// __platform_printf("MQTT_subscribe Topic :%s Qos:%d OK\r\n", topic,qos);
//注册mqtt订阅主题的 回调函数
if(cb != NULL) {
// __platform_printf("MQTT_subscribe Topic :%s Qos:%d OK\r\n",
// topic,qos);
// 注册mqtt订阅主题的 回调函数
if (cb != NULL) {
char hash_str[32] = {0};
memset(hash_str,0,sizeof(hash_str));
sprintf(hash_str,"C%d",hash_time33(topic_str));
obj_newDirectObj(self,hash_str,New_TinyObj);//新建一个对象来放CB
PikaObj* eventHandler = obj_getPtr(self,hash_str);
memset(hash_str, 0, sizeof(hash_str));
sprintf(hash_str, "C%d", hash_time33(topic_str));
obj_newDirectObj(self, hash_str,
New_TinyObj); // 新建一个对象来放CB
PikaObj* eventHandler = obj_getPtr(self, hash_str);
obj_setArg(eventHandler, "eventCallBack", cb);
/* init event_listener for the first time */
if (NULL == g_mqtt_event_listener) {
pks_eventListener_init(&g_mqtt_event_listener);
}
uint32_t eventId = hash_time33(topic_str);
// __platform_printf("hash_time33(topic_str):%d \r\n",hash_time33(topic_str));
pks_eventListener_registEvent(g_mqtt_event_listener, eventId, eventHandler);
// __platform_printf("hash_time33(topic_str):%d
// \r\n",hash_time33(topic_str));
pks_eventListener_registEvent(g_mqtt_event_listener, eventId,
eventHandler);
}
} else
@ -598,33 +604,25 @@ int _mqtt__MQTT_unsubscribe(PikaObj* self, char* topic) {
///////////////////////////////////////////////////////////////////
void Subscribe_Handler(void* client, message_data_t* msg) {
char topic_str[MQTT_TOPIC_LEN_MAX + 24];
PikaObj* self = ((mqtt_client_t*)client)->user_data;
char hash_str[32] = {0};
memset(topic_str,0,sizeof(topic_str));
if(strlen(msg->topic_name) <= MQTT_TOPIC_LEN_MAX)
sprintf(topic_str,"%s",msg->topic_name);
memset(topic_str, 0, sizeof(topic_str));
if (strlen(msg->topic_name) <= MQTT_TOPIC_LEN_MAX)
sprintf(topic_str, "%s", msg->topic_name);
else {
__platform_printf("Subscribe Topic recv data topic length ERROR\r\n");
return ;
return;
}
memset(hash_str,0,sizeof(hash_str));
sprintf(hash_str,"M%d",hash_time33(msg->topic_name));
obj_setStr(self, hash_str, (char*)msg->message->payload);
memset(hash_str, 0, sizeof(hash_str));
sprintf(hash_str, "T%d", hash_time33(msg->topic_name));
obj_setStr(self, hash_str, (char*)msg->topic_name);
Arg* evt_obj_arg = arg_newDirectObj(New_TinyObj);
PikaObj* evt_obj = arg_getPtr(evt_obj_arg);
obj_setStr(evt_obj, "topic", (char*)msg->topic_name);
obj_setStr(evt_obj, "msg", (char*)msg->message->payload);
obj_setInt(evt_obj, "qos", msg->message->qos);
memset(hash_str, 0, sizeof(hash_str));
sprintf(hash_str, "Q%d", hash_time33(msg->topic_name));
obj_setInt(self, hash_str, msg->message->qos);
// 存好数据后,再发送事件信号,防止信号收到了但是需要传输的数据没准备好
pks_eventListener_send(g_mqtt_event_listener, hash_time33(msg->topic_name),
evt_obj_arg);
//存好数据后,再发送事件信号,防止信号收到了但是需要传输的数据没准备好
pks_eventListener_sendSignal(g_mqtt_event_listener,
hash_time33(msg->topic_name), hash_time33(msg->topic_name));
// MQTT_LOG_I("\n>>>------------------");
// MQTT_LOG_I("Topic:%s \nlen:%d,message: %s", msg->topic_name,
// (int)msg->message->payloadlen, (char*)msg->message->payload);
@ -643,64 +641,22 @@ void _mqtt___del__(PikaObj* self) {
}
}
////////////////////////////////////////////////////////////////////
// 函 数 名_mqtt__MQTT_getMsg
// 功能说明:在回调函数中取出返回的数据
// 输入参数:
// 返 回 值:
///////////////////////////////////////////////////////////////////
char* _mqtt__MQTT_getMsg(PikaObj* self, int signal) {
// mqtt_client_t* _client = obj_getPtr(self, "_client");
char hash_str[32];
memset(hash_str, 0, sizeof(hash_str));
sprintf(hash_str, "M%d", signal);
return (obj_getStr(self, hash_str));
}
////////////////////////////////////////////////////////////////////
// 函 数 名_mqtt__MQTT_getTopic
// 功能说明:在回调函数中取出返回的数据,主题
// 输入参数:
// 返 回 值:
///////////////////////////////////////////////////////////////////
char* _mqtt__MQTT_getTopic(PikaObj* self, int signal) {
char hash_str[32];
memset(hash_str, 0, sizeof(hash_str));
sprintf(hash_str, "T%d", signal);
return (obj_getStr(self, hash_str));
}
////////////////////////////////////////////////////////////////////
// 函 数 名_mqtt__MQTT_getQos
// 功能说明:在回调函数中取出返回的数据,消息类型
// 输入参数:
// 返 回 值:
///////////////////////////////////////////////////////////////////
int _mqtt__MQTT_getQos(PikaObj* self, int signal) {
char hash_str[32];
memset(hash_str, 0, sizeof(hash_str));
sprintf(hash_str, "Q%d", signal);
return (obj_getInt(self, hash_str));
}
////////////////////////////////////////////////////////////////////
// 函 数 名Reconnect_Handler
// 功能说明mqtt 断开连接后 的回调函数这里使用mqttclient库函数的断线重连接口提示发生了mqtt断连的事件
// 功能说明mqtt 断开连接后
// 的回调函数这里使用mqttclient库函数的断线重连接口提示发生了mqtt断连的事件
// 输入参数:
// 返 回 值0=成功非0=错误码
///////////////////////////////////////////////////////////////////
void Reconnect_Handler(void *client, void *reconnect_date) {
void Reconnect_Handler(void* client, void* reconnect_date) {
// PikaObj* self = ((mqtt_client_t*)client)->user_data;
// __platform_printf("Reconnect_Handler\r\n");
if(((mqtt_client_t*)client)->mqtt_client_state != CLIENT_STATE_CONNECTED) {
//发送事件信号
pks_eventListener_sendSignal(g_mqtt_event_listener,MQTT_RECONNECTION_EVENT_ID,1);
if (((mqtt_client_t*)client)->mqtt_client_state != CLIENT_STATE_CONNECTED) {
// 发送事件信号
pks_eventListener_sendSignal(g_mqtt_event_listener,
MQTT_RECONNECTION_EVENT_ID, 1);
}
}
////////////////////////////////////////////////////////////////////
@ -711,11 +667,11 @@ void Reconnect_Handler(void *client, void *reconnect_date) {
///////////////////////////////////////////////////////////////////
int _mqtt__MQTT_setDisconnectHandler(PikaObj* self, Arg* cb) {
mqtt_client_t* _client = obj_getPtr(self, "_client");
// __platform_printf("_mqtt__MQTT_setDisconnectHandler\r\n");
//注册到c库中
mqtt_set_reconnect_handler(_client,Reconnect_Handler);
// 注册到c库中
mqtt_set_reconnect_handler(_client, Reconnect_Handler);
// char hash_str[32] = {0};
// memset(hash_str,0,sizeof(hash_str));
@ -724,14 +680,17 @@ int _mqtt__MQTT_setDisconnectHandler(PikaObj* self, Arg* cb) {
// PikaObj* eventHandler = obj_getPtr(self,hash_str);
// obj_setArg(eventHandler, "eventCallBack", cb);
obj_setArg(self, "eventCallBack", cb);//重连回调是唯一的就直接用self对象
obj_setArg(self, "eventCallBack",
cb); // 重连回调是唯一的就直接用self对象
/* init event_listener for the first time */
if (NULL == g_mqtt_event_listener) {
pks_eventListener_init(&g_mqtt_event_listener);
}
// uint32_t eventId = hash_time33(topic_str);
// __platform_printf("hash_time33(topic_str):%d \r\n",hash_time33(topic_str));
pks_eventListener_registEvent(g_mqtt_event_listener, MQTT_RECONNECTION_EVENT_ID, self);
// __platform_printf("hash_time33(topic_str):%d
// \r\n",hash_time33(topic_str));
pks_eventListener_registEvent(g_mqtt_event_listener,
MQTT_RECONNECTION_EVENT_ID, self);
return 0;
}

View File

@ -342,16 +342,20 @@ enum shellCTRL obj_runChar(PikaObj* self, char inputChar);
typedef PikaObj PikaEventListener;
void pks_eventListener_sendSignal(PikaEventListener* self,
uint32_t eventId,
int eventSignal);
void pks_eventListener_registEvent(PikaEventListener* self,
uint32_t eventId,
PikaObj* eventHandleObj);
void pks_eventListener_removeEvent(PikaEventListener* self, uint32_t eventId);
void pks_eventListener_sendSignal(PikaEventListener* self,
uint32_t eventId,
int eventSignal);
void pks_eventListener_send(PikaEventListener* self,
uint32_t eventId,
Arg* eventData);
PikaObj* pks_eventListener_getEventHandleObj(PikaEventListener* self,
uint32_t eventId);

View File

@ -1,7 +1,8 @@
import mqtt
import PikaStdDevice
client = mqtt.MQTT('broker.emqx.io', port=1883, clinetID='clientid', username='name_', password='passwd_')
client = mqtt.MQTT('broker.emqx.io', port=1883,
clinetID='clientid', username='name_', password='passwd_')
ret = client.connect()
print("connect ret:%d" % ret)
@ -21,15 +22,14 @@ def callback1(signal):
print("py1 cb: %s-qos:%d-->>%s" % (recv_topic, recv_qos, recv_msg))
def callback2(signal):
recv_msg = client.getMsg(signal)
recv_topic = client.getTopic(signal)
recv_qos = client.getQos(signal)
print("py2 cb: %s-qos:%d-->>%s" % (recv_topic, recv_qos, recv_msg))
def callback2(evt):
print("py2 cb: %s-qos:%d-->>%s" % (evt.topic, evt.qos, evt.msg))
def reconnect_mq(signal):
print('lost mqtt connect and try to reconnect')
print('signal:',signal)
print('signal:', signal)
client.setKeepAlive(5)
ret = client.subscribe('topic_pikapy_qos0', callback0, 0)
@ -53,7 +53,7 @@ print('listSubscribeTopic out', out)
# out2 = client.listSubscribeTopic()
# print('listSubscribeTopic out2',out2)
ret = client.setDisconnectHandler(reconnect_mq);
ret = client.setDisconnectHandler(reconnect_mq)
print("setDisconnectHandler:%d" % ret)
# ret = client.setWill('topic_will','lost mqtt connect')