# File-viewer metadata captured with this source: 466 lines, 18 KiB, Python.
import sys
|
||
import serial
|
||
import serial.tools.list_ports
|
||
import threading
|
||
import time
|
||
import datetime
|
||
from queue import Queue
|
||
import tkinter as tk
|
||
from tkinter import ttk, scrolledtext, messagebox
|
||
import traceback
|
||
from dataclasses import dataclass
|
||
import json
|
||
import os
|
||
from typing import Optional, List
|
||
|
||
@dataclass
class SerialConfig:
    """Plain record of the settings used to open a serial port.

    Every field has a default, so ``SerialConfig()`` is a valid
    configuration on its own.
    """

    # Device name of the port.
    port: str = "COM1"
    # Line speed in bits per second.
    baudrate: int = 9600
    # Number of data bits per character.
    bytesize: int = 8
    # Parity code as a single letter; the default 'N' stands for "none".
    parity: str = "N"
    # Number of stop bits.
    stopbits: float = 1.0
    # Timeout value in seconds.
    timeout: float = 1.0
|
||
|
||
class SerialTestTool:
    """Tkinter GUI that periodically sends a HEX frame and logs all traffic.

    Two daemon threads are started when the port is opened: one polls the
    port for incoming bytes, the other writes ``send_data`` every
    ``send_interval`` milliseconds.  Worker threads never touch Tk widgets;
    they post ``(callback, args)`` pairs to an internal queue that the Tk
    main loop drains periodically, so all widget access stays on the thread
    running ``mainloop``.
    """

    # How often (ms) the Tk thread drains the worker -> UI queue.
    UI_POLL_MS = 20
    # Lower bound and fallback for the periodic send interval (ms).
    MIN_SEND_INTERVAL_MS = 10
    DEFAULT_SEND_INTERVAL_MS = 100
    # Combobox entries with special meaning (runtime UI strings).
    NO_PORT_PLACEHOLDER = "无可用串口"
    CUSTOM_BAUD_LABEL = "自定义"
    # UI parity label -> pyserial parity code.
    PARITY_MAP = {"无": 'N', "奇校验": 'O', "偶校验": 'E'}
    # Message type -> text colour; the colour name doubles as the tag name.
    TAG_COLORS = {
        "error": "red",
        "info": "blue",
        "send": "darkgreen",
        "receive": "purple",
        "normal": "black",
    }

    def __init__(self, root):
        """Build the window on *root* and populate the port list.

        Args:
            root: The ``tk.Tk`` (or toplevel) window that hosts the tool.
        """
        self.root = root
        self.root.title("485串口测试工具 - 带详细时间戳")
        self.root.geometry("900x700")

        # Serial port object and thread control.
        self.serial_port: Optional[serial.Serial] = None
        self.receive_thread: Optional[threading.Thread] = None
        self.send_thread: Optional[threading.Thread] = None
        self.running = False
        # Retained for backward compatibility; this class does not use them.
        self.receive_queue = Queue()
        self.send_queue = Queue()
        # Worker threads post (callback, args) here; the Tk thread runs them.
        self._ui_queue = Queue()
        # Set on close so worker waits end immediately instead of sleeping.
        self._stop_event = threading.Event()

        # Traffic statistics.
        self.receive_count = 0
        self.send_count = 0
        self.error_count = 0

        # Configuration.
        self.config = SerialConfig()
        self.send_data = bytes([0x03, 0x03, 0x00, 0x99])
        self.send_interval = self.DEFAULT_SEND_INTERVAL_MS  # ms

        # Timestamps (seconds since the epoch) of the latest send/receive.
        self.last_send_time = 0
        self.last_receive_time = 0

        self.setup_ui()
        self.refresh_ports()
        self._poll_ui_queue()

    # ------------------------------------------------------------------
    # UI construction
    # ------------------------------------------------------------------
    def setup_ui(self):
        """Create every widget and lay out the window."""
        # Let the main frame (and therefore the log) grow with the window.
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(0, weight=1)

        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        main_frame.columnconfigure(0, weight=1)
        main_frame.rowconfigure(3, weight=1)

        self._build_port_config(main_frame)
        self._build_send_config(main_frame)
        self._build_controls(main_frame)
        self._build_log_area(main_frame)

        # Status bar.
        self.status_var = tk.StringVar(value="就绪")
        status_bar = ttk.Label(self.root, textvariable=self.status_var, relief=tk.SUNKEN, anchor=tk.W)
        status_bar.grid(row=1, column=0, sticky=(tk.W, tk.E))

    def _build_port_config(self, parent):
        """Create the serial parameter widgets (port, baud, framing)."""
        config_frame = ttk.LabelFrame(parent, text="串口配置", padding="10")
        config_frame.grid(row=0, column=0, columnspan=3, sticky=(tk.W, tk.E), pady=(0, 10))

        # Port selection.
        ttk.Label(config_frame, text="串口号:").grid(row=0, column=0, padx=5)
        self.port_combo = ttk.Combobox(config_frame, width=15, state="readonly")
        self.port_combo.grid(row=0, column=1, padx=5)
        ttk.Button(config_frame, text="刷新", command=self.refresh_ports, width=8).grid(row=0, column=2, padx=5)

        # Baud rate selection.
        ttk.Label(config_frame, text="波特率:").grid(row=0, column=3, padx=5)
        baudrates = ["9600", "19200", "38400", "57600", "115200", "256000", "460800", "921600",
                     self.CUSTOM_BAUD_LABEL]
        self.baudrate_combo = ttk.Combobox(config_frame, values=baudrates, width=12, state="readonly")
        self.baudrate_combo.set("9600")
        self.baudrate_combo.grid(row=0, column=4, padx=5)
        self.baudrate_combo.bind("<<ComboboxSelected>>", self.on_baudrate_select)

        # Custom baud rate entry; enabled only when the custom item is chosen.
        self.custom_baud_var = tk.StringVar()
        self.custom_baud_entry = ttk.Entry(config_frame, textvariable=self.custom_baud_var, width=10, state="disabled")
        self.custom_baud_entry.grid(row=0, column=5, padx=5)

        # Remaining framing parameters.
        ttk.Label(config_frame, text="数据位:").grid(row=1, column=0, padx=5, pady=5)
        self.databits_combo = ttk.Combobox(config_frame, values=["5", "6", "7", "8"], width=8, state="readonly")
        self.databits_combo.set("8")
        self.databits_combo.grid(row=1, column=1, padx=5, pady=5)

        ttk.Label(config_frame, text="校验位:").grid(row=1, column=2, padx=5, pady=5)
        self.parity_combo = ttk.Combobox(config_frame, values=list(self.PARITY_MAP), width=10, state="readonly")
        self.parity_combo.set("无")
        self.parity_combo.grid(row=1, column=3, padx=5, pady=5)

        ttk.Label(config_frame, text="停止位:").grid(row=1, column=4, padx=5, pady=5)
        self.stopbits_combo = ttk.Combobox(config_frame, values=["1", "1.5", "2"], width=8, state="readonly")
        self.stopbits_combo.set("1")
        self.stopbits_combo.grid(row=1, column=5, padx=5, pady=5)

    def _build_send_config(self, parent):
        """Create the payload and send-interval widgets."""
        send_frame = ttk.LabelFrame(parent, text="发送配置", padding="10")
        send_frame.grid(row=1, column=0, columnspan=3, sticky=(tk.W, tk.E), pady=(0, 10))

        ttk.Label(send_frame, text="发送数据 (HEX):").grid(row=0, column=0, padx=5)
        self.send_data_var = tk.StringVar(value="03 03 00 99")
        self.send_data_entry = ttk.Entry(send_frame, textvariable=self.send_data_var, width=30)
        self.send_data_entry.grid(row=0, column=1, padx=5)
        ttk.Button(send_frame, text="解析", command=self.parse_send_data, width=8).grid(row=0, column=2, padx=5)

        ttk.Label(send_frame, text="发送间隔 (ms):").grid(row=0, column=3, padx=5)
        self.interval_var = tk.StringVar(value="100")
        self.interval_entry = ttk.Entry(send_frame, textvariable=self.interval_var, width=10)
        self.interval_entry.grid(row=0, column=4, padx=5)

    def _build_controls(self, parent):
        """Create the open / close / clear / save buttons."""
        control_frame = ttk.Frame(parent)
        control_frame.grid(row=2, column=0, columnspan=3, pady=(0, 10))

        self.open_btn = ttk.Button(control_frame, text="打开串口", command=self.open_serial, width=12)
        self.open_btn.grid(row=0, column=0, padx=5)

        self.close_btn = ttk.Button(control_frame, text="关闭串口", command=self.close_serial, width=12, state="disabled")
        self.close_btn.grid(row=0, column=1, padx=5)

        self.clear_btn = ttk.Button(control_frame, text="清空显示", command=self.clear_display, width=12)
        self.clear_btn.grid(row=0, column=2, padx=5)

        self.save_btn = ttk.Button(control_frame, text="保存数据", command=self.save_data, width=12)
        self.save_btn.grid(row=0, column=3, padx=5)

    def _build_log_area(self, parent):
        """Create the scrolling log widget and its colour tags."""
        display_area_frame = ttk.LabelFrame(parent, text="通信日志", padding="10")
        display_area_frame.grid(row=3, column=0, columnspan=3, sticky=(tk.W, tk.E, tk.N, tk.S))
        display_area_frame.columnconfigure(0, weight=1)
        display_area_frame.rowconfigure(0, weight=1)

        self.text_display = scrolledtext.ScrolledText(
            display_area_frame,
            width=80,
            height=20,
            font=("Consolas", 10),
        )
        self.text_display.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))

        # Configure each colour tag once instead of on every log line.
        for color in set(self.TAG_COLORS.values()):
            self.text_display.tag_config(color, foreground=color)

    # ------------------------------------------------------------------
    # Worker -> UI hand-off
    # ------------------------------------------------------------------
    def _post_to_ui(self, callback, *args):
        """Queue ``callback(*args)`` for execution on the Tk thread."""
        self._ui_queue.put((callback, args))

    def _process_ui_queue(self):
        """Run every queued UI callback; must be called on the Tk thread."""
        # This method is the only consumer, so empty() followed by
        # get_nowait() cannot race with another reader.
        while not self._ui_queue.empty():
            callback, args = self._ui_queue.get_nowait()
            callback(*args)

    def _poll_ui_queue(self):
        """Drain the UI queue and reschedule itself on the Tk event loop."""
        try:
            self._process_ui_queue()
        finally:
            # Keep polling even if one callback raised.
            self.root.after(self.UI_POLL_MS, self._poll_ui_queue)

    # ------------------------------------------------------------------
    # Configuration handlers
    # ------------------------------------------------------------------
    def refresh_ports(self):
        """Reload the list of available serial ports into the combobox.

        The current selection is kept when that port still exists; otherwise
        the first entry (or the "no port" placeholder) is selected so a
        vanished port cannot remain selected.
        """
        port_list = [port.device for port in serial.tools.list_ports.comports()]
        if not port_list:
            port_list = [self.NO_PORT_PLACEHOLDER]
        self.port_combo['values'] = port_list
        if self.port_combo.get() not in port_list:
            self.port_combo.set(port_list[0])

    def on_baudrate_select(self, event=None):
        """Enable the custom baud entry only when the custom item is chosen."""
        if self.baudrate_combo.get() == self.CUSTOM_BAUD_LABEL:
            self.custom_baud_entry.config(state="normal")
            self.custom_baud_entry.focus()
        else:
            self.custom_baud_entry.config(state="disabled")
            self.custom_baud_var.set("")

    def parse_send_data(self):
        """Parse the HEX entry into ``self.send_data``.

        ``0x``/``0X``/``\\x`` prefixes and ``,``/``;`` separators are
        tolerated.  An empty entry leaves the current payload unchanged.

        Returns:
            bool: ``False`` when the text is not valid HEX (an error dialog
            is shown and the payload is left unchanged), ``True`` otherwise.
        """
        data_str = self.send_data_var.get().strip()
        if not data_str:
            return True

        cleaned = data_str
        for token, replacement in (("0x", ""), ("0X", ""), ("\\x", ""), (",", " "), (";", " ")):
            cleaned = cleaned.replace(token, replacement)

        try:
            hex_bytes = bytes.fromhex(cleaned)
        except ValueError as e:
            messagebox.showerror("错误", f"数据格式错误: {e}\n请使用HEX格式,如: 01 02 AB CD")
            return False

        self.send_data = hex_bytes
        self.status_var.set(f"发送数据已更新: {hex_bytes.hex(' ', 1).upper()}")
        return True

    def _read_send_interval(self):
        """Return the send interval (ms) from the entry, clamped to the minimum.

        Non-numeric text falls back to the default interval.  The entry is
        rewritten so it always shows the value actually in effect.
        """
        try:
            interval = int(self.interval_var.get())
        except ValueError:
            interval = self.DEFAULT_SEND_INTERVAL_MS
        interval = max(interval, self.MIN_SEND_INTERVAL_MS)
        self.interval_var.set(str(interval))
        return interval

    # ------------------------------------------------------------------
    # Port lifecycle
    # ------------------------------------------------------------------
    def open_serial(self):
        """Open the selected port and start the receive/send threads.

        Validation failures (no port, bad baud rate, bad HEX payload) and
        port errors are reported through dialogs; in those cases nothing is
        started and the UI stays in the "closed" state.
        """
        port = self.port_combo.get()
        if not port or port == self.NO_PORT_PLACEHOLDER:
            messagebox.showwarning("警告", "请选择有效的串口号")
            return

        baudrate_str = self.baudrate_combo.get()
        if baudrate_str == self.CUSTOM_BAUD_LABEL:
            baudrate_str = self.custom_baud_var.get().strip()
            # isdecimal() only accepts characters that int() can convert.
            if not baudrate_str.isdecimal() or int(baudrate_str) <= 0:
                messagebox.showwarning("警告", "请输入有效的波特率")
                return
        baudrate = int(baudrate_str)

        bytesize = int(self.databits_combo.get())
        parity = self.PARITY_MAP[self.parity_combo.get()]
        stopbits = float(self.stopbits_combo.get())

        # Do not start sending if the payload entry is invalid.
        if not self.parse_send_data():
            return
        self.send_interval = self._read_send_interval()

        config = SerialConfig(
            port=port,
            baudrate=baudrate,
            bytesize=bytesize,
            parity=parity,
            stopbits=stopbits,
            timeout=1.0,
        )

        try:
            port_obj = serial.Serial(
                port=config.port,
                baudrate=config.baudrate,
                bytesize=config.bytesize,
                parity=config.parity,
                stopbits=config.stopbits,
                timeout=config.timeout,
            )
            if not port_obj.is_open:
                port_obj.open()
        except (serial.SerialException, OSError, ValueError) as e:
            self.serial_port = None
            messagebox.showerror("错误", f"打开串口失败: {str(e)}")
            return

        self.serial_port = port_obj
        # Remember what is actually open so save_data reports the real port.
        self.config = config

        # Start the worker threads.
        self._stop_event.clear()
        self.running = True
        self.receive_thread = threading.Thread(target=self.receive_data, daemon=True)
        self.send_thread = threading.Thread(target=self.send_data_thread, daemon=True)
        self.receive_thread.start()
        self.send_thread.start()

        # Update UI state.
        self.open_btn.config(state="disabled")
        self.close_btn.config(state="normal")
        self.status_var.set(f"串口已打开: {port} @ {baudrate}bps")

        # Log the connection details.
        self.append_display("=== 串口连接成功 ===", "info")
        self.append_display(f"端口: {port}", "info")
        self.append_display(f"波特率: {baudrate}", "info")
        self.append_display(f"发送间隔: {self.send_interval}ms", "info")
        self.append_display(f"发送数据: {self.send_data.hex(' ', 1).upper()}", "info")
        self.append_display("=" * 40, "info")

    def close_serial(self):
        """Stop the worker threads, close the port and reset the UI."""
        self.running = False
        self._stop_event.set()

        # Stop the workers before closing the port so they do not hit a
        # closed port and log a spurious error.  They never call into Tk,
        # so joining from the Tk thread cannot deadlock.
        for worker in (self.receive_thread, self.send_thread):
            if worker is not None and worker.is_alive():
                worker.join(timeout=2)

        # Closing also unblocks a worker that outlived the join timeout.
        if self.serial_port and self.serial_port.is_open:
            self.serial_port.close()

        # Show traffic that was still queued before the "closed" banner.
        self._process_ui_queue()

        # Update UI state.
        self.open_btn.config(state="normal")
        self.close_btn.config(state="disabled")
        self.status_var.set("串口已关闭")

        self.append_display("=== 串口已关闭 ===", "info")

    # ------------------------------------------------------------------
    # Worker threads
    # ------------------------------------------------------------------
    def receive_data(self):
        """Receive-thread body: poll the port and queue received bytes for display."""
        port = self.serial_port

        while self.running and port is not None and port.is_open:
            try:
                pending = port.in_waiting
                if pending > 0:
                    data = port.read(pending)
                    if data:
                        receive_time = time.time()
                        self.receive_count += len(data)
                        self.last_receive_time = receive_time
                        self._post_to_ui(self.display_received_data, bytes(data), receive_time)

                # Short pause between polls; ends early on close.
                self._stop_event.wait(0.001)

            except Exception as e:
                if not self.running:
                    # The port is being shut down; not a real error.
                    break
                self.error_count += 1
                error_time = time.time()
                self._post_to_ui(
                    self.append_display,
                    f"[{self.format_timestamp(error_time)}] 接收错误: {str(e)}",
                    "error",
                )
                self._stop_event.wait(0.1)

    def send_data_thread(self):
        """Send-thread body: write ``send_data`` every ``send_interval`` ms."""
        while self.running:
            try:
                port = self.serial_port
                if port is not None and port.is_open:
                    # Snapshot so the log shows exactly what was written.
                    payload = self.send_data
                    send_time = time.time()
                    self.last_send_time = send_time

                    port.write(payload)
                    self.send_count += 1

                    self._post_to_ui(self.display_sent_data, send_time, payload)

            except Exception as e:
                if not self.running:
                    # The port is being shut down; not a real error.
                    break
                self.error_count += 1
                error_time = time.time()
                self._post_to_ui(
                    self.append_display,
                    f"[{self.format_timestamp(error_time)}] 发送错误: {str(e)}",
                    "error",
                )
                self._stop_event.wait(0.1)
                continue

            # Wait out the interval; returns immediately once close is requested.
            self._stop_event.wait(self.send_interval / 1000.0)

    # ------------------------------------------------------------------
    # Display helpers (Tk thread only)
    # ------------------------------------------------------------------
    def format_timestamp(self, timestamp):
        """Format an epoch timestamp as local ``HH:MM:SS.mmm``."""
        dt = datetime.datetime.fromtimestamp(timestamp)
        # %f yields microseconds; drop the last three digits for milliseconds.
        return dt.strftime("%H:%M:%S.%f")[:-3]

    def display_sent_data(self, send_time, data=None):
        """Log one transmitted frame.

        Args:
            send_time: Epoch time at which the frame was written.
            data: The bytes that were written; defaults to the current
                ``send_data`` when omitted.
        """
        payload = self.send_data if data is None else data
        timestamp = self.format_timestamp(send_time)
        hex_str = payload.hex(' ', 1).upper()
        self.append_display(f"[{timestamp}] 发送: {hex_str}", "send")

    def display_received_data(self, data: bytes, receive_time):
        """Log received bytes as HEX plus an ASCII rendering.

        Args:
            data: The received bytes; nothing is logged when empty.
            receive_time: Epoch time at which the bytes were read.
        """
        if not data:
            return

        timestamp = self.format_timestamp(receive_time)

        # Delay relative to the most recent send, once a send has happened.
        time_diff = ""
        if self.last_send_time > 0:
            delay_ms = (receive_time - self.last_send_time) * 1000
            time_diff = f" [延迟: {delay_ms:.1f}ms]"

        if len(data) > 16:
            # Break long frames into rows of 16 bytes.
            hex_str = "\n ".join(
                data[i:i + 16].hex(' ', 1).upper() for i in range(0, len(data), 16)
            )
        else:
            hex_str = data.hex(' ', 1).upper()

        self.append_display(f"[{timestamp}] 接收: {hex_str}{time_diff}", "receive")

        # Printable ASCII is shown as-is, everything else as '.'.
        ascii_str = "".join(chr(byte) if 32 <= byte <= 126 else "." for byte in data)
        self.append_display(f"[{timestamp}] ASCII: {ascii_str}", "receive")

    def append_display(self, text: str, msg_type: str = "normal"):
        """Append *text* to the log in the colour assigned to *msg_type*.

        Args:
            text: Message to append; may span several lines.
            msg_type: One of the ``TAG_COLORS`` keys; unknown types are
                shown in black.
        """
        if self.text_display is None:
            return

        color = self.TAG_COLORS.get(msg_type, "black")
        # Tagging at insert time colours every line of a multi-line message.
        self.text_display.insert(tk.END, text + "\n", color)
        # Auto-scroll to the newest entry.
        self.text_display.see(tk.END)

    def clear_display(self):
        """Remove all text from the log."""
        if self.text_display is not None:
            self.text_display.delete("1.0", tk.END)

    def save_data(self):
        """Write the log to ``serial_data_<timestamp>.txt`` in the working directory."""
        now = datetime.datetime.now()
        filename = f"serial_data_{now.strftime('%Y%m%d_%H%M%S')}.txt"
        content = self.text_display.get("1.0", tk.END)

        try:
            with open(filename, 'w', encoding='utf-8') as f:
                f.write("=== 485串口测试数据 ===\n")
                f.write(f"保存时间: {now.strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write(f"端口: {self.config.port if self.serial_port else '未连接'}\n")
                f.write("=" * 40 + "\n\n")
                f.write(content)
        except OSError as e:
            messagebox.showerror("保存失败", f"保存数据时出错: {str(e)}")
            return

        self.status_var.set(f"数据已保存到: {filename}")
        messagebox.showinfo("保存成功", f"数据已保存到文件:\n{filename}")
|
||
|
||
def main():
    """Create the Tk window, attach the serial test tool and run the event loop."""
    window = tk.Tk()
    tool = SerialTestTool(window)

    def handle_window_close():
        # Shut the port down first if it is still active, then tear down the UI.
        if tool.running:
            tool.close_serial()
        window.destroy()

    # Route the window-manager close button through the cleanup handler.
    window.protocol("WM_DELETE_WINDOW", handle_window_close)
    window.mainloop()


# Launch the GUI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()