Optimize the callback function of subscribe function

This commit is contained in:
FlintJ 2022-12-18 21:47:23 +08:00
parent cd062ff909
commit 47f502928b
5 changed files with 72 additions and 65 deletions

View File

@ -9,43 +9,45 @@ print("connect ret:%d" % ret)
def callback0(signal):
recv_msg = client.getMsg(signal)
recv_topic = client.getTopic(signal)
print("py cb: %s-->>%s" % (recv_topic, recv_msg))
recv_qos = client.getQos(signal)
print("py0 cb: %s-qos:%d-->>%s" % (recv_topic,recv_qos, recv_msg))
def callback1(signal):
recv_msg = client.getMsg(signal)
recv_topic = client.getTopic(signal)
print("py cb: %s-->>%s" % (recv_topic, recv_msg))
recv_qos = client.getQos(signal)
print("py1 cb: %s-qos:%d-->>%s" % (recv_topic,recv_qos, recv_msg))
def callback2(signal):
recv_msg = client.getMsg(signal)
recv_topic = client.getTopic(signal)
recv_qos = client.getQos(signal)
print("py cb: %s-qos:%d-->>%s" % (recv_topic,recv_qos, recv_msg))
print("py2 cb: %s-qos:%d-->>%s" % (recv_topic,recv_qos, recv_msg))
# ret = client.subscribe('topic_pikapy',0,callback0)
# print("subscribe ret:%d" % ret)
# ret = client.subscribe('topic_pikapy', 1,callback1)
# print("subscribe ret:%d" % ret)
ret = client.subscribe('topic_pikapy', 2,callback2)
ret = client.subscribe('topic_pikapy_qos0', 0,callback0)
print("subscribe ret:%d" % ret)
ret = client.subscribe('topic_pikapy_qos1', 1,callback1)
print("subscribe ret:%d" % ret)
ret = client.subscribe('topic_pikapy_qos2', 2,callback2)
print("subscribe ret:%d" % ret)
#sleep wait for recv data
T = PikaStdDevice.Time()
T.sleep_s(1)
T.sleep_s(5)
out = client.listSubscribeTopic()
print('out',out)
print('listSubscribeTopic out',out)
# client.unsubscribe('topic_pikapy_qos0');
# client.unsubscribe('topic_pikapy_qos1');
# client.unsubscribe('topic_pikapy_qos2');
# T.sleep_s(5)
# client.listSubscribeTopic()
# out2 = client.listSubscribeTopic()
# print('listSubscribeTopic out2',out2)
# ret = client.setWill(1,'topic_will',1,'lost mqtt connect')
T.sleep_s(2)
T.sleep_s(10)
# exit()
ret = client.disconnect()
print("ret:%d" % ret)
print("disconnect ret:%d" % ret)

View File

@ -1,6 +1,7 @@
#include "_mqtt__MQTT.h"
#include "mqttclient.h"
#include "PikaStdData_List.h"
#include "TinyObj.h"
PikaEventListener* g_mqtt_event_listener = NULL;
@ -164,7 +165,7 @@ int _mqtt__MQTT_disconnect(PikaObj* self) {
///////////////////////////////////////////////////////////////////
PikaObj* _mqtt__MQTT_listSubscribeTopic(PikaObj* self) {
mqtt_client_t* _client = obj_getPtr(self, "_client");
// int i = 0;
int i = 0;
mqtt_list_t *curr, *next;
message_handlers_t* msg_handler;
PikaObj* list = NULL;
@ -190,9 +191,9 @@ PikaObj* _mqtt__MQTT_listSubscribeTopic(PikaObj* self) {
/* determine whether a node already exists by mqtt topic, but wildcards
* are not supported */
if (NULL != msg_handler->topic_filter) {
// MQTT_LOG_I("%s:%d %s()...[%d] subscribe topic: %s", __FILE__,
// __LINE__, __FUNCTION__, ++i, msg_handler->topic_filter);
MQTT_LOG_I("%s:%d %s()...[%d] subscribe topic: %s", __FILE__,
__LINE__, __FUNCTION__, ++i, msg_handler->topic_filter);
__platform_printf("[%d]subscribe topic: %s\n",++i, msg_handler->topic_filter);
/* 用 arg_new<type> 的 api 创建 arg */
Arg* str_arg1 = arg_newStr((char*)msg_handler->topic_filter);
/* 添加到 list 对象 */
@ -532,18 +533,19 @@ int _mqtt__MQTT_subscribe(PikaObj *self, char* topic, int qos, Arg* cb) {
// __platform_printf("MQTT_subscribe Topic :%s Qos:%d OK\r\n", topic,qos);
//注册mqtt订阅主题的 回调函数
if(cb != NULL) {
memset(topic_str,0,sizeof(topic_str));
// sprintf(topic_str,"eventCallBack_%s",topic);
sprintf(topic_str,"eventCallBack");
// __platform_printf("topic_str:%s \r\n",topic_str);
obj_setArg(self, topic_str, cb);
char hash_str[32] = {0};
memset(hash_str,0,sizeof(hash_str));
sprintf(hash_str,"C%d",hash_time33(topic_str));
obj_newDirectObj(self,hash_str,New_TinyObj);//新建一个对象来放CB
PikaObj* eventHandler = obj_getPtr(self,hash_str);
obj_setArg(eventHandler, "eventCallBack", cb);
/* init event_listener for the first time */
if (NULL == g_mqtt_event_listener) {
pks_eventLisener_init(&g_mqtt_event_listener);
}
uint32_t eventId = hash_time33(topic_str);
// __platform_printf("hash_time33(topic_str):%d \r\n",hash_time33(topic_str));
pks_eventLicener_registEvent(g_mqtt_event_listener, eventId, self);
pks_eventLicener_registEvent(g_mqtt_event_listener, eventId, eventHandler);
}
} else
@ -587,19 +589,14 @@ void Subscribe_Handler(void* client, message_data_t* msg) {
PikaObj* self = ((mqtt_client_t*)client)->user_data;
char hash_str[32] = {0};
//防止数组越界
memset(topic_str,0,sizeof(topic_str));
if(strlen(msg->topic_name) <= MQTT_TOPIC_LEN_MAX)
// sprintf(topic_str,"eventCallBack_%s",msg->topic_name);
sprintf(topic_str,"eventCallBack");
sprintf(topic_str,"%s",msg->topic_name);
else {
sprintf(topic_str,"eventCallBack_");
memcpy((topic_str+strlen("eventCallBack_")),msg->topic_name,MQTT_TOPIC_LEN_MAX);
__platform_printf("Subscribe Topic recv data topic length ERROR\r\n");
return ;
}
pks_eventLisener_sendSignal(g_mqtt_event_listener,
hash_time33(topic_str), hash_time33(msg->topic_name));
memset(hash_str,0,sizeof(hash_str));
sprintf(hash_str,"M%d",hash_time33(msg->topic_name));
obj_setStr(self, hash_str, (char*)msg->message->payload);
@ -612,6 +609,10 @@ void Subscribe_Handler(void* client, message_data_t* msg) {
sprintf(hash_str,"Q%d",hash_time33(msg->topic_name));
obj_setInt(self, hash_str, msg->message->qos);
//存好数据后,再发送事件信号,防止信号收到了但是需要传输的数据没准备好
pks_eventLisener_sendSignal(g_mqtt_event_listener,
hash_time33(msg->topic_name), hash_time33(msg->topic_name));
// MQTT_LOG_I("\n>>>------------------");
// MQTT_LOG_I("Topic:%s \nlen:%d,message: %s", msg->topic_name,
// (int)msg->message->payloadlen, (char*)msg->message->payload);

View File

@ -93,7 +93,8 @@
"__instruction_def.h": "c",
"pika_hal_def.h": "c",
"pika_hal_table.h": "c",
"pika_hal_table_rule.h": "c"
"pika_hal_table_rule.h": "c",
"pikaobj.h": "c"
},
"python.formatting.provider": "autopep8",
"C_Cpp.errorSquiggles": "Disabled"

View File

@ -1,6 +1,7 @@
#include "_mqtt__MQTT.h"
#include "mqttclient.h"
#include "PikaStdData_List.h"
#include "TinyObj.h"
PikaEventListener* g_mqtt_event_listener = NULL;
@ -164,7 +165,7 @@ int _mqtt__MQTT_disconnect(PikaObj* self) {
///////////////////////////////////////////////////////////////////
PikaObj* _mqtt__MQTT_listSubscribeTopic(PikaObj* self) {
mqtt_client_t* _client = obj_getPtr(self, "_client");
// int i = 0;
int i = 0;
mqtt_list_t *curr, *next;
message_handlers_t* msg_handler;
PikaObj* list = NULL;
@ -190,9 +191,9 @@ PikaObj* _mqtt__MQTT_listSubscribeTopic(PikaObj* self) {
/* determine whether a node already exists by mqtt topic, but wildcards
* are not supported */
if (NULL != msg_handler->topic_filter) {
// MQTT_LOG_I("%s:%d %s()...[%d] subscribe topic: %s", __FILE__,
// __LINE__, __FUNCTION__, ++i, msg_handler->topic_filter);
MQTT_LOG_I("%s:%d %s()...[%d] subscribe topic: %s", __FILE__,
__LINE__, __FUNCTION__, ++i, msg_handler->topic_filter);
__platform_printf("[%d]subscribe topic: %s\n",++i, msg_handler->topic_filter);
/* 用 arg_new<type> 的 api 创建 arg */
Arg* str_arg1 = arg_newStr((char*)msg_handler->topic_filter);
/* 添加到 list 对象 */
@ -532,18 +533,19 @@ int _mqtt__MQTT_subscribe(PikaObj *self, char* topic, int qos, Arg* cb) {
// __platform_printf("MQTT_subscribe Topic :%s Qos:%d OK\r\n", topic,qos);
//注册mqtt订阅主题的 回调函数
if(cb != NULL) {
memset(topic_str,0,sizeof(topic_str));
// sprintf(topic_str,"eventCallBack_%s",topic);
sprintf(topic_str,"eventCallBack");
// __platform_printf("topic_str:%s \r\n",topic_str);
obj_setArg(self, topic_str, cb);
char hash_str[32] = {0};
memset(hash_str,0,sizeof(hash_str));
sprintf(hash_str,"C%d",hash_time33(topic_str));
obj_newDirectObj(self,hash_str,New_TinyObj);//新建一个对象来放CB
PikaObj* eventHandler = obj_getPtr(self,hash_str);
obj_setArg(eventHandler, "eventCallBack", cb);
/* init event_listener for the first time */
if (NULL == g_mqtt_event_listener) {
pks_eventLisener_init(&g_mqtt_event_listener);
}
uint32_t eventId = hash_time33(topic_str);
// __platform_printf("hash_time33(topic_str):%d \r\n",hash_time33(topic_str));
pks_eventLicener_registEvent(g_mqtt_event_listener, eventId, self);
pks_eventLicener_registEvent(g_mqtt_event_listener, eventId, eventHandler);
}
} else
@ -587,19 +589,14 @@ void Subscribe_Handler(void* client, message_data_t* msg) {
PikaObj* self = ((mqtt_client_t*)client)->user_data;
char hash_str[32] = {0};
//防止数组越界
memset(topic_str,0,sizeof(topic_str));
if(strlen(msg->topic_name) <= MQTT_TOPIC_LEN_MAX)
// sprintf(topic_str,"eventCallBack_%s",msg->topic_name);
sprintf(topic_str,"eventCallBack");
sprintf(topic_str,"%s",msg->topic_name);
else {
sprintf(topic_str,"eventCallBack_");
memcpy((topic_str+strlen("eventCallBack_")),msg->topic_name,MQTT_TOPIC_LEN_MAX);
__platform_printf("Subscribe Topic recv data topic length ERROR\r\n");
return ;
}
pks_eventLisener_sendSignal(g_mqtt_event_listener,
hash_time33(topic_str), hash_time33(msg->topic_name));
memset(hash_str,0,sizeof(hash_str));
sprintf(hash_str,"M%d",hash_time33(msg->topic_name));
obj_setStr(self, hash_str, (char*)msg->message->payload);
@ -612,6 +609,10 @@ void Subscribe_Handler(void* client, message_data_t* msg) {
sprintf(hash_str,"Q%d",hash_time33(msg->topic_name));
obj_setInt(self, hash_str, msg->message->qos);
//存好数据后,再发送事件信号,防止信号收到了但是需要传输的数据没准备好
pks_eventLisener_sendSignal(g_mqtt_event_listener,
hash_time33(msg->topic_name), hash_time33(msg->topic_name));
// MQTT_LOG_I("\n>>>------------------");
// MQTT_LOG_I("Topic:%s \nlen:%d,message: %s", msg->topic_name,
// (int)msg->message->payloadlen, (char*)msg->message->payload);

View File

@ -9,43 +9,45 @@ print("connect ret:%d" % ret)
def callback0(signal):
recv_msg = client.getMsg(signal)
recv_topic = client.getTopic(signal)
print("py cb: %s-->>%s" % (recv_topic, recv_msg))
recv_qos = client.getQos(signal)
print("py0 cb: %s-qos:%d-->>%s" % (recv_topic,recv_qos, recv_msg))
def callback1(signal):
recv_msg = client.getMsg(signal)
recv_topic = client.getTopic(signal)
print("py cb: %s-->>%s" % (recv_topic, recv_msg))
recv_qos = client.getQos(signal)
print("py1 cb: %s-qos:%d-->>%s" % (recv_topic,recv_qos, recv_msg))
def callback2(signal):
recv_msg = client.getMsg(signal)
recv_topic = client.getTopic(signal)
recv_qos = client.getQos(signal)
print("py cb: %s-qos:%d-->>%s" % (recv_topic,recv_qos, recv_msg))
print("py2 cb: %s-qos:%d-->>%s" % (recv_topic,recv_qos, recv_msg))
# ret = client.subscribe('topic_pikapy',0,callback0)
# print("subscribe ret:%d" % ret)
# ret = client.subscribe('topic_pikapy', 1,callback1)
# print("subscribe ret:%d" % ret)
ret = client.subscribe('topic_pikapy', 2,callback2)
ret = client.subscribe('topic_pikapy_qos0', 0,callback0)
print("subscribe ret:%d" % ret)
ret = client.subscribe('topic_pikapy_qos1', 1,callback1)
print("subscribe ret:%d" % ret)
ret = client.subscribe('topic_pikapy_qos2', 2,callback2)
print("subscribe ret:%d" % ret)
#sleep wait for recv data
T = PikaStdDevice.Time()
T.sleep_s(1)
T.sleep_s(5)
out = client.listSubscribeTopic()
print('out',out)
print('listSubscribeTopic out',out)
# client.unsubscribe('topic_pikapy_qos0');
# client.unsubscribe('topic_pikapy_qos1');
# client.unsubscribe('topic_pikapy_qos2');
# T.sleep_s(5)
# client.listSubscribeTopic()
# out2 = client.listSubscribeTopic()
# print('listSubscribeTopic out2',out2)
# ret = client.setWill(1,'topic_will',1,'lost mqtt connect')
T.sleep_s(2)
T.sleep_s(10)
# exit()
ret = client.disconnect()
print("ret:%d" % ret)
print("disconnect ret:%d" % ret)