"""Record a Twitch channel, transcribe it, generate replies and post them.

The module wires together four helpers, each driving an external command:

* ``RecordTwitch``          -- runs ``twitch_record_unlimited.py``.
* ``Subtitle_translation``  -- runs ``whisper`` on files found in ``record``.
* ``IA_generator``          -- runs ``tgpt`` with a prompt + last transcript.
* ``messageTwitch``         -- runs ``send_message.py`` to post in chat.
"""
import argparse
import datetime
import glob
import json
import os
import pprint
import pty
import random
import signal
import subprocess
import sys
import threading
import time
from threading import Thread, Semaphore

import requests
# import keyboard
from pynput import keyboard
from streamlink import Streamlink
from fake_useragent import UserAgent
from rich.console import Console
from rich.live import Live
from rich.prompt import Prompt
from rich.spinner import Spinner
from rich.table import Table
from rich.text import Text

console = Console()


def hprint(color, texte):
    """Print ``texte`` in bold ``color`` on the shared console, with a timestamp.

    :param color: rich style name appended to ``bold`` (may be empty).
    :param texte: message to display.
    """
    timestamp = datetime.datetime.now().strftime("%Hh %Mm %Ss")
    console.print(f"[bold {color}] [{timestamp}] {texte} [/bold {color}]")


def del_file(dossier, file):
    """Delete every file in ``dossier`` matching the glob pattern ``file``.

    :param dossier: directory to search ("" means the current directory).
    :param file: glob pattern, e.g. ``"*.txt"``.
    """
    motif = os.path.join(dossier, file)
    # Use a distinct loop name so the ``file`` parameter is not shadowed.
    for path in glob.glob(motif):
        os.remove(path)
        hprint("", f"file deleted : {path}")


def get_value_json(var_name, config):
    """Return ``config[var_name]``, or ``None`` when the key is absent."""
    return config.get(var_name)


def get_value_json_list(index, key, json_data):
    """Return ``json_data[index][key]``, or ``None`` if the lookup fails."""
    try:
        return json_data[index][key]
    except (IndexError, KeyError, TypeError):
        return None


def get_current_time():
    """Return the current local time formatted as ``HH:MM:SS``."""
    return time.strftime('%H:%M:%S', time.localtime(time.time()))


################### RecordTwitch ########################
class RecordTwitch:
    """Run the external Twitch recording script as a stoppable subprocess."""

    def __init__(self, channel_name, recordtime):
        """
        :param channel_name: Twitch channel to record.
        :param recordtime: duration in seconds; ``-1`` is replaced by 9999.
        """
        self.channel_name = channel_name
        self.all_proxies = []
        self.channel_url = "https://www.twitch.tv/" + self.channel_name
        self.record_time = recordtime
        self.dir_twitchrecord = "twitch-recordAudio"
        self.process = None  # currently running recording subprocess, if any
        self.thread = None   # thread created by start_recording()
        if self.record_time == -1:
            self.record_time = 9999

    def stop(self):
        """Stop the recording subprocess if it is still running."""
        if self.process and self.process.poll() is None:  # still alive
            hprint("yellow", "Arrêt de l'enregistrement...")
            self.process.terminate()
            try:
                # Give the process 5 seconds to exit by itself.
                self.process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                hprint("RED", "Le processus ne répond pas, forçage de l'arrêt.")
                self.process.kill()
            hprint("yellow", "Enregistrement arrêté.")
        else:
            hprint("green", "Aucun enregistrement en cours.")

    def hprint(self, color, texte):
        """Instance-level alias of the module-level :func:`hprint`."""
        hprint(color, texte)

    @staticmethod
    def time_to_seconds(time_str):
        """Convert an ``H:M:S`` string into a number of seconds.

        Declared static because the function takes no ``self``; it can be
        called on the class or on an instance.
        """
        h, m, s = map(int, time_str.split(':'))
        return h * 3600 + m * 60 + s

    @staticmethod
    def search_dict(d, key):
        """Return ``d[key]``, or the string ``"Key not found"`` if absent."""
        return d.get(key, "Key not found")

    def main_record_twitch(self):
        """Launch the recording script and block until it exits.

        ``Popen`` is used (rather than ``run``) so :meth:`stop` can terminate
        the process from another thread.
        """
        try:
            print(f"Lancement de l'enregistrement pour {self.channel_name} pendant {self.record_time} secondes.")
            record_twitch = [
                'python',
                self.dir_twitchrecord + '/twitch_record_unlimited.py',
                '-twitchname', self.channel_name,
                '-recordtime', str(self.record_time),
            ]
            self.process = subprocess.Popen(record_twitch)
            self.process.communicate()  # wait for the subprocess to finish
        except Exception as e:
            print(f"Erreur pendant l'enregistrement: {e}")

    def start_recording(self):
        """Run :meth:`main_record_twitch` in a thread so the caller is not blocked."""
        self.thread = threading.Thread(target=self.main_record_twitch)
        self.thread.start()
################### RecordTwitch FIN ########################


################### Subtitle ########################
class Subtitle_translation:
    """Periodically transcribe recorded audio files with the ``whisper`` CLI."""

    def __init__(self, pathjson):
        """
        :param pathjson: path of the JSON configuration file (provides
            the ``language`` key).
        """
        self.pathjson = pathjson
        self.reload_json()
        self.dir_whisperX = "whisperX"
        self.subtitle = {}       # "HH:MM:SS" -> cleaned transcript
        self.all_subtitle = ""
        self.is_running = True
        self.loop_timer = 30
        self.language = get_value_json("language", self.config)
        self.dir_record = "record"
        self.loop_thread = None  # set by start_main_loop()

    def stop(self):
        """Ask the processing loop to stop (the thread is not joined)."""
        hprint("yellow", "Arrêt loop subtitle")
        self.is_running = False
        # loop_thread is None when start_main_loop() was never called.
        if self.loop_thread is not None and self.loop_thread.is_alive():
            hprint("red", "Boucle de traitement arrêtée.")
        else:
            hprint("yellow", "loop subtitle déja arreté")

    def reload_json(self):
        """(Re)load the configuration from ``self.pathjson`` into ``self.config``."""
        with open(self.pathjson, 'r') as file:
            self.config = json.load(file)

    def verif_file_transcribe(self):
        """Transcribe, then delete, every regular file found in ``dir_record``."""
        for file in os.listdir(self.dir_record):
            file_path = os.path.join(self.dir_record, file)
            if os.path.isfile(file_path):
                hprint("blue", "File find -> transcribe.")
                self.auto_create_subtitle(file)
                # The audio file is removed once it has been processed.
                os.remove(file_path)
                print(f"Fichier {file_path} supprimé.")

    def remove_repetitions(self, message):
        """Drop repeated words, keeping only the first occurrence of each.

        Words are the whitespace-separated tokens of ``message``; the
        comparison is exact (case- and punctuation-sensitive).

        :param message: text to clean.
        :return: the words of ``message`` without duplicates, space-joined.
        """
        hprint("yellow", "remove_repetitions start for: \n" + message)
        # dict.fromkeys keeps first-seen order while discarding duplicates.
        cleaned = " ".join(dict.fromkeys(message.split()))
        hprint("green", "remove_repetitions cleaned text : \n" + cleaned)
        return cleaned

    def print_allsubtitle(self):
        """Clear the terminal and print every stored subtitle."""
        os.system('clear')
        hprint("yellow", "\n\n All subtitle Print\n ")
        for key, text in self.subtitle.items():
            hprint("yellow", " time " + str(key) + " = " + text + " \n\n")

    def get_lasttext(self):
        """Return the most recently stored subtitle, or ``""`` if none exists."""
        if not self.subtitle:
            hprint("red", "Aucun sous-titre disponible.")
            return ""
        last_time, last_text = list(self.subtitle.items())[-1]
        hprint("magenta", f"Dernier sous-titre à {last_time} = {last_text}")
        return last_text

    def auto_create_subtitle(self, file):
        """Transcribe ``record/<file>`` with whisper and store the cleaned text.

        Previous whisper outputs (``*.txt``, ``*.srt``, ``*.vtt``, ``*.tsv``,
        ``*.json``) in the current directory are deleted first. If whisper
        produces no ``.txt`` transcript, nothing is stored.

        :param file: name of the audio file inside ``self.dir_record``.
        """
        hprint("green", "start auto_create_subtitle")
        for pattern in ("*.txt", "*.srt", "*.vtt", "*.tsv", "*.json"):
            del_file("", pattern)

        hprint("green", "translation : " + file + " ")
        record_twitch = [
            'whisper', '--language', self.language,
            './' + self.dir_record + "/" + file,
            "--device", "cuda", "--model", "large-v3",
        ]
        console.print("[bold yellow] command : " + str(record_twitch) + " [/bold yellow]")
        subprocess.run(record_twitch)

        file_noext = file.rsplit('.', 1)[0]
        try:
            with open(file_noext + ".txt", "r", encoding="utf-8") as thisfile:
                raw_text = thisfile.read()
        except FileNotFoundError:
            # whisper wrote no transcript (e.g. no speech detected, or the
            # command failed): skip this file instead of killing the loop.
            hprint("red", f"Aucune transcription produite pour {file}")
            return

        # Strip quotes and commas, and flatten newlines into spaces.
        cleanup = str.maketrans({"'": None, '"': None, ",": None, "\n": " "})
        spead_find = self.remove_repetitions(raw_text.translate(cleanup))

        self.all_subtitle += spead_find + "\n"
        self.subtitle[str(get_current_time())] = spead_find
        hprint("yellow", "parole du streamer : \n" + spead_find)
        console.print("[bold PURPLE] finish create_subtitle [/bold PURPLE]")

    def main_loop(self):
        """Wait 45 s, then transcribe new files every ``loop_timer`` seconds."""
        time.sleep(45)
        hprint("green", "start main boucle_traitement record")
        try:
            while self.is_running:
                hprint("blue", f"wait {self.loop_timer}s loop main translation.")
                self.verif_file_transcribe()
                time.sleep(self.loop_timer)
        except KeyboardInterrupt:
            print("Arrêt du script.")

    def start_main_loop(self):
        """Run :meth:`main_loop` in a thread so the caller is not blocked."""
        self.loop_thread = threading.Thread(target=self.main_loop)
        self.loop_thread.start()
################### Subtitle FIN ########################


################### IA_GENERATOR TEXT ########################
class IA_generator:
    """Generate chat replies with the ``tgpt`` CLI from the streamer's words."""

    def __init__(self, pathjson):
        """
        :param pathjson: path of the JSON configuration file (provides the
            ``list_prompt`` and ``bad_answer`` keys).
        """
        self.pathjson = pathjson
        self.reload_json()
        self.subtitle = {}
        self.all_subtitle = ""
        self.is_running = True
        self.loop_timer = 60
        self.streamer_word = ""
        self.index_prompt = 0
        self.response_generation = {}  # "HH:MM:SS" -> generated text
        self.list_prompt = get_value_json("list_prompt", self.config)
        self.bad_answer = get_value_json("bad_answer", self.config)
        self.loop_thread = None        # set by start_main_loop()

    def stop(self):
        """Ask the generation loop to stop (the thread is not joined)."""
        hprint("yellow", "Arrêt loop imagine_response")
        self.is_running = False
        # loop_thread is None when start_main_loop() was never called.
        if self.loop_thread is not None and self.loop_thread.is_alive():
            hprint("red", "Boucle de traitement arrêtée.")
        else:
            hprint("yellow", "loop imagine_response déja arreté")

    def change_prompt(self):
        """Advance ``index_prompt`` to the next prompt, wrapping to 0.

        The index always stays a valid position in ``list_prompt``.
        """
        if not self.list_prompt:
            self.index_prompt = 0
            return
        self.index_prompt = (self.index_prompt + 1) % len(self.list_prompt)

    def reload_json(self):
        """(Re)load the configuration from ``self.pathjson`` into ``self.config``."""
        with open(self.pathjson, 'r') as file:
            self.config = json.load(file)

    def get_last_values(self, n):
        """Return the last ``n`` values of ``self.subtitle`` concatenated."""
        values = list(self.subtitle.values())
        return ''.join(values[-n:])

    def print_allgeneration(self):
        """Clear the terminal and print every stored generation."""
        os.system('clear')
        hprint("yellow", "\n\n Print All response_generation \n ")
        for key, text in self.response_generation.items():
            hprint("yellow", " key = " + str(key) + " = " + text + " \n\n")

    def getlast_generation(self):
        """Return the most recent generated reply, or ``""`` if none exists."""
        if not self.response_generation:
            hprint("red", "Aucun response_generation disponible.")
            return ""
        key, last_generation = list(self.response_generation.items())[-1]
        hprint("magenta", f"Dernier response_generation key : {key} = {last_generation}")
        return last_generation

    def clear_response(self, text):
        """Return what follows the first ``:`` in ``text`` (stripped).

        ``text`` is returned unchanged when it contains no ``:``.
        """
        if ":" not in text:
            return text
        return text.split(":", 1)[1].strip()

    def imagine_response(self):
        """Ask ``tgpt`` for a reply to ``streamer_word`` and store it.

        Does nothing while ``streamer_word`` is empty. On completion the
        prompt index is advanced and the reply is stored in
        ``response_generation`` under the current time.
        """
        hprint("green", "start imagine_response\n")
        if str(self.streamer_word) == "":
            hprint("magenta", "pas encore de sosu titre on quitte génération")
            return

        # NOTE(review): the quote characters are passed literally to tgpt
        # (no shell is involved); kept as-is to leave the prompt unchanged.
        prompt_text = '"' + self.list_prompt[self.index_prompt] + ' "' + str(self.streamer_word) + '"'
        prompt_gpt = ['tgpt', '-q', '-w', prompt_text]
        hprint("DARKCYAN", "index preprompt : " + str(self.index_prompt) + "\n")
        hprint("DARKCYAN", "\nPrompt demandé : " + str(prompt_gpt) + "\n")

        process = subprocess.run(prompt_gpt, capture_output=True, text=True, check=False)
        text_generation = process.stdout.replace("'", "").replace('"', "").replace("\n", "")
        textfinal = self.clear_response(text_generation)
        hprint("CYAN", "réponse imaginé : \n" + textfinal + "\n\n")

        self.change_prompt()
        self.response_generation[str(get_current_time())] = textfinal

    def setnew_streamer_word_text(self, streamer_word_text):
        """Set the text that the next generation will react to."""
        self.streamer_word = streamer_word_text

    def main_ask(self, streamer_word_text):
        """Store ``streamer_word_text`` and generate one reply in a new thread."""
        hprint("blue", "imagine_response start")
        self.streamer_word = streamer_word_text
        imagine_response_thread = Thread(target=self.imagine_response)
        imagine_response_thread.start()

    def main_loop(self):
        """Wait 100 s, then generate a reply every ``loop_timer`` seconds."""
        hprint("blue", "main_loop imagine_response start")
        time.sleep(100)
        try:
            while self.is_running:
                hprint("blue", f"wait {self.loop_timer}s loop main imagine_response.")
                self.imagine_response()
                time.sleep(self.loop_timer)
        except KeyboardInterrupt:
            print("Arrêt du script.")

    def start_main_loop(self):
        """Run :meth:`main_loop` in a thread so the caller is not blocked."""
        self.loop_thread = threading.Thread(target=self.main_loop)
        self.loop_thread.start()
################### IA_GENERATOR TEXT FIN ########################


################### sendmessageTwitch ########################
class messageTwitch:
    """Post chat messages through ``send_message.py`` with a configured account."""

    def __init__(self, channel_name):
        """
        :param channel_name: Twitch channel whose chat receives the messages.

        Accounts are read from ``../config/user.json``; the first one is
        selected.
        """
        self.channel_name = channel_name
        self.indexuser = 0
        with open("../config/user.json", 'r') as file:
            self.userjson = json.load(file)
        self.totaluser = len(self.userjson)
        self._load_user()

    def _load_user(self):
        """Copy the fields of account ``self.indexuser`` onto the instance."""
        self.tw_acc_pseudo = get_value_json_list(self.indexuser, "tw_acc_pseudo", self.userjson)
        self.tw_acc_token = get_value_json_list(self.indexuser, "tw_acc_token", self.userjson)
        self.charactere = get_value_json_list(self.indexuser, "charactere", self.userjson)

    def set_user(self, index_user):
        """Select the account at position ``index_user`` in the user list."""
        self.indexuser = index_user
        self._load_user()

    def change_user(self):
        """Reload the fields of the current account.

        Rotation to the next account is currently disabled, so the index is
        left untouched.
        """
        self._load_user()

    def send_message(self, Message_text):
        """Send ``Message_text`` (prefixed by ``charactere``) to the channel chat.

        The command is executed without a shell, so the message text cannot
        be interpreted as shell syntax.
        """
        hprint("green", "start send_message")
        message = ' ' + self.charactere + ' ' + Message_text
        command = [
            'python', 'send_message.py',
            '-pseudo', self.tw_acc_pseudo,
            '-token', self.tw_acc_token,
            '-twitchname', self.channel_name,
            '-message', message,
        ]
        # The token is redacted so the credential never reaches the console.
        shown = (
            f'python send_message.py -pseudo "{self.tw_acc_pseudo}" -token "***" '
            f'-twitchname "{self.channel_name}" -message "{message}"'
        )
        hprint("yellow", "send message = " + shown)
        subprocess.run(command)
        hprint("yellow", "Finnish send message")
################### sendmessageTwitch FIN ########################