目录

Python-智能机房签到系统高效管理课堂考勤

Python 智能机房签到系统:高效管理课堂考勤

智能机房签到系统:高效管理课堂考勤

相关资源文件已经打包成EXE文件,可双击直接运行程序,且文章末尾已附上相关源码,以供大家学习交流,博主主页还有更多Python相关程序案例,秉着开源精神的想法,望大家喜欢,点个关注不迷路!!!

一、概述

本机房签到系统(学生端)是一个基于 PythonTkinter 图形界面库开发的客户端软件,旨在帮助学生快速完成课堂签到,并实时反馈签到状态。该软件通过网络通信与教师端服务器保持连接,确保签到数据的可靠性和实时性。

本系统具备 自动检测服务器IP网络状态实时监测心跳检测 以及 签到重试机制 等多项智能特性,旨在提供一个高效、便捷的签到体验,提高课堂考勤的智能化管理水平。

https://i-blog.csdnimg.cn/direct/3eff41a54021499c9cc5cfaa974f367a.png


二、功能使用

1. 界面设计

  • 窗口布局
    • 采用 Tkinter + ttk 组件构建,界面简洁直观。
    • 窗口大小 420x320 ,不可调整大小,确保界面美观统一。
    • 居中显示 ,提高用户体验。
  • 输入框
    • 教师端IP输入 :自动填充本机推测的教师端IP,可手动修改。
    • 班级选择 :提供 “七(1)班”~“七(6)班” 可选项,采用 下拉框
    • 姓名输入 :限制 2-10 位中文字符 ,保证格式规范。
  • 状态指示灯
    • 连接成功 (绿灯) ,未连接 (红灯) ,直观显示连接状态。
  • 任务栏信息
    • 教师端连接状态 显示,颜色区分状态(绿色=已连接,红色=未连接)。
    • 当前时间 ,每秒更新一次,提升用户体验。

2. 网络通信

  • 服务器IP检测
    • 自动检测本地IP ,推测教师端IP,减少学生输入错误可能性。
  • 网络状态监测
    • 采用 双重检测机制 避免误判,每 1.5-3 秒进行一次检测。
    • 状态变更时,界面 实时更新 ,确保学生随时掌握教师端连接情况。
  • 心跳机制
    • 30 秒 发送一次 “heartbeat” ,保持与教师端的在线状态。
    • 如果断开连接,尝试快速重连。

3. 签到功能

  • 签到流程
    • 点击 “立即签到” 按钮,软件会执行 服务器连接检测 ,确保教师端可用。
    • 检查 IP 地址合法性姓名输入格式 ,确保数据有效性。
    • 采用 多线程 进行签到,防止界面卡顿。
    • 签到请求失败时,最多重试 5 次 ,逐步增加延迟(指数退避)。
  • 服务器响应
    • 成功返回 签到成功 ,弹窗提示,并自动关闭窗口。
    • 失败时,显示 错误信息 ,提示用户检查网络或联系教师。

三、效果展示

https://i-blog.csdnimg.cn/direct/c4affda3c3484aaa9c00c970f72af723.png

https://i-blog.csdnimg.cn/direct/74071e138eb64120a6bd38d14a4b11f8.png

https://i-blog.csdnimg.cn/direct/db78a656d6b94d5fa6c62635e543afc5.png

https://i-blog.csdnimg.cn/direct/2434ca40d7244a9787ef52273adaed2d.png


四、总结

本机房签到系统(学生端)以 简洁美观的界面高效的网络管理智能化签到机制 为核心,提供了一个稳定、易用的课堂签到方案。其主要特点包括:

自动检测服务器IP,降低手动输入成本

实时网络监测,确保签到状态可视化

心跳机制保持在线,提高签到稳定性

高效的重试机制,减少签到失败影响

通过该系统,学生可以快速完成签到,教师端也能高效获取签到数据,从而提高课堂管理的效率和智能化水平。


五、源码展示:

教师服务端源码:

import socket
import threading
import json
import time
import logging
import queue
import struct
from tkinter import *
from tkinter import ttk, messagebox, filedialog
from openpyxl import Workbook
 
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='server.log',
    filemode='a'
)
 
def is_port_in_use(port, host='0.0.0.0'):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        return s.connect_ex((host, port)) == 0
 
class ServerGUI:
    PORT = 8888
    HEARTBEAT_TIMEOUT = 60
    HEARTBEAT_CHECK_INTERVAL = 15
 
    def __init__(self):
        self.window = Tk()
        self.window.title("机房签到-教师端")
        self.window.geometry("1024x768")
        self.window.protocol("WM_DELETE_WINDOW", self.on_close)
 
        # 状态管理
        self.running = True
        self.clients = {}  # {(computer, ip): tree_item}
        self.last_activity = {}
        self.client_sockets = []
        self.server = None
        self.socket_lock = threading.Lock()
        self.update_queue = queue.Queue()
 
        # 初始化组件
        self._init_styles()
        self._create_menu_bar()
        self._create_auto_table()
        self._create_taskbar()
 
        # 网络初始化
        if not self._init_network():
            return
 
        # 启动线程
        self.accept_thread = threading.Thread(target=self.accept_clients, daemon=True)
        self.heartbeat_thread = threading.Thread(target=self.heartbeat_checker, daemon=True)
        self.accept_thread.start()
        self.heartbeat_thread.start()
 
        # GUI定时任务
        self.update_time()
        self.process_update_queue()
        self.center_window()
        self.window.mainloop()
 
    def _init_styles(self):
        self.style = ttk.Style()
        self.style.theme_use('clam')
        self.style.configure("Treeview",
                            background="#F5F5F5",
                            fieldbackground="#F5F5F5",
                            rowheight=25,
                            font=('微软雅黑', 10))
        self.style.map("Treeview", background=[('selected', '#4A90E2')])
 
    def _create_menu_bar(self):
        menubar = Menu(self.window)
        file_menu = Menu(menubar, tearoff=0)
        file_menu.add_command(label="导出Excel", command=self.save_to_excel)
        file_menu.add_separator()
        file_menu.add_command(label="退出", command=self.on_close)
        help_menu = Menu(menubar, tearoff=0)
        help_menu.add_command(label="关于", command=self._show_about)
        menubar.add_cascade(label="文件", menu=file_menu)
        menubar.add_cascade(label="帮助", menu=help_menu)
        self.window.config(menu=menubar)
 
    def _create_auto_table(self):
        container = ttk.Frame(self.window)
        container.pack(fill=BOTH, expand=True, padx=8, pady=8)
 
        columns = ("computer", "ip", "checkin_class", "checkin_student")
        self.tree = ttk.Treeview(
            container,
            columns=columns,
            show="headings",
            selectmode="browse"
        )
 
        headers = [
            ("computer", "计算机名", 180),
            ("ip", "IP地址", 150),
            ("checkin_class", "签到班级", 200),
            ("checkin_student", "签到学生", 150)
        ]
 
        for col_id, col_text, min_width in headers:
            self.tree.heading(col_id, text=col_text, anchor=CENTER)
            self.tree.column(col_id, width=min_width, anchor=CENTER)
 
        vsb = ttk.Scrollbar(container, orient="vertical", command=self.tree.yview)
        hsb = ttk.Scrollbar(container, orient="horizontal", command=self.tree.xview)
        self.tree.configure(yscrollcommand=vsb.set, xscrollcommand=hsb.set)
 
        self.tree.grid(row=0, column=0, sticky="nsew")
        vsb.grid(row=0, column=1, sticky="ns")
        hsb.grid(row=1, column=0, sticky="ew")
 
        container.grid_rowconfigure(0, weight=1)
        container.grid_columnconfigure(0, weight=1)
 
    def _create_taskbar(self):
        taskbar = ttk.Frame(self.window)
        taskbar.pack(fill=X, side=BOTTOM, pady=4)
 
        self.checkin_count = ttk.Label(taskbar, text="已签到: 0")
        self.checkin_count.pack(side=LEFT, padx=12)
 
        self.time_label = ttk.Label(taskbar, text="")
        self.time_label.pack(side=RIGHT, padx=12)
 
    def _init_network(self):
        if is_port_in_use(self.PORT):
            messagebox.showerror("错误", f"端口 {self.PORT} 已被占用")
            self.window.destroy()
            return False
 
        try:
            self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server.bind(('0.0.0.0', self.PORT))
            self.server.settimeout(1)
            self.server.listen(5)
            logging.info("服务器启动成功")
            return True
        except Exception as e:
            logging.error(f"服务启动失败: {str(e)}")
            self.window.destroy()
            return False
 
    def accept_clients(self):
        while self.running:
            try:
                client, addr = self.server.accept()
                client.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
                                struct.pack('ii', 1, 0))
                with self.socket_lock:
                    self.client_sockets.append(client)
                threading.Thread(
                    target=self.handle_client,
                    args=(client, addr),
                    daemon=True
                ).start()
                logging.info(f"新连接: {addr}")
            except socket.timeout:
                continue
            except Exception as e:
                if self.running:
                    logging.error(f"接受连接异常: {str(e)}")
                break
 
    def handle_client(self, client, addr):
        buffer = b''
        try:
            while self.running:
                try:
                    data = client.recv(1024)
                    if not data:
                        break
                    buffer += data
 
                    # 处理PING请求
                    if buffer.startswith(b'PING'):
                        client.sendall(b'PONG\n')
                        buffer = buffer[5:]
                        continue
 
                    while b'\n' in buffer:
                        line, buffer = buffer.split(b'\n', 1)
                        try:
                            info = json.loads(line.decode('utf-8'))
                            self.validate_client_info(info, addr)
 
                            if info['type'] == 'checkin':
                                self.process_checkin(info, addr)
                            elif info['type'] == 'init':
                                self.process_init(info, addr)
                            elif info['type'] == 'heartbeat':
                                self.process_heartbeat(info, addr)
 
                            response = {"status": "success"}
                            client.sendall(json.dumps(response).encode() + b'\n')
 
                        except json.JSONDecodeError as e:
                            truncated = line[:50] + b'...' if len(line) > 50 else line
                            logging.warning(f"无效JSON数据: {truncated} 错误: {str(e)}")
                        except Exception as e:
                            logging.error(f"数据处理失败: {str(e)}")
 
                except (socket.timeout, ConnectionResetError):
                    break
        except Exception as e:
            logging.error(f"客户端处理异常: {str(e)}")
        finally:
            with self.socket_lock:
                if client in self.client_sockets:
                    self.client_sockets.remove(client)
            client.close()
 
    def validate_client_info(self, info, addr):
        required_fields = {
            'init': ['type', 'computer_name'],
            'checkin': ['type', 'computer_name', 'class', 'name'],
            'heartbeat': ['type', 'computer_name']
        }
        msg_type = info.get('type')
        if msg_type not in required_fields:
            raise ValueError(f"未知消息类型: {msg_type}")
        for field in required_fields[msg_type]:
            if field not in info:
                raise ValueError(f"缺失字段 {field},来自 {addr}")
 
    def process_init(self, info, addr):
        key = (info['computer_name'], addr[0])
        if key not in self.clients:
            self.update_queue.put((self._add_tree_item, (key,)))
        self.update_queue.put((self._update_activity, (key,)))
 
    def _add_tree_item(self, key):
        item = self.tree.insert("", "end", values=(key[0], key[1], "", ""))
        self.clients[key] = item
 
    def process_checkin(self, info, addr):
        key = (info['computer_name'], addr[0])
        if key not in self.clients:
            self.process_init(info, addr)
        values = (key[0], key[1], info['class'], info['name'])
        self.update_queue.put((self._update_tree_item, (key, values)))
        self.update_queue.put((self._update_activity, (key,)))
 
    def _update_tree_item(self, key, values):
        if key in self.clients:
            self.tree.item(self.clients[key], values=values)
 
    def _update_activity(self, key):
        self.last_activity[key] = time.time()
        self.update_checkin_count()
 
    def process_heartbeat(self, info, addr):
        key = (info['computer_name'], addr[0])
        if key in self.last_activity:
            self.last_activity[key] = time.time()
 
    def heartbeat_checker(self):
        while self.running:
            current_time = time.time()
            expired = [k for k, t in self.last_activity.items()
                      if current_time - t > self.HEARTBEAT_TIMEOUT]
            if expired:
                self.update_queue.put((self.remove_clients, (expired,)))
            time.sleep(self.HEARTBEAT_CHECK_INTERVAL)
 
    def remove_clients(self, keys):
        for key in keys:
            if key in self.clients:
                self.tree.delete(self.clients[key])
                del self.clients[key]
            if key in self.last_activity:
                del self.last_activity[key]
        self.update_checkin_count()
 
    def update_checkin_count(self):
        count = len([k for k in self.clients
                    if self.tree.item(self.clients[k])['values'][3]])
        self.checkin_count.config(text=f"已签到: {count}")
 
    def update_time(self):
        if self.running:
            self.time_label.config(text=time.strftime("%Y-%m-%d %H:%M:%S"))
            self.window.after(1000, self.update_time)
 
    def save_to_excel(self):
        if not self.clients:
            messagebox.showwarning("提示", "当前没有可导出的签到数据")
            return
 
        path = filedialog.asksaveasfilename(
            defaultextension=".xlsx",
            filetypes=[("Excel文件", "*.xlsx")]
        )
        if not path:
            return
 
        try:
            wb = Workbook()
            ws = wb.active
            ws.append(["计算机名", "IP地址", "班级", "姓名"])
            for key in self.clients:
                values = self.tree.item(self.clients[key])['values']
                ws.append(values)
            wb.save(path)
            messagebox.showinfo("成功", "数据已保存")
        except Exception as e:
            messagebox.showerror("错误", f"保存失败: {str(e)}")
 
    def on_close(self):
        logging.info("正在关闭服务器...")
        self.running = False
 
        if self.accept_thread.is_alive():
            self.accept_thread.join(timeout=2)
        if self.heartbeat_thread.is_alive():
            self.heartbeat_thread.join(timeout=2)
 
        self.close_network_connections()
        self.window.destroy()
 
    def close_network_connections(self):
        with self.socket_lock:
            logging.info("关闭客户端连接...")
            for client in self.client_sockets:
                try:
                    client.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
                                    struct.pack('ii', 1, 0))
                    client.shutdown(socket.SHUT_RDWR)
                    client.close()
                except Exception as e:
                    logging.warning(f"关闭连接出错: {str(e)}")
            self.client_sockets.clear()
 
        if self.server:
            try:
                self.server.close()
            except Exception as e:
                logging.warning(f"关闭服务器出错: {str(e)}")
 
    def center_window(self):
        self.window.update_idletasks()
        w = self.window.winfo_width()
        h = self.window.winfo_height()
        x = (self.window.winfo_screenwidth() - w) // 2
        y = (self.window.winfo_screenheight() - h) // 2
        self.window.geometry(f"{w}x{h}+{x}+{y}")
 
    def _show_about(self):
        messagebox.showinfo("关于", "机房签到系统教师端\n版本: 3.2")
 
    def process_update_queue(self):
        while not self.update_queue.empty():
            func, args = self.update_queue.get()
            try:
                if self.running:
                    func(*args)
            except Exception as e:
                logging.error(f"队列处理异常: {str(e)}")
        if self.running:
            self.window.after(100, self.process_update_queue)
 
if __name__ == '__main__':
    ServerGUI()

学生端代码:

import socket
import json
import threading
import logging
import time
import re
from tkinter import ttk, messagebox, Tk, Canvas, StringVar, LEFT, X, BOTTOM, RIGHT, DISABLED, NORMAL, TclError

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='client.log',
    filemode='a'
)

class EnhancedClientGUI:
    PORT = 8888
    HEARTBEAT_INTERVAL = 30
    CONNECT_TIMEOUT = 5
    MAX_RETRIES = 5
    RETRY_DELAY_BASE = 2
    STATUS_CHECK_INTERVAL = 3
    FAST_CHECK_INTERVAL = 1.5

    def __init__(self):
        self.window = Tk()
        self.window.title("机房签到-学生端")
        self.window.geometry("420x320")
        self.window.resizable(False, False)
        self.window.protocol("WM_DELETE_WINDOW", self.disable_close)

        self.running = True
        self.computer_name = socket.gethostname()
        self.stop_event = threading.Event()
        self.checkin_thread = None
        self._current_socket = None

        # 初始化界面组件
        self.create_ui()
        self.create_taskbar()
        self.center_window()

        # 设置初始状态为未连接
        self.update_status_ui(False)

        # 启动后台服务
        self.start_network_monitor()
        self.start_heartbeat()
        self.update_time()

        self.window.mainloop()

    def create_ui(self):
        """创建主界面组件"""
        main_frame = ttk.Frame(self.window, padding=20)
        main_frame.pack(fill=X, expand=True)

        # IP地址输入部分
        ip_frame = ttk.Frame(main_frame)
        ip_frame.pack(fill=X, pady=5)
        ttk.Label(ip_frame, text="教师端IP").pack(side=LEFT)
        self.ip_entry = ttk.Entry(ip_frame)
        self.ip_entry.insert(0, self.detect_server_ip())
        self.ip_entry.pack(side=LEFT, fill=X, expand=True, padx=5)

        # 状态指示灯(初始红色)
        self.status_light = Canvas(ip_frame, width=24, height=24, bg='white')
        self.light_id = self.status_light.create_oval(2, 2, 22, 22, fill="red")
        self.status_light.pack(side=RIGHT)

        # 班级选择
        class_frame = ttk.Frame(main_frame)
        class_frame.pack(fill=X, pady=5)
        ttk.Label(class_frame, text="选择班级").pack(side=LEFT)
        self.class_var = StringVar()
        self.class_combobox = ttk.Combobox(
            class_frame,
            textvariable=self.class_var,
            values=['七(1)班', '七(2)班', '七(3)班', '七(4)班', '七(5)班', '七(6)班'],
            state="readonly"
        )
        self.class_combobox.current(0)
        self.class_combobox.pack(side=LEFT, fill=X, expand=True, padx=5)

        # 姓名输入
        name_frame = ttk.Frame(main_frame)
        name_frame.pack(fill=X, pady=10)
        ttk.Label(name_frame, text="学生姓名").pack(side=LEFT)
        self.name_entry = ttk.Entry(name_frame)
        self.name_entry.pack(side=LEFT, fill=X, expand=True, padx=5)

        # 签到按钮
        self.btn_submit = ttk.Button(
            main_frame,
            text="立即签到",
            command=self.start_checkin,
            width=15
        )
        self.btn_submit.pack(pady=15)

    def create_taskbar(self):
        """创建底部状态栏"""
        taskbar = ttk.Frame(self.window, padding=(10, 3))
        taskbar.pack(side=BOTTOM, fill=X)

        self.conn_status = ttk.Label(
            taskbar,
            text="教师端状态未连接",
            font=('Microsoft YaHei', 9),
            foreground="#F44336"
        )
        self.conn_status.pack(side=LEFT, padx=5)

        self.time_label = ttk.Label(
            taskbar,
            font=('Microsoft YaHei', 9),
            foreground="#666"
        )
        self.time_label.pack(side=RIGHT, padx=5)

    def detect_server_ip(self):
        """自动检测服务器IP"""
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                s.connect(("8.8.8.8", 80))
                local_ip = s.getsockname()[0]
                base_ip = local_ip.rsplit('.', 1)[0]
                return f"{base_ip}.100" if base_ip.count('.') == 2 else "192.168.6.100"
        except Exception:
            return "192.168.6.100"

    def start_network_monitor(self):
        """可靠的网络监测线程"""
        def monitor():
            last_status = False
            time.sleep(0.5)  # 等待界面初始化完成

            while not self.stop_event.is_set():
                try:
                    # 双重检测机制
                    status1 = self.check_server_reachable()
                    time.sleep(0.3)
                    status2 = self.check_server_reachable()

                    current_status = status1 and status2
                except Exception as e:
                    logging.error(f"检测异常 {str(e)}")
                    current_status = False

                if current_status != last_status:
                    self.safe_gui_update(lambda: self.update_status_ui(current_status))
                    last_status = current_status

                interval = self.FAST_CHECK_INTERVAL if not current_status else self.STATUS_CHECK_INTERVAL
                time.sleep(interval)

        threading.Thread(target=monitor, daemon=True).start()

    def check_server_reachable(self):
        """精确检测服务器可达性"""
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.settimeout(1.5)
                s.connect((self.ip_entry.get(), self.PORT))

                # 服务级验证
                s.sendall(b'PING\n')
                response = s.recv(16).decode().strip()
                return response == 'PONG'
        except (socket.timeout, ConnectionRefusedError):
            return False
        except Exception as e:
            logging.debug(f"连接异常 {str(e)}")
            return False

    def update_status_ui(self, connected):
        """更新状态指示"""
        target_color = "green" if connected else "red"
        status_text = "已连接" if connected else "未连接"
        status_color = "#4CAF50" if connected else "#F44336"

        # 更新指示灯
        self.status_light.itemconfig(self.light_id, fill=target_color)

        # 更新任务栏状态
        self.conn_status.config(
            text=f"教师端状态 {status_text}",
            foreground=status_color
        )

    def start_heartbeat(self):
        """心跳线程"""
        def heartbeat_loop():
            while not self.stop_event.is_set():
                try:
                    if self.check_server_reachable():
                        self.send_heartbeat()
                    time.sleep(self.HEARTBEAT_INTERVAL)
                except Exception as e:
                    logging.error(f"心跳异常 {str(e)}")
                    time.sleep(self.FAST_CHECK_INTERVAL)

        threading.Thread(target=heartbeat_loop, daemon=True).start()

    def send_heartbeat(self):
        """发送心跳包"""
        data = {
            "type": "heartbeat",
            "computer_name": self.computer_name
        }
        return self.send_request(data)

    def start_checkin(self):
        """开始签到流程"""
        current_status = self.check_server_reachable()
        self.update_status_ui(current_status)

        if not current_status:
            self.show_error("教师端不可达,请检查IP地址")
            return

        if not self.validate_input():
            return

        if self.checkin_thread and self.checkin_thread.is_alive():
            messagebox.showwarning("提示", "已有签到请求在处理中")
            return

        self.btn_submit.config(state=DISABLED, text="提交中...")
        self.checkin_thread = threading.Thread(target=self.process_checkin, daemon=True)
        self.checkin_thread.start()

    def process_checkin(self):
        """处理签到逻辑"""
        checkin_data = {
            "type": "checkin",
            "computer_name": self.computer_name,
            "class": self.class_var.get(),
            "name": self.name_entry.get().strip()
        }

        success = False
        for attempt in range(1, self.MAX_RETRIES + 1):
            try:
                if self.send_request(checkin_data):
                    success = True
                    break
                logging.warning(f"第{attempt}次尝试失败")
                time.sleep(self.RETRY_DELAY_BASE * attempt)
            except Exception as e:
                logging.error(f"请求异常 {str(e)}")

        self.safe_gui_update(lambda: self.handle_checkin_result(success))

    def send_request(self, data) -> bool:
        """发送网络请求"""
        try:
            self._current_socket = socket.create_connection(
                (self.ip_entry.get(), 8888),
                timeout=self.CONNECT_TIMEOUT
            )
            with self._current_socket as sock:
                sock.sendall(json.dumps(data).encode() + b'\n')
                response = self.receive_response(sock)
                return response and response.get("status") == "success"
        except socket.timeout:
            logging.warning("连接超时")
            return False
        except ConnectionRefusedError:
            logging.warning("连接被拒绝")
            return False
        except Exception as e:
            logging.error(f"网络错误 {str(e)}")
            return False
        finally:
            self._current_socket = None

    def receive_response(self, sock) -> dict:
        """接收服务器响应"""
        buffer = b''
        try:
            sock.settimeout(3)
            while True:
                part = sock.recv(1024)
                if not part:
                    break
                buffer += part
                if b'\n' in buffer:
                    response_line, buffer = buffer.split(b'\n', 1)
                    try:
                        return json.loads(response_line.decode('utf-8'))
                    except json.JSONDecodeError:
                        logging.error("无效的JSON响应")
                        return None
        except socket.timeout:
            logging.warning("响应接收超时")
            return None

    def validate_input(self) -> bool:
        """验证输入有效性"""
        try:
            socket.inet_aton(self.ip_entry.get())
        except socket.error:
            self.show_error("无效的IP地址格式")
            return False

        name = self.name_entry.get().strip()
        if not re.match(r'^[\u4e00-\u9fa5]{2,10}$', name):
            self.show_error("请输入2-10位中文姓名")
            return False
        return True

    def handle_checkin_result(self, success):
        """处理签到结果"""
        self.btn_submit.config(state=NORMAL, text="立即签到")
        if success:
            messagebox.showinfo("成功", "签到成功!")
            self.safe_destroy()
        else:
            messagebox.showerror("错误", "签到失败,请检查网络连接后重试")

    def update_time(self):
        """更新时间显示"""
        if self.running:
            current_time = time.strftime("%Y-%m-%d %H:%M:%S")
            self.time_label.config(text=current_time)
            self.window.after(1000, self.update_time)

    def safe_gui_update(self, func):
        """线程安全的GUI更新"""
        if self.running and self.window.winfo_exists():
            self.window.after(0, func)

    def show_error(self, message):
        """显示错误提示"""
        self.safe_gui_update(lambda: messagebox.showerror("错误", message))

    def disable_close(self):
        """阻止直接关闭窗口"""
        if self.running:
            messagebox.showinfo("提示", "请先完成签到操作")

    def safe_destroy(self):
        """安全关闭程序"""
        self.running = False
        self.stop_event.set()

        if self._current_socket:
            try:
                self._current_socket.close()
            except Exception as e:
                logging.warning(f"关闭连接异常 {str(e)}")

        if self.checkin_thread and self.checkin_thread.is_alive():
            self.checkin_thread.join(timeout=2)

        try:
            self.window.destroy()
        except TclError:
            pass

    def center_window(self):
        """窗口居中显示"""
        self.window.update_idletasks()
        width = self.window.winfo_width()
        height = self.window.winfo_height()
        x = (self.window.winfo_screenwidth() - width) // 2
        y = (self.window.winfo_screenheight() - height) // 2
        self.window.geometry(f"{width}x{height}+{x}+{y}")


if __name__ == '__main__':
    EnhancedClientGUI()