Upgrade to espmqttc head

This commit is contained in:
Andrew Gough 2018-03-19 19:31:38 +11:00
parent 4207d6c8ad
commit 33232b501d
2 changed files with 266 additions and 175 deletions

@ -1 +1 @@
Subproject commit 2967332b95454d4b53068a0d5484ae60e312eb12 Subproject commit 55f04a8e61a0e3bacf2b40518d9d522e9b79e615

View File

@ -5,30 +5,21 @@
#include "platform.h" #include "platform.h"
#include "task/task.h" #include "task/task.h"
#include "mqtt.h" #include "mqtt_client.h"
#include <string.h> #include <string.h>
task_handle_t hConn; task_handle_t hConn;
task_handle_t hOff; task_handle_t hOff;
task_handle_t hPub; task_handle_t hPub;
task_handle_t hSub; task_handle_t hSub;
task_handle_t hUnsub;
task_handle_t hData; task_handle_t hData;
//used as a holder to copy received data from the MQTT task // ------------------------------------------------------------------------- //
//to the main Lua task. Due to the async nature of the tasks
//we must copy the data to deliver it intact to the Lua
//callback
typedef struct lmqtt_ctx
{
mqtt_client * client;
char topic[CONFIG_MQTT_MAX_LWT_TOPIC];
char * data;
} lmqtt_ctx_t;
// locate the C mqtt_client pointer and leave the // locate the C mqtt_client pointer and leave the
// Lua instance on the top of the stack // Lua instance on the top of the stack
static mqtt_client * get_client( lua_State * L ) static esp_mqtt_client_handle_t get_client( lua_State * L )
{ {
if( !lua_istable( L, 1 ) ) if( !lua_istable( L, 1 ) )
{ {
@ -43,14 +34,14 @@ static mqtt_client * get_client( lua_State * L )
return 0; //never reached return 0; //never reached
} }
mqtt_client * client = (mqtt_client *) lua_touserdata( L, -1 ); esp_mqtt_client_handle_t client = (esp_mqtt_client_handle_t) lua_touserdata( L, -1 );
lua_pop( L, 1 ); // just pop the _mqtt field lua_pop( L, 1 ); // just pop the _mqtt field
return client; return client;
} }
// locate the C mqtt_settings pointer and leave the // locate the C mqtt_settings pointer and leave the
// Lua instance on the top of the stack // Lua instance on the top of the stack
static mqtt_settings * get_settings( lua_State * L ) static esp_mqtt_client_config_t * get_settings( lua_State * L )
{ {
if( !lua_istable( L, 1 ) ) if( !lua_istable( L, 1 ) )
{ {
@ -65,61 +56,112 @@ static mqtt_settings * get_settings( lua_State * L )
return 0; //never reached return 0; //never reached
} }
mqtt_settings * settings = (mqtt_settings *) lua_touserdata( L, -1 ); esp_mqtt_client_config_t * settings = (esp_mqtt_client_config_t *) lua_touserdata( L, -1 );
lua_pop( L, 1 ); // just pop the _mqtt field lua_pop( L, 1 ); // just pop the _mqtt field
return settings; return settings;
} }
// Lua: on() // ------------------------------------------------------------------------- //
static int lmqtt_on(lua_State *L)
static esp_mqtt_event_handle_t event_clone(esp_mqtt_event_handle_t ev)
{ {
enum events{ esp_mqtt_event_handle_t ev1 = (esp_mqtt_event_handle_t) malloc(sizeof(esp_mqtt_event_t));
ON_CONNECT = 0, memset(ev1, 0, sizeof(esp_mqtt_event_t));
ON_MESSAGE = 1, NODE_DBG("event_clone():malloc: event %p, msg %d\n", ev, ev->msg_id);
ON_OFFLINE = 2
};
const char *const eventnames[] = {"connect", "message", "offline", NULL};
// mqtt_settings * settings = get_settings( L ); ev1->event_id = ev->event_id;
int event = luaL_checkoption(L, 2, "message", eventnames); ev1->client = ev->client;
ev1->user_context = ev->user_context;
ev1->total_data_len = ev->total_data_len;
ev1->current_data_offset = ev->current_data_offset;
ev1->msg_id = ev->msg_id;
if( !lua_isfunction( L, 3 ) ) ev1->data_len = ev->data_len;
return 0; if( ev->data != NULL && ev->data_len > 0 )
{
switch (event) { ev1->data = malloc(ev->data_len + 1);
case ON_CONNECT: memcpy(ev1->data, ev->data, ev->data_len);
lua_setfield(L, 1, "_on_connect"); ev1->data[ev1->data_len] = '\0';
break; NODE_DBG("event_clone():malloc: event %p, msg %d, data %p, num %d\n", ev1, ev1->msg_id, ev1->data, ev1->data_len);
case ON_MESSAGE:
lua_setfield(L, 1, "_on_message");
break;
case ON_OFFLINE:
lua_setfield(L, 1, "_on_offline");
break;
default:
return 0;
} }
lua_pop(L, 1); //pop event name ev1->topic_len = ev->topic_len;
return 0; if( ev->topic != NULL && ev->topic_len > 0 )
{
ev1->topic = malloc(ev->topic_len + 1);
memcpy(ev1->topic, ev->topic, ev->topic_len);
ev1->topic[ev1->topic_len] = '\0';
NODE_DBG("event_clone():malloc: event %p, msg %d, topic %p, num %d\n", ev1, ev1->msg_id, ev1->topic, ev1->topic_len);
}
return ev1;
}
static void event_free(esp_mqtt_event_handle_t ev)
{
if(ev->data != NULL)
{
NODE_DBG("event_free():free: event %p, msg %d, data %p\n", ev, ev->msg_id, ev->data);
free(ev->data);
}
if(ev->topic != NULL)
{
NODE_DBG("event_free():free: event %p, msg %d, topic %p\n", ev, ev->msg_id, ev->topic);
free(ev->topic);
}
free(ev);
}
// ------------------------------------------------------------------------- //
static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event)
{
switch (event->event_id) {
case MQTT_EVENT_CONNECTED:
task_post_medium(hConn, (task_param_t) event_clone(event));
break;
case MQTT_EVENT_DISCONNECTED:
task_post_medium( hOff, (task_param_t) event_clone(event));
break;
case MQTT_EVENT_SUBSCRIBED:
task_post_medium(hSub, (task_param_t) event_clone(event));
break;
case MQTT_EVENT_UNSUBSCRIBED:
task_post_medium(hUnsub, (task_param_t) event_clone(event));
break;
case MQTT_EVENT_PUBLISHED:
task_post_medium(hPub, (task_param_t) event_clone(event));
break;
case MQTT_EVENT_DATA:
task_post_medium(hData, (task_param_t) event_clone(event));
break;
case MQTT_EVENT_ERROR:
break;
}
return ESP_OK;
} }
//typedef void (*task_callback_t)(task_param_t param, task_prio_t prio); //typedef void (*task_callback_t)(task_param_t param, task_prio_t prio);
static void lmqtt_connected_cb(task_param_t param, task_prio_t prio) static void _connected_cb(task_param_t param, task_prio_t prio)
{ {
lua_State * L = lua_getstate(); //returns main Lua state lua_State * L = lua_getstate(); //returns main Lua state
if( L == NULL ) if( L == NULL )
return; return;
mqtt_client * client = (mqtt_client *) param; esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t) param;
int top = lua_gettop(L); int top = lua_gettop(L);
lua_checkstack(L, 8); lua_checkstack(L, 8);
char key[64]; char key[64];
snprintf(key, 64, "mqtt_%p", client->settings); snprintf(key, 64, "mqtt_%p", event->client);
lua_getglobal( L, key ); //retrieve MQTT table from _G lua_getglobal( L, key ); //retrieve MQTT table from _G
NODE_DBG("CB:connect: state %p, settings %p, stack top %d\n", L, client->settings, lua_gettop(L)); NODE_DBG("CB:connect: state %p, settings %p, stack top %d\n", L, event->client, lua_gettop(L));
lua_getfield( L, -1, "_connect_ok" ); lua_getfield( L, -1, "_connect_ok" );
if( lua_isfunction( L, -1 ) ) if( lua_isfunction( L, -1 ) )
@ -152,29 +194,24 @@ static void lmqtt_connected_cb(task_param_t param, task_prio_t prio)
NODE_DBG("CB:connect: Error when calling connect callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) ); NODE_DBG("CB:connect: Error when calling connect callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) );
} }
lua_settop(L, top); lua_settop(L, top);
event_free(event);
} }
static void connected_cb( mqtt_client *client, mqtt_event_data_t *event_data ) static void _disconnected_cb(task_param_t param, task_prio_t prio)
{
task_post_medium(hConn, (task_param_t) client);
}
static void lmqtt_disconnected_cb(task_param_t param, task_prio_t prio)
{ {
lua_State * L = lua_getstate(); //returns main Lua state lua_State * L = lua_getstate(); //returns main Lua state
if( L == NULL ) if( L == NULL )
return; return;
mqtt_client * client = (mqtt_client *) param; esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t) param;
int top = lua_gettop(L); int top = lua_gettop(L);
lua_checkstack(L, 8); lua_checkstack(L, 8);
char key[64]; char key[64];
snprintf(key, 64, "mqtt_%p", client->settings); snprintf(key, 64, "mqtt_%p", event->client);
lua_getglobal( L, key ); //retrieve MQTT table from _G lua_getglobal( L, key ); //retrieve MQTT table from _G
NODE_DBG("CB:disconnect: state %p, settings %p, stack top %d\n", L, client->settings, lua_gettop(L)); NODE_DBG("CB:disconnect: state %p, settings %p, stack top %d\n", L, event->client, lua_gettop(L));
lua_getfield( L, -1, "_connect_nok" ); lua_getfield( L, -1, "_connect_nok" );
if( lua_isfunction( L, -1 ) ) if( lua_isfunction( L, -1 ) )
@ -195,7 +232,10 @@ static void lmqtt_disconnected_cb(task_param_t param, task_prio_t prio)
// now we check for the standard connect callback registered with 'mqtt:on()' // now we check for the standard connect callback registered with 'mqtt:on()'
lua_getfield( L, -1, "_on_offline" ); lua_getfield( L, -1, "_on_offline" );
if( !lua_isfunction( L, -1 ) || lua_isnil( L, -1 ) ) if( !lua_isfunction( L, -1 ) || lua_isnil( L, -1 ) )
{
event_free(event);
return; return;
}
NODE_DBG("CB:disconnect: calling registered standard offline callback\n"); NODE_DBG("CB:disconnect: calling registered standard offline callback\n");
lua_pushvalue( L, -2 ); //dup mqtt table lua_pushvalue( L, -2 ); //dup mqtt table
@ -204,28 +244,24 @@ static void lmqtt_disconnected_cb(task_param_t param, task_prio_t prio)
NODE_DBG("CB:disconnect: Error when calling offline callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) ); NODE_DBG("CB:disconnect: Error when calling offline callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) );
lua_settop(L, top); lua_settop(L, top);
event_free(event);
} }
static void disconnected_cb( mqtt_client *client, mqtt_event_data_t *event_data ) static void _subscribe_cb(task_param_t param, task_prio_t prio)
{
task_post_medium( hOff, (task_param_t) client);
}
static void lmqtt_subscribe_cb(task_param_t param, task_prio_t prio)
{ {
lua_State * L = lua_getstate(); //returns main Lua state lua_State * L = lua_getstate(); //returns main Lua state
if( L == NULL ) if( L == NULL )
return; return;
mqtt_client * client = (mqtt_client *) param; esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t) param;
int top = lua_gettop(L); int top = lua_gettop(L);
lua_checkstack(L, 8); lua_checkstack(L, 8);
char key[64]; char key[64];
snprintf(key, 64, "mqtt_%p", client->settings); snprintf(key, 64, "mqtt_%p", event->client);
lua_getglobal( L, key ); //retrieve MQTT table from _G lua_getglobal( L, key ); //retrieve MQTT table from _G
NODE_DBG("CB:subscribe: state %p, settings %p, stack top %d\n", L, client->settings, lua_gettop(L)); NODE_DBG("CB:subscribe: state %p, settings %p, stack top %d\n", L, event->client, lua_gettop(L));
lua_getfield( L, 1, "_subscribe_ok" ); lua_getfield( L, 1, "_subscribe_ok" );
if( lua_isfunction( L, -1 ) ) if( lua_isfunction( L, -1 ) )
@ -240,29 +276,25 @@ static void lmqtt_subscribe_cb(task_param_t param, task_prio_t prio)
lua_setfield(L, 1, "_subscribe_ok"); lua_setfield(L, 1, "_subscribe_ok");
} }
lua_settop(L, top); lua_settop(L, top);
event_free(event);
} }
static void subscribe_cb( mqtt_client *client, mqtt_event_data_t *event_data ) static void _publish_cb(task_param_t param, task_prio_t prio)
{
task_post_medium(hSub, (task_param_t) client);
}
static void lmqtt_publish_cb(task_param_t param, task_prio_t prio)
{ {
NODE_DBG("CB:publish: successfully transferred control back to main task\n"); NODE_DBG("CB:publish: successfully transferred control back to main task\n");
lua_State * L = lua_getstate(); //returns main Lua state lua_State * L = lua_getstate(); //returns main Lua state
if( L == NULL ) if( L == NULL )
return; return;
mqtt_client * client = (mqtt_client *) param; esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t) param;
int top = lua_gettop(L); int top = lua_gettop(L);
lua_checkstack(L, 8); lua_checkstack(L, 8);
char key[64]; char key[64];
snprintf(key, 64, "mqtt_%p", client->settings); snprintf(key, 64, "mqtt_%p", event->client);
lua_getglobal( L, key ); //retrieve MQTT table from _G lua_getglobal( L, key ); //retrieve MQTT table from _G
NODE_DBG("CB:publish: state %p, settings %p, stack top %d\n", L, client->settings, lua_gettop(L)); NODE_DBG("CB:publish: state %p, settings %p, stack top %d\n", L, event->client, lua_gettop(L));
lua_getfield( L, 1, "_publish_ok" ); lua_getfield( L, 1, "_publish_ok" );
if( lua_isfunction( L, -1 ) ) if( lua_isfunction( L, -1 ) )
@ -277,39 +309,65 @@ static void lmqtt_publish_cb(task_param_t param, task_prio_t prio)
lua_setfield(L, 1, "_publish_ok"); lua_setfield(L, 1, "_publish_ok");
} }
lua_settop(L, top); lua_settop(L, top);
event_free(event);
} }
static void publish_cb( mqtt_client *client, mqtt_event_data_t *event_data ) static void _unsubscribe_cb(task_param_t param, task_prio_t prio)
{
NODE_DBG("CB:publish: transferring control back to main task\n");
task_post_medium(hPub, (task_param_t) client);
}
static void lmqtt_data_cb(task_param_t param, task_prio_t prio)
{ {
lua_State * L = lua_getstate(); //returns main Lua state lua_State * L = lua_getstate(); //returns main Lua state
if( L == NULL ) if( L == NULL )
return; return;
lmqtt_ctx_t * ctx = (lmqtt_ctx_t *) param; esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t) param;
int top = lua_gettop(L);
char key[64];
snprintf(key, 64, "mqtt_%p", event->client);
lua_getglobal( L, key ); //retrieve MQTT table from _G
NODE_DBG("CB:subscribe: state %p, settings %p, stack top %d\n", L, event->client, lua_gettop(L));
lua_getfield( L, 1, "_unsubscribe_ok" );
if( lua_isfunction( L, -1 ) )
{
NODE_DBG("CB:unsubscribe: calling registered one-shot unsubscribe callback\n");
lua_pushvalue( L, 1 ); //dup mqtt table
int res = lua_pcall( L, 1, 0, 0 ); //call the disconnect callback
if( res != 0 )
NODE_DBG("CB:unsubscribe: Error when calling one-shot unsubscribe callback - (%d) %s\n", res, luaL_checkstring( L, -1 ) );
lua_pushnil(L);
lua_setfield(L, 1, "_unsubscribe_ok");
}
lua_settop(L, top);
event_free(event);
}
static void _data_cb(task_param_t param, task_prio_t prio)
{
lua_State * L = lua_getstate(); //returns main Lua state
if( L == NULL )
return;
esp_mqtt_event_handle_t event = (esp_mqtt_event_handle_t) param;
int top = lua_gettop(L); int top = lua_gettop(L);
lua_checkstack(L, 8); lua_checkstack(L, 8);
char key[64]; char key[64];
snprintf(key, 64, "mqtt_%p", ctx->client->settings); snprintf(key, 64, "mqtt_%p", event->client);
lua_getglobal( L, key ); //retrieve MQTT table from _G lua_getglobal( L, key ); //retrieve MQTT table from _G
NODE_DBG("CB:data: state %p, settings %p, stack top %d\n", L, ctx->client->settings, lua_gettop(L)); NODE_DBG("CB:data: state %p, settings %p, stack top %d\n", L, event->client, lua_gettop(L));
lua_getfield( L, 1, "_on_message" ); lua_getfield( L, 1, "_on_message" );
if( lua_isfunction( L, -1 ) ) if( lua_isfunction( L, -1 ) )
{ {
int numArg = 2; int numArg = 2;
lua_pushvalue( L, 1 ); //dup mqtt table lua_pushvalue( L, 1 ); //dup mqtt table
lua_pushstring( L, ctx->topic ); lua_pushstring( L, event->topic );
if( ctx->data != NULL ) if( event->data != NULL )
{ {
lua_pushstring( L, ctx->data ); lua_pushstring( L, event->data );
numArg++; numArg++;
} }
@ -319,41 +377,58 @@ static void lmqtt_data_cb(task_param_t param, task_prio_t prio)
} }
lua_settop(L, top); lua_settop(L, top);
if( ctx->data != NULL ) event_free(event);
free( ctx->data );
} }
static void data_cb( mqtt_client *client, mqtt_event_data_t *event_data ) // ------------------------------------------------------------------------- //
{ // ------------------------------------------------------------------------- //
NODE_DBG("CB:data: topic len %d, data len %d\n", event_data->topic_length, event_data->data_length);
lmqtt_ctx_t * ctx = malloc(sizeof(lmqtt_ctx_t)); // Lua: on()
ctx->client = client; static int mqtt_on(lua_State *L)
strncpy(ctx->topic, event_data->topic, event_data->topic_length);
ctx->topic[event_data->topic_length] = '\0';
ctx->data = NULL;
if( event_data->data_length > 0 )
{ {
ctx->data = malloc( event_data->data_length ); enum events{
strncpy( ctx->data, event_data->data, event_data->data_length ); ON_CONNECT = 0,
ctx->data[event_data->data_length] = '\0'; ON_MESSAGE = 1,
ON_OFFLINE = 2
};
const char *const eventnames[] = {"connect", "message", "offline", NULL};
// mqtt_settings * settings = get_settings( L );
int event = luaL_checkoption(L, 2, "message", eventnames);
if( !lua_isfunction( L, 3 ) )
return 0;
switch (event) {
case ON_CONNECT:
lua_setfield(L, 1, "_on_connect");
break;
case ON_MESSAGE:
lua_setfield(L, 1, "_on_message");
break;
case ON_OFFLINE:
lua_setfield(L, 1, "_on_offline");
break;
default:
return 0;
} }
task_post_medium(hData, (task_param_t) ctx); lua_pop(L, 1); //pop event name
return 0;
} }
// Lua: mqtt:connect(host[, port[, secure[, autoreconnect]]][, function(client)[, function(client, reason)]]) // Lua: mqtt:connect(host[, port[, secure[, autoreconnect]]][, function(client)[, function(client, reason)]])
static int lmqtt_connect( lua_State* L ) static int mqtt_connect( lua_State* L )
{ {
mqtt_settings * settings = get_settings( L ); esp_mqtt_client_config_t * mqtt_cfg = get_settings( L );
// int secure = 0; int secure = 0;
int reconnect = 0; int reconnect = 0;
const char * host = luaL_checkstring( L, 2 ); const char * host = luaL_checkstring( L, 2 );
int port = 1883; int port = 1883;
int n = 3; int n = 3;
if( lua_isnumber( L, n ) ) if( lua_isnumber( L, n ) )
{ {
port = luaL_checknumber( L, n ); port = luaL_checknumber( L, n );
@ -362,7 +437,7 @@ static int lmqtt_connect( lua_State* L )
if( lua_isnumber( L, n ) ) if( lua_isnumber( L, n ) )
{ {
// secure = !!luaL_checkinteger( L, -4 ); secure = !!luaL_checkinteger( L, n );
n++; n++;
} }
@ -388,42 +463,44 @@ static int lmqtt_connect( lua_State* L )
lua_pop( L, n - 2 ); //pop parameters lua_pop( L, n - 2 ); //pop parameters
strncpy(settings->host, host, CONFIG_MQTT_MAX_HOST_LEN ); strncpy(mqtt_cfg->host, host, MQTT_MAX_HOST_LEN );
settings->port = port; mqtt_cfg->port = port;
settings->auto_reconnect = reconnect != 0; mqtt_cfg->disable_auto_reconnect = (reconnect == 0);
mqtt_cfg->transport = secure ? MQTT_TRANSPORT_OVER_SSL : MQTT_TRANSPORT_OVER_TCP;
settings->connected_cb = connected_cb; esp_mqtt_client_handle_t client = esp_mqtt_client_init(mqtt_cfg);
settings->disconnected_cb = disconnected_cb;
settings->subscribe_cb = subscribe_cb;
settings->publish_cb = publish_cb;
settings->data_cb = data_cb;
mqtt_client * client = mqtt_start( settings );
if( client == NULL ) if( client == NULL )
{ {
luaL_error( L, "MQTT library failed to start" ); luaL_error( L, "MQTT library failed to start" );
return 0; return 0;
} }
NODE_DBG("Created MQTT client @ %p, settings @ %p, state @ %p, top %d \n", client, settings, L, lua_gettop( L ) ); esp_mqtt_client_start(client);
lua_pushlightuserdata( L, client ); lua_pushlightuserdata( L, client );
lua_setfield( L, -2, "_client" ); //and store a reference in the MQTT table lua_setfield( L, -2, "_client" ); //and store a reference in the MQTT table
char id[32];
snprintf( id, 32, "mqtt_%p", client);
NODE_DBG("Store MQTT table in _G stack pos %d\n", lua_gettop(L));
lua_pushvalue( L, 1 ); //make a copy of the table
lua_setglobal( L, id);
return 0; return 0;
} }
// Lua: mqtt:close() // Lua: mqtt:close()
static int lmqtt_close( lua_State* L ) static int mqtt_close( lua_State* L )
{ {
mqtt_client * client = get_client( L ); esp_mqtt_client_handle_t client = get_client( L );
if( client == NULL ) if( client == NULL )
return 0; return 0;
NODE_DBG("Closing MQTT client %p\n", client); NODE_DBG("Closing MQTT client %p\n", client);
char id[64]; char id[64];
snprintf(id, 64, "mqtt_%p", client->settings); snprintf(id, 64, "mqtt_%p", client);
lua_pushnil( L ); lua_pushnil( L );
lua_setglobal( L, id ); // remove global reference lua_setglobal( L, id ); // remove global reference
@ -435,36 +512,37 @@ static int lmqtt_close( lua_State* L )
} }
// Lua: mqtt:lwt(topic, message[, qos[, retain]]) // Lua: mqtt:lwt(topic, message[, qos[, retain]])
static int lmqtt_lwt( lua_State* L ) static int mqtt_lwt( lua_State* L )
{ {
mqtt_settings * settings = get_settings( L ); esp_mqtt_client_config_t * mqtt_cfg = get_settings( L );
strncpy( settings->lwt_topic, luaL_checkstring( L, 2 ), CONFIG_MQTT_MAX_LWT_TOPIC ); strncpy( mqtt_cfg->lwt_topic, luaL_checkstring( L, 2 ), MQTT_MAX_LWT_TOPIC );
strncpy( settings->lwt_msg, luaL_checkstring( L, 3 ), CONFIG_MQTT_MAX_LWT_MSG ); strncpy( mqtt_cfg->lwt_msg, luaL_checkstring( L, 3 ), MQTT_MAX_LWT_MSG );
settings->lwt_msg_len = strlen( settings->lwt_msg ); mqtt_cfg->lwt_msg_len = strlen( mqtt_cfg->lwt_msg );
int n = 4; int n = 4;
if( lua_isnumber( L, n ) ) if( lua_isnumber( L, n ) )
{ {
settings->lwt_qos = lua_tonumber( L, n ); mqtt_cfg->lwt_qos = lua_tonumber( L, n );
n++; n++;
} }
if( lua_isnumber( L, n ) ) if( lua_isnumber( L, n ) )
{ {
settings->lwt_retain = lua_tonumber( L, n ); mqtt_cfg->lwt_retain = lua_tonumber( L, n );
n++; n++;
} }
lua_pop( L, n ); lua_pop( L, n );
NODE_DBG("Set LWT topic '%s', qos %d, retain %d, len %d\n", settings->lwt_topic, settings->lwt_qos, settings->lwt_retain, settings->lwt_msg_len); NODE_DBG("Set LWT topic '%s', qos %d, retain %d, len %d\n",
mqtt_cfg->lwt_topic, mqtt_cfg->lwt_qos, mqtt_cfg->lwt_retain, mqtt_cfg->lwt_msg_len);
return 0; return 0;
} }
//Lua: mqtt:publish(topic, payload, qos, retain[, function(client)]) //Lua: mqtt:publish(topic, payload, qos, retain[, function(client)])
static int lmqtt_publish( lua_State * L ) static int mqtt_publish( lua_State * L )
{ {
mqtt_client * client = get_client( L ); esp_mqtt_client_handle_t client = get_client( L );
int top = lua_gettop(L); int top = lua_gettop(L);
@ -482,15 +560,15 @@ static int lmqtt_publish( lua_State * L )
} }
lua_settop(L, top ); lua_settop(L, top );
NODE_DBG("MQTT publish client id %s, topic %s, %d bytes\n", client->settings->client_id, topic, strlen(data)); NODE_DBG("MQTT publish client %p, topic %s, %d bytes\n", client, topic, strlen(data));
mqtt_publish(client, topic, data, strlen(data), qos, retain); esp_mqtt_client_publish(client, topic, data, strlen(data), qos, retain);
return 0; return 0;
} }
// Lua: mqtt:subscribe(topic, qos[, function(client)]) OR mqtt:subscribe(table[, function(client)]) // Lua: mqtt:subscribe(topic, qos[, function(client)]) OR mqtt:subscribe(table[, function(client)])
static int lmqtt_subscribe( lua_State* L ) static int mqtt_subscribe( lua_State* L )
{ {
mqtt_client * client = get_client( L ); esp_mqtt_client_handle_t client = get_client( L );
int top = lua_gettop(L); int top = lua_gettop(L);
@ -504,65 +582,82 @@ static int lmqtt_subscribe( lua_State* L )
} }
lua_settop(L, top ); lua_settop(L, top );
NODE_DBG("MQTT subscribe client id %s, topic %s\n", client->settings->client_id, topic); NODE_DBG("MQTT subscribe client %p, topic %s\n", client, topic);
mqtt_subscribe(client, topic, qos); esp_mqtt_client_subscribe(client, topic, qos);
return 0; return 0;
} }
// Lua: mqtt:unsubscribe(topic[, function(client)]) OR mqtt:unsubscribe(table[, function(client)]) // Lua: mqtt:unsubscribe(topic[, function(client)]) OR mqtt:unsubscribe(table[, function(client)])
static int lmqtt_unsubscribe( lua_State* L ) static int mqtt_unsubscribe( lua_State* L )
{ {
// mqtt_client * client = get_client( L ); esp_mqtt_client_handle_t client = get_client( L );
int top = lua_gettop(L);
const char * topic = luaL_checkstring( L, 2 );
int n = 3;
if( lua_isfunction( L, n ) )
{
lua_pushvalue( L, n );
lua_setfield(L, 1, "_unsubscribe_ok"); // set _G["_cb_connect_nok"] = fn()
n++;
}
lua_settop(L, top );
NODE_DBG("MQTT unsubscribe client %p, topic %s\n", client, topic);
esp_mqtt_client_unsubscribe(client, topic);
return 0;
return 0; return 0;
} }
static int lmqtt_delete( lua_State* L ) static int mqtt_delete( lua_State* L )
{ {
mqtt_settings * settings = get_settings( L ); esp_mqtt_client_config_t * settings = get_settings( L );
if( settings != NULL ) if( settings != NULL )
free( settings ); free( settings );
mqtt_client * client = get_client( L ); esp_mqtt_client_handle_t client = get_client( L );
if( client != NULL ) if( client != NULL )
{ {
NODE_DBG("stopping MQTT client id %s\n", client->settings->client_id); NODE_DBG("stopping MQTT client %p\n", client);
mqtt_destroy( client ); esp_mqtt_client_destroy( client );
free( client ); free( client );
} }
return 0; return 0;
} }
// Lua: mqtt.Client(clientid, keepalive[, username, password, cleansession]) // Lua: mqtt.Client(clientid, keepalive[, username, password, cleansession])
static int lmqtt_new( lua_State* L ) static int mqtt_new( lua_State* L )
{ {
const char * clientid = NULL; const char * clientid = NULL;
clientid = luaL_checkstring( L, 1 ); clientid = luaL_checkstring( L, 1 );
NODE_DBG("MQTT client id %s\n", clientid); NODE_DBG("MQTT client id %s\n", clientid);
mqtt_settings * settings = (mqtt_settings *) malloc( sizeof(mqtt_settings) ); esp_mqtt_client_config_t * mqtt_cfg = (esp_mqtt_client_config_t *) malloc(sizeof(esp_mqtt_client_config_t));
memset(settings, 0, sizeof(mqtt_settings) ); memset(mqtt_cfg, 0, sizeof(esp_mqtt_client_config_t));
strncpy(settings->client_id, clientid, CONFIG_MQTT_MAX_CLIENT_LEN); mqtt_cfg->event_handle = mqtt_event_handler;
settings->keepalive = luaL_checkinteger( L, 2 );
strncpy(mqtt_cfg->client_id, clientid, MQTT_MAX_CLIENT_LEN);
mqtt_cfg->keepalive = luaL_checkinteger( L, 2 );
int n = 2; int n = 2;
if( lua_isstring(L, 3) ) if( lua_isstring(L, 3) )
{ {
strncpy( settings->username, luaL_checkstring( L, 3 ), CONFIG_MQTT_MAX_USERNAME_LEN); strncpy( mqtt_cfg->username, luaL_checkstring( L, 3 ), MQTT_MAX_USERNAME_LEN);
n++; n++;
} }
if( lua_isstring(L, 4) ) if( lua_isstring(L, 4) )
{ {
strncpy(settings->password, luaL_checkstring( L, 4 ), CONFIG_MQTT_MAX_PASSWORD_LEN); strncpy(mqtt_cfg->password, luaL_checkstring( L, 4 ), MQTT_MAX_PASSWORD_LEN);
n++; n++;
} }
if( lua_isnumber(L, 5) ) if( lua_isnumber(L, 5) )
{ {
settings->clean_session = luaL_checknumber( L, 5 ); mqtt_cfg->disable_clean_session = (luaL_checknumber( L, 5 ) == 0);
n++; n++;
} }
lua_pop( L, n ); //remove parameters lua_pop( L, n ); //remove parameters
@ -570,47 +665,42 @@ static int lmqtt_new( lua_State* L )
lua_newtable( L ); lua_newtable( L );
NODE_DBG("New MQTT table at stack pos %d\n", lua_gettop(L)); NODE_DBG("New MQTT table at stack pos %d\n", lua_gettop(L));
lua_pushlightuserdata( L, settings ); lua_pushlightuserdata( L, mqtt_cfg );
lua_setfield( L, -2, "_settings" ); // set t["_mqtt"] = client lua_setfield( L, -2, "_settings" ); // set t["_mqtt"] = client
lua_pushcfunction( L, lmqtt_connect ); lua_pushcfunction( L, mqtt_connect );
lua_setfield( L, -2, "connect" ); // set t["connect"] = lmqtt_connect lua_setfield( L, -2, "connect" ); // set t["connect"] = lmqtt_connect
lua_pushcfunction( L, lmqtt_close ); lua_pushcfunction( L, mqtt_close );
lua_setfield( L, -2, "close" ); // set t["close"] = lmqtt_close lua_setfield( L, -2, "close" ); // set t["close"] = lmqtt_close
lua_pushcfunction( L, lmqtt_lwt ); lua_pushcfunction( L, mqtt_lwt );
lua_setfield( L, -2, "lwt" ); // set t["lwt"] = lmqtt_lwt lua_setfield( L, -2, "lwt" ); // set t["lwt"] = lmqtt_lwt
lua_pushcfunction( L, lmqtt_publish ); lua_pushcfunction( L, mqtt_publish );
lua_setfield( L, -2, "publish" ); // set t["publish"] = lmqtt_publish lua_setfield( L, -2, "publish" ); // set t["publish"] = lmqtt_publish
lua_pushcfunction( L, lmqtt_subscribe ); lua_pushcfunction( L, mqtt_subscribe );
lua_setfield( L, -2, "subscribe" ); // set t["subscribe"] = lmqtt_subscribe lua_setfield( L, -2, "subscribe" ); // set t["subscribe"] = lmqtt_subscribe
lua_pushcfunction( L, lmqtt_unsubscribe ); lua_pushcfunction( L, mqtt_unsubscribe );
lua_setfield( L, -2, "unsubscribe" ); // set t["unsubscribe"] = lmqtt_unsubscribe lua_setfield( L, -2, "unsubscribe" ); // set t["unsubscribe"] = lmqtt_unsubscribe
lua_pushcfunction( L, lmqtt_on ); lua_pushcfunction( L, mqtt_on );
lua_setfield( L, -2, "on" ); // set t["on"] = lmqtt_on lua_setfield( L, -2, "on" ); // set t["on"] = lmqtt_on
lua_pushcfunction( L, lmqtt_delete ); lua_pushcfunction( L, mqtt_delete );
lua_setfield( L, -2, "__gc" ); // set t["__gc"] = lmqtt_delete lua_setfield( L, -2, "__gc" ); // set t["__gc"] = lmqtt_delete
lua_pushvalue( L, 1 ); //make a copy of the table lua_pushvalue( L, 1 ); //make a copy of the table
lua_setmetatable( L, -2 ); lua_setmetatable( L, -2 );
char id[32]; hConn = task_get_id(_connected_cb);
snprintf( id, 32, "mqtt_%p", settings ); hOff = task_get_id(_disconnected_cb);
NODE_DBG("Store MQTT table in _G stack pos %d\n", lua_gettop(L)); hPub = task_get_id(_publish_cb);
lua_pushvalue( L, 1 ); //make a copy of the table hSub = task_get_id(_subscribe_cb);
lua_setglobal( L, id); hUnsub = task_get_id(_unsubscribe_cb);
hData = task_get_id(_data_cb);
hConn = task_get_id(lmqtt_connected_cb);
hOff = task_get_id(lmqtt_disconnected_cb);
hPub = task_get_id(lmqtt_publish_cb);
hSub = task_get_id(lmqtt_subscribe_cb);
hData = task_get_id(lmqtt_data_cb);
NODE_DBG("conn %d, off %d, pub %d, sub %d, data %d\n", hConn, hOff, hPub, hSub, hData); NODE_DBG("conn %d, off %d, pub %d, sub %d, data %d\n", hConn, hOff, hPub, hSub, hData);
return 1; //leave table on top of the stack return 1; //leave table on top of the stack
@ -619,8 +709,9 @@ static int lmqtt_new( lua_State* L )
// Module function map // Module function map
static const LUA_REG_TYPE mqtt_map[] = { static const LUA_REG_TYPE mqtt_map[] = {
{ LSTRKEY( "Client" ), LFUNCVAL( lmqtt_new ) }, { LSTRKEY( "Client" ), LFUNCVAL( mqtt_new ) },
{ LNILKEY, LNILVAL } { LNILKEY, LNILVAL }
}; };
NODEMCU_MODULE(MQTT, "mqtt", mqtt_map, NULL); NODEMCU_MODULE(MQTT, "mqtt", mqtt_map, NULL);