Files
twitchBot-intelligent/fonction/first_class.py
T

1137 lines
41 KiB
Python

import os
import signal
import sys
import argparse
import json
import time
import pprint
import random
import requests
import shutil
# import datetime
from datetime import datetime
import glob
import threading
import websockets
import asyncio
from urllib.parse import urlparse
from pytmi import Client
# import keyboard
from pynput import keyboard
from threading import Thread, Semaphore
from streamlink import Streamlink
from fake_useragent import UserAgent
from rich.console import Console
from rich.live import Live
from rich.prompt import Prompt
from rich.spinner import Spinner
from rich.table import Table
from rich.text import Text
import subprocess
import pty
from typing import List, Optional
from dataclasses import dataclass
console = Console()
def sleep_control(time_sleep, running):
if running:
time.sleep(time_sleep)
def hprint(color, texte):
timestamp = datetime.now().strftime("%Hh %Mm %Ss")
console.print("[bold "+color+"] ["+timestamp+"] "+texte+" [/bold "+color+"]")
def sprint(script_name,color, texte):
timestamp = datetime.now().strftime("%Hh %Mm %Ss")
console.print("[bold "+color+"] ["+timestamp+"] ("+script_name+") "+texte+" [/bold "+color+"]")
def debug_print(TAG, texte_print, type_debug,script_name):
timestamp = datetime.now().strftime("%Hh %Mm %Ss")
type_color = "blue"
# more color
# DARKCYAN
if TAG == "e": # Erreur
type_color = "red"
console.print("[bold "+type_color+"] ["+timestamp+"] ("+script_name+") "+texte_print+" [/bold "+type_color+"]")
elif TAG == "w": # Warning
type_color = "yellow"
console.print("[bold "+type_color+"] ["+timestamp+"] ("+script_name+") "+texte_print+" [/bold "+type_color+"]")
elif TAG == "g": # Succes
type_color = "green"
console.print("[bold "+type_color+"] ["+timestamp+"] ("+script_name+") "+texte_print+" [/bold "+type_color+"]")
elif TAG == "i": # Info
type_color = "cyan"
console.print("[bold "+type_color+"] ["+timestamp+"] ("+script_name+") "+texte_print+" [/bold "+type_color+"]")
elif TAG == "v" and type_debug != "Info": # Verbos
type_color = "magenta"
console.print("[bold "+type_color+"] ["+timestamp+"] ("+script_name+") "+texte_print+" [/bold "+type_color+"]")
elif TAG == "d" and type_debug != "Verbos" and type_debug != "Info": # Debug
type_color = "blue"
console.print("[bold "+type_color+"] ["+timestamp+"] ("+script_name+") "+texte_print+" [/bold "+type_color+"]")
def del_file(dossier, file):
motif = os.path.join(dossier, file)
for file in glob.glob(motif):
os.remove(file)
hprint("",f"file deleted : {file}")
def del_pathfile(file_path):
try:
os.remove(file_path)
hprint("green", "Fichier Suprimé : " +file_path)
except Exception as e:
hprint("red", "del_file Error : "+ str(e))
# Supprimer le fichier après le traitement
def get_value_json(var_name, config):
return config.get(var_name)
def get_value_json_list(index, key, json_data):
try:
return json_data[index][key]
except (IndexError, KeyError, TypeError):
return None
# Fonction pour obtenir l'heure actuelle sous forme de chaîne
def get_current_time():
current_time = time.time()
return time.strftime('%H:%M:%S', time.localtime(current_time))
# now = datetime.now()
# return now.strftime("%H:%M")
def get_current_date():
current_time = time.time()
return time.strftime('%Yy%mm%dd', time.localtime(current_time))
def dict_to_json_string(data):
"""Convertit un dictionnaire Python en chaîne JSON.
:param data: Dictionnaire Python.
:return: Chaîne JSON.
"""
try:
return json.dumps(data, indent=4, ensure_ascii=False)
except Exception as e:
print(f"Erreur lors de la conversion en JSON : {e}")
return None
def json_string_to_dict(json_string):
""" Convertit une chaîne JSON en dictionnaire Python.
:param json_string: Chaîne JSON.
:return: Dictionnaire Python.
"""
try:
return json.loads(json_string)
except Exception as e:
print(f"Erreur lors de la conversion en dictionnaire : {e}")
return None
class PersistentStorage:
def __init__(self, storage_dir="storage"):
"""
Initialise l'environnement de stockage avec un répertoire dédié.
:param storage_dir: Chemin du répertoire de stockage.
"""
self.storage_dir = storage_dir
def _get_file_path(self, filename):
"""Renvoie le chemin complet d'un fichier dans le répertoire de stockage."""
return os.path.join(self.storage_dir, filename + ".json")
def read(self, filename):
"""
Lit et retourne le contenu d'un fichier sous forme de dictionnaire.
:param filename: Nom du fichier (sans extension).
:return: Dictionnaire des données du fichier ou {} si le fichier est vide/inexistant.
"""
file_path = self._get_file_path(filename)
try:
with open(file_path, 'r', encoding='utf-8') as file:
return json.load(file)
except (FileNotFoundError, json.JSONDecodeError):
return {}
def write(self, filename, key, value):
"""
Ajoute ou met à jour une paire clé-valeur dans un fichier.
:param filename: Nom du fichier (sans extension).
:param key: Clé à ajouter ou mettre à jour.
:param value: Valeur associée.
"""
if not os.path.exists(self.storage_dir):
os.makedirs(self.storage_dir)
data = self.read(filename)
data[key] = value
file_path = self._get_file_path(filename)
with open(file_path, 'w', encoding='utf-8') as file:
json.dump(data, file, indent=4, ensure_ascii=False)
return data
def delete(self, filename, key):
"""
Supprime une clé d'un fichier si elle existe.
:param filename: Nom du fichier (sans extension).
:param key: Clé à supprimer.
"""
data = self.read(filename)
if key in data:
del data[key]
file_path = self._get_file_path(filename)
with open(file_path, 'w', encoding='utf-8') as file:
json.dump(data, file, indent=4, ensure_ascii=False)
return data
def list_keys(self, filename):
"""
Liste toutes les clés présentes dans un fichier.
:param filename: Nom du fichier (sans extension).
:return: Liste des clés.
"""
data = self.read(filename)
return list(data.keys())
def query_all(self):
"""
Interroge tous les fichiers dans le répertoire de stockage et retourne leurs contenus.
:return: Dictionnaire de tous les fichiers et leurs données.
"""
all_data = {}
for file in os.listdir(self.storage_dir):
if file.endswith(".json"):
filename = os.path.splitext(file)[0]
all_data[filename] = self.read(filename)
return all_data
storage = PersistentStorage()
################### RecordTwitch ########################
class RecordTwitch:
def __init__(self, channel_name, record_time):
self.channel_name = channel_name
self.channel_url = "https://www.twitch.tv/" + self.channel_name
self.record_time = record_time
self.max_timerecordfile = 30
self.running = True
self.type_debug = "Debug"
self.script_name = "twitch_record"
if (self.record_time == -1):
self.record_time = 9999
def stop(self):
sprint(self.script_name,"magenta","STOPING RecordTwitch")
self.running = False
def del_mp3(self, dossier):
motif = os.path.join(dossier, '*.mp3')
for file in glob.glob(motif):
os.remove(file)
sprint(self.script_name,"yellow",f"file deleted : {file}")
def clear_diretory(self, dir_inrecord="in_record", dir_record="record"):
#sprint(self.script_name,'green', 'clear_diretory start')
if os.path.exists(dir_inrecord) and os.path.exists(dir_record):
sprint(self.script_name,'green', f"dir {dir_inrecord} and {dir_record} exist clearing")
self.del_mp3(dir_inrecord)
self.del_mp3(dir_record)
def create_session(self):
# Session creating for request
self.ua = UserAgent()
self.session = Streamlink()
self.session.set_option("http-headers", {
"Accept-Language": "en-US,en;q=0.5",
"Connection": "keep-alive",
"DNT": "1",
"Upgrade-Insecure-Requests": "1",
"User-Agent": self.ua.random,
"Client-ID": "your_client_id", # Replace with your actual Client-ID
"Referer": "https://www.google.com/"
})
def get_url(self):
sprint(self.script_name,"green","start get_url")
url = ""
self.create_session()
try:
streams = self.session.streams(self.channel_url)
url = streams['audio_only'].url if 'audio_only' in streams else streams['worst'].url
sprint(self.script_name,"cyan","url generate : " + url)
except Exception as e:
sprint(self.script_name,"red","Error on get url : "+str(e))
# traceback.print_exc() # Affiche les détails de l'erreur
pass
return url
def get_url_with_retry(self, max_attempts=10, retry_delay=30):
"""
Essaie de récupérer l'URL du flux avec des tentatives répétées
"""
sprint(self.script_name,"green","start get_url_with_retry")
attempt = 1
while attempt <= max_attempts and self.running:
sprint(self.script_name,"yellow",f"Tentative {attempt}/{max_attempts} de récupération du flux...")
url = self.get_url()
if url:
sprint(self.script_name,"green",f"Flux trouvé après {attempt} tentative(s)")
return url
if attempt < max_attempts:
sprint(self.script_name,"yellow",f"Aucun flux trouvé. Nouvelle tentative dans {retry_delay} secondes...")
time.sleep(retry_delay)
attempt += 1
sprint(self.script_name,"red",f"Impossible de récupérer l'URL du flux après {max_attempts} tentatives")
return None
def loop_run(self, intervalle):#boucle pour déplacer les fichier fini enregistrement
time.sleep(20) # attente début du script complet et enregistrement
try:
while self.running:
self.verif_record_move()
# sprint(self.script_name,"yellow", f"wait {intervalle}s for next scan file completed.")
sleep_control(intervalle,self.running)
#except KeyboardInterrupt:
except Exception as e:
sprint(self.script_name,"red", "STOP loop_run Error : "+ str(e))
finally:
self.running = False # Assurez-vous que le drapeau est désactivé
def verif_record_move(self, dir_inrecord="in_record", dir_record="record"):
# sprint(self.script_name,"green", "start verif_record_move")
debug_print("v", "Verif_record_move Start", self.type_debug, self.script_name)
if not os.path.exists(dir_record):
os.makedirs(dir_record)
sprint(self.script_name,"green", "création du dossier : "+dir_record)
fichiers = [f for f in os.listdir(dir_inrecord) if os.path.isfile(os.path.join(dir_inrecord, f))]
if len(fichiers) > 1:
# sort file
sort_file = sorted(fichiers, key=lambda x: int(x.split('_')[-1].split('.')[0]))
fileto_move = sort_file[0]
chemin_source = os.path.join(dir_inrecord, fileto_move)
chemin_destination = os.path.join(dir_record, fileto_move)
# move file
shutil.move(chemin_source, chemin_destination)
# sprint(self.script_name,"green",f"File moved: {fileto_move}")
debug_print("d", f"File moved: {fileto_move}", self.type_debug, self.script_name)
def compteur(self):
self.seconds = 0
self.loop = 0
while self.running:
time.sleep(1) # Attend une seconde
# if (self.seconds < 10):
# print(f"\033[94mRecording time: 0{self.seconds}s | file : {self.loop}\033[0m", end='\r', flush=True) # Réinitialise la ligne à chaque fois
# else:
# print(f"\033[94mRecording time: {self.seconds}s | file : {self.loop}\033[0m", end='\r', flush=True) # Réinitialise la ligne à chaque fois
self.seconds += 1
if self.seconds == self.max_timerecordfile:
self.loop +=1
self.seconds = 0 # Réinitialise le compteur après 60 seconds
def get_seconde_compteur(self):
return self.seconds
def get_loop_compteur(self):
return self.loop
def record_audio(self):
output_directory = "record"
in_record_directory = "in_record"
os.makedirs(output_directory, exist_ok=True)
os.makedirs(in_record_directory, exist_ok=True)
timestamp_complet = datetime.now().strftime("%Y%m%d-%H%M%S")
timestamp = datetime.now().strftime("%Y-%m-%d")
output_file_path = os.path.join(in_record_directory, f"{timestamp}_%03d.mp3")
sprint(self.script_name,"green", f"start record")
thread_compteur = Thread(target=self.compteur, daemon=True)
thread_compteur.start()
command = ['ffmpeg','-i', self.stream_url,'-vn','-acodec','libmp3lame','-ar','44100','-ac','2','-map','0:a','-f','segment','-segment_time',str(self.max_timerecordfile),'-segment_format','mp3',output_file_path,'-loglevel', 'error']
# sprint(self.script_name,'yellow',str(command))
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
output, error = process.communicate()
# thread_compteur.do_run = False
thread_compteur.join()
def main(self):
self.running = True
self.clear_diretory()
# Boucle infinie pour attendre qu'un flux soit disponible
while self.running:
self.stream_url = self.get_url_with_retry()
if not self.stream_url:
sprint(self.script_name,"red","Aucun flux disponible. Attente de 60 secondes avant nouvelle tentative...")
time.sleep(60) # Attendre 1 minute avant de réessayer
continue
# Si on arrive ici, on a trouvé un flux
sprint(self.script_name,"green","Flux trouvé ! Démarrage de l'enregistrement...")
break
if not self.running:
sprint(self.script_name,"yellow","Arrêt demandé pendant la recherche de flux")
return
record_thread = Thread(target=self.record_audio)
record_thread.start()
# record_thread.join() # Wait for the recording to finish
loop_run = Thread(target=self.loop_run, args=(20,), daemon=True)
loop_run.start()
#loop_run.join() # Wait for the recording to finish
# sprint(self.script_name,"cyan","Enregistrement terminé, le programme va se terminer.")
################## RecordTwitch FIN ########################
################### ChatMessage ########################
@dataclass
class ChatMessage:
timestamp: datetime
username: str
content: str
class TwitchChatBot:
def __init__(self, channel: str):
self.channel = channel.lower()
self.messages: List[ChatMessage] = []
self.uri = "wss://irc-ws.chat.twitch.tv:443"
self.is_running = True
self.script_name = "TwitchChat"
self.chat_thread = None
self.loop = None
def stop(self):
sprint(self.script_name, "red", f"Arrêt de la connexion au chat")
self.is_running = False
if self.loop and self.loop.is_running():
self.loop.call_soon_threadsafe(self.loop.stop)
if self.chat_thread and self.chat_thread.is_alive():
self.chat_thread.join(timeout=2) # Attendre max 2 secondes
sprint(self.script_name, "red", "Connexion au chat arrêtée.")
else:
sprint(self.script_name, "yellow", f"Connexion au chat déjà arrêtée")
async def connect_to_twitch_chat(self):
while self.is_running:
try:
async with websockets.connect(self.uri) as websocket:
await websocket.send("CAP REQ :twitch.tv/tags twitch.tv/commands")
await websocket.send(f"NICK justinfan{random.randint(10000,99999)}")
await websocket.send(f"JOIN #{self.channel}")
sprint(self.script_name, "green", f"Connecté au chat de #{self.channel}")
while self.is_running:
try:
message = await websocket.recv()
if "PRIVMSG" in message:
user = message.split("!", 1)[0][1:]
if "display-name=" in user:
user = user.split("display-name=")[1].split(";")[0]
else:
user = user.lower()
msg = message.split("PRIVMSG", 1)[1].split(":", 1)[1]
timestamp = datetime.now()
chat_message = ChatMessage(timestamp=timestamp, username=user, content=msg)
self.messages.append(chat_message)
time_str = timestamp.strftime("%H:%M:%S.%f")[:-3]
sprint(self.script_name, "magenta", f"[{time_str}] {user}: {msg}")
except websockets.exceptions.ConnectionClosed:
sprint(self.script_name, "yellow", "⚠️ Connexion fermée, tentative de reconnexion...")
break
except Exception as e:
sprint(self.script_name, "red", f"Erreur de connexion: {str(e)}")
if self.is_running:
await asyncio.sleep(5)
def run_chat_loop(self):
try:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.loop.run_until_complete(self.connect_to_twitch_chat())
except Exception as e:
if self.is_running: # Ignorer les erreurs pendant l'arrêt
sprint(self.script_name, "red", f"Erreur dans la boucle du chat: {str(e)}")
finally:
try:
if self.loop and self.loop.is_running():
self.loop.close()
except:
pass
def start_background(self):
"""Lance la connexion au chat dans un thread séparé"""
self.is_running = True
self.chat_thread = threading.Thread(target=self.run_chat_loop)
self.chat_thread.daemon = True
self.chat_thread.start()
################## ChatMessage FIN ########################
################### Subtitle ########################
class Subtitle_translation:
def __init__(self, pathjson):
self.script_name = "translation"
self.type_debug = "Debug"
self.pathjson = pathjson
self.reload_json()
self.dir_whisperX = "whisperX"
self.filename_memory = "subtitle_data"
self.subtitle = {}
self.all_subtitle = ""
self.is_running = True
self.loop_timer = 20
self.language = get_value_json("language", self.config)
self.dir_record = "record"
def stop(self):
sprint(self.script_name,"red",f"Arrêt loop subtitle")
self.is_running = False
if self.loop_thread.is_alive():
# self.loop_thread.join() # Attend la fin du thread pour un arrêt propre
sprint(self.script_name,"red", "Boucle de traitement arrêtée.")
else :
sprint(self.script_name,"yellow",f"loop subtitle déja arreté")
def reload_json(self):
with open(self.pathjson, 'r') as file:
self.config = json.load(file)
def verif_file_transcribe(self):
for file in os.listdir(self.dir_record):
file_path = os.path.join(self.dir_record, file)
if os.path.isfile(file_path):
# sprint(self.script_name,"blue",f"File find -> transcribe.")
debug_print("d", "Creation subtitle : "+file_path, self.type_debug, self.script_name)
# Exécuter une fonction sur le fichier
self.auto_create_subtitle(file)
del_pathfile(file_path)
def remove_repetitions(self, message):
# sprint(self.script_name,"yellow", "remove_repetitions start for: \n" +message)
words = message.split() # Découpe le texte en mots
seen = set() # Pour suivre les mots déjà rencontrés
result = [] # Stocke les mots sans répétition
for word in words:
if word not in seen:
result.append(word)
seen.add(word)
# sprint(self.script_name,"green", "remove_repetitions cleaned text : \n" +" ".join(result))
return " ".join(result)## str this switch
def print_allsubtitle(self):
sprint(self.script_name,"yellow", "\n\n All subtitle Print\n ")
for key, text in self.subtitle.items():
sprint(self.script_name,"yellow", " time "+ str(key)+" = "+ text+" \n\n")
def get_lasttext(self):
# Vérifie que le dictionnaire n'est pas vide
if not self.subtitle:
sprint(self.script_name,"red", "Aucun sous-titre disponible.")
return ""
# Récupère la dernière clé et valeur
last_time, last_text = list(self.subtitle.items())[-1]
sprint(self.script_name,"magenta", f"Dernier sous-titre à {last_time} = {last_text}")
return last_text
def auto_create_subtitle(self,file):
sprint(self.script_name,"green",f"start auto_create_subtitle")
# debug_print("v", "auto_create_subtitle Start", self.type_debug,self.script_name)
del_file("","*.txt")
del_file("","*.srt")
del_file("","*.vtt")
del_file("","*.tsv")
del_file("","*.json")
speak_found = ""
# sprint(self.script_name,"green", "translation : "+file+" ")
# whisperx --language fr --compute_type float32 ../record/final_output_audio.mp3
# record_twitch = ['whisperx', '--language',self.language, '--compute_type','float32', '../'+self.dir_record+"/"+file] #ancienne version
# record_twitch = ['whisper', '--language',self.language, '../'+self.dir_record+"/"+file,"--device","cuda"]
record_twitch = ['whisper', '--language',self.language, './'+self.dir_record+"/"+file,"--device","cuda", "--model","large-v3"]
console.print("[bold yellow] command : "+str(record_twitch)+" [/bold yellow]")
subprocess.run(record_twitch)
file_noext = file.rsplit('.', 1)[0]
with open(file_noext+".txt", "r") as thisfile:
speak_found += thisfile.read().replace("'", "").replace('"', "").replace("\n", " ").replace(",", "")
# parties = file_noext.split('_') # Sépare la base du nom et le numéro + extension
# num_ext = parties[-1] # Prend la dernière partie qui contient "007.mp3"
speak_found = self.remove_repetitions(speak_found)
self.all_subtitle += speak_found+"\n"
current_time = get_current_time()
# if self.subtitle:
self.subtitle[str(current_time)] = speak_found
storage.write(self.filename_memory, str(current_time), speak_found)
sprint(self.script_name,"yellow", "parole du streamer : \n" +speak_found)
sprint(self.script_name,"green","finish create_subtitle")
def main_loop(self):
time.sleep(25)
sprint(self.script_name,"green", "start main boucle_traitement record")
# debug_print("v", "main_loop record Start", self.type_debug,self.script_name)
while self.is_running:
try:
# sprint(self.script_name,"blue",f"wait {self.loop_timer}s loop main translation.")
self.verif_file_transcribe()
sleep_control(self.loop_timer,self.is_running)
except Exception as e:
sprint(self.script_name,"red", "STOP main_loop Error : "+ str(e))
#console.print("[bold green]Enregistrement terminé, le programme va se terminer.[/bold green]")
def start_main_loop(self):
"""Lance dans un thread pour ne pas bloquer le code principal."""
self.loop_thread = threading.Thread(target=self.main_loop)
self.loop_thread.start() # Démarre dans un thread séparé
################### Subtitle FIN ########################
################### IA_GENERATOR TEXT ########################
class IA_generator:
def __init__(self,pathjson):
self.type_debug = "Debug"
self.script_name = "IA_generator"
self.filename_memory = "IA_generator"
self.pathjson = pathjson
self.reload_json()
self.subtitle = {}
self.all_subtitle = ""
self.ia_running = True
self.loop_timer_ia = 20
self.streamer_word = ""
self.last_streamer_word = None
self.index_prompt = 0
self.response_generation = {}
self.list_prompt = get_value_json("list_prompt", self.config)
self.bad_answer = get_value_json("bad_answer", self.config)
def stop(self):
sprint(self.script_name,"red",f"Arrêt loop subtitle")
self.ia_running = False
if self.loop_thread.is_alive():
# self.loop_thread.join() # Attend la fin du thread pour un arrêt propre
sprint(self.script_name,"red", "Boucle de traitement arrêtée.")
else :
sprint(self.script_name,"yellow",f"loop subtitle déja arreté")
def change_prompt(self):
self.index_prompt = len(self.list_prompt) -1
# if self.index_prompt == len(self.list_prompt):
# self.index_prompt = 0
# else :
# self.index_prompt += 1
def reload_json(self):
with open(self.pathjson, 'r') as file:
self.config = json.load(file)
def get_last_values(self, n):
values = list(self.subtitle.values())
return ''.join(values[-n:])
def print_allgeneration(self):
sprint(self.script_name,"yellow", "\n\n Print All response_generation \n ")
for key, text in self.response_generation.items():
sprint(self.script_name,"yellow", " key = "+ str(key)+" = "+ text+" \n\n")
def getlast_generation(self):
# return self.response_generation
if not self.response_generation:
sprint(self.script_name,"red", "Aucun response_generation disponible.")
return ""
# Récupère la dernière clé et valeur
key, last_generation = list(self.response_generation.items())[-1]
# sprint(self.script_name,"magenta", f"Dernier response_generation key : {key} = {last_generation}")
return last_generation
def get_last_subtitle(self):
data = storage.read("subtitle_data")
if data: # Vérifie si data n'est pas vide
# Trie les clés et récupère la dernière
sorted_keys = sorted(data.keys())
last_key = sorted_keys[-1]
self.streamer_word = data[last_key]
# sprint(self.script_name,"blue", "get_last_subtitle : " + data[last_key])
else:
self.generation_text = ""
def clear_response(self, text):
# Vérifie si ":" est dans la chaîne de caractères
if ":" in text:
# Sépare la chaîne et récupère la deuxième partie
return text.split(":", 1)[1].strip()
else:
# Retourne la chaîne telle quelle si ":" n'est pas présent
return text
def imagine_response(self):
# sprint(self.script_name,"green",f"start imagine_response\n")
debug_print("v", "start imagine_response", self.type_debug,self.script_name)
# sprint(self.script_name,"magenta",f"streamer_word : \n"+str(self.streamer_word))
# sprint(self.script_name,"magenta",f"streamer_word : \n"+str(len(self.streamer_word[0])))
if str(self.streamer_word) == "":
sprint(self.script_name,"magenta",f"pas encore de sous titre on quitte génération")
return
prompt_gpt = ['tgpt','-q','-w', '"'+self.list_prompt[self.index_prompt]+' "'+ str(self.streamer_word)+'"']
# sprint(self.script_name,"DARKCYAN","index preprompt : "+str(self.index_prompt)+"\n")
# sprint(self.script_name,"DARKCYAN","\nPrompt demandé : "+str(prompt_gpt)+"\n")
process = subprocess.run(prompt_gpt, capture_output=True, text=True, check=False)
text_generation = process.stdout.replace("'", "").replace('"', "").replace("\n", "")
# sprint(self.script_name,"CYAN","réponse imaginé sans formatage : \n"+text_generation+"\n\n") #debug
textfinal = self.clear_response(text_generation)
nombre_de_mots = len(textfinal.split())
#TODO amélioration condition de génération
# if nombre_de_mots>100:
# sprint(self.script_name,"red","réponse prompt trop grande : "+str(textfinal))
# self.change_prompt()
# self.imagine_response()
# return
# nombre_de_caracteres = len(textfinal)
self.last_streamer_word = self.streamer_word
# sprint(self.script_name,"CYAN","réponse imaginé : \n"+textfinal+"\n\n")
debug_print("d", "Réponse imaginé : \n"+textfinal, self.type_debug,self.script_name)
self.change_prompt()
key = get_current_time()
self.response_generation[str(key)] = textfinal
storage.write(self.filename_memory, str(key), textfinal) #ecrire dans la mémoire fichier
def setnew_streamer_word_text(self, streamer_word_text):
self.streamer_word = streamer_word_text
def get_last_values(self, n):
values = list(self.subtitle.values())
return ''.join(values[-n:])
def main_ask(self, streamer_word_text):
# sprint(self.script_name,"blue", "imagine_response start")
debug_print("i", "PRINCIPAL START imagine_response", self.type_debug,self.script_name)
self.streamer_word = streamer_word_text
imagine_response_thread = Thread(target=self.imagine_response)
imagine_response_thread.start()
def main_loop_ia(self):
time.sleep(20)
# sprint(self.script_name,"blue", "main_loop_ia imagine_response start")
# debug_print("v", "main_loop_ia imagine_response start", self.type_debug,self.script_name)
try:
while self.ia_running:
# sprint(self.script_name,"yellow",f"wait {self.loop_timer_ia}s main_loop_ia imagine_response.")
self.get_last_subtitle()
if(self.last_streamer_word != self.streamer_word):
self.imagine_response()
self.last_streamer_word = self.streamer_word
else:
# sprint(self.script_name,"yellow",f"génération déja créer")
debug_print("d", "génération déja créer", self.type_debug,self.script_name)
sleep_control(self.loop_timer_ia,self.ia_running)
# except KeyboardInterrupt:
# print("Arrêt du script.")
except Exception as e:
sprint(self.script_name,"red", "STOP main_loop_ia Error : "+ str(e))
def start_main_loop(self):
"""Lance dans un thread pour ne pas bloquer le code principal."""
self.ia_running = True
self.loop_thread = threading.Thread(target=self.main_loop_ia)
self.loop_thread.start() # Démarre dans un thread séparé
################### IA_GENERATOR TEXT FIN ########################
# class messageTwitch:
# def __init__(self, channel_name, pseudo, token):
# self.channel_name = channel_name
# self.pseudo = pseudo
# self.token = token
# async def send_message_to_twitch_stream(self, message):
# parsed_url = urlparse(self.channel_name)
# channel = parsed_url.path.lstrip('/')
# pseudo = self.tw_acc_pseudo
# token = self.tw_acc_token
# message = self.ram_msgnow
# async with Client() as client:
# try:
# print("Tentative de login")
# await client.login_oauth(token, pseudo)
# print("Tentative de join")
# await client.join(channel)
# print("Tentative d'envoi de message")
# await client.send_message(message)
# print("Message envoyé avec succès.")
# sleep(45) # Utilisez asyncio.sleep pour éviter de bloquer l'event loop
# await client.part(channel)
# print("Déconnecté.")
# except Exception as e:
# print(f"Erreur lors de l'interaction avec Twitch: {type(e).__name__}, {e}")
################### sendmessageTwitch ########################
class messageTwitch:
def __init__(self, config_user, channel_name):
self.message_running = True
self.script_name = "Control_Twitch"
self.path_file_config_user = config_user
self.channel_name = channel_name
self.indexuser = 0
self.type_debug = "Debug"
with open(self.path_file_config_user, 'r') as file:
self.userjson = json.load(file)
self.totaluser = len(self.userjson)
self.tw_acc_pseudo = get_value_json_list(self.indexuser, "tw_acc_pseudo", self.userjson)
self.tw_acc_token = get_value_json_list(self.indexuser, "tw_acc_token", self.userjson)
self.charactere = get_value_json_list(self.indexuser, "charactere", self.userjson)
self.ram_msgnow = ""
self.generation_text = ""
self.last_respond_word = ""
def set_user(self,index_user):
self.indexuser = index_user
self.tw_acc_pseudo = get_value_json_list(self.indexuser, "tw_acc_pseudo", self.userjson)
self.tw_acc_token = get_value_json_list(self.indexuser, "tw_acc_token", self.userjson)
self.charactere = get_value_json_list(self.indexuser, "charactere", self.userjson)
def stop(self):
self.message_running = False
def change_user(self):
# commented for pausing
debug_print("v", "Changement User Twitch", self.type_debug, self.script_name)
if (self.totaluser != 1): # si la liste ne fait pas que 1 de taille
if(self.totaluser-1 > self.indexuser): # si la taille de liste est plus grande que lindex
self.indexuser = self.indexuser + 1
else :
self.indexuser = 0
self.tw_acc_pseudo = get_value_json_list(self.indexuser, "tw_acc_pseudo", self.userjson)
self.tw_acc_token = get_value_json_list(self.indexuser, "tw_acc_token", self.userjson)
self.charactere = get_value_json_list(self.indexuser, "charactere", self.userjson)
def conversation(self):
user1 = 1
user2 = 2
pseudo1 = self.tw_acc_pseudo = get_value_json_list(user1, "tw_acc_pseudo", self.userjson)
pseudo2 = self.tw_acc_pseudo = get_value_json_list(user2, "tw_acc_pseudo", self.userjson)
self.send_message_user(user1,"yo @"+pseudo2+" ^^")
sleep(10)
self.send_message_user(user1,"Heyyy cool ? @"+pseudo1+"")
def send_message_user(self,index_user, Message_text):
hprint("cyan","start send_message_user")
self.set_user(index_user)
self.send_message(Message_text)
hprint("cyan",f"Finnish send send_message_user index :" + str(index_user))
def send_message(self, Message_text):
try:
# Vérifier si l'envoi de messages est activé (variable globale accessible depuis web_interface)
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
# Essayer d'importer la variable depuis web_interface
try:
from web_interface import chat_messages_enabled
if not chat_messages_enabled:
debug_print("w", "Envoi de messages désactivé, message ignoré", self.type_debug, self.script_name)
return
except ImportError:
# Si web_interface n'est pas disponible, on continue normalement
pass
command = '-pseudo "'+self.tw_acc_pseudo+'" -token "'+self.tw_acc_token+'" -twitchname "'+self.channel_name+'" -message " '+self.charactere+' '+Message_text+'"'
# Utiliser le Python de l'environnement virtuel
python_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'env', 'bin', 'python')
message_tosend = python_path + ' send_message.py '+ str(command)
debug_print("d", f"send message = "+str(message_tosend), self.type_debug, self.script_name)
sprint(self.script_name,"cyan",f"send message = "+str(message_tosend))
subprocess.run(message_tosend, shell=True)
# self.ram_msgnow = Message_text
# await self.send_message_to_twitch_stream()
# asyncio.run(self.send_message_to_twitch_stream())
self.last_respond_word = Message_text
# sprint(self.script_name,"cyan",f"Finnish send_message message")
debug_print("g", "Finnish send_message message", self.type_debug,self.script_name)
except Exception as e:
sprint(self.script_name,"red", "send_message Error : "+ str(e))
async def send_message_to_twitch_stream(self):
parsed_url = urlparse(self.channel_name)
channel = parsed_url.path.lstrip('/')
pseudo = self.tw_acc_pseudo
token = self.tw_acc_token
message = self.ram_msgnow
async with Client() as client:
try:
print("Tentative de login")
await client.login_oauth(token, pseudo)
# while self.message_running:
print("Tentative de join")
await client.join(channel)
print("Tentative d'envoi de message")
await client.send_message(message)
print("Message envoyé avec succès.")
sleep_control(25,self.message_running)
await client.part(channel)
print("disconnect.")
except Exception as e:
print(f"Erreur lors de l'interaction avec Twitch: {type(e).__name__}, {e}")
def get_last_generation(self):
data = storage.read("IA_generator")
if data: # Vérifie si data n'est pas vide
# Trie les clés et récupère la dernière
sorted_keys = sorted(data.keys())
last_key = sorted_keys[-1]
self.generation_text = data[last_key]
# sprint(self.script_name,"blue", "get_last_generation : " + data[last_key])
else:
debug_print("e", "get_last_generation = No generation: ", self.type_debug, self.script_name)
# sprint(self.script_name,"red", "get_last_generation = No generation: ")
self.generation_text = ""
def start_main_loop_respond(self):
time.sleep(40) #wait starting script
sprint(self.script_name,"blue", "main_loop_respond start")
try:
while self.message_running:
self.get_last_generation()
if (self.generation_text == ""):
sprint(self.script_name,"yellow",f"pas encore de génération")
sleep_control(45,self.message_running)
continue
if(self.last_respond_word != self.generation_text):
# sprint(self.script_name,"green",f"\n\n ENVOIE MESSAGE : \n "+self.generation_text)
debug_print("v", "Début envoie message : "+self.generation_text, self.type_debug, self.script_name)
self.send_message(self.generation_text) # envoie de message
self.last_respond_word = self.generation_text # mise a jour du dernier message envoyé
self.change_user() # cahngement de user
else:
# sprint(self.script_name,"yellow",f"génération déja envoyé")
debug_print("d", "génération déja envoyé", self.type_debug, self.script_name)
sleep_control(20,self.message_running)
except Exception as e:
sprint(self.script_name,"red", "STOP main_loop_respond Error : "+ str(e))
def start_loop_respond(self):
"""Lance dans un thread pour ne pas bloquer le code principal."""
self.message_running = True
self.loop_thread = threading.Thread(target=self.start_main_loop_respond)
self.loop_thread.start() # Démarre dans un thread séparé
################### sendmessageTwitch FIN ########################