pressure_sensor_system/SerialTool/TestTool.py

466 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sys
import serial
import serial.tools.list_ports
import threading
import time
import datetime
from queue import Queue
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
import traceback
from dataclasses import dataclass
import json
import os
from typing import Optional, List
@dataclass
class SerialConfig:
    """Connection parameters for one serial port session.

    Field order is part of the positional constructor signature and must not
    be rearranged.
    """

    # Device name of the port, e.g. "COM1".
    port: str = "COM1"
    # Line speed in bits per second.
    baudrate: int = 9600
    # Number of data bits per character.
    bytesize: int = 8
    # Single-letter parity code; the GUI maps its choices to "N", "O" or "E".
    parity: str = "N"
    # Number of stop bits; the GUI offers 1, 1.5 and 2.
    stopbits: float = 1.0
    # Read timeout in seconds.
    timeout: float = 1.0
class SerialTestTool:
    """Tkinter front end for exercising an RS-485 serial link.

    While a port is open, one worker thread writes ``send_data`` every
    ``send_interval`` milliseconds and a second worker thread reads whatever
    has arrived.  Every frame is logged with a millisecond timestamp.

    Worker threads never call into Tk.  They put ``(callback, args)`` pairs on
    ``receive_queue`` and the Tk main loop drains that queue periodically, so
    all widget access happens on the thread that runs the main loop.
    """

    # Combobox entry shown when no serial port is detected.
    NO_PORT_PLACEHOLDER = "无可用串口"
    # Combobox entry that enables the free-form baud rate field.
    CUSTOM_BAUD_LABEL = "自定义"
    # Parity combobox label -> pySerial parity code.
    PARITY_CODES = {"": 'N', "奇校验": 'O', "偶校验": 'E'}
    # Message type -> foreground colour (the colour name doubles as tag name).
    MESSAGE_COLORS = {
        "error": "red",
        "info": "blue",
        "send": "darkgreen",
        "receive": "purple",
        "normal": "black",
    }
    MIN_SEND_INTERVAL_MS = 10
    DEFAULT_SEND_INTERVAL_MS = 100
    # How often the main loop drains the worker -> UI queue.
    UI_POLL_INTERVAL_MS = 20
    # Upper bound of queued callbacks handled per poll, to keep the UI responsive.
    MAX_EVENTS_PER_POLL = 200

    def __init__(self, root):
        """Build the window on *root* and start polling the worker queue.

        Args:
            root: The ``tk.Tk`` (or compatible) top-level window to populate.
        """
        self.root = root
        self.root.title("485串口测试工具 - 带详细时间戳")
        self.root.geometry("900x700")

        # Serial port object and thread control.
        self.serial_port: Optional[serial.Serial] = None
        self.receive_thread: Optional[threading.Thread] = None
        self.send_thread: Optional[threading.Thread] = None
        self.running = False
        # Replaced by a fresh event on every open so that a worker from an
        # earlier session can never be revived by a later one.
        self._stop_event = threading.Event()
        # Carries (callback, args) pairs from the workers to the main loop.
        self.receive_queue = Queue()
        # Not used by this class; kept so the attribute remains available.
        self.send_queue = Queue()

        # Traffic statistics.
        self.receive_count = 0
        self.send_count = 0
        self.error_count = 0

        # Configuration.
        self.config = SerialConfig()
        self.send_data = bytes([0x03, 0x03, 0x00, 0x99])
        self.send_interval = self.DEFAULT_SEND_INTERVAL_MS  # ms

        # Timestamps (seconds since the epoch, 0 means "never").
        self.last_send_time = 0
        self.last_receive_time = 0

        self.setup_ui()
        self.refresh_ports()
        self._poll_ui_queue()

    # ------------------------------------------------------------------ UI

    def setup_ui(self):
        """Create all widgets and lay them out in the root window."""
        # Let the main frame absorb extra space when the window is resized;
        # the status bar in row 1 keeps its natural height.
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(0, weight=1)

        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))

        self._build_port_config(main_frame)
        self._build_send_config(main_frame)
        self._build_controls(main_frame)
        self._build_log_view(main_frame)

        # Status bar.
        self.status_var = tk.StringVar(value="就绪")
        status_bar = ttk.Label(self.root, textvariable=self.status_var, relief=tk.SUNKEN, anchor=tk.W)
        status_bar.grid(row=1, column=0, sticky=(tk.W, tk.E))

    def _build_port_config(self, parent):
        """Create the serial parameter section (port, baud rate, framing)."""
        config_frame = ttk.LabelFrame(parent, text="串口配置", padding="10")
        config_frame.grid(row=0, column=0, columnspan=3, sticky=(tk.W, tk.E), pady=(0, 10))

        # Port selection.
        ttk.Label(config_frame, text="串口号:").grid(row=0, column=0, padx=5)
        self.port_combo = ttk.Combobox(config_frame, width=15, state="readonly")
        self.port_combo.grid(row=0, column=1, padx=5)
        ttk.Button(config_frame, text="刷新", command=self.refresh_ports, width=8).grid(row=0, column=2, padx=5)

        # Baud rate selection.
        ttk.Label(config_frame, text="波特率:").grid(row=0, column=3, padx=5)
        baudrates = ["9600", "19200", "38400", "57600", "115200", "256000", "460800", "921600",
                     self.CUSTOM_BAUD_LABEL]
        self.baudrate_combo = ttk.Combobox(config_frame, values=baudrates, width=12, state="readonly")
        self.baudrate_combo.set("9600")
        self.baudrate_combo.grid(row=0, column=4, padx=5)
        self.baudrate_combo.bind("<<ComboboxSelected>>", self.on_baudrate_select)

        # Free-form baud rate, enabled only when the custom entry is selected.
        self.custom_baud_var = tk.StringVar()
        self.custom_baud_entry = ttk.Entry(config_frame, textvariable=self.custom_baud_var, width=10,
                                           state="disabled")
        self.custom_baud_entry.grid(row=0, column=5, padx=5)

        # Remaining framing parameters.
        ttk.Label(config_frame, text="数据位:").grid(row=1, column=0, padx=5, pady=5)
        self.databits_combo = ttk.Combobox(config_frame, values=["5", "6", "7", "8"], width=8, state="readonly")
        self.databits_combo.set("8")
        self.databits_combo.grid(row=1, column=1, padx=5, pady=5)

        ttk.Label(config_frame, text="校验位:").grid(row=1, column=2, padx=5, pady=5)
        self.parity_combo = ttk.Combobox(config_frame, values=list(self.PARITY_CODES), width=10,
                                         state="readonly")
        self.parity_combo.set("")
        self.parity_combo.grid(row=1, column=3, padx=5, pady=5)

        ttk.Label(config_frame, text="停止位:").grid(row=1, column=4, padx=5, pady=5)
        self.stopbits_combo = ttk.Combobox(config_frame, values=["1", "1.5", "2"], width=8, state="readonly")
        self.stopbits_combo.set("1")
        self.stopbits_combo.grid(row=1, column=5, padx=5, pady=5)

    def _build_send_config(self, parent):
        """Create the section holding the payload and the send interval."""
        send_frame = ttk.LabelFrame(parent, text="发送配置", padding="10")
        send_frame.grid(row=1, column=0, columnspan=3, sticky=(tk.W, tk.E), pady=(0, 10))

        ttk.Label(send_frame, text="发送数据 (HEX):").grid(row=0, column=0, padx=5)
        self.send_data_var = tk.StringVar(value="03 03 00 99")
        self.send_data_entry = ttk.Entry(send_frame, textvariable=self.send_data_var, width=30)
        self.send_data_entry.grid(row=0, column=1, padx=5)
        ttk.Button(send_frame, text="解析", command=self.parse_send_data, width=8).grid(row=0, column=2, padx=5)

        ttk.Label(send_frame, text="发送间隔 (ms):").grid(row=0, column=3, padx=5)
        self.interval_var = tk.StringVar(value="100")
        self.interval_entry = ttk.Entry(send_frame, textvariable=self.interval_var, width=10)
        self.interval_entry.grid(row=0, column=4, padx=5)

    def _build_controls(self, parent):
        """Create the open / close / clear / save button row."""
        control_frame = ttk.Frame(parent)
        control_frame.grid(row=2, column=0, columnspan=3, pady=(0, 10))

        self.open_btn = ttk.Button(control_frame, text="打开串口", command=self.open_serial, width=12)
        self.open_btn.grid(row=0, column=0, padx=5)
        self.close_btn = ttk.Button(control_frame, text="关闭串口", command=self.close_serial, width=12,
                                    state="disabled")
        self.close_btn.grid(row=0, column=1, padx=5)
        self.clear_btn = ttk.Button(control_frame, text="清空显示", command=self.clear_display, width=12)
        self.clear_btn.grid(row=0, column=2, padx=5)
        self.save_btn = ttk.Button(control_frame, text="保存数据", command=self.save_data, width=12)
        self.save_btn.grid(row=0, column=3, padx=5)

    def _build_log_view(self, parent):
        """Create the scrolling communication log and its colour tags."""
        display_area_frame = ttk.LabelFrame(parent, text="通信日志", padding="10")
        display_area_frame.grid(row=3, column=0, columnspan=3, sticky=(tk.W, tk.E, tk.N, tk.S))

        # The log row is the only one that stretches with the window.
        parent.columnconfigure(0, weight=1)
        parent.rowconfigure(3, weight=1)
        display_area_frame.columnconfigure(0, weight=1)
        display_area_frame.rowconfigure(0, weight=1)

        self.text_display = scrolledtext.ScrolledText(
            display_area_frame,
            width=80,
            height=20,
            font=("Consolas", 10),
        )
        self.text_display.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))

        for color in set(self.MESSAGE_COLORS.values()):
            self.text_display.tag_config(color, foreground=color)

    def refresh_ports(self):
        """Reload the list of available serial ports into the port combobox.

        The current selection is kept when that port still exists; otherwise
        the first detected port is selected.  When no port is found the
        placeholder entry is shown so a stale name cannot be opened.
        """
        port_list = [port.device for port in serial.tools.list_ports.comports()]
        if not port_list:
            self.port_combo['values'] = [self.NO_PORT_PLACEHOLDER]
            self.port_combo.set(self.NO_PORT_PLACEHOLDER)
            return
        self.port_combo['values'] = port_list
        if self.port_combo.get() not in port_list:
            self.port_combo.set(port_list[0])

    def on_baudrate_select(self, event):
        """Enable the custom baud rate field only for the custom entry.

        Args:
            event: The ``<<ComboboxSelected>>`` event (unused).
        """
        if self.baudrate_combo.get() == self.CUSTOM_BAUD_LABEL:
            self.custom_baud_entry.config(state="normal")
            self.custom_baud_entry.focus()
        else:
            self.custom_baud_entry.config(state="disabled")
            self.custom_baud_var.set("")

    # ------------------------------------------------------- configuration

    def parse_send_data(self):
        """Parse the HEX entry field into ``send_data``.

        Accepted separators are whitespace, ``,`` and ``;``.  ``0x`` / ``\\x``
        prefixes are ignored regardless of case and a single hex digit is read
        as one byte (``3`` -> ``03``).

        Returns:
            True when ``send_data`` was updated, False when the field was
            empty or malformed (a dialog has been shown in that case and
            ``send_data`` is left unchanged).
        """
        raw = self.send_data_var.get().strip()
        # bytes.fromhex accepts lower-case digits, so lower-casing lets one
        # replace handle both "0x" and "0X".
        cleaned = raw.lower().replace("0x", "").replace("\\x", "")
        cleaned = cleaned.replace(",", " ").replace(";", " ")
        tokens = [token.zfill(2) if len(token) == 1 else token for token in cleaned.split()]
        if not tokens:
            messagebox.showwarning("警告", "发送数据不能为空")
            return False
        try:
            hex_bytes = bytes.fromhex(" ".join(tokens))
        except ValueError as e:
            messagebox.showerror("错误", f"数据格式错误: {e}\n请使用HEX格式如: 01 02 AB CD")
            return False
        self.send_data = hex_bytes
        self.status_var.set(f"发送数据已更新: {hex_bytes.hex(' ', 1).upper()}")
        return True

    def _read_baudrate(self):
        """Return the selected baud rate, or None after warning the user."""
        selected = self.baudrate_combo.get()
        if selected != self.CUSTOM_BAUD_LABEL:
            return int(selected)
        custom = self.custom_baud_var.get().strip()
        if not custom.isdigit() or int(custom) <= 0:
            messagebox.showwarning("警告", "请输入有效的波特率")
            return None
        return int(custom)

    def _read_send_interval(self):
        """Return the send interval in ms and write the effective value back.

        Unparseable input falls back to the default; values below the minimum
        are raised to it.  The entry field always ends up showing the value
        that is really used.
        """
        try:
            interval = int(self.interval_var.get())
        except ValueError:
            interval = self.DEFAULT_SEND_INTERVAL_MS
        interval = max(interval, self.MIN_SEND_INTERVAL_MS)
        self.interval_var.set(str(interval))
        return interval

    # ------------------------------------------------------- port control

    def open_serial(self):
        """Open the selected port and start the send and receive workers.

        Invalid settings are reported in a dialog and leave the tool closed.
        Any failure while opening is shown to the user and fully rolled back.
        """
        if self.running:
            return
        try:
            port = self.port_combo.get()
            if not port or port == self.NO_PORT_PLACEHOLDER:
                messagebox.showwarning("警告", "请选择有效的串口号")
                return

            baudrate = self._read_baudrate()
            if baudrate is None:
                return

            bytesize = int(self.databits_combo.get())
            parity = self.PARITY_CODES[self.parity_combo.get()]
            stopbits = float(self.stopbits_combo.get())

            # Do not open the port with a payload the user did not intend.
            if not self.parse_send_data():
                return
            self.send_interval = self._read_send_interval()

            self.serial_port = serial.Serial(
                port=port,
                baudrate=baudrate,
                bytesize=bytesize,
                parity=parity,
                stopbits=stopbits,
                timeout=1,
            )
            if not self.serial_port.is_open:
                self.serial_port.open()

            # Remember what is actually in use; save_data reports it.
            self.config = SerialConfig(
                port=port,
                baudrate=baudrate,
                bytesize=bytesize,
                parity=parity,
                stopbits=stopbits,
                timeout=1.0,
            )
            self.last_send_time = 0
            self.last_receive_time = 0

            # Start the workers.
            self._stop_event = threading.Event()
            self.running = True
            self.receive_thread = threading.Thread(target=self.receive_data, daemon=True)
            self.send_thread = threading.Thread(target=self.send_data_thread, daemon=True)
            self.receive_thread.start()
            self.send_thread.start()

            # Update UI state.
            self.open_btn.config(state="disabled")
            self.close_btn.config(state="normal")
            self.status_var.set(f"串口已打开: {port} @ {baudrate}bps")

            # Show the connection summary.
            self.append_display(f"=== 串口连接成功 ===", "info")
            self.append_display(f"端口: {port}", "info")
            self.append_display(f"波特率: {baudrate}", "info")
            self.append_display(f"发送间隔: {self.send_interval}ms", "info")
            self.append_display(f"发送数据: {self.send_data.hex(' ', 1).upper()}", "info")
            self.append_display("=" * 40, "info")
        except Exception as e:
            # UI callback boundary: report the problem and roll everything back.
            self.running = False
            self._stop_event.set()
            messagebox.showerror("错误", f"打开串口失败: {str(e)}")
            if self.serial_port:
                self.serial_port.close()
            self.serial_port = None

    def close_serial(self):
        """Stop both workers, close the port and reset the buttons.

        The workers are stopped before the port is closed so that they do not
        report errors for a port that was closed underneath them.
        """
        self.running = False
        self._stop_event.set()

        # Wait for the workers; the timeout guards against a blocked write.
        current = threading.current_thread()
        for worker in (self.receive_thread, self.send_thread):
            if worker is not None and worker is not current and worker.is_alive():
                worker.join(timeout=2)
        self.receive_thread = None
        self.send_thread = None

        if self.serial_port and self.serial_port.is_open:
            try:
                self.serial_port.close()
            except Exception as e:
                # E.g. the adapter was unplugged; closing is best effort.
                self.append_display(f"[{self.format_timestamp(time.time())}] 关闭错误: {str(e)}", "error")

        # Show whatever the workers logged before they stopped.
        self._drain_ui_queue()

        # Update UI state.
        self.open_btn.config(state="normal")
        self.close_btn.config(state="disabled")
        self.status_var.set("串口已关闭")
        self.append_display("=== 串口已关闭 ===", "info")

    # ------------------------------------------------------------ workers

    def _post_ui(self, callback, *args):
        """Queue ``callback(*args)`` for execution by the Tk main loop."""
        self.receive_queue.put((callback, args))

    def _drain_ui_queue(self, limit=None):
        """Run queued worker callbacks; must be called from the main loop thread.

        Args:
            limit: Maximum number of callbacks to run, or None for all.
        """
        handled = 0
        # This method is the queue's only consumer, so a non-empty queue
        # guarantees that get_nowait() succeeds.
        while not self.receive_queue.empty():
            if limit is not None and handled >= limit:
                break
            callback, args = self.receive_queue.get_nowait()
            callback(*args)
            handled += 1

    def _poll_ui_queue(self):
        """Drain the worker queue and schedule the next poll."""
        try:
            self._drain_ui_queue(self.MAX_EVENTS_PER_POLL)
        finally:
            # Keep polling even if one callback raised.
            self.root.after(self.UI_POLL_INTERVAL_MS, self._poll_ui_queue)

    def receive_data(self):
        """Worker loop: read pending bytes and queue them for display.

        Runs until the session is stopped or the port is no longer open.
        """
        port = self.serial_port
        stop = self._stop_event
        while self.running and not stop.is_set() and port is not None and port.is_open:
            try:
                pending = port.in_waiting
                if pending > 0:
                    data = port.read(pending)
                    if data:
                        receive_time = time.time()
                        self.receive_count += len(data)
                        # Pass the send time seen now, so the displayed delay
                        # is not distorted by a later transmission.
                        self._post_ui(self.display_received_data, bytes(data), receive_time,
                                      self.last_send_time)
            except Exception as e:
                self.error_count += 1
                error_time = time.time()
                self._post_ui(self.append_display,
                              f"[{self.format_timestamp(error_time)}] 接收错误: {str(e)}", "error")
                stop.wait(0.1)
            else:
                stop.wait(0.001)  # short pause between polls

    def send_data_thread(self):
        """Worker loop: write ``send_data`` every ``send_interval`` ms."""
        port = self.serial_port
        stop = self._stop_event
        while self.running and not stop.is_set():
            try:
                if port is not None and port.is_open:
                    payload = self.send_data
                    # Record the time immediately before the write.
                    send_time = time.time()
                    self.last_send_time = send_time
                    port.write(payload)
                    self.send_count += 1
                    self._post_ui(self.display_sent_data, send_time, payload)
            except Exception as e:
                self.error_count += 1
                error_time = time.time()
                self._post_ui(self.append_display,
                              f"[{self.format_timestamp(error_time)}] 发送错误: {str(e)}", "error")
                stop.wait(0.1)
            else:
                # Returns early when the session is stopped.
                stop.wait(self.send_interval / 1000.0)

    # ------------------------------------------------------------ display

    def format_timestamp(self, timestamp):
        """Format an epoch timestamp as local ``HH:MM:SS.mmm``.

        Args:
            timestamp: Seconds since the epoch, as returned by ``time.time()``.

        Returns:
            The local time of day with millisecond resolution.
        """
        dt = datetime.datetime.fromtimestamp(timestamp)
        # %f yields microseconds; drop the last three digits.
        return dt.strftime("%H:%M:%S.%f")[:-3]

    def display_sent_data(self, send_time, data=None):
        """Log one transmitted frame.

        Args:
            send_time: Epoch time at which the frame was written.
            data: The bytes that were written; defaults to the current
                ``send_data`` when omitted.
        """
        payload = self.send_data if data is None else data
        timestamp = self.format_timestamp(send_time)
        hex_str = payload.hex(' ', 1).upper()
        self.append_display(f"[{timestamp}] 发送: {hex_str}", "send")

    def display_received_data(self, data: bytes, receive_time, last_send_time=None):
        """Log received bytes as HEX plus a printable-ASCII rendering.

        Args:
            data: The received bytes; nothing is logged when empty.
            receive_time: Epoch time at which the bytes were read.
            last_send_time: Epoch time of the transmission to measure the
                delay against; defaults to ``self.last_send_time``.  A value
                of 0 means nothing has been sent and suppresses the delay.
        """
        if not data:
            return
        self.last_receive_time = receive_time
        timestamp = self.format_timestamp(receive_time)

        # Delay relative to the most recent transmission, when there is one.
        reference = self.last_send_time if last_send_time is None else last_send_time
        time_diff = ""
        if reference > 0:
            delay_ms = (receive_time - reference) * 1000
            time_diff = f" [延迟: {delay_ms:.1f}ms]"

        if len(data) > 16:
            # Long frames are wrapped at 16 bytes per line.
            hex_str = "\n ".join(
                data[i:i + 16].hex(' ', 1).upper() for i in range(0, len(data), 16)
            )
        else:
            hex_str = data.hex(' ', 1).upper()
        self.append_display(f"[{timestamp}] 接收: {hex_str}{time_diff}", "receive")

        # Non-printable bytes are shown as dots.
        ascii_str = "".join(chr(byte) if 32 <= byte <= 126 else "." for byte in data)
        self.append_display(f"[{timestamp}] ASCII: {ascii_str}", "receive")

    def append_display(self, text: str, msg_type: str = "normal"):
        """Append one message to the log and scroll it into view.

        Args:
            text: The message; may span several lines.
            msg_type: One of the ``MESSAGE_COLORS`` keys; unknown types are
                shown in black.
        """
        if not self.text_display:
            return
        color = self.MESSAGE_COLORS.get(msg_type, "black")
        self.text_display.tag_config(color, foreground=color)
        # Tagging on insert colours every line of a multi-line message.
        self.text_display.insert(tk.END, text + "\n", color)
        self.text_display.see(tk.END)

    def clear_display(self):
        """Remove all text from the log."""
        if self.text_display:
            self.text_display.delete("1.0", tk.END)

    def save_data(self):
        """Write the log to ``serial_data_<timestamp>.txt`` in the working directory.

        The outcome is reported to the user in a dialog.
        """
        try:
            now = datetime.datetime.now()
            filename = f"serial_data_{now.strftime('%Y%m%d_%H%M%S')}.txt"
            content = self.text_display.get("1.0", tk.END)
            with open(filename, 'w', encoding='utf-8') as f:
                f.write("=== 485串口测试数据 ===\n")
                f.write(f"保存时间: {now.strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write(f"端口: {self.config.port if self.serial_port else '未连接'}\n")
                f.write("=" * 40 + "\n\n")
                f.write(content)
            self.status_var.set(f"数据已保存到: {filename}")
            messagebox.showinfo("保存成功", f"数据已保存到文件:\n{filename}")
        except Exception as e:
            messagebox.showerror("保存失败", f"保存数据时出错: {str(e)}")
def main():
    """Create the main window, attach the serial test tool and run the Tk event loop."""
    window = tk.Tk()
    tool = SerialTestTool(window)

    def handle_window_close():
        # Shut an open session down before the window is torn down.
        if tool.running:
            tool.close_serial()
        window.destroy()

    # Route the window manager's close request through the handler above.
    window.protocol("WM_DELETE_WINDOW", handle_window_close)
    window.mainloop()
# Start the GUI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()