import os
import signal
import sys
import argparse
import json
import time
import pprint
import random
import requests
import datetime
import glob
import threading
# import keyboard
from pynput import keyboard
from threading import Thread, Semaphore
from streamlink import Streamlink
from fake_useragent import UserAgent
from rich.console import Console
from rich.live import Live
from rich.prompt import Prompt
from rich.spinner import Spinner
from rich.table import Table
from rich.text import Text
import subprocess
import pty

console = Console()


class MessageTwitch:
    """Build chat replies from transcribed stream audio and post them.

    Settings are read from ``config.json`` in the current working
    directory.  Replies are generated by the external ``tgpt`` command and
    posted through the ``send_message.py`` script.

    NOTE(review): ``self.subtitle`` and ``self.all_subtitle`` are only read
    in this class, and ``auto_create_subtitle`` is called but not defined
    here; they are presumably filled in / provided elsewhere -- confirm.
    """

    def __init__(self, channel_name):
        """Load ``config.json`` and initialise state for *channel_name*."""
        with open("config.json", 'r') as file:
            self.config = json.load(file)

        self.channel_name = channel_name
        self.channel_url = "https://www.twitch.tv/" + self.channel_name
        self.dir_twitchrecord = "twitch-recordAudio"
        self.dir_whisperX = "whisperX"

        # Transcribed text; values are concatenated to build the prompt.
        self.subtitle = {}
        self.all_subtitle = ""
        # Last generated reply, sent by send_message().
        self.response_stream = "Hello !"
        self.current_record = ""

        self.language = self.get_value_json("language")
        self.list_prompt = self.get_value_json("list_prompt")
        self.bad_answer = self.get_value_json("bad_answer")
        self.tw_acc_pseudo = self.get_value_json("tw_acc_pseudo")
        self.tw_acc_token = self.get_value_json("tw_acc_token")

    def get_value_json(self, var_name):
        """Return the config entry *var_name*, or ``None`` when absent."""
        return self.config.get(var_name)

    def hprint(self, color, texte):
        """Print *texte* in bold *color*, prefixed with the current time."""
        timestamp = datetime.datetime.now().strftime("%Hh %Mm %Ss")
        console.print(
            "[bold " + color + "] [" + timestamp + "] " + texte
            + " [/bold " + color + "]"
        )

    def del_file(self, dossier, file):
        """Delete every file in *dossier* matching the glob pattern *file*."""
        motif = os.path.join(dossier, file)
        # A distinct loop name keeps the ``file`` parameter intact.
        for path in glob.glob(motif):
            os.remove(path)
            self.hprint("", f"file deleted : {path}")

    @staticmethod
    def time_to_seconds(time_str):
        """Convert an ``H:M:S`` string to a number of seconds.

        Raises:
            ValueError: if *time_str* is not three ':'-separated integers.
        """
        h, m, s = map(int, time_str.split(':'))
        return h * 3600 + m * 60 + s

    def get_current_time(self):
        """Return the current local time formatted as ``HH:MM:SS``."""
        current_time = time.time()
        return time.strftime('%H:%M:%S', time.localtime(current_time))

    @staticmethod
    def search_dict(d, key):
        """Return ``d[key]``, or the string ``"Key not found"`` if missing."""
        return d.get(key, "Key not found")

    def get_last_values(self, n):
        """Return the last *n* subtitle values joined into one string."""
        values = list(self.subtitle.values())
        return ''.join(values[-n:])

    def get_value_by_index(self, index):
        """Return the subtitle value at *index*, or ``None`` if past the end."""
        values = list(self.subtitle.values())
        if index < len(values):
            return values[index]
        return None

    def verif_file_transcribe(self, dir):
        """Transcribe, then delete, every regular file found in *dir*."""
        for file in os.listdir(dir):
            file_path = os.path.join(dir, file)
            if not os.path.isfile(file_path):
                continue
            self.hprint("blue", "File find -> transcribe.")
            # Process the file ...
            self.auto_create_subtitle(file, dir)
            # ... then remove it so it is not handled twice.
            os.remove(file_path)
            print(f"Fichier {file_path} supprimé.")

    def boucle_traitement(self, intervalle):
        """Run the processing loop, pausing *intervalle* seconds per pass.

        Each pass transcribes the files waiting in ``record``.  Once the
        pass counter reaches 4 it is reset and a reply is generated and
        sent.  The loop ends on ``KeyboardInterrupt``.
        """
        count_loop = 0
        try:
            while True:
                self.hprint("blue", f"wait {intervalle} loop main.")
                self.verif_file_transcribe("record")

                if count_loop >= 4:
                    self.hprint("blue", "count_loop ==4 on lance gpt + message")
                    count_loop = 0
                    imagine_response = Thread(target=self.imagine_response)
                    imagine_response.start()
                    imagine_response.join()  # wait for the reply to be sent

                time.sleep(intervalle)  # wait before the next pass
                count_loop += 1
        except KeyboardInterrupt:
            print("Arrêt du script.")

    def imagine_response(self):
        """Generate a reply with ``tgpt`` from recent subtitles and send it.

        The prompt text is built from the most recent subtitle values.
        ``tgpt`` is then called until it returns fewer than 22 words; once
        the attempt index exceeds 3 the reply falls back to ``"Kappa"``.
        The result is stored in ``self.response_stream`` and sent.
        """
        self.hprint("green", "start imagine_response\n")
        streamer_word = ""
        self.hprint("blue", "word in streamer_word : \n" + str(len(streamer_word)))

        if len(self.subtitle) >= 6:
            index = 6
            streamer_word = self.get_last_values(index)
            # Widen the window until there are at least 100 characters.
            while len(streamer_word) < 100:
                self.hprint("yellow", "streamer_word to short add text\n")
                index = index + 1
                if len(self.subtitle) >= index:
                    streamer_word = self.get_last_values(index)
                else:
                    break
        elif len(self.subtitle) >= 4:
            streamer_word = self.get_last_values(4)
        elif len(self.subtitle) >= 2:
            streamer_word = self.get_last_values(2)
        else:
            streamer_word = self.all_subtitle

        # Still short: append subtitle values, newest first, up to 100 chars.
        values = list(self.subtitle.values())
        index = len(values)
        while len(streamer_word) < 100 and index > 0:
            index -= 1
            streamer_word += values[index]

        self.hprint("magenta", "streamer_word : " + str(streamer_word) + "\n")

        prompts = self.list_prompt
        load_reponse = True
        countload = 0
        while load_reponse:
            # Up to five attempts are made; when fewer prompts are
            # configured, reuse the last one instead of raising IndexError.
            prompt = prompts[min(countload, len(prompts) - 1)]
            prompt_gpt = ['tgpt', '-q', '-w', f'"{prompt} "{streamer_word}"']
            process = subprocess.run(
                prompt_gpt, capture_output=True, text=True, check=False
            )
            # Drop quotes and newlines from the generated text.
            self.response_stream = (
                process.stdout.replace("'", "").replace('"', "").replace("\n", "")
            )
            nombre_de_mots = len(self.response_stream.split())
            self.hprint("CYAN", "réponse imaginé : " + self.response_stream + "\n\n")

            # NOTE: a filter on self.bad_answer was disabled here in the
            # original code.
            if nombre_de_mots < 22:
                load_reponse = False
                console.print(
                    "[bold green]result create : " + self.response_stream
                    + "[/bold green]\n\n"
                )
            else:
                self.hprint("red", "incorect result\n")
                if countload > 3:
                    self.hprint("red", "tentative de load trop grosse\n")
                    self.response_stream = "Kappa"
                    load_reponse = False
            countload += 1

        console.print("finish imagine_response")

        self.hprint("green", "send message")
        send_message = Thread(target=self.send_message)
        send_message.start()
        send_message.join()  # wait for the message to be sent

    def send_message(self):
        """Post ``self.response_stream`` to the channel via ``send_message.py``."""
        self.hprint("green", "startsend_message")
        message = " Kappa " + self.response_stream

        # Same console echo as before, for display only.
        command = (
            '-pseudo "' + self.tw_acc_pseudo + '" -token "' + self.tw_acc_token
            + '" -twitchname "' + self.channel_name + '" -message "' + message + '"'
        )
        message_tosend = 'python send_message.py ' + command
        console.print("[bold yellow]" + str(message_tosend) + "[/bold yellow]")

        # The reply is generated text: pass an argument list without a shell
        # so that characters such as `$(...)` or backticks are never executed.
        argv = [
            'python', 'send_message.py',
            '-pseudo', self.tw_acc_pseudo,
            '-token', self.tw_acc_token,
            '-twitchname', self.channel_name,
            '-message', message,
        ]
        subprocess.run(argv)
        console.print("finish send_message")

    def main(self):
        """Wait 30 seconds, then start the processing loop in a thread.

        Returns as soon as the thread is started; the loop keeps running in
        the background with a 30-second interval.
        """
        self.hprint("blue", "wait 30")
        time.sleep(30)
        self.hprint("blue", "start loop all")
        # Pass the callable and its argument separately: writing
        # ``target=self.boucle_traitement(30)`` would run the loop right here
        # and hand its return value (None) to the thread.
        boucle_traitement = Thread(target=self.boucle_traitement, args=(30,))
        boucle_traitement.start()