From 33232b501dc69cd96d4c0607401697d13bc6b593 Mon Sep 17 00:00:00 2001 From: Andrew Gough Date: Mon, 19 Mar 2018 19:31:38 +1100 Subject: [PATCH] Upgrade to espmqttc head --- components/espmqtt | 2 +- components/modules/mqtt.c | 439 +++++++++++++++++++++++--------------- 2 files changed, 266 insertions(+), 175 deletions(-) diff --git a/components/espmqtt b/components/espmqtt index 2967332b..55f04a8e 160000 --- a/components/espmqtt +++ b/components/espmqtt @@ -1 +1 @@ -Subproject commit 2967332b95454d4b53068a0d5484ae60e312eb12 +Subproject commit 55f04a8e61a0e3bacf2b40518d9d522e9b79e615 diff --git a/components/modules/mqtt.c b/components/modules/mqtt.c index 3b2d8c48..9ab4518a 100644 --- a/components/modules/mqtt.c +++ b/components/modules/mqtt.c @@ -5,30 +5,21 @@ #include "platform.h" #include "task/task.h" -#include "mqtt.h" +#include "mqtt_client.h" #include task_handle_t hConn; task_handle_t hOff; task_handle_t hPub; task_handle_t hSub; +task_handle_t hUnsub; task_handle_t hData; -//used as a holder to copy received data from the MQTT task -//to the main Lua task. Due to the async nature of the tasks -//we must copy the data to deliver it intact to the Lua -//callback -typedef struct lmqtt_ctx -{ - mqtt_client * client; - char topic[CONFIG_MQTT_MAX_LWT_TOPIC]; - char * data; -} lmqtt_ctx_t; - +// ------------------------------------------------------------------------- // // locate the C mqtt_client pointer and leave the // Lua instance on the top of the stack -static mqtt_client * get_client( lua_State * L ) +static esp_mqtt_client_handle_t get_client( lua_State * L ) { if( !lua_istable( L, 1 ) ) { @@ -43,14 +34,14 @@ static mqtt_client * get_client( lua_State * L ) return 0; //never reached } - mqtt_client * client = (mqtt_client *) lua_touserdata( L, -1 ); + esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) lua_touserdata( L, -1 ); lua_pop( L, 1 ); // just pop the _mqtt field return client; } // locate the C mqtt_settings pointer and leave the // Lua instance on the top of the stack -static mqtt_settings * get_settings( lua_State * L ) +static esp_mqtt_client_config_t * get_settings( lua_State * L ) { if( !lua_istable( L, 1 ) ) { @@ -65,61 +56,112 @@ static mqtt_settings * get_settings( lua_State * L ) return 0; //never reached } - mqtt_settings * settings = (mqtt_settings *) lua_touserdata( L, -1 ); + esp_mqtt_client_config_t * settings = (esp_mqtt_client_config_t *) lua_touserdata( L, -1 ); lua_pop( L, 1 ); // just pop the _mqtt field return settings; } -// Lua: on() -static int lmqtt_on(lua_State *L) +// ------------------------------------------------------------------------- // + +static esp_mqtt_event_handle_t event_clone(esp_mqtt_event_handle_t ev) { - enum events{ - ON_CONNECT = 0, - ON_MESSAGE = 1, - ON_OFFLINE = 2 - }; - const char *const eventnames[] = {"connect", "message", "offline", NULL}; + esp_mqtt_event_handle_t ev1 = (esp_mqtt_event_handle_t) malloc(sizeof(esp_mqtt_event_t)); + memset(ev1, 0, sizeof(esp_mqtt_event_t)); + NODE_DBG("event_clone():malloc: event %p, msg %d\n", ev, ev->msg_id); - // mqtt_settings * settings = get_settings( L ); - int event = luaL_checkoption(L, 2, "message", eventnames); + ev1->event_id = ev->event_id; + ev1->client = ev->client; + ev1->user_context = ev->user_context; + ev1->total_data_len = ev->total_data_len; + ev1->current_data_offset = ev->current_data_offset; + ev1->msg_id = ev->msg_id; - if( !lua_isfunction( L, 3 ) ) - return 0; - - switch (event) { - case ON_CONNECT: - lua_setfield(L, 1, "_on_connect"); - break; - case ON_MESSAGE: - lua_setfield(L, 1, "_on_message"); - break; - case ON_OFFLINE: - lua_setfield(L, 1, "_on_offline"); - break; - default: - return 0; + ev1->data_len = ev->data_len; + if( ev->data != NULL && ev->data_len > 0 ) + { + ev1->data = malloc(ev->data_len + 1); + memcpy(ev1->data, ev->data, ev->data_len); + ev1->data[ev1->data_len] = '\0'; + NODE_DBG("event_clone():malloc: event %p, msg %d, data %p, num %d\n", ev1, ev1->msg_id, ev1->data, ev1->data_len); } - lua_pop(L, 1); //pop event name - return 0; + ev1->topic_len = ev->topic_len; + if( ev->topic != NULL && ev->topic_len > 0 ) + { + ev1->topic = malloc(ev->topic_len + 1); + memcpy(ev1->topic, ev->topic, ev->topic_len); + ev1->topic[ev1->topic_len] = '\0'; + NODE_DBG("event_clone():malloc: event %p, msg %d, topic %p, num %d\n", ev1, ev1->msg_id, ev1->topic, ev1->topic_len); + } + return ev1; +} + +static void event_free(esp_mqtt_event_handle_t ev) +{ + if(ev->data != NULL) + { + NODE_DBG("event_free():free: event %p, msg %d, data %p\n", ev, ev->msg_id, ev->data); + free(ev->data); + } + if(ev->topic != NULL) + { + NODE_DBG("event_free():free: event %p, msg %d, topic %p\n", ev, ev->msg_id, ev->topic); + free(ev->topic); + } + free(ev); +} + +// ------------------------------------------------------------------------- // + +static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event) +{ + switch (event->event_id) { + case MQTT_EVENT_CONNECTED: + task_post_medium(hConn, (task_param_t) event_clone(event)); + break; + + case MQTT_EVENT_DISCONNECTED: + task_post_medium( hOff, (task_param_t) event_clone(event)); + break; + + case MQTT_EVENT_SUBSCRIBED: + task_post_medium(hSub, (task_param_t) event_clone(event)); + break; + + case MQTT_EVENT_UNSUBSCRIBED: + task_post_medium(hUnsub, (task_param_t) event_clone(event)); + break; + + case MQTT_EVENT_PUBLISHED: + task_post_medium(hPub, (task_param_t) event_clone(event)); + break; + + case MQTT_EVENT_DATA: + task_post_medium(hData, (task_param_t) event_clone(event)); + break; + + case MQTT_EVENT_ERROR: + break; + } + return ESP_OK; } //typedef void (*task_callback_t)(task_param_t param, task_prio_t prio); -static void lmqtt_connected_cb(task_param_t param, task_prio_t prio) +static void _connected_cb(task_param_t param, task_prio_t prio) { lua_State * L = lua_getstate(); //returns main Lua state if( L == NULL ) return; - mqtt_client * client = (mqtt_client *) param; + esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t) param; int top = lua_gettop(L); lua_checkstack(L, 8); char key[64]; - snprintf(key, 64, "mqtt_%p", client->settings); + snprintf(key, 64, "mqtt_%p", event->client); lua_getglobal( L, key ); //retrieve MQTT table from _G - NODE_DBG("CB:connect: state %p, settings %p, stack top %d\n", L, client->settings, lua_gettop(L)); + NODE_DBG("CB:connect: state %p, settings %p, stack top %d\n", L, event->client, lua_gettop(L)); lua_getfield( L, -1, "_connect_ok" ); if( lua_isfunction( L, -1 ) ) @@ -152,29 +194,24 @@ static void lmqtt_connected_cb(task_param_t param, task_prio_t prio) NODE_DBG("CB:connect: Error when calling connect callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) ); } lua_settop(L, top); + event_free(event); } -static void connected_cb( mqtt_client *client, mqtt_event_data_t *event_data ) -{ - task_post_medium(hConn, (task_param_t) client); -} - - -static void lmqtt_disconnected_cb(task_param_t param, task_prio_t prio) +static void _disconnected_cb(task_param_t param, task_prio_t prio) { lua_State * L = lua_getstate(); //returns main Lua state if( L == NULL ) return; - mqtt_client * client = (mqtt_client *) param; + esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t) param; int top = lua_gettop(L); lua_checkstack(L, 8); char key[64]; - snprintf(key, 64, "mqtt_%p", client->settings); + snprintf(key, 64, "mqtt_%p", event->client); lua_getglobal( L, key ); //retrieve MQTT table from _G - NODE_DBG("CB:disconnect: state %p, settings %p, stack top %d\n", L, client->settings, lua_gettop(L)); + NODE_DBG("CB:disconnect: state %p, settings %p, stack top %d\n", L, event->client, lua_gettop(L)); lua_getfield( L, -1, "_connect_nok" ); if( lua_isfunction( L, -1 ) ) @@ -195,7 +232,10 @@ static void lmqtt_disconnected_cb(task_param_t param, task_prio_t prio) // now we check for the standard connect callback registered with 'mqtt:on()' lua_getfield( L, -1, "_on_offline" ); if( !lua_isfunction( L, -1 ) || lua_isnil( L, -1 ) ) + { + event_free(event); return; + } NODE_DBG("CB:disconnect: calling registered standard offline callback\n"); lua_pushvalue( L, -2 ); //dup mqtt table @@ -204,28 +244,24 @@ static void lmqtt_disconnected_cb(task_param_t param, task_prio_t prio) NODE_DBG("CB:disconnect: Error when calling offline callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) ); lua_settop(L, top); + event_free(event); } -static void disconnected_cb( mqtt_client *client, mqtt_event_data_t *event_data ) -{ - task_post_medium( hOff, (task_param_t) client); -} - -static void lmqtt_subscribe_cb(task_param_t param, task_prio_t prio) +static void _subscribe_cb(task_param_t param, task_prio_t prio) { lua_State * L = lua_getstate(); //returns main Lua state if( L == NULL ) return; - mqtt_client * client = (mqtt_client *) param; + esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t) param; int top = lua_gettop(L); lua_checkstack(L, 8); char key[64]; - snprintf(key, 64, "mqtt_%p", client->settings); + snprintf(key, 64, "mqtt_%p", event->client); lua_getglobal( L, key ); //retrieve MQTT table from _G - NODE_DBG("CB:subscribe: state %p, settings %p, stack top %d\n", L, client->settings, lua_gettop(L)); + NODE_DBG("CB:subscribe: state %p, settings %p, stack top %d\n", L, event->client, lua_gettop(L)); lua_getfield( L, 1, "_subscribe_ok" ); if( lua_isfunction( L, -1 ) ) @@ -240,29 +276,25 @@ static void lmqtt_subscribe_cb(task_param_t param, task_prio_t prio) lua_setfield(L, 1, "_subscribe_ok"); } lua_settop(L, top); + event_free(event); } -static void subscribe_cb( mqtt_client *client, mqtt_event_data_t *event_data ) -{ - task_post_medium(hSub, (task_param_t) client); -} - -static void lmqtt_publish_cb(task_param_t param, task_prio_t prio) +static void _publish_cb(task_param_t param, task_prio_t prio) { NODE_DBG("CB:publish: successfully transferred control back to main task\n"); lua_State * L = lua_getstate(); //returns main Lua state if( L == NULL ) return; - mqtt_client * client = (mqtt_client *) param; + esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t) param; int top = lua_gettop(L); lua_checkstack(L, 8); char key[64]; - snprintf(key, 64, "mqtt_%p", client->settings); + snprintf(key, 64, "mqtt_%p", event->client); lua_getglobal( L, key ); //retrieve MQTT table from _G - NODE_DBG("CB:publish: state %p, settings %p, stack top %d\n", L, client->settings, lua_gettop(L)); + NODE_DBG("CB:publish: state %p, settings %p, stack top %d\n", L, event->client, lua_gettop(L)); lua_getfield( L, 1, "_publish_ok" ); if( lua_isfunction( L, -1 ) ) @@ -277,39 +309,65 @@ static void lmqtt_publish_cb(task_param_t param, task_prio_t prio) lua_setfield(L, 1, "_publish_ok"); } lua_settop(L, top); + event_free(event); } -static void publish_cb( mqtt_client *client, mqtt_event_data_t *event_data ) -{ - NODE_DBG("CB:publish: transferring control back to main task\n"); - task_post_medium(hPub, (task_param_t) client); -} - -static void lmqtt_data_cb(task_param_t param, task_prio_t prio) +static void _unsubscribe_cb(task_param_t param, task_prio_t prio) { lua_State * L = lua_getstate(); //returns main Lua state if( L == NULL ) return; - lmqtt_ctx_t * ctx = (lmqtt_ctx_t *) param; + esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t) param; + + int top = lua_gettop(L); + + char key[64]; + snprintf(key, 64, "mqtt_%p", event->client); + lua_getglobal( L, key ); //retrieve MQTT table from _G + NODE_DBG("CB:subscribe: state %p, settings %p, stack top %d\n", L, event->client, lua_gettop(L)); + + lua_getfield( L, 1, "_unsubscribe_ok" ); + if( lua_isfunction( L, -1 ) ) + { + NODE_DBG("CB:unsubscribe: calling registered one-shot unsubscribe callback\n"); + lua_pushvalue( L, 1 ); //dup mqtt table + int res = lua_pcall( L, 1, 0, 0 ); //call the disconnect callback + if( res != 0 ) + NODE_DBG("CB:unsubscribe: Error when calling one-shot unsubscribe callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) ); + + lua_pushnil(L); + lua_setfield(L, 1, "_unsubscribe_ok"); + } + lua_settop(L, top); + event_free(event); +} + +static void _data_cb(task_param_t param, task_prio_t prio) +{ + lua_State * L = lua_getstate(); //returns main Lua state + if( L == NULL ) + return; + + esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t) param; int top = lua_gettop(L); lua_checkstack(L, 8); char key[64]; - snprintf(key, 64, "mqtt_%p", ctx->client->settings); + snprintf(key, 64, "mqtt_%p", event->client); lua_getglobal( L, key ); //retrieve MQTT table from _G - NODE_DBG("CB:data: state %p, settings %p, stack top %d\n", L, ctx->client->settings, lua_gettop(L)); + NODE_DBG("CB:data: state %p, settings %p, stack top %d\n", L, event->client, lua_gettop(L)); lua_getfield( L, 1, "_on_message" ); if( lua_isfunction( L, -1 ) ) { int numArg = 2; lua_pushvalue( L, 1 ); //dup mqtt table - lua_pushstring( L, ctx->topic ); - if( ctx->data != NULL ) + lua_pushstring( L, event->topic ); + if( event->data != NULL ) { - lua_pushstring( L, ctx->data ); + lua_pushstring( L, event->data ); numArg++; } @@ -319,41 +377,58 @@ static void lmqtt_data_cb(task_param_t param, task_prio_t prio) } lua_settop(L, top); - if( ctx->data != NULL ) - free( ctx->data ); + event_free(event); } -static void data_cb( mqtt_client *client, mqtt_event_data_t *event_data ) +// ------------------------------------------------------------------------- // +// ------------------------------------------------------------------------- // + +// Lua: on() +static int mqtt_on(lua_State *L) { - NODE_DBG("CB:data: topic len %d, data len %d\n", event_data->topic_length, event_data->data_length); + enum events{ + ON_CONNECT = 0, + ON_MESSAGE = 1, + ON_OFFLINE = 2 + }; + const char *const eventnames[] = {"connect", "message", "offline", NULL}; - lmqtt_ctx_t * ctx = malloc(sizeof(lmqtt_ctx_t)); - ctx->client = client; - strncpy(ctx->topic, event_data->topic, event_data->topic_length); - ctx->topic[event_data->topic_length] = '\0'; + // mqtt_settings * settings = get_settings( L ); + int event = luaL_checkoption(L, 2, "message", eventnames); - ctx->data = NULL; - if( event_data->data_length > 0 ) - { - ctx->data = malloc( event_data->data_length ); - strncpy( ctx->data, event_data->data, event_data->data_length ); - ctx->data[event_data->data_length] = '\0'; + if( !lua_isfunction( L, 3 ) ) + return 0; + + switch (event) { + case ON_CONNECT: + lua_setfield(L, 1, "_on_connect"); + break; + case ON_MESSAGE: + lua_setfield(L, 1, "_on_message"); + break; + case ON_OFFLINE: + lua_setfield(L, 1, "_on_offline"); + break; + default: + return 0; } - task_post_medium(hData, (task_param_t) ctx); + lua_pop(L, 1); //pop event name + return 0; } // Lua: mqtt:connect(host[, port[, secure[, autoreconnect]]][, function(client)[, function(client, reason)]]) -static int lmqtt_connect( lua_State* L ) +static int mqtt_connect( lua_State* L ) { - mqtt_settings * settings = get_settings( L ); + esp_mqtt_client_config_t * mqtt_cfg = get_settings( L ); - // int secure = 0; + int secure = 0; int reconnect = 0; const char * host = luaL_checkstring( L, 2 ); int port = 1883; int n = 3; + if( lua_isnumber( L, n ) ) { port = luaL_checknumber( L, n ); @@ -362,7 +437,7 @@ static int lmqtt_connect( lua_State* L ) if( lua_isnumber( L, n ) ) { - // secure = !!luaL_checkinteger( L, -4 ); + secure = !!luaL_checkinteger( L, n ); n++; } @@ -388,42 +463,44 @@ static int lmqtt_connect( lua_State* L ) lua_pop( L, n - 2 ); //pop parameters - strncpy(settings->host, host, CONFIG_MQTT_MAX_HOST_LEN ); - settings->port = port; + strncpy(mqtt_cfg->host, host, MQTT_MAX_HOST_LEN ); + mqtt_cfg->port = port; - settings->auto_reconnect = reconnect != 0; + mqtt_cfg->disable_auto_reconnect = (reconnect == 0); + mqtt_cfg->transport = secure ? MQTT_TRANSPORT_OVER_SSL : MQTT_TRANSPORT_OVER_TCP; - settings->connected_cb = connected_cb; - settings->disconnected_cb = disconnected_cb; - settings->subscribe_cb = subscribe_cb; - settings->publish_cb = publish_cb; - settings->data_cb = data_cb; - - mqtt_client * client = mqtt_start( settings ); + esp_mqtt_client_handle_t client = esp_mqtt_client_init(mqtt_cfg); if( client == NULL ) { luaL_error( L, "MQTT library failed to start" ); return 0; } - NODE_DBG("Created MQTT client @ %p, settings @ %p, state @ %p, top %d \n", client, settings, L, lua_gettop( L ) ); + esp_mqtt_client_start(client); + lua_pushlightuserdata( L, client ); lua_setfield( L, -2, "_client" ); //and store a reference in the MQTT table + char id[32]; + snprintf( id, 32, "mqtt_%p", client); + NODE_DBG("Store MQTT table in _G stack pos %d\n", lua_gettop(L)); + lua_pushvalue( L, 1 ); //make a copy of the table + lua_setglobal( L, id); + return 0; } // Lua: mqtt:close() -static int lmqtt_close( lua_State* L ) +static int mqtt_close( lua_State* L ) { - mqtt_client * client = get_client( L ); + esp_mqtt_client_handle_t client = get_client( L ); if( client == NULL ) return 0; NODE_DBG("Closing MQTT client %p\n", client); char id[64]; - snprintf(id, 64, "mqtt_%p", client->settings); + snprintf(id, 64, "mqtt_%p", client); lua_pushnil( L ); lua_setglobal( L, id ); // remove global reference @@ -435,36 +512,37 @@ static int lmqtt_close( lua_State* L ) } // Lua: mqtt:lwt(topic, message[, qos[, retain]]) -static int lmqtt_lwt( lua_State* L ) +static int mqtt_lwt( lua_State* L ) { - mqtt_settings * settings = get_settings( L ); + esp_mqtt_client_config_t * mqtt_cfg = get_settings( L ); - strncpy( settings->lwt_topic, luaL_checkstring( L, 2 ), CONFIG_MQTT_MAX_LWT_TOPIC ); - strncpy( settings->lwt_msg, luaL_checkstring( L, 3 ), CONFIG_MQTT_MAX_LWT_MSG ); - settings->lwt_msg_len = strlen( settings->lwt_msg ); + strncpy( mqtt_cfg->lwt_topic, luaL_checkstring( L, 2 ), MQTT_MAX_LWT_TOPIC ); + strncpy( mqtt_cfg->lwt_msg, luaL_checkstring( L, 3 ), MQTT_MAX_LWT_MSG ); + mqtt_cfg->lwt_msg_len = strlen( mqtt_cfg->lwt_msg ); int n = 4; if( lua_isnumber( L, n ) ) { - settings->lwt_qos = lua_tonumber( L, n ); + mqtt_cfg->lwt_qos = lua_tonumber( L, n ); n++; } if( lua_isnumber( L, n ) ) { - settings->lwt_retain = lua_tonumber( L, n ); + mqtt_cfg->lwt_retain = lua_tonumber( L, n ); n++; } lua_pop( L, n ); - NODE_DBG("Set LWT topic '%s', qos %d, retain %d, len %d\n", settings->lwt_topic, settings->lwt_qos, settings->lwt_retain, settings->lwt_msg_len); + NODE_DBG("Set LWT topic '%s', qos %d, retain %d, len %d\n", + mqtt_cfg->lwt_topic, mqtt_cfg->lwt_qos, mqtt_cfg->lwt_retain, mqtt_cfg->lwt_msg_len); return 0; } //Lua: mqtt:publish(topic, payload, qos, retain[, function(client)]) -static int lmqtt_publish( lua_State * L ) +static int mqtt_publish( lua_State * L ) { - mqtt_client * client = get_client( L ); + esp_mqtt_client_handle_t client = get_client( L ); int top = lua_gettop(L); @@ -482,15 +560,15 @@ static int lmqtt_publish( lua_State * L ) } lua_settop(L, top ); - NODE_DBG("MQTT publish client id %s, topic %s, %d bytes\n", client->settings->client_id, topic, strlen(data)); - mqtt_publish(client, topic, data, strlen(data), qos, retain); + NODE_DBG("MQTT publish client %p, topic %s, %d bytes\n", client, topic, strlen(data)); + esp_mqtt_client_publish(client, topic, data, strlen(data), qos, retain); return 0; } // Lua: mqtt:subscribe(topic, qos[, function(client)]) OR mqtt:subscribe(table[, function(client)]) -static int lmqtt_subscribe( lua_State* L ) +static int mqtt_subscribe( lua_State* L ) { - mqtt_client * client = get_client( L ); + esp_mqtt_client_handle_t client = get_client( L ); int top = lua_gettop(L); @@ -504,65 +582,82 @@ static int lmqtt_subscribe( lua_State* L ) } lua_settop(L, top ); - NODE_DBG("MQTT subscribe client id %s, topic %s\n", client->settings->client_id, topic); - mqtt_subscribe(client, topic, qos); + NODE_DBG("MQTT subscribe client %p, topic %s\n", client, topic); + esp_mqtt_client_subscribe(client, topic, qos); return 0; } // Lua: mqtt:unsubscribe(topic[, function(client)]) OR mqtt:unsubscribe(table[, function(client)]) -static int lmqtt_unsubscribe( lua_State* L ) +static int mqtt_unsubscribe( lua_State* L ) { - // mqtt_client * client = get_client( L ); + esp_mqtt_client_handle_t client = get_client( L ); + + int top = lua_gettop(L); + + const char * topic = luaL_checkstring( L, 2 ); + int n = 3; + if( lua_isfunction( L, n ) ) + { + lua_pushvalue( L, n ); + lua_setfield(L, 1, "_unsubscribe_ok"); // set _G["_cb_connect_nok"] = fn() + n++; + } + + lua_settop(L, top ); + NODE_DBG("MQTT unsubscribe client %p, topic %s\n", client, topic); + esp_mqtt_client_unsubscribe(client, topic); + return 0; return 0; } -static int lmqtt_delete( lua_State* L ) +static int mqtt_delete( lua_State* L ) { - mqtt_settings * settings = get_settings( L ); + esp_mqtt_client_config_t * settings = get_settings( L ); if( settings != NULL ) free( settings ); - mqtt_client * client = get_client( L ); + esp_mqtt_client_handle_t client = get_client( L ); if( client != NULL ) { - NODE_DBG("stopping MQTT client id %s\n", client->settings->client_id); - mqtt_destroy( client ); + NODE_DBG("stopping MQTT client %p\n", client); + esp_mqtt_client_destroy( client ); free( client ); } return 0; } // Lua: mqtt.Client(clientid, keepalive[, username, password, cleansession]) -static int lmqtt_new( lua_State* L ) +static int mqtt_new( lua_State* L ) { const char * clientid = NULL; - clientid = luaL_checkstring( L, 1 ); NODE_DBG("MQTT client id %s\n", clientid); - mqtt_settings * settings = (mqtt_settings *) malloc( sizeof(mqtt_settings) ); - memset(settings, 0, sizeof(mqtt_settings) ); + esp_mqtt_client_config_t * mqtt_cfg = (esp_mqtt_client_config_t *) malloc(sizeof(esp_mqtt_client_config_t)); + memset(mqtt_cfg, 0, sizeof(esp_mqtt_client_config_t)); - strncpy(settings->client_id, clientid, CONFIG_MQTT_MAX_CLIENT_LEN); - settings->keepalive = luaL_checkinteger( L, 2 ); + mqtt_cfg->event_handle = mqtt_event_handler; + + strncpy(mqtt_cfg->client_id, clientid, MQTT_MAX_CLIENT_LEN); + mqtt_cfg->keepalive = luaL_checkinteger( L, 2 ); int n = 2; if( lua_isstring(L, 3) ) { - strncpy( settings->username, luaL_checkstring( L, 3 ), CONFIG_MQTT_MAX_USERNAME_LEN); + strncpy( mqtt_cfg->username, luaL_checkstring( L, 3 ), MQTT_MAX_USERNAME_LEN); n++; } if( lua_isstring(L, 4) ) { - strncpy(settings->password, luaL_checkstring( L, 4 ), CONFIG_MQTT_MAX_PASSWORD_LEN); + strncpy(mqtt_cfg->password, luaL_checkstring( L, 4 ), MQTT_MAX_PASSWORD_LEN); n++; } if( lua_isnumber(L, 5) ) { - settings->clean_session = luaL_checknumber( L, 5 ); + mqtt_cfg->disable_clean_session = (luaL_checknumber( L, 5 ) == 0); n++; } lua_pop( L, n ); //remove parameters @@ -570,47 +665,42 @@ static int lmqtt_new( lua_State* L ) lua_newtable( L ); NODE_DBG("New MQTT table at stack pos %d\n", lua_gettop(L)); - lua_pushlightuserdata( L, settings ); + lua_pushlightuserdata( L, mqtt_cfg ); lua_setfield( L, -2, "_settings" ); // set t["_mqtt"] = client - lua_pushcfunction( L, lmqtt_connect ); + lua_pushcfunction( L, mqtt_connect ); lua_setfield( L, -2, "connect" ); // set t["connect"] = lmqtt_connect - lua_pushcfunction( L, lmqtt_close ); + lua_pushcfunction( L, mqtt_close ); lua_setfield( L, -2, "close" ); // set t["close"] = lmqtt_close - lua_pushcfunction( L, lmqtt_lwt ); + lua_pushcfunction( L, mqtt_lwt ); lua_setfield( L, -2, "lwt" ); // set t["lwt"] = lmqtt_lwt - lua_pushcfunction( L, lmqtt_publish ); + lua_pushcfunction( L, mqtt_publish ); lua_setfield( L, -2, "publish" ); // set t["publish"] = lmqtt_publish - lua_pushcfunction( L, lmqtt_subscribe ); + lua_pushcfunction( L, mqtt_subscribe ); lua_setfield( L, -2, "subscribe" ); // set t["subscribe"] = lmqtt_subscribe - lua_pushcfunction( L, lmqtt_unsubscribe ); + lua_pushcfunction( L, mqtt_unsubscribe ); lua_setfield( L, -2, "unsubscribe" ); // set t["unsubscribe"] = lmqtt_unsubscribe - lua_pushcfunction( L, lmqtt_on ); + lua_pushcfunction( L, mqtt_on ); lua_setfield( L, -2, "on" ); // set t["on"] = lmqtt_on - lua_pushcfunction( L, lmqtt_delete ); + lua_pushcfunction( L, mqtt_delete ); lua_setfield( L, -2, "__gc" ); // set t["__gc"] = lmqtt_delete lua_pushvalue( L, 1 ); //make a copy of the table lua_setmetatable( L, -2 ); - char id[32]; - snprintf( id, 32, "mqtt_%p", settings ); - NODE_DBG("Store MQTT table in _G stack pos %d\n", lua_gettop(L)); - lua_pushvalue( L, 1 ); //make a copy of the table - lua_setglobal( L, id); - - hConn = task_get_id(lmqtt_connected_cb); - hOff = task_get_id(lmqtt_disconnected_cb); - hPub = task_get_id(lmqtt_publish_cb); - hSub = task_get_id(lmqtt_subscribe_cb); - hData = task_get_id(lmqtt_data_cb); + hConn = task_get_id(_connected_cb); + hOff = task_get_id(_disconnected_cb); + hPub = task_get_id(_publish_cb); + hSub = task_get_id(_subscribe_cb); + hUnsub = task_get_id(_unsubscribe_cb); + hData = task_get_id(_data_cb); NODE_DBG("conn %d, off %d, pub %d, sub %d, data %d\n", hConn, hOff, hPub, hSub, hData); return 1; //leave table on top of the stack @@ -619,8 +709,9 @@ static int lmqtt_new( lua_State* L ) // Module function map static const LUA_REG_TYPE mqtt_map[] = { - { LSTRKEY( "Client" ), LFUNCVAL( lmqtt_new ) }, + { LSTRKEY( "Client" ), LFUNCVAL( mqtt_new ) }, { LNILKEY, LNILVAL } }; NODEMCU_MODULE(MQTT, "mqtt", mqtt_map, NULL); +