568 lines
18 KiB
Python
568 lines
18 KiB
Python
import os
|
|
import signal
|
|
import sys
|
|
import argparse
|
|
import json
|
|
import time
|
|
import pprint
|
|
import random
|
|
import requests
|
|
import datetime
|
|
import glob
|
|
import threading
|
|
# import keyboard
|
|
from pynput import keyboard
|
|
from threading import Thread, Semaphore
|
|
from streamlink import Streamlink
|
|
from fake_useragent import UserAgent
|
|
from rich.console import Console
|
|
from rich.live import Live
|
|
from rich.prompt import Prompt
|
|
from rich.spinner import Spinner
|
|
from rich.table import Table
|
|
from rich.text import Text
|
|
import subprocess
|
|
import pty
|
|
|
|
console = Console()
|
|
|
|
def hprint(color, texte):
|
|
timestamp = datetime.datetime.now().strftime("%Hh %Mm %Ss")
|
|
console.print("[bold "+color+"] ["+timestamp+"] "+texte+" [/bold "+color+"]")
|
|
|
|
def del_file(dossier, file):
|
|
motif = os.path.join(dossier, file)
|
|
for file in glob.glob(motif):
|
|
os.remove(file)
|
|
hprint("",f"file deleted : {file}")
|
|
|
|
def get_value_json(var_name, config):
|
|
return config.get(var_name)
|
|
|
|
def get_value_json_list(index, key, json_data):
|
|
try:
|
|
return json_data[index][key]
|
|
except (IndexError, KeyError, TypeError):
|
|
return None
|
|
|
|
# Fonction pour obtenir l'heure actuelle sous forme de chaîne
|
|
def get_current_time():
|
|
current_time = time.time()
|
|
return time.strftime('%H:%M:%S', time.localtime(current_time))
|
|
# now = datetime.now()
|
|
# return now.strftime("%H:%M")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
################### RecordTwitch ########################
|
|
|
|
class RecordTwitch:
|
|
def __init__(self, channel_name, recordtime):
|
|
|
|
self.channel_name = channel_name
|
|
self.all_proxies = []
|
|
self.channel_url = "https://www.twitch.tv/" + self.channel_name
|
|
self.record_time = recordtime
|
|
self.dir_twitchrecord = "twitch-recordAudio"
|
|
self.process = None # Pour stocker le sous-processus en cours
|
|
|
|
|
|
if (self.record_time == -1):
|
|
self.record_time = 9999
|
|
|
|
def stop(self):
|
|
"""Arrête le processus d'enregistrement s'il est en cours."""
|
|
if self.process and self.process.poll() is None: # Vérifie si le processus est actif
|
|
hprint("yellow",f"Arrêt de l'enregistrement...")
|
|
self.process.terminate() # Envoie un signal de terminaison
|
|
try:
|
|
self.process.wait(timeout=5) # Attend la fin avec un délai de 5 secondes
|
|
except subprocess.TimeoutExpired:
|
|
hprint("RED",f"Le processus ne répond pas, forçage de l'arrêt.")
|
|
self.process.kill() # Forcer l'arrêt si le processus ne termine pas
|
|
hprint("yellow",f"Enregistrement arrêté.")
|
|
else:
|
|
hprint("green",f"Aucun enregistrement en cours.")
|
|
|
|
def hprint(self,color, texte):
|
|
timestamp = datetime.datetime.now().strftime("%Hh %Mm %Ss")
|
|
console.print("[bold "+color+"] ["+timestamp+"] "+texte+" [/bold "+color+"]")
|
|
|
|
|
|
# Fonction pour convertir l'heure en secondes
|
|
def time_to_seconds(time_str):
|
|
h, m, s = map(int, time_str.split(':'))
|
|
return h * 3600 + m * 60 + s
|
|
|
|
|
|
# Fonction pour rechercher dans le dictionnaire
|
|
def search_dict(d, key):
|
|
return d.get(key, "Key not found")
|
|
|
|
|
|
def main_record_twitch(self):
|
|
# Utiliser Popen à la place de run pour pouvoir arrêter le processus
|
|
try:
|
|
print(f"Lancement de l'enregistrement pour {self.channel_name} pendant {self.record_time} secondes.")
|
|
record_twitch = ['python', self.dir_twitchrecord+'/twitch_record_unlimited.py', '-twitchname', self.channel_name,'-recordtime',str(self.record_time)]
|
|
self.process = subprocess.Popen(record_twitch)
|
|
self.process.communicate() # Attend que le sous-processus termine
|
|
except Exception as e:
|
|
print(f"Erreur pendant l'enregistrement: {e}")
|
|
|
|
def start_recording(self):
|
|
"""Lance main_record_twitch dans un thread pour ne pas bloquer le code principal."""
|
|
self.thread = threading.Thread(target=self.main_record_twitch)
|
|
self.thread.start() # Démarre l'enregistrement dans un thread séparé
|
|
|
|
|
|
################### RecordTwitch FIN ########################
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
################### Subtitle ########################
|
|
|
|
|
|
class Subtitle_translation:
|
|
def __init__(self, pathjson):
|
|
self.pathjson = pathjson
|
|
self.reload_json()
|
|
self.dir_whisperX = "whisperX"
|
|
self.subtitle = {}
|
|
self.all_subtitle = ""
|
|
self.is_running = True
|
|
self.loop_timer = 30
|
|
self.language = get_value_json("language", self.config)
|
|
self.dir_record = "record"
|
|
|
|
def stop(self):
|
|
hprint("yellow",f"Arrêt loop subtitle")
|
|
self.is_running = False
|
|
if self.loop_thread.is_alive():
|
|
# self.loop_thread.join() # Attend la fin du thread pour un arrêt propre
|
|
hprint("red", "Boucle de traitement arrêtée.")
|
|
else :
|
|
hprint("yellow",f"loop subtitle déja arreté")
|
|
|
|
def reload_json(self):
|
|
with open(self.pathjson, 'r') as file:
|
|
self.config = json.load(file)
|
|
|
|
|
|
def verif_file_transcribe(self):
|
|
for file in os.listdir(self.dir_record):
|
|
file_path = os.path.join(self.dir_record, file)
|
|
if os.path.isfile(file_path):
|
|
hprint("blue",f"File find -> transcribe.")
|
|
# Exécuter une fonction sur le fichier
|
|
self.auto_create_subtitle(file)
|
|
|
|
# Supprimer le fichier après le traitement
|
|
os.remove(file_path)
|
|
print(f"Fichier {file_path} supprimé.")
|
|
|
|
|
|
def remove_repetitions(self, message):
|
|
"""
|
|
Supprime les répétitions dans un message en conservant uniquement le premier
|
|
exemple d'une phrase répétée.
|
|
|
|
:param message: Le message sous forme de chaîne de caractères
|
|
:return: Le message sans répétitions
|
|
"""
|
|
hprint("yellow", "remove_repetitions start for: \n" +message)
|
|
|
|
words = message.split() # Découpe le texte en mots
|
|
seen = set() # Pour suivre les mots déjà rencontrés
|
|
result = [] # Stocke les mots sans répétition
|
|
|
|
|
|
for word in words:
|
|
if word not in seen:
|
|
result.append(word)
|
|
seen.add(word)
|
|
|
|
hprint("green", "remove_repetitions cleaned text : \n" +" ".join(result))
|
|
return " ".join(result)## str this switch
|
|
|
|
def print_allsubtitle(self):
|
|
os.system('clear')
|
|
hprint("yellow", "\n\n All subtitle Print\n ")
|
|
for key, text in self.subtitle.items():
|
|
hprint("yellow", " time "+ str(key)+" = "+ text+" \n\n")
|
|
|
|
def get_lasttext(self):
|
|
# Vérifie que le dictionnaire n'est pas vide
|
|
if not self.subtitle:
|
|
hprint("red", "Aucun sous-titre disponible.")
|
|
return ""
|
|
|
|
# Récupère la dernière clé et valeur
|
|
last_time, last_text = list(self.subtitle.items())[-1]
|
|
hprint("magenta", f"Dernier sous-titre à {last_time} = {last_text}")
|
|
return last_text
|
|
|
|
|
|
|
|
def auto_create_subtitle(self,file):
|
|
hprint("green",f"start auto_create_subtitle")
|
|
del_file("","*.txt")
|
|
del_file("","*.srt")
|
|
del_file("","*.vtt")
|
|
del_file("","*.tsv")
|
|
del_file("","*.json")
|
|
|
|
# subtitle_directory = "subtitle"
|
|
# os.makedirs(subtitle_directory, exist_ok=True)
|
|
# os.chdir(subtitle_directory)
|
|
spead_find = ""
|
|
hprint("green", "translation : "+file+" ")
|
|
|
|
# whisperx --language fr --compute_type float32 ../record/final_output_audio.mp3
|
|
# record_twitch = ['whisperx', '--language',self.language, '--compute_type','float32', '../'+self.dir_record+"/"+file] #ancienne version
|
|
# record_twitch = ['whisper', '--language',self.language, '../'+self.dir_record+"/"+file,"--device","cuda"]
|
|
record_twitch = ['whisper', '--language',self.language, './'+self.dir_record+"/"+file,"--device","cuda", "--model","large-v3"]
|
|
|
|
console.print("[bold yellow] command : "+str(record_twitch)+" [/bold yellow]")
|
|
subprocess.run(record_twitch)
|
|
|
|
file_noext = file.rsplit('.', 1)[0]
|
|
with open(file_noext+".txt", "r") as thisfile:
|
|
spead_find += thisfile.read().replace("'", "").replace('"', "").replace("\n", " ").replace(",", "")
|
|
|
|
# parties = file_noext.split('_') # Sépare la base du nom et le numéro + extension
|
|
# num_ext = parties[-1] # Prend la dernière partie qui contient "007.mp3"
|
|
spead_find = self.remove_repetitions(spead_find)
|
|
self.all_subtitle += spead_find+"\n"
|
|
num_ext = get_current_time()
|
|
self.subtitle[str(num_ext)] = spead_find
|
|
hprint("yellow", "parole du streamer : \n" +spead_find)
|
|
|
|
# attention a No active speech found in audio
|
|
# os.chdir("../")
|
|
console.print("[bold PURPLE] finish create_subtitle [/bold PURPLE]")
|
|
|
|
def main_loop(self):
|
|
time.sleep(45)
|
|
hprint("green", "start main boucle_traitement record")
|
|
try:
|
|
while self.is_running:
|
|
hprint("blue",f"wait {self.loop_timer}s loop main translation.")
|
|
# Attend 'intervalle' secondes avant la prochaine itération
|
|
self.verif_file_transcribe()
|
|
time.sleep(self.loop_timer)
|
|
except KeyboardInterrupt:
|
|
print("Arrêt du script.")
|
|
|
|
# #console.print("[bold green]Enregistrement terminé, le programme va se terminer.[/bold green]")
|
|
|
|
def start_main_loop(self):
|
|
"""Lance dans un thread pour ne pas bloquer le code principal."""
|
|
self.loop_thread = threading.Thread(target=self.main_loop)
|
|
self.loop_thread.start() # Démarre dans un thread séparé
|
|
|
|
|
|
################### Subtitle FIN ########################
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
################### IA_GENERATOR TEXT ########################
|
|
|
|
|
|
class IA_generator:
|
|
def __init__(self,pathjson):
|
|
self.pathjson = pathjson
|
|
self.reload_json()
|
|
self.subtitle = {}
|
|
self.all_subtitle = ""
|
|
self.is_running = True
|
|
self.loop_timer = 60
|
|
self.streamer_word = ""
|
|
self.index_prompt = 0
|
|
self.response_generation = {}
|
|
self.list_prompt = get_value_json("list_prompt", self.config)
|
|
self.bad_answer = get_value_json("bad_answer", self.config)
|
|
|
|
def stop(self):
|
|
hprint("yellow",f"Arrêt loop subtitle")
|
|
self.is_running = False
|
|
if self.loop_thread.is_alive():
|
|
# self.loop_thread.join() # Attend la fin du thread pour un arrêt propre
|
|
hprint("red", "Boucle de traitement arrêtée.")
|
|
else :
|
|
hprint("yellow",f"loop subtitle déja arreté")
|
|
|
|
|
|
def change_prompt(self):
|
|
if self.index_prompt == len(self.list_prompt):
|
|
self.index_prompt = 0
|
|
else :
|
|
self.index_prompt += 1
|
|
|
|
|
|
def reload_json(self):
|
|
with open(self.pathjson, 'r') as file:
|
|
self.config = json.load(file)
|
|
|
|
|
|
def get_last_values(self, n):
|
|
values = list(self.subtitle.values())
|
|
return ''.join(values[-n:])
|
|
|
|
def print_allgeneration(self):
|
|
os.system('clear')
|
|
hprint("yellow", "\n\n Print All response_generation \n ")
|
|
for key, text in self.response_generation.items():
|
|
hprint("yellow", " key = "+ str(key)+" = "+ text+" \n\n")
|
|
|
|
def getlast_generation(self):
|
|
# return self.response_generation
|
|
if not self.response_generation:
|
|
hprint("red", "Aucun response_generation disponible.")
|
|
return ""
|
|
# Récupère la dernière clé et valeur
|
|
key, last_generation = list(self.response_generation.items())[-1]
|
|
hprint("magenta", f"Dernier response_generation key : {key} = {last_generation}")
|
|
return last_generation
|
|
|
|
def clear_response(self, text):
|
|
# Vérifie si ":" est dans la chaîne de caractères
|
|
if ":" in text:
|
|
# Sépare la chaîne et récupère la deuxième partie
|
|
return text.split(":", 1)[1].strip()
|
|
else:
|
|
# Retourne la chaîne telle quelle si ":" n'est pas présent
|
|
return text
|
|
|
|
def imagine_response(self):
|
|
hprint("green",f"start imagine_response\n")
|
|
# hprint("magenta",f"streamer_word : \n"+str(self.streamer_word))
|
|
# hprint("magenta",f"streamer_word : \n"+str(len(self.streamer_word[0])))
|
|
if str(self.streamer_word) == "":
|
|
hprint("magenta",f"pas encore de sosu titre on quitte génération")
|
|
return
|
|
|
|
prompt_gpt = ['tgpt','-q','-w', '"'+self.list_prompt[self.index_prompt]+' "'+ str(self.streamer_word)+'"']
|
|
|
|
hprint("DARKCYAN","index preprompt : "+str(self.index_prompt)+"\n")
|
|
hprint("DARKCYAN","\nPrompt demandé : "+str(prompt_gpt)+"\n")
|
|
process = subprocess.run(prompt_gpt, capture_output=True, text=True, check=False)
|
|
text_generation = process.stdout.replace("'", "").replace('"', "").replace("\n", "")
|
|
# hprint("CYAN","réponse imaginé sans formatage : \n"+text_generation+"\n\n") #debug
|
|
textfinal = self.clear_response(text_generation)
|
|
|
|
# nombre_de_mots = len(textfinal.split())
|
|
# nombre_de_caracteres = len(textfinal)
|
|
hprint("CYAN","réponse imaginé : \n"+textfinal+"\n\n")
|
|
self.change_prompt()
|
|
|
|
key = get_current_time()
|
|
self.response_generation[str(key)] = textfinal
|
|
|
|
def setnew_streamer_word_text(self, streamer_word_text):
|
|
self.streamer_word = streamer_word_text
|
|
|
|
def get_last_values(self, n):
|
|
values = list(self.subtitle.values())
|
|
return ''.join(values[-n:])
|
|
|
|
def main_ask(self, streamer_word_text):
|
|
hprint("blue", "imagine_response start")
|
|
self.streamer_word = streamer_word_text
|
|
imagine_response_thread = Thread(target=self.imagine_response)
|
|
imagine_response_thread.start()
|
|
|
|
def main_loop(self):
|
|
hprint("blue", "main_loop imagine_response start")
|
|
time.sleep(100)
|
|
try:
|
|
while self.is_running:
|
|
hprint("blue",f"wait {self.loop_timer}s loop main imagine_response.")
|
|
self.imagine_response()
|
|
time.sleep(self.loop_timer)
|
|
except KeyboardInterrupt:
|
|
print("Arrêt du script.")
|
|
|
|
def start_main_loop(self):
|
|
"""Lance dans un thread pour ne pas bloquer le code principal."""
|
|
self.loop_thread = threading.Thread(target=self.main_loop)
|
|
self.loop_thread.start() # Démarre dans un thread séparé
|
|
|
|
|
|
|
|
|
|
################### IA_GENERATOR TEXT FIN ########################
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
################### sendmessageTwitch ########################
|
|
|
|
class messageTwitch:
|
|
def __init__(self, channel_name):
|
|
|
|
self.channel_name = channel_name
|
|
self.indexuser = 0
|
|
with open("../config/user.json", 'r') as file:
|
|
self.userjson = json.load(file)
|
|
self.totaluser = len(self.userjson)
|
|
|
|
self.tw_acc_pseudo = get_value_json_list(self.indexuser, "tw_acc_pseudo", self.userjson)
|
|
self.tw_acc_token = get_value_json_list(self.indexuser, "tw_acc_token", self.userjson)
|
|
self.charactere = get_value_json_list(self.indexuser, "charactere", self.userjson)
|
|
|
|
|
|
def set_user(self,index_user):
|
|
self.indexuser = index_user
|
|
self.tw_acc_pseudo = get_value_json_list(self.indexuser, "tw_acc_pseudo", self.userjson)
|
|
self.tw_acc_token = get_value_json_list(self.indexuser, "tw_acc_token", self.userjson)
|
|
self.charactere = get_value_json_list(self.indexuser, "charactere", self.userjson)
|
|
|
|
|
|
|
|
def change_user(self):
|
|
# commented for pausing
|
|
if (self.totaluser != 1): # si la liste ne fait que 1 de taille
|
|
if(self.totaluser+1 > self.indexuser): # si la taille de liste est plus grande que lindex
|
|
self.indexuser = self.indexuser + 1
|
|
else :
|
|
self.indexuser = 0
|
|
|
|
self.tw_acc_pseudo = get_value_json_list(self.indexuser, "tw_acc_pseudo", self.userjson)
|
|
self.tw_acc_token = get_value_json_list(self.indexuser, "tw_acc_token", self.userjson)
|
|
self.charactere = get_value_json_list(self.indexuser, "charactere", self.userjson)
|
|
|
|
|
|
|
|
def set_user(self,index_user):
|
|
self.indexuser = index_user
|
|
self.tw_acc_pseudo = get_value_json_list(self.indexuser, "tw_acc_pseudo", self.userjson)
|
|
self.tw_acc_token = get_value_json_list(self.indexuser, "tw_acc_token", self.userjson)
|
|
self.charactere = get_value_json_list(self.indexuser, "charactere", self.userjson)
|
|
|
|
|
|
def conversation(self):
|
|
user1 = 1
|
|
user2 = 2
|
|
pseudo1 = self.tw_acc_pseudo = get_value_json_list(user1, "tw_acc_pseudo", self.userjson)
|
|
pseudo2 = self.tw_acc_pseudo = get_value_json_list(user2, "tw_acc_pseudo", self.userjson)
|
|
|
|
self.send_message_user(user1,"yo @"+pseudo2+" ^^")
|
|
sleep(10)
|
|
self.send_message_user(user1,"Heyyy cool ? @"+pseudo1+"")
|
|
|
|
|
|
def send_message_user(self,index_user, Message_text):
|
|
hprint("green","start send_message_user")
|
|
self.set_user(index_user)
|
|
command = '-pseudo "'+self.tw_acc_pseudo+'" -token "'+self.tw_acc_token+'" -twitchname "'+self.channel_name+'" -message " '+self.charactere+' '+Message_text+'"'
|
|
message_tosend = 'python send_message.py '+command
|
|
hprint("yellow",f"send message = "+str(message_tosend))
|
|
subprocess.run(message_tosend, shell=True)
|
|
hprint("yellow",f"Finnish send message")
|
|
|
|
|
|
def send_message(self, Message_text):
|
|
hprint("green","start send_message")
|
|
|
|
command = '-pseudo "'+self.tw_acc_pseudo+'" -token "'+self.tw_acc_token+'" -twitchname "'+self.channel_name+'" -message " '+self.charactere+' '+Message_text+'"'
|
|
message_tosend = 'python send_message.py '+command
|
|
hprint("yellow",f"send message = "+str(message_tosend))
|
|
subprocess.run(message_tosend, shell=True)
|
|
hprint("yellow",f"Finnish send message")
|
|
|
|
|
|
|
|
################### sendmessageTwitch FIN ########################
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|