diff --git a/package/mqtt/_mqtt__MQTT.c b/package/mqtt/_mqtt__MQTT.c index 0ef5cd44f..33d012128 100644 --- a/package/mqtt/_mqtt__MQTT.c +++ b/package/mqtt/_mqtt__MQTT.c @@ -1,6 +1,7 @@ #include "_mqtt__MQTT.h" -#include "mqttclient.h" #include "PikaStdData_List.h" +#include "TinyObj.h" +#include "mqttclient.h" PikaEventListener* g_mqtt_event_listener = NULL; @@ -32,8 +33,7 @@ void _mqtt__MQTT___init__(PikaObj* self, if (strlen(ip) > 0) { obj_setStr(self, "host_str", ip); mqtt_set_host(_client, obj_getStr(self, "host_str")); - } - else { + } else { __platform_printf("mqtt_init input ip none\r\n"); } @@ -45,54 +45,48 @@ void _mqtt__MQTT___init__(PikaObj* self, if (strlen(clinetID) > 0) { obj_setStr(self, "id_str", clinetID); mqtt_set_client_id(_client, obj_getStr(self, "id_str")); - } - else { + } else { __platform_printf("mqtt_init input clinetID none\r\n"); } if (strlen(username) > 0) { obj_setStr(self, "username_str", username); mqtt_set_user_name(_client, obj_getStr(self, "username_str")); - } - else { + } else { __platform_printf("mqtt_init input username none\r\n"); } if (strlen(password) > 0) { obj_setStr(self, "password_str", password); mqtt_set_password(_client, obj_getStr(self, "password_str")); - } - else { + } else { __platform_printf("mqtt_init input password none\r\n"); } tmp = atoi(version); if (tmp > 0) { mqtt_set_version(_client, tmp); - } - else { + } else { __platform_printf("mqtt_init input version none\r\n"); } if (strlen(ca) > 0) { obj_setStr(self, "ca_str", ca); mqtt_set_ca(_client, obj_getStr(self, "ca_str")); - } - else { + } else { // __platform_printf("mqtt_init input ca none\r\n"); } if (keepalive > 0) { mqtt_set_keep_alive_interval(_client, keepalive); - } - else { + } else { __platform_printf("mqtt_init input keepalive none\r\n"); } mqtt_set_clean_session(_client, 1); obj_setPtr(self, "_client", - _client); //这里要再保存一次mqtt结构体的内容到python环境 + _client); // 这里要再保存一次mqtt结构体的内容到python环境 // __platform_printf("Mqtt_Lib buildtime:%s-%s\r\n", __DATE__, __TIME__); } @@ -169,8 +163,7 @@ PikaObj* _mqtt__MQTT_listSubscribeTopic(PikaObj* self) { message_handlers_t* msg_handler; PikaObj* list = NULL; - if (NULL == _client) - { + if (NULL == _client) { return NULL; } @@ -184,15 +177,16 @@ PikaObj* _mqtt__MQTT_listSubscribeTopic(PikaObj* self) { list = newNormalObj(New_PikaStdData_List); /* 初始化 list */ PikaStdData_List___init__(list); - + LIST_FOR_EACH_SAFE(curr, next, &_client->mqtt_msg_handler_list) { msg_handler = LIST_ENTRY(curr, message_handlers_t, list); /* determine whether a node already exists by mqtt topic, but wildcards * are not supported */ if (NULL != msg_handler->topic_filter) { // MQTT_LOG_I("%s:%d %s()...[%d] subscribe topic: %s", __FILE__, - // __LINE__, __FUNCTION__, ++i, msg_handler->topic_filter); - + // __LINE__, __FUNCTION__, ++i, + // msg_handler->topic_filter); + /* 用 arg_new 的 api 创建 arg */ Arg* str_arg1 = arg_newStr((char*)msg_handler->topic_filter); /* 添加到 list 对象 */ @@ -209,14 +203,14 @@ PikaObj* _mqtt__MQTT_listSubscribeTopic(PikaObj* self) { // 输入参数:主题名称,有效数据 // 返 回 值:0=成功;非0=错误码 /////////////////////////////////////////////////////////////////// -int _mqtt__MQTT_publish(PikaObj *self, char* topic, int qos, char* payload) { +int _mqtt__MQTT_publish(PikaObj* self, char* topic, int qos, char* payload) { int ret; mqtt_message_t msg; mqtt_client_t* _client = obj_getPtr(self, "_client"); memset(&msg, 0, sizeof(msg)); - if((qos < 0) || (qos > 2)) { + if ((qos < 0) || (qos > 2)) { __platform_printf("input qos error\r\n"); return -1; } @@ -232,12 +226,12 @@ int _mqtt__MQTT_publish(PikaObj *self, char* topic, int qos, char* payload) { msg.payload = (void*)payload; msg.qos = qos; - __platform_printf("msg.qos:%d\r\n",msg.qos);//这里为了防止被优化,导致运行异常 + __platform_printf("msg.qos:%d\r\n", + msg.qos); // 这里为了防止被优化,导致运行异常 ret = mqtt_publish(_client, topic, &msg); if (ret == 0) { // __platform_printf("MQTT_publish OK\r\n"); - } - else + } else __platform_printf("MQTT_publish ERROR\r\n"); return ret; } @@ -310,8 +304,12 @@ int _mqtt__MQTT_setHost(PikaObj* self, char* host_url) { return -2; } - obj_setStr(self, "host_str",host_url); // python 环境创建一个全局变量存放 host - mqtt_set_host(_client,obj_getStr(self,"host_str")); //从python环境中取出 host的指针 赋值给结构体 + obj_setStr(self, "host_str", + host_url); // python 环境创建一个全局变量存放 host + mqtt_set_host( + _client, + obj_getStr(self, + "host_str")); // 从python环境中取出 host的指针 赋值给结构体 // __platform_printf("MQTT_setHost :%s\r\n", host_url); return 0; @@ -323,7 +321,7 @@ int _mqtt__MQTT_setHost(PikaObj* self, char* host_url) { // 输入参数:字符串格式 // 返 回 值:0=成功;非0=错误码 /////////////////////////////////////////////////////////////////// -int _mqtt__MQTT_setKeepAlive(PikaObj *self, int time) { +int _mqtt__MQTT_setKeepAlive(PikaObj* self, int time) { mqtt_client_t* _client = obj_getPtr(self, "_client"); int tmp; @@ -424,14 +422,13 @@ int _mqtt__MQTT_setVersion(PikaObj* self, char* version) { mqtt_client_t* _client = obj_getPtr(self, "_client"); // int tmp; - if(version == NULL) { + if (version == NULL) { __platform_printf("input version str error\n"); return -1; } - if((strcmp(version,"3.1") == 0) || (strcmp(version,"3.1.1") == 0)) { + if ((strcmp(version, "3.1") == 0) || (strcmp(version, "3.1.1") == 0)) { mqtt_set_version(_client, 3); - } - else { + } else { __platform_printf("input version data error\n"); return -2; } @@ -446,18 +443,22 @@ int _mqtt__MQTT_setVersion(PikaObj* self, char* version) { // 输入参数: // 返 回 值:0=成功;非0=错误码 /////////////////////////////////////////////////////////////////// -int _mqtt__MQTT_setWill(PikaObj *self, char* topic, char* payload, int qos, int retain) { +int _mqtt__MQTT_setWill(PikaObj* self, + char* topic, + char* payload, + int qos, + int retain) { mqtt_client_t* _client = obj_getPtr(self, "_client"); int ret; char topic_str[MQTT_TOPIC_LEN_MAX]; // __platform_printf("\r\n"); - if(topic == NULL) { + if (topic == NULL) { __platform_printf("input topic error\r\n"); return -1; } - if (strlen(topic) <= 0) { + if (strlen(topic) <= 0) { __platform_printf("input topic error\r\n"); return -1; } @@ -467,11 +468,11 @@ int _mqtt__MQTT_setWill(PikaObj *self, char* topic, char* payload, int qos, int return -1; } - if(payload == NULL) { + if (payload == NULL) { __platform_printf("input payload error\r\n"); return -1; } - + if (strlen(payload) <= 0) { __platform_printf("input payload error\r\n"); return -1; @@ -479,13 +480,15 @@ int _mqtt__MQTT_setWill(PikaObj *self, char* topic, char* payload, int qos, int // __platform_printf("input retain :%d\r\n", (uint8_t)retain); - //必须转换成python环境的变量,否则函数退出后,topic里的是个空指针 - memset(topic_str,0,sizeof(topic_str)); - sprintf(topic_str,"%s",topic); + // 必须转换成python环境的变量,否则函数退出后,topic里的是个空指针 + memset(topic_str, 0, sizeof(topic_str)); + sprintf(topic_str, "%s", topic); obj_setStr(self, topic_str, topic); obj_setStr(self, "Will_payload", payload); - ret = mqtt_set_will_options(_client, obj_getStr(self, topic_str), qos, (uint8_t)retain, obj_getStr(self, "Will_payload")); + ret = mqtt_set_will_options(_client, obj_getStr(self, topic_str), qos, + (uint8_t)retain, + obj_getStr(self, "Will_payload")); if (ret == 0) { // __platform_printf("MQTT_setWill OK\r\n", topic); @@ -501,19 +504,20 @@ int _mqtt__MQTT_setWill(PikaObj *self, char* topic, char* payload, int qos, int // 输入参数: // 返 回 值:0=成功;非0=错误码 /////////////////////////////////////////////////////////////////// -int _mqtt__MQTT_subscribe(PikaObj *self, char* topic, int qos, Arg* cb) { +int _mqtt__MQTT_subscribe(PikaObj* self, char* topic, int qos, Arg* cb) { mqtt_client_t* _client = obj_getPtr(self, "_client"); int ret; - char topic_str[MQTT_TOPIC_LEN_MAX+24]; - + char topic_str[MQTT_TOPIC_LEN_MAX + 24]; + // __platform_printf("topic_str:%s \r\n",topic_str); - if(topic == NULL) { + if (topic == NULL) { __platform_printf("input topic error\r\n"); return -1; } - if ((strlen(topic) > MQTT_TOPIC_LEN_MAX)||(strlen(topic) <= 0)) { - __platform_printf("input topic data error strlen(topic):%d\r\n",strlen(topic)); + if ((strlen(topic) > MQTT_TOPIC_LEN_MAX) || (strlen(topic) <= 0)) { + __platform_printf("input topic data error strlen(topic):%d\r\n", + strlen(topic)); return -2; } @@ -521,34 +525,37 @@ int _mqtt__MQTT_subscribe(PikaObj *self, char* topic, int qos, Arg* cb) { __platform_printf("input qos error\r\n"); return -3; } - - //必须转换成python环境的变量,否则函数退出后,topic里的是个空指针 - memset(topic_str,0,sizeof(topic_str)); - sprintf(topic_str,"%s",topic); + + // 必须转换成python环境的变量,否则函数退出后,topic里的是个空指针 + memset(topic_str, 0, sizeof(topic_str)); + sprintf(topic_str, "%s", topic); obj_setStr(self, topic_str, topic); - ret = mqtt_subscribe(_client, obj_getStr(self, topic_str), qos, Subscribe_Handler); + ret = mqtt_subscribe(_client, obj_getStr(self, topic_str), qos, + Subscribe_Handler); if (ret == 0) { - // __platform_printf("MQTT_subscribe Topic :%s Qos:%d OK\r\n", topic,qos); - //注册mqtt订阅主题的 回调函数 - if(cb != NULL) { - memset(topic_str,0,sizeof(topic_str)); - // sprintf(topic_str,"eventCallBack_%s",topic); - sprintf(topic_str,"eventCallBack"); - // __platform_printf("topic_str:%s \r\n",topic_str); - obj_setArg(self, topic_str, cb); + // __platform_printf("MQTT_subscribe Topic :%s Qos:%d OK\r\n", + // topic,qos); + // 注册mqtt订阅主题的 回调函数 + if (cb != NULL) { + char hash_str[32] = {0}; + memset(hash_str, 0, sizeof(hash_str)); + sprintf(hash_str, "C%d", hash_time33(topic)); + obj_newDirectObj(self, hash_str, New_TinyObj); + PikaObj* eventHandler = obj_getPtr(self, hash_str); + obj_setArg(eventHandler, "eventCallBack", cb); /* init event_listener for the first time */ if (NULL == g_mqtt_event_listener) { - pks_eventLisener_init(&g_mqtt_event_listener); + pks_eventListener_init(&g_mqtt_event_listener); } uint32_t eventId = hash_time33(topic_str); - // __platform_printf("hash_time33(topic_str):%d \r\n",hash_time33(topic_str)); - pks_eventLicener_registEvent(g_mqtt_event_listener, eventId, self); + pks_eventListener_registEvent(g_mqtt_event_listener, eventId, + eventHandler); } } else __platform_printf("MQTT_subscribe Topic ERROR\r\n"); - + return ret; } @@ -583,33 +590,34 @@ int _mqtt__MQTT_unsubscribe(PikaObj* self, char* topic) { // 返 回 值:0=成功;非0=错误码 /////////////////////////////////////////////////////////////////// void Subscribe_Handler(void* client, message_data_t* msg) { - char topic_str[MQTT_TOPIC_LEN_MAX+24]; + char topic_str[MQTT_TOPIC_LEN_MAX + 24]; PikaObj* self = ((mqtt_client_t*)client)->user_data; char hash_str[32] = {0}; - //防止数组越界 - memset(topic_str,0,sizeof(topic_str)); - if(strlen(msg->topic_name) <= MQTT_TOPIC_LEN_MAX) + // 防止数组越界 + memset(topic_str, 0, sizeof(topic_str)); + if (strlen(msg->topic_name) <= MQTT_TOPIC_LEN_MAX) // sprintf(topic_str,"eventCallBack_%s",msg->topic_name); - sprintf(topic_str,"eventCallBack"); + sprintf(topic_str, "eventCallBack"); else { - sprintf(topic_str,"eventCallBack_"); - memcpy((topic_str+strlen("eventCallBack_")),msg->topic_name,MQTT_TOPIC_LEN_MAX); + sprintf(topic_str, "eventCallBack_"); + memcpy((topic_str + strlen("eventCallBack_")), msg->topic_name, + MQTT_TOPIC_LEN_MAX); } - - pks_eventLisener_sendSignal(g_mqtt_event_listener, - hash_time33(topic_str), hash_time33(msg->topic_name)); - - memset(hash_str,0,sizeof(hash_str)); - sprintf(hash_str,"M%d",hash_time33(msg->topic_name)); + + pks_eventListener_sendSignal(g_mqtt_event_listener, hash_time33(topic_str), + hash_time33(msg->topic_name)); + + memset(hash_str, 0, sizeof(hash_str)); + sprintf(hash_str, "M%d", hash_time33(msg->topic_name)); obj_setStr(self, hash_str, (char*)msg->message->payload); - memset(hash_str,0,sizeof(hash_str)); - sprintf(hash_str,"T%d",hash_time33(msg->topic_name)); + memset(hash_str, 0, sizeof(hash_str)); + sprintf(hash_str, "T%d", hash_time33(msg->topic_name)); obj_setStr(self, hash_str, (char*)msg->topic_name); - memset(hash_str,0,sizeof(hash_str)); - sprintf(hash_str,"Q%d",hash_time33(msg->topic_name)); + memset(hash_str, 0, sizeof(hash_str)); + sprintf(hash_str, "Q%d", hash_time33(msg->topic_name)); obj_setInt(self, hash_str, msg->message->qos); // MQTT_LOG_I("\n>>>------------------"); @@ -626,7 +634,7 @@ void Subscribe_Handler(void* client, message_data_t* msg) { /////////////////////////////////////////////////////////////////// void _mqtt___del__(PikaObj* self) { if (NULL != g_mqtt_event_listener) { - pks_eventLisener_deinit(&g_mqtt_event_listener); + pks_eventListener_deinit(&g_mqtt_event_listener); } } @@ -636,12 +644,12 @@ void _mqtt___del__(PikaObj* self) { // 输入参数: // 返 回 值: /////////////////////////////////////////////////////////////////// -char* _mqtt__MQTT_getMsg(PikaObj *self, int signal) { +char* _mqtt__MQTT_getMsg(PikaObj* self, int signal) { // mqtt_client_t* _client = obj_getPtr(self, "_client"); char hash_str[32]; - memset(hash_str,0,sizeof(hash_str)); - sprintf(hash_str,"M%d",signal); + memset(hash_str, 0, sizeof(hash_str)); + sprintf(hash_str, "M%d", signal); return (obj_getStr(self, hash_str)); } @@ -651,11 +659,11 @@ char* _mqtt__MQTT_getMsg(PikaObj *self, int signal) { // 输入参数: // 返 回 值: /////////////////////////////////////////////////////////////////// -char* _mqtt__MQTT_getTopic(PikaObj *self, int signal) { +char* _mqtt__MQTT_getTopic(PikaObj* self, int signal) { char hash_str[32]; - memset(hash_str,0,sizeof(hash_str)); - sprintf(hash_str,"T%d",signal); + memset(hash_str, 0, sizeof(hash_str)); + sprintf(hash_str, "T%d", signal); return (obj_getStr(self, hash_str)); } @@ -665,11 +673,11 @@ char* _mqtt__MQTT_getTopic(PikaObj *self, int signal) { // 输入参数: // 返 回 值: /////////////////////////////////////////////////////////////////// -int _mqtt__MQTT_getQos(PikaObj *self, int signal) { +int _mqtt__MQTT_getQos(PikaObj* self, int signal) { char hash_str[32]; - memset(hash_str,0,sizeof(hash_str)); - sprintf(hash_str,"Q%d",signal); + memset(hash_str, 0, sizeof(hash_str)); + sprintf(hash_str, "Q%d", signal); return (obj_getInt(self, hash_str)); } @@ -679,7 +687,7 @@ int _mqtt__MQTT_getQos(PikaObj *self, int signal) { // 输入参数: // 返 回 值: /////////////////////////////////////////////////////////////////// -int _mqtt__MQTT_setDisconnectHandler(PikaObj *self, Arg* cb) { +int _mqtt__MQTT_setDisconnectHandler(PikaObj* self, Arg* cb) { // mqtt_client_t* _client = obj_getPtr(self, "_client"); __platform_printf("_mqtt__MQTT_setDisconnectHandler\r\n");