272 lines
9.8 KiB
Python
272 lines
9.8 KiB
Python
import json
|
|
import threading
|
|
import time
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
|
|
def _now_iso() -> str:
|
|
return datetime.now().isoformat(timespec="seconds")
|
|
|
|
|
|
def _contains_case_insensitive(haystack: str, needle: str) -> bool:
|
|
return needle.lower() in haystack.lower()
|
|
|
|
|
|
class SubtitleRulesStorage:
|
|
def __init__(self, storage_dir: str = "storage"):
|
|
self.storage_dir = storage_dir
|
|
self._lock = threading.Lock()
|
|
|
|
def _path(self, filename: str) -> str:
|
|
if not filename.endswith(".json"):
|
|
filename = f"{filename}.json"
|
|
return f"{self.storage_dir}/{filename}"
|
|
|
|
def read_json(self, filename: str, default: Any) -> Any:
|
|
path = self._path(filename)
|
|
try:
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
return json.load(f)
|
|
except (FileNotFoundError, json.JSONDecodeError):
|
|
return default
|
|
|
|
def write_json(self, filename: str, value: Any) -> None:
|
|
import os
|
|
|
|
if not os.path.exists(self.storage_dir):
|
|
os.makedirs(self.storage_dir, exist_ok=True)
|
|
path = self._path(filename)
|
|
with self._lock:
|
|
with open(path, "w", encoding="utf-8") as f:
|
|
json.dump(value, f, indent=4, ensure_ascii=False)
|
|
|
|
|
|
@dataclass
|
|
class SubtitleRule:
|
|
id: str
|
|
enabled: bool = True
|
|
contains_text: Optional[str] = None
|
|
action: str = "clip" # clip | send_message
|
|
as_account: Optional[str] = None # pseudo du compte bot (config/user.json). si None: 1er compte enabled
|
|
message_text: str = "" # utilisé si action=send_message
|
|
clip_has_delay: bool = False
|
|
clip_send_message: bool = False
|
|
clip_message_template: str = "Clip: {url}"
|
|
cooldown_seconds: int = 10
|
|
|
|
@staticmethod
|
|
def from_dict(d: Dict[str, Any]) -> "SubtitleRule":
|
|
return SubtitleRule(
|
|
id=str(d.get("id") or ""),
|
|
enabled=bool(d.get("enabled", True)),
|
|
contains_text=(d.get("contains_text") or None),
|
|
action=str(d.get("action") or "clip"),
|
|
as_account=(d.get("as_account") or None),
|
|
message_text=str(d.get("message_text") or ""),
|
|
clip_has_delay=bool(d.get("clip_has_delay", False)),
|
|
clip_send_message=bool(d.get("clip_send_message", False)),
|
|
clip_message_template=str(d.get("clip_message_template") or "Clip: {url}"),
|
|
cooldown_seconds=int(d.get("cooldown_seconds", 10)),
|
|
)
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
return {
|
|
"id": self.id,
|
|
"enabled": self.enabled,
|
|
"contains_text": self.contains_text,
|
|
"action": self.action,
|
|
"as_account": self.as_account,
|
|
"message_text": self.message_text,
|
|
"clip_has_delay": self.clip_has_delay,
|
|
"clip_send_message": self.clip_send_message,
|
|
"clip_message_template": self.clip_message_template,
|
|
"cooldown_seconds": self.cooldown_seconds,
|
|
}
|
|
|
|
def matches(self, subtitle_text: str) -> bool:
|
|
if not self.enabled:
|
|
return False
|
|
if self.contains_text and not _contains_case_insensitive(subtitle_text or "", self.contains_text):
|
|
return False
|
|
return True
|
|
|
|
|
|
class SubtitleRulesConfig:
|
|
def __init__(self, data: Optional[Dict[str, Any]] = None):
|
|
data = data or {}
|
|
self.enabled: bool = bool(data.get("enabled", False))
|
|
self.rules: List[SubtitleRule] = [
|
|
SubtitleRule.from_dict(x) for x in (data.get("rules") or []) if isinstance(x, dict)
|
|
]
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
return {
|
|
"enabled": self.enabled,
|
|
"rules": [r.to_dict() for r in self.rules],
|
|
}
|
|
|
|
|
|
class SubtitleRulesProcessor:
|
|
def __init__(
|
|
self,
|
|
*,
|
|
channel_name: str,
|
|
send_message_as: callable,
|
|
create_clip_as: callable,
|
|
resolve_default_account: callable,
|
|
storage: Optional[SubtitleRulesStorage] = None,
|
|
):
|
|
self.channel_name = channel_name
|
|
self._send_message_as = send_message_as
|
|
self._create_clip_as = create_clip_as
|
|
self._resolve_default_account = resolve_default_account
|
|
self._storage = storage or SubtitleRulesStorage()
|
|
|
|
self._config_file = "subtitle_rules_config"
|
|
self._log_file = "subtitle_rules_log"
|
|
self._last_fired_by_rule: Dict[str, float] = {}
|
|
|
|
def load_config(self) -> SubtitleRulesConfig:
|
|
data = self._storage.read_json(self._config_file, default={})
|
|
return SubtitleRulesConfig(data)
|
|
|
|
def save_config(self, config_dict: Dict[str, Any]) -> SubtitleRulesConfig:
|
|
cfg = SubtitleRulesConfig(config_dict)
|
|
self._storage.write_json(self._config_file, cfg.to_dict())
|
|
return cfg
|
|
|
|
def append_log(self, entry: Dict[str, Any]) -> None:
|
|
logs = self._storage.read_json(self._log_file, default=[])
|
|
if not isinstance(logs, list):
|
|
logs = []
|
|
logs.append(entry)
|
|
logs = logs[-400:]
|
|
self._storage.write_json(self._log_file, logs)
|
|
|
|
def read_log(self, limit: int = 120) -> List[Dict[str, Any]]:
|
|
logs = self._storage.read_json(self._log_file, default=[])
|
|
if not isinstance(logs, list):
|
|
return []
|
|
return logs[-limit:]
|
|
|
|
def _cooldown_ok(self, rule_id: str, cooldown_s: int) -> bool:
|
|
last = self._last_fired_by_rule.get(rule_id)
|
|
if last is None:
|
|
return True
|
|
return (time.time() - last) >= max(0, cooldown_s)
|
|
|
|
def _mark_fired(self, rule_id: str) -> None:
|
|
self._last_fired_by_rule[rule_id] = time.time()
|
|
|
|
def on_new_subtitle(self, *, ts_key: str, subtitle_text: str) -> None:
|
|
cfg = self.load_config()
|
|
if not cfg.enabled:
|
|
return
|
|
text = str(subtitle_text or "")
|
|
|
|
for r in cfg.rules:
|
|
if not r.id:
|
|
continue
|
|
if not r.matches(text):
|
|
continue
|
|
if not self._cooldown_ok(r.id, r.cooldown_seconds):
|
|
continue
|
|
|
|
account = (r.as_account or "").strip() or self._resolve_default_account()
|
|
action = (r.action or "clip").strip().lower()
|
|
|
|
if action == "send_message":
|
|
msg = (r.message_text or "").strip()
|
|
if not msg:
|
|
self.append_log(
|
|
{
|
|
"ts": _now_iso(),
|
|
"type": "rule_empty_message",
|
|
"channel": self.channel_name,
|
|
"subtitle_ts": ts_key,
|
|
"subtitle": text,
|
|
"rule_id": r.id,
|
|
}
|
|
)
|
|
continue
|
|
try:
|
|
self._send_message_as(account, msg)
|
|
self._mark_fired(r.id)
|
|
self.append_log(
|
|
{
|
|
"ts": _now_iso(),
|
|
"type": "message_sent",
|
|
"channel": self.channel_name,
|
|
"subtitle_ts": ts_key,
|
|
"subtitle": text,
|
|
"rule_id": r.id,
|
|
"as_account": account,
|
|
"message": msg,
|
|
}
|
|
)
|
|
except Exception as e:
|
|
self.append_log(
|
|
{
|
|
"ts": _now_iso(),
|
|
"type": "error",
|
|
"channel": self.channel_name,
|
|
"subtitle_ts": ts_key,
|
|
"subtitle": text,
|
|
"rule_id": r.id,
|
|
"action": action,
|
|
"error": str(e),
|
|
}
|
|
)
|
|
continue
|
|
|
|
# default: clip
|
|
try:
|
|
clip = self._create_clip_as(account, self.channel_name, bool(r.clip_has_delay))
|
|
clip_id = (clip or {}).get("id") or ""
|
|
clip_url = (clip or {}).get("url") or ""
|
|
clip_edit_url = (clip or {}).get("edit_url") or ""
|
|
|
|
sent_message = None
|
|
if bool(getattr(r, "clip_send_message", False)):
|
|
tpl = str(getattr(r, "clip_message_template", "") or "Clip: {url}")
|
|
msg = (
|
|
tpl.replace("{url}", str(clip_url))
|
|
.replace("{edit_url}", str(clip_edit_url))
|
|
.replace("{id}", str(clip_id))
|
|
).strip()
|
|
if msg:
|
|
self._send_message_as(account, msg)
|
|
sent_message = msg
|
|
self._mark_fired(r.id)
|
|
self.append_log(
|
|
{
|
|
"ts": _now_iso(),
|
|
"type": "clip_created",
|
|
"channel": self.channel_name,
|
|
"subtitle_ts": ts_key,
|
|
"subtitle": text,
|
|
"rule_id": r.id,
|
|
"as_account": account,
|
|
"clip_url": clip_url,
|
|
"clip_edit_url": clip_edit_url,
|
|
"clip_id": clip_id,
|
|
"chat_message": sent_message,
|
|
}
|
|
)
|
|
except Exception as e:
|
|
self.append_log(
|
|
{
|
|
"ts": _now_iso(),
|
|
"type": "error",
|
|
"channel": self.channel_name,
|
|
"subtitle_ts": ts_key,
|
|
"subtitle": text,
|
|
"rule_id": r.id,
|
|
"action": action,
|
|
"error": str(e),
|
|
}
|
|
)
|
|
|