更新全双工框架

This commit is contained in:
kkkjtr 2025-08-12 16:58:01 +08:00
parent 7709d118d2
commit 9e65e4a905
5 changed files with 193 additions and 83 deletions

View File

@ -1,5 +1,3 @@
#ifndef __JT808_PKG_PARSE__ #ifndef __JT808_PKG_PARSE__
#define __JT808_PKG_PARSE__ #define __JT808_PKG_PARSE__
#include "jt808_protocol.h" #include "jt808_protocol.h"

View File

@ -19,8 +19,24 @@ typedef struct{
#define PKG_SEND(__buf, __len) 0 #define PKG_SEND(__buf, __len) 0
#endif #endif
#define MAX_PENDING_REQUESTS 5 // 最大并发等待请求数
extern PrsResult_t PrsResult; extern PrsResult_t PrsResult;
// 等待请求结构体
typedef struct {
uint16_t msg_id; // 发送的消息ID
uint16_t flow_num; // 消息流水号
uint32_t timeout; // 等待超时时间(毫秒)
uint32_t start_tick; // 开始等待的时间戳
uint8_t is_waiting; // 是否在等待中1=是0=否)
uint8_t response_received; // 是否已收到应答1=是0=否)
} PendingRequest_t;
// 全局变量
static PendingRequest_t pending_requests[MAX_PENDING_REQUESTS]; // 等待请求数组
// 触发消息发送 // 触发消息发送
int jt808_pkg_send(MessageID_t Msg_ID, uint32_t timeout); int jt808_pkg_send(MessageID_t Msg_ID, uint32_t timeout);

View File

@ -34,6 +34,7 @@ typedef enum {
ID_Delete_Polygon_area = 0x8605, // 删除多边形区域 ID_Delete_Polygon_area = 0x8605, // 删除多边形区域
ID_Data_Down = 0x8900, // 数据透传下行 ID_Data_Down = 0x8900, // 数据透传下行
ID_Data_Up = 0x0900, // 数据透传上行 ID_Data_Up = 0x0900, // 数据透传上行
//ID_Req_Fence_update =
}MessageID_t; }MessageID_t;
#pragma pack(1) #pragma pack(1)

View File

@ -30,12 +30,13 @@ static bool is_valid_url(const char *url) {
static int jt808_BodyParse(void *Prsmsg_body, PrsResult_t *Result){ static int jt808_BodyParse(void *Prsmsg_body, PrsResult_t *Result){
switch (Result->msg_head.msg_id){ switch (Result->msg_head.msg_id){
case ID_Plat_GenResp:{// 平台通用应答 case ID_Plat_GenResp:{// 平台通用应答
memcpy(&(Result->Rsp_flow_num), Prsmsg_body, sizeof(Plat_GenResp_t)); Plat_GenResp_t resp_body;
//从消息体复制数据
memcpy(&resp_body, Prsmsg_body, sizeof(Plat_GenResp_t));
// 转小端 // 转小端
Result->Rsp_flow_num = Swap16(Result->msg_head.msg_flow_num); Result->Rsp_flow_num = Swap16(resp_body.msg_flow_num); // 应答流水号
Result->Rsp_msg_id = Swap16(Result->msg_head.msg_id); Result->Rsp_msg_id = Swap16(resp_body.msg_id_ack); // 应答ID
// Result->Rsp_result = Result->Rsp_result; Result->Rsp_result = resp_body.result; // 结果
break;
} }
case ID_FillPktReq:{// 补传分包请求 case ID_FillPktReq:{// 补传分包请求
@ -157,6 +158,8 @@ static int jt808_BodyParse(void *Prsmsg_body, PrsResult_t *Result){
const char *url = Result->term_param_item->big_upgrade_info.file_path; const char *url = Result->term_param_item->big_upgrade_info.file_path;
if(!is_valid_url(url)) { if(!is_valid_url(url)) {
JT808_DEBUG("Invalid URL: %s\n", url); JT808_DEBUG("Invalid URL: %s\n", url);
jt808_free(Result->term_param_item->big_upgrade_info.file_path);
Result->term_param_item->big_upgrade_info.file_path = NULL;
break; // 不继续升级 break; // 不继续升级
} }
int ret = 0; int ret = 0;

View File

@ -1,106 +1,198 @@
#include "jt808_pkg_transmit.h" #include "jt808_pkg_transmit.h"
static osThreadId_t jt808_pkg_send_ThreadId; static osThreadId_t jt808_pkg_send_ThreadId;
static osThreadId_t jt808_timeout_monitor_ThreadId;
static osMessageQueueId_t jt808_send_msg_queue = NULL; static osMessageQueueId_t jt808_send_msg_queue = NULL;
static osSemaphoreId_t jt808_send_ack_sem = NULL; // 发送成功应答信号量 static osMutexId_t pending_mutex = NULL;
static osSemaphoreId_t jt808_parse_ok_sem = NULL; // 解析完成信号量
// 触发消息发送 // 超时监控任务 - 后台处理超时
static void jt808_timeout_monitor_task(void *arg) {
while (1) {
osDelay(50); // 每50ms检查一次
uint32_t current_tick = osKernelGetTickCount();
osMutexAcquire(pending_mutex, osWaitForever);
for (int i = 0; i < MAX_PENDING_REQUESTS; i++) {
if (pending_requests[i].is_waiting) {
PendingRequest_t *req = &pending_requests[i];
uint32_t elapsed = current_tick - req->start_tick;
if (req->response_received) {
JT808_DEBUG("Response OK: ID=0x%04X, Flow=%d\n",
req->msg_id, req->flow_num);
req->is_waiting = 0; // 标记槽位为空闲
}
else if (elapsed > req->timeout) {
JT808_DEBUG("Response Timeout: ID=0x%04X, Flow=%d\n",
req->msg_id, req->flow_num);
req->is_waiting = 0; // 标记槽位为空闲
// 这里可以添加超时处理逻辑,如重发
// 如果需要重发,调用 jt808_pkg_send(req->msg_id, req->timeout);
}
}
}
osMutexRelease(pending_mutex);
}
}
// 触发消息发送修改版
int jt808_pkg_send(MessageID_t Msg_ID, uint32_t timeout){ int jt808_pkg_send(MessageID_t Msg_ID, uint32_t timeout){
pkg_msg_t send_pkg_msg={0}; pkg_msg_t send_pkg_msg={0};
send_pkg_msg.msg_id = Msg_ID; send_pkg_msg.msg_id = Msg_ID;
send_pkg_msg.timeout = timeout; send_pkg_msg.timeout = timeout;
// JT808_DEBUG("send pkg_msg:%04x\n", Msg_ID); // JT808_DEBUG("send pkg_msg:%04x\n", Msg_ID);
if(osOK == osMessageQueuePut(jt808_send_msg_queue, &send_pkg_msg, 0, 0)){ // if(osOK == osMessageQueuePut(jt808_send_msg_queue, &send_pkg_msg, 0, 0)){ //将数据送入发送队列
if(0 < timeout){ // 阻塞等待发送回应
osStatus_t ret =osSemaphoreAcquire(jt808_send_ack_sem, timeout);
if(ret == osOK){
return PrsResult.Rsp_result; // 发送成功应答
JT808_DEBUG("send ack success\n");
}else if(ret == osErrorTimeout){
JT808_DEBUG("send ack timeout\n");
return -1;// 发送失败
}
}
}else{
JT808_DEBUG("osMessageQueuePut fail\n");
return -1;// 发送失败
}
return 0; return 0;
}
// 接收处理 (放入TCP接收回调中) }
void jt808_pkg_handle(uint8_t *receive_buf, uint16_t receive_len){ else
if(0 ==jt808_msg_parse(receive_buf, receive_len, &PrsResult)){ // 解析协议数据 {
osSemaphoreRelease(jt808_parse_ok_sem); // 释放解析完成信号量 return -1;
// JT808_DEBUG("jt808_msg_parse success\n");
} }
} }
// 发送任务
static void jt808_pkg_send_task(void *arg){
pkg_msg_t send_pkg_msg={0};
JT808MsgESC_t MsgESC={
.buf = NULL,
.len = 0,
};
while(1){ // 接收处理 (放入TCP接收回调中)修改版
if(osOK == osMessageQueueGet(jt808_send_msg_queue, &send_pkg_msg, NULL, osWaitForever)){ void jt808_pkg_handle(uint8_t *receive_buf, uint16_t receive_len) {
if(0 == jt808_msg_pkg(send_pkg_msg.msg_id ,&MsgESC)){ // 生成协议数据成功 // 1. 解析协议数据
// 发送数据 if (0 == jt808_msg_parse(receive_buf, receive_len, &PrsResult)) {
if(NULL == MsgESC.buf){ // 协议数据为空(发生错误) // 2. 提取接收到的消息ID和流水号 这地方根据消息id分两种情况 一种是平台应答 另一种是平台指令 平台应答需要判断结构体里的id和流水号
JT808_DEBUG("MsgESC.buf is NULL:len=%d\n",MsgESC.len); if (PrsResult.msg_head.msg_id == ID_Plat_GenResp ||PrsResult.msg_head.msg_id == ID_Term_RegResp)
continue; {
uint16_t recv_msg_id = PrsResult.Rsp_msg_id;
uint16_t recv_flow_num = PrsResult.Rsp_flow_num;
JT808_DEBUG("Recv Msg: ID=0x%04X, Flow=%d\n", recv_msg_id, recv_flow_num);
// 3. 在等待队列中查找匹配项
int matched = -1; // 匹配的请求索引
osMutexAcquire(pending_mutex, osWaitForever); // 获取互斥锁
// 遍历所有等待中的请求
for (int i = 0; i < MAX_PENDING_REQUESTS; i++) {
if ((1 == pending_requests[i].is_waiting)&&
recv_msg_id == pending_requests[i].msg_id && recv_flow_num == pending_requests[i].flow_num) // 消息ID匹配且流水号匹配
{
pending_requests[i].response_received = 1;
JT808_DEBUG("Matched response for Flow=%d\n", recv_flow_num);
break;
} }
// JT808_DEBUG("tcp send %d bytes\n", MsgESC.len); }
if(0 == PKG_SEND(MsgESC.buf, MsgESC.len)){ // 发送成功处理 osMutexRelease(pending_mutex); // 释放互斥锁
if(NULL != jt808_parse_ok_sem){
osStatus_t ret =osSemaphoreAcquire(jt808_parse_ok_sem, send_pkg_msg.timeout);// 等待接收数据并解析完成
if(ret == osOK || 0 == send_pkg_msg.timeout){
osSemaphoreRelease(jt808_send_ack_sem); // 释放发送成功应答信号量
// JT808_DEBUG("parse success\n");
if(PrsResult.term_param_item!= NULL){ // 协议参数项不为空
PrsResult.term_param_item->msg_flow_num++; // 协议流水号+1
}else{
JT808_DEBUG("PrsResult.term_param_item is NULL\n");
} }
}else if(ret == osErrorTimeout){
JT808_DEBUG("parse timeout\n"); // 5. 如果不是等待中的应答,检查是否是命令消息
else //if ()
{
JT808_DEBUG("Recv Command: ID=0x%04X\n", PrsResult.msg_head.msg_id);
} }
}else{
JT808_DEBUG("jt808_parse_ok_sem is NULL\n"); }
}
//发送任务修改版
static void jt808_pkg_send_task(void *arg) {
pkg_msg_t send_pkg_msg = {0};
JT808MsgESC_t MsgESC = {0};
while (1) {
// 1. 从发送队列获取消息
if (osOK == osMessageQueueGet(jt808_send_msg_queue, &send_pkg_msg, NULL, osWaitForever)) {
// 2. 生成协议数据
if (0 == jt808_msg_pkg(send_pkg_msg.msg_id, &MsgESC)) {
// 3. 发送数据
if (MsgESC.buf && (0 == PKG_SEND(MsgESC.buf, MsgESC.len))) {
PendingRequest_t *req = NULL;
// 4. 如果需要等待应答timeout>0
if (send_pkg_msg.timeout > 0) {
osMutexAcquire(pending_mutex, osWaitForever); // 获取互斥锁
// 5. 查找空闲槽位
int slot = -1;
for (int i = 0; i < MAX_PENDING_REQUESTS; i++) {
if (!pending_requests[i].is_waiting) {
slot = i;
break;
} }
} }
// 6. 创建等待请求
if (slot >= 0) {
req = &pending_requests[slot];
req->msg_id = send_pkg_msg.msg_id;
req->flow_num = PrsResult.term_param_item->msg_flow_num;
req->timeout = send_pkg_msg.timeout;
req->start_tick = osKernelGetTickCount();
req->is_waiting = 1;
req->response_received = 0;
JT808_DEBUG("Waiting: ID=0x%04X, Flow=%d, MsgID=0x%04X\n",
req->msg_id, req->flow_num, req->msg_id);
} }
if(MsgESC.buf!= NULL){ // 释放发送缓存 osMutexRelease(pending_mutex); // 释放互斥锁
// JT808_DEBUG("jt808_free MsgESC.buf\n"); }
// 7. 递增全局流水号(无论是否等待应答)
if (PrsResult.term_param_item != NULL) {
PrsResult.term_param_item->msg_flow_num++;
}
}
// 8. 释放发送缓存
jt808_free(MsgESC.buf); jt808_free(MsgESC.buf);
MsgESC.buf = NULL; MsgESC.buf = NULL;
} }
//jt808_free(MsgESC.buf);
//MsgESC.buf = NULL;
} }
} }
} }
// jt808协议初始化 // jt808协议初始化
int jt808_init(void){ int jt808_init(void){
if(jt808_send_msg_queue == NULL){ if(jt808_send_msg_queue == NULL){
jt808_send_msg_queue = osMessageQueueNew(3, sizeof(pkg_msg_t), NULL); jt808_send_msg_queue = osMessageQueueNew(5, sizeof(pkg_msg_t), NULL);
} }
if(jt808_send_ack_sem == NULL){
jt808_send_ack_sem = osSemaphoreNew(1, 0, NULL); // 创建互斥锁
if(pending_mutex == NULL) {
pending_mutex = osMutexNew(NULL);
} }
if(jt808_parse_ok_sem == NULL){
jt808_parse_ok_sem = osSemaphoreNew(1, 0, NULL); // 初始化等待请求数组
for (int i = 0; i < MAX_PENDING_REQUESTS; i++) {
pending_requests[i].is_waiting = 0;
} }
// 创建发送任务
osThreadAttr_t jt808_pkg_send_task_attr = { osThreadAttr_t jt808_pkg_send_task_attr = {
.name = "jt808_pkg_send_task", .name = "jt808_pkg_send_task",
.stack_size = 4096*4, .stack_size = 4096*4,
.priority = osPriorityNormal, .priority = osPriorityNormal,
}; };
// 创建超时监控任务
osThreadAttr_t monitor_task_attr = {
.name = "jt808_monitor_task",
.stack_size = 2048,
.priority = osPriorityBelowNormal,
};
jt808_pkg_send_ThreadId = osThreadNew((osThreadFunc_t)jt808_pkg_send_task, 0, &jt808_pkg_send_task_attr); jt808_pkg_send_ThreadId = osThreadNew((osThreadFunc_t)jt808_pkg_send_task, 0, &jt808_pkg_send_task_attr);
jt808_timeout_monitor_ThreadId = osThreadNew((osThreadFunc_t)jt808_timeout_monitor_task, 0, &monitor_task_attr);
// osDelay(200); // 等待线程启动 // osDelay(200); // 等待线程启动
return 0; return 0;
} }