Files
twitchBot-intelligent/web_interface.py
T
gpatruno 68cf59ae75 update
2025-09-06 17:26:38 +02:00

1291 lines
50 KiB
Python

from flask import Flask, render_template, request, jsonify, redirect, url_for
from flask_socketio import SocketIO, emit
import json
import os
import threading
import time
from datetime import datetime
import sys
# Ajouter le chemin de l'environnement virtuel au PYTHONPATH
venv_path = os.path.join(os.path.dirname(__file__), 'env', 'lib', 'python3.10', 'site-packages')
if venv_path not in sys.path:
sys.path.insert(0, venv_path)
# Import des classes du bot
sys.path.append('.')
from fonction.first_class import RecordTwitch, Subtitle_translation, IA_generator, messageTwitch, TwitchChatBot, storage
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key-here'
socketio = SocketIO(app, cors_allowed_origins="*")
class BotController:
def __init__(self):
self.bots = {} # Stockage des instances de bots (pour l'utilisation interne)
self.flux_list = [] # Liste des flux surveillés (pour l'API JSON)
self.config = self.load_config()
self.ia_generator = None
self.control_twitch = None
self.ia_generator_running = False
self.control_twitch_running = False
def load_config(self):
try:
with open('config/config.json', 'r') as file:
return json.load(file)
except FileNotFoundError:
return {}
def save_config(self):
with open('config/config.json', 'w') as file:
json.dump(self.config, file, indent=4, ensure_ascii=False)
def get_system_status(self):
"""Obtenir le statut de tous les composants"""
return {
'ia_generator': {
'running': self.ia_generator_running,
'status': 'En cours' if self.ia_generator_running else 'Arrêté'
},
'control_twitch': {
'running': self.control_twitch_running,
'status': 'En cours' if self.control_twitch_running else 'Arrêté'
},
'flux_count': len(self.flux_list),
'active_flux': len([f for f in self.flux_list if f['active']])
}
def add_flux(self, channel_name, record_audio=True):
flux_id = len(self.flux_list) + 1
# Créer l'objet flux pour l'API (sans les instances de bots)
flux_data = {
'id': flux_id,
'name': channel_name,
'twitchname': channel_name,
'record_audio': record_audio,
'active': True,
'created_at': datetime.now().isoformat(),
'status': 'starting'
}
try:
# Créer le bot de chat pour ce flux
chat_bot = TwitchChatBot(channel_name)
self.bots[flux_id] = {
'chat_bot': chat_bot,
'record_bot': None,
'subtitle_bot': None,
'ia_bot': None,
'message_bot': None
}
chat_bot.start_background()
# Si enregistrement audio activé
if record_audio:
record_bot = RecordTwitch(channel_name, 60)
self.bots[flux_id]['record_bot'] = record_bot
threading.Thread(target=record_bot.main, daemon=True).start()
# Démarrer le bot de sous-titres
subtitle_bot = Subtitle_translation("config/config.json")
self.bots[flux_id]['subtitle_bot'] = subtitle_bot
subtitle_bot.start_main_loop()
# Démarrer le générateur IA
ia_bot = IA_generator("config/config.json")
self.bots[flux_id]['ia_bot'] = ia_bot
ia_bot.start_main_loop()
# Démarrer le contrôleur de messages
message_bot = messageTwitch("config/user.json", channel_name)
self.bots[flux_id]['message_bot'] = message_bot
message_bot.start_loop_respond()
# Mettre à jour le statut
flux_data['status'] = 'active'
self.flux_list.append(flux_data)
return flux_id
except Exception as e:
print(f"Erreur lors de l'ajout du flux {channel_name}: {str(e)}")
# Nettoyer en cas d'erreur
if flux_id in self.bots:
try:
if self.bots[flux_id]['chat_bot']:
self.bots[flux_id]['chat_bot'].stop()
if self.bots[flux_id]['record_bot']:
self.bots[flux_id]['record_bot'].stop()
if self.bots[flux_id]['subtitle_bot']:
self.bots[flux_id]['subtitle_bot'].stop()
if self.bots[flux_id]['ia_bot']:
self.bots[flux_id]['ia_bot'].stop()
if self.bots[flux_id]['message_bot']:
self.bots[flux_id]['message_bot'].stop()
except:
pass
del self.bots[flux_id]
flux_data['status'] = 'error'
flux_data['error'] = str(e)
self.flux_list.append(flux_data)
raise e
def remove_flux(self, flux_id):
for i, flux in enumerate(self.flux_list):
if flux['id'] == flux_id:
# Arrêter les bots si ils existent
if flux_id in self.bots:
try:
if self.bots[flux_id]['chat_bot']:
self.bots[flux_id]['chat_bot'].stop()
if self.bots[flux_id]['record_bot']:
self.bots[flux_id]['record_bot'].stop()
if self.bots[flux_id]['subtitle_bot']:
self.bots[flux_id]['subtitle_bot'].stop()
if self.bots[flux_id]['ia_bot']:
self.bots[flux_id]['ia_bot'].stop()
if self.bots[flux_id]['message_bot']:
self.bots[flux_id]['message_bot'].stop()
except Exception as e:
print(f"Erreur lors de l'arrêt des bots: {e}")
del self.bots[flux_id]
del self.flux_list[i]
return True
return False
def get_flux_list(self):
# Retourner seulement les données JSON (pas les instances de bots)
return self.flux_list
def start_ia_generator(self):
"""Démarrer le générateur IA de manière contrôlée"""
if self.ia_generator_running:
return False, "IA Generator déjà en cours d'exécution"
try:
self.ia_generator = IA_generator("config/config.json")
self.ia_generator_running = True
# Activer l'envoi de messages quand l'IA Generator est démarré
global chat_messages_enabled
chat_messages_enabled = True
# Démarrer dans un thread séparé
threading.Thread(target=self._ia_generator_loop, daemon=True).start()
print(f"[{datetime.now().strftime('%H:%M:%S')}] IA Generator démarré")
return True, "IA Generator démarré avec succès"
except Exception as e:
self.ia_generator_running = False
return False, f"Erreur lors du démarrage de l'IA Generator: {str(e)}"
def stop_ia_generator(self):
"""Arrêter le générateur IA"""
if not self.ia_generator_running:
return False, "IA Generator n'est pas en cours d'exécution"
try:
self.ia_generator_running = False
if self.ia_generator:
self.ia_generator.stop()
# Désactiver l'envoi de messages quand l'IA Generator est arrêté
global chat_messages_enabled
chat_messages_enabled = False
print(f"[{datetime.now().strftime('%H:%M:%S')}] IA Generator arrêté")
return True, "IA Generator arrêté avec succès"
except Exception as e:
return False, f"Erreur lors de l'arrêt de l'IA Generator: {str(e)}"
def _ia_generator_loop(self):
"""Boucle contrôlée pour l'IA Generator"""
while self.ia_generator_running:
try:
if self.ia_generator:
self.ia_generator.main_ask("") # Génération automatique
time.sleep(20) # Attendre 20 secondes entre les générations
except Exception as e:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Erreur dans IA Generator: {e}")
time.sleep(10)
def start_control_twitch(self):
"""Démarrer le contrôleur Twitch de manière contrôlée"""
if self.control_twitch_running:
return False, "Control Twitch déjà en cours d'exécution"
try:
# Utiliser le premier utilisateur par défaut
self.control_twitch = messageTwitch("config/user.json", "default")
self.control_twitch_running = True
# Démarrer dans un thread séparé
threading.Thread(target=self._control_twitch_loop, daemon=True).start()
print(f"[{datetime.now().strftime('%H:%M:%S')}] Control Twitch démarré")
return True, "Control Twitch démarré avec succès"
except Exception as e:
self.control_twitch_running = False
return False, f"Erreur lors du démarrage de Control Twitch: {str(e)}"
def stop_control_twitch(self):
"""Arrêter le contrôleur Twitch"""
if not self.control_twitch_running:
return False, "Control Twitch n'est pas en cours d'exécution"
try:
self.control_twitch_running = False
if self.control_twitch:
self.control_twitch.stop()
print(f"[{datetime.now().strftime('%H:%M:%S')}] Control Twitch arrêté")
return True, "Control Twitch arrêté avec succès"
except Exception as e:
return False, f"Erreur lors de l'arrêt de Control Twitch: {str(e)}"
def _control_twitch_loop(self):
"""Boucle contrôlée pour Control Twitch"""
while self.control_twitch_running:
try:
if self.control_twitch:
# Vérifier s'il y a des générations à envoyer
generation_data = storage.read("IA_generator")
if generation_data:
sorted_keys = sorted(generation_data.keys())
if sorted_keys:
last_generation = generation_data[sorted_keys[-1]]
# Envoyer le message avec le premier utilisateur
self.control_twitch.send_message_user(0, last_generation)
# Supprimer la génération envoyée
storage.delete("IA_generator", sorted_keys[-1])
print(f"[{datetime.now().strftime('%H:%M:%S')}] Message envoyé: {last_generation[:50]}...")
time.sleep(10) # Attendre 10 secondes entre les vérifications
except Exception as e:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Erreur dans Control Twitch: {e}")
time.sleep(10)
bot_controller = BotController()
@app.route('/')
def index():
return render_template('index.html')
@app.route('/api/flux', methods=['GET'])
def get_flux():
return jsonify(bot_controller.get_flux_list())
@app.route('/api/flux', methods=['POST'])
def add_flux():
data = request.json
channel_name = data.get('channel_name')
record_audio = data.get('record_audio', True)
if not channel_name:
return jsonify({'error': 'Nom du canal requis'}), 400
try:
flux_id = bot_controller.add_flux(channel_name, record_audio)
return jsonify({'success': True, 'flux_id': flux_id})
except Exception as e:
error_msg = f"Erreur lors de l'ajout du flux: {str(e)}"
print(f"Erreur API add_flux: {error_msg}")
return jsonify({'error': error_msg}), 500
@app.route('/api/flux/<int:flux_id>', methods=['DELETE'])
def remove_flux(flux_id):
if bot_controller.remove_flux(flux_id):
return jsonify({'success': True})
return jsonify({'error': 'Flux non trouvé'}), 404
@app.route('/api/flux/<int:flux_id>/status', methods=['GET'])
def get_flux_status(flux_id):
"""Obtenir le statut détaillé d'un flux spécifique"""
for flux in bot_controller.flux_list:
if flux['id'] == flux_id:
status = {
'id': flux_id,
'name': flux['name'],
'active': flux['active'],
'status': flux.get('status', 'unknown'),
'created_at': flux['created_at'],
'bots': {}
}
# Ajouter les informations des bots si disponibles
if flux_id in bot_controller.bots:
bots = bot_controller.bots[flux_id]
if bots['chat_bot']:
status['bots']['chat'] = {
'running': bots['chat_bot'].is_running if hasattr(bots['chat_bot'], 'is_running') else True
}
if bots['record_bot']:
status['bots']['record'] = {
'running': bots['record_bot'].running if hasattr(bots['record_bot'], 'running') else True
}
return jsonify(status)
return jsonify({'error': 'Flux non trouvé'}), 404
@app.route('/api/flux/<int:flux_id>/toggle', methods=['POST'])
def toggle_flux(flux_id):
"""Activer/désactiver un flux"""
for flux in bot_controller.flux_list:
if flux['id'] == flux_id:
flux['active'] = not flux['active']
# Arrêter/démarrer les bots selon le nouveau statut
if flux_id in bot_controller.bots:
bots = bot_controller.bots[flux_id]
if not flux['active']:
# Arrêter les bots
try:
if bots['chat_bot']:
bots['chat_bot'].stop()
if bots['record_bot']:
bots['record_bot'].stop()
if bots['subtitle_bot']:
bots['subtitle_bot'].stop()
if bots['ia_bot']:
bots['ia_bot'].stop()
if bots['message_bot']:
bots['message_bot'].stop()
except Exception as e:
print(f"Erreur lors de l'arrêt des bots: {e}")
else:
# Redémarrer les bots
try:
if bots['chat_bot']:
bots['chat_bot'].start_background()
if bots['record_bot']:
threading.Thread(target=bots['record_bot'].main, daemon=True).start()
if bots['subtitle_bot']:
bots['subtitle_bot'].start_main_loop()
if bots['ia_bot']:
bots['ia_bot'].start_main_loop()
if bots['message_bot']:
bots['message_bot'].start_loop_respond()
except Exception as e:
print(f"Erreur lors du redémarrage des bots: {e}")
return jsonify({'success': True, 'active': flux['active']})
return jsonify({'error': 'Flux non trouvé'}), 404
@app.route('/api/config/prompts', methods=['GET'])
def get_prompts():
return jsonify(bot_controller.config.get('list_prompt', []))
@app.route('/api/config/prompts', methods=['POST'])
def update_prompts():
data = request.json
prompts = data.get('prompts', [])
bot_controller.config['list_prompt'] = prompts
bot_controller.save_config()
return jsonify({'success': True})
@app.route('/api/config/users', methods=['GET'])
def get_users():
"""Récupérer la liste des utilisateurs"""
try:
with open('config/user.json', 'r') as file:
users = json.load(file)
return jsonify(users)
except FileNotFoundError:
return jsonify([])
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/config/users', methods=['POST'])
def add_user():
"""Ajouter un nouvel utilisateur"""
data = request.json
pseudo = data.get('tw_acc_pseudo')
token = data.get('tw_acc_token')
charactere = data.get('charactere', '😊')
if not pseudo or not token:
return jsonify({'error': 'Pseudo et token requis'}), 400
try:
with open('config/user.json', 'r') as file:
users = json.load(file)
# Vérifier si l'utilisateur existe déjà
for user in users:
if user['tw_acc_pseudo'] == pseudo:
return jsonify({'error': 'Cet utilisateur existe déjà'}), 400
# Ajouter le nouvel utilisateur
new_user = {
'tw_acc_pseudo': pseudo,
'tw_acc_token': token,
'charactere': charactere
}
users.append(new_user)
# Sauvegarder
with open('config/user.json', 'w') as file:
json.dump(users, file, indent=4, ensure_ascii=False)
return jsonify({'success': True, 'user': new_user})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/config/users/<int:user_id>', methods=['PUT'])
def update_user(user_id):
"""Modifier un utilisateur existant"""
data = request.json
pseudo = data.get('tw_acc_pseudo')
token = data.get('tw_acc_token')
charactere = data.get('charactere', '😊')
if not pseudo or not token:
return jsonify({'error': 'Pseudo et token requis'}), 400
try:
with open('config/user.json', 'r') as file:
users = json.load(file)
if user_id >= len(users):
return jsonify({'error': 'Utilisateur non trouvé'}), 404
# Vérifier si le pseudo existe déjà (sauf pour l'utilisateur actuel)
for i, user in enumerate(users):
if i != user_id and user['tw_acc_pseudo'] == pseudo:
return jsonify({'error': 'Ce pseudo est déjà utilisé'}), 400
# Mettre à jour l'utilisateur
users[user_id] = {
'tw_acc_pseudo': pseudo,
'tw_acc_token': token,
'charactere': charactere
}
# Sauvegarder
with open('config/user.json', 'w') as file:
json.dump(users, file, indent=4, ensure_ascii=False)
return jsonify({'success': True, 'user': users[user_id]})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/config/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
"""Supprimer un utilisateur"""
try:
with open('config/user.json', 'r') as file:
users = json.load(file)
if user_id >= len(users):
return jsonify({'error': 'Utilisateur non trouvé'}), 404
# Supprimer l'utilisateur
deleted_user = users.pop(user_id)
# Sauvegarder
with open('config/user.json', 'w') as file:
json.dump(users, file, indent=4, ensure_ascii=False)
return jsonify({'success': True, 'deleted_user': deleted_user})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/subtitles', methods=['GET'])
def get_subtitles():
data = storage.read("subtitle_data")
return jsonify(data)
@app.route('/api/subtitles/clear', methods=['POST'])
def clear_subtitles():
"""Nettoyer l'historique des sous-titres"""
try:
# Récupérer toutes les clés de sous-titres
subtitle_data = storage.read("subtitle_data")
# Supprimer chaque clé une par une
for key in list(subtitle_data.keys()):
storage.delete("subtitle_data", key)
# Supprimer également le fichier JSON s'il existe
storage_dir = "storage"
subtitle_file = os.path.join(storage_dir, "subtitle_data.json")
if os.path.exists(subtitle_file):
os.remove(subtitle_file)
return jsonify({
'success': True,
'message': 'Historique des sous-titres nettoyé avec succès'
})
except Exception as e:
return jsonify({
'success': False,
'error': f'Erreur lors du nettoyage: {str(e)}'
}), 500
@app.route('/api/subtitles/process', methods=['POST'])
def process_subtitles():
"""Lancer manuellement le traitement des sous-titres"""
try:
import subprocess
import os
import json
from datetime import datetime
# Vérifier que le dossier record existe
record_dir = "record"
if not os.path.exists(record_dir):
return jsonify({
'success': False,
'error': 'Dossier record non trouvé'
}), 404
# Trouver les fichiers audio
audio_files = [f for f in os.listdir(record_dir) if f.endswith('.mp3')]
if not audio_files:
return jsonify({
'success': False,
'error': 'Aucun fichier audio trouvé'
}), 404
subtitles_created = 0
subtitles_data = {}
# Traiter chaque fichier audio
for audio_file in audio_files[:3]: # Limiter à 3 fichiers pour éviter de surcharger
audio_path = os.path.join(record_dir, audio_file)
try:
# Lancer Whisper
command = [
'whisper',
'--language', 'fr',
audio_path,
'--device', 'cuda',
'--model', 'large-v3'
]
result = subprocess.run(command, capture_output=True, text=True, timeout=60)
if result.returncode == 0:
# Lire le fichier .txt généré
txt_file = audio_file.replace('.mp3', '.txt')
if os.path.exists(txt_file):
with open(txt_file, 'r', encoding='utf-8') as f:
content = f.read().strip()
# Nettoyer le contenu
content = content.replace("'", "").replace('"', "").replace("\n", " ").replace(",", "")
# Supprimer les répétitions de mots
words = content.split()
seen = set()
result_words = []
for word in words:
if word not in seen:
result_words.append(word)
seen.add(word)
cleaned_content = " ".join(result_words)
# Sauvegarder dans le stockage
current_time = datetime.now().strftime('%H:%M:%S')
subtitles_data[current_time] = cleaned_content
# Sauvegarder dans le fichier de stockage
storage_dir = "storage"
if not os.path.exists(storage_dir):
os.makedirs(storage_dir)
subtitle_file = os.path.join(storage_dir, "subtitle_data.json")
try:
with open(subtitle_file, 'r', encoding='utf-8') as f:
existing_data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
existing_data = {}
existing_data[current_time] = cleaned_content
with open(subtitle_file, 'w', encoding='utf-8') as f:
json.dump(existing_data, f, indent=4, ensure_ascii=False)
subtitles_created += 1
# Nettoyer les fichiers temporaires
os.remove(txt_file)
os.remove(audio_path) # Supprimer le fichier audio traité
except subprocess.TimeoutExpired:
continue # Passer au fichier suivant
except Exception as e:
continue # Passer au fichier suivant
return jsonify({
'success': True,
'message': f'Traitement terminé. {subtitles_created} sous-titre(s) créé(s).',
'subtitles': subtitles_data
})
except Exception as e:
return jsonify({
'success': False,
'error': f'Erreur lors du traitement: {str(e)}'
}), 500
@app.route('/api/generations', methods=['GET'])
def get_generations():
data = storage.read("IA_generator")
return jsonify(data)
@app.route('/api/send-message', methods=['POST'])
def send_message():
data = request.json
message = data.get('message')
channel = data.get('channel', 'default')
user_id = data.get('user_id', 0) # Nouveau paramètre pour choisir l'utilisateur
if not message:
return jsonify({'error': 'Message requis'}), 400
# Vérifier si l'envoi de messages est activé
if not chat_messages_enabled:
return jsonify({'error': 'Envoi de messages désactivé'}), 403
# Trouver le bot de message pour ce canal
try:
msg_bot = messageTwitch("config/user.json", channel)
# Utiliser send_message_user au lieu de send_message pour spécifier l'utilisateur
msg_bot.send_message_user(user_id, message)
return jsonify({'success': True})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/generate-response', methods=['POST'])
def generate_response():
data = request.json
text = data.get('text', '')
try:
ia_gen = IA_generator("config/config.json")
ia_gen.main_ask(text)
return jsonify({'success': True})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/chat/<int:flux_id>/messages', methods=['GET'])
def get_chat_messages(flux_id):
"""Récupérer les messages de chat d'un flux spécifique"""
try:
if flux_id not in bot_controller.bots:
return jsonify({'error': 'Flux non trouvé'}), 404
chat_bot = bot_controller.bots[flux_id]['chat_bot']
if not chat_bot:
return jsonify({'error': 'Bot de chat non disponible'}), 404
# Récupérer les messages du bot de chat
messages = []
for msg in chat_bot.messages[-50:]: # Derniers 50 messages
messages.append({
'timestamp': msg.timestamp.isoformat(),
'username': msg.username,
'content': msg.content
})
return jsonify({
'success': True,
'flux_id': flux_id,
'messages': messages
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/chat/<int:flux_id>/send', methods=['POST'])
def send_chat_message(flux_id):
"""Envoyer un message dans le chat d'un flux spécifique"""
data = request.json
message = data.get('message', '')
user_id = data.get('user_id', 0) # Nouveau paramètre pour choisir l'utilisateur
if not message:
return jsonify({'error': 'Message requis'}), 400
# Vérifier si l'envoi de messages est activé
if not chat_messages_enabled:
return jsonify({'error': 'Envoi de messages désactivé'}), 403
try:
# Trouver le flux
flux = None
for f in bot_controller.flux_list:
if f['id'] == flux_id:
flux = f
break
if not flux:
return jsonify({'error': 'Flux non trouvé'}), 404
# Envoyer le message avec l'utilisateur spécifié
msg_bot = messageTwitch("config/user.json", flux['twitchname'])
msg_bot.send_message_user(user_id, message)
return jsonify({'success': True})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/status', methods=['GET'])
def get_status():
status = {
'flux_count': len(bot_controller.flux_list),
'active_recordings': sum(1 for f in bot_controller.flux_list if f['record_audio'] and f['active']),
'chat_connections': sum(1 for f in bot_controller.flux_list if f['active']),
'last_subtitle': '',
'next_message': '',
'recent_messages': []
}
# Récupérer le dernier sous-titre
subtitle_data = storage.read("subtitle_data")
if subtitle_data:
sorted_keys = sorted(subtitle_data.keys())
if sorted_keys:
status['last_subtitle'] = subtitle_data[sorted_keys[-1]]
# Récupérer la dernière génération
generation_data = storage.read("IA_generator")
if generation_data:
sorted_keys = sorted(generation_data.keys())
if sorted_keys:
status['next_message'] = generation_data[sorted_keys[-1]]
return jsonify(status)
@socketio.on('connect')
def handle_connect():
print('Client connecté')
emit('status', {'message': 'Connecté au serveur'})
@socketio.on('disconnect')
def handle_disconnect():
print('Client déconnecté')
# Thread pour envoyer les mises à jour en temps réel
def background_updates():
while True:
try:
status = {
'timestamp': datetime.now().isoformat(),
'flux_count': len(bot_controller.flux_list),
'active_recordings': sum(1 for f in bot_controller.flux_list if f['record_audio'] and f['active']),
}
socketio.emit('status_update', status)
time.sleep(5) # Mise à jour toutes les 5 secondes
except Exception as e:
print(f"Erreur dans background_updates: {e}")
time.sleep(10)
# Variables globales pour la génération automatique
auto_subtitle_running = False
current_processing_file = None
# Variables globales pour l'envoi automatique de messages
auto_message_running = False
current_message_bot = None
# Variable globale pour contrôler l'envoi de messages dans le chat
chat_messages_enabled = False
@app.route('/api/subtitles/auto/start', methods=['POST'])
def start_auto_subtitle():
"""Démarrer la génération automatique de sous-titres"""
global auto_subtitle_running
try:
# Vérifier si déjà en cours
if auto_subtitle_running:
return jsonify({
'success': False,
'error': 'La génération automatique est déjà en cours'
}), 400
auto_subtitle_running = True
print(f"[{datetime.now().strftime('%H:%M:%S')}] Démarrage de la génération automatique de sous-titres")
# Démarrer le thread de génération automatique
threading.Thread(target=auto_subtitle_loop, daemon=True).start()
return jsonify({
'success': True,
'message': 'Génération automatique démarrée'
})
except Exception as e:
auto_subtitle_running = False
return jsonify({
'success': False,
'error': f'Erreur lors du démarrage: {str(e)}'
}), 500
@app.route('/api/subtitles/auto/stop', methods=['POST'])
def stop_auto_subtitle():
"""Arrêter la génération automatique de sous-titres"""
global auto_subtitle_running
try:
if not auto_subtitle_running:
return jsonify({
'success': False,
'error': 'La génération automatique n\'est pas en cours'
}), 400
print(f"[{datetime.now().strftime('%H:%M:%S')}] Arrêt de la génération automatique de sous-titres")
auto_subtitle_running = False
return jsonify({
'success': True,
'message': 'Génération automatique arrêtée'
})
except Exception as e:
return jsonify({
'success': False,
'error': f'Erreur lors de l\'arrêt: {str(e)}'
}), 500
@app.route('/api/subtitles/auto/force-stop', methods=['POST'])
def force_stop_auto_subtitle():
"""Forcer l'arrêt de la génération automatique de sous-titres"""
global auto_subtitle_running, current_processing_file
try:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Arrêt forcé de la génération automatique de sous-titres")
auto_subtitle_running = False
current_processing_file = None
return jsonify({
'success': True,
'message': 'Arrêt forcé de la génération automatique de sous-titres'
})
except Exception as e:
return jsonify({
'success': False,
'error': f'Erreur lors de l\'arrêt forcé: {str(e)}'
}), 500
@app.route('/api/subtitles/auto/status', methods=['GET'])
def get_auto_subtitle_status():
"""Obtenir le statut de la génération automatique"""
global auto_subtitle_running, current_processing_file
return jsonify({
'running': auto_subtitle_running,
'current_file': current_processing_file
})
def auto_subtitle_loop():
"""Boucle de génération automatique de sous-titres"""
global auto_subtitle_running, current_processing_file
print(f"[{datetime.now().strftime('%H:%M:%S')}] Démarrage de la boucle de génération automatique de sous-titres")
while auto_subtitle_running:
try:
# Vérifier s'il y a des fichiers audio à traiter
record_dir = "record"
if not os.path.exists(record_dir):
time.sleep(5)
continue
audio_files = [f for f in os.listdir(record_dir) if f.endswith('.mp3')]
if not audio_files:
time.sleep(5)
continue
# Traiter le premier fichier
audio_file = audio_files[0]
current_processing_file = audio_file
print(f"[{datetime.now().strftime('%H:%M:%S')}] Traitement de: {audio_file}")
# Émettre l'événement de début de traitement
socketio.emit('subtitle_processing_start', {
'file': audio_file,
'timestamp': datetime.now().isoformat()
})
# Traiter le fichier
audio_path = os.path.join(record_dir, audio_file)
try:
# Lancer Whisper avec vérification périodique de l'arrêt
command = [
'whisper',
'--language', 'fr',
audio_path,
'--device', 'cuda',
'--model', 'large-v3'
]
# Utiliser Popen pour pouvoir interrompre le processus
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# Attendre la fin du processus avec vérification périodique
while process.poll() is None:
if not auto_subtitle_running:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Arrêt demandé, interruption du processus Whisper")
process.terminate()
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
break
time.sleep(1)
# Si la boucle a été arrêtée, sortir
if not auto_subtitle_running:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Arrêt de la boucle de génération automatique")
break
result = process.returncode
stdout, stderr = process.communicate()
if result == 0:
# Lire le fichier .txt généré
txt_file = audio_file.replace('.mp3', '.txt')
if os.path.exists(txt_file):
with open(txt_file, 'r', encoding='utf-8') as f:
content = f.read().strip()
# Nettoyer le contenu
content = content.replace("'", "").replace('"', "").replace("\n", " ").replace(",", "")
# Supprimer les répétitions de mots
words = content.split()
seen = set()
result_words = []
for word in words:
if word not in seen:
result_words.append(word)
seen.add(word)
cleaned_content = " ".join(result_words)
# Sauvegarder dans le stockage
current_time = datetime.now().strftime('%H:%M:%S')
# Sauvegarder dans le fichier de stockage
storage_dir = "storage"
if not os.path.exists(storage_dir):
os.makedirs(storage_dir)
subtitle_file = os.path.join(storage_dir, "subtitle_data.json")
try:
with open(subtitle_file, 'r', encoding='utf-8') as f:
existing_data = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
existing_data = {}
existing_data[current_time] = cleaned_content
with open(subtitle_file, 'w', encoding='utf-8') as f:
json.dump(existing_data, f, indent=4, ensure_ascii=False)
# Émettre l'événement de fin de traitement
socketio.emit('subtitle_processing_complete', {
'file': audio_file,
'subtitle': cleaned_content,
'timestamp': current_time
})
print(f"[{datetime.now().strftime('%H:%M:%S')}] Sous-titre généré: {cleaned_content}")
# Nettoyer les fichiers temporaires
os.remove(txt_file)
os.remove(audio_path)
else:
# Émettre l'événement d'erreur
socketio.emit('subtitle_processing_error', {
'file': audio_file,
'error': 'Aucun fichier .txt généré'
})
print(f"[{datetime.now().strftime('%H:%M:%S')}] Erreur: Aucun fichier .txt généré")
else:
# Émettre l'événement d'erreur
socketio.emit('subtitle_processing_error', {
'file': audio_file,
'error': f'Erreur Whisper: {stderr}'
})
print(f"[{datetime.now().strftime('%H:%M:%S')}] Erreur Whisper: {stderr}")
except subprocess.TimeoutExpired:
socketio.emit('subtitle_processing_error', {
'file': audio_file,
'error': 'Timeout - Whisper a pris trop de temps'
})
print(f"[{datetime.now().strftime('%H:%M:%S')}] Timeout - Whisper a pris trop de temps")
except Exception as e:
socketio.emit('subtitle_processing_error', {
'file': audio_file,
'error': f'Erreur: {str(e)}'
})
print(f"[{datetime.now().strftime('%H:%M:%S')}] Erreur lors du traitement: {str(e)}")
current_processing_file = None
# Vérifier à nouveau si la boucle doit continuer
if not auto_subtitle_running:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Arrêt de la boucle de génération automatique")
break
# Attendre avant de traiter le prochain fichier
time.sleep(2)
except Exception as e:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Erreur dans la boucle de génération automatique: {e}")
time.sleep(5)
print(f"[{datetime.now().strftime('%H:%M:%S')}] Fin de la boucle de génération automatique de sous-titres")
@app.route('/api/messages/auto/start', methods=['POST'])
def start_auto_message():
"""Démarrer l'envoi automatique de messages"""
global auto_message_running
try:
# Vérifier si déjà en cours
if auto_message_running:
return jsonify({
'success': False,
'error': 'L\'envoi automatique est déjà en cours'
}), 400
auto_message_running = True
# Créer une instance du bot de messages
current_message_bot = messageTwitch("config/user.json", "default")
# Démarrer le thread d'envoi automatique
threading.Thread(target=auto_message_loop, daemon=True).start()
print(f"[{datetime.now().strftime('%H:%M:%S')}] Envoi automatique de messages démarré")
return jsonify({
'success': True,
'message': 'Envoi automatique de messages démarré'
})
except Exception as e:
auto_message_running = False
current_message_bot = None
return jsonify({
'success': False,
'error': f'Erreur lors du démarrage: {str(e)}'
}), 500
@app.route('/api/messages/auto/stop', methods=['POST'])
def stop_auto_message():
"""Arrêter l'envoi automatique de messages"""
global auto_message_running, current_message_bot
try:
if not auto_message_running:
return jsonify({
'success': False,
'error': 'L\'envoi automatique n\'est pas en cours'
}), 400
auto_message_running = False
current_message_bot = None
print(f"[{datetime.now().strftime('%H:%M:%S')}] Envoi automatique de messages arrêté")
return jsonify({
'success': True,
'message': 'Envoi automatique de messages arrêté'
})
except Exception as e:
return jsonify({
'success': False,
'error': f'Erreur lors de l\'arrêt: {str(e)}'
}), 500
@app.route('/api/messages/auto/force-stop', methods=['POST'])
def force_stop_auto_message():
"""Forcer l'arrêt de l'envoi automatique de messages"""
global auto_message_running, current_message_bot
try:
auto_message_running = False
current_message_bot = None
print(f"[{datetime.now().strftime('%H:%M:%S')}] Arrêt forcé de l'envoi automatique de messages")
return jsonify({
'success': True,
'message': 'Arrêt forcé de l\'envoi automatique de messages'
})
except Exception as e:
return jsonify({
'success': False,
'error': f'Erreur lors de l\'arrêt forcé: {str(e)}'
}), 500
@app.route('/api/messages/auto/status', methods=['GET'])
def get_auto_message_status():
"""Obtenir le statut de l'envoi automatique de messages"""
global auto_message_running
return jsonify({
'running': auto_message_running
})
def auto_message_loop():
"""Boucle d'envoi automatique de messages"""
global auto_message_running, current_message_bot
print(f"[{datetime.now().strftime('%H:%M:%S')}] Démarrage de la boucle d'envoi automatique")
while auto_message_running:
try:
# Vérifier si l'envoi de messages est activé
if not chat_messages_enabled:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Envoi de messages désactivé, attente...")
time.sleep(5)
continue
# Vérifier s'il y a des générations disponibles
generation_data = storage.read("IA_generator")
if not generation_data:
time.sleep(5)
continue
# Récupérer la dernière génération
sorted_keys = sorted(generation_data.keys())
if not sorted_keys:
time.sleep(5)
continue
last_generation = generation_data[sorted_keys[-1]]
print(f"[{datetime.now().strftime('%H:%M:%S')}] Envoi automatique: {last_generation}")
# Émettre l'événement de début d'envoi
socketio.emit('message_sending_start', {
'message': last_generation,
'timestamp': datetime.now().isoformat()
})
# Envoyer le message
try:
if current_message_bot:
# Utiliser le premier utilisateur par défaut
current_message_bot.send_message_user(0, last_generation)
# Émettre l'événement de fin d'envoi
socketio.emit('message_sending_complete', {
'message': last_generation,
'timestamp': datetime.now().isoformat()
})
# Supprimer la génération envoyée
storage.delete("IA_generator", sorted_keys[-1])
print(f"[{datetime.now().strftime('%H:%M:%S')}] Message envoyé avec succès")
except Exception as e:
# Émettre l'événement d'erreur
socketio.emit('message_sending_error', {
'message': last_generation,
'error': str(e)
})
print(f"[{datetime.now().strftime('%H:%M:%S')}] Erreur d'envoi: {e}")
# Attendre avant d'envoyer le prochain message
time.sleep(10)
except Exception as e:
print(f"[{datetime.now().strftime('%H:%M:%S')}] Erreur dans la boucle d'envoi automatique: {e}")
time.sleep(5)
print(f"[{datetime.now().strftime('%H:%M:%S')}] Arrêt de la boucle d'envoi automatique")
@app.route('/api/chat/messages/enable', methods=['POST'])
def enable_chat_messages():
"""Activer l'envoi de messages dans le chat"""
global chat_messages_enabled
try:
chat_messages_enabled = True
return jsonify({
'success': True,
'message': 'Envoi de messages dans le chat activé'
})
except Exception as e:
return jsonify({
'success': False,
'error': f'Erreur lors de l\'activation: {str(e)}'
}), 500
@app.route('/api/chat/messages/disable', methods=['POST'])
def disable_chat_messages():
"""Désactiver l'envoi de messages dans le chat"""
global chat_messages_enabled
try:
chat_messages_enabled = False
return jsonify({
'success': True,
'message': 'Envoi de messages dans le chat désactivé'
})
except Exception as e:
return jsonify({
'success': False,
'error': f'Erreur lors de la désactivation: {str(e)}'
}), 500
@app.route('/api/chat/messages/status', methods=['GET'])
def get_chat_messages_status():
"""Obtenir le statut de l'envoi de messages dans le chat"""
global chat_messages_enabled
return jsonify({
'enabled': chat_messages_enabled
})
@app.route('/api/system-status', methods=['GET'])
def get_system_status():
"""Obtenir le statut de tous les composants"""
return jsonify(bot_controller.get_system_status())
@app.route('/api/ia-generator/start', methods=['POST'])
def start_ia_generator():
"""Démarrer le générateur IA"""
success, message = bot_controller.start_ia_generator()
if success:
# Synchroniser l'état de l'interface web
socketio.emit('ia_generator_status_changed', {
'running': True,
'message': message
})
return jsonify({'success': success, 'message': message})
@app.route('/api/ia-generator/stop', methods=['POST'])
def stop_ia_generator():
"""Arrêter le générateur IA"""
success, message = bot_controller.stop_ia_generator()
if success:
# Synchroniser l'état de l'interface web
socketio.emit('ia_generator_status_changed', {
'running': False,
'message': message
})
return jsonify({'success': success, 'message': message})
@app.route('/api/control-twitch/start', methods=['POST'])
def start_control_twitch():
"""Démarrer le contrôleur Twitch"""
success, message = bot_controller.start_control_twitch()
return jsonify({'success': success, 'message': message})
@app.route('/api/control-twitch/stop', methods=['POST'])
def stop_control_twitch():
"""Arrêter le contrôleur Twitch"""
success, message = bot_controller.stop_control_twitch()
return jsonify({'success': success, 'message': message})
if __name__ == '__main__':
# Démarrer les mises à jour en arrière-plan
update_thread = threading.Thread(target=background_updates, daemon=True)
update_thread.start()
# Démarrer l'application Flask
socketio.run(app, host='0.0.0.0', port=5000, debug=True)