pressure_sensor_system/SerialTool/SerialTool.py

235 lines
9.0 KiB
Python

import tkinter as tk
from tkinter import ttk
import serial
import serial.tools.list_ports
import threading
import time
from datetime import datetime
class SerialApp:
def __init__(self, root):
self.root = root
self.root.title("串口通信工具")
self.root.geometry("800x600")
# 串口相关变量
self.serial_port = None
self.is_connected = False
self.read_thread = None
self.stop_thread = False
self.create_widgets()
self.refresh_ports()
def create_widgets(self):
# 主框架
main_frame = ttk.Frame(self.root, padding="10")
main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
# 配置行权重
self.root.columnconfigure(0, weight=1)
self.root.rowconfigure(0, weight=1)
main_frame.columnconfigure(1, weight=1)
# 串口配置区域
config_frame = ttk.LabelFrame(main_frame, text="串口配置", padding="5")
config_frame.grid(row=0, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=(0, 10))
config_frame.columnconfigure(1, weight=1)
# 串口选择
ttk.Label(config_frame, text="串口:").grid(row=0, column=0, sticky=tk.W, padx=(0, 5))
self.port_combo = ttk.Combobox(config_frame, state="readonly")
self.port_combo.grid(row=0, column=1, sticky=(tk.W, tk.E), padx=(0, 10))
# 刷新按钮
self.refresh_btn = ttk.Button(config_frame, text="刷新", command=self.refresh_ports)
self.refresh_btn.grid(row=0, column=2, padx=(0, 10))
# 波特率选择
ttk.Label(config_frame, text="波特率:").grid(row=0, column=3, sticky=tk.W, padx=(0, 5))
self.baud_combo = ttk.Combobox(config_frame, values=[
"9600", "19200", "38400", "57600", "115200", "230400", "460800", "921600"
], state="readonly")
self.baud_combo.set("115200")
self.baud_combo.grid(row=0, column=4, sticky=(tk.W, tk.E))
# 连接/断开按钮
self.connect_btn = ttk.Button(config_frame, text="打开串口", command=self.toggle_connection)
self.connect_btn.grid(row=0, column=5, padx=(10, 0))
# 数据显示区域
display_frame = ttk.LabelFrame(main_frame, text="接收数据", padding="5")
display_frame.grid(row=1, column=0, columnspan=2, sticky=(tk.W, tk.E, tk.N, tk.S), pady=(0, 10))
main_frame.rowconfigure(1, weight=1)
display_frame.columnconfigure(0, weight=1)
display_frame.rowconfigure(0, weight=1)
# 文本框和滚动条
self.text_frame = ttk.Frame(display_frame)
self.text_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
self.text_frame.columnconfigure(0, weight=1)
self.text_frame.rowconfigure(0, weight=1)
self.text_area = tk.Text(self.text_frame, wrap=tk.WORD, width=80, height=20)
scrollbar = ttk.Scrollbar(self.text_frame, orient=tk.VERTICAL, command=self.text_area.yview)
self.text_area.configure(yscrollcommand=scrollbar.set)
self.text_area.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S))
# 状态栏
self.status_var = tk.StringVar(value="就绪")
status_bar = ttk.Label(main_frame, textvariable=self.status_var, relief=tk.SUNKEN)
status_bar.grid(row=2, column=0, columnspan=2, sticky=(tk.W, tk.E))
def refresh_ports(self):
"""刷新可用串口列表"""
ports = serial.tools.list_ports.comports()
port_list = [port.device for port in ports]
self.port_combo['values'] = port_list
if port_list:
self.port_combo.set(port_list[0])
def toggle_connection(self):
"""打开或关闭串口连接"""
if not self.is_connected:
self.connect_serial()
else:
self.disconnect_serial()
def connect_serial(self):
"""打开串口连接"""
port = self.port_combo.get()
baudrate = self.baud_combo.get()
if not port:
self.status_var.set("错误: 请选择串口")
return
try:
self.serial_port = serial.Serial(
port=port,
baudrate=int(baudrate),
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=1
)
self.is_connected = True
self.stop_thread = False
self.connect_btn.config(text="关闭串口")
self.status_var.set(f"已连接到 {port},波特率 {baudrate}")
# 启动读取线程
self.read_thread = threading.Thread(target=self.read_serial_data, daemon=True)
self.read_thread.start()
except Exception as e:
self.status_var.set(f"连接错误: {str(e)}")
def disconnect_serial(self):
"""关闭串口连接"""
self.stop_thread = True
self.is_connected = False
if self.serial_port and self.serial_port.is_open:
self.serial_port.close()
self.connect_btn.config(text="打开串口")
self.status_var.set("串口已关闭")
def read_serial_data(self):
"""在单独的线程中读取串口数据"""
buffer = bytearray()
while not self.stop_thread and self.serial_port and self.serial_port.is_open:
try:
if self.serial_port.in_waiting > 0:
data = self.serial_port.read(self.serial_port.in_waiting)
buffer.extend(data)
# 处理缓冲区中的数据
while len(buffer) >= 2:
# 检查是否是 0x03 0x01 命令
if buffer[0] == 0x03 and buffer[1] == 0x01 and len(buffer) <= 4:
self.handle_0301_command(buffer[:2])
buffer = buffer[2:]
# 检查是否是 0x03 0x03 命令
elif buffer[0] == 0x03 and buffer[1] == 0x03 and len(buffer) <= 4:
self.handle_0303_command(buffer[:2])
buffer = buffer[2:]
else:
# 如果不是我们处理的命令,显示并丢弃第一个字节
self.display_data(buffer[:1], "RX")
buffer = buffer[1:]
time.sleep(0.01)
except Exception as e:
self.root.after(0, lambda: self.status_var.set(f"读取错误: {str(e)}"))
break
def handle_0301_command(self, data):
"""处理 0x03 0x01 命令"""
# 显示接收到的数据
self.display_data(data, "RX")
# 发送回复 0x03 0x01 0xAA 0xAA
response = bytes([0x03, 0x01, 0xAA, 0xAA])
self.send_data(response)
def handle_0303_command(self, data):
"""处理 0x03 0x03 命令"""
# 显示接收到的数据
self.display_data(data, "RX")
# 发送回复 0x03 0x03 0x05 0x01 0x01 0x01 0x01 0x01 0xAA 0xAA
response = bytes([0x03, 0x03, 0x05, 0x01, 0x01, 0x01, 0x01, 0x01, 0xAA, 0xAA])
self.send_data(response)
def send_data(self, data):
"""发送数据到串口"""
if self.serial_port and self.serial_port.is_open:
try:
self.serial_port.write(data)
self.display_data(data, "TX")
except Exception as e:
self.root.after(0, lambda: self.status_var.set(f"发送错误: {str(e)}"))
def display_data(self, data, direction):
"""在文本框中显示数据"""
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
hex_str = ' '.join([f'{byte:02X}' for byte in data])
display_text = f"[{timestamp}] {direction}: {hex_str}\n"
# 在主线程中更新GUI
self.root.after(0, lambda: self.update_text_display(display_text))
def update_text_display(self, text):
"""更新文本框显示"""
self.text_area.insert(tk.END, text)
self.text_area.see(tk.END)
def __del__(self):
"""析构函数,确保串口关闭"""
if hasattr(self, 'serial_port') and self.serial_port and self.serial_port.is_open:
self.serial_port.close()
def main():
root = tk.Tk()
app = SerialApp(root)
# 处理窗口关闭事件
def on_closing():
app.disconnect_serial()
root.destroy()
root.protocol("WM_DELETE_WINDOW", on_closing)
root.mainloop()
if __name__ == "__main__":
main()