Merge pull request #1123 from jfollas/mqtt-connack

MQTT Client - CONNACK processing
This commit is contained in:
Philip Gladstone 2016-03-18 21:53:22 -04:00
commit 3d508591e0
3 changed files with 156 additions and 19 deletions

View File

@ -58,6 +58,7 @@ typedef struct lmqtt_userdata
struct espconn *pesp_conn; struct espconn *pesp_conn;
int self_ref; int self_ref;
int cb_connect_ref; int cb_connect_ref;
int cb_connect_fail_ref;
int cb_disconnect_ref; int cb_disconnect_ref;
int cb_message_ref; int cb_message_ref;
int cb_suback_ref; int cb_suback_ref;
@ -77,6 +78,7 @@ typedef struct lmqtt_userdata
static sint8 socket_connect(struct espconn *pesp_conn); static sint8 socket_connect(struct espconn *pesp_conn);
static void mqtt_socket_reconnected(void *arg, sint8_t err); static void mqtt_socket_reconnected(void *arg, sint8_t err);
static void mqtt_socket_connected(void *arg); static void mqtt_socket_connected(void *arg);
static void mqtt_connack_fail(lmqtt_userdata * mud, int reason_code);
static void mqtt_socket_disconnected(void *arg) // tcp only static void mqtt_socket_disconnected(void *arg) // tcp only
{ {
@ -193,6 +195,22 @@ static void deliver_publish(lmqtt_userdata * mud, uint8_t* message, int length)
NODE_DBG("leave deliver_publish.\n"); NODE_DBG("leave deliver_publish.\n");
} }
static void mqtt_connack_fail(lmqtt_userdata * mud, int reason_code)
{
if(mud->cb_connect_fail_ref == LUA_NOREF || mud->self_ref == LUA_NOREF)
{
return;
}
lua_State *L = lua_getstate();
lua_rawgeti(L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref);
lua_rawgeti(L, LUA_REGISTRYINDEX, mud->self_ref); // pass the userdata(client) to callback func in lua
lua_pushinteger(L, reason_code);
lua_call(L, 2, 0);
}
static sint8 mqtt_send_if_possible(struct espconn *pesp_conn) static sint8 mqtt_send_if_possible(struct espconn *pesp_conn)
{ {
if(pesp_conn == NULL) if(pesp_conn == NULL)
@ -253,10 +271,13 @@ READPACKET:
uint8_t temp_buffer[MQTT_BUF_SIZE]; uint8_t temp_buffer[MQTT_BUF_SIZE];
mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE); mqtt_msg_init(&mud->mqtt_state.mqtt_connection, temp_buffer, MQTT_BUF_SIZE);
mqtt_message_t *temp_msg = NULL; mqtt_message_t *temp_msg = NULL;
lua_State *L = lua_getstate(); lua_State *L = lua_getstate();
switch(mud->connState){ switch(mud->connState){
case MQTT_CONNECT_SENDING: case MQTT_CONNECT_SENDING:
case MQTT_CONNECT_SENT: case MQTT_CONNECT_SENT:
mud->event_timeout = 0;
if(mqtt_get_type(in_buffer) != MQTT_MSG_TYPE_CONNACK){ if(mqtt_get_type(in_buffer) != MQTT_MSG_TYPE_CONNACK){
NODE_DBG("MQTT: Invalid packet\r\n"); NODE_DBG("MQTT: Invalid packet\r\n");
mud->connState = MQTT_INIT; mud->connState = MQTT_INIT;
@ -270,6 +291,31 @@ READPACKET:
{ {
espconn_disconnect(pesp_conn); espconn_disconnect(pesp_conn);
} }
mqtt_connack_fail(mud, MQTT_CONN_FAIL_NOT_A_CONNACK_MSG);
break;
} else if (mqtt_get_connect_ret_code(in_buffer) != MQTT_CONNACK_ACCEPTED) {
NODE_DBG("MQTT: CONNACK REFUSED (CODE: %d)\n", mqtt_get_connect_ret_code(in_buffer));
mud->connState = MQTT_INIT;
#ifdef CLIENT_SSL_ENABLE
if(mud->secure)
{
espconn_secure_disconnect(pesp_conn);
}
else
#endif
{
espconn_disconnect(pesp_conn);
}
mqtt_connack_fail(mud, mqtt_get_connect_ret_code(in_buffer));
break;
} else { } else {
mud->connState = MQTT_DATA; mud->connState = MQTT_DATA;
NODE_DBG("MQTT: Connected\r\n"); NODE_DBG("MQTT: Connected\r\n");
@ -352,7 +398,7 @@ READPACKET:
case MQTT_MSG_TYPE_PUBREC: case MQTT_MSG_TYPE_PUBREC:
if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg->msg_id == msg_id){ if(pending_msg && pending_msg->msg_type == MQTT_MSG_TYPE_PUBLISH && pending_msg->msg_id == msg_id){
NODE_DBG("MQTT: Publish with QoS = 2 Received PUBREC\r\n"); NODE_DBG("MQTT: Publish with QoS = 2 Received PUBREC\r\n");
// Note: actrually, should not destroy the msg until PUBCOMP is received. // Note: actually, should not destroy the msg until PUBCOMP is received.
msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q))); msg_destroy(msg_dequeue(&(mud->mqtt_state.pending_msg_q)));
temp_msg = mqtt_msg_pubrel(&mud->mqtt_state.mqtt_connection, msg_id); temp_msg = mqtt_msg_pubrel(&mud->mqtt_state.mqtt_connection, msg_id);
msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg, msg_enqueue(&(mud->mqtt_state.pending_msg_q), temp_msg,
@ -435,6 +481,7 @@ static void mqtt_socket_sent(void *arg)
if(mud->connState == MQTT_CONNECT_SENDING){ if(mud->connState == MQTT_CONNECT_SENDING){
mud->connState = MQTT_CONNECT_SENT; mud->connState = MQTT_CONNECT_SENT;
mud->event_timeout = MQTT_SEND_TIMEOUT;
// MQTT_CONNECT not queued. // MQTT_CONNECT not queued.
return; return;
} }
@ -534,10 +581,13 @@ void mqtt_socket_timer(void *arg)
if(mud->connState == MQTT_INIT){ // socket connect time out. if(mud->connState == MQTT_INIT){ // socket connect time out.
NODE_DBG("Can not connect to broker.\n"); NODE_DBG("Can not connect to broker.\n");
// Never goes here. os_timer_disarm(&mud->mqttTimer);
mqtt_connack_fail(mud, MQTT_CONN_FAIL_SERVER_NOT_FOUND);
} else if(mud->connState == MQTT_CONNECT_SENDING){ // MQTT_CONNECT send time out. } else if(mud->connState == MQTT_CONNECT_SENDING){ // MQTT_CONNECT send time out.
NODE_DBG("sSend MQTT_CONNECT failed.\n"); NODE_DBG("sSend MQTT_CONNECT failed.\n");
mud->connState = MQTT_INIT; mud->connState = MQTT_INIT;
mqtt_connack_fail(mud, MQTT_CONN_FAIL_TIMEOUT_SENDING);
#ifdef CLIENT_SSL_ENABLE #ifdef CLIENT_SSL_ENABLE
if(mud->secure) if(mud->secure)
{ {
@ -549,8 +599,21 @@ void mqtt_socket_timer(void *arg)
espconn_disconnect(mud->pesp_conn); espconn_disconnect(mud->pesp_conn);
} }
mud->keep_alive_tick = 0; // not need count anymore mud->keep_alive_tick = 0; // not need count anymore
} else if(mud->connState == MQTT_CONNECT_SENT){ // wait for CONACK time out. } else if(mud->connState == MQTT_CONNECT_SENT) { // wait for CONACK time out.
NODE_DBG("MQTT_CONNECT failed.\n"); NODE_DBG("MQTT_CONNECT timeout.\n");
mud->connState == MQTT_INIT;
#ifdef CLIENT_SSL_ENABLE
if(mud->secure)
{
espconn_secure_disconnect(mud->pesp_conn);
}
else
#endif
{
espconn_disconnect(mud->pesp_conn);
}
mqtt_connack_fail(mud, MQTT_CONN_FAIL_TIMEOUT_RECEIVING);
} else if(mud->connState == MQTT_DATA){ } else if(mud->connState == MQTT_DATA){
msg_queue_t *pending_msg = msg_peek(&(mud->mqtt_state.pending_msg_q)); msg_queue_t *pending_msg = msg_peek(&(mud->mqtt_state.pending_msg_q));
if(pending_msg){ if(pending_msg){
@ -597,6 +660,7 @@ static int mqtt_socket_client( lua_State* L )
// pre-initialize it, in case of errors // pre-initialize it, in case of errors
mud->self_ref = LUA_NOREF; mud->self_ref = LUA_NOREF;
mud->cb_connect_ref = LUA_NOREF; mud->cb_connect_ref = LUA_NOREF;
mud->cb_connect_fail_ref = LUA_NOREF;
mud->cb_disconnect_ref = LUA_NOREF; mud->cb_disconnect_ref = LUA_NOREF;
mud->cb_message_ref = LUA_NOREF; mud->cb_message_ref = LUA_NOREF;
@ -641,7 +705,7 @@ static int mqtt_socket_client( lua_State* L )
} }
if(username == NULL) if(username == NULL)
unl = 0; unl = 0;
NODE_DBG("lengh username: %d\r\n", unl); NODE_DBG("length username: %d\r\n", unl);
if(lua_isstring( L, stack )){ if(lua_isstring( L, stack )){
password = luaL_checklstring( L, stack, &pwl ); password = luaL_checklstring( L, stack, &pwl );
@ -649,7 +713,7 @@ static int mqtt_socket_client( lua_State* L )
} }
if(password == NULL) if(password == NULL)
pwl = 0; pwl = 0;
NODE_DBG("lengh password: %d\r\n", pwl); NODE_DBG("length password: %d\r\n", pwl);
if(lua_isnumber( L, stack )) if(lua_isnumber( L, stack ))
{ {
@ -763,6 +827,10 @@ static int mqtt_delete( lua_State* L )
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_ref); luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_ref);
mud->cb_connect_ref = LUA_NOREF; mud->cb_connect_ref = LUA_NOREF;
} }
if(LUA_NOREF!=mud->cb_connect_fail_ref){
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref);
mud->cb_connect_fail_ref = LUA_NOREF;
}
if(LUA_NOREF!=mud->cb_disconnect_ref){ if(LUA_NOREF!=mud->cb_disconnect_ref){
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_disconnect_ref); luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_disconnect_ref);
mud->cb_disconnect_ref = LUA_NOREF; mud->cb_disconnect_ref = LUA_NOREF;
@ -791,7 +859,9 @@ static int mqtt_delete( lua_State* L )
static sint8 socket_connect(struct espconn *pesp_conn) static sint8 socket_connect(struct espconn *pesp_conn)
{ {
NODE_DBG("enter socket_connect.\n"); NODE_DBG("enter socket_connect.\n");
sint8 espconn_status; sint8 espconn_status;
if(pesp_conn == NULL) if(pesp_conn == NULL)
@ -847,6 +917,15 @@ static sint8 socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg)
if( dns_reconn_count >= 5 ){ if( dns_reconn_count >= 5 ){
NODE_ERR( "DNS Fail!\n" ); NODE_ERR( "DNS Fail!\n" );
// Note: should delete the pesp_conn or unref self_ref here. // Note: should delete the pesp_conn or unref self_ref here.
struct espconn *pesp_conn = arg;
if(pesp_conn != NULL) {
lmqtt_userdata *mud = (lmqtt_userdata *)pesp_conn->reverse;
if(mud != NULL) {
mqtt_connack_fail(mud, MQTT_CONN_FAIL_DNS);
}
}
mqtt_socket_disconnected(arg); // although not connected, but fire disconnect callback to release every thing. mqtt_socket_disconnected(arg); // although not connected, but fire disconnect callback to release every thing.
return -1; return -1;
} }
@ -870,7 +949,7 @@ static sint8 socket_dns_found(const char *name, ip_addr_t *ipaddr, void *arg)
return espconn_status; return espconn_status;
} }
// Lua: mqtt:connect( host, port, secure, auto_reconnect, function(client) ) // Lua: mqtt:connect( host, port, secure, auto_reconnect, function(client), function(client, connect_return_code) )
static int mqtt_socket_connect( lua_State* L ) static int mqtt_socket_connect( lua_State* L )
{ {
NODE_DBG("enter mqtt_socket_connect.\n"); NODE_DBG("enter mqtt_socket_connect.\n");
@ -987,6 +1066,15 @@ static int mqtt_socket_connect( lua_State* L )
stack++; stack++;
} }
// call back function when a connection fails
if ((stack<=top) && (lua_type(L, stack) == LUA_TFUNCTION || lua_type(L, stack) == LUA_TLIGHTFUNCTION)){
lua_pushvalue(L, stack); // copy argument (func) to the top of stack
if(mud->cb_connect_fail_ref != LUA_NOREF)
luaL_unref(L, LUA_REGISTRYINDEX, mud->cb_connect_fail_ref);
mud->cb_connect_fail_ref = luaL_ref(L, LUA_REGISTRYINDEX);
stack++;
}
lua_pushvalue(L, 1); // copy userdata to the top of stack lua_pushvalue(L, 1); // copy userdata to the top of stack
if(mud->self_ref != LUA_NOREF) if(mud->self_ref != LUA_NOREF)
luaL_unref(L, LUA_REGISTRYINDEX, mud->self_ref); luaL_unref(L, LUA_REGISTRYINDEX, mud->self_ref);
@ -1213,6 +1301,7 @@ static int mqtt_socket_subscribe( lua_State* L ) {
msg_id, MQTT_MSG_TYPE_SUBSCRIBE, (int)mqtt_get_qos(temp_msg->data) ); msg_id, MQTT_MSG_TYPE_SUBSCRIBE, (int)mqtt_get_qos(temp_msg->data) );
NODE_DBG("topic: %s - id: %d - qos: %d, length: %d\n", topic, node->msg_id, node->publish_qos, node->msg.length); NODE_DBG("topic: %s - id: %d - qos: %d, length: %d\n", topic, node->msg_id, node->publish_qos, node->msg.length);
NODE_DBG("msg_size: %d, event_timeout: %d\n", msg_size(&(mud->mqtt_state.pending_msg_q)), mud->event_timeout);
sint8 espconn_status = ESPCONN_IF; sint8 espconn_status = ESPCONN_IF;
@ -1393,8 +1482,22 @@ static const LUA_REG_TYPE mqtt_socket_map[] = {
{ LNILKEY, LNILVAL } { LNILKEY, LNILVAL }
}; };
static const LUA_REG_TYPE mqtt_map[] = { static const LUA_REG_TYPE mqtt_map[] = {
{ LSTRKEY( "Client" ), LFUNCVAL( mqtt_socket_client ) }, { LSTRKEY( "Client" ), LFUNCVAL( mqtt_socket_client ) },
{ LSTRKEY( "CONN_FAIL_SERVER_NOT_FOUND" ), LNUMVAL( MQTT_CONN_FAIL_SERVER_NOT_FOUND ) },
{ LSTRKEY( "CONN_FAIL_NOT_A_CONNACK_MSG" ), LNUMVAL( MQTT_CONN_FAIL_NOT_A_CONNACK_MSG ) },
{ LSTRKEY( "CONN_FAIL_DNS" ), LNUMVAL( MQTT_CONN_FAIL_DNS ) },
{ LSTRKEY( "CONN_FAIL_TIMEOUT_RECEIVING" ), LNUMVAL( MQTT_CONN_FAIL_TIMEOUT_RECEIVING ) },
{ LSTRKEY( "CONN_FAIL_TIMEOUT_SENDING" ), LNUMVAL( MQTT_CONN_FAIL_TIMEOUT_SENDING ) },
{ LSTRKEY( "CONNACK_ACCEPTED" ), LNUMVAL( MQTT_CONNACK_ACCEPTED ) },
{ LSTRKEY( "CONNACK_REFUSED_PROTOCOL_VER" ), LNUMVAL( MQTT_CONNACK_REFUSED_PROTOCOL_VER ) },
{ LSTRKEY( "CONNACK_REFUSED_ID_REJECTED" ), LNUMVAL( MQTT_CONNACK_REFUSED_ID_REJECTED ) },
{ LSTRKEY( "CONNACK_REFUSED_SERVER_UNAVAILABLE" ), LNUMVAL( MQTT_CONNACK_REFUSED_SERVER_UNAVAILABLE ) },
{ LSTRKEY( "CONNACK_REFUSED_BAD_USER_OR_PASS" ), LNUMVAL( MQTT_CONNACK_REFUSED_BAD_USER_OR_PASS ) },
{ LSTRKEY( "CONNACK_REFUSED_NOT_AUTHORIZED" ), LNUMVAL( MQTT_CONNACK_REFUSED_NOT_AUTHORIZED ) },
{ LSTRKEY( "__metatable" ), LROVAL( mqtt_map ) }, { LSTRKEY( "__metatable" ), LROVAL( mqtt_map ) },
{ LNILKEY, LNILVAL } { LNILKEY, LNILVAL }
}; };

View File

@ -65,6 +65,21 @@ enum mqtt_message_type
MQTT_MSG_TYPE_DISCONNECT = 14 MQTT_MSG_TYPE_DISCONNECT = 14
}; };
enum mqtt_connack_return_code
{
MQTT_CONN_FAIL_SERVER_NOT_FOUND = -5,
MQTT_CONN_FAIL_NOT_A_CONNACK_MSG = -4,
MQTT_CONN_FAIL_DNS = -3,
MQTT_CONN_FAIL_TIMEOUT_RECEIVING = -2,
MQTT_CONN_FAIL_TIMEOUT_SENDING = -1,
MQTT_CONNACK_ACCEPTED = 0,
MQTT_CONNACK_REFUSED_PROTOCOL_VER = 1,
MQTT_CONNACK_REFUSED_ID_REJECTED = 2,
MQTT_CONNACK_REFUSED_SERVER_UNAVAILABLE = 3,
MQTT_CONNACK_REFUSED_BAD_USER_OR_PASS = 4,
MQTT_CONNACK_REFUSED_NOT_AUTHORIZED = 5
};
typedef struct mqtt_message typedef struct mqtt_message
{ {
uint8_t* data; uint8_t* data;
@ -101,6 +116,7 @@ static inline int mqtt_get_type(uint8_t* buffer) { return (buffer[0] & 0xf0) >>
static inline int mqtt_get_dup(uint8_t* buffer) { return (buffer[0] & 0x08) >> 3; } static inline int mqtt_get_dup(uint8_t* buffer) { return (buffer[0] & 0x08) >> 3; }
static inline int mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) >> 1; } static inline int mqtt_get_qos(uint8_t* buffer) { return (buffer[0] & 0x06) >> 1; }
static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); } static inline int mqtt_get_retain(uint8_t* buffer) { return (buffer[0] & 0x01); }
static inline int mqtt_get_connect_ret_code(uint8_t* buffer) { return (buffer[3]); }
void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length); void mqtt_msg_init(mqtt_connection_t* connection, uint8_t* buffer, uint16_t buffer_length);
int mqtt_get_total_length(uint8_t* buffer, uint16_t length); int mqtt_get_total_length(uint8_t* buffer, uint16_t length);

View File

@ -45,7 +45,8 @@ m:on("message", function(client, topic, data)
end) end)
-- for TLS: m:connect("192.168.11.118", secure-port, 1) -- for TLS: m:connect("192.168.11.118", secure-port, 1)
m:connect("192.168.11.118", 1880, 0, function(client) print("connected") end) m:connect("192.168.11.118", 1883, 0, function(client) print("connected") end,
function(client, reason) print("failed reason: "..reason) end)
-- Calling subscribe/publish only makes sense once the connection -- Calling subscribe/publish only makes sense once the connection
-- was successfully established. In a real-world application you want -- was successfully established. In a real-world application you want
@ -82,18 +83,35 @@ none
Connects to the broker specified by the given host, port, and secure options. Connects to the broker specified by the given host, port, and secure options.
#### Syntax #### Syntax
`mqtt:connect(host[, port[, secure[, autoreconnect]]][, function(client)])` `mqtt:connect(host[, port[, secure[, autoreconnect]]][, function(client)[, function(client, reason)]])`
#### Parameters #### Parameters
- `host` host, domain or IP (string) - `host` host, domain or IP (string)
- `port` broker port (number), default 1883 - `port` broker port (number), default 1883
- `secure` 0/1 for `false`/`true`, default 0. [As per #996](https://github.com/nodemcu/nodemcu-firmware/issues/996#issuecomment-178053308) secure connections use **TLS 1.1** with the following cipher suites: `TLS_RSA_WITH_AES_128_CBC_SHA`, `TLS_RSA_WITH_AES_256_CBC_SHA`, `TLS_RSA_WITH_RC4_128_SHA`, and `TLS_RSA_WITH_RC4_128_MD5`. - `secure` 0/1 for `false`/`true`, default 0. [As per #996](https://github.com/nodemcu/nodemcu-firmware/issues/996#issuecomment-178053308) secure connections use **TLS 1.1** with the following cipher suites: `TLS_RSA_WITH_AES_128_CBC_SHA`, `TLS_RSA_WITH_AES_256_CBC_SHA`, `TLS_RSA_WITH_RC4_128_SHA`, and `TLS_RSA_WITH_RC4_128_MD5`.
- `autoreconnect` 0/1 for `false`/`true`, default 0 - `autoreconnect` 0/1 for `false`/`true`, default 0
- `function(client)` call back function for when the connection was established - `function(client)` callback function for when the connection was established
- `function(client, reason)` callback function for when the connection could not be established
#### Returns #### Returns
`true` on success, `false` otherwise `true` on success, `false` otherwise
#### Connection failure callback reason codes:
| Constant | Value | Description |
|----------|-------|-------------|
|`mqtt.CONN_FAIL_SERVER_NOT_FOUND`|-5|There is no broker listening at the specified IP Address and Port|
|`mqtt.CONN_FAIL_NOT_A_CONNACK_MSG`|-4|The response from the broker was not a CONNACK as required by the protocol|
|`mqtt.CONN_FAIL_DNS`|-3|DNS Lookup failed|
|`mqtt.CONN_FAIL_TIMEOUT_RECEIVING`|-2|Timeout waiting for a CONNACK from the broker|
|`mqtt.CONN_FAIL_TIMEOUT_SENDING`|-1|Timeout trying to send the Connect message|
|`mqtt.CONNACK_ACCEPTED`|0|No errors. _Note: This will not trigger a failure callback._|
|`mqtt.CONNACK_REFUSED_PROTOCOL_VER`|1|The broker is not a 3.1.1 MQTT broker.|
|`mqtt.CONNACK_REFUSED_ID_REJECTED`|2|The specified ClientID was rejected by the broker. (See `mqtt.Client()`)|
|`mqtt.CONNACK_REFUSED_SERVER_UNAVAILABLE`|3|The server is unavailable.|
|`mqtt.CONNACK_REFUSED_BAD_USER_OR_PASS`|4|The broker refused the specified username or password.|
|`mqtt.CONNACK_REFUSED_NOT_AUTHORIZED`|5|The username is not authorized.|
## mqtt.client:lwt() ## mqtt.client:lwt()
Setup [Last Will and Testament](http://www.hivemq.com/blog/mqtt-essentials-part-9-last-will-and-testament) (optional). A broker will publish a message with qos = 0, retain = 0, data = "offline" to topic "/lwt" if client does not send keepalive packet. Setup [Last Will and Testament](http://www.hivemq.com/blog/mqtt-essentials-part-9-last-will-and-testament) (optional). A broker will publish a message with qos = 0, retain = 0, data = "offline" to topic "/lwt" if client does not send keepalive packet.