/******************************************************************************* * File Name : MQTT.c * Description : * Created on : 2018Äê5ÔÂ22ÈÕ * Author : ¶Å¼ü *******************************************************************************/ /******************************************************************************* * Include Files * *******************************************************************************/ #include "EG800MQTT.h" #include "string.h" #include "stdlib.h" #include "EG800Driver.h" #include "EG800FSM.h" #include "HIDO_FSM.h" #include "HIDO_Timer.h" #include "HIDO_Util.h" #include "HIDO_VLQueue.h" #include "HIDO_ArraryQueue.h" #include "HIDO_ATLite.h" #include "HIDO_Debug.h" #include "Module.h" #include "ATConfig.h" #include "MQTT.h" /******************************************************************************* * Macro * *******************************************************************************/ /******************************************************************************* * Type Definition * *******************************************************************************/ typedef struct { HIDO_BOOL m_bUsed; HIDO_UINT32 m_u32MsgID; HIDO_UINT32 m_u32Tick; } ST_Inflight; /******************************************************************************* * Local Variable * *******************************************************************************/ HIDO_FSM_STATE_FULL_DECLARE(EG800IPReady, EG800MQTTSetup) /* MQTTÁ¬½Ó½¨Á¢*/ HIDO_FSM_STATE_FULL_DECLARE(EG800IPReady, EG800MQTTClose) /* MQTT¹Ø±Õ״̬ */ #if 0 HIDO_FSM_STATE_FULL_DECLARE(EG800IPReady, EG800MQTTConnect) HIDO_FSM_STATE_FULL_DECLARE(EG800IPReady, EG800MQTTDisconnect) HIDO_FSM_STATE_FULL_DECLARE(EG800IPReady, EG800MQTTSubscribe) HIDO_FSM_STATE_FULL_DECLARE(EG800IPReady, EG800MQTTUnsubscribe) #endif HIDO_FSM_STATE_FULL_DECLARE(EG800IPReady, EG800MQTTPublish) HIDO_FSM_STATE_FULL_DECLARE(EG800IPReady, EG800MQTTRecv) #define MAX_FLIGHT 5 #define INVALID_MSG_ID 0xFFFFFFFF static ST_Inflight l_aastInflightList[MQTT_NUM][MAX_FLIGHT]; /******************************************************************************* * Local Function Declaration * *******************************************************************************/ /******************************************************************************* * Local Function * *******************************************************************************/ /******************************************************************************* * Function Name : EG800MQTT_IsInflightFull * Description : * Input : * Output : * Return : *******************************************************************************/ static HIDO_BOOL EG800MQTT_IsInflightFull(HIDO_UINT32 _u32ClientID) { HIDO_UINT32 i = 0; for (i = 0; i < MAX_FLIGHT; i++) { if (HIDO_FALSE == l_aastInflightList[_u32ClientID][i].m_bUsed) { return HIDO_FALSE; } } return HIDO_TRUE; } /******************************************************************************* * Function Name : EG800MQTT_InflightAdd * Description : * Input : * Output : * Return : *******************************************************************************/ static HIDO_INT32 EG800MQTT_InflightAdd(HIDO_UINT32 _u32ClientID, HIDO_UINT32 _u32MsgID) { HIDO_UINT32 i = 0; for (i = 0; i < MAX_FLIGHT; i++) { if (HIDO_FALSE == l_aastInflightList[_u32ClientID][i].m_bUsed) { l_aastInflightList[_u32ClientID][i].m_bUsed = HIDO_TRUE; l_aastInflightList[_u32ClientID][i].m_u32MsgID = _u32MsgID; l_aastInflightList[_u32ClientID][i].m_u32Tick = HIDO_TimerGetTick(); return HIDO_OK; } } return HIDO_ERR; } /******************************************************************************* * Function Name : EG800MQTT_InflightRemove * Description : * Input : * Output : * Return : *******************************************************************************/ static HIDO_INT32 EG800MQTT_InflightRemove(HIDO_UINT32 _u32ClientID, HIDO_UINT32 _u32MsgID) { HIDO_UINT32 i = 0; if (_u32ClientID >= MQTT_NUM) { return HIDO_ERR; } for (i = 0; i < MAX_FLIGHT; i++) { if ((HIDO_TRUE == l_aastInflightList[_u32ClientID][i].m_bUsed) && (l_aastInflightList[_u32ClientID][i].m_u32MsgID == _u32MsgID)) { memset(&l_aastInflightList[_u32ClientID][i], 0, sizeof(l_aastInflightList[_u32ClientID][i])); MQTT_OnSend(_u32ClientID); } } return HIDO_OK; } /******************************************************************************* * Function Name : EG800MQTT_InflightCleanTimeout * Description : * Input : * Output : * Return : *******************************************************************************/ static HIDO_INT32 EG800MQTT_InflightCleanTimeout(HIDO_UINT32 _u32ClientID) { HIDO_UINT32 i = 0; for (i = 0; i < MAX_FLIGHT; i++) { if (HIDO_TRUE == l_aastInflightList[_u32ClientID][i].m_bUsed) { if ((HIDO_TimerGetTick() - l_aastInflightList[_u32ClientID][i].m_u32Tick) > HIDO_TIMER_TICK_S(15)) { memset(&l_aastInflightList[_u32ClientID][i], 0, sizeof(l_aastInflightList[_u32ClientID][i])); MQTT_OnSend(_u32ClientID); } } } return HIDO_OK; } /******************************************************************************* * Function Name : EG800MQTT_InflightClean * Description : * Input : * Output : * Return : *******************************************************************************/ static HIDO_INT32 EG800MQTT_InflightClean(HIDO_UINT32 _u32ClientID) { memset(&l_aastInflightList[_u32ClientID], 0, sizeof(l_aastInflightList[_u32ClientID])); return HIDO_OK; } /******************************************************************************* * Global Function * *******************************************************************************/ /******************************************************************************* * State Name : EG800MQTTSetup * Parent State : EG800 * Description : * Author : ¶Å¼ü *******************************************************************************/ HIDO_FSM_STATE_IMPLEMENT(EG800MQTTSetup, HIDO_FSM_STATE(EG800IPReady), HIDO_NULL) { typedef enum { MQTT_SETUP_STATE_INIT_VERSION, MQTT_SETUP_STATE_INIT_PDPCID, MQTT_SETUP_STATE_INIT_TIMEOUT, MQTT_SETUP_STATE_INIT_SESSION, MQTT_SETUP_STATE_INIT_KEEPALIVE, MQTT_SETUP_STATE_INIT_RECV_MODE, MQTT_SETUP_STATE_OPEN, MQTT_SETUP_STATE_CONN, MQTT_SETUP_STATE_SUBSCRIBE, } E_MQTTInitState; static HIDO_INT32 l_i32MQTTID = 0; static E_MQTTInitState l_eInitState; static HIDO_BOOL l_bATResponse = HIDO_FALSE; static HIDO_BOOL l_bResultResponse = HIDO_FALSE; static ST_SubscribeTopic *pstTopicList = HIDO_NULL; HIDO_ATLiteDeviceStruct *pstATDevice = (HIDO_ATLiteDeviceStruct *)_pstFSM->m_pPrivateData; HIDO_UINT32 u32FSMTimerID = ((ST_EG800DriverData *)pstATDevice->m_pUserData)->m_u32FSMTimerID; switch (_u32Event) { case HIDO_EVENT_ENTRY: { l_i32MQTTID = (HIDO_INT32)_pArg; l_eInitState = MQTT_SETUP_STATE_INIT_VERSION; HIDO_ATLiteCmdSend(pstATDevice, AT_GENERAL_TIMEOUT_TIME, "AT+QMTCFG=\"version\",%u,%u\r\n", l_i32MQTTID, MQTT_GetVersion(l_i32MQTTID)); break; } case HIDO_EVENT_EXIT: { HIDO_ATLiteCmdSendOver(pstATDevice); HIDO_TimerCancel(u32FSMTimerID); break; } case HIDO_AT_EVENT_OK: { if (MQTT_SETUP_STATE_INIT_VERSION == l_eInitState) { l_eInitState = MQTT_SETUP_STATE_INIT_PDPCID; HIDO_ATLiteCmdSend(pstATDevice, AT_GENERAL_TIMEOUT_TIME, "AT+QMTCFG=\"pdpcid\",%u,%u\r\n", l_i32MQTTID, MQTT_GetPdpCid(l_i32MQTTID)); } else if (MQTT_SETUP_STATE_INIT_PDPCID == l_eInitState) { l_eInitState = MQTT_SETUP_STATE_INIT_TIMEOUT; HIDO_ATLiteCmdSend(pstATDevice, AT_GENERAL_TIMEOUT_TIME, "AT+QMTCFG=\"timeout\",%u,%u,%u,1\r\n", l_i32MQTTID, MQTT_GetPktTimeout(l_i32MQTTID), MQTT_GetRetryTimes(l_i32MQTTID)); } else if (MQTT_SETUP_STATE_INIT_TIMEOUT == l_eInitState) { l_eInitState = MQTT_SETUP_STATE_INIT_SESSION; HIDO_ATLiteCmdSend(pstATDevice, AT_GENERAL_TIMEOUT_TIME, "AT+QMTCFG=\"session\",%u,%u\r\n", l_i32MQTTID, MQTT_GetCleanSession(l_i32MQTTID)); } else if (MQTT_SETUP_STATE_INIT_SESSION == l_eInitState) { l_eInitState = MQTT_SETUP_STATE_INIT_KEEPALIVE; HIDO_ATLiteCmdSend(pstATDevice, AT_GENERAL_TIMEOUT_TIME, "AT+QMTCFG=\"keepalive\",%u,%u\r\n", l_i32MQTTID, MQTT_GetKeepAlive(l_i32MQTTID)); } else if (MQTT_SETUP_STATE_INIT_KEEPALIVE == l_eInitState) { l_eInitState = MQTT_SETUP_STATE_INIT_RECV_MODE; HIDO_ATLiteCmdSend(pstATDevice, AT_GENERAL_TIMEOUT_TIME, "AT+QMTCFG=\"recv/mode\",%u,1,1\r\n", l_i32MQTTID); } else if (MQTT_SETUP_STATE_INIT_RECV_MODE == l_eInitState) { l_bATResponse = HIDO_FALSE; l_bResultResponse = HIDO_FALSE; l_eInitState = MQTT_SETUP_STATE_OPEN; HIDO_ATLiteCmdSend(pstATDevice, AT_GENERAL_TIMEOUT_TIME, "AT+QMTOPEN=%u,\"%s\",%u\r\n", l_i32MQTTID, MQTT_GetRemoteAddr(l_i32MQTTID), MQTT_GetRemotePort(l_i32MQTTID)); } else if (MQTT_SETUP_STATE_OPEN == l_eInitState) { l_bATResponse = HIDO_TRUE; OPEN: if (l_bATResponse == HIDO_TRUE && l_bResultResponse == HIDO_TRUE) { l_bATResponse = HIDO_FALSE; l_bResultResponse = HIDO_FALSE; l_eInitState = MQTT_SETUP_STATE_CONN; HIDO_ATLiteCmdSend(pstATDevice, AT_GENERAL_TIMEOUT_TIME, "AT+QMTCONN=%u,\"%s\",\"%s\",\"%s\"\r\n", l_i32MQTTID, MQTT_GetClientID(l_i32MQTTID), MQTT_GetUsername(l_i32MQTTID), MQTT_GetmPassword(l_i32MQTTID)); } } else if (MQTT_SETUP_STATE_CONN == l_eInitState) { l_bATResponse = HIDO_TRUE; CONN: if (l_bATResponse == HIDO_TRUE && l_bResultResponse == HIDO_TRUE) { MQTT_OnConnected(l_i32MQTTID); MQTT_SetMsgID(l_i32MQTTID, 0); pstTopicList = MQTT_GetSubscribeTopicList(l_i32MQTTID); if (pstTopicList != HIDO_NULL) { l_bATResponse = HIDO_FALSE; l_bResultResponse = HIDO_FALSE; l_eInitState = MQTT_SETUP_STATE_SUBSCRIBE; HIDO_ATLiteCmdSend(pstATDevice, AT_GENERAL_TIMEOUT_TIME, "AT+QMTSUB=%u,%u", l_i32MQTTID, MQTT_GetMsgID(l_i32MQTTID)); HIDO_ATLiteCmdSend(pstATDevice, AT_GENERAL_TIMEOUT_TIME, ",\"%s\",%u", pstTopicList->m_acTopic, pstTopicList->m_u32QOS); HIDO_ATLiteCmdSend(pstATDevice, AT_GENERAL_TIMEOUT_TIME, "\r\n"); } else { HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); } } } else if (MQTT_SETUP_STATE_SUBSCRIBE == l_eInitState) { l_bATResponse = HIDO_TRUE; SUB: if (l_bATResponse == HIDO_TRUE && l_bResultResponse == HIDO_TRUE) { if (pstTopicList != HIDO_NULL) { pstTopicList = pstTopicList->m_pstNext; } if (pstTopicList != HIDO_NULL) { l_bATResponse = HIDO_FALSE; l_bResultResponse = HIDO_FALSE; l_eInitState = MQTT_SETUP_STATE_SUBSCRIBE; HIDO_ATLiteCmdSend(pstATDevice, AT_GENERAL_TIMEOUT_TIME, "AT+QMTSUB=%u,%u", l_i32MQTTID, MQTT_GetMsgID(l_i32MQTTID)); HIDO_ATLiteCmdSend(pstATDevice, AT_GENERAL_TIMEOUT_TIME, ",\"%s\",%u", pstTopicList->m_acTopic, pstTopicList->m_u32QOS); HIDO_ATLiteCmdSend(pstATDevice, AT_GENERAL_TIMEOUT_TIME, "\r\n"); } else { HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); } } } break; } case HIDO_AT_EVENT_ERROR: { MQTT_OnClosed(l_i32MQTTID); HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); break; } case EG800_EVENT_QMTOPEN: { HIDO_DataStruct *pstData = (HIDO_DataStruct *)_pArg; HIDO_UINT32 u32Err = 0; HIDO_UINT32 u32ClientID = 0; if (HIDO_UtilParseFormat((HIDO_UINT8 *)pstData->m_pData, pstData->m_u32Len, "+QMTOPEN: %d,%d\r\n", &u32ClientID, &u32Err) != 2) { break; } if ((u32ClientID == l_i32MQTTID) && (u32Err != 0)) { MQTT_OnConnectFailed(u32ClientID); HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); break; } l_bResultResponse = HIDO_TRUE; goto OPEN; } case EG800_EVENT_QMTCONN: { HIDO_DataStruct *pstData = (HIDO_DataStruct *)_pArg; HIDO_UINT32 u32Err = 0; HIDO_UINT32 u32ClientID = 0; HIDO_UINT32 u32RetCode = 0; if (HIDO_UtilParseFormat((HIDO_UINT8 *)pstData->m_pData, pstData->m_u32Len, "+QMTCONN: %d,%d,%d", &u32ClientID, &u32Err, &u32RetCode) != 3) { if (HIDO_UtilParseFormat((HIDO_UINT8 *)pstData->m_pData, pstData->m_u32Len, "+QMTCONN: %d,%d", &u32ClientID, &u32Err) != 2) { break; } } if ((u32ClientID == l_i32MQTTID) && (u32Err != 0 || u32RetCode != 0)) { MQTT_OnConnectFailed(u32ClientID); HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); break; } l_bResultResponse = HIDO_TRUE; goto CONN; } case EG800_EVENT_QMTSUB: { HIDO_DataStruct *pstData = (HIDO_DataStruct *)_pArg; HIDO_UINT32 u32Err = 0; HIDO_UINT32 u32ClientID = 0; if (HIDO_UtilParseFormat((HIDO_UINT8 *)pstData->m_pData, pstData->m_u32Len, "+QMTSUB: %d,%*,%d", &u32ClientID, &u32Err) != 3) { break; } if ((u32ClientID == l_i32MQTTID) && (u32Err != 0)) { MQTT_OnClosed(u32ClientID); HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); break; } l_bResultResponse = HIDO_TRUE; goto SUB; } case EG800_EVENT_QMTSTAT: { HIDO_DataStruct *pstData = (HIDO_DataStruct *)_pArg; HIDO_UINT32 u32Err = 0; HIDO_UINT32 u32ClientID = 0; if (HIDO_UtilParseFormat((HIDO_UINT8 *)pstData->m_pData, pstData->m_u32Len, "+QMTSTAT: %d,%d\r\n", &u32ClientID, &u32Err) != 2) { break; } if ((u32ClientID == l_i32MQTTID) && (u32Err != 0)) { MQTT_OnClosed(u32ClientID); HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); break; } break; } default: { return HIDO_EVENT_NO_PROC; } } return HIDO_EVENT_OK; } /******************************************************************************* * State Name : EG800MQTTClose * Parent State : EG800 * Description : * Author : ¶Å¼ü *******************************************************************************/ HIDO_FSM_STATE_IMPLEMENT(EG800MQTTClose, HIDO_FSM_STATE(EG800IPReady), HIDO_NULL) { HIDO_ATLiteDeviceStruct *pstATDevice = (HIDO_ATLiteDeviceStruct *)_pstFSM->m_pPrivateData; HIDO_UINT32 u32FSMTimerID = ((ST_EG800DriverData *)pstATDevice->m_pUserData)->m_u32FSMTimerID; static HIDO_INT32 l_i32MQTTID = 0; static HIDO_BOOL l_bATResp = HIDO_FALSE; static HIDO_BOOL l_bOKResp = HIDO_FALSE; switch (_u32Event) { case HIDO_EVENT_ENTRY: { l_i32MQTTID = (HIDO_INT32)_pArg; l_bATResp = HIDO_FALSE; l_bOKResp = HIDO_FALSE; HIDO_ATLiteCmdSend(pstATDevice, HIDO_TIMER_TICK_S(15), "AT+QMTCLOSE=%d\r\n", l_i32MQTTID); HIDO_FSMStartTimer(u32FSMTimerID, HIDO_TIMER_TYPE_ONCE, HIDO_TIMER_TICK_S(60), _pstFSM, EG800_EVENT_DELAY); break; } case HIDO_EVENT_EXIT: { HIDO_ATLiteCmdSendOver(pstATDevice); HIDO_TimerCancel(u32FSMTimerID); break; } case HIDO_AT_EVENT_ERROR: { MQTT_OnClosed(l_i32MQTTID); HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); break; } case HIDO_AT_EVENT_OK: { l_bOKResp = HIDO_TRUE; if (HIDO_TRUE == l_bATResp) { MQTT_OnClosed(l_i32MQTTID); HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); } break; } case EG800_EVENT_QMTCLOSE: { l_bATResp = HIDO_TRUE; if (HIDO_TRUE == l_bOKResp) { MQTT_OnClosed(l_i32MQTTID); HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); } break; } case EG800_EVENT_DELAY: { MQTT_OnClosed(l_i32MQTTID); HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); break; } default: { return HIDO_EVENT_NO_PROC; } } return HIDO_EVENT_OK; } #if 0 /******************************************************************************* * State Name : EG800MQTTConnect * Parent State : Ready * Description : * Author : ¶Å¼ü *******************************************************************************/ HIDO_FSM_STATE_IMPLEMENT(EG800MQTTConnect, HIDO_FSM_STATE(EG800IPReady), HIDO_NULL) { static HIDO_INT32 l_i32MQTTID = 0; static HIDO_BOOL l_bIsAck = HIDO_FALSE; HIDO_ATLiteDeviceStruct *pstATDevice = (HIDO_ATLiteDeviceStruct *) _pstFSM->m_pPrivateData; HIDO_UINT32 u32FSMTimerID = ((ST_EG800DriverData *)pstATDevice->m_pUserData)->m_u32FSMTimerID; switch (_u32Event) { case HIDO_EVENT_ENTRY: { l_i32MQTTID = (HIDO_INT32) _pArg; l_bIsAck = HIDO_FALSE; l_au32MQTTUnAckTick[l_i32MQTTID] = HIDO_TimerGetTick(); HIDO_ATLiteCmdSend(pstATDevice, AT_GENERAL_TIMEOUT_TIME, "AT+QISEND=%d,0\r\n", l_i32MQTTID); break; } case HIDO_EVENT_EXIT: { HIDO_ATLiteCmdSendOver(pstATDevice); HIDO_TimerCancel(u32FSMTimerID); break; } case HIDO_AT_EVENT_OK: { if(HIDO_TRUE == l_bIsAck) { l_abMQTTUnAck[l_i32MQTTID] = HIDO_FALSE; HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); } else { if((HIDO_TimerGetTick() - l_au32MQTTSendTick[l_i32MQTTID]) >= HIDO_TIMER_TICK_S(90)) { MQTT_OnClosed(l_i32MQTTID); l_abMQTTUnAck[l_i32MQTTID] = HIDO_FALSE; } HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); } break; } case HIDO_AT_EVENT_ERROR: { l_abMQTTUnAck[l_i32MQTTID] = HIDO_FALSE; MQTT_OnClosed(l_i32MQTTID); HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); break; } case EG800_EVENT_SEND: { HIDO_UINT32 u32Total = 0; HIDO_UINT32 u32Ack = 0; HIDO_UINT32 u32UnAck = 0; HIDO_DataStruct *pstData = (HIDO_DataStruct *) _pArg; if (HIDO_UtilParseFormat((HIDO_UINT8 *) pstData->m_pData, pstData->m_u32Len, "+QISEND: %d,%d,%d\r\n", &u32Total, &u32Ack, &u32UnAck) != 3) { MQTT_OnClosed(l_i32MQTTID); HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); break; } if(0 == u32UnAck) { l_bIsAck = HIDO_TRUE; } else { l_bIsAck = HIDO_FALSE; } break; } case EG800_EVENT_DELAY: { HIDO_ATLiteCmdSend(pstATDevice, AT_GENERAL_TIMEOUT_TIME, "AT+QISEND=%d,0\r\n", l_i32MQTTID); break; } default: { return HIDO_EVENT_NO_PROC; } } return HIDO_EVENT_OK; } /******************************************************************************* * State Name : EG800MQTTDisconnect * Parent State : Ready * Description : * Author : ¶Å¼ü *******************************************************************************/ HIDO_FSM_STATE_IMPLEMENT(EG800MQTTDisconnect, HIDO_FSM_STATE(EG800IPReady), HIDO_NULL) { static HIDO_INT32 l_i32MQTTID = 0; static HIDO_BOOL l_bIsAck = HIDO_FALSE; HIDO_ATLiteDeviceStruct *pstATDevice = (HIDO_ATLiteDeviceStruct *) _pstFSM->m_pPrivateData; HIDO_UINT32 u32FSMTimerID = ((ST_EG800DriverData *)pstATDevice->m_pUserData)->m_u32FSMTimerID; switch (_u32Event) { case HIDO_EVENT_ENTRY: { l_i32MQTTID = (HIDO_INT32) _pArg; l_bIsAck = HIDO_FALSE; l_au32MQTTUnAckTick[l_i32MQTTID] = HIDO_TimerGetTick(); HIDO_ATLiteCmdSend(pstATDevice, AT_GENERAL_TIMEOUT_TIME, "AT+QISEND=%d,0\r\n", l_i32MQTTID); break; } case HIDO_EVENT_EXIT: { HIDO_ATLiteCmdSendOver(pstATDevice); HIDO_TimerCancel(u32FSMTimerID); break; } case HIDO_AT_EVENT_OK: { if(HIDO_TRUE == l_bIsAck) { l_abMQTTUnAck[l_i32MQTTID] = HIDO_FALSE; HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); } else { if((HIDO_TimerGetTick() - l_au32MQTTSendTick[l_i32MQTTID]) >= HIDO_TIMER_TICK_S(90)) { MQTT_OnClosed(l_i32MQTTID); l_abMQTTUnAck[l_i32MQTTID] = HIDO_FALSE; } HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); } break; } case HIDO_AT_EVENT_ERROR: { l_abMQTTUnAck[l_i32MQTTID] = HIDO_FALSE; MQTT_OnClosed(l_i32MQTTID); HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); break; } case EG800_EVENT_SEND: { HIDO_UINT32 u32Total = 0; HIDO_UINT32 u32Ack = 0; HIDO_UINT32 u32UnAck = 0; HIDO_DataStruct *pstData = (HIDO_DataStruct *) _pArg; if (HIDO_UtilParseFormat((HIDO_UINT8 *) pstData->m_pData, pstData->m_u32Len, "+QISEND: %d,%d,%d\r\n", &u32Total, &u32Ack, &u32UnAck) != 3) { MQTT_OnClosed(l_i32MQTTID); HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); break; } if(0 == u32UnAck) { l_bIsAck = HIDO_TRUE; } else { l_bIsAck = HIDO_FALSE; } break; } case EG800_EVENT_DELAY: { HIDO_ATLiteCmdSend(pstATDevice, AT_GENERAL_TIMEOUT_TIME, "AT+QISEND=%d,0\r\n", l_i32MQTTID); break; } default: { return HIDO_EVENT_NO_PROC; } } return HIDO_EVENT_OK; } /******************************************************************************* * State Name : EG800MQTTSendData * Parent State : Ready * Description : * Author : ¶Å¼ü *******************************************************************************/ HIDO_FSM_STATE_IMPLEMENT(EG800MQTTSubscribe, HIDO_FSM_STATE(EG800IPReady), HIDO_NULL) { static HIDO_INT32 l_i32MQTTID = 0; static HIDO_BOOL l_bIsAck = HIDO_FALSE; HIDO_ATLiteDeviceStruct *pstATDevice = (HIDO_ATLiteDeviceStruct *) _pstFSM->m_pPrivateData; HIDO_UINT32 u32FSMTimerID = ((ST_EG800DriverData *)pstATDevice->m_pUserData)->m_u32FSMTimerID; switch (_u32Event) { case HIDO_EVENT_ENTRY: { l_i32MQTTID = (HIDO_INT32) _pArg; l_bIsAck = HIDO_FALSE; l_au32MQTTUnAckTick[l_i32MQTTID] = HIDO_TimerGetTick(); HIDO_ATLiteCmdSend(pstATDevice, AT_GENERAL_TIMEOUT_TIME, "AT+QISEND=%d,0\r\n", l_i32MQTTID); break; } case HIDO_EVENT_EXIT: { HIDO_ATLiteCmdSendOver(pstATDevice); HIDO_TimerCancel(u32FSMTimerID); break; } case HIDO_AT_EVENT_OK: { if(HIDO_TRUE == l_bIsAck) { l_abMQTTUnAck[l_i32MQTTID] = HIDO_FALSE; HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); } else { if((HIDO_TimerGetTick() - l_au32MQTTSendTick[l_i32MQTTID]) >= HIDO_TIMER_TICK_S(90)) { MQTT_OnClosed(l_i32MQTTID); l_abMQTTUnAck[l_i32MQTTID] = HIDO_FALSE; } HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); } break; } case HIDO_AT_EVENT_ERROR: { l_abMQTTUnAck[l_i32MQTTID] = HIDO_FALSE; MQTT_OnClosed(l_i32MQTTID); HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); break; } case EG800_EVENT_SEND: { HIDO_UINT32 u32Total = 0; HIDO_UINT32 u32Ack = 0; HIDO_UINT32 u32UnAck = 0; HIDO_DataStruct *pstData = (HIDO_DataStruct *) _pArg; if (HIDO_UtilParseFormat((HIDO_UINT8 *) pstData->m_pData, pstData->m_u32Len, "+QISEND: %d,%d,%d\r\n", &u32Total, &u32Ack, &u32UnAck) != 3) { MQTT_OnClosed(l_i32MQTTID); HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); break; } if(0 == u32UnAck) { l_bIsAck = HIDO_TRUE; } else { l_bIsAck = HIDO_FALSE; } break; } case EG800_EVENT_DELAY: { HIDO_ATLiteCmdSend(pstATDevice, AT_GENERAL_TIMEOUT_TIME, "AT+QISEND=%d,0\r\n", l_i32MQTTID); break; } default: { return HIDO_EVENT_NO_PROC; } } return HIDO_EVENT_OK; } /******************************************************************************* * State Name : EG800MQTTUnsubscribe * Parent State : EG800IPReady * Description : * Author : ¶Å¼ü *******************************************************************************/ HIDO_FSM_STATE_IMPLEMENT(EG800MQTTUnsubscribe, HIDO_FSM_STATE(EG800IPReady), HIDO_NULL) { static HIDO_INT32 l_i32MQTTID = 0; static HIDO_BOOL l_bIsAck = HIDO_FALSE; HIDO_ATLiteDeviceStruct *pstATDevice = (HIDO_ATLiteDeviceStruct *) _pstFSM->m_pPrivateData; HIDO_UINT32 u32FSMTimerID = ((ST_EG800DriverData *)pstATDevice->m_pUserData)->m_u32FSMTimerID; switch (_u32Event) { case HIDO_EVENT_ENTRY: { l_i32MQTTID = (HIDO_INT32) _pArg; l_bIsAck = HIDO_FALSE; l_au32MQTTUnAckTick[l_i32MQTTID] = HIDO_TimerGetTick(); HIDO_ATLiteCmdSend(pstATDevice, AT_GENERAL_TIMEOUT_TIME, "AT+QISEND=%d,0\r\n", l_i32MQTTID); break; } case HIDO_EVENT_EXIT: { HIDO_ATLiteCmdSendOver(pstATDevice); HIDO_TimerCancel(u32FSMTimerID); break; } case HIDO_AT_EVENT_OK: { if(HIDO_TRUE == l_bIsAck) { l_abMQTTUnAck[l_i32MQTTID] = HIDO_FALSE; HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); } else { if((HIDO_TimerGetTick() - l_au32MQTTSendTick[l_i32MQTTID]) >= HIDO_TIMER_TICK_S(90)) { MQTT_OnClosed(l_i32MQTTID); l_abMQTTUnAck[l_i32MQTTID] = HIDO_FALSE; } HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); } break; } case HIDO_AT_EVENT_ERROR: { l_abMQTTUnAck[l_i32MQTTID] = HIDO_FALSE; MQTT_OnClosed(l_i32MQTTID); HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); break; } case EG800_EVENT_SEND: { HIDO_UINT32 u32Total = 0; HIDO_UINT32 u32Ack = 0; HIDO_UINT32 u32UnAck = 0; HIDO_DataStruct *pstData = (HIDO_DataStruct *) _pArg; if (HIDO_UtilParseFormat((HIDO_UINT8 *) pstData->m_pData, pstData->m_u32Len, "+QISEND: %d,%d,%d\r\n", &u32Total, &u32Ack, &u32UnAck) != 3) { MQTT_OnClosed(l_i32MQTTID); HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); break; } if(0 == u32UnAck) { l_bIsAck = HIDO_TRUE; } else { l_bIsAck = HIDO_FALSE; } break; } case EG800_EVENT_DELAY: { HIDO_ATLiteCmdSend(pstATDevice, AT_GENERAL_TIMEOUT_TIME, "AT+QISEND=%d,0\r\n", l_i32MQTTID); break; } default: { return HIDO_EVENT_NO_PROC; } } return HIDO_EVENT_OK; } #endif /******************************************************************************* * State Name : EG800MQTTPublish * Parent State : EG800IPReady * Description : * Author : ¶Å¼ü *******************************************************************************/ HIDO_FSM_STATE_IMPLEMENT(EG800MQTTPublish, HIDO_FSM_STATE(EG800IPReady), HIDO_NULL) { static HIDO_INT32 l_i32MQTTID = 0; static HIDO_VLQMemberStruct *l_pstMember = HIDO_NULL; static ST_MQTTMessage *l_pstMessage = HIDO_NULL; static HIDO_VLQStruct *l_pstSendQueue = HIDO_NULL; HIDO_UINT32 u32MsgID = 0; HIDO_ATLiteDeviceStruct *pstATDevice = (HIDO_ATLiteDeviceStruct *)_pstFSM->m_pPrivateData; switch (_u32Event) { case HIDO_EVENT_ENTRY: { l_i32MQTTID = (HIDO_INT32)_pArg; l_pstSendQueue = MQTT_GetSendQueue(l_i32MQTTID); if (HIDO_NULL == l_pstSendQueue) { break; } if (EG800MQTT_IsInflightFull(l_i32MQTTID) == HIDO_TRUE) { break; } l_pstMember = HIDO_VLQGetDequeueMember(l_pstSendQueue); if (HIDO_NULL == l_pstMember) { break; } l_pstMessage = (ST_MQTTMessage *)l_pstMember->m_pDataAddr; if (l_pstMessage->m_u32QOS != 0) { u32MsgID = MQTT_GetMsgID(l_i32MQTTID); EG800MQTT_InflightAdd(l_i32MQTTID, u32MsgID); } HIDO_ATLiteCmdSend(pstATDevice, AT_GENERAL_TIMEOUT_TIME, "AT+QMTPUBEX=%u,%u,%u,%u,\"%s\",%u\r\n", l_i32MQTTID, u32MsgID, l_pstMessage->m_u32QOS, l_pstMessage->m_u32Retain, l_pstMessage->m_acTopic, l_pstMessage->m_u32DataLen); break; } case HIDO_EVENT_EXIT: { l_pstMember = HIDO_NULL; HIDO_ATLiteCmdSendOver(pstATDevice); break; } case HIDO_AT_EVENT_OK: { HIDO_VLQDequeue(l_pstSendQueue, l_pstMember); if (EG800MQTT_IsInflightFull(l_i32MQTTID) == HIDO_TRUE) { HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); break; } l_pstMember = HIDO_VLQGetDequeueMember(l_pstSendQueue); if (HIDO_NULL == l_pstMember) { HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); break; } l_pstMessage = (ST_MQTTMessage *)l_pstMember->m_pDataAddr; if (l_pstMessage->m_u32QOS != 0) { u32MsgID = MQTT_GetMsgID(l_i32MQTTID); EG800MQTT_InflightAdd(l_i32MQTTID, u32MsgID); } HIDO_ATLiteCmdSend(pstATDevice, AT_GENERAL_TIMEOUT_TIME, "AT+QMTPUBEX=%u,%u,%u,%u,\"%s\",%u\r\n", l_i32MQTTID, u32MsgID, l_pstMessage->m_u32QOS, l_pstMessage->m_u32Retain, l_pstMessage->m_acTopic, l_pstMessage->m_u32DataLen); break; } case HIDO_AT_EVENT_ERROR: { MQTT_OnClosed(l_i32MQTTID); HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); break; } case EG800_EVENT_ACK: { if (HIDO_ATLiteGetDebugFlag() == HIDO_TRUE) { if (HIDO_UtilIsAsciiString(l_pstMessage->m_au8Data, l_pstMessage->m_u32DataLen) == HIDO_TRUE) { HIDO_DebugString((HIDO_CHAR *)l_pstMessage->m_au8Data, l_pstMessage->m_u32DataLen); } else { HIDO_DebugHex(l_pstMessage->m_au8Data, l_pstMessage->m_u32DataLen); } } HIDO_ATLiteDataSend(pstATDevice, 20000, (HIDO_UINT8 *)l_pstMessage->m_au8Data, l_pstMessage->m_u32DataLen); break; } case EG800_EVENT_SEND_FAIL: { MQTT_OnClosed(l_i32MQTTID); HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); break; } default: { return HIDO_EVENT_NO_PROC; } } return HIDO_EVENT_OK; } /******************************************************************************* * State Name : EG800MQTTRecv * Parent State : EG800IPReady * Description : * Author : ¶Å¼ü *******************************************************************************/ HIDO_FSM_STATE_IMPLEMENT(EG800MQTTRecv, HIDO_FSM_STATE(EG800IPReady), HIDO_NULL) { static HIDO_INT32 l_i32MQTTID = 0; static HIDO_BOOL l_bMQTTRecv = HIDO_FALSE; HIDO_ATLiteDeviceStruct *pstATDevice = (HIDO_ATLiteDeviceStruct *)_pstFSM->m_pPrivateData; switch (_u32Event) { case HIDO_EVENT_ENTRY: { l_i32MQTTID = (HIDO_INT32)_pArg; l_bMQTTRecv = HIDO_FALSE; HIDO_ATLiteCmdSend(pstATDevice, AT_GENERAL_TIMEOUT_TIME, "AT+QMTRECV=%u\r\n", l_i32MQTTID); break; } case HIDO_EVENT_EXIT: { HIDO_ATLiteCmdSendOver(pstATDevice); break; } case HIDO_AT_EVENT_OK: case HIDO_AT_EVENT_ERROR: { if (((HIDO_AT_EVENT_OK == _u32Event) && (HIDO_FALSE == l_bMQTTRecv)) || (HIDO_AT_EVENT_ERROR == _u32Event)) { MQTT_NoRecvData(l_i32MQTTID); } HIDO_FSMStateChange(_pstFSM, HIDO_FSM_STATE(EG800IPPoll), HIDO_NULL); break; } case EG800_EVENT_QMTRECV: { HIDO_UINT32 u32ClientID = 0; HIDO_UINT32 u32MsgID = 0; HIDO_UINT32 u32PayloadLen = 0; HIDO_DataStruct stTopic; HIDO_DataStruct stPayload; HIDO_VLQStruct *pstRecvQueue = HIDO_NULL; HIDO_VLQMemberStruct *pstMember = HIDO_NULL; ST_MQTTMessage *pcMessage = HIDO_NULL; HIDO_DataStruct *pstData = (HIDO_DataStruct *)_pArg; if (HIDO_UtilParseFormat((HIDO_UINT8 *)pstData->m_pData, pstData->m_u32Len, "+QMTRECV: %d,%d,\"%p\",%d,%p\r\n", &u32ClientID, &u32MsgID, &stTopic, &u32PayloadLen, &stPayload) != 5) { return HIDO_EVENT_NO_PROC; } l_bMQTTRecv = HIDO_TRUE; pstRecvQueue = MQTT_GetRecvQueue(u32ClientID); if (HIDO_NULL == pstRecvQueue) { break; } if (u32PayloadLen + 2 != stPayload.m_u32Len) { break; } pstMember = HIDO_VLQGetEnqueueMember(pstRecvQueue, sizeof(ST_MQTTMessage) + u32PayloadLen); if (pstMember != HIDO_NULL) { pcMessage = (ST_MQTTMessage *)pstMember->m_pDataAddr; if (stTopic.m_u32Len < sizeof(pcMessage->m_acTopic)) { memcpy(pcMessage->m_acTopic, stTopic.m_pData, stTopic.m_u32Len); pcMessage->m_acTopic[stTopic.m_u32Len] = '\0'; } pcMessage->m_u32DataLen = u32PayloadLen; memcpy(pcMessage->m_au8Data, ((HIDO_CHAR *)stPayload.m_pData) + 1, stPayload.m_u32Len - 2); HIDO_VLQEnqueue(pstRecvQueue, pstMember); MQTT_OnRecv(u32ClientID); } else { HIDO_Debug("MQTT[%u] Recv Buffer Full\r\n", u32ClientID); } break; } default: { return HIDO_EVENT_NO_PROC; } } return HIDO_EVENT_OK; } /******************************************************************************* * Function Name : EG800MQTT_SendAck * Description : * Input : * Output : * Return : *******************************************************************************/ HIDO_INT32 EG800MQTT_SendAck(HIDO_DataStruct *_pstData) { HIDO_UINT32 u32ClientID = 0; HIDO_UINT32 u32MsgID = 0; HIDO_UINT32 u32Result = 0; if (HIDO_UtilParseFormat((HIDO_UINT8 *)_pstData->m_pData, _pstData->m_u32Len, "+QMTPUBEX: %d,%d,%d\r\n", &u32ClientID, &u32MsgID, &u32Result) != 3) { return HIDO_ERR; } EG800MQTT_InflightRemove(u32ClientID, u32MsgID); return HIDO_OK; } /******************************************************************************* * Function Name : EG800MQTT_Poll * Description : * Input : * Output : * Return : *******************************************************************************/ HIDO_INT32 EG800MQTT_Poll(HIDO_ATLiteDeviceStruct *_pstATDevice) { HIDO_INT32 i32MQTTIndex = 0; HIDO_VLQStruct *pstSendQueue = HIDO_NULL; HIDO_UINT32 u32CurTick = HIDO_TimerGetTick(); for (i32MQTTIndex = 0; i32MQTTIndex < MQTT_NUM; i32MQTTIndex++) { if (MQTT_STATE_CONNECT == MQTT_GetMQTTState(i32MQTTIndex)) { EG800MQTT_InflightClean(i32MQTTIndex); MQTT_SetMQTTState(i32MQTTIndex, MQTT_STATE_CLOSE_BEFORE_CONNECT); HIDO_FSMStateChange(_pstATDevice->m_pstFSM, HIDO_FSM_STATE(EG800MQTTClose), (HIDO_VOID *)(HIDO_UINT32)i32MQTTIndex); return HIDO_OK; } else if (MQTT_STATE_CLOSE_BEFORE_CONNECT == MQTT_GetMQTTState(i32MQTTIndex)) { HIDO_FSMStateChange(_pstATDevice->m_pstFSM, HIDO_FSM_STATE(EG800MQTTSetup), (HIDO_VOID *)(HIDO_UINT32)i32MQTTIndex); return HIDO_OK; } else if (MQTT_STATE_CLOSE == MQTT_GetMQTTState(i32MQTTIndex)) { HIDO_FSMStateChange(_pstATDevice->m_pstFSM, HIDO_FSM_STATE(EG800MQTTClose), (HIDO_VOID *)(HIDO_UINT32)i32MQTTIndex); return HIDO_OK; } if (HIDO_TRUE == MQTT_HasRecvData(i32MQTTIndex)) { HIDO_FSMStateChange(_pstATDevice->m_pstFSM, HIDO_FSM_STATE(EG800MQTTRecv), (HIDO_VOID *)(HIDO_UINT32)i32MQTTIndex); return HIDO_OK; } if (MQTT_STATE_CONNECTED == MQTT_GetMQTTState(i32MQTTIndex)) { pstSendQueue = MQTT_GetSendQueue(i32MQTTIndex); if (pstSendQueue != HIDO_NULL) { if (EG800MQTT_IsInflightFull(i32MQTTIndex) == HIDO_FALSE) { if (HIDO_VLQGetDequeueMember(pstSendQueue) != HIDO_NULL) { HIDO_FSMStateChange(_pstATDevice->m_pstFSM, HIDO_FSM_STATE(EG800MQTTPublish), (HIDO_VOID *)(HIDO_UINT32)i32MQTTIndex); return HIDO_OK; } } } } EG800MQTT_InflightCleanTimeout(i32MQTTIndex); } return HIDO_ERR; }