pikapython/package/mqtt/_mqtt__MQTT.c

714 lines
24 KiB
C
Raw Normal View History

2022-11-12 22:21:23 +08:00
#include "_mqtt__MQTT.h"
2022-12-27 22:53:40 +08:00
#include "PikaObj.h"
#include "PikaStdData_List.h"
#include "TinyObj.h"
2022-12-18 22:02:55 +08:00
#include "mqttclient.h"
2022-12-29 18:22:20 +08:00
#if !PIKASCRIPT_VERSION_REQUIRE_MINIMUN(1, 12, 0)
#error "This library requires PikaScript version 1.12.0 or higher"
#endif
2022-11-23 21:16:20 +08:00
PikaEventListener* g_mqtt_event_listener = NULL;
void Subscribe_Handler(void* client, message_data_t* msg);
const uint32_t MQTT_RECONNECTION_EVENT_ID = 0xFFAA0088;
////////////////////////////////////////////////////////////////////
// 函 数 名_mqtt__MQTT___init__
// 功能说明:对象初始化
// 输入参数:
// 返 回 值:无
///////////////////////////////////////////////////////////////////
2022-11-12 22:21:23 +08:00
void _mqtt__MQTT___init__(PikaObj* self,
char* ip,
int port,
char* clinetID,
char* username,
char* password,
char* version,
char* ca,
int keepalive) {
2022-11-22 22:08:18 +08:00
obj_setInt(self, "_connected", 0);
2022-11-13 15:16:08 +08:00
mqtt_client_t* _client = mqtt_lease();
2022-11-23 21:16:20 +08:00
_client->user_data = self;
2022-11-22 22:08:18 +08:00
// obj_setPtr(self, "_client", _client);
2022-11-13 15:16:08 +08:00
/* port to str, and cache to object */
char port_str[10] = {0};
2022-11-22 22:08:18 +08:00
int tmp = 0;
2022-11-22 22:08:18 +08:00
if (strlen(ip) > 0) {
obj_setStr(self, "host_str", ip);
2022-11-22 22:08:18 +08:00
mqtt_set_host(_client, obj_getStr(self, "host_str"));
2022-12-18 22:02:55 +08:00
} else {
__platform_printf("mqtt_init input ip none\r\n");
}
2022-11-22 22:08:18 +08:00
memset(port_str, 0, sizeof(port_str));
2022-11-13 15:16:08 +08:00
__platform_sprintf(port_str, "%d", port);
obj_setStr(self, "port", port_str);
2022-11-22 22:08:18 +08:00
mqtt_set_port(_client, obj_getStr(self, "port"));
2022-11-22 22:08:18 +08:00
if (strlen(clinetID) > 0) {
obj_setStr(self, "id_str", clinetID);
2022-11-22 22:08:18 +08:00
mqtt_set_client_id(_client, obj_getStr(self, "id_str"));
2022-12-18 22:02:55 +08:00
} else {
__platform_printf("mqtt_init input clinetID none\r\n");
}
2022-11-22 22:08:18 +08:00
if (strlen(username) > 0) {
obj_setStr(self, "username_str", username);
2022-11-22 22:08:18 +08:00
mqtt_set_user_name(_client, obj_getStr(self, "username_str"));
2022-12-18 22:02:55 +08:00
} else {
__platform_printf("mqtt_init input username none\r\n");
}
2022-11-22 22:08:18 +08:00
if (strlen(password) > 0) {
obj_setStr(self, "password_str", password);
2022-11-22 22:08:18 +08:00
mqtt_set_password(_client, obj_getStr(self, "password_str"));
2022-12-18 22:02:55 +08:00
} else {
__platform_printf("mqtt_init input password none\r\n");
}
2022-11-22 22:08:18 +08:00
tmp = atoi(version);
2022-11-22 22:08:18 +08:00
if (tmp > 0) {
mqtt_set_version(_client, tmp);
2022-12-18 22:02:55 +08:00
} else {
__platform_printf("mqtt_init input version none\r\n");
}
2022-11-22 22:08:18 +08:00
if (strlen(ca) > 0) {
obj_setStr(self, "ca_str", ca);
2022-11-22 22:08:18 +08:00
mqtt_set_ca(_client, obj_getStr(self, "ca_str"));
2022-12-18 22:02:55 +08:00
} else {
// __platform_printf("mqtt_init input ca none\r\n");
}
2022-11-22 22:08:18 +08:00
if (keepalive > 0) {
mqtt_set_keep_alive_interval(_client, keepalive);
2022-12-18 22:02:55 +08:00
} else {
__platform_printf("mqtt_init input keepalive none\r\n");
}
2022-11-13 15:16:08 +08:00
mqtt_set_clean_session(_client, 1);
2022-11-22 22:08:18 +08:00
obj_setPtr(self, "_client",
2022-12-18 22:02:55 +08:00
_client); // 这里要再保存一次mqtt结构体的内容到python环境
// __platform_printf("Mqtt_Lib buildtime:%s-%s\r\n", __DATE__, __TIME__);
2022-11-13 15:16:08 +08:00
}
////////////////////////////////////////////////////////////////////
// 函 数 名_mqtt__MQTT___del__
// 功能说明:释放对象资源
// 输入参数:
// 返 回 值:无
///////////////////////////////////////////////////////////////////
2022-11-13 15:16:08 +08:00
void _mqtt__MQTT___del__(PikaObj* self) {
mqtt_client_t* _client = obj_getPtr(self, "_client");
2022-11-22 22:08:18 +08:00
if (NULL == _client) {
return;
}
/* disconnect autoly if not disconnected */
int _connected = obj_getInt(self, "_connected");
if (_connected) {
_mqtt__MQTT_disconnect(self);
2022-11-13 15:16:08 +08:00
}
// MQTT_LOG_E("%s:%d %s() >_<", __FILE__, __LINE__, __FUNCTION__);
2022-11-22 22:08:18 +08:00
mqtt_release_free(_client);
2022-11-12 23:28:55 +08:00
}
////////////////////////////////////////////////////////////////////
// 函 数 名_mqtt__MQTT_connect
// 功能说明连接mqtt的服务端
// 输入参数:无
// 返 回 值0=成功非0=错误码
///////////////////////////////////////////////////////////////////
2022-11-22 22:08:18 +08:00
int _mqtt__MQTT_connect(PikaObj* self) {
int ret;
2022-11-22 22:08:18 +08:00
obj_setInt(self, "_connected", 1);
mqtt_client_t* _client = obj_getPtr(self, "_client");
ret = mqtt_connect(_client);
2022-11-22 22:08:18 +08:00
if (ret != 0)
__platform_printf("mqtt connect ERROR! :%d\r\n", ret);
// if (ret == 0)
// __platform_printf("mqtt connect OK\r\n");
return ret;
}
////////////////////////////////////////////////////////////////////
// 函 数 名_mqtt__MQTT_disconnect
// 功能说明:断开 mqtt的连接
// 输入参数:无
// 返 回 值0=成功非0=错误码
///////////////////////////////////////////////////////////////////
2022-11-22 22:08:18 +08:00
int _mqtt__MQTT_disconnect(PikaObj* self) {
int ret;
2022-11-22 22:08:18 +08:00
obj_setInt(self, "_connected", 0);
mqtt_client_t* _client = obj_getPtr(self, "_client");
ret = mqtt_disconnect(_client);
2022-11-22 22:08:18 +08:00
if (ret != 0)
__platform_printf("mqtt disconnect ERROR! :%d\r\n", ret);
// if (ret == 0)
// __platform_printf("mqtt disconnect OK\r\n");
return ret;
}
////////////////////////////////////////////////////////////////////
// 函 数 名_mqtt__MQTT_listSubscribeTopic
// 功能说明:罗列出当前订阅的主题
// 输入参数:无
// 返 回 值:对象指针
///////////////////////////////////////////////////////////////////
PikaObj* _mqtt__MQTT_listSubscribeTopic(PikaObj* self) {
2022-11-23 10:14:48 +08:00
mqtt_client_t* _client = obj_getPtr(self, "_client");
2022-12-27 17:01:56 +08:00
// int i = 0;
mqtt_list_t *curr, *next;
message_handlers_t* msg_handler;
PikaObj* list = NULL;
2022-12-18 22:02:55 +08:00
if (NULL == _client) {
return NULL;
}
if (mqtt_list_is_empty(&_client->mqtt_msg_handler_list)) {
MQTT_LOG_I("%s:%d %s()... there are no subscribed topics...", __FILE__,
__LINE__, __FUNCTION__);
return NULL;
}
/* 创建 list 对象 */
list = newNormalObj(New_PikaStdData_List);
/* 初始化 list */
PikaStdData_List___init__(list);
2022-12-18 22:02:55 +08:00
LIST_FOR_EACH_SAFE(curr, next, &_client->mqtt_msg_handler_list) {
msg_handler = LIST_ENTRY(curr, message_handlers_t, list);
/* determine whether a node already exists by mqtt topic, but wildcards
* are not supported */
if (NULL != msg_handler->topic_filter) {
MQTT_LOG_I("%s:%d %s()...[%d] subscribe topic: %s", __FILE__,
__LINE__, __FUNCTION__, ++i, msg_handler->topic_filter);
2022-12-27 22:53:40 +08:00
// __platform_printf("[%d]subscribe topic: %s\n",++i,
// msg_handler->topic_filter);
/* 用 arg_new<type> 的 api 创建 arg */
Arg* str_arg1 = arg_newStr((char*)msg_handler->topic_filter);
/* 添加到 list 对象 */
PikaStdData_List_append(list, str_arg1);
arg_deinit(str_arg1);
}
}
return list;
}
////////////////////////////////////////////////////////////////////
// 函 数 名_mqtt__MQTT_publish
// 功能说明:发布主题消息
// 输入参数:主题名称,有效数据
// 返 回 值0=成功非0=错误码
///////////////////////////////////////////////////////////////////
2022-12-27 22:53:40 +08:00
int _mqtt__MQTT_publish(PikaObj* self, char* topic, char* payload, int qos) {
2022-11-23 10:14:48 +08:00
int ret;
mqtt_message_t msg;
mqtt_client_t* _client = obj_getPtr(self, "_client");
2022-11-23 21:16:20 +08:00
memset(&msg, 0, sizeof(msg));
2022-11-23 10:14:48 +08:00
2022-12-18 22:02:55 +08:00
if ((qos < 0) || (qos > 2)) {
__platform_printf("input qos error\r\n");
return -1;
}
2022-11-23 21:16:20 +08:00
if (strlen(topic) <= 0) {
2022-11-23 10:14:48 +08:00
__platform_printf("input topic error\r\n");
return -2;
2022-11-23 10:14:48 +08:00
}
2022-11-23 21:16:20 +08:00
if (strlen(payload) <= 0) {
2022-11-23 10:14:48 +08:00
__platform_printf("input payload error\r\n");
return -3;
2022-11-23 10:14:48 +08:00
}
2022-11-23 21:16:20 +08:00
msg.payload = (void*)payload;
msg.qos = qos;
2022-12-18 22:02:55 +08:00
__platform_printf("msg.qos:%d\r\n",
msg.qos); // 这里为了防止被优化,导致运行异常
2022-11-23 21:16:20 +08:00
ret = mqtt_publish(_client, topic, &msg);
if (ret == 0) {
// __platform_printf("MQTT_publish OK\r\n");
2022-12-18 22:02:55 +08:00
} else
__platform_printf("MQTT_publish ERROR\r\n");
2022-11-23 10:14:48 +08:00
return ret;
}
////////////////////////////////////////////////////////////////////
// 函 数 名_mqtt__MQTT_setCa
// 功能说明设置ca值
// 输入参数ca值
// 返 回 值0=成功非0=错误码
///////////////////////////////////////////////////////////////////
2022-11-22 22:08:18 +08:00
int _mqtt__MQTT_setCa(PikaObj* self, char* ca) {
mqtt_client_t* _client = obj_getPtr(self, "_client");
2022-11-22 22:08:18 +08:00
if (ca == NULL) {
__platform_printf("input ca error\r\n");
return -1;
}
2022-11-22 22:08:18 +08:00
if (strlen(ca) <= 0) {
__platform_printf("input ca data error\r\n");
return -2;
}
//__platform_printf("ca_str:%s\r\n",ca);
obj_setStr(self, "ca_str", ca);
2022-11-22 22:08:18 +08:00
mqtt_set_ca(_client, obj_getStr(self, "ca_str"));
// __platform_printf("MQTT_setCa len:%d\r\n", strlen(ca));
return 0;
}
////////////////////////////////////////////////////////////////////
// 函 数 名_mqtt__MQTT_setClientID
// 功能说明设置mqtt客户端的id
// 输入参数id 字符串格式
// 返 回 值0=成功非0=错误码
///////////////////////////////////////////////////////////////////
2022-11-22 22:08:18 +08:00
int _mqtt__MQTT_setClientID(PikaObj* self, char* id) {
mqtt_client_t* _client = obj_getPtr(self, "_client");
2022-11-22 22:08:18 +08:00
if (id == NULL) {
__platform_printf("input id error\r\n");
return -1;
}
2022-11-22 22:08:18 +08:00
if (strlen(id) <= 0) {
__platform_printf("input id data error\r\n");
return -2;
}
obj_setStr(self, "id_str", id);
2022-11-22 22:08:18 +08:00
mqtt_set_client_id(_client, obj_getStr(self, "id_str"));
// __platform_printf("MQTT_setClientID :%s\r\n", id);
return 0;
}
////////////////////////////////////////////////////////////////////
// 函 数 名_mqtt__MQTT_setHost
// 功能说明设置mqtt客户端连接主机的ip或者url
// 输入参数:字符串格式
// 返 回 值0=成功非0=错误码
///////////////////////////////////////////////////////////////////
2022-11-22 22:08:18 +08:00
int _mqtt__MQTT_setHost(PikaObj* self, char* host_url) {
mqtt_client_t* _client = obj_getPtr(self, "_client");
2022-11-22 22:08:18 +08:00
if (host_url == NULL) {
__platform_printf("input host_url error\r\n");
return -1;
}
2022-11-22 22:08:18 +08:00
if (strlen(host_url) <= 0) {
__platform_printf("input host_url data error\r\n");
return -2;
}
2022-12-18 22:02:55 +08:00
obj_setStr(self, "host_str",
host_url); // python 环境创建一个全局变量存放 host
mqtt_set_host(
_client,
obj_getStr(self,
"host_str")); // 从python环境中取出 host的指针 赋值给结构体
// __platform_printf("MQTT_setHost :%s\r\n", host_url);
return 0;
}
////////////////////////////////////////////////////////////////////
// 函 数 名_mqtt__MQTT_setKeepAlive
// 功能说明设置mqtt客户端的 心跳包发送间隔
// 输入参数:字符串格式
// 返 回 值0=成功非0=错误码
///////////////////////////////////////////////////////////////////
2022-12-18 22:02:55 +08:00
int _mqtt__MQTT_setKeepAlive(PikaObj* self, int time) {
mqtt_client_t* _client = obj_getPtr(self, "_client");
int tmp;
// tmp = atoi(time);
tmp = time;
2022-11-22 22:08:18 +08:00
if (tmp > 0) {
mqtt_set_keep_alive_interval(_client, tmp);
2022-11-22 22:08:18 +08:00
} else {
__platform_printf("input keepalive data error \r\n");
return -2;
}
// __platform_printf("MQTT_setKeepAlive :%d\r\n", tmp);
return 0;
}
////////////////////////////////////////////////////////////////////
// 函 数 名_mqtt__MQTT_setPassword
// 功能说明设置mqtt客户端的 密码
// 输入参数:字符串格式
// 返 回 值0=成功非0=错误码
///////////////////////////////////////////////////////////////////
2022-11-22 22:08:18 +08:00
int _mqtt__MQTT_setPassword(PikaObj* self, char* passwd) {
mqtt_client_t* _client = obj_getPtr(self, "_client");
2022-11-22 22:08:18 +08:00
if (passwd == NULL) {
__platform_printf("input passwd error\r\n");
return -1;
}
2022-11-22 22:08:18 +08:00
if (strlen(passwd) <= 0) {
__platform_printf("input passwd data error\r\n");
return -2;
}
obj_setStr(self, "password_str", passwd);
2022-11-22 22:08:18 +08:00
mqtt_set_password(_client, obj_getStr(self, "password_str"));
// __platform_printf("MQTT_setPassword :%s\r\n", passwd);
return 0;
}
////////////////////////////////////////////////////////////////////
// 函 数 名_mqtt__MQTT_setPort
// 功能说明设置mqtt客户端连接主机的端口号
// 输入参数:字符串格式
// 返 回 值0=成功非0=错误码
///////////////////////////////////////////////////////////////////
2022-11-22 22:08:18 +08:00
int _mqtt__MQTT_setPort(PikaObj* self, int port) {
char port_str[10] = {0};
mqtt_client_t* _client = obj_getPtr(self, "_client");
2022-11-22 22:08:18 +08:00
if (port <= 0) {
__platform_printf("input port error\r\n");
return -1;
}
__platform_sprintf(port_str, "%d", port);
obj_setStr(self, "port", port_str);
2022-11-22 22:08:18 +08:00
mqtt_set_port(_client, obj_getStr(self, "port"));
// __platform_printf("MQTT_setPort :%s\r\n", port_str);
2022-11-22 22:08:18 +08:00
return 0;
}
////////////////////////////////////////////////////////////////////
// 函 数 名_mqtt__MQTT_setUsername
// 功能说明设置mqtt客户端的用户名
// 输入参数:字符串格式
// 返 回 值0=成功非0=错误码
///////////////////////////////////////////////////////////////////
2022-11-22 22:08:18 +08:00
int _mqtt__MQTT_setUsername(PikaObj* self, char* name) {
mqtt_client_t* _client = obj_getPtr(self, "_client");
2022-11-22 22:08:18 +08:00
if (name == NULL) {
__platform_printf("input name error\r\n");
return -1;
}
2022-11-22 22:08:18 +08:00
if (strlen(name) <= 0) {
__platform_printf("input name data error\r\n");
return -2;
}
obj_setStr(self, "username_str", name);
2022-11-22 22:08:18 +08:00
mqtt_set_user_name(_client, obj_getStr(self, "username_str"));
// __platform_printf("MQTT_setUsername :%s\r\n", name);
return 0;
}
////////////////////////////////////////////////////////////////////
// 函 数 名_mqtt__MQTT_setVersion
// 功能说明设置mqtt 协议版本
// 输入参数:字符串格式
// 返 回 值0=成功非0=错误码
///////////////////////////////////////////////////////////////////
2022-11-22 22:08:18 +08:00
int _mqtt__MQTT_setVersion(PikaObj* self, char* version) {
mqtt_client_t* _client = obj_getPtr(self, "_client");
// int tmp;
2022-12-18 22:02:55 +08:00
if (version == NULL) {
__platform_printf("input version str error\n");
return -1;
}
2022-12-18 22:02:55 +08:00
if ((strcmp(version, "3.1") == 0) || (strcmp(version, "3.1.1") == 0)) {
2022-12-27 22:53:40 +08:00
if (strcmp(version, "3.1.1") == 0)
mqtt_set_version(_client, 4);
else
mqtt_set_version(_client, 3);
2022-12-18 22:02:55 +08:00
} else {
__platform_printf("input version data error\n");
return -2;
}
// __platform_printf("MQTT_setVersion :%d\r\n", tmp);
return 0;
}
////////////////////////////////////////////////////////////////////
// 函 数 名_mqtt__MQTT_setWill
// 功能说明:设置遗嘱消息,异常断连时会发送这个消息
// 输入参数:
// 返 回 值0=成功非0=错误码
///////////////////////////////////////////////////////////////////
2022-12-18 22:02:55 +08:00
int _mqtt__MQTT_setWill(PikaObj* self,
char* topic,
char* payload,
int qos,
int retain) {
2022-11-23 09:28:31 +08:00
mqtt_client_t* _client = obj_getPtr(self, "_client");
int ret;
char topic_str[MQTT_TOPIC_LEN_MAX];
2022-11-23 09:28:31 +08:00
// __platform_printf("\r\n");
2022-12-18 22:02:55 +08:00
if (topic == NULL) {
__platform_printf("input topic error\r\n");
return -1;
}
2022-12-18 22:02:55 +08:00
if (strlen(topic) <= 0) {
2022-11-23 09:28:31 +08:00
__platform_printf("input topic error\r\n");
return -1;
}
if ((qos < 0) || (qos > 2)) {
__platform_printf("input qos error\r\n");
return -1;
}
2022-12-18 22:02:55 +08:00
if (payload == NULL) {
__platform_printf("input payload error\r\n");
return -1;
}
2022-12-18 22:02:55 +08:00
2022-11-23 09:28:31 +08:00
if (strlen(payload) <= 0) {
__platform_printf("input payload error\r\n");
return -1;
}
2022-12-18 22:02:55 +08:00
// 必须转换成python环境的变量否则函数退出后topic里的是个空指针
memset(topic_str, 0, sizeof(topic_str));
sprintf(topic_str, "%s", topic);
obj_setStr(self, topic_str, topic);
obj_setStr(self, "Will_payload", payload);
2022-12-27 22:53:40 +08:00
// __platform_printf("obj_getStr(self, topic_str) :%s\r\n", obj_getStr(self,
// topic_str));
2022-12-27 17:01:56 +08:00
// __platform_printf("iqos :%d\r\n", qos);
// __platform_printf("retain :%d\r\n", retain);
2022-12-27 22:53:40 +08:00
// __platform_printf("obj_getStr(self, \"Will_payload\") :%s\r\n",
// obj_getStr(self, "Will_payload"));
2022-12-27 17:01:56 +08:00
2022-12-18 22:02:55 +08:00
ret = mqtt_set_will_options(_client, obj_getStr(self, topic_str), qos,
(uint8_t)retain,
obj_getStr(self, "Will_payload"));
2022-11-23 09:28:31 +08:00
if (ret == 0) {
// __platform_printf("MQTT_setWill OK\r\n", topic);
2022-11-23 09:28:31 +08:00
} else
__platform_printf("MQTT_setWill ERROR\r\n");
2022-11-23 09:28:31 +08:00
return 0;
}
////////////////////////////////////////////////////////////////////
// 函 数 名_mqtt__MQTT_subscribe
// 功能说明设置mqtt 订阅主题
// 输入参数:
// 返 回 值0=成功非0=错误码
///////////////////////////////////////////////////////////////////
2022-12-27 22:53:40 +08:00
int _mqtt__MQTT_subscribe(PikaObj* self, char* topic, Arg* cb, int qos) {
mqtt_client_t* _client = obj_getPtr(self, "_client");
int ret;
2022-12-18 22:02:55 +08:00
char topic_str[MQTT_TOPIC_LEN_MAX + 24];
// __platform_printf("topic_str:%s \r\n",topic_str);
2022-12-18 22:02:55 +08:00
if (topic == NULL) {
__platform_printf("input topic error\r\n");
return -1;
}
2022-12-18 22:02:55 +08:00
if ((strlen(topic) > MQTT_TOPIC_LEN_MAX) || (strlen(topic) <= 0)) {
__platform_printf("input topic data error strlen(topic):%d\r\n",
strlen(topic));
return -2;
}
2022-11-22 22:08:18 +08:00
if ((qos < 0) || (qos > 2)) {
__platform_printf("input qos error\r\n");
return -3;
}
2022-12-18 22:02:55 +08:00
// 必须转换成python环境的变量否则函数退出后topic里的是个空指针
memset(topic_str, 0, sizeof(topic_str));
sprintf(topic_str, "%s", topic);
obj_setStr(self, topic_str, topic);
2022-12-18 22:02:55 +08:00
ret = mqtt_subscribe(_client, obj_getStr(self, topic_str), qos,
Subscribe_Handler);
2022-11-22 22:08:18 +08:00
if (ret == 0) {
2022-12-27 22:53:40 +08:00
// __platform_printf("MQTT_subscribe Topic :%s Qos:%d OK\r\n",
// topic,qos);
// 注册mqtt订阅主题的 回调函数
if (cb != NULL) {
char hash_str[32] = {0};
2022-12-27 22:53:40 +08:00
memset(hash_str, 0, sizeof(hash_str));
sprintf(hash_str, "C%d", hash_time33(topic_str));
obj_newDirectObj(self, hash_str,
New_TinyObj); // 新建一个对象来放CB
PikaObj* eventHandler = obj_getPtr(self, hash_str);
obj_setArg(eventHandler, "eventCallBack", cb);
/* init event_listener for the first time */
if (NULL == g_mqtt_event_listener) {
2023-07-06 22:02:30 +08:00
pika_eventListener_init(&g_mqtt_event_listener);
}
uint32_t eventId = hash_time33(topic_str);
2022-12-27 22:53:40 +08:00
// __platform_printf("hash_time33(topic_str):%d
// \r\n",hash_time33(topic_str));
2023-07-06 22:02:30 +08:00
pika_eventListener_registEvent(g_mqtt_event_listener, eventId,
2023-07-20 16:12:33 +08:00
eventHandler);
}
2022-11-22 22:08:18 +08:00
} else
__platform_printf("MQTT_subscribe Topic ERROR\r\n");
2022-12-18 22:02:55 +08:00
return ret;
}
////////////////////////////////////////////////////////////////////
// 函 数 名_mqtt__MQTT_unsubscribe
// 功能说明取消mqtt 订阅主题
// 输入参数:
// 返 回 值0=成功非0=错误码
///////////////////////////////////////////////////////////////////
2022-11-22 22:08:18 +08:00
int _mqtt__MQTT_unsubscribe(PikaObj* self, char* topic) {
2022-11-23 09:28:31 +08:00
mqtt_client_t* _client = obj_getPtr(self, "_client");
int ret;
if (strlen(topic) <= 0) {
__platform_printf("input topic error\r\n");
return -1;
}
2022-11-23 21:16:20 +08:00
ret = mqtt_unsubscribe(_client, topic);
2022-11-23 09:28:31 +08:00
if (ret == 0) {
// __platform_printf("MQTT_unsubscribe :%s OK\r\n", topic);
2022-11-23 09:28:31 +08:00
} else
__platform_printf("MQTT_unsubscribe :%s ERROR\r\n", topic);
2022-11-23 09:28:31 +08:00
return 0;
}
////////////////////////////////////////////////////////////////////
// 函 数 名Subscribe_Handler
// 功能说明mqtt 订阅主题 的回调函数
// 输入参数:
// 返 回 值0=成功非0=错误码
///////////////////////////////////////////////////////////////////
2022-11-23 21:16:20 +08:00
void Subscribe_Handler(void* client, message_data_t* msg) {
2022-12-18 22:02:55 +08:00
char topic_str[MQTT_TOPIC_LEN_MAX + 24];
2022-12-27 22:53:40 +08:00
memset(topic_str, 0, sizeof(topic_str));
if (strlen(msg->topic_name) <= MQTT_TOPIC_LEN_MAX)
sprintf(topic_str, "%s", msg->topic_name);
else {
__platform_printf("Subscribe Topic recv data topic length ERROR\r\n");
2022-12-27 22:53:40 +08:00
return;
}
2022-12-27 22:53:40 +08:00
Arg* evt_obj_arg = arg_newDirectObj(New_TinyObj);
PikaObj* evt_obj = arg_getPtr(evt_obj_arg);
obj_setStr(evt_obj, "topic", (char*)msg->topic_name);
obj_setStr(evt_obj, "msg", (char*)msg->message->payload);
obj_setInt(evt_obj, "qos", msg->message->qos);
2022-12-27 22:53:40 +08:00
// 存好数据后,再发送事件信号,防止信号收到了但是需要传输的数据没准备好
2023-07-06 22:02:30 +08:00
pika_eventListener_send(g_mqtt_event_listener, hash_time33(msg->topic_name),
2023-07-20 16:12:33 +08:00
evt_obj_arg);
// MQTT_LOG_I("\n>>>------------------");
// MQTT_LOG_I("Topic:%s \nlen:%d,message: %s", msg->topic_name,
// (int)msg->message->payloadlen, (char*)msg->message->payload);
// MQTT_LOG_I("------------------<<<");
2022-11-23 21:16:20 +08:00
}
2022-12-27 23:25:03 +08:00
void _mqtt__MQTT__fakeMsg(PikaObj* self, char* topic, int qos, char* msg) {
message_data_t msg_data;
mqtt_message_t mqtt_msg;
mqtt_msg.payload = msg;
mqtt_msg.payloadlen = strlen(msg);
mqtt_msg.qos = qos;
mqtt_msg.retained = 0;
msg_data.message = &mqtt_msg;
strcpy(msg_data.topic_name, topic);
Subscribe_Handler(NULL, &msg_data);
}
////////////////////////////////////////////////////////////////////
// 函 数 名_mqtt___del__
// 功能说明:释放事件处理器
// 输入参数:
// 返 回 值:
///////////////////////////////////////////////////////////////////
void _mqtt___del__(PikaObj* self) {
if (NULL != g_mqtt_event_listener) {
2023-07-06 22:02:30 +08:00
pika_eventListener_deinit(&g_mqtt_event_listener);
}
}
////////////////////////////////////////////////////////////////////
// 函 数 名Reconnect_Handler
2022-12-27 22:53:40 +08:00
// 功能说明mqtt 断开连接后
// 的回调函数这里使用mqttclient库函数的断线重连接口提示发生了mqtt断连的事件
// 输入参数:
// 返 回 值0=成功非0=错误码
///////////////////////////////////////////////////////////////////
2022-12-27 22:53:40 +08:00
void Reconnect_Handler(void* client, void* reconnect_date) {
2022-12-27 17:01:56 +08:00
// PikaObj* self = ((mqtt_client_t*)client)->user_data;
// __platform_printf("Reconnect_Handler\r\n");
2022-12-27 22:53:40 +08:00
if (((mqtt_client_t*)client)->mqtt_client_state != CLIENT_STATE_CONNECTED) {
// 发送事件信号
2023-07-06 22:02:30 +08:00
pika_eventListener_sendSignal(g_mqtt_event_listener,
2023-07-20 16:12:33 +08:00
MQTT_RECONNECTION_EVENT_ID, 1);
2022-12-27 17:01:56 +08:00
}
}
////////////////////////////////////////////////////////////////////
// 函 数 名_mqtt__MQTT_setDisconnectHandler
// 功能说明:设置断开连接的回调函数
// 输入参数:
// 返 回 值:
///////////////////////////////////////////////////////////////////
2022-12-18 22:02:55 +08:00
int _mqtt__MQTT_setDisconnectHandler(PikaObj* self, Arg* cb) {
mqtt_client_t* _client = obj_getPtr(self, "_client");
2022-12-27 22:53:40 +08:00
2022-12-27 17:01:56 +08:00
// __platform_printf("_mqtt__MQTT_setDisconnectHandler\r\n");
2022-12-27 22:53:40 +08:00
// 注册到c库中
mqtt_set_reconnect_handler(_client, Reconnect_Handler);
// char hash_str[32] = {0};
// memset(hash_str,0,sizeof(hash_str));
// sprintf(hash_str,"C%d",hash_time33(topic_str));
// obj_newDirectObj(self,hash_str,New_TinyObj);//新建一个对象来放CB
// PikaObj* eventHandler = obj_getPtr(self,hash_str);
// obj_setArg(eventHandler, "eventCallBack", cb);
2022-12-27 22:53:40 +08:00
obj_setArg(self, "eventCallBack",
cb); // 重连回调是唯一的就直接用self对象
/* init event_listener for the first time */
if (NULL == g_mqtt_event_listener) {
2023-07-06 22:02:30 +08:00
pika_eventListener_init(&g_mqtt_event_listener);
}
// uint32_t eventId = hash_time33(topic_str);
2022-12-27 22:53:40 +08:00
// __platform_printf("hash_time33(topic_str):%d
// \r\n",hash_time33(topic_str));
2023-07-06 22:02:30 +08:00
pika_eventListener_registEvent(g_mqtt_event_listener,
2023-07-20 16:12:33 +08:00
MQTT_RECONNECTION_EVENT_ID, self);
return 0;
2023-07-20 16:12:33 +08:00
}