Files
twitchBot-intelligent/twitch_bot/interaction_chat.py
T
2026-04-22 22:42:36 +02:00

488 lines
18 KiB
Python

import json
import os
import re
import subprocess
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Tuple
def _now_iso() -> str:
return datetime.now().isoformat(timespec="seconds")
def _lower(s: Optional[str]) -> Optional[str]:
return s.lower() if isinstance(s, str) else None
def _contains_case_insensitive(haystack: str, needle: str) -> bool:
return needle.lower() in haystack.lower()
def _normalize_accounts(accounts: List[str]) -> List[str]:
out: List[str] = []
for a in accounts:
a = (a or "").strip()
if not a:
continue
if a.startswith("@"):
a = a[1:]
out.append(a.lower())
return sorted(set(out))
def _extract_mentioned_accounts(message: str, registered_accounts: List[str]) -> List[str]:
msg = message or ""
hits: List[str] = []
for acc in registered_accounts:
if _contains_case_insensitive(msg, f"@{acc}"):
hits.append(acc)
return hits
def _strip_registered_mentions(message: str, registered_accounts: List[str]) -> str:
"""
Retire du texte les mentions @<compte> pour les comptes enregistrés.
Exemple: "@exoticnaturees hello" -> "hello"
"""
out = message or ""
for acc in registered_accounts:
# \b pour éviter de matcher des sous-chaînes
out = re.sub(rf"@{re.escape(acc)}\b", "", out, flags=re.IGNORECASE)
# Normaliser les espaces
out = re.sub(r"\s+", " ", out).strip()
return out
@dataclass
class InteractionRule:
    """A reply rule: optional filters plus the text to answer with.

    A rule matches a message when it is enabled and every filter that is set
    (``from_username``, ``mention_account``, ``contains_text``) is satisfied.
    """

    id: str
    enabled: bool
    # Only match messages written by this user (case-insensitive, "@" optional).
    from_username: Optional[str] = None
    # Only match messages that mention this account (case-insensitive, "@" optional).
    mention_account: Optional[str] = None
    # Only match messages containing this text (case-insensitive substring).
    contains_text: Optional[str] = None
    # Text of the reply associated with the rule.
    response_text: str = ""
    # Stored and round-tripped by from_dict/to_dict; not read by this class.
    tgpt_preprompt: Optional[str] = None

    @staticmethod
    def _normalize_name(name: Any) -> Optional[str]:
        """Return ``name`` stripped, without one leading "@", lowercased.

        Returns ``None`` when ``name`` is not a string.
        """
        if not isinstance(name, str):
            return None
        cleaned = name.strip()
        if cleaned.startswith("@"):
            cleaned = cleaned[1:]
        return cleaned.lower()

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "InteractionRule":
        """Build a rule from a plain dict; missing or falsy filters become ``None``."""
        return cls(
            id=str(d.get("id") or ""),
            enabled=bool(d.get("enabled", True)),
            from_username=d.get("from_username") or None,
            mention_account=d.get("mention_account") or None,
            contains_text=d.get("contains_text") or None,
            response_text=str(d.get("response_text") or ""),
            tgpt_preprompt=d.get("tgpt_preprompt") or None,
        )

    def to_dict(self) -> Dict[str, Any]:
        """Return the rule as a JSON-serializable dict (inverse of ``from_dict``)."""
        return {
            "id": self.id,
            "enabled": self.enabled,
            "from_username": self.from_username,
            "mention_account": self.mention_account,
            "contains_text": self.contains_text,
            "response_text": self.response_text,
            "tgpt_preprompt": self.tgpt_preprompt,
        }

    def matches(self, *, username: str, message: str, mentioned_accounts: List[str]) -> bool:
        """Return True when the rule applies to the given message.

        Args:
            username: Author of the message.
            message: Raw message text.
            mentioned_accounts: Accounts mentioned in the message; the entries
                are compared as-is against the lowercased ``mention_account``,
                so they are expected to be lowercase already.
        """
        if not self.enabled:
            return False

        if self.from_username:
            # Normalize both sides the same way mention_account is normalized,
            # so a configured "@Viewer " still matches the user "viewer".
            wanted_user = self._normalize_name(self.from_username)
            if wanted_user is None or wanted_user != self._normalize_name(username):
                return False

        if self.mention_account:
            wanted_account = self._normalize_name(self.mention_account)
            if wanted_account not in mentioned_accounts:
                return False

        if self.contains_text:
            if str(self.contains_text).lower() not in (message or "").lower():
                return False

        return True
class InteractionChatConfig:
    """In-memory view of the interaction-chat settings.

    Built from a plain dict (typically loaded from JSON). Malformed values
    fall back to their defaults instead of raising, so that a hand-edited
    configuration file cannot break configuration loading.
    """

    _DEFAULT_TGPT_MAX_CHARS = 100
    _DEFAULT_COOLDOWN_SECONDS = 8

    @staticmethod
    def _as_int(value: Any, default: int) -> int:
        """Return ``int(value)``, or ``default`` when the conversion fails."""
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return default

    def __init__(self, data: Optional[Dict[str, Any]] = None):
        """Read the settings from ``data``; anything that is not a dict is ignored."""
        if not isinstance(data, dict):
            data = {}

        self.enabled: bool = bool(data.get("enabled", True))
        # "predefined" or "tgpt"; a missing/null value means "predefined".
        self.mode: str = str(data.get("mode") or "predefined")
        self.tgpt_enabled: bool = bool(data.get("tgpt_enabled", False))
        self.tgpt_preprompt: str = str(data.get("tgpt_preprompt", "") or "")
        self.tgpt_max_chars: int = self._as_int(
            data.get("tgpt_max_chars", self._DEFAULT_TGPT_MAX_CHARS),
            self._DEFAULT_TGPT_MAX_CHARS,
        )
        self.cooldown_seconds: int = self._as_int(
            data.get("cooldown_seconds", self._DEFAULT_COOLDOWN_SECONDS),
            self._DEFAULT_COOLDOWN_SECONDS,
        )

        raw_responses = data.get("default_responses") or ["salut"]
        if isinstance(raw_responses, str):
            # A bare string is one response, not a sequence of characters.
            raw_responses = [raw_responses]
        elif not isinstance(raw_responses, (list, tuple)):
            raw_responses = ["salut"]
        self.default_responses: List[str] = [
            s.strip() for s in raw_responses if isinstance(s, str) and s.strip()
        ]

        raw_rules = data.get("rules")
        if not isinstance(raw_rules, (list, tuple)):
            raw_rules = []
        self.rules: List["InteractionRule"] = [
            InteractionRule.from_dict(x) for x in raw_rules if isinstance(x, dict)
        ]

    def to_dict(self) -> Dict[str, Any]:
        """Return the settings as a JSON-serializable dict."""
        return {
            "enabled": self.enabled,
            "mode": self.mode,
            "tgpt_enabled": self.tgpt_enabled,
            "tgpt_preprompt": self.tgpt_preprompt,
            "tgpt_max_chars": self.tgpt_max_chars,
            "cooldown_seconds": self.cooldown_seconds,
            "default_responses": self.default_responses,
            "rules": [r.to_dict() for r in self.rules],
        }
class InteractionChatStorage:
    """Minimal JSON file storage rooted at ``storage_dir``.

    PersistentStorage is deliberately not used here so that lists (logs) can
    be stored as-is.

    Writes go to a temporary file that is then moved over the target with
    ``os.replace``, so a reader never observes a half-written file and a
    failed serialization leaves the previous content untouched.
    """

    def __init__(self, storage_dir: str = "storage"):
        self.storage_dir = storage_dir
        # Serializes writers of this instance (they share the temp file name).
        self._lock = threading.Lock()

    def _path(self, filename: str) -> str:
        """Return the path of ``filename`` inside the storage dir, adding ``.json`` if missing."""
        if not filename.endswith(".json"):
            filename = f"{filename}.json"
        return os.path.join(self.storage_dir, filename)

    def read_json(self, filename: str, default: Any) -> Any:
        """Return the decoded content of ``filename``.

        ``default`` is returned when the file does not exist or does not
        contain valid UTF-8 JSON.
        """
        path = self._path(filename)
        try:
            with open(path, "r", encoding="utf-8") as f:
                return json.load(f)
        except (FileNotFoundError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            return default

    def write_json(self, filename: str, value: Any) -> None:
        """Serialize ``value`` as indented UTF-8 JSON into ``filename``.

        The storage directory is created when needed. Exceptions raised while
        serializing or writing propagate to the caller.
        """
        if self.storage_dir:
            os.makedirs(self.storage_dir, exist_ok=True)
        path = self._path(filename)
        tmp_path = f"{path}.tmp"
        with self._lock:
            try:
                with open(tmp_path, "w", encoding="utf-8") as f:
                    json.dump(value, f, indent=4, ensure_ascii=False)
                os.replace(tmp_path, path)
            except BaseException:
                # Do not leave a partial temp file behind; keep the old target.
                try:
                    os.remove(tmp_path)
                except OSError:
                    pass
                raise
class InteractionChatProcessor:
    """Poll chat messages and reply when a registered account is mentioned.

    For each new message that mentions a registered account, a response is
    chosen (first matching rule; otherwise tgpt output or the first default
    response, depending on the configuration), sent through
    ``send_message_as`` and recorded in the JSON log.
    """

    # Only the most recent messages of each poll are inspected.
    _TAIL_SIZE = 80
    # Maximum number of entries kept in the log file.
    _MAX_LOG_ENTRIES = 300
    # Seconds granted to the tgpt CLI before giving up.
    _TGPT_TIMEOUT_S = 25
    # Leading labels removed from the tgpt output.
    _TGPT_PREFIXES = ("Assistant:", "assistant:", "Answer:", "Réponse:", "Response:")

    def __init__(
        self,
        *,
        channel_name: str,
        get_registered_accounts: Callable[[], List[str]],
        get_account_policies: Optional[Callable[[], Dict[str, Dict[str, Any]]]] = None,
        send_message_as: Callable[[str, str], Any],
        storage: Optional["InteractionChatStorage"] = None,
    ):
        """Create a processor.

        Args:
            channel_name: Channel name written into every log entry.
            get_registered_accounts: Called on each poll; returns the account
                names the bot can speak as.
            get_account_policies: Optional; returns a mapping of account name
                to policy dict. Only ``interaction_bypass_antiloop`` is read.
            send_message_as: Called as ``send_message_as(account, text)`` to
                post a reply.
            storage: JSON storage for the config and the log; a default
                ``InteractionChatStorage`` is created when omitted.
        """
        self.channel_name = channel_name
        self._get_registered_accounts = get_registered_accounts
        self._get_account_policies = get_account_policies
        self._send_message_as = send_message_as
        self._storage = storage if storage is not None else InteractionChatStorage()
        self._config_file = "interaction_chat_config"
        self._log_file = "interaction_chat_log"
        self._running = False
        self._thread: Optional[threading.Thread] = None
        # (timestamp_iso, username, content) of the last message handled.
        self._last_seen_key: Optional[Tuple[str, str, str]] = None
        # Lowercased username -> time.monotonic() of the last reply.
        self._last_reply_ts_by_user: Dict[str, float] = {}

    # ------------------------------------------------------------------
    # Configuration and log persistence
    # ------------------------------------------------------------------
    def load_config(self) -> "InteractionChatConfig":
        """Load the configuration from storage (defaults when absent)."""
        data = self._storage.read_json(self._config_file, default={})
        return InteractionChatConfig(data)

    def save_config(self, config_dict: Dict[str, Any]) -> "InteractionChatConfig":
        """Normalize ``config_dict``, persist it and return the resulting config."""
        cfg = InteractionChatConfig(config_dict)
        self._storage.write_json(self._config_file, cfg.to_dict())
        return cfg

    def append_log(self, entry: Dict[str, Any]) -> None:
        """Append ``entry`` to the log, keeping only the most recent entries."""
        logs = self._storage.read_json(self._log_file, default=[])
        if not isinstance(logs, list):
            logs = []
        logs.append(entry)
        # Simple bound on the log size.
        self._storage.write_json(self._log_file, logs[-self._MAX_LOG_ENTRIES:])

    def read_log(self, limit: int = 100) -> List[Dict[str, Any]]:
        """Return the last ``limit`` log entries (empty list when ``limit <= 0``)."""
        if limit <= 0:
            # logs[-0:] would return the whole list.
            return []
        logs = self._storage.read_json(self._log_file, default=[])
        if not isinstance(logs, list):
            return []
        return logs[-limit:]

    def _log_event(self, event_type: str, fields: Dict[str, Any]) -> None:
        """Append a log entry made of the ts/type/channel header plus ``fields``."""
        entry: Dict[str, Any] = {
            "ts": _now_iso(),
            "type": event_type,
            "channel": self.channel_name,
        }
        entry.update(fields)
        self.append_log(entry)

    # ------------------------------------------------------------------
    # Background polling
    # ------------------------------------------------------------------
    def stop(self) -> None:
        """Ask the polling thread to stop and wait up to 2 seconds for it."""
        self._running = False
        if self._thread and self._thread.is_alive():
            self._thread.join(timeout=2)

    def start_background(
        self,
        *,
        get_latest_messages: Callable[[], List[Any]],
        poll_interval_s: float = 0.5,
    ) -> None:
        """Start a daemon thread that polls ``get_latest_messages``.

        Does nothing when a polling thread is already alive. Exceptions raised
        while processing a batch are written to the log as ``error`` entries
        and polling continues.
        """
        if self._thread and self._thread.is_alive():
            return
        self._running = True

        def loop() -> None:
            while self._running:
                try:
                    self.process_new_messages(get_latest_messages())
                except Exception as e:
                    self._log_event("error", {"error": str(e)})
                time.sleep(poll_interval_s)

        self._thread = threading.Thread(target=loop, daemon=True)
        self._thread.start()

    # ------------------------------------------------------------------
    # Cooldown bookkeeping
    # ------------------------------------------------------------------
    def _cooldown_ok(self, username: str, cooldown_s: int) -> bool:
        """Return True when ``username`` may be answered again."""
        last = self._last_reply_ts_by_user.get(username.lower())
        if last is None:
            return True
        # Monotonic clock: unaffected by wall-clock adjustments.
        return (time.monotonic() - last) >= max(0, cooldown_s)

    def _mark_replied(self, username: str) -> None:
        """Record that ``username`` has just been answered."""
        self._last_reply_ts_by_user[username.lower()] = time.monotonic()

    # ------------------------------------------------------------------
    # Message processing
    # ------------------------------------------------------------------
    @staticmethod
    def _message_key(m: Any) -> Tuple[str, str, str]:
        """Return the (timestamp_iso, username, content) identity of a message."""
        return (
            getattr(m, "timestamp").isoformat(),
            getattr(m, "username"),
            getattr(m, "content"),
        )

    def _select_new_messages(self, tail: List[Any]) -> List[Any]:
        """Return the messages of ``tail`` located after the last seen one.

        The *last* occurrence of the last-seen key is used as the anchor, so
        messages sharing an identical key are not handled again on every poll.
        Returns an empty list when the key is absent or is the final message.
        """
        for idx in range(len(tail) - 1, -1, -1):
            if self._message_key(tail[idx]) == self._last_seen_key:
                return tail[idx + 1:]
        return []

    def _load_policies(self) -> Dict[str, Dict[str, Any]]:
        """Return the account policies, or ``{}`` when unavailable or invalid."""
        if not self._get_account_policies:
            return {}
        try:
            policies = self._get_account_policies() or {}
        except Exception:
            # Best effort: without policies the anti-loop protection stays on.
            return {}
        return policies if isinstance(policies, dict) else {}

    def process_new_messages(self, messages: List[Any]) -> None:
        """Handle the messages that arrived since the previous call.

        ``messages`` is typically a list of ChatMessage-like objects exposing
        ``timestamp`` (with ``isoformat()``), ``username`` and ``content``.
        Only messages after the last seen one are handled; the first call only
        records the latest message so the existing backlog is never answered.
        """
        if not messages:
            return
        cfg = self.load_config()
        if not cfg.enabled:
            return
        registered_accounts = _normalize_accounts(self._get_registered_accounts())
        if not registered_accounts:
            return
        # Registered accounts are not answered by default (avoids reply loops).
        registered_set = set(registered_accounts)
        policies = self._load_policies()

        # Work on a bounded view of the most recent messages.
        tail = messages[-self._TAIL_SIZE:]
        if self._last_seen_key is None:
            # First pass: only answer upcoming messages, not the backlog.
            self._last_seen_key = self._message_key(tail[-1])
            return

        fresh = self._select_new_messages(tail)
        if not fresh:
            # Nothing new, or the key dropped out of the tail: re-anchor on
            # the latest message.
            self._last_seen_key = self._message_key(tail[-1])
            return

        for m in fresh:
            # Advance the anchor before handling, so a failing message is not
            # retried on the next poll.
            self._last_seen_key = self._message_key(m)
            self._handle_message(
                m,
                cfg=cfg,
                registered_accounts=registered_accounts,
                registered_set=registered_set,
                policies=policies,
            )

    def _handle_message(
        self,
        m: Any,
        *,
        cfg: "InteractionChatConfig",
        registered_accounts: List[str],
        registered_set: set,
        policies: Dict[str, Dict[str, Any]],
    ) -> None:
        """Decide whether and how to answer one message, then send and log it."""
        username = str(getattr(m, "username") or "")
        content = str(getattr(m, "content") or "")
        if not username or not content:
            return

        sender = username.strip().lower()
        if sender in registered_set:
            policy = policies.get(sender) or {}
            if not bool(policy.get("interaction_bypass_antiloop", False)):
                return

        mentioned = _extract_mentioned_accounts(content, registered_accounts)
        if not mentioned:
            return
        if not self._cooldown_ok(username, cfg.cooldown_seconds):
            return

        # First matching rule wins.
        rule = next(
            (
                r
                for r in cfg.rules
                if r.matches(username=username, message=content, mentioned_accounts=mentioned)
            ),
            None,
        )

        # Choose the sending account: the rule's account, else the first mention.
        responder_account: Optional[str] = None
        if rule is not None and rule.mention_account:
            ma = rule.mention_account.strip()
            if ma.startswith("@"):
                ma = ma[1:]
            responder_account = ma.lower() or None
        if not responder_account:
            responder_account = mentioned[0].lower()

        matched_rule_id: Optional[str] = rule.id if rule is not None else None

        if rule is not None:
            # Rules take priority: when a rule matches, ONLY the rule runs.
            # NOTE(review): rule.tgpt_preprompt is therefore never consulted,
            # because tgpt only runs when no rule matched — confirm intent.
            response_text = (rule.response_text or "").strip()
            if not response_text:
                self._log_event(
                    "rule_empty_response",
                    {
                        "from": username,
                        "content": content,
                        "mentioned_accounts": mentioned,
                        "responder_account": responder_account,
                        "rule_id": matched_rule_id,
                    },
                )
                return
        elif cfg.mode == "tgpt" and cfg.tgpt_enabled:
            preprompt = (cfg.tgpt_preprompt or "").strip()
            cleaned_for_tgpt = _strip_registered_mentions(content, registered_accounts)
            generated = self._tgpt_generate(preprompt=preprompt, message=cleaned_for_tgpt)
            response_text = self._truncate(generated, cfg.tgpt_max_chars)
            if not response_text:
                self._log_event(
                    "tgpt_empty",
                    {
                        "from": username,
                        "content": content,
                        "content_sent_to_tgpt": cleaned_for_tgpt,
                        "responder_account": responder_account,
                    },
                )
                return
        else:
            response_text = cfg.default_responses[0].strip() if cfg.default_responses else ""
            if not response_text:
                return

        outgoing = f"@{username} {response_text}".strip()
        # Fallback: when no account could be chosen, use the first registered one.
        if not responder_account:
            responder_account = registered_accounts[0].lower()

        self._send_message_as(responder_account, outgoing)
        self._mark_replied(username)
        self._log_event(
            "responded",
            {
                "from": username,
                "content": content,
                "mentioned_accounts": mentioned,
                "responder_account": responder_account,
                "response": outgoing,
                "rule_id": matched_rule_id,
                "mode": cfg.mode,
            },
        )

    # ------------------------------------------------------------------
    # Text helpers
    # ------------------------------------------------------------------
    def _truncate(self, text: str, max_chars: int) -> str:
        """Return ``text`` stripped and limited to ``max_chars`` characters.

        ``None`` means no limit; a limit that cannot be converted to an int
        falls back to 100; a limit <= 0 yields an empty string. Longer text is
        cut and suffixed with "..." (plain cut when the limit is 3 or less, so
        the result never exceeds the limit).
        """
        t = (text or "").strip()
        if max_chars is None:
            return t
        try:
            n = int(max_chars)
        except (TypeError, ValueError, OverflowError):
            n = 100
        if n <= 0:
            return ""
        if len(t) <= n:
            return t
        if n <= 3:
            # No room for an ellipsis within the limit.
            return t[:n]
        # Leave room for "...".
        return t[: n - 3].rstrip() + "..."

    def _log_tgpt_error(self, error: str) -> None:
        """Record a ``tgpt_error`` log entry."""
        self._log_event("tgpt_error", {"error": error})

    def _tgpt_generate(self, *, preprompt: str, message: str) -> str:
        """Call the ``tgpt`` CLI and return its answer as one cleaned line.

        Returns an empty string when the query is empty, when tgpt is missing,
        times out, cannot be started, or exits with an error without printing
        anything on stdout (failures are logged as ``tgpt_error``).
        """
        query = (message or "").strip()
        if preprompt:
            query = f"{preprompt}\n\n{query}"
        if not query:
            return ""

        # NOTE(review): `query` comes from chat; a query starting with "-"
        # may be parsed as an option by tgpt — confirm how tgpt treats it.
        try:
            proc = subprocess.run(
                ["tgpt", "-q", query],
                capture_output=True,
                text=True,
                timeout=self._TGPT_TIMEOUT_S,
                check=False,
            )
        except FileNotFoundError:
            self._log_tgpt_error("tgpt_not_found")
            return ""
        except subprocess.TimeoutExpired:
            self._log_tgpt_error("tgpt_timeout")
            return ""
        except Exception as e:
            self._log_tgpt_error(str(e))
            return ""

        out = (proc.stdout or "").strip()
        if not out:
            stderr_text = (proc.stderr or "").strip()
            if proc.returncode != 0:
                # A failed run only produced diagnostics: log them instead of
                # posting them to the chat.
                self._log_tgpt_error(f"tgpt_exit_{proc.returncode}: {stderr_text[:200]}")
                return ""
            out = stderr_text

        # Minimal cleanup (tgpt sometimes prefixes "Assistant:" / "Answer:").
        for prefix in self._TGPT_PREFIXES:
            if out.startswith(prefix):
                out = out[len(prefix):].strip()
        # Collapse every whitespace run (including "\r" and "\n") so the reply
        # is always a single chat line.
        return " ".join(out.split())