import os import signal import sys import argparse import json import time import pprint import random import requests import shutil import datetime import glob import threading # import keyboard from pynput import keyboard from threading import Thread, Semaphore from streamlink import Streamlink from fake_useragent import UserAgent from rich.console import Console from rich.live import Live from rich.prompt import Prompt from rich.spinner import Spinner from rich.table import Table from rich.text import Text import subprocess import pty console = Console() def hprint(color, texte): timestamp = datetime.datetime.now().strftime("%Hh %Mm %Ss") console.print("[bold "+color+"] ["+timestamp+"] "+texte+" [/bold "+color+"]") def sprint(script_name,color, texte): timestamp = datetime.datetime.now().strftime("%Hh %Mm %Ss") console.print("[bold "+color+"] ["+timestamp+"] ("+script_name+") "+texte+" [/bold "+color+"]") def del_file(dossier, file): motif = os.path.join(dossier, file) for file in glob.glob(motif): os.remove(file) hprint("",f"file deleted : {file}") def get_value_json(var_name, config): return config.get(var_name) def get_value_json_list(index, key, json_data): try: return json_data[index][key] except (IndexError, KeyError, TypeError): return None # Fonction pour obtenir l'heure actuelle sous forme de chaîne def get_current_time(): current_time = time.time() return time.strftime('%H:%M:%S', time.localtime(current_time)) # now = datetime.now() # return now.strftime("%H:%M") def get_current_date(): current_time = time.time() return time.strftime('%Yy%mm%dd', time.localtime(current_time)) def dict_to_json_string(data): """Convertit un dictionnaire Python en chaîne JSON. :param data: Dictionnaire Python. :return: Chaîne JSON. """ try: return json.dumps(data, indent=4, ensure_ascii=False) except Exception as e: print(f"Erreur lors de la conversion en JSON : {e}") return None def json_string_to_dict(json_string): """ Convertit une chaîne JSON en dictionnaire Python. :param json_string: Chaîne JSON. :return: Dictionnaire Python. """ try: return json.loads(json_string) except Exception as e: print(f"Erreur lors de la conversion en dictionnaire : {e}") return None class PersistentStorage: def __init__(self, storage_dir="storage"): """ Initialise l'environnement de stockage avec un répertoire dédié. :param storage_dir: Chemin du répertoire de stockage. """ self.storage_dir = storage_dir def _get_file_path(self, filename): """Renvoie le chemin complet d'un fichier dans le répertoire de stockage.""" return os.path.join(self.storage_dir, filename + ".json") def read(self, filename): """ Lit et retourne le contenu d'un fichier sous forme de dictionnaire. :param filename: Nom du fichier (sans extension). :return: Dictionnaire des données du fichier ou {} si le fichier est vide/inexistant. """ file_path = self._get_file_path(filename) try: with open(file_path, 'r', encoding='utf-8') as file: return json.load(file) except (FileNotFoundError, json.JSONDecodeError): return {} def write(self, filename, key, value): """ Ajoute ou met à jour une paire clé-valeur dans un fichier. :param filename: Nom du fichier (sans extension). :param key: Clé à ajouter ou mettre à jour. :param value: Valeur associée. """ if not os.path.exists(self.storage_dir): os.makedirs(self.storage_dir) data = self.read(filename) data[key] = value file_path = self._get_file_path(filename) with open(file_path, 'w', encoding='utf-8') as file: json.dump(data, file, indent=4, ensure_ascii=False) return data def delete(self, filename, key): """ Supprime une clé d'un fichier si elle existe. :param filename: Nom du fichier (sans extension). :param key: Clé à supprimer. """ data = self.read(filename) if key in data: del data[key] file_path = self._get_file_path(filename) with open(file_path, 'w', encoding='utf-8') as file: json.dump(data, file, indent=4, ensure_ascii=False) return data def list_keys(self, filename): """ Liste toutes les clés présentes dans un fichier. :param filename: Nom du fichier (sans extension). :return: Liste des clés. """ data = self.read(filename) return list(data.keys()) def query_all(self): """ Interroge tous les fichiers dans le répertoire de stockage et retourne leurs contenus. :return: Dictionnaire de tous les fichiers et leurs données. """ all_data = {} for file in os.listdir(self.storage_dir): if file.endswith(".json"): filename = os.path.splitext(file)[0] all_data[filename] = self.read(filename) return all_data storage = PersistentStorage() ################### RecordTwitch ######################## class RecordTwitch: def __init__(self, channel_name, record_time): self.channel_name = channel_name self.channel_url = "https://www.twitch.tv/" + self.channel_name self.record_time = record_time self.max_timerecordfile = 30 self.running = True self.script_name = "twitch_record" if (self.record_time == -1): self.record_time = 9999 def stop(self): sprint(self.script_name,"magenta","STOPING RecordTwitch") self.running = False def del_mp3(self, dossier): motif = os.path.join(dossier, '*.mp3') for file in glob.glob(motif): os.remove(file) sprint(self.script_name,"yellow",f"file deleted : {file}") def clear_diretory(self, dir_inrecord="in_record", dir_record="record"): #sprint(self.script_name,'green', 'clear_diretory start') if os.path.exists(dir_inrecord) and os.path.exists(dir_record): sprint(self.script_name,'green', f"dir {dir_inrecord} and {dir_record} exist clearing") self.del_mp3(dir_inrecord) self.del_mp3(dir_record) def create_session(self): # Session creating for request self.ua = UserAgent() self.session = Streamlink() self.session.set_option("http-headers", { "Accept-Language": "en-US,en;q=0.5", "Connection": "keep-alive", "DNT": "1", "Upgrade-Insecure-Requests": "1", "User-Agent": self.ua.random, "Client-ID": "your_client_id", # Replace with your actual Client-ID "Referer": "https://www.google.com/" }) def get_url(self): sprint(self.script_name,"green","start get_url") url = "" self.create_session() try: streams = self.session.streams(self.channel_url) url = streams['audio_only'].url if 'audio_only' in streams else streams['worst'].url sprint(self.script_name,"cyan","url generate : " + url) except Exception as e: sprint(self.script_name,"red","Error on get url : "+str(e)) # traceback.print_exc() # Affiche les détails de l'erreur pass return url def loop_run(self, intervalle):#boucle pour déplacer les fichier fini enregistrement time.sleep(30) # attente début du script complet et enregistrement try: while self.running: self.verif_record_move() sprint(self.script_name,"yellow", f"wait {intervalle}s for next scan file completed.") time.sleep(intervalle) #except KeyboardInterrupt: except Exception as e: sprint(self.script_name,"red", "STOP loop_run Error : "+ str(e)) finally: self.running = False # Assurez-vous que le drapeau est désactivé def verif_record_move(self, dir_inrecord="in_record", dir_record="record"): sprint(self.script_name,"green", "start verif_record_move") if not os.path.exists(dir_record): os.makedirs(dir_record) sprint(self.script_name,"green", "création du dossier : "+dir_record) fichiers = [f for f in os.listdir(dir_inrecord) if os.path.isfile(os.path.join(dir_inrecord, f))] if len(fichiers) > 1: # sort file sort_file = sorted(fichiers, key=lambda x: int(x.split('_')[-1].split('.')[0])) fileto_move = sort_file[0] chemin_source = os.path.join(dir_inrecord, fileto_move) chemin_destination = os.path.join(dir_record, fileto_move) # move file shutil.move(chemin_source, chemin_destination) sprint(self.script_name,"green",f"File moved: {fileto_move}") else: sprint(self.script_name,"yellow","Not enough files to compare.") def compteur(self ): seconds = 0 loop = 0 while self.running: time.sleep(1) # Attend une seconde if (seconds < 10): print(f"\033[94mRecording time: 0{seconds}s | file : {loop}\033[0m", end='\r', flush=True) # Réinitialise la ligne à chaque fois else: print(f"\033[94mRecording time: {seconds}s | file : {loop}\033[0m", end='\r', flush=True) # Réinitialise la ligne à chaque fois seconds += 1 if seconds == self.max_timerecordfile: loop +=1 seconds = 0 # Réinitialise le compteur après 60 seconds def record_audio(self): output_directory = "record" in_record_directory = "in_record" os.makedirs(output_directory, exist_ok=True) os.makedirs(in_record_directory, exist_ok=True) timestamp_complet = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") timestamp = datetime.datetime.now().strftime("%Y-%m-%d") output_file_path = os.path.join(in_record_directory, f"{timestamp}_%03d.mp3") sprint(self.script_name,"green", f"start record") thread_compteur = Thread(target=self.compteur, daemon=True) thread_compteur.start() command = ['ffmpeg','-i', self.stream_url,'-vn','-acodec','libmp3lame','-ar','44100','-ac','2','-map','0:a','-f','segment','-segment_time',str(self.max_timerecordfile),'-segment_format','mp3',output_file_path,'-loglevel', 'error'] # sprint(self.script_name,'yellow',str(command)) process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) output, error = process.communicate() # thread_compteur.do_run = False thread_compteur.join() def main(self): self.running = True self.clear_diretory() self.stream_url = self.get_url() if not self.stream_url: sprint(self.script_name,"red","Impossible de récupérer l'URL du flux") return record_thread = Thread(target=self.record_audio) record_thread.start() # record_thread.join() # Wait for the recording to finish loop_run = Thread(target=self.loop_run, args=(20,), daemon=True) loop_run.start() #loop_run.join() # Wait for the recording to finish sprint(self.script_name,"cyan","Enregistrement terminé, le programme va se terminer.") ################## RecordTwitch FIN ######################## ################### Subtitle ######################## class Subtitle_translation: def __init__(self, pathjson): self.script_name = "translation" self.pathjson = pathjson self.reload_json() self.dir_whisperX = "whisperX" self.filename_memory = "subtitle_" +get_current_date() self.subtitle = {} self.all_subtitle = "" self.is_running = True self.loop_timer = 20 self.language = get_value_json("language", self.config) self.dir_record = "record" def stop(self): sprint(self.script_name,"yellow",f"Arrêt loop subtitle") self.is_running = False if self.loop_thread.is_alive(): # self.loop_thread.join() # Attend la fin du thread pour un arrêt propre sprint(self.script_name,"red", "Boucle de traitement arrêtée.") else : sprint(self.script_name,"yellow",f"loop subtitle déja arreté") def reload_json(self): with open(self.pathjson, 'r') as file: self.config = json.load(file) def verif_file_transcribe(self): for file in os.listdir(self.dir_record): file_path = os.path.join(self.dir_record, file) if os.path.isfile(file_path): sprint(self.script_name,"blue",f"File find -> transcribe.") # Exécuter une fonction sur le fichier self.auto_create_subtitle(file) # Supprimer le fichier après le traitement os.remove(file_path) print(f"Fichier {file_path} supprimé.") def remove_repetitions(self, message): sprint(self.script_name,"yellow", "remove_repetitions start for: \n" +message) words = message.split() # Découpe le texte en mots seen = set() # Pour suivre les mots déjà rencontrés result = [] # Stocke les mots sans répétition for word in words: if word not in seen: result.append(word) seen.add(word) sprint(self.script_name,"green", "remove_repetitions cleaned text : \n" +" ".join(result)) return " ".join(result)## str this switch def print_allsubtitle(self): os.system('clear') sprint(self.script_name,"yellow", "\n\n All subtitle Print\n ") for key, text in self.subtitle.items(): sprint(self.script_name,"yellow", " time "+ str(key)+" = "+ text+" \n\n") def get_lasttext(self): # Vérifie que le dictionnaire n'est pas vide if not self.subtitle: sprint(self.script_name,"red", "Aucun sous-titre disponible.") return "" # Récupère la dernière clé et valeur last_time, last_text = list(self.subtitle.items())[-1] sprint(self.script_name,"magenta", f"Dernier sous-titre à {last_time} = {last_text}") return last_text def auto_create_subtitle(self,file): sprint(self.script_name,"green",f"start auto_create_subtitle") del_file("","*.txt") del_file("","*.srt") del_file("","*.vtt") del_file("","*.tsv") del_file("","*.json") # subtitle_directory = "subtitle" # os.makedirs(subtitle_directory, exist_ok=True) # os.chdir(subtitle_directory) speak_found = "" sprint(self.script_name,"green", "translation : "+file+" ") # whisperx --language fr --compute_type float32 ../record/final_output_audio.mp3 # record_twitch = ['whisperx', '--language',self.language, '--compute_type','float32', '../'+self.dir_record+"/"+file] #ancienne version # record_twitch = ['whisper', '--language',self.language, '../'+self.dir_record+"/"+file,"--device","cuda"] record_twitch = ['whisper', '--language',self.language, './'+self.dir_record+"/"+file,"--device","cuda", "--model","large-v3"] console.print("[bold yellow] command : "+str(record_twitch)+" [/bold yellow]") subprocess.run(record_twitch) file_noext = file.rsplit('.', 1)[0] with open(file_noext+".txt", "r") as thisfile: speak_found += thisfile.read().replace("'", "").replace('"', "").replace("\n", " ").replace(",", "") # parties = file_noext.split('_') # Sépare la base du nom et le numéro + extension # num_ext = parties[-1] # Prend la dernière partie qui contient "007.mp3" speak_found = self.remove_repetitions(speak_found) self.all_subtitle += speak_found+"\n" current_time = get_current_time() self.subtitle[str(current_time)] = speak_found storage.write(self.filename_memory, str(current_time), speak_found) sprint(self.script_name,"yellow", "parole du streamer : \n" +speak_found) # attention a No active speech found in audio # os.chdir("../") sprint(self.script_name,"green","finish create_subtitle") def main_loop(self): time.sleep(45) sprint(self.script_name,"green", "start main boucle_traitement record") try: while self.is_running: sprint(self.script_name,"blue",f"wait {self.loop_timer}s loop main translation.") # Attend 'intervalle' secondes avant la prochaine itération self.verif_file_transcribe() time.sleep(self.loop_timer) # except KeyboardInterrupt: # print("Arrêt du script.") except Exception as e: sprint(self.script_name,"red", "STOP main_loop Error : "+ str(e)) # #console.print("[bold green]Enregistrement terminé, le programme va se terminer.[/bold green]") def start_main_loop(self): """Lance dans un thread pour ne pas bloquer le code principal.""" self.loop_thread = threading.Thread(target=self.main_loop) self.loop_thread.start() # Démarre dans un thread séparé ################### Subtitle FIN ######################## ################### IA_GENERATOR TEXT ######################## class IA_generator: def __init__(self,pathjson): self.script_name = "IA_generator" self.filename_memory = "IA_generator_" +get_current_date() self.pathjson = pathjson self.reload_json() self.subtitle = {} self.all_subtitle = "" self.ia_running = True self.loop_timer_ia = 20 self.streamer_word = "" self.last_streamer_word = None self.index_prompt = 0 self.response_generation = {} self.list_prompt = get_value_json("list_prompt", self.config) self.bad_answer = get_value_json("bad_answer", self.config) def stop(self): sprint(self.script_name,"yellow",f"Arrêt loop subtitle") self.ia_running = False if self.loop_thread.is_alive(): # self.loop_thread.join() # Attend la fin du thread pour un arrêt propre sprint(self.script_name,"red", "Boucle de traitement arrêtée.") else : sprint(self.script_name,"yellow",f"loop subtitle déja arreté") def change_prompt(self): self.index_prompt = len(self.list_prompt) -1 # if self.index_prompt == len(self.list_prompt): # self.index_prompt = 0 # else : # self.index_prompt += 1 def reload_json(self): with open(self.pathjson, 'r') as file: self.config = json.load(file) def get_last_values(self, n): values = list(self.subtitle.values()) return ''.join(values[-n:]) def print_allgeneration(self): os.system('clear') sprint(self.script_name,"yellow", "\n\n Print All response_generation \n ") for key, text in self.response_generation.items(): sprint(self.script_name,"yellow", " key = "+ str(key)+" = "+ text+" \n\n") def getlast_generation(self): # return self.response_generation if not self.response_generation: sprint(self.script_name,"red", "Aucun response_generation disponible.") return "" # Récupère la dernière clé et valeur key, last_generation = list(self.response_generation.items())[-1] sprint(self.script_name,"magenta", f"Dernier response_generation key : {key} = {last_generation}") return last_generation def clear_response(self, text): # Vérifie si ":" est dans la chaîne de caractères if ":" in text: # Sépare la chaîne et récupère la deuxième partie return text.split(":", 1)[1].strip() else: # Retourne la chaîne telle quelle si ":" n'est pas présent return text def imagine_response(self): sprint(self.script_name,"green",f"start imagine_response\n") # sprint(self.script_name,"magenta",f"streamer_word : \n"+str(self.streamer_word)) # sprint(self.script_name,"magenta",f"streamer_word : \n"+str(len(self.streamer_word[0]))) if str(self.streamer_word) == "": sprint(self.script_name,"magenta",f"pas encore de sous titre on quitte génération") return prompt_gpt = ['tgpt','-q','-w', '"'+self.list_prompt[self.index_prompt]+' "'+ str(self.streamer_word)+'"'] sprint(self.script_name,"DARKCYAN","index preprompt : "+str(self.index_prompt)+"\n") sprint(self.script_name,"DARKCYAN","\nPrompt demandé : "+str(prompt_gpt)+"\n") process = subprocess.run(prompt_gpt, capture_output=True, text=True, check=False) text_generation = process.stdout.replace("'", "").replace('"', "").replace("\n", "") # sprint(self.script_name,"CYAN","réponse imaginé sans formatage : \n"+text_generation+"\n\n") #debug textfinal = self.clear_response(text_generation) # nombre_de_mots = len(textfinal.split()) # nombre_de_caracteres = len(textfinal) self.last_streamer_word = self.streamer_word sprint(self.script_name,"CYAN","réponse imaginé : \n"+textfinal+"\n\n") self.change_prompt() key = get_current_time() self.response_generation[str(key)] = textfinal storage.write(self.filename_memory, str(key), textfinal) #ecrire dans la mémoire fichier def setnew_streamer_word_text(self, streamer_word_text): self.streamer_word = streamer_word_text def get_last_values(self, n): values = list(self.subtitle.values()) return ''.join(values[-n:]) def main_ask(self, streamer_word_text): sprint(self.script_name,"blue", "imagine_response start") self.streamer_word = streamer_word_text imagine_response_thread = Thread(target=self.imagine_response) imagine_response_thread.start() def main_loop_ia(self): sprint(self.script_name,"blue", "main_loop_ia imagine_response start") time.sleep(20) try: while self.ia_running: time.sleep(self.loop_timer_ia) sprint(self.script_name,"yellow",f"wait {self.loop_timer_ia}s main_loop_ia imagine_response.") if(self.last_streamer_word != self.streamer_word): self.imagine_response() else: sprint(self.script_name,"yellow",f"génération déja créer") # except KeyboardInterrupt: # print("Arrêt du script.") except Exception as e: sprint(self.script_name,"red", "STOP main_loop_ia Error : "+ str(e)) def start_main_loop(self): """Lance dans un thread pour ne pas bloquer le code principal.""" self.ia_running = True self.loop_thread = threading.Thread(target=self.main_loop_ia) self.loop_thread.start() # Démarre dans un thread séparé ################### IA_GENERATOR TEXT FIN ######################## ################### sendmessageTwitch ######################## class messageTwitch: def __init__(self, config_user, channel_name): self.message_running = True self.script_name = "Control_Twitch" self.path_file_config_user = config_user self.channel_name = channel_name self.indexuser = 0 with open(self.path_file_config_user, 'r') as file: self.userjson = json.load(file) self.totaluser = len(self.userjson) self.tw_acc_pseudo = get_value_json_list(self.indexuser, "tw_acc_pseudo", self.userjson) self.tw_acc_token = get_value_json_list(self.indexuser, "tw_acc_token", self.userjson) self.charactere = get_value_json_list(self.indexuser, "charactere", self.userjson) self.generation_text = "" self.last_respond_word = "" def set_user(self,index_user): self.indexuser = index_user self.tw_acc_pseudo = get_value_json_list(self.indexuser, "tw_acc_pseudo", self.userjson) self.tw_acc_token = get_value_json_list(self.indexuser, "tw_acc_token", self.userjson) self.charactere = get_value_json_list(self.indexuser, "charactere", self.userjson) def stop(self): self.message_running = False def change_user(self): # commented for pausing if (self.totaluser != 1): # si la liste ne fait pas que 1 de taille if(self.totaluser > self.indexuser): # si la taille de liste est plus grande que lindex self.indexuser = self.indexuser + 1 else : self.indexuser = 0 self.tw_acc_pseudo = get_value_json_list(self.indexuser, "tw_acc_pseudo", self.userjson) self.tw_acc_token = get_value_json_list(self.indexuser, "tw_acc_token", self.userjson) self.charactere = get_value_json_list(self.indexuser, "charactere", self.userjson) def set_user(self,index_user): self.indexuser = index_user self.tw_acc_pseudo = get_value_json_list(self.indexuser, "tw_acc_pseudo", self.userjson) self.tw_acc_token = get_value_json_list(self.indexuser, "tw_acc_token", self.userjson) self.charactere = get_value_json_list(self.indexuser, "charactere", self.userjson) def conversation(self): user1 = 1 user2 = 2 pseudo1 = self.tw_acc_pseudo = get_value_json_list(user1, "tw_acc_pseudo", self.userjson) pseudo2 = self.tw_acc_pseudo = get_value_json_list(user2, "tw_acc_pseudo", self.userjson) self.send_message_user(user1,"yo @"+pseudo2+" ^^") sleep(10) self.send_message_user(user1,"Heyyy cool ? @"+pseudo1+"") def send_message_user(self,index_user, Message_text): hprint("cyan","start send_message_user") self.set_user(index_user) self.send_message(Message_text) hprint("cyan",f"Finnish send send_message_user index :" + str(index_user)) def send_message(self, Message_text): hprint("cyan","start send_message") command = '-pseudo "'+self.tw_acc_pseudo+'" -token "'+self.tw_acc_token+'" -twitchname "'+self.channel_name+'" -message " '+self.charactere+' '+Message_text+'"' message_tosend = 'python send_message.py '+ str(command) hprint("cyan",f"send message = "+str(message_tosend)) subprocess.run(message_tosend, shell=True) self.last_respond_word = Message_text hprint("cyan",f"Finnish send_message message") def get_last_generation(self): data = storage.read("IA_generator_" +get_current_date()) # Trie les clés et récupère la dernière sorted_keys = sorted(data.keys()) last_key = sorted_keys[-1] self.generation_text = data[last_key] def start_main_loop_respond(self): time.sleep(20) #wait starting script sprint(self.script_name,"blue", "main_loop_ia imagine_response start") try: while self.message_running: time.sleep(20) self.get_last_generation() if(self.last_respond_word != self.generation_text): sprint(self.script_name,"green",f"\n\n ENVOIE MESSAGE : \n "+self.generation_text) self.send_message(self.generation_text) # envoie de message self.last_respond_word = self.generation_text # mise a jour du dernier message envoyé self.change_user() # cahngement de user else: sprint(self.script_name,"yellow",f"génération déja créer") except KeyboardInterrupt: print("Arrêt du script.") except Exception as e: sprint(self.script_name,"red", "STOP main_loop_respond Error : "+ str(e)) def start_loop_respond(self): """Lance dans un thread pour ne pas bloquer le code principal.""" self.message_running = True self.loop_thread = threading.Thread(target=self.start_main_loop_respond) self.loop_thread.start() # Démarre dans un thread séparé ################### sendmessageTwitch FIN ########################