Files
2026-04-28 21:06:26 +02:00

1546 lines
58 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import os
import subprocess
import sys
import threading
import time
from datetime import datetime
_ROOT = os.path.dirname(os.path.abspath(__file__))
if _ROOT not in sys.path:
sys.path.insert(0, _ROOT)
from flask import Flask, render_template, request, jsonify
from flask_socketio import SocketIO, emit
from fonction.first_class import IA_generator, messageTwitch, storage
from twitch_bot import chat_state
from twitch_bot.controller import bot_controller
from twitch_bot.interaction_chat import InteractionChatConfig
import requests
def _first_enabled_user_index(config_path: str = "config/user.json") -> int:
try:
with open(config_path, "r", encoding="utf-8") as f:
users = json.load(f) or []
for i, u in enumerate(users):
if isinstance(u, dict) and u.get("enabled", True):
return i
except Exception:
pass
return 0
def _read_json_file(path: str, default):
try:
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
return data if data is not None else default
except Exception:
return default
def _twitch_client_id() -> str:
cfg = _read_json_file("config/config.json", default={})
if isinstance(cfg, dict):
v = cfg.get("twitch_client_id") or cfg.get("twitch_clientId") or cfg.get("twitchClientId")
if isinstance(v, str):
return v.strip()
return ""
def _user_bearer_token(user_id: int) -> str:
users = _read_json_file("config/user.json", default=[])
if not isinstance(users, list) or user_id < 0 or user_id >= len(users):
raise ValueError("Utilisateur non trouvé")
u = users[user_id]
if not isinstance(u, dict):
raise ValueError("Utilisateur invalide")
raw = (u.get("tw_acc_token") or "").strip()
if raw.lower().startswith("oauth:"):
raw = raw.split(":", 1)[1].strip()
if not raw:
raise ValueError("Token OAuth manquant")
return raw
def _twitch_helix_headers(user_bearer: str) -> dict:
client_id = _twitch_client_id()
if not client_id:
raise ValueError("Client-ID Twitch manquant. Ajoutez `twitch_client_id` dans config/config.json")
return {
"Client-ID": client_id,
"Authorization": f"Bearer {user_bearer}",
}
def _twitch_get_user_id_by_login(login: str, user_bearer: str) -> str:
login = (login or "").strip().lstrip("@")
if not login:
raise ValueError("Nom de chaîne requis")
headers = _twitch_helix_headers(user_bearer)
r = requests.get("https://api.twitch.tv/helix/users", headers=headers, params={"login": login}, timeout=10)
try:
payload = r.json()
except Exception:
payload = None
if r.status_code >= 400:
msg = ""
if isinstance(payload, dict):
msg = payload.get("message") or payload.get("error") or ""
raise RuntimeError(f"Erreur Twitch /helix/users ({r.status_code}) {msg}".strip())
data = (payload or {}).get("data") if isinstance(payload, dict) else None
if not isinstance(data, list) or not data:
raise ValueError("Chaîne Twitch introuvable (login)")
user = data[0] or {}
broadcaster_id = user.get("id")
if not broadcaster_id:
raise RuntimeError("Réponse Twitch invalide: id manquant")
return str(broadcaster_id)
def _twitch_create_clip(broadcaster_id: str, user_bearer: str, has_delay: bool = False) -> dict:
headers = _twitch_helix_headers(user_bearer)
params = {"broadcaster_id": str(broadcaster_id), "has_delay": "true" if has_delay else "false"}
r = requests.post("https://api.twitch.tv/helix/clips", headers=headers, params=params, timeout=15)
try:
payload = r.json()
except Exception:
payload = None
if r.status_code >= 400:
msg = ""
if isinstance(payload, dict):
msg = payload.get("message") or payload.get("error") or ""
hint = ""
if r.status_code in (401, 403):
hint = " (vérifiez que le token a le scope `clips:edit` et qu'il n'est pas expiré)"
raise RuntimeError(f"Erreur Twitch /helix/clips ({r.status_code}) {msg}{hint}".strip())
data = (payload or {}).get("data") if isinstance(payload, dict) else None
if not isinstance(data, list) or not data:
raise RuntimeError("Réponse Twitch invalide: data manquant")
clip = data[0] or {}
clip_id = clip.get("id")
edit_url = clip.get("edit_url")
if not clip_id:
raise RuntimeError("Réponse Twitch invalide: clip id manquant")
# URL publique approximative (le retour officiel donne surtout edit_url)
public_url = f"https://clips.twitch.tv/{clip_id}"
return {
"id": clip_id,
"edit_url": edit_url,
"url": public_url,
}
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key-here'
socketio = SocketIO(app, cors_allowed_origins="*")
@app.route('/')
def index():
return render_template('index.html')
@app.route('/api/flux', methods=['GET'])
def get_flux():
return jsonify(bot_controller.get_flux_list())
@app.route('/api/flux', methods=['POST'])
def add_flux():
data = request.json
channel_name = data.get('channel_name')
record_audio = data.get('record_audio', True)
send_messages = bool(data.get('send_messages', True))
enable_ia = bool(data.get('enable_ia', True))
if not channel_name:
return jsonify({'error': 'Nom du canal requis'}), 400
try:
flux_id = bot_controller.add_flux(channel_name, record_audio, send_messages, enable_ia)
return jsonify({'success': True, 'flux_id': flux_id})
except Exception as e:
error_msg = f"Erreur lors de l'ajout du flux: {str(e)}"
print(f"Erreur API add_flux: {error_msg}")
return jsonify({'error': error_msg}), 500
@app.route('/api/flux/<int:flux_id>', methods=['DELETE'])
def remove_flux(flux_id):
if bot_controller.remove_flux(flux_id):
return jsonify({'success': True})
return jsonify({'error': 'Flux non trouvé'}), 404
@app.route('/api/flux/<int:flux_id>/status', methods=['GET'])
def get_flux_status(flux_id):
"""Obtenir le statut détaillé d'un flux spécifique"""
for flux in bot_controller.flux_list:
if flux['id'] == flux_id:
status = {
'id': flux_id,
'name': flux['name'],
'active': flux['active'],
'status': flux.get('status', 'unknown'),
'created_at': flux['created_at'],
'bots': {}
}
# Ajouter les informations des bots si disponibles
if flux_id in bot_controller.bots:
bots = bot_controller.bots[flux_id]
if bots['chat_bot']:
status['bots']['chat'] = {
'running': bots['chat_bot'].is_running if hasattr(bots['chat_bot'], 'is_running') else True
}
if bots['record_bot']:
status['bots']['record'] = {
'running': bots['record_bot'].running if hasattr(bots['record_bot'], 'running') else True
}
return jsonify(status)
return jsonify({'error': 'Flux non trouvé'}), 404
@app.route('/api/flux/<int:flux_id>/toggle', methods=['POST'])
def toggle_flux(flux_id):
"""Activer/désactiver un flux"""
for flux in bot_controller.flux_list:
if flux['id'] == flux_id:
flux['active'] = not flux['active']
# Arrêter/démarrer les bots selon le nouveau statut
if flux_id in bot_controller.bots:
bots = bot_controller.bots[flux_id]
if not flux['active']:
# Arrêter les bots
try:
if bots['chat_bot']:
bots['chat_bot'].stop()
if bots['record_bot']:
bots['record_bot'].stop()
if bots['subtitle_bot']:
bots['subtitle_bot'].stop()
if bots['ia_bot']:
bots['ia_bot'].stop()
if bots['message_bot']:
bots['message_bot'].stop()
except Exception as e:
print(f"Erreur lors de l'arrêt des bots: {e}")
else:
# Redémarrer les bots
try:
if bots['chat_bot']:
bots['chat_bot'].start_background()
if bots['record_bot']:
threading.Thread(target=bots['record_bot'].main, daemon=True).start()
if bots['subtitle_bot']:
bots['subtitle_bot'].start_main_loop()
if bool(flux.get("enable_ia", True)) and bots.get('ia_bot'):
bots['ia_bot'].start_main_loop()
# Respecter le flag par-flux send_messages
if bool(flux.get("send_messages", True)) and bool(flux.get("enable_ia", True)) and bots.get('message_bot'):
bots['message_bot'].start_loop_respond()
except Exception as e:
print(f"Erreur lors du redémarrage des bots: {e}")
return jsonify({'success': True, 'active': flux['active']})
return jsonify({'error': 'Flux non trouvé'}), 404
@app.route('/api/config/prompts', methods=['GET'])
def get_prompts():
return jsonify(bot_controller.config.get('list_prompt', []))
@app.route('/api/config/prompts', methods=['POST'])
def update_prompts():
data = request.json
prompts = data.get('prompts', [])
bot_controller.config['list_prompt'] = prompts
bot_controller.save_config()
return jsonify({'success': True})
@app.route('/api/config/settings', methods=['GET'])
def get_settings():
"""Récupérer les paramètres généraux (config/config.json)."""
try:
cfg = bot_controller.load_config()
return jsonify({
'success': True,
'settings': {
'twitch_message_max_chars': int(cfg.get('twitch_message_max_chars', 100)),
}
})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/config/settings', methods=['POST'])
def save_settings():
"""Sauvegarder les paramètres généraux (config/config.json)."""
try:
data = request.json or {}
settings = data.get('settings') or {}
cfg = bot_controller.load_config()
try:
cfg['twitch_message_max_chars'] = int(settings.get('twitch_message_max_chars', cfg.get('twitch_message_max_chars', 100)))
except Exception:
cfg['twitch_message_max_chars'] = 100
bot_controller.config = cfg
bot_controller.save_config()
return jsonify({'success': True, 'settings': {'twitch_message_max_chars': cfg['twitch_message_max_chars']}})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/config/users', methods=['GET'])
def get_users():
"""Récupérer la liste des utilisateurs"""
try:
with open('config/user.json', 'r') as file:
users = json.load(file)
return jsonify(users)
except FileNotFoundError:
return jsonify([])
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/config/users', methods=['POST'])
def add_user():
"""Ajouter un nouvel utilisateur"""
data = request.json
pseudo = data.get('tw_acc_pseudo')
token = data.get('tw_acc_token')
charactere = data.get('charactere', '😊')
enabled = bool(data.get('enabled', True))
interaction_bypass_antiloop = bool(data.get('interaction_bypass_antiloop', False))
if not pseudo or not token:
return jsonify({'error': 'Pseudo et token requis'}), 400
try:
with open('config/user.json', 'r') as file:
users = json.load(file)
# Vérifier si l'utilisateur existe déjà
for user in users:
if user['tw_acc_pseudo'] == pseudo:
return jsonify({'error': 'Cet utilisateur existe déjà'}), 400
# Ajouter le nouvel utilisateur
new_user = {
'tw_acc_pseudo': pseudo,
'tw_acc_token': token,
'charactere': charactere,
'enabled': enabled,
'interaction_bypass_antiloop': interaction_bypass_antiloop,
}
users.append(new_user)
# Sauvegarder
with open('config/user.json', 'w') as file:
json.dump(users, file, indent=4, ensure_ascii=False)
return jsonify({'success': True, 'user': new_user})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/config/users/<int:user_id>', methods=['PUT'])
def update_user(user_id):
"""Modifier un utilisateur existant"""
data = request.json
pseudo = data.get('tw_acc_pseudo')
token = data.get('tw_acc_token')
charactere = data.get('charactere', '😊')
enabled = bool(data.get('enabled', True))
interaction_bypass_antiloop = bool(data.get('interaction_bypass_antiloop', False))
if not pseudo or not token:
return jsonify({'error': 'Pseudo et token requis'}), 400
try:
with open('config/user.json', 'r') as file:
users = json.load(file)
if user_id >= len(users):
return jsonify({'error': 'Utilisateur non trouvé'}), 404
# Vérifier si le pseudo existe déjà (sauf pour l'utilisateur actuel)
for i, user in enumerate(users):
if i != user_id and user['tw_acc_pseudo'] == pseudo:
return jsonify({'error': 'Ce pseudo est déjà utilisé'}), 400
# Mettre à jour l'utilisateur
users[user_id] = {
'tw_acc_pseudo': pseudo,
'tw_acc_token': token,
'charactere': charactere,
'enabled': enabled,
'interaction_bypass_antiloop': interaction_bypass_antiloop,
}
# Sauvegarder
with open('config/user.json', 'w') as file:
json.dump(users, file, indent=4, ensure_ascii=False)
return jsonify({'success': True, 'user': users[user_id]})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/config/users/<int:user_id>/enabled', methods=['POST'])
def set_user_enabled(user_id):
"""Activer/désactiver un utilisateur sans toucher pseudo/token."""
try:
data = request.json or {}
enabled = bool(data.get('enabled', True))
with open('config/user.json', 'r', encoding='utf-8') as file:
users = json.load(file)
if user_id >= len(users):
return jsonify({'error': 'Utilisateur non trouvé'}), 404
if not isinstance(users[user_id], dict):
return jsonify({'error': 'Utilisateur invalide'}), 400
users[user_id]['enabled'] = enabled
with open('config/user.json', 'w', encoding='utf-8') as file:
json.dump(users, file, indent=4, ensure_ascii=False)
return jsonify({'success': True, 'user': users[user_id]})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/config/users/<int:user_id>/interaction-bypass', methods=['POST'])
def set_user_interaction_bypass(user_id):
"""Autoriser un utilisateur à bypass lanti-boucle Interaction chat."""
try:
data = request.json or {}
bypass = bool(data.get('interaction_bypass_antiloop', False))
with open('config/user.json', 'r', encoding='utf-8') as file:
users = json.load(file)
if user_id >= len(users):
return jsonify({'error': 'Utilisateur non trouvé'}), 404
if not isinstance(users[user_id], dict):
return jsonify({'error': 'Utilisateur invalide'}), 400
users[user_id]['interaction_bypass_antiloop'] = bypass
with open('config/user.json', 'w', encoding='utf-8') as file:
json.dump(users, file, indent=4, ensure_ascii=False)
return jsonify({'success': True, 'user': users[user_id]})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/config/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
"""Supprimer un utilisateur"""
try:
with open('config/user.json', 'r') as file:
users = json.load(file)
if user_id >= len(users):
return jsonify({'error': 'Utilisateur non trouvé'}), 404
# Supprimer l'utilisateur
deleted_user = users.pop(user_id)
# Sauvegarder
with open('config/user.json', 'w') as file:
json.dump(users, file, indent=4, ensure_ascii=False)
return jsonify({'success': True, 'deleted_user': deleted_user})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/subtitles', methods=['GET'])
def get_subtitles():
flux_id = request.args.get("flux_id")
channel = request.args.get("channel")
key = "subtitle_data"
if channel:
key = f"subtitle_data__{str(channel).strip().lower()}"
elif flux_id:
try:
fid = int(flux_id)
flux = next((f for f in bot_controller.flux_list if f.get("id") == fid), None)
if flux and flux.get("twitchname"):
key = f"subtitle_data__{str(flux.get('twitchname')).strip().lower()}"
except Exception:
pass
data = storage.read(key)
return jsonify(data if isinstance(data, dict) else {})
@app.route('/api/subtitles/last', methods=['GET'])
def get_last_subtitle():
"""Retourne le dernier sous-titre (clé + texte)."""
try:
flux_id = request.args.get("flux_id")
channel = request.args.get("channel")
key = "subtitle_data"
if channel:
key = f"subtitle_data__{str(channel).strip().lower()}"
elif flux_id:
try:
fid = int(flux_id)
flux = next((f for f in bot_controller.flux_list if f.get("id") == fid), None)
if flux and flux.get("twitchname"):
key = f"subtitle_data__{str(flux.get('twitchname')).strip().lower()}"
except Exception:
pass
subtitle_data = storage.read(key) or {}
if not isinstance(subtitle_data, dict) or not subtitle_data:
return jsonify({'success': True, 'last_key': None, 'last_subtitle': ''})
sorted_keys = sorted(subtitle_data.keys())
def _good_candidate(text: str) -> bool:
t = (text or "").strip()
if not t:
return False
# Heuristique: éviter les "bruits" très courts pour la génération IA
words = [w for w in t.split() if w]
return (len(t) >= 35) or (len(words) >= 8)
# Chercher le plus récent "utile", sinon fallback sur le tout dernier
picked_key = None
picked_text = ""
for k in reversed(sorted_keys):
t = subtitle_data.get(k, "")
if _good_candidate(str(t)):
picked_key = k
picked_text = str(t)
break
if picked_key is None:
picked_key = sorted_keys[-1]
picked_text = str(subtitle_data.get(picked_key, "") or "")
return jsonify({'success': True, 'last_key': picked_key, 'last_subtitle': picked_text})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/subtitles/clear', methods=['POST'])
def clear_subtitles():
"""Nettoyer l'historique des sous-titres"""
try:
flux_id = request.args.get("flux_id")
channel = request.args.get("channel")
storage_key = "subtitle_data"
if channel:
storage_key = f"subtitle_data__{str(channel).strip().lower()}"
elif flux_id:
try:
fid = int(flux_id)
flux = next((f for f in bot_controller.flux_list if f.get("id") == fid), None)
if flux and flux.get("twitchname"):
storage_key = f"subtitle_data__{str(flux.get('twitchname')).strip().lower()}"
except Exception:
pass
# Récupérer toutes les clés de sous-titres
subtitle_data = storage.read(storage_key)
# Supprimer chaque clé une par une
for loop_key in list((subtitle_data or {}).keys()):
storage.delete(storage_key, loop_key)
# Supprimer également le fichier JSON s'il existe
storage_dir = "storage"
subtitle_file = os.path.join(storage_dir, f"{storage_key}.json")
if os.path.exists(subtitle_file):
os.remove(subtitle_file)
return jsonify({
'success': True,
'message': 'Historique des sous-titres nettoyé avec succès'
})
except Exception as e:
return jsonify({
'success': False,
'error': f'Erreur lors du nettoyage: {str(e)}'
}), 500
@app.route('/api/subtitles/rules/config', methods=['GET'])
def get_subtitle_rules_config():
try:
processor = None
for flux_id, bots in bot_controller.bots.items():
processor = bots.get('subtitle_rules')
if processor:
break
if not processor:
from twitch_bot.subtitle_rules import SubtitleRulesStorage, SubtitleRulesConfig
st = SubtitleRulesStorage()
cfg = SubtitleRulesConfig(st.read_json("subtitle_rules_config", default={}))
data = cfg.to_dict()
else:
data = processor.load_config().to_dict()
return jsonify({'success': True, 'config': data})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/subtitles/rules/config', methods=['POST'])
def save_subtitle_rules_config():
try:
payload = request.json or {}
config_dict = payload.get('config') or {}
processor = None
for flux_id, bots in bot_controller.bots.items():
processor = bots.get('subtitle_rules')
if processor:
break
if processor:
cfg = processor.save_config(config_dict)
data = cfg.to_dict()
else:
from twitch_bot.subtitle_rules import SubtitleRulesStorage, SubtitleRulesConfig
st = SubtitleRulesStorage()
cfg = SubtitleRulesConfig(config_dict)
st.write_json("subtitle_rules_config", cfg.to_dict())
data = cfg.to_dict()
return jsonify({'success': True, 'config': data})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/subtitles/rules/log', methods=['GET'])
def get_subtitle_rules_log():
try:
limit = int(request.args.get('limit', '120'))
limit = max(1, min(500, limit))
processor = None
for flux_id, bots in bot_controller.bots.items():
processor = bots.get('subtitle_rules')
if processor:
break
if processor:
logs = processor.read_log(limit=limit)
else:
from twitch_bot.subtitle_rules import SubtitleRulesStorage
st = SubtitleRulesStorage()
logs = st.read_json("subtitle_rules_log", default=[])
if not isinstance(logs, list):
logs = []
logs = logs[-limit:]
return jsonify({'success': True, 'logs': logs})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/subtitles/process', methods=['POST'])
def process_subtitles():
"""Lancer manuellement le traitement des sous-titres"""
try:
import subprocess
import os
import json
from datetime import datetime
# Vérifier que le dossier record existe
record_dir = "record"
if not os.path.exists(record_dir):
return jsonify({
'success': False,
'error': 'Dossier record non trouvé'
}), 404
# Trouver les fichiers audio
audio_files = [f for f in os.listdir(record_dir) if f.endswith('.mp3')]
if not audio_files:
return jsonify({
'success': False,
'error': 'Aucun fichier audio trouvé'
}), 404
subtitles_created = 0
subtitles_data = {}
# Traiter chaque fichier audio
for audio_file in audio_files[:3]: # Limiter à 3 fichiers pour éviter de surcharger
audio_path = os.path.join(record_dir, audio_file)
try:
# Lancer Whisper
command = [
'whisper',
'--language', 'fr',
audio_path,
'--device', 'cuda',
'--model', 'large-v3'
]
result = subprocess.run(command, capture_output=True, text=True, timeout=60)
if result.returncode == 0:
# Lire le fichier .txt généré
txt_file = audio_file.replace('.mp3', '.txt')
if os.path.exists(txt_file):
with open(txt_file, 'r', encoding='utf-8') as f:
content = f.read().strip()
# Nettoyer le contenu
content = content.replace("'", "").replace('"', "").replace("\n", " ").replace(",", "")
# Supprimer les répétitions de mots
words = content.split()
seen = set()
result_words = []
for word in words:
if word not in seen:
result_words.append(word)
seen.add(word)
cleaned_content = " ".join(result_words)
# Sauvegarder dans le stockage
current_time = datetime.now().strftime('%H:%M:%S')
subtitles_data[current_time] = cleaned_content
# Sauvegarder dans le fichier de stockage
storage_dir = "storage"
if not os.path.exists(storage_dir):
os.makedirs(storage_dir)
subtitle_file = os.path.join(storage_dir, "subtitle_data.json")
try:
with open(subtitle_file, 'r', encoding='utf-8') as f:
existing_data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
existing_data = {}
existing_data[current_time] = cleaned_content
with open(subtitle_file, 'w', encoding='utf-8') as f:
json.dump(existing_data, f, indent=4, ensure_ascii=False)
subtitles_created += 1
# Nettoyer les fichiers temporaires
os.remove(txt_file)
os.remove(audio_path) # Supprimer le fichier audio traité
except subprocess.TimeoutExpired:
continue # Passer au fichier suivant
except Exception as e:
continue # Passer au fichier suivant
return jsonify({
'success': True,
'message': f'Traitement terminé. {subtitles_created} sous-titre(s) créé(s).',
'subtitles': subtitles_data
})
except Exception as e:
return jsonify({
'success': False,
'error': f'Erreur lors du traitement: {str(e)}'
}), 500
@app.route('/api/generations', methods=['GET'])
def get_generations():
data = storage.read("IA_generator")
return jsonify(data)
@app.route('/api/send-message', methods=['POST'])
def send_message():
data = request.json
message = data.get('message')
channel = data.get('channel', 'default')
user_id = data.get('user_id', 0) # Nouveau paramètre pour choisir l'utilisateur
if not message:
return jsonify({'error': 'Message requis'}), 400
# Trouver le bot de message pour ce canal
try:
msg_bot = messageTwitch("config/user.json", channel)
# Utiliser send_message_user au lieu de send_message pour spécifier l'utilisateur
msg_bot.send_message_user(user_id, message)
return jsonify({'success': True})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/generate-response', methods=['POST'])
def generate_response():
data = request.json
text = data.get('text', '')
try:
ia_gen = IA_generator("config/config.json")
ia_gen.main_ask(text)
return jsonify({'success': True})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/chat/<int:flux_id>/messages', methods=['GET'])
def get_chat_messages(flux_id):
"""Récupérer les messages de chat d'un flux spécifique"""
try:
if flux_id not in bot_controller.bots:
return jsonify({'error': 'Flux non trouvé'}), 404
chat_bot = bot_controller.bots[flux_id]['chat_bot']
if not chat_bot:
return jsonify({'error': 'Bot de chat non disponible'}), 404
# Récupérer les messages du bot de chat
messages = []
for msg in chat_bot.messages[-50:]: # Derniers 50 messages
messages.append({
'timestamp': msg.timestamp.isoformat(),
'username': msg.username,
'content': msg.content
})
return jsonify({
'success': True,
'flux_id': flux_id,
'messages': messages
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/chat/<int:flux_id>/send', methods=['POST'])
def send_chat_message(flux_id):
"""Envoyer un message dans le chat d'un flux spécifique"""
data = request.json
message = data.get('message', '')
user_id = data.get('user_id', 0) # Nouveau paramètre pour choisir l'utilisateur
if not message:
return jsonify({'error': 'Message requis'}), 400
try:
# Trouver le flux
flux = None
for f in bot_controller.flux_list:
if f['id'] == flux_id:
flux = f
break
if not flux:
return jsonify({'error': 'Flux non trouvé'}), 404
if not bool(flux.get("send_messages", True)):
return jsonify({'error': 'Envoi de messages désactivé pour ce flux'}), 403
# Envoyer le message avec l'utilisateur spécifié
msg_bot = messageTwitch("config/user.json", flux['twitchname'])
msg_bot.send_message_user(user_id, message)
return jsonify({'success': True})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/status', methods=['GET'])
def get_status():
status = {
'flux_count': len(bot_controller.flux_list),
'active_recordings': sum(1 for f in bot_controller.flux_list if f['record_audio'] and f['active']),
'chat_connections': sum(1 for f in bot_controller.flux_list if f['active']),
'last_subtitle': '',
'next_message': '',
'recent_messages': []
}
# Récupérer le dernier sous-titre
subtitle_data = storage.read("subtitle_data")
if subtitle_data:
sorted_keys = sorted(subtitle_data.keys())
if sorted_keys:
status['last_subtitle'] = subtitle_data[sorted_keys[-1]]
# Récupérer la dernière génération
generation_data = storage.read("IA_generator")
if generation_data:
sorted_keys = sorted(generation_data.keys())
if sorted_keys:
status['next_message'] = generation_data[sorted_keys[-1]]
return jsonify(status)
@socketio.on('connect')
def handle_connect():
print('Client connecté')
emit('status', {'message': 'Connecté au serveur'})
@socketio.on('disconnect')
def handle_disconnect():
print('Client déconnecté')
# Thread pour envoyer les mises à jour en temps réel
def background_updates():
while True:
try:
status = {
'timestamp': datetime.now().isoformat(),
'flux_count': len(bot_controller.flux_list),
'active_recordings': sum(1 for f in bot_controller.flux_list if f['record_audio'] and f['active']),
}
socketio.emit('status_update', status)
time.sleep(5) # Mise à jour toutes les 5 secondes
except Exception as e:
print(f"Erreur dans background_updates: {e}")
time.sleep(10)
# Variables globales pour la génération automatique
auto_subtitle_running = False
current_processing_file = None
# Variables globales pour l'envoi automatique de messages
auto_message_running = False
current_message_bot = None
@app.route('/api/subtitles/auto/start', methods=['POST'])
def start_auto_subtitle():
"""Démarrer la génération automatique de sous-titres"""
global auto_subtitle_running
try:
# Vérifier si déjà en cours
if auto_subtitle_running:
return jsonify({
'success': False,
'error': 'La génération automatique est déjà en cours'
}), 400
auto_subtitle_running = True
print(f"[{datetime.now().strftime('%H:%M:%S')}] Démarrage de la génération automatique de sous-titres")
# Démarrer le thread de génération automatique
threading.Thread(target=auto_subtitle_loop, daemon=True).start()
return jsonify({
'success': True,
'message': 'Génération automatique démarrée'
})
except Exception as e:
auto_subtitle_running = False
return jsonify({
'success': False,
'error': f'Erreur lors du démarrage: {str(e)}'
}), 500
@app.route('/api/subtitles/auto/stop', methods=['POST'])
def stop_auto_subtitle():
"""Arrêter la génération automatique de sous-titres"""
global auto_subtitle_running
try:
if not auto_subtitle_running:
return jsonify({
'success': False,
'error': 'La génération automatique n\'est pas en cours'
}), 400
print(f"[{datetime.now().strftime('%H:%M:%S')}] Arrêt de la génération automatique de sous-titres")
auto_subtitle_running = False
return jsonify({
'success': True,
'message': 'Génération automatique arrêtée'
})
except Exception as e:
return jsonify({
'success': False,
'error': f'Erreur lors de l\'arrêt: {str(e)}'
}), 500
@app.route('/api/subtitles/auto/force-stop', methods=['POST'])
def force_stop_auto_subtitle():
"""Forcer l'arrêt de la génération automatique de sous-titres"""
global auto_subtitle_running, current_processing_file
try:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Arrêt forcé de la génération automatique de sous-titres")
auto_subtitle_running = False
current_processing_file = None
return jsonify({
'success': True,
'message': 'Arrêt forcé de la génération automatique de sous-titres'
})
except Exception as e:
return jsonify({
'success': False,
'error': f'Erreur lors de l\'arrêt forcé: {str(e)}'
}), 500
@app.route('/api/subtitles/auto/status', methods=['GET'])
def get_auto_subtitle_status():
"""Obtenir le statut de la génération automatique"""
global auto_subtitle_running, current_processing_file
return jsonify({
'running': auto_subtitle_running,
'current_file': current_processing_file
})
def auto_subtitle_loop():
"""Boucle de génération automatique de sous-titres"""
global auto_subtitle_running, current_processing_file
print(f"[{datetime.now().strftime('%H:%M:%S')}] Démarrage de la boucle de génération automatique de sous-titres")
while auto_subtitle_running:
try:
# Vérifier s'il y a des fichiers audio à traiter
record_dir = "record"
if not os.path.exists(record_dir):
time.sleep(5)
continue
audio_files = [f for f in os.listdir(record_dir) if f.endswith('.mp3')]
if not audio_files:
time.sleep(5)
continue
# Traiter le premier fichier
audio_file = audio_files[0]
current_processing_file = audio_file
print(f"[{datetime.now().strftime('%H:%M:%S')}] Traitement de: {audio_file}")
# Émettre l'événement de début de traitement
socketio.emit('subtitle_processing_start', {
'file': audio_file,
'timestamp': datetime.now().isoformat()
})
# Traiter le fichier
audio_path = os.path.join(record_dir, audio_file)
try:
# Lancer Whisper avec vérification périodique de l'arrêt
command = [
'whisper',
'--language', 'fr',
audio_path,
'--device', 'cuda',
'--model', 'large-v3'
]
# Utiliser Popen pour pouvoir interrompre le processus
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# Attendre la fin du processus avec vérification périodique
while process.poll() is None:
if not auto_subtitle_running:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Arrêt demandé, interruption du processus Whisper")
process.terminate()
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
break
time.sleep(1)
# Si la boucle a été arrêtée, sortir
if not auto_subtitle_running:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Arrêt de la boucle de génération automatique")
break
result = process.returncode
stdout, stderr = process.communicate()
if result == 0:
# Lire le fichier .txt généré
txt_file = audio_file.replace('.mp3', '.txt')
if os.path.exists(txt_file):
with open(txt_file, 'r', encoding='utf-8') as f:
content = f.read().strip()
# Nettoyer le contenu
content = content.replace("'", "").replace('"', "").replace("\n", " ").replace(",", "")
# Supprimer les répétitions de mots
words = content.split()
seen = set()
result_words = []
for word in words:
if word not in seen:
result_words.append(word)
seen.add(word)
cleaned_content = " ".join(result_words)
# Sauvegarder dans le stockage
current_time = datetime.now().strftime('%H:%M:%S')
# Sauvegarder dans le fichier de stockage
storage_dir = "storage"
if not os.path.exists(storage_dir):
os.makedirs(storage_dir)
subtitle_file = os.path.join(storage_dir, "subtitle_data.json")
try:
with open(subtitle_file, 'r', encoding='utf-8') as f:
existing_data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
existing_data = {}
existing_data[current_time] = cleaned_content
with open(subtitle_file, 'w', encoding='utf-8') as f:
json.dump(existing_data, f, indent=4, ensure_ascii=False)
# Émettre l'événement de fin de traitement
socketio.emit('subtitle_processing_complete', {
'file': audio_file,
'subtitle': cleaned_content,
'timestamp': current_time
})
print(f"[{datetime.now().strftime('%H:%M:%S')}] Sous-titre généré: {cleaned_content}")
# Nettoyer les fichiers temporaires
os.remove(txt_file)
os.remove(audio_path)
else:
# Émettre l'événement d'erreur
socketio.emit('subtitle_processing_error', {
'file': audio_file,
'error': 'Aucun fichier .txt généré'
})
print(f"[{datetime.now().strftime('%H:%M:%S')}] Erreur: Aucun fichier .txt généré")
else:
# Émettre l'événement d'erreur
socketio.emit('subtitle_processing_error', {
'file': audio_file,
'error': f'Erreur Whisper: {stderr}'
})
print(f"[{datetime.now().strftime('%H:%M:%S')}] Erreur Whisper: {stderr}")
except subprocess.TimeoutExpired:
socketio.emit('subtitle_processing_error', {
'file': audio_file,
'error': 'Timeout - Whisper a pris trop de temps'
})
print(f"[{datetime.now().strftime('%H:%M:%S')}] Timeout - Whisper a pris trop de temps")
except Exception as e:
socketio.emit('subtitle_processing_error', {
'file': audio_file,
'error': f'Erreur: {str(e)}'
})
print(f"[{datetime.now().strftime('%H:%M:%S')}] Erreur lors du traitement: {str(e)}")
current_processing_file = None
# Vérifier à nouveau si la boucle doit continuer
if not auto_subtitle_running:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Arrêt de la boucle de génération automatique")
break
# Attendre avant de traiter le prochain fichier
time.sleep(2)
except Exception as e:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Erreur dans la boucle de génération automatique: {e}")
time.sleep(5)
print(f"[{datetime.now().strftime('%H:%M:%S')}] Fin de la boucle de génération automatique de sous-titres")
@app.route('/api/messages/auto/start', methods=['POST'])
def start_auto_message():
"""Démarrer l'envoi automatique de messages"""
global auto_message_running
try:
# Vérifier si déjà en cours
if auto_message_running:
return jsonify({
'success': False,
'error': 'L\'envoi automatique est déjà en cours'
}), 400
auto_message_running = True
# Créer une instance du bot de messages
current_message_bot = messageTwitch("config/user.json", "default")
# Démarrer le thread d'envoi automatique
threading.Thread(target=auto_message_loop, daemon=True).start()
print(f"[{datetime.now().strftime('%H:%M:%S')}] Envoi automatique de messages démarré")
return jsonify({
'success': True,
'message': 'Envoi automatique de messages démarré'
})
except Exception as e:
auto_message_running = False
current_message_bot = None
return jsonify({
'success': False,
'error': f'Erreur lors du démarrage: {str(e)}'
}), 500
@app.route('/api/messages/auto/stop', methods=['POST'])
def stop_auto_message():
"""Arrêter l'envoi automatique de messages"""
global auto_message_running, current_message_bot
try:
if not auto_message_running:
return jsonify({
'success': False,
'error': 'L\'envoi automatique n\'est pas en cours'
}), 400
auto_message_running = False
current_message_bot = None
print(f"[{datetime.now().strftime('%H:%M:%S')}] Envoi automatique de messages arrêté")
return jsonify({
'success': True,
'message': 'Envoi automatique de messages arrêté'
})
except Exception as e:
return jsonify({
'success': False,
'error': f'Erreur lors de l\'arrêt: {str(e)}'
}), 500
@app.route('/api/messages/auto/force-stop', methods=['POST'])
def force_stop_auto_message():
"""Forcer l'arrêt de l'envoi automatique de messages"""
global auto_message_running, current_message_bot
try:
auto_message_running = False
current_message_bot = None
print(f"[{datetime.now().strftime('%H:%M:%S')}] Arrêt forcé de l'envoi automatique de messages")
return jsonify({
'success': True,
'message': 'Arrêt forcé de l\'envoi automatique de messages'
})
except Exception as e:
return jsonify({
'success': False,
'error': f'Erreur lors de l\'arrêt forcé: {str(e)}'
}), 500
@app.route('/api/messages/auto/status', methods=['GET'])
def get_auto_message_status():
"""Obtenir le statut de l'envoi automatique de messages"""
global auto_message_running
return jsonify({
'running': auto_message_running
})
def auto_message_loop():
"""Boucle d'envoi automatique de messages"""
global auto_message_running, current_message_bot
print(f"[{datetime.now().strftime('%H:%M:%S')}] Démarrage de la boucle d'envoi automatique")
while auto_message_running:
try:
# Vérifier s'il y a des générations disponibles
generation_data = storage.read("IA_generator")
if not generation_data:
time.sleep(5)
continue
# Récupérer la dernière génération
sorted_keys = sorted(generation_data.keys())
if not sorted_keys:
time.sleep(5)
continue
last_generation = generation_data[sorted_keys[-1]]
print(f"[{datetime.now().strftime('%H:%M:%S')}] Envoi automatique: {last_generation}")
# Émettre l'événement de début d'envoi
socketio.emit('message_sending_start', {
'message': last_generation,
'timestamp': datetime.now().isoformat()
})
# Envoyer le message
try:
if current_message_bot:
# Utiliser le premier utilisateur par défaut
current_message_bot.send_message_user(_first_enabled_user_index(), last_generation)
# Émettre l'événement de fin d'envoi
socketio.emit('message_sending_complete', {
'message': last_generation,
'timestamp': datetime.now().isoformat()
})
# Supprimer la génération envoyée
storage.delete("IA_generator", sorted_keys[-1])
print(f"[{datetime.now().strftime('%H:%M:%S')}] Message envoyé avec succès")
except Exception as e:
# Émettre l'événement d'erreur
socketio.emit('message_sending_error', {
'message': last_generation,
'error': str(e)
})
print(f"[{datetime.now().strftime('%H:%M:%S')}] Erreur d'envoi: {e}")
# Attendre avant d'envoyer le prochain message
time.sleep(10)
except Exception as e:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Erreur dans la boucle d'envoi automatique: {e}")
time.sleep(5)
print(f"[{datetime.now().strftime('%H:%M:%S')}] Arrêt de la boucle d'envoi automatique")
@app.route('/api/chat/messages/enable', methods=['POST'])
def enable_chat_messages():
"""Activer l'envoi de messages dans le chat"""
try:
chat_state.chat_messages_enabled = True
return jsonify({
'success': True,
'message': 'Envoi de messages dans le chat activé'
})
except Exception as e:
return jsonify({
'success': False,
'error': f'Erreur lors de l\'activation: {str(e)}'
}), 500
@app.route('/api/chat/messages/disable', methods=['POST'])
def disable_chat_messages():
"""Désactiver l'envoi de messages dans le chat"""
try:
chat_state.chat_messages_enabled = False
return jsonify({
'success': True,
'message': 'Envoi de messages dans le chat désactivé'
})
except Exception as e:
return jsonify({
'success': False,
'error': f'Erreur lors de la désactivation: {str(e)}'
}), 500
@app.route('/api/chat/messages/status', methods=['GET'])
def get_chat_messages_status():
"""Obtenir le statut de l'envoi de messages dans le chat"""
return jsonify({
'enabled': chat_state.chat_messages_enabled
})
@app.route('/api/system-status', methods=['GET'])
def get_system_status():
"""Obtenir le statut de tous les composants"""
return jsonify(bot_controller.get_system_status())
@app.route('/api/clips/create', methods=['POST'])
def create_clip():
"""
Créer un clip Twitch via Helix.
Nécessite:
- `twitch_client_id` dans config/config.json
- un user token avec scope `clips:edit` (config/user.json, champ tw_acc_token)
"""
try:
data = request.json or {}
channel_login = (data.get("channel_login") or "").strip()
user_id = int(data.get("user_id", _first_enabled_user_index()))
has_delay = bool(data.get("has_delay", False))
bearer = _user_bearer_token(user_id)
broadcaster_id = _twitch_get_user_id_by_login(channel_login, bearer)
clip = _twitch_create_clip(broadcaster_id=broadcaster_id, user_bearer=bearer, has_delay=has_delay)
return jsonify({
"success": True,
"clip": {
"broadcaster_login": channel_login.lstrip("@"),
"broadcaster_id": broadcaster_id,
**clip,
}
})
except ValueError as e:
return jsonify({"success": False, "error": str(e)}), 400
except RuntimeError as e:
return jsonify({"success": False, "error": str(e)}), 502
except Exception as e:
return jsonify({"success": False, "error": str(e)}), 500
# === CLIPS AUTOMATION (stub for future) ===
_clips_automation_running = False
_clips_automation_mode = "manual"
@app.route('/api/clips/automation/status', methods=['GET'])
def clip_automation_status():
return jsonify({
"success": True,
"running": _clips_automation_running,
"mode": _clips_automation_mode,
})
@app.route('/api/clips/automation/start', methods=['POST'])
def clip_automation_start():
# Intentionally not implemented yet: future hook point.
return jsonify({
"success": False,
"error": "Automatisation clips: pas encore implémentée (stub prêt)",
}), 501
@app.route('/api/clips/automation/stop', methods=['POST'])
def clip_automation_stop():
# Intentionally not implemented yet: future hook point.
return jsonify({
"success": False,
"error": "Automatisation clips: pas encore implémentée (stub prêt)",
}), 501
@app.route('/api/interaction/config', methods=['GET'])
def get_interaction_config():
"""
Récupérer la config Interaction Chat (réponses préenregistrées + règles).
Stockée en JSON sous storage/interaction_chat_config.json.
"""
try:
# Utiliser le processor du premier flux actif si possible, sinon un "fallback" via une instance ad-hoc
processor = None
for flux_id, bots in bot_controller.bots.items():
processor = bots.get('interaction')
if processor:
break
if not processor:
# fallback: config globale, même fichier storage
from twitch_bot.interaction_chat import InteractionChatStorage
st = InteractionChatStorage()
cfg = InteractionChatConfig(st.read_json("interaction_chat_config", default={}))
data = cfg.to_dict()
else:
data = processor.load_config().to_dict()
# Comptes enregistrés (config/user.json)
try:
with open('config/user.json', 'r', encoding='utf-8') as f:
users = json.load(f)
registered = [u.get('tw_acc_pseudo') for u in (users or []) if isinstance(u, dict) and u.get('tw_acc_pseudo')]
except Exception:
registered = []
return jsonify({'success': True, 'config': data, 'registered_accounts': registered})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/interaction/config', methods=['POST'])
def save_interaction_config():
"""Sauvegarder la config Interaction Chat."""
try:
payload = request.json or {}
config_dict = payload.get('config') or {}
# Sauvegarder via un processor si présent, sinon via stockage direct
processor = None
for flux_id, bots in bot_controller.bots.items():
processor = bots.get('interaction')
if processor:
break
if processor:
cfg = processor.save_config(config_dict)
else:
from twitch_bot.interaction_chat import InteractionChatStorage
st = InteractionChatStorage()
cfg = InteractionChatConfig(config_dict)
st.write_json("interaction_chat_config", cfg.to_dict())
return jsonify({'success': True, 'config': cfg.to_dict()})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/interaction/log', methods=['GET'])
def get_interaction_log():
"""Lire les logs Interaction Chat."""
try:
limit = int(request.args.get('limit', '100'))
limit = max(1, min(500, limit))
processor = None
for flux_id, bots in bot_controller.bots.items():
processor = bots.get('interaction')
if processor:
break
if processor:
logs = processor.read_log(limit=limit)
else:
from twitch_bot.interaction_chat import InteractionChatStorage
st = InteractionChatStorage()
logs = st.read_json("interaction_chat_log", default=[])
if not isinstance(logs, list):
logs = []
logs = logs[-limit:]
return jsonify({'success': True, 'logs': logs})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/ia-generator/start', methods=['POST'])
def start_ia_generator():
"""Démarrer le générateur IA"""
success, message = bot_controller.start_ia_generator()
if success:
# Synchroniser l'état de l'interface web
socketio.emit('ia_generator_status_changed', {
'running': True,
'message': message
})
return jsonify({'success': success, 'message': message})
@app.route('/api/ia-generator/stop', methods=['POST'])
def stop_ia_generator():
"""Arrêter le générateur IA"""
success, message = bot_controller.stop_ia_generator()
if success:
# Synchroniser l'état de l'interface web
socketio.emit('ia_generator_status_changed', {
'running': False,
'message': message
})
return jsonify({'success': success, 'message': message})
@app.route('/api/control-twitch/start', methods=['POST'])
def start_control_twitch():
"""Démarrer le contrôleur Twitch"""
success, message = bot_controller.start_control_twitch()
return jsonify({'success': success, 'message': message})
@app.route('/api/control-twitch/stop', methods=['POST'])
def stop_control_twitch():
"""Arrêter le contrôleur Twitch"""
success, message = bot_controller.stop_control_twitch()
return jsonify({'success': success, 'message': message})
if __name__ == '__main__':
# Démarrer les mises à jour en arrière-plan
update_thread = threading.Thread(target=background_updates, daemon=True)
update_thread.start()
# Démarrer l'application Flask
socketio.run(app, host='0.0.0.0', port=5000, debug=True)