"""Course-reminder notifications pushed over an HTTP message API.

Notification payload (``InfoData``):

    date:                 current date, including the day of the week
    time:                 current time
    course_time:          course start time
    course_name:          course name
    course_teacher:       teacher name
    course_location:      classroom / location
    course_time_interval: time remaining until the course starts,
                          derived from the values above

The notification text follows the ``InfoData`` fields.

Delivery mirrors the reference shell script this module was derived from,
which builds ``{"user_id": <uid>, "message": <msg>}`` and POSTs it as JSON:

    curl -s -o /dev/null -X POST http://127.0.0.1:3000/send_private_msg
         -H "Content-Type: application/json" -d "$json"

The original specification asked for the host to be 192.168.12.1 while
keeping the uid (2998813882) and the path unchanged.  This module does not
hard-code those values; it reads host, port, endpoint and uid from
``notification_config.json`` located next to this file.

Only ``publish_info`` (together with the ``InfoData`` payload type) is meant
to be used by callers.  Every other function or constant is private and is
prefixed with ``_``.
"""

import json
import os
from dataclasses import dataclass
from datetime import datetime

import requests

# Name of the JSON configuration file, resolved relative to this module.
_CONFIG_FILENAME = 'notification_config.json'

# Upper bound (seconds) for connecting to / reading from the message server.
# Without it a stalled server would block publish_info() forever.
_REQUEST_TIMEOUT_SECONDS = 10

# Chinese weekday names indexed by datetime.weekday() (Monday == 0).
_WEEKDAY_NAMES = (
    "星期一",
    "星期二",
    "星期三",
    "星期四",
    "星期五",
    "星期六",
    "星期日",
)


@dataclass
class InfoData:
    """Data class for notification information."""

    date: str  # current date (including the day of the week)
    time: str  # current time
    course_time: str  # course start time
    course_name: str  # course name
    course_teacher: str  # teacher name
    course_location: str  # where the course takes place
    course_time_interval: str  # time remaining until the course starts


def publish_info(info_data: InfoData) -> bool:
    """Publish a course reminder notification.

    Args:
        info_data: InfoData object containing the notification information.

    Returns:
        True if the notification was sent successfully, False otherwise.
        Failures while loading the configuration or sending the request are
        printed and reported as False instead of being raised.
    """
    # Formatting stays outside the try block, as before: a malformed
    # info_data object is a caller error and is not reported as a send failure.
    message = _format_message(info_data)

    try:
        _send_notification(message)
    except Exception as e:
        # Boundary of the public API: callers only ever see a bool.
        print(f"Failed to publish info: {e}")
        return False
    return True


def _format_message(info_data: InfoData) -> str:
    """Render the reminder text for ``info_data`` (private helper).

    Args:
        info_data: The notification information to render.

    Returns:
        A multi-line string with a banner followed by one field per line.
    """
    lines = (
        "==========课程提醒==========",
        f"课程: {info_data.course_name}",
        f"地点: {info_data.course_location}",
        f"教师: {info_data.course_teacher}",
        f"课程开始时间: {info_data.course_time}",
        f"距离上课: {info_data.course_time_interval}",
        f"当前日期: {info_data.date}",
        f"当前时间: {info_data.time}",
    )
    return "\n".join(lines)


def _load_config():
    """Load the notification configuration from its JSON file (private).

    The file is looked up in the directory that contains this module.

    Returns:
        dict: The parsed configuration.

    Raises:
        OSError: If the configuration file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    config_path = os.path.join(os.path.dirname(__file__), _CONFIG_FILENAME)
    with open(config_path, 'r', encoding='utf-8') as f:
        return json.load(f)


def _send_notification(message: str) -> None:
    """Send ``message`` to the configured user via HTTP POST (private).

    Args:
        message: The message text to send.

    Raises:
        KeyError: If an expected key is missing from the configuration.
        requests.RequestException: If the request fails, times out, or the
            server answers with an error status.
    """
    config = _load_config()

    # Server and recipient settings come from the configuration file.
    server = config['server']
    host = server['host']
    port = server['port']
    endpoint = server['endpoint']
    uid = config['user']['uid']

    url = f"http://{host}:{port}{endpoint}"
    payload = {
        "user_id": uid,
        "message": message,
    }
    headers = {"Content-Type": "application/json"}

    # A timeout is mandatory here: requests waits indefinitely by default.
    response = requests.post(
        url,
        json=payload,
        headers=headers,
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )

    # Turn 4xx/5xx answers into exceptions so the caller reports a failure.
    response.raise_for_status()


def _get_test_info() -> InfoData:
    """Build sample course information for manual testing (private).

    Date, weekday and time reflect the moment of the call; the course fields
    are fixed placeholder values.

    Returns:
        InfoData: Sample course information.
    """
    now = datetime.now()
    date_str = now.strftime("%Y-%m-%d")
    time_str = now.strftime("%H:%M:%S")
    # Derive the weekday from the actual date instead of hard-coding Monday.
    weekday_str = _WEEKDAY_NAMES[now.weekday()]

    # Placeholder course data for demonstration purposes only.
    return InfoData(
        date=date_str + " " + weekday_str,
        time=time_str,
        course_time="08:00",
        course_name="示例课程",
        course_teacher="示例教师",
        course_location="示例地点",
        course_time_interval="30分钟",
    )


def _main() -> None:
    """Send one sample notification and print the outcome (manual test)."""
    info = _get_test_info()
    success = publish_info(info)

    if success:
        print("Notification sent successfully")
    else:
        print("Failed to send notification")


# Example usage (for testing purposes)
if __name__ == "__main__":
    _main()