diff --git a/custom/jt808/inc/jt808_msg_parse.h b/custom/jt808/inc/jt808_msg_parse.h index 80b31f2..657ed89 100644 --- a/custom/jt808/inc/jt808_msg_parse.h +++ b/custom/jt808/inc/jt808_msg_parse.h @@ -1,5 +1,3 @@ - - #ifndef __JT808_PKG_PARSE__ #define __JT808_PKG_PARSE__ #include "jt808_protocol.h" diff --git a/custom/jt808/inc/jt808_pkg_transmit.h b/custom/jt808/inc/jt808_pkg_transmit.h index 0cae177..e94fa49 100644 --- a/custom/jt808/inc/jt808_pkg_transmit.h +++ b/custom/jt808/inc/jt808_pkg_transmit.h @@ -19,8 +19,24 @@ typedef struct{ #define PKG_SEND(__buf, __len) 0 #endif +#define MAX_PENDING_REQUESTS 5 // 最大并发等待请求数 + extern PrsResult_t PrsResult; +// 等待请求结构体 +typedef struct { + uint16_t msg_id; // 发送的消息ID + uint16_t flow_num; // 消息流水号 + uint32_t timeout; // 等待超时时间(毫秒) + uint32_t start_tick; // 开始等待的时间戳 + uint8_t is_waiting; // 是否在等待中(1=是,0=否) + uint8_t response_received; // 是否已收到应答(1=是,0=否) +} PendingRequest_t; + + +// 全局变量 +static PendingRequest_t pending_requests[MAX_PENDING_REQUESTS]; // 等待请求数组 + // 触发消息发送 int jt808_pkg_send(MessageID_t Msg_ID, uint32_t timeout); diff --git a/custom/jt808/inc/jt808_protocol.h b/custom/jt808/inc/jt808_protocol.h index 272668a..5071553 100644 --- a/custom/jt808/inc/jt808_protocol.h +++ b/custom/jt808/inc/jt808_protocol.h @@ -34,6 +34,7 @@ typedef enum { ID_Delete_Polygon_area = 0x8605, // 删除多边形区域 ID_Data_Down = 0x8900, // 数据透传下行 ID_Data_Up = 0x0900, // 数据透传上行 + //ID_Req_Fence_update = }MessageID_t; #pragma pack(1) diff --git a/custom/jt808/src/jt808_msg_parse.c b/custom/jt808/src/jt808_msg_parse.c index db6cf52..cbd05b1 100644 --- a/custom/jt808/src/jt808_msg_parse.c +++ b/custom/jt808/src/jt808_msg_parse.c @@ -30,12 +30,13 @@ static bool is_valid_url(const char *url) { static int jt808_BodyParse(void *Prsmsg_body, PrsResult_t *Result){ switch (Result->msg_head.msg_id){ case ID_Plat_GenResp:{// 平台通用应答 - memcpy(&(Result->Rsp_flow_num), Prsmsg_body, sizeof(Plat_GenResp_t)); + Plat_GenResp_t resp_body; + //从消息体复制数据 + memcpy(&resp_body, Prsmsg_body, sizeof(Plat_GenResp_t)); // 转小端 - Result->Rsp_flow_num = Swap16(Result->msg_head.msg_flow_num); - Result->Rsp_msg_id = Swap16(Result->msg_head.msg_id); - // Result->Rsp_result = Result->Rsp_result; - break; + Result->Rsp_flow_num = Swap16(resp_body.msg_flow_num); // 应答流水号 + Result->Rsp_msg_id = Swap16(resp_body.msg_id_ack); // 应答ID + Result->Rsp_result = resp_body.result; // 结果 } case ID_FillPktReq:{// 补传分包请求 @@ -157,6 +158,8 @@ static int jt808_BodyParse(void *Prsmsg_body, PrsResult_t *Result){ const char *url = Result->term_param_item->big_upgrade_info.file_path; if(!is_valid_url(url)) { JT808_DEBUG("Invalid URL: %s\n", url); + jt808_free(Result->term_param_item->big_upgrade_info.file_path); + Result->term_param_item->big_upgrade_info.file_path = NULL; break; // 不继续升级 } int ret = 0; diff --git a/custom/jt808/src/jt808_pkg_transmit.c b/custom/jt808/src/jt808_pkg_transmit.c index 09a2848..79c38d0 100644 --- a/custom/jt808/src/jt808_pkg_transmit.c +++ b/custom/jt808/src/jt808_pkg_transmit.c @@ -1,106 +1,198 @@ #include "jt808_pkg_transmit.h" static osThreadId_t jt808_pkg_send_ThreadId; +static osThreadId_t jt808_timeout_monitor_ThreadId; static osMessageQueueId_t jt808_send_msg_queue = NULL; -static osSemaphoreId_t jt808_send_ack_sem = NULL; // 发送成功应答信号量 -static osSemaphoreId_t jt808_parse_ok_sem = NULL; // 解析完成信号量 +static osMutexId_t pending_mutex = NULL; -// 触发消息发送 +// 超时监控任务 - 后台处理超时 +static void jt808_timeout_monitor_task(void *arg) { + while (1) { + osDelay(50); // 每50ms检查一次 + + uint32_t current_tick = osKernelGetTickCount(); + osMutexAcquire(pending_mutex, osWaitForever); + + for (int i = 0; i < MAX_PENDING_REQUESTS; i++) { + if (pending_requests[i].is_waiting) { + PendingRequest_t *req = &pending_requests[i]; + uint32_t elapsed = current_tick - req->start_tick; + + if (req->response_received) { + JT808_DEBUG("Response OK: ID=0x%04X, Flow=%d\n", + req->msg_id, req->flow_num); + req->is_waiting = 0; // 标记槽位为空闲 + } + else if (elapsed > req->timeout) { + JT808_DEBUG("Response Timeout: ID=0x%04X, Flow=%d\n", + req->msg_id, req->flow_num); + req->is_waiting = 0; // 标记槽位为空闲 + + // 这里可以添加超时处理逻辑,如重发 + // 如果需要重发,调用 jt808_pkg_send(req->msg_id, req->timeout); + } + } + } + + osMutexRelease(pending_mutex); + } +} + + + + +// 触发消息发送修改版 int jt808_pkg_send(MessageID_t Msg_ID, uint32_t timeout){ pkg_msg_t send_pkg_msg={0}; send_pkg_msg.msg_id = Msg_ID; send_pkg_msg.timeout = timeout; // JT808_DEBUG("send pkg_msg:%04x\n", Msg_ID); - if(osOK == osMessageQueuePut(jt808_send_msg_queue, &send_pkg_msg, 0, 0)){ // - if(0 < timeout){ // 阻塞等待发送回应 - - osStatus_t ret =osSemaphoreAcquire(jt808_send_ack_sem, timeout); - if(ret == osOK){ - return PrsResult.Rsp_result; // 发送成功应答 - JT808_DEBUG("send ack success\n"); - }else if(ret == osErrorTimeout){ - JT808_DEBUG("send ack timeout\n"); - return -1;// 发送失败 - } - } - }else{ - JT808_DEBUG("osMessageQueuePut fail\n"); - return -1;// 发送失败 + if(osOK == osMessageQueuePut(jt808_send_msg_queue, &send_pkg_msg, 0, 0)){ //将数据送入发送队列 + return 0; + } - return 0; + else + { + return -1; + } } -// 接收处理 (放入TCP接收回调中) -void jt808_pkg_handle(uint8_t *receive_buf, uint16_t receive_len){ - if(0 ==jt808_msg_parse(receive_buf, receive_len, &PrsResult)){ // 解析协议数据 - osSemaphoreRelease(jt808_parse_ok_sem); // 释放解析完成信号量 - // JT808_DEBUG("jt808_msg_parse success\n"); - } -} -// 发送任务 -static void jt808_pkg_send_task(void *arg){ - pkg_msg_t send_pkg_msg={0}; - JT808MsgESC_t MsgESC={ - .buf = NULL, - .len = 0, - }; - - while(1){ - if(osOK == osMessageQueueGet(jt808_send_msg_queue, &send_pkg_msg, NULL, osWaitForever)){ - if(0 == jt808_msg_pkg(send_pkg_msg.msg_id ,&MsgESC)){ // 生成协议数据成功 - // 发送数据 - if(NULL == MsgESC.buf){ // 协议数据为空(发生错误) - JT808_DEBUG("MsgESC.buf is NULL:len=%d\n",MsgESC.len); - continue; - } - // JT808_DEBUG("tcp send %d bytes\n", MsgESC.len); - if(0 == PKG_SEND(MsgESC.buf, MsgESC.len)){ // 发送成功处理 - if(NULL != jt808_parse_ok_sem){ - osStatus_t ret =osSemaphoreAcquire(jt808_parse_ok_sem, send_pkg_msg.timeout);// 等待接收数据并解析完成 - if(ret == osOK || 0 == send_pkg_msg.timeout){ - osSemaphoreRelease(jt808_send_ack_sem); // 释放发送成功应答信号量 - // JT808_DEBUG("parse success\n"); - if(PrsResult.term_param_item!= NULL){ // 协议参数项不为空 - PrsResult.term_param_item->msg_flow_num++; // 协议流水号+1 - }else{ - JT808_DEBUG("PrsResult.term_param_item is NULL\n"); - } - - }else if(ret == osErrorTimeout){ - JT808_DEBUG("parse timeout\n"); - } - }else{ - JT808_DEBUG("jt808_parse_ok_sem is NULL\n"); - } - } - } - if(MsgESC.buf!= NULL){ // 释放发送缓存 - // JT808_DEBUG("jt808_free MsgESC.buf\n"); - jt808_free(MsgESC.buf); - MsgESC.buf = NULL; - } +// 接收处理 (放入TCP接收回调中)修改版 +void jt808_pkg_handle(uint8_t *receive_buf, uint16_t receive_len) { + // 1. 解析协议数据 + if (0 == jt808_msg_parse(receive_buf, receive_len, &PrsResult)) { + // 2. 提取接收到的消息ID和流水号 这地方根据消息id分两种情况 一种是平台应答 另一种是平台指令 平台应答需要判断结构体里的id和流水号 + if (PrsResult.msg_head.msg_id == ID_Plat_GenResp ||PrsResult.msg_head.msg_id == ID_Term_RegResp) + { + uint16_t recv_msg_id = PrsResult.Rsp_msg_id; + uint16_t recv_flow_num = PrsResult.Rsp_flow_num; + + JT808_DEBUG("Recv Msg: ID=0x%04X, Flow=%d\n", recv_msg_id, recv_flow_num); + + // 3. 在等待队列中查找匹配项 + int matched = -1; // 匹配的请求索引 + osMutexAcquire(pending_mutex, osWaitForever); // 获取互斥锁 + + // 遍历所有等待中的请求 + for (int i = 0; i < MAX_PENDING_REQUESTS; i++) { + if ((1 == pending_requests[i].is_waiting)&& + recv_msg_id == pending_requests[i].msg_id && recv_flow_num == pending_requests[i].flow_num) // 消息ID匹配且流水号匹配 + { + pending_requests[i].response_received = 1; + JT808_DEBUG("Matched response for Flow=%d\n", recv_flow_num); + break; + } + } + osMutexRelease(pending_mutex); // 释放互斥锁 } - } + + + // 5. 如果不是等待中的应答,检查是否是命令消息 + else //if () + { + JT808_DEBUG("Recv Command: ID=0x%04X\n", PrsResult.msg_head.msg_id); + } + + } } + +//发送任务修改版 +static void jt808_pkg_send_task(void *arg) { + pkg_msg_t send_pkg_msg = {0}; + JT808MsgESC_t MsgESC = {0}; + + while (1) { + // 1. 从发送队列获取消息 + if (osOK == osMessageQueueGet(jt808_send_msg_queue, &send_pkg_msg, NULL, osWaitForever)) { + // 2. 生成协议数据 + if (0 == jt808_msg_pkg(send_pkg_msg.msg_id, &MsgESC)) { + // 3. 发送数据 + if (MsgESC.buf && (0 == PKG_SEND(MsgESC.buf, MsgESC.len))) { + PendingRequest_t *req = NULL; + + // 4. 如果需要等待应答(timeout>0) + if (send_pkg_msg.timeout > 0) { + osMutexAcquire(pending_mutex, osWaitForever); // 获取互斥锁 + + // 5. 查找空闲槽位 + int slot = -1; + for (int i = 0; i < MAX_PENDING_REQUESTS; i++) { + if (!pending_requests[i].is_waiting) { + slot = i; + break; + } + } + + // 6. 创建等待请求 + if (slot >= 0) { + req = &pending_requests[slot]; + req->msg_id = send_pkg_msg.msg_id; + req->flow_num = PrsResult.term_param_item->msg_flow_num; + req->timeout = send_pkg_msg.timeout; + req->start_tick = osKernelGetTickCount(); + req->is_waiting = 1; + req->response_received = 0; + + JT808_DEBUG("Waiting: ID=0x%04X, Flow=%d, MsgID=0x%04X\n", + req->msg_id, req->flow_num, req->msg_id); + } + osMutexRelease(pending_mutex); // 释放互斥锁 + } + + // 7. 递增全局流水号(无论是否等待应答) + if (PrsResult.term_param_item != NULL) { + PrsResult.term_param_item->msg_flow_num++; + } + + } + + // 8. 释放发送缓存 + jt808_free(MsgESC.buf); + MsgESC.buf = NULL; + } + + //jt808_free(MsgESC.buf); + //MsgESC.buf = NULL; + + } + } +} + + + // jt808协议初始化 int jt808_init(void){ if(jt808_send_msg_queue == NULL){ - jt808_send_msg_queue = osMessageQueueNew(3, sizeof(pkg_msg_t), NULL); - } - if(jt808_send_ack_sem == NULL){ - jt808_send_ack_sem = osSemaphoreNew(1, 0, NULL); - } - if(jt808_parse_ok_sem == NULL){ - jt808_parse_ok_sem = osSemaphoreNew(1, 0, NULL); + jt808_send_msg_queue = osMessageQueueNew(5, sizeof(pkg_msg_t), NULL); } + + // 创建互斥锁 + if(pending_mutex == NULL) { + pending_mutex = osMutexNew(NULL); + } + + // 初始化等待请求数组 + for (int i = 0; i < MAX_PENDING_REQUESTS; i++) { + pending_requests[i].is_waiting = 0; + } + + // 创建发送任务 osThreadAttr_t jt808_pkg_send_task_attr = { .name = "jt808_pkg_send_task", .stack_size = 4096*4, .priority = osPriorityNormal, }; + // 创建超时监控任务 + osThreadAttr_t monitor_task_attr = { + .name = "jt808_monitor_task", + .stack_size = 2048, + .priority = osPriorityBelowNormal, + }; + jt808_pkg_send_ThreadId = osThreadNew((osThreadFunc_t)jt808_pkg_send_task, 0, &jt808_pkg_send_task_attr); + jt808_timeout_monitor_ThreadId = osThreadNew((osThreadFunc_t)jt808_timeout_monitor_task, 0, &monitor_task_attr); // osDelay(200); // 等待线程启动 return 0; }