improve the callback function set in the subscribe function And add mqtt will function

This commit is contained in:
FlintJ 2022-12-11 21:42:55 +08:00
parent 41fab8447e
commit d2e697549c
4 changed files with 126 additions and 42 deletions

View File

@ -25,14 +25,15 @@ T.sleep_s(5)
out = client.listSubscribrTopic()
print('out',out)
client.unsubscribe('topic_pikapy_qos0');
client.unsubscribe('topic_pikapy_qos1');
client.unsubscribe('topic_pikapy_qos2');
# client.unsubscribe('topic_pikapy_qos0');
# client.unsubscribe('topic_pikapy_qos1');
# client.unsubscribe('topic_pikapy_qos2');
# T.sleep_s(5)
# client.listSubscribrTopic()
T.sleep_s(5)
client.listSubscribrTopic()
# ret = client.setWill(1,'topic_will',1,'lost mqtt connect')
T.sleep_s(10)
# exit()
ret = client.disconnect()
print("ret:%d" % ret)

View File

@ -421,8 +421,15 @@ int _mqtt__MQTT_setWill(PikaObj* self,
char* payload) {
mqtt_client_t* _client = obj_getPtr(self, "_client");
int ret;
char topic_str[MQTT_TOPIC_LEN_MAX];
if (strlen(topic) <= 0) {
__platform_printf("\r\n");
if(topic == NULL) {
__platform_printf("input topic error\r\n");
return -1;
}
if (strlen(topic) <= 0) {
__platform_printf("input topic error\r\n");
return -1;
}
@ -432,6 +439,11 @@ int _mqtt__MQTT_setWill(PikaObj* self,
return -1;
}
if(payload == NULL) {
__platform_printf("input payload error\r\n");
return -1;
}
if (strlen(payload) <= 0) {
__platform_printf("input payload error\r\n");
return -1;
@ -439,7 +451,13 @@ int _mqtt__MQTT_setWill(PikaObj* self,
__platform_printf("input retain :%d\r\n", (uint8_t)retain);
ret = mqtt_set_will_options(_client, topic, qos, (uint8_t)retain, payload);
//必须转换成python环境的变量否则函数退出后topic里的是个空指针
memset(topic_str,0,sizeof(topic_str));
sprintf(topic_str,"%s",topic);
obj_setStr(self, topic_str, topic);
obj_setStr(self, "Will_payload", payload);
ret = mqtt_set_will_options(_client, obj_getStr(self, topic_str), qos, (uint8_t)retain, obj_getStr(self, "Will_payload"));
if (ret == 0) {
__platform_printf("MQTT_setWill OK\r\n", topic);
@ -458,7 +476,12 @@ int _mqtt__MQTT_setWill(PikaObj* self,
int _mqtt__MQTT_subscribe(PikaObj* self, char* topic, int qos, Arg* cb) {
mqtt_client_t* _client = obj_getPtr(self, "_client");
int ret;
char topic_str[MQTT_TOPIC_LEN_MAX];
char topic_str[MQTT_TOPIC_LEN_MAX+24];
if(topic == NULL) {
__platform_printf("input topic error\r\n");
return -1;
}
if ((strlen(topic) > MQTT_TOPIC_LEN_MAX)||(strlen(topic) <= 0)) {
__platform_printf("input topic error\r\n");
@ -470,14 +493,6 @@ int _mqtt__MQTT_subscribe(PikaObj* self, char* topic, int qos, Arg* cb) {
return -1;
}
// obj_setArg(self, "eventCallBack", cb);
// /* init event_listener for the first time */
// if (NULL == g_mqtt_event_listener) {
// pks_eventLisener_init(&g_mqtt_event_listener);
// }
// uint32_t eventId = hash_time33(topic);
// pks_eventLicener_registEvent(g_mqtt_event_listener, eventId, self);
//必须转换成python环境的变量否则函数退出后topic里的是个空指针
memset(topic_str,0,sizeof(topic_str));
sprintf(topic_str,"%s",topic);
@ -486,6 +501,20 @@ int _mqtt__MQTT_subscribe(PikaObj* self, char* topic, int qos, Arg* cb) {
ret = mqtt_subscribe(_client, obj_getStr(self, topic_str), qos, Subscribe_Handler);
if (ret == 0) {
__platform_printf("MQTT_subscribe Topic :%s Qos:%d OK\r\n", topic,qos);
//注册mqtt订阅主题的 回调函数
if(cb != NULL) {
memset(topic_str,0,sizeof(topic_str));
sprintf(topic_str,"eventCallBack_%s",topic);
__platform_printf("topic_str:%s \r\n",topic_str);
obj_setArg(self, topic_str, cb);
/* init event_listener for the first time */
if (NULL == g_mqtt_event_listener) {
pks_eventLisener_init(&g_mqtt_event_listener);
}
uint32_t eventId = hash_time33(topic);
pks_eventLicener_registEvent(g_mqtt_event_listener, eventId, self);
}
} else
__platform_printf("MQTT_subscribe Topic ERROR\r\n");
@ -523,12 +552,24 @@ int _mqtt__MQTT_unsubscribe(PikaObj* self, char* topic) {
// 返 回 值0=成功非0=错误码
///////////////////////////////////////////////////////////////////
void Subscribe_Handler(void* client, message_data_t* msg) {
// PikaObj* self = ((mqtt_client_t*)client)->user_data;
// Arg* cb = obj_getArg(self, "eventCallBack");
char topic_str[MQTT_TOPIC_LEN_MAX+24];
PikaObj* self = ((mqtt_client_t*)client)->user_data;
//防止数组约界
memset(topic_str,0,sizeof(topic_str));
if(strlen(msg->topic_name) <= MQTT_TOPIC_LEN_MAX)
sprintf(topic_str,"eventCallBack_%s",msg->topic_name);
else {
sprintf(topic_str,"eventCallBack_");
memcpy((topic_str+strlen("eventCallBack_")),msg->topic_name,MQTT_TOPIC_LEN_MAX);
}
__platform_printf("topic_str:%s \r\n",topic_str);
Arg* cb = obj_getArg(self, topic_str);
// obj_setStr(self, "recv_topic", msg->topic_name);
// obj_setStr(self, "recv_msg", msg->message->payload);
// pks_eventLisener_sendSignal(g_mqtt_event_listener,
// hash_time33(msg->topic_name), 1);
pks_eventLisener_sendSignal(g_mqtt_event_listener,
hash_time33(msg->topic_name), 1);
MQTT_LOG_I("\n>>>------------------");
MQTT_LOG_I("Topic:%s \nlen:%d,message: %s", msg->topic_name,

View File

@ -421,8 +421,15 @@ int _mqtt__MQTT_setWill(PikaObj* self,
char* payload) {
mqtt_client_t* _client = obj_getPtr(self, "_client");
int ret;
char topic_str[MQTT_TOPIC_LEN_MAX];
if (strlen(topic) <= 0) {
__platform_printf("\r\n");
if(topic == NULL) {
__platform_printf("input topic error\r\n");
return -1;
}
if (strlen(topic) <= 0) {
__platform_printf("input topic error\r\n");
return -1;
}
@ -432,6 +439,11 @@ int _mqtt__MQTT_setWill(PikaObj* self,
return -1;
}
if(payload == NULL) {
__platform_printf("input payload error\r\n");
return -1;
}
if (strlen(payload) <= 0) {
__platform_printf("input payload error\r\n");
return -1;
@ -439,7 +451,13 @@ int _mqtt__MQTT_setWill(PikaObj* self,
__platform_printf("input retain :%d\r\n", (uint8_t)retain);
ret = mqtt_set_will_options(_client, topic, qos, (uint8_t)retain, payload);
//必须转换成python环境的变量否则函数退出后topic里的是个空指针
memset(topic_str,0,sizeof(topic_str));
sprintf(topic_str,"%s",topic);
obj_setStr(self, topic_str, topic);
obj_setStr(self, "Will_payload", payload);
ret = mqtt_set_will_options(_client, obj_getStr(self, topic_str), qos, (uint8_t)retain, obj_getStr(self, "Will_payload"));
if (ret == 0) {
__platform_printf("MQTT_setWill OK\r\n", topic);
@ -458,7 +476,12 @@ int _mqtt__MQTT_setWill(PikaObj* self,
int _mqtt__MQTT_subscribe(PikaObj* self, char* topic, int qos, Arg* cb) {
mqtt_client_t* _client = obj_getPtr(self, "_client");
int ret;
char topic_str[MQTT_TOPIC_LEN_MAX];
char topic_str[MQTT_TOPIC_LEN_MAX+24];
if(topic == NULL) {
__platform_printf("input topic error\r\n");
return -1;
}
if ((strlen(topic) > MQTT_TOPIC_LEN_MAX)||(strlen(topic) <= 0)) {
__platform_printf("input topic error\r\n");
@ -470,14 +493,6 @@ int _mqtt__MQTT_subscribe(PikaObj* self, char* topic, int qos, Arg* cb) {
return -1;
}
// obj_setArg(self, "eventCallBack", cb);
// /* init event_listener for the first time */
// if (NULL == g_mqtt_event_listener) {
// pks_eventLisener_init(&g_mqtt_event_listener);
// }
// uint32_t eventId = hash_time33(topic);
// pks_eventLicener_registEvent(g_mqtt_event_listener, eventId, self);
//必须转换成python环境的变量否则函数退出后topic里的是个空指针
memset(topic_str,0,sizeof(topic_str));
sprintf(topic_str,"%s",topic);
@ -486,6 +501,20 @@ int _mqtt__MQTT_subscribe(PikaObj* self, char* topic, int qos, Arg* cb) {
ret = mqtt_subscribe(_client, obj_getStr(self, topic_str), qos, Subscribe_Handler);
if (ret == 0) {
__platform_printf("MQTT_subscribe Topic :%s Qos:%d OK\r\n", topic,qos);
//注册mqtt订阅主题的 回调函数
if(cb != NULL) {
memset(topic_str,0,sizeof(topic_str));
sprintf(topic_str,"eventCallBack_%s",topic);
__platform_printf("topic_str:%s \r\n",topic_str);
obj_setArg(self, topic_str, cb);
/* init event_listener for the first time */
if (NULL == g_mqtt_event_listener) {
pks_eventLisener_init(&g_mqtt_event_listener);
}
uint32_t eventId = hash_time33(topic);
pks_eventLicener_registEvent(g_mqtt_event_listener, eventId, self);
}
} else
__platform_printf("MQTT_subscribe Topic ERROR\r\n");
@ -523,12 +552,24 @@ int _mqtt__MQTT_unsubscribe(PikaObj* self, char* topic) {
// 返 回 值0=成功非0=错误码
///////////////////////////////////////////////////////////////////
void Subscribe_Handler(void* client, message_data_t* msg) {
// PikaObj* self = ((mqtt_client_t*)client)->user_data;
// Arg* cb = obj_getArg(self, "eventCallBack");
char topic_str[MQTT_TOPIC_LEN_MAX+24];
PikaObj* self = ((mqtt_client_t*)client)->user_data;
//防止数组约界
memset(topic_str,0,sizeof(topic_str));
if(strlen(msg->topic_name) <= MQTT_TOPIC_LEN_MAX)
sprintf(topic_str,"eventCallBack_%s",msg->topic_name);
else {
sprintf(topic_str,"eventCallBack_");
memcpy((topic_str+strlen("eventCallBack_")),msg->topic_name,MQTT_TOPIC_LEN_MAX);
}
__platform_printf("topic_str:%s \r\n",topic_str);
Arg* cb = obj_getArg(self, topic_str);
// obj_setStr(self, "recv_topic", msg->topic_name);
// obj_setStr(self, "recv_msg", msg->message->payload);
// pks_eventLisener_sendSignal(g_mqtt_event_listener,
// hash_time33(msg->topic_name), 1);
pks_eventLisener_sendSignal(g_mqtt_event_listener,
hash_time33(msg->topic_name), 1);
MQTT_LOG_I("\n>>>------------------");
MQTT_LOG_I("Topic:%s \nlen:%d,message: %s", msg->topic_name,

View File

@ -25,14 +25,15 @@ T.sleep_s(5)
out = client.listSubscribrTopic()
print('out',out)
client.unsubscribe('topic_pikapy_qos0');
client.unsubscribe('topic_pikapy_qos1');
client.unsubscribe('topic_pikapy_qos2');
# client.unsubscribe('topic_pikapy_qos0');
# client.unsubscribe('topic_pikapy_qos1');
# client.unsubscribe('topic_pikapy_qos2');
# T.sleep_s(5)
# client.listSubscribrTopic()
T.sleep_s(5)
client.listSubscribrTopic()
# ret = client.setWill(1,'topic_will',1,'lost mqtt connect')
T.sleep_s(10)
# exit()
ret = client.disconnect()
print("ret:%d" % ret)