235 lines
9.0 KiB
Python
235 lines
9.0 KiB
Python
import tkinter as tk
|
|
from tkinter import ttk
|
|
import serial
|
|
import serial.tools.list_ports
|
|
import threading
|
|
import time
|
|
from datetime import datetime
|
|
|
|
class SerialApp:
|
|
def __init__(self, root):
|
|
self.root = root
|
|
self.root.title("串口通信工具")
|
|
self.root.geometry("800x600")
|
|
|
|
# 串口相关变量
|
|
self.serial_port = None
|
|
self.is_connected = False
|
|
self.read_thread = None
|
|
self.stop_thread = False
|
|
|
|
self.create_widgets()
|
|
self.refresh_ports()
|
|
|
|
def create_widgets(self):
|
|
# 主框架
|
|
main_frame = ttk.Frame(self.root, padding="10")
|
|
main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
|
|
|
|
# 配置行权重
|
|
self.root.columnconfigure(0, weight=1)
|
|
self.root.rowconfigure(0, weight=1)
|
|
main_frame.columnconfigure(1, weight=1)
|
|
|
|
# 串口配置区域
|
|
config_frame = ttk.LabelFrame(main_frame, text="串口配置", padding="5")
|
|
config_frame.grid(row=0, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=(0, 10))
|
|
config_frame.columnconfigure(1, weight=1)
|
|
|
|
# 串口选择
|
|
ttk.Label(config_frame, text="串口:").grid(row=0, column=0, sticky=tk.W, padx=(0, 5))
|
|
self.port_combo = ttk.Combobox(config_frame, state="readonly")
|
|
self.port_combo.grid(row=0, column=1, sticky=(tk.W, tk.E), padx=(0, 10))
|
|
|
|
# 刷新按钮
|
|
self.refresh_btn = ttk.Button(config_frame, text="刷新", command=self.refresh_ports)
|
|
self.refresh_btn.grid(row=0, column=2, padx=(0, 10))
|
|
|
|
# 波特率选择
|
|
ttk.Label(config_frame, text="波特率:").grid(row=0, column=3, sticky=tk.W, padx=(0, 5))
|
|
self.baud_combo = ttk.Combobox(config_frame, values=[
|
|
"9600", "19200", "38400", "57600", "115200", "230400", "460800", "921600"
|
|
], state="readonly")
|
|
self.baud_combo.set("115200")
|
|
self.baud_combo.grid(row=0, column=4, sticky=(tk.W, tk.E))
|
|
|
|
# 连接/断开按钮
|
|
self.connect_btn = ttk.Button(config_frame, text="打开串口", command=self.toggle_connection)
|
|
self.connect_btn.grid(row=0, column=5, padx=(10, 0))
|
|
|
|
# 数据显示区域
|
|
display_frame = ttk.LabelFrame(main_frame, text="接收数据", padding="5")
|
|
display_frame.grid(row=1, column=0, columnspan=2, sticky=(tk.W, tk.E, tk.N, tk.S), pady=(0, 10))
|
|
main_frame.rowconfigure(1, weight=1)
|
|
display_frame.columnconfigure(0, weight=1)
|
|
display_frame.rowconfigure(0, weight=1)
|
|
|
|
# 文本框和滚动条
|
|
self.text_frame = ttk.Frame(display_frame)
|
|
self.text_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
|
|
self.text_frame.columnconfigure(0, weight=1)
|
|
self.text_frame.rowconfigure(0, weight=1)
|
|
|
|
self.text_area = tk.Text(self.text_frame, wrap=tk.WORD, width=80, height=20)
|
|
scrollbar = ttk.Scrollbar(self.text_frame, orient=tk.VERTICAL, command=self.text_area.yview)
|
|
self.text_area.configure(yscrollcommand=scrollbar.set)
|
|
|
|
self.text_area.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
|
|
scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S))
|
|
|
|
# 状态栏
|
|
self.status_var = tk.StringVar(value="就绪")
|
|
status_bar = ttk.Label(main_frame, textvariable=self.status_var, relief=tk.SUNKEN)
|
|
status_bar.grid(row=2, column=0, columnspan=2, sticky=(tk.W, tk.E))
|
|
|
|
def refresh_ports(self):
|
|
"""刷新可用串口列表"""
|
|
ports = serial.tools.list_ports.comports()
|
|
port_list = [port.device for port in ports]
|
|
self.port_combo['values'] = port_list
|
|
if port_list:
|
|
self.port_combo.set(port_list[0])
|
|
|
|
def toggle_connection(self):
|
|
"""打开或关闭串口连接"""
|
|
if not self.is_connected:
|
|
self.connect_serial()
|
|
else:
|
|
self.disconnect_serial()
|
|
|
|
def connect_serial(self):
|
|
"""打开串口连接"""
|
|
port = self.port_combo.get()
|
|
baudrate = self.baud_combo.get()
|
|
|
|
if not port:
|
|
self.status_var.set("错误: 请选择串口")
|
|
return
|
|
|
|
try:
|
|
self.serial_port = serial.Serial(
|
|
port=port,
|
|
baudrate=int(baudrate),
|
|
bytesize=serial.EIGHTBITS,
|
|
parity=serial.PARITY_NONE,
|
|
stopbits=serial.STOPBITS_ONE,
|
|
timeout=1
|
|
)
|
|
|
|
self.is_connected = True
|
|
self.stop_thread = False
|
|
self.connect_btn.config(text="关闭串口")
|
|
self.status_var.set(f"已连接到 {port},波特率 {baudrate}")
|
|
|
|
# 启动读取线程
|
|
self.read_thread = threading.Thread(target=self.read_serial_data, daemon=True)
|
|
self.read_thread.start()
|
|
|
|
except Exception as e:
|
|
self.status_var.set(f"连接错误: {str(e)}")
|
|
|
|
def disconnect_serial(self):
|
|
"""关闭串口连接"""
|
|
self.stop_thread = True
|
|
self.is_connected = False
|
|
|
|
if self.serial_port and self.serial_port.is_open:
|
|
self.serial_port.close()
|
|
|
|
self.connect_btn.config(text="打开串口")
|
|
self.status_var.set("串口已关闭")
|
|
|
|
def read_serial_data(self):
|
|
"""在单独的线程中读取串口数据"""
|
|
buffer = bytearray()
|
|
|
|
while not self.stop_thread and self.serial_port and self.serial_port.is_open:
|
|
try:
|
|
if self.serial_port.in_waiting > 0:
|
|
data = self.serial_port.read(self.serial_port.in_waiting)
|
|
buffer.extend(data)
|
|
|
|
# 处理缓冲区中的数据
|
|
while len(buffer) >= 2:
|
|
# 检查是否是 0x03 0x01 命令
|
|
if buffer[0] == 0x03 and buffer[1] == 0x01 and len(buffer) <= 4:
|
|
self.handle_0301_command(buffer[:2])
|
|
buffer = buffer[2:]
|
|
|
|
# 检查是否是 0x03 0x03 命令
|
|
elif buffer[0] == 0x03 and buffer[1] == 0x03 and len(buffer) <= 4:
|
|
self.handle_0303_command(buffer[:2])
|
|
buffer = buffer[2:]
|
|
|
|
else:
|
|
# 如果不是我们处理的命令,显示并丢弃第一个字节
|
|
self.display_data(buffer[:1], "RX")
|
|
buffer = buffer[1:]
|
|
|
|
time.sleep(0.01)
|
|
|
|
except Exception as e:
|
|
self.root.after(0, lambda: self.status_var.set(f"读取错误: {str(e)}"))
|
|
break
|
|
|
|
def handle_0301_command(self, data):
|
|
"""处理 0x03 0x01 命令"""
|
|
# 显示接收到的数据
|
|
self.display_data(data, "RX")
|
|
|
|
# 发送回复 0x03 0x01 0xAA 0xAA
|
|
response = bytes([0x03, 0x01, 0xAA, 0xAA])
|
|
self.send_data(response)
|
|
|
|
def handle_0303_command(self, data):
|
|
"""处理 0x03 0x03 命令"""
|
|
# 显示接收到的数据
|
|
self.display_data(data, "RX")
|
|
|
|
# 发送回复 0x03 0x03 0x05 0x01 0x01 0x01 0x01 0x01 0xAA 0xAA
|
|
response = bytes([0x03, 0x03, 0x05, 0x01, 0x01, 0x01, 0x01, 0x01, 0xAA, 0xAA])
|
|
self.send_data(response)
|
|
|
|
def send_data(self, data):
|
|
"""发送数据到串口"""
|
|
if self.serial_port and self.serial_port.is_open:
|
|
try:
|
|
self.serial_port.write(data)
|
|
self.display_data(data, "TX")
|
|
except Exception as e:
|
|
self.root.after(0, lambda: self.status_var.set(f"发送错误: {str(e)}"))
|
|
|
|
def display_data(self, data, direction):
|
|
"""在文本框中显示数据"""
|
|
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
|
|
hex_str = ' '.join([f'{byte:02X}' for byte in data])
|
|
|
|
display_text = f"[{timestamp}] {direction}: {hex_str}\n"
|
|
|
|
# 在主线程中更新GUI
|
|
self.root.after(0, lambda: self.update_text_display(display_text))
|
|
|
|
def update_text_display(self, text):
|
|
"""更新文本框显示"""
|
|
self.text_area.insert(tk.END, text)
|
|
self.text_area.see(tk.END)
|
|
|
|
def __del__(self):
|
|
"""析构函数,确保串口关闭"""
|
|
if hasattr(self, 'serial_port') and self.serial_port and self.serial_port.is_open:
|
|
self.serial_port.close()
|
|
|
|
def main():
|
|
root = tk.Tk()
|
|
app = SerialApp(root)
|
|
|
|
# 处理窗口关闭事件
|
|
def on_closing():
|
|
app.disconnect_serial()
|
|
root.destroy()
|
|
|
|
root.protocol("WM_DELETE_WINDOW", on_closing)
|
|
root.mainloop()
|
|
|
|
if __name__ == "__main__":
|
|
main() |