Files
2026-04-28 21:06:26 +02:00

272 lines
9.8 KiB
Python

import json
import os
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional
def _now_iso() -> str:
    """Return the current local time as an ISO-8601 string with second precision.

    The timestamp is naive (it carries no UTC offset) and looks like
    ``2026-04-28T21:06:26``.
    """
    current = datetime.now()
    # With the microseconds zeroed, isoformat() stops at the seconds field.
    return current.replace(microsecond=0).isoformat()
def _contains_case_insensitive(haystack: str, needle: str) -> bool:
    """Return True when *needle* occurs in *haystack*, ignoring letter case.

    Both strings are normalised with ``str.casefold()`` before the substring
    test. Unlike ``str.lower()``, case folding is designed for caseless
    matching: it also equates spellings such as "ß" / "SS" and the two
    lowercase forms of the Greek sigma, which matters for non-English
    subtitles.

    An empty *needle* is contained in every haystack.
    """
    return needle.casefold() in haystack.casefold()
class SubtitleRulesStorage:
    """Minimal JSON-file store rooted at one directory.

    Every logical name maps to ``<storage_dir>/<name>.json``. Reads are
    best-effort (a missing or unparsable file yields the caller's default);
    writes replace the target file atomically so a reader never observes a
    half-written document.
    """

    def __init__(self, storage_dir: str = "storage"):
        self.storage_dir = storage_dir
        # Serialises writers of this instance; they share one temp-file name
        # per target, so they must not overlap.
        self._lock = threading.Lock()

    def _path(self, filename: str) -> str:
        """Return the on-disk path for *filename*, appending ``.json`` if absent."""
        if not filename.endswith(".json"):
            filename = f"{filename}.json"
        # os.path.join keeps an empty storage_dir relative to the working
        # directory instead of producing "/<name>.json" at the filesystem root.
        return os.path.join(self.storage_dir, filename)

    def read_json(self, filename: str, default: Any) -> Any:
        """Load and return the JSON document stored under *filename*.

        Returns *default* when the file does not exist or does not contain
        valid JSON.
        """
        path = self._path(filename)
        try:
            with open(path, "r", encoding="utf-8") as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            return default

    def write_json(self, filename: str, value: Any) -> None:
        """Persist *value* as pretty-printed UTF-8 JSON under *filename*.

        Raises:
            TypeError / ValueError: *value* is not JSON-serialisable. The
                previously stored document is left untouched in that case.
            OSError: the directory or file could not be written.
        """
        # Serialise before touching the disk: a value that cannot be encoded
        # must not truncate the document that is already stored.
        payload = json.dumps(value, indent=4, ensure_ascii=False)
        path = self._path(filename)
        tmp_path = f"{path}.tmp"
        parent = os.path.dirname(path)
        with self._lock:
            if parent:
                os.makedirs(parent, exist_ok=True)
            try:
                with open(tmp_path, "w", encoding="utf-8") as f:
                    f.write(payload)
                # Atomic swap: readers see either the old or the new file.
                os.replace(tmp_path, path)
            except Exception:
                # Do not leave a stale partial temp file behind.
                try:
                    os.remove(tmp_path)
                except OSError:
                    pass
                raise
@dataclass
class SubtitleRule:
    """One subtitle-triggered rule: a text filter plus the action to run.

    Instances are normally built from stored configuration via
    :meth:`from_dict` and serialised back with :meth:`to_dict`.
    """

    id: str
    enabled: bool = True
    contains_text: Optional[str] = None
    action: str = "clip"  # clip | send_message
    # Nickname of the bot account (config/user.json). When None the caller
    # falls back to a default account (original note: the first enabled one).
    as_account: Optional[str] = None
    message_text: str = ""  # used when action=send_message
    clip_has_delay: bool = False
    clip_send_message: bool = False
    clip_message_template: str = "Clip: {url}"
    cooldown_seconds: int = 10

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "SubtitleRule":
        """Build a rule from a plain mapping, tolerating missing or odd values.

        Missing keys take the dataclass defaults. Empty ``contains_text`` /
        ``as_account`` become ``None``; non-string values for them are
        converted to ``str`` so later string operations cannot fail. A
        ``cooldown_seconds`` that is ``None`` or not convertible to ``int``
        falls back to 10 instead of aborting the whole configuration load.
        """
        raw_contains = d.get("contains_text")
        raw_account = d.get("as_account")
        try:
            cooldown = int(d.get("cooldown_seconds", 10))
        except (TypeError, ValueError, OverflowError):
            cooldown = 10
        return cls(
            id=str(d.get("id") or ""),
            enabled=bool(d.get("enabled", True)),
            contains_text=str(raw_contains) if raw_contains else None,
            action=str(d.get("action") or "clip"),
            as_account=str(raw_account) if raw_account else None,
            message_text=str(d.get("message_text") or ""),
            clip_has_delay=bool(d.get("clip_has_delay", False)),
            clip_send_message=bool(d.get("clip_send_message", False)),
            clip_message_template=str(d.get("clip_message_template") or "Clip: {url}"),
            cooldown_seconds=cooldown,
        )

    def to_dict(self) -> Dict[str, Any]:
        """Return the rule as a JSON-serialisable dict (inverse of ``from_dict``)."""
        return {
            "id": self.id,
            "enabled": self.enabled,
            "contains_text": self.contains_text,
            "action": self.action,
            "as_account": self.as_account,
            "message_text": self.message_text,
            "clip_has_delay": self.clip_has_delay,
            "clip_send_message": self.clip_send_message,
            "clip_message_template": self.clip_message_template,
            "cooldown_seconds": self.cooldown_seconds,
        }

    def matches(self, subtitle_text: str) -> bool:
        """Tell whether this rule applies to *subtitle_text*.

        A disabled rule never matches. An enabled rule without
        ``contains_text`` matches every subtitle; otherwise the subtitle must
        contain that text, compared case-insensitively.
        """
        if not self.enabled:
            return False
        if not self.contains_text:
            return True
        return _contains_case_insensitive(subtitle_text or "", self.contains_text)
class SubtitleRulesConfig:
    """In-memory form of the rules configuration: a master switch plus rules.

    Attributes:
        enabled: when False no rule should be evaluated (defaults to False).
        rules: the parsed ``SubtitleRule`` objects, in configuration order.
    """

    def __init__(self, data: Optional[Dict[str, Any]] = None):
        """Parse *data* (typically the decoded JSON config) leniently.

        Anything that is not a dict is treated as an empty configuration, and
        a ``rules`` value that is not a list/tuple is treated as "no rules",
        so a hand-edited or damaged file disables the feature instead of
        raising. Rule entries that are not dicts are skipped.
        """
        if not isinstance(data, dict):
            data = {}
        self.enabled: bool = bool(data.get("enabled", False))
        raw_rules = data.get("rules")
        if not isinstance(raw_rules, (list, tuple)):
            raw_rules = []
        self.rules: List[SubtitleRule] = [
            SubtitleRule.from_dict(item) for item in raw_rules if isinstance(item, dict)
        ]

    def to_dict(self) -> Dict[str, Any]:
        """Return the configuration as a JSON-serialisable dict."""
        return {
            "enabled": self.enabled,
            "rules": [rule.to_dict() for rule in self.rules],
        }
class SubtitleRulesProcessor:
    """Match incoming subtitle lines against the stored rules and run their actions.

    The processor itself performs no network calls: sending a chat message,
    creating a clip and choosing a default account are delegated to the
    callables injected at construction time. Configuration and an activity
    log are persisted through the storage object.
    """

    # Only this many most-recent entries are kept in the persisted log.
    _MAX_LOG_ENTRIES = 400

    def __init__(
        self,
        *,
        channel_name: str,
        send_message_as: Callable[..., Any],
        create_clip_as: Callable[..., Any],
        resolve_default_account: Callable[[], Any],
        storage: Optional["SubtitleRulesStorage"] = None,
    ):
        """Store the collaborators.

        Args:
            channel_name: channel passed to ``create_clip_as`` and recorded in
                every log entry.
            send_message_as: called as ``send_message_as(account, message)``.
            create_clip_as: called as
                ``create_clip_as(account, channel_name, has_delay)``; its
                result is read with ``.get("id" / "url" / "edit_url")``.
            resolve_default_account: called without arguments when a rule
                names no account.
            storage: JSON store; a default ``SubtitleRulesStorage()`` is
                created when omitted.
        """
        self.channel_name = channel_name
        self._send_message_as = send_message_as
        self._create_clip_as = create_clip_as
        self._resolve_default_account = resolve_default_account
        self._storage = storage if storage is not None else SubtitleRulesStorage()
        self._config_file = "subtitle_rules_config"
        self._log_file = "subtitle_rules_log"
        # rule id -> time.monotonic() reading of the last successful firing.
        self._last_fired_by_rule: Dict[str, float] = {}

    def load_config(self) -> "SubtitleRulesConfig":
        """Read the configuration from storage (empty config if none is stored)."""
        data = self._storage.read_json(self._config_file, default={})
        return SubtitleRulesConfig(data)

    def save_config(self, config_dict: Dict[str, Any]) -> "SubtitleRulesConfig":
        """Normalise *config_dict*, persist it and return the parsed config."""
        cfg = SubtitleRulesConfig(config_dict)
        self._storage.write_json(self._config_file, cfg.to_dict())
        return cfg

    def append_log(self, entry: Dict[str, Any]) -> None:
        """Append *entry* to the persisted log, keeping only the newest entries.

        The log is read, extended and written back as a whole; the sequence is
        not guarded by a lock, so concurrent callers may lose entries.
        """
        logs = self._storage.read_json(self._log_file, default=[])
        if not isinstance(logs, list):
            logs = []
        logs.append(entry)
        self._storage.write_json(self._log_file, logs[-self._MAX_LOG_ENTRIES:])

    def read_log(self, limit: int = 120) -> List[Dict[str, Any]]:
        """Return up to *limit* most recent log entries, oldest first.

        A *limit* of zero or less yields an empty list.
        """
        # Guard first: ``logs[-0:]`` would be the whole list, not nothing.
        if limit <= 0:
            return []
        logs = self._storage.read_json(self._log_file, default=[])
        if not isinstance(logs, list):
            return []
        return logs[-limit:]

    def _cooldown_ok(self, rule_id: str, cooldown_s: int) -> bool:
        """Tell whether *rule_id* may fire again (negative cooldowns count as 0)."""
        last = self._last_fired_by_rule.get(rule_id)
        if last is None:
            return True
        # Monotonic clock: unaffected by wall-clock adjustments (NTP, DST).
        return (time.monotonic() - last) >= max(0, cooldown_s)

    def _mark_fired(self, rule_id: str) -> None:
        """Start the cooldown window of *rule_id* now."""
        self._last_fired_by_rule[rule_id] = time.monotonic()

    def _log_event(self, event_type: str, ts_key: str, text: str, rule_id: str, **extra: Any) -> None:
        """Append a log entry with the common fields followed by *extra*."""
        entry: Dict[str, Any] = {
            "ts": _now_iso(),
            "type": event_type,
            "channel": self.channel_name,
            "subtitle_ts": ts_key,
            "subtitle": text,
            "rule_id": rule_id,
        }
        entry.update(extra)
        self.append_log(entry)

    def on_new_subtitle(self, *, ts_key: str, subtitle_text: str) -> None:
        """Run every enabled, matching, off-cooldown rule for one subtitle line.

        The configuration is re-read from storage on each call. A rule whose
        action is ``send_message`` posts its message; any other action value
        creates a clip. A failure while handling one rule is written to the
        log as an ``error`` entry and does not prevent later rules from
        running.

        Args:
            ts_key: identifier of the subtitle line, recorded as
                ``subtitle_ts`` in log entries.
            subtitle_text: the subtitle text to match against.
        """
        cfg = self.load_config()
        if not cfg.enabled:
            return
        text = str(subtitle_text or "")
        for rule in cfg.rules:
            if not rule.id or not rule.matches(text):
                continue
            if not self._cooldown_ok(rule.id, rule.cooldown_seconds):
                continue
            action = (rule.action or "clip").strip().lower()
            try:
                # Resolved inside the guard so a failing resolver is logged
                # for this rule rather than aborting the remaining rules.
                account = (rule.as_account or "").strip() or self._resolve_default_account()
                if action == "send_message":
                    self._run_send_message(rule, account, ts_key, text)
                else:
                    # default: clip
                    self._run_clip(rule, account, ts_key, text)
            except Exception as exc:
                self._log_event("error", ts_key, text, rule.id, action=action, error=str(exc))

    def _run_send_message(self, rule: "SubtitleRule", account: Any, ts_key: str, text: str) -> None:
        """Send the rule's chat message, or log ``rule_empty_message`` if it is blank."""
        msg = (rule.message_text or "").strip()
        if not msg:
            self._log_event("rule_empty_message", ts_key, text, rule.id)
            return
        self._send_message_as(account, msg)
        self._mark_fired(rule.id)
        self._log_event("message_sent", ts_key, text, rule.id, as_account=account, message=msg)

    def _run_clip(self, rule: "SubtitleRule", account: Any, ts_key: str, text: str) -> None:
        """Create a clip and optionally announce it in chat.

        The cooldown starts as soon as the clip call returns, and the clip is
        always logged, even if the follow-up chat message fails. Otherwise a
        chat failure would hide the created clip and let the next subtitle
        create another one. The chat failure is re-raised afterwards so the
        caller still records it as an error.
        """
        clip = self._create_clip_as(account, self.channel_name, bool(rule.clip_has_delay))
        self._mark_fired(rule.id)
        info = clip or {}
        clip_id = info.get("id") or ""
        clip_url = info.get("url") or ""
        clip_edit_url = info.get("edit_url") or ""

        sent_message = None
        message_error = None
        if rule.clip_send_message:
            template = str(rule.clip_message_template or "Clip: {url}")
            msg = (
                template.replace("{url}", str(clip_url))
                .replace("{edit_url}", str(clip_edit_url))
                .replace("{id}", str(clip_id))
            ).strip()
            if msg:
                try:
                    self._send_message_as(account, msg)
                    sent_message = msg
                except Exception as exc:
                    message_error = exc

        self._log_event(
            "clip_created",
            ts_key,
            text,
            rule.id,
            as_account=account,
            clip_url=clip_url,
            clip_edit_url=clip_edit_url,
            clip_id=clip_id,
            chat_message=sent_message,
        )
        if message_error is not None:
            raise message_error