diff --git a/examples/mqtt/mqtt_subscribe.py b/examples/mqtt/mqtt_subscribe.py index 092baf04b..2c32bda87 100644 --- a/examples/mqtt/mqtt_subscribe.py +++ b/examples/mqtt/mqtt_subscribe.py @@ -56,7 +56,12 @@ print('listSubscribeTopic out', out) ret = client.setDisconnectHandler(reconnect_mq); print("setDisconnectHandler:%d" % ret) -# ret = client.setWill(1,'topic_will',1,'lost mqtt connect') +# ret = client.setWill('topic_will','lost mqtt connect') +# print("setWill:%d" % ret) +# client.publish('topic_will', 'hello pikascript', 1) +# T.sleep_s(5) +# print("sleep_s:5s") + T.sleep_s(30) # exit() ret = client.disconnect() diff --git a/package/mqtt/_mqtt__MQTT.c b/package/mqtt/_mqtt__MQTT.c index f268ef04e..0c1eb528b 100644 --- a/package/mqtt/_mqtt__MQTT.c +++ b/package/mqtt/_mqtt__MQTT.c @@ -160,7 +160,7 @@ int _mqtt__MQTT_disconnect(PikaObj* self) { /////////////////////////////////////////////////////////////////// PikaObj* _mqtt__MQTT_listSubscribeTopic(PikaObj* self) { mqtt_client_t* _client = obj_getPtr(self, "_client"); - int i = 0; + // int i = 0; mqtt_list_t *curr, *next; message_handlers_t* msg_handler; PikaObj* list = NULL; @@ -187,7 +187,7 @@ PikaObj* _mqtt__MQTT_listSubscribeTopic(PikaObj* self) { if (NULL != msg_handler->topic_filter) { MQTT_LOG_I("%s:%d %s()...[%d] subscribe topic: %s", __FILE__, __LINE__, __FUNCTION__, ++i, msg_handler->topic_filter); - __platform_printf("[%d]subscribe topic: %s\n",++i, msg_handler->topic_filter); + // __platform_printf("[%d]subscribe topic: %s\n",++i, msg_handler->topic_filter); /* 用 arg_new 的 api 创建 arg */ Arg* str_arg1 = arg_newStr((char*)msg_handler->topic_filter); /* 添加到 list 对象 */ @@ -482,7 +482,6 @@ int _mqtt__MQTT_setWill(PikaObj* self, return -1; } - // __platform_printf("input retain :%d\r\n", (uint8_t)retain); // 必须转换成python环境的变量,否则函数退出后,topic里的是个空指针 memset(topic_str, 0, sizeof(topic_str)); @@ -490,6 +489,11 @@ int _mqtt__MQTT_setWill(PikaObj* self, obj_setStr(self, topic_str, topic); obj_setStr(self, "Will_payload", payload); + // __platform_printf("obj_getStr(self, topic_str) :%s\r\n", obj_getStr(self, topic_str)); + // __platform_printf("iqos :%d\r\n", qos); + // __platform_printf("retain :%d\r\n", retain); + // __platform_printf("obj_getStr(self, \"Will_payload\") :%s\r\n", obj_getStr(self, "Will_payload")); + ret = mqtt_set_will_options(_client, obj_getStr(self, topic_str), qos, (uint8_t)retain, obj_getStr(self, "Will_payload")); @@ -689,12 +693,13 @@ int _mqtt__MQTT_getQos(PikaObj* self, int signal) { // 返 回 值:0=成功;非0=错误码 /////////////////////////////////////////////////////////////////// void Reconnect_Handler(void *client, void *reconnect_date) { - PikaObj* self = ((mqtt_client_t*)client)->user_data; - __platform_printf("Reconnect_Handler\r\n"); + // PikaObj* self = ((mqtt_client_t*)client)->user_data; + // __platform_printf("Reconnect_Handler\r\n"); - //发送事件信号 - pks_eventListener_sendSignal(g_mqtt_event_listener,MQTT_RECONNECTION_EVENT_ID, - 112233); + if(((mqtt_client_t*)client)->mqtt_client_state != CLIENT_STATE_CONNECTED) { + //发送事件信号 + pks_eventListener_sendSignal(g_mqtt_event_listener,MQTT_RECONNECTION_EVENT_ID,1); + } } @@ -707,7 +712,7 @@ void Reconnect_Handler(void *client, void *reconnect_date) { int _mqtt__MQTT_setDisconnectHandler(PikaObj* self, Arg* cb) { mqtt_client_t* _client = obj_getPtr(self, "_client"); - __platform_printf("_mqtt__MQTT_setDisconnectHandler\r\n"); + // __platform_printf("_mqtt__MQTT_setDisconnectHandler\r\n"); //注册到c库中 mqtt_set_reconnect_handler(_client,Reconnect_Handler); diff --git a/package/mqtt/mqtt.py b/package/mqtt/mqtt.py index 998adf1e0..5549b368b 100644 --- a/package/mqtt/mqtt.py +++ b/package/mqtt/mqtt.py @@ -15,13 +15,13 @@ class MQTT(_mqtt._MQTT): username, password, version, ca, keepalive) - def subscribe(self, topic, cb, qos=0): + def subscribe(self, topic, cb, qos=1): return super().subscribe(topic, cb, qos) - def publish(self, topic, payload, qos=0): + def publish(self, topic, payload, qos=1): return super().publish(topic, payload, qos) - def setWill(self, topic, payload, qos=0, retain=0): + def setWill(self, topic, payload, qos=1, retain=0): return super().setWill(topic, payload, qos, retain) def unsubscribe(self, topic=''): diff --git a/package/mqtt/mqtt_config.h b/package/mqtt/mqtt_config.h index b58657148..89a6ed961 100644 --- a/package/mqtt/mqtt_config.h +++ b/package/mqtt/mqtt_config.h @@ -9,7 +9,7 @@ // #define MQTT_LOG_IS_SALOF -#define MQTT_LOG_LEVEL MQTT_LOG_WARN_LEVEL //MQTT_LOG_WARN_LEVEL MQTT_LOG_DEBUG_LEVEL +#define MQTT_LOG_LEVEL MQTT_LOG_ERR_LEVEL //MQTT_LOG_WARN_LEVEL MQTT_LOG_DEBUG_LEVEL #ifdef MQTT_LOG_IS_SALOF #define SALOF_USING_LOG (1U) diff --git a/package/mqtt/mqttclient.c b/package/mqtt/mqttclient.c index 5a446b154..11d1361cd 100644 --- a/package/mqtt/mqttclient.c +++ b/package/mqtt/mqttclient.c @@ -630,9 +630,10 @@ static int mqtt_try_resubscribe(mqtt_client_t* c) { /* resubscribe topic */ if ((rc = mqtt_subscribe(c, msg_handler->topic_filter, msg_handler->qos, msg_handler->handler)) == - MQTT_ACK_HANDLER_NUM_TOO_MUCH_ERROR) - MQTT_LOG_W("%s:%d %s()... mqtt ack handler num too much ...", - __FILE__, __LINE__, __FUNCTION__); + MQTT_ACK_HANDLER_NUM_TOO_MUCH_ERROR) { + MQTT_LOG_W("%s:%d %s()... mqtt ack handler num too much ...", + __FILE__, __LINE__, __FUNCTION__); + } } RETURN_ERROR(rc); @@ -995,7 +996,7 @@ static int mqtt_yield(mqtt_client_t* c, int timeout_ms) { mqtt_ack_list_scan(c, 1); } else if (MQTT_NOT_CONNECT_ERROR == rc) { - MQTT_LOG_E("%s:%d %s()... mqtt not connect", __FILE__, __LINE__, + MQTT_LOG_W("%s:%d %s()... mqtt not connect", __FILE__, __LINE__, __FUNCTION__); } else { break; @@ -1563,7 +1564,7 @@ exit: } int mqtt_list_subscribe_topic(mqtt_client_t* c) { - int i = 0; + // int i = 0; mqtt_list_t *curr, *next; message_handlers_t* msg_handler; diff --git a/port/linux/package/pikascript/mqtt.py b/port/linux/package/pikascript/mqtt.py index 998adf1e0..5549b368b 100644 --- a/port/linux/package/pikascript/mqtt.py +++ b/port/linux/package/pikascript/mqtt.py @@ -15,13 +15,13 @@ class MQTT(_mqtt._MQTT): username, password, version, ca, keepalive) - def subscribe(self, topic, cb, qos=0): + def subscribe(self, topic, cb, qos=1): return super().subscribe(topic, cb, qos) - def publish(self, topic, payload, qos=0): + def publish(self, topic, payload, qos=1): return super().publish(topic, payload, qos) - def setWill(self, topic, payload, qos=0, retain=0): + def setWill(self, topic, payload, qos=1, retain=0): return super().setWill(topic, payload, qos, retain) def unsubscribe(self, topic=''): diff --git a/port/linux/package/pikascript/pikascript-lib/mqtt/_mqtt__MQTT.c b/port/linux/package/pikascript/pikascript-lib/mqtt/_mqtt__MQTT.c index f268ef04e..0c1eb528b 100644 --- a/port/linux/package/pikascript/pikascript-lib/mqtt/_mqtt__MQTT.c +++ b/port/linux/package/pikascript/pikascript-lib/mqtt/_mqtt__MQTT.c @@ -160,7 +160,7 @@ int _mqtt__MQTT_disconnect(PikaObj* self) { /////////////////////////////////////////////////////////////////// PikaObj* _mqtt__MQTT_listSubscribeTopic(PikaObj* self) { mqtt_client_t* _client = obj_getPtr(self, "_client"); - int i = 0; + // int i = 0; mqtt_list_t *curr, *next; message_handlers_t* msg_handler; PikaObj* list = NULL; @@ -187,7 +187,7 @@ PikaObj* _mqtt__MQTT_listSubscribeTopic(PikaObj* self) { if (NULL != msg_handler->topic_filter) { MQTT_LOG_I("%s:%d %s()...[%d] subscribe topic: %s", __FILE__, __LINE__, __FUNCTION__, ++i, msg_handler->topic_filter); - __platform_printf("[%d]subscribe topic: %s\n",++i, msg_handler->topic_filter); + // __platform_printf("[%d]subscribe topic: %s\n",++i, msg_handler->topic_filter); /* 用 arg_new 的 api 创建 arg */ Arg* str_arg1 = arg_newStr((char*)msg_handler->topic_filter); /* 添加到 list 对象 */ @@ -482,7 +482,6 @@ int _mqtt__MQTT_setWill(PikaObj* self, return -1; } - // __platform_printf("input retain :%d\r\n", (uint8_t)retain); // 必须转换成python环境的变量,否则函数退出后,topic里的是个空指针 memset(topic_str, 0, sizeof(topic_str)); @@ -490,6 +489,11 @@ int _mqtt__MQTT_setWill(PikaObj* self, obj_setStr(self, topic_str, topic); obj_setStr(self, "Will_payload", payload); + // __platform_printf("obj_getStr(self, topic_str) :%s\r\n", obj_getStr(self, topic_str)); + // __platform_printf("iqos :%d\r\n", qos); + // __platform_printf("retain :%d\r\n", retain); + // __platform_printf("obj_getStr(self, \"Will_payload\") :%s\r\n", obj_getStr(self, "Will_payload")); + ret = mqtt_set_will_options(_client, obj_getStr(self, topic_str), qos, (uint8_t)retain, obj_getStr(self, "Will_payload")); @@ -689,12 +693,13 @@ int _mqtt__MQTT_getQos(PikaObj* self, int signal) { // 返 回 值:0=成功;非0=错误码 /////////////////////////////////////////////////////////////////// void Reconnect_Handler(void *client, void *reconnect_date) { - PikaObj* self = ((mqtt_client_t*)client)->user_data; - __platform_printf("Reconnect_Handler\r\n"); + // PikaObj* self = ((mqtt_client_t*)client)->user_data; + // __platform_printf("Reconnect_Handler\r\n"); - //发送事件信号 - pks_eventListener_sendSignal(g_mqtt_event_listener,MQTT_RECONNECTION_EVENT_ID, - 112233); + if(((mqtt_client_t*)client)->mqtt_client_state != CLIENT_STATE_CONNECTED) { + //发送事件信号 + pks_eventListener_sendSignal(g_mqtt_event_listener,MQTT_RECONNECTION_EVENT_ID,1); + } } @@ -707,7 +712,7 @@ void Reconnect_Handler(void *client, void *reconnect_date) { int _mqtt__MQTT_setDisconnectHandler(PikaObj* self, Arg* cb) { mqtt_client_t* _client = obj_getPtr(self, "_client"); - __platform_printf("_mqtt__MQTT_setDisconnectHandler\r\n"); + // __platform_printf("_mqtt__MQTT_setDisconnectHandler\r\n"); //注册到c库中 mqtt_set_reconnect_handler(_client,Reconnect_Handler); diff --git a/port/linux/package/pikascript/pikascript-lib/mqtt/mqtt_config.h b/port/linux/package/pikascript/pikascript-lib/mqtt/mqtt_config.h index b58657148..89a6ed961 100644 --- a/port/linux/package/pikascript/pikascript-lib/mqtt/mqtt_config.h +++ b/port/linux/package/pikascript/pikascript-lib/mqtt/mqtt_config.h @@ -9,7 +9,7 @@ // #define MQTT_LOG_IS_SALOF -#define MQTT_LOG_LEVEL MQTT_LOG_WARN_LEVEL //MQTT_LOG_WARN_LEVEL MQTT_LOG_DEBUG_LEVEL +#define MQTT_LOG_LEVEL MQTT_LOG_ERR_LEVEL //MQTT_LOG_WARN_LEVEL MQTT_LOG_DEBUG_LEVEL #ifdef MQTT_LOG_IS_SALOF #define SALOF_USING_LOG (1U) diff --git a/port/linux/package/pikascript/pikascript-lib/mqtt/mqttclient.c b/port/linux/package/pikascript/pikascript-lib/mqtt/mqttclient.c index 5a446b154..11d1361cd 100644 --- a/port/linux/package/pikascript/pikascript-lib/mqtt/mqttclient.c +++ b/port/linux/package/pikascript/pikascript-lib/mqtt/mqttclient.c @@ -630,9 +630,10 @@ static int mqtt_try_resubscribe(mqtt_client_t* c) { /* resubscribe topic */ if ((rc = mqtt_subscribe(c, msg_handler->topic_filter, msg_handler->qos, msg_handler->handler)) == - MQTT_ACK_HANDLER_NUM_TOO_MUCH_ERROR) - MQTT_LOG_W("%s:%d %s()... mqtt ack handler num too much ...", - __FILE__, __LINE__, __FUNCTION__); + MQTT_ACK_HANDLER_NUM_TOO_MUCH_ERROR) { + MQTT_LOG_W("%s:%d %s()... mqtt ack handler num too much ...", + __FILE__, __LINE__, __FUNCTION__); + } } RETURN_ERROR(rc); @@ -995,7 +996,7 @@ static int mqtt_yield(mqtt_client_t* c, int timeout_ms) { mqtt_ack_list_scan(c, 1); } else if (MQTT_NOT_CONNECT_ERROR == rc) { - MQTT_LOG_E("%s:%d %s()... mqtt not connect", __FILE__, __LINE__, + MQTT_LOG_W("%s:%d %s()... mqtt not connect", __FILE__, __LINE__, __FUNCTION__); } else { break; @@ -1563,7 +1564,7 @@ exit: } int mqtt_list_subscribe_topic(mqtt_client_t* c) { - int i = 0; + // int i = 0; mqtt_list_t *curr, *next; message_handlers_t* msg_handler; diff --git a/test/python/mqtt/mqtt_subscribe.py b/test/python/mqtt/mqtt_subscribe.py index 092baf04b..2c32bda87 100644 --- a/test/python/mqtt/mqtt_subscribe.py +++ b/test/python/mqtt/mqtt_subscribe.py @@ -56,7 +56,12 @@ print('listSubscribeTopic out', out) ret = client.setDisconnectHandler(reconnect_mq); print("setDisconnectHandler:%d" % ret) -# ret = client.setWill(1,'topic_will',1,'lost mqtt connect') +# ret = client.setWill('topic_will','lost mqtt connect') +# print("setWill:%d" % ret) +# client.publish('topic_will', 'hello pikascript', 1) +# T.sleep_s(5) +# print("sleep_s:5s") + T.sleep_s(30) # exit() ret = client.disconnect()