import json
import os
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional

# Maximum number of entries kept in the persisted rule log.
_MAX_LOG_ENTRIES = 400

# Cooldown applied when a rule does not provide a usable "cooldown_seconds".
_DEFAULT_COOLDOWN_SECONDS = 10

# Chat message template used when a rule does not provide one.
_DEFAULT_CLIP_TEMPLATE = "Clip: {url}"


def _now_iso() -> str:
    """Return the current local time as an ISO-8601 string (second precision)."""
    return datetime.now().isoformat(timespec="seconds")


def _contains_case_insensitive(haystack: str, needle: str) -> bool:
    """Return True when *needle* occurs in *haystack*, ignoring case.

    ``casefold`` is used instead of ``lower`` so that caseless matching also
    works for characters whose lowercase form is not canonical.
    """
    return needle.casefold() in haystack.casefold()


def _optional_str(value: Any) -> Optional[str]:
    """Return *value* as a string, or None when it is empty/falsy."""
    return str(value) if value else None


def _coerce_int(value: Any, default: int) -> int:
    """Return ``int(value)``, or *default* when the value cannot be converted."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return default


class SubtitleRulesStorage:
    """Small JSON file store rooted at ``storage_dir``."""

    def __init__(self, storage_dir: str = "storage"):
        self.storage_dir = storage_dir
        self._lock = threading.Lock()

    def _path(self, filename: str) -> str:
        """Return the path of *filename* inside the storage directory.

        A ``.json`` extension is appended when missing.
        """
        if not filename.endswith(".json"):
            filename = f"{filename}.json"
        return os.path.join(self.storage_dir, filename)

    def read_json(self, filename: str, default: Any) -> Any:
        """Load and return the JSON document stored under *filename*.

        Returns *default* when the file does not exist or is not valid JSON.
        """
        path = self._path(filename)
        try:
            with open(path, "r", encoding="utf-8") as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            return default

    def write_json(self, filename: str, value: Any) -> None:
        """Serialize *value* as JSON into *filename*, replacing it atomically.

        The document is first written to a temporary sibling file and then
        moved over the target, so a failed serialization or a concurrent
        reader never observes a truncated file.

        Raises:
            TypeError / ValueError: if *value* is not JSON-serializable.
            OSError: if the file cannot be written.
        """
        path = self._path(filename)
        directory = os.path.dirname(path) or "."
        # Unique per process and thread so parallel writers never share a temp file.
        tmp_path = f"{path}.{os.getpid()}.{threading.get_ident()}.tmp"
        with self._lock:
            os.makedirs(directory, exist_ok=True)
            try:
                with open(tmp_path, "w", encoding="utf-8") as f:
                    json.dump(value, f, indent=4, ensure_ascii=False)
                os.replace(tmp_path, path)
            except BaseException:
                # Best-effort cleanup; the previous target file is left untouched.
                try:
                    os.remove(tmp_path)
                except OSError:
                    pass
                raise


@dataclass
class SubtitleRule:
    """One subtitle-triggered rule: a text condition plus an action to run."""

    id: str
    enabled: bool = True
    contains_text: Optional[str] = None
    action: str = "clip"  # clip | send_message
    # Nickname of the bot account (config/user.json).
    # If None, the first enabled account is used.
    as_account: Optional[str] = None
    message_text: str = ""  # used when action=send_message
    clip_has_delay: bool = False
    clip_send_message: bool = False
    clip_message_template: str = _DEFAULT_CLIP_TEMPLATE
    cooldown_seconds: int = _DEFAULT_COOLDOWN_SECONDS

    @staticmethod
    def from_dict(d: Dict[str, Any]) -> "SubtitleRule":
        """Build a rule from its dict form, tolerating missing or odd values.

        Missing keys fall back to the field defaults. A ``cooldown_seconds``
        that cannot be converted to an int falls back to the default instead
        of raising, so one malformed rule cannot break config loading.
        """
        return SubtitleRule(
            id=str(d.get("id") or ""),
            enabled=bool(d.get("enabled", True)),
            contains_text=_optional_str(d.get("contains_text")),
            action=str(d.get("action") or "clip"),
            as_account=_optional_str(d.get("as_account")),
            message_text=str(d.get("message_text") or ""),
            clip_has_delay=bool(d.get("clip_has_delay", False)),
            clip_send_message=bool(d.get("clip_send_message", False)),
            clip_message_template=str(
                d.get("clip_message_template") or _DEFAULT_CLIP_TEMPLATE
            ),
            cooldown_seconds=_coerce_int(
                d.get("cooldown_seconds", _DEFAULT_COOLDOWN_SECONDS),
                _DEFAULT_COOLDOWN_SECONDS,
            ),
        )

    def to_dict(self) -> Dict[str, Any]:
        """Return the JSON-serializable dict form of the rule."""
        return {
            "id": self.id,
            "enabled": self.enabled,
            "contains_text": self.contains_text,
            "action": self.action,
            "as_account": self.as_account,
            "message_text": self.message_text,
            "clip_has_delay": self.clip_has_delay,
            "clip_send_message": self.clip_send_message,
            "clip_message_template": self.clip_message_template,
            "cooldown_seconds": self.cooldown_seconds,
        }

    def matches(self, subtitle_text: str) -> bool:
        """Return True when the rule is enabled and its text condition holds.

        A rule without ``contains_text`` matches every subtitle.
        """
        if not self.enabled:
            return False
        if self.contains_text and not _contains_case_insensitive(
            subtitle_text or "", self.contains_text
        ):
            return False
        return True


class SubtitleRulesConfig:
    """Global on/off switch plus the list of rules."""

    def __init__(self, data: Optional[Dict[str, Any]] = None):
        # Anything that is not a dict (e.g. a hand-edited file) counts as empty.
        if not isinstance(data, dict):
            data = {}
        raw_rules = data.get("rules")
        if not isinstance(raw_rules, list):
            raw_rules = []
        self.enabled: bool = bool(data.get("enabled", False))
        self.rules: List[SubtitleRule] = [
            SubtitleRule.from_dict(x) for x in raw_rules if isinstance(x, dict)
        ]

    def to_dict(self) -> Dict[str, Any]:
        """Return the JSON-serializable dict form of the configuration."""
        return {
            "enabled": self.enabled,
            "rules": [r.to_dict() for r in self.rules],
        }


class SubtitleRulesProcessor:
    """Evaluate subtitle rules and run their actions through injected callables.

    Args:
        channel_name: channel passed to ``create_clip_as`` and written to logs.
        send_message_as: called as ``send_message_as(account, message)``.
        create_clip_as: called as ``create_clip_as(account, channel, has_delay)``;
            its result is read with ``.get("id" / "url" / "edit_url")``.
        resolve_default_account: called without arguments when a rule has no
            ``as_account``.
        storage: JSON store for the config and the log; a default
            ``SubtitleRulesStorage`` is created when omitted.
    """

    def __init__(
        self,
        *,
        channel_name: str,
        send_message_as: Callable[..., Any],
        create_clip_as: Callable[..., Any],
        resolve_default_account: Callable[[], Any],
        storage: Optional[SubtitleRulesStorage] = None,
    ):
        self.channel_name = channel_name
        self._send_message_as = send_message_as
        self._create_clip_as = create_clip_as
        self._resolve_default_account = resolve_default_account
        self._storage = storage or SubtitleRulesStorage()
        self._config_file = "subtitle_rules_config"
        self._log_file = "subtitle_rules_log"
        # rule id -> time.time() of the last successful firing (in memory only).
        self._last_fired_by_rule: Dict[str, float] = {}

    def load_config(self) -> SubtitleRulesConfig:
        """Read the stored configuration (empty/disabled when absent)."""
        data = self._storage.read_json(self._config_file, default={})
        return SubtitleRulesConfig(data)

    def save_config(self, config_dict: Dict[str, Any]) -> SubtitleRulesConfig:
        """Normalize *config_dict*, persist it and return the parsed config."""
        cfg = SubtitleRulesConfig(config_dict)
        self._storage.write_json(self._config_file, cfg.to_dict())
        return cfg

    def append_log(self, entry: Dict[str, Any]) -> None:
        """Append *entry* to the stored log, keeping only the newest entries."""
        logs = self._storage.read_json(self._log_file, default=[])
        if not isinstance(logs, list):
            logs = []
        logs.append(entry)
        self._storage.write_json(self._log_file, logs[-_MAX_LOG_ENTRIES:])

    def read_log(self, limit: int = 120) -> List[Dict[str, Any]]:
        """Return up to *limit* of the most recent log entries, oldest first.

        A non-positive *limit* yields an empty list.
        """
        # Guard first: ``logs[-0:]`` would otherwise return the whole log.
        if limit <= 0:
            return []
        logs = self._storage.read_json(self._log_file, default=[])
        if not isinstance(logs, list):
            return []
        return logs[-limit:]

    def _cooldown_ok(self, rule_id: str, cooldown_s: int) -> bool:
        """Return True when the rule never fired or its cooldown has elapsed."""
        last = self._last_fired_by_rule.get(rule_id)
        if last is None:
            return True
        return (time.time() - last) >= max(0, cooldown_s)

    def _mark_fired(self, rule_id: str) -> None:
        """Record that the rule fired now, starting its cooldown."""
        self._last_fired_by_rule[rule_id] = time.time()

    def _log_event(
        self,
        event_type: str,
        rule: SubtitleRule,
        ts_key: str,
        text: str,
        **extra: Any,
    ) -> None:
        """Append a log entry with the common fields followed by *extra*."""
        entry: Dict[str, Any] = {
            "ts": _now_iso(),
            "type": event_type,
            "channel": self.channel_name,
            "subtitle_ts": ts_key,
            "subtitle": text,
            "rule_id": rule.id,
        }
        entry.update(extra)
        self.append_log(entry)

    def _run_send_message(
        self, rule: SubtitleRule, account: Any, action: str, ts_key: str, text: str
    ) -> None:
        """Send the rule's chat message and log the outcome."""
        msg = (rule.message_text or "").strip()
        if not msg:
            self._log_event("rule_empty_message", rule, ts_key, text)
            return
        try:
            self._send_message_as(account, msg)
        except Exception as exc:
            self._log_event(
                "error", rule, ts_key, text, action=action, error=str(exc)
            )
            return
        self._mark_fired(rule.id)
        self._log_event(
            "message_sent", rule, ts_key, text, as_account=account, message=msg
        )

    def _run_clip(
        self, rule: SubtitleRule, account: Any, action: str, ts_key: str, text: str
    ) -> None:
        """Create a clip, optionally announce it in chat, and log the outcome."""
        try:
            clip = (
                self._create_clip_as(
                    account, self.channel_name, bool(rule.clip_has_delay)
                )
                or {}
            )
            clip_id = clip.get("id") or ""
            clip_url = clip.get("url") or ""
            clip_edit_url = clip.get("edit_url") or ""
        except Exception as exc:
            self._log_event(
                "error", rule, ts_key, text, action=action, error=str(exc)
            )
            return

        # The clip exists from here on: start the cooldown immediately so a
        # failing chat message cannot cause a new clip on every subtitle.
        self._mark_fired(rule.id)

        sent_message: Optional[str] = None
        message_error: Optional[str] = None
        if rule.clip_send_message:
            template = str(rule.clip_message_template or _DEFAULT_CLIP_TEMPLATE)
            msg = (
                template.replace("{url}", str(clip_url))
                .replace("{edit_url}", str(clip_edit_url))
                .replace("{id}", str(clip_id))
            ).strip()
            if msg:
                try:
                    self._send_message_as(account, msg)
                    sent_message = msg
                except Exception as exc:
                    message_error = str(exc)

        self._log_event(
            "clip_created",
            rule,
            ts_key,
            text,
            as_account=account,
            clip_url=clip_url,
            clip_edit_url=clip_edit_url,
            clip_id=clip_id,
            chat_message=sent_message,
        )
        if message_error is not None:
            # The clip was created but could not be announced in chat.
            self._log_event(
                "error", rule, ts_key, text, action=action, error=message_error
            )

    def on_new_subtitle(self, *, ts_key: str, subtitle_text: str) -> None:
        """Run every enabled, matching, off-cooldown rule for a new subtitle.

        The configuration is reloaded on each call. Rules without an id are
        skipped. Any action other than ``send_message`` is treated as ``clip``.
        Failures of the injected send/clip callables are written to the log
        instead of being raised.
        """
        cfg = self.load_config()
        if not cfg.enabled:
            return

        text = str(subtitle_text or "")
        for rule in cfg.rules:
            if not rule.id or not rule.matches(text):
                continue
            if not self._cooldown_ok(rule.id, rule.cooldown_seconds):
                continue

            account = (rule.as_account or "").strip() or self._resolve_default_account()
            action = (rule.action or "clip").strip().lower()

            if action == "send_message":
                self._run_send_message(rule, account, action, ts_key, text)
            else:
                # default: clip
                self._run_clip(rule, account, action, ts_key, text)