diff --git a/examples/mqtt/mqtt_subscribe.py b/examples/mqtt/mqtt_subscribe.py index 6de77e56d..167796c27 100644 --- a/examples/mqtt/mqtt_subscribe.py +++ b/examples/mqtt/mqtt_subscribe.py @@ -25,14 +25,15 @@ T.sleep_s(5) out = client.listSubscribrTopic() print('out',out) -client.unsubscribe('topic_pikapy_qos0'); -client.unsubscribe('topic_pikapy_qos1'); -client.unsubscribe('topic_pikapy_qos2'); +# client.unsubscribe('topic_pikapy_qos0'); +# client.unsubscribe('topic_pikapy_qos1'); +# client.unsubscribe('topic_pikapy_qos2'); +# T.sleep_s(5) +# client.listSubscribrTopic() -T.sleep_s(5) -client.listSubscribrTopic() +# ret = client.setWill(1,'topic_will',1,'lost mqtt connect') T.sleep_s(10) - +# exit() ret = client.disconnect() print("ret:%d" % ret) \ No newline at end of file diff --git a/package/mqtt/_mqtt__MQTT.c b/package/mqtt/_mqtt__MQTT.c index fa628ffda..92cec4cd2 100644 --- a/package/mqtt/_mqtt__MQTT.c +++ b/package/mqtt/_mqtt__MQTT.c @@ -421,8 +421,15 @@ int _mqtt__MQTT_setWill(PikaObj* self, char* payload) { mqtt_client_t* _client = obj_getPtr(self, "_client"); int ret; + char topic_str[MQTT_TOPIC_LEN_MAX]; - if (strlen(topic) <= 0) { + __platform_printf("\r\n"); + if(topic == NULL) { + __platform_printf("input topic error\r\n"); + return -1; + } + + if (strlen(topic) <= 0) { __platform_printf("input topic error\r\n"); return -1; } @@ -432,6 +439,11 @@ int _mqtt__MQTT_setWill(PikaObj* self, return -1; } + if(payload == NULL) { + __platform_printf("input payload error\r\n"); + return -1; + } + if (strlen(payload) <= 0) { __platform_printf("input payload error\r\n"); return -1; @@ -439,7 +451,13 @@ int _mqtt__MQTT_setWill(PikaObj* self, __platform_printf("input retain :%d\r\n", (uint8_t)retain); - ret = mqtt_set_will_options(_client, topic, qos, (uint8_t)retain, payload); + //必须转换成python环境的变量,否则函数退出后,topic里的是个空指针 + memset(topic_str,0,sizeof(topic_str)); + sprintf(topic_str,"%s",topic); + obj_setStr(self, topic_str, topic); + obj_setStr(self, "Will_payload", payload); + + ret = mqtt_set_will_options(_client, obj_getStr(self, topic_str), qos, (uint8_t)retain, obj_getStr(self, "Will_payload")); if (ret == 0) { __platform_printf("MQTT_setWill OK\r\n", topic); @@ -458,7 +476,12 @@ int _mqtt__MQTT_setWill(PikaObj* self, int _mqtt__MQTT_subscribe(PikaObj* self, char* topic, int qos, Arg* cb) { mqtt_client_t* _client = obj_getPtr(self, "_client"); int ret; - char topic_str[MQTT_TOPIC_LEN_MAX]; + char topic_str[MQTT_TOPIC_LEN_MAX+24]; + + if(topic == NULL) { + __platform_printf("input topic error\r\n"); + return -1; + } if ((strlen(topic) > MQTT_TOPIC_LEN_MAX)||(strlen(topic) <= 0)) { __platform_printf("input topic error\r\n"); @@ -470,14 +493,6 @@ int _mqtt__MQTT_subscribe(PikaObj* self, char* topic, int qos, Arg* cb) { return -1; } - // obj_setArg(self, "eventCallBack", cb); - // /* init event_listener for the first time */ - // if (NULL == g_mqtt_event_listener) { - // pks_eventLisener_init(&g_mqtt_event_listener); - // } - // uint32_t eventId = hash_time33(topic); - // pks_eventLicener_registEvent(g_mqtt_event_listener, eventId, self); - //必须转换成python环境的变量,否则函数退出后,topic里的是个空指针 memset(topic_str,0,sizeof(topic_str)); sprintf(topic_str,"%s",topic); @@ -486,6 +501,20 @@ int _mqtt__MQTT_subscribe(PikaObj* self, char* topic, int qos, Arg* cb) { ret = mqtt_subscribe(_client, obj_getStr(self, topic_str), qos, Subscribe_Handler); if (ret == 0) { __platform_printf("MQTT_subscribe Topic :%s Qos:%d OK\r\n", topic,qos); + //注册mqtt订阅主题的 回调函数 + if(cb != NULL) { + memset(topic_str,0,sizeof(topic_str)); + sprintf(topic_str,"eventCallBack_%s",topic); + __platform_printf("topic_str:%s \r\n",topic_str); + obj_setArg(self, topic_str, cb); + /* init event_listener for the first time */ + if (NULL == g_mqtt_event_listener) { + pks_eventLisener_init(&g_mqtt_event_listener); + } + uint32_t eventId = hash_time33(topic); + pks_eventLicener_registEvent(g_mqtt_event_listener, eventId, self); + } + } else __platform_printf("MQTT_subscribe Topic ERROR\r\n"); @@ -523,12 +552,24 @@ int _mqtt__MQTT_unsubscribe(PikaObj* self, char* topic) { // 返 回 值:0=成功;非0=错误码 /////////////////////////////////////////////////////////////////// void Subscribe_Handler(void* client, message_data_t* msg) { - // PikaObj* self = ((mqtt_client_t*)client)->user_data; - // Arg* cb = obj_getArg(self, "eventCallBack"); + char topic_str[MQTT_TOPIC_LEN_MAX+24]; + PikaObj* self = ((mqtt_client_t*)client)->user_data; + + //防止数组约界 + memset(topic_str,0,sizeof(topic_str)); + if(strlen(msg->topic_name) <= MQTT_TOPIC_LEN_MAX) + sprintf(topic_str,"eventCallBack_%s",msg->topic_name); + else { + sprintf(topic_str,"eventCallBack_"); + memcpy((topic_str+strlen("eventCallBack_")),msg->topic_name,MQTT_TOPIC_LEN_MAX); + } + __platform_printf("topic_str:%s \r\n",topic_str); + + Arg* cb = obj_getArg(self, topic_str); // obj_setStr(self, "recv_topic", msg->topic_name); // obj_setStr(self, "recv_msg", msg->message->payload); - // pks_eventLisener_sendSignal(g_mqtt_event_listener, - // hash_time33(msg->topic_name), 1); + pks_eventLisener_sendSignal(g_mqtt_event_listener, + hash_time33(msg->topic_name), 1); MQTT_LOG_I("\n>>>------------------"); MQTT_LOG_I("Topic:%s \nlen:%d,message: %s", msg->topic_name, diff --git a/port/linux/package/pikascript/pikascript-lib/mqtt/_mqtt__MQTT.c b/port/linux/package/pikascript/pikascript-lib/mqtt/_mqtt__MQTT.c index fa628ffda..92cec4cd2 100644 --- a/port/linux/package/pikascript/pikascript-lib/mqtt/_mqtt__MQTT.c +++ b/port/linux/package/pikascript/pikascript-lib/mqtt/_mqtt__MQTT.c @@ -421,8 +421,15 @@ int _mqtt__MQTT_setWill(PikaObj* self, char* payload) { mqtt_client_t* _client = obj_getPtr(self, "_client"); int ret; + char topic_str[MQTT_TOPIC_LEN_MAX]; - if (strlen(topic) <= 0) { + __platform_printf("\r\n"); + if(topic == NULL) { + __platform_printf("input topic error\r\n"); + return -1; + } + + if (strlen(topic) <= 0) { __platform_printf("input topic error\r\n"); return -1; } @@ -432,6 +439,11 @@ int _mqtt__MQTT_setWill(PikaObj* self, return -1; } + if(payload == NULL) { + __platform_printf("input payload error\r\n"); + return -1; + } + if (strlen(payload) <= 0) { __platform_printf("input payload error\r\n"); return -1; @@ -439,7 +451,13 @@ int _mqtt__MQTT_setWill(PikaObj* self, __platform_printf("input retain :%d\r\n", (uint8_t)retain); - ret = mqtt_set_will_options(_client, topic, qos, (uint8_t)retain, payload); + //必须转换成python环境的变量,否则函数退出后,topic里的是个空指针 + memset(topic_str,0,sizeof(topic_str)); + sprintf(topic_str,"%s",topic); + obj_setStr(self, topic_str, topic); + obj_setStr(self, "Will_payload", payload); + + ret = mqtt_set_will_options(_client, obj_getStr(self, topic_str), qos, (uint8_t)retain, obj_getStr(self, "Will_payload")); if (ret == 0) { __platform_printf("MQTT_setWill OK\r\n", topic); @@ -458,7 +476,12 @@ int _mqtt__MQTT_setWill(PikaObj* self, int _mqtt__MQTT_subscribe(PikaObj* self, char* topic, int qos, Arg* cb) { mqtt_client_t* _client = obj_getPtr(self, "_client"); int ret; - char topic_str[MQTT_TOPIC_LEN_MAX]; + char topic_str[MQTT_TOPIC_LEN_MAX+24]; + + if(topic == NULL) { + __platform_printf("input topic error\r\n"); + return -1; + } if ((strlen(topic) > MQTT_TOPIC_LEN_MAX)||(strlen(topic) <= 0)) { __platform_printf("input topic error\r\n"); @@ -470,14 +493,6 @@ int _mqtt__MQTT_subscribe(PikaObj* self, char* topic, int qos, Arg* cb) { return -1; } - // obj_setArg(self, "eventCallBack", cb); - // /* init event_listener for the first time */ - // if (NULL == g_mqtt_event_listener) { - // pks_eventLisener_init(&g_mqtt_event_listener); - // } - // uint32_t eventId = hash_time33(topic); - // pks_eventLicener_registEvent(g_mqtt_event_listener, eventId, self); - //必须转换成python环境的变量,否则函数退出后,topic里的是个空指针 memset(topic_str,0,sizeof(topic_str)); sprintf(topic_str,"%s",topic); @@ -486,6 +501,20 @@ int _mqtt__MQTT_subscribe(PikaObj* self, char* topic, int qos, Arg* cb) { ret = mqtt_subscribe(_client, obj_getStr(self, topic_str), qos, Subscribe_Handler); if (ret == 0) { __platform_printf("MQTT_subscribe Topic :%s Qos:%d OK\r\n", topic,qos); + //注册mqtt订阅主题的 回调函数 + if(cb != NULL) { + memset(topic_str,0,sizeof(topic_str)); + sprintf(topic_str,"eventCallBack_%s",topic); + __platform_printf("topic_str:%s \r\n",topic_str); + obj_setArg(self, topic_str, cb); + /* init event_listener for the first time */ + if (NULL == g_mqtt_event_listener) { + pks_eventLisener_init(&g_mqtt_event_listener); + } + uint32_t eventId = hash_time33(topic); + pks_eventLicener_registEvent(g_mqtt_event_listener, eventId, self); + } + } else __platform_printf("MQTT_subscribe Topic ERROR\r\n"); @@ -523,12 +552,24 @@ int _mqtt__MQTT_unsubscribe(PikaObj* self, char* topic) { // 返 回 值:0=成功;非0=错误码 /////////////////////////////////////////////////////////////////// void Subscribe_Handler(void* client, message_data_t* msg) { - // PikaObj* self = ((mqtt_client_t*)client)->user_data; - // Arg* cb = obj_getArg(self, "eventCallBack"); + char topic_str[MQTT_TOPIC_LEN_MAX+24]; + PikaObj* self = ((mqtt_client_t*)client)->user_data; + + //防止数组约界 + memset(topic_str,0,sizeof(topic_str)); + if(strlen(msg->topic_name) <= MQTT_TOPIC_LEN_MAX) + sprintf(topic_str,"eventCallBack_%s",msg->topic_name); + else { + sprintf(topic_str,"eventCallBack_"); + memcpy((topic_str+strlen("eventCallBack_")),msg->topic_name,MQTT_TOPIC_LEN_MAX); + } + __platform_printf("topic_str:%s \r\n",topic_str); + + Arg* cb = obj_getArg(self, topic_str); // obj_setStr(self, "recv_topic", msg->topic_name); // obj_setStr(self, "recv_msg", msg->message->payload); - // pks_eventLisener_sendSignal(g_mqtt_event_listener, - // hash_time33(msg->topic_name), 1); + pks_eventLisener_sendSignal(g_mqtt_event_listener, + hash_time33(msg->topic_name), 1); MQTT_LOG_I("\n>>>------------------"); MQTT_LOG_I("Topic:%s \nlen:%d,message: %s", msg->topic_name, diff --git a/test/python/mqtt/mqtt_subscribe.py b/test/python/mqtt/mqtt_subscribe.py index 6de77e56d..167796c27 100644 --- a/test/python/mqtt/mqtt_subscribe.py +++ b/test/python/mqtt/mqtt_subscribe.py @@ -25,14 +25,15 @@ T.sleep_s(5) out = client.listSubscribrTopic() print('out',out) -client.unsubscribe('topic_pikapy_qos0'); -client.unsubscribe('topic_pikapy_qos1'); -client.unsubscribe('topic_pikapy_qos2'); +# client.unsubscribe('topic_pikapy_qos0'); +# client.unsubscribe('topic_pikapy_qos1'); +# client.unsubscribe('topic_pikapy_qos2'); +# T.sleep_s(5) +# client.listSubscribrTopic() -T.sleep_s(5) -client.listSubscribrTopic() +# ret = client.setWill(1,'topic_will',1,'lost mqtt connect') T.sleep_s(10) - +# exit() ret = client.disconnect() print("ret:%d" % ret) \ No newline at end of file