Files
twitchBot-intelligent/fonction/first_class.py
T
2025-01-10 15:53:17 +01:00

666 lines
23 KiB
Python

import os
import signal
import sys
import argparse
import json
import time
import pprint
import random
import requests
import shutil
import datetime
import glob
import threading
# import keyboard
from pynput import keyboard
from threading import Thread, Semaphore
from streamlink import Streamlink
from fake_useragent import UserAgent
from rich.console import Console
from rich.live import Live
from rich.prompt import Prompt
from rich.spinner import Spinner
from rich.table import Table
from rich.text import Text
import subprocess
import pty
# Shared rich Console instance; hprint() and sprint() below, plus a few direct
# console.print() calls in this file, write all coloured output through it.
console = Console()
def hprint(color, texte):
    """Print a bold, timestamped message through the shared rich console.

    color: style word placed after ``bold`` in the opening and closing tags.
    texte: message body, inserted into the rich markup unchanged.
    """
    stamp = datetime.datetime.now().strftime("%Hh %Mm %Ss")
    pieces = ("[bold ", color, "] [", stamp, "] ", texte, " [/bold ", color, "]")
    console.print("".join(pieces))
def sprint(script_name, color, texte):
    """Print a bold, timestamped message tagged with the emitting script name.

    script_name: label shown in parentheses right after the timestamp.
    color: style word placed after ``bold`` in the opening and closing tags.
    texte: message body, inserted into the rich markup unchanged.
    """
    stamp = datetime.datetime.now().strftime("%Hh %Mm %Ss")
    pieces = (
        "[bold ", color, "] [", stamp, "] (", script_name, ") ",
        texte, " [/bold ", color, "]",
    )
    console.print("".join(pieces))
def del_file(dossier, file):
    """Delete every path inside ``dossier`` that matches the glob pattern ``file``.

    Each removed path is reported through hprint().
    """
    pattern = os.path.join(dossier, file)
    # The match list is fully built before anything is removed.
    matches = glob.glob(pattern)
    for matched_path in matches:
        os.remove(matched_path)
        hprint("", f"file deleted : {matched_path}")
def get_value_json(var_name, config):
    """Return ``config``'s value for ``var_name`` (``None`` when the key is absent)."""
    value = config.get(var_name)
    return value
def get_value_json_list(index, key, json_data):
    """Return ``json_data[index][key]``, or ``None`` when the lookup fails.

    A failed lookup means an IndexError, KeyError or TypeError raised by either
    subscript (bad index, missing key, or non-subscriptable data).
    """
    try:
        entry = json_data[index]
        value = entry[key]
    except (IndexError, KeyError, TypeError):
        return None
    return value
# Return the current time as a string.
def get_current_time():
    """Return the local wall-clock time formatted as ``HH:MM:SS``."""
    local_now = time.localtime(time.time())
    return time.strftime('%H:%M:%S', local_now)
################### RecordTwitch ########################
class RecordTwitch:
    """Record a Twitch channel's audio into fixed-length MP3 segments.

    ``record_audio`` runs ffmpeg, which writes numbered segments into the
    ``in_record`` directory. ``loop_run`` periodically calls
    ``verif_record_move``, which moves the lowest-numbered segment into
    ``record`` once at least two segments are present.
    """

    def __init__(self, channel_name, record_time):
        """Store the channel to record.

        channel_name: Twitch channel name, appended to the twitch.tv base URL.
        record_time: requested duration; ``-1`` is replaced by ``9999``.
        """
        self.channel_name = channel_name
        self.channel_url = "https://www.twitch.tv/" + self.channel_name
        self.record_time = record_time
        self.max_timerecordfile = 60  # passed to ffmpeg as -segment_time
        self.running = True
        self.script_name = "twitch_record"
        # Handle of the running ffmpeg process so stop() can terminate it.
        self.process = None
        # Set once ffmpeg has exited so the counter thread ends with it.
        self._counter_done = threading.Event()
        if self.record_time == -1:
            self.record_time = 9999

    def stop(self):
        """Stop the background loops and terminate ffmpeg if it is still running."""
        sprint(self.script_name, "magenta", "STOPING RecordTwitch")
        self.running = False
        process = self.process
        # Without this, ffmpeg kept recording after stop() was requested.
        if process is not None and process.poll() is None:
            process.terminate()

    def del_mp3(self, dossier):
        """Delete every ``*.mp3`` file found in ``dossier``."""
        motif = os.path.join(dossier, '*.mp3')
        for mp3_path in glob.glob(motif):
            os.remove(mp3_path)
            sprint(self.script_name, "yellow", f"file deleted : {mp3_path}")

    def clear_diretory(self, dir_inrecord="in_record", dir_record="record"):
        """Remove leftover MP3 files, but only when both directories exist."""
        if os.path.exists(dir_inrecord) and os.path.exists(dir_record):
            sprint(self.script_name, 'green', f"dir {dir_inrecord} and {dir_record} exist clearing")
            self.del_mp3(dir_inrecord)
            self.del_mp3(dir_record)

    def create_session(self):
        """Create the Streamlink session and set its HTTP headers."""
        self.ua = UserAgent()
        self.session = Streamlink()
        self.session.set_option("http-headers", {
            "Accept-Language": "en-US,en;q=0.5",
            "Connection": "keep-alive",
            "DNT": "1",
            "Upgrade-Insecure-Requests": "1",
            "User-Agent": self.ua.random,
            "Client-ID": "your_client_id",  # Replace with your actual Client-ID
            "Referer": "https://www.google.com/"
        })

    def get_url(self):
        """Return the stream URL, or ``""`` when it cannot be resolved.

        The ``audio_only`` stream is preferred and ``worst`` is the fallback.
        Any exception is logged and results in the empty string.
        """
        sprint(self.script_name, "green", "start get_url")
        url = ""
        self.create_session()
        try:
            streams = self.session.streams(self.channel_url)
            url = streams['audio_only'].url if 'audio_only' in streams else streams['worst'].url
            sprint(self.script_name, "cyan", "url generate : " + url)
        except Exception as e:
            sprint(self.script_name, "red", "Error on get url : " + str(e))
        return url

    def loop_run(self, intervalle):
        """Move finished segments every ``intervalle`` seconds while running.

        Waits 30 seconds first so recording can start. Any exception ends the
        loop, and ``self.running`` is cleared on exit in every case.
        """
        time.sleep(30)
        try:
            while self.running:
                self.verif_record_move()
                sprint(self.script_name, "yellow", f"wait {intervalle}s for next scan file completed.")
                time.sleep(intervalle)
        except Exception as e:
            sprint(self.script_name, "red", "STOP loop_run Error : " + str(e))
        finally:
            self.running = False  # make sure the flag is cleared

    @staticmethod
    def _segment_index(filename):
        """Return the integer between the last ``_`` and the following ``.``, or None."""
        try:
            return int(filename.split('_')[-1].split('.')[0])
        except ValueError:
            return None

    def verif_record_move(self, dir_inrecord="in_record", dir_record="record"):
        """Move the lowest-numbered segment from ``dir_inrecord`` to ``dir_record``.

        Nothing is moved unless at least two segments exist. Files without a
        numeric segment index, and a missing input directory, are ignored
        instead of raising, because an exception here would end loop_run().
        """
        sprint(self.script_name, "green", "start verif_record_move")
        if not os.path.exists(dir_record):
            os.makedirs(dir_record)
            sprint(self.script_name, "green", "création du dossier : " + dir_record)
        if os.path.isdir(dir_inrecord):
            fichiers = [
                f for f in os.listdir(dir_inrecord)
                if os.path.isfile(os.path.join(dir_inrecord, f))
                and self._segment_index(f) is not None
            ]
        else:
            fichiers = []
        if len(fichiers) > 1:
            fileto_move = min(fichiers, key=self._segment_index)
            chemin_source = os.path.join(dir_inrecord, fileto_move)
            chemin_destination = os.path.join(dir_record, fileto_move)
            shutil.move(chemin_source, chemin_destination)
            sprint(self.script_name, "green", f"File moved: {fileto_move}")
        else:
            sprint(self.script_name, "yellow", "Not enough files to compare.")

    def compteur(self):
        """Print a once-per-second recording timer on a single terminal line.

        The loop ends when ``self.running`` is cleared or when record_audio()
        signals that ffmpeg has exited.
        """
        seconds = 0
        loop = 0
        while self.running and not self._counter_done.is_set():
            time.sleep(1)
            # '\r' rewrites the same terminal line on every tick.
            print(f"\033[94mRecording time: {seconds:02d}s | file : {loop}\033[0m", end='\r', flush=True)
            seconds += 1
            if seconds == self.max_timerecordfile:
                loop += 1
                seconds = 0  # restart the per-segment counter

    def record_audio(self):
        """Run ffmpeg on ``self.stream_url`` and block until it exits.

        Segments are written to ``in_record/<YYYY-MM-DD>_NNN.mp3``. Whatever
        ffmpeg printed on stderr is logged once it has finished.
        """
        output_directory = "record"
        in_record_directory = "in_record"
        os.makedirs(output_directory, exist_ok=True)
        os.makedirs(in_record_directory, exist_ok=True)
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d")
        output_file_path = os.path.join(in_record_directory, f"{timestamp}_%03d.mp3")
        sprint(self.script_name, "green", "start record")
        self._counter_done.clear()
        thread_compteur = Thread(target=self.compteur, daemon=True)
        thread_compteur.start()
        command = [
            'ffmpeg', '-i', self.stream_url, '-vn', '-acodec', 'libmp3lame',
            '-ar', '44100', '-ac', '2', '-map', '0:a', '-f', 'segment',
            '-segment_time', str(self.max_timerecordfile),
            '-segment_format', 'mp3', output_file_path, '-loglevel', 'error',
        ]
        try:
            self.process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
            if not self.running:
                # stop() was called before the handle was stored.
                self.process.terminate()
            _, error = self.process.communicate()
            if error:
                sprint(self.script_name, "red", "ffmpeg stderr :")
                # markup=False: ffmpeg output contains '[...]' prefixes.
                console.print(error.strip(), markup=False)
        finally:
            # Release the counter thread even when ffmpeg ended by itself;
            # otherwise the join below would never return.
            self._counter_done.set()
            thread_compteur.join()

    def main(self):
        """Clear old files, resolve the stream URL and start the worker threads.

        Returns right after starting the threads; it does not wait for the
        recording to finish.
        """
        self.running = True
        self.clear_diretory()
        self.stream_url = self.get_url()
        if not self.stream_url:
            sprint(self.script_name, "red", "Impossible de récupérer l'URL du flux")
            return
        record_thread = Thread(target=self.record_audio)
        record_thread.start()
        mover_thread = Thread(target=self.loop_run, args=(20,), daemon=True)
        mover_thread.start()
        # NOTE(review): this is logged as soon as the threads are started,
        # not when recording has actually ended - confirm the intended wording.
        sprint(self.script_name, "cyan", "Enregistrement terminé, le programme va se terminer.")
################## RecordTwitch FIN ########################
################### Subtitle ########################
class Subtitle_translation:
    """Transcribe recorded audio segments with the ``whisper`` command-line tool.

    A background loop scans ``self.dir_record``, transcribes each file found,
    stores the cleaned text in ``self.subtitle`` (keyed by the time of
    transcription) and deletes the audio file afterwards.
    """

    def __init__(self, pathjson):
        """Load the JSON configuration at ``pathjson`` and set default state."""
        self.script_name = "translation"
        self.pathjson = pathjson
        self.reload_json()
        self.dir_whisperX = "whisperX"
        self.subtitle = {}
        self.all_subtitle = ""
        self.is_running = True
        self.loop_timer = 20
        self.language = get_value_json("language", self.config)
        self.dir_record = "record"
        # Created by start_main_loop(); None until then so stop() is safe.
        self.loop_thread = None

    def stop(self):
        """Ask the processing loop to stop; does not wait for the thread."""
        sprint(self.script_name, "yellow", "Arrêt loop subtitle")
        self.is_running = False
        # getattr: stop() may be called before start_main_loop() ever ran.
        thread = getattr(self, "loop_thread", None)
        if thread is not None and thread.is_alive():
            sprint(self.script_name, "red", "Boucle de traitement arrêtée.")
        else:
            sprint(self.script_name, "yellow", "loop subtitle déja arreté")

    def reload_json(self):
        """(Re)read the configuration file into ``self.config``."""
        with open(self.pathjson, 'r', encoding="utf-8") as file:
            self.config = json.load(file)

    def verif_file_transcribe(self):
        """Transcribe, then delete, every regular file in ``self.dir_record``."""
        # NOTE(review): an exception raised by auto_create_subtitle propagates
        # to main_loop, which logs it and ends the loop - confirm that one bad
        # file should stop all further transcription.
        for file in os.listdir(self.dir_record):
            file_path = os.path.join(self.dir_record, file)
            if os.path.isfile(file_path):
                sprint(self.script_name, "blue", "File find -> transcribe.")
                self.auto_create_subtitle(file)
                # Remove the audio once it has been processed.
                os.remove(file_path)
                print(f"Fichier {file_path} supprimé.")

    @staticmethod
    def _dedupe_words(message):
        """Return ``message`` keeping only the first occurrence of each word."""
        return " ".join(dict.fromkeys(message.split()))

    @staticmethod
    def _clean_transcript(raw):
        """Drop quotes and commas from ``raw`` and turn newlines into spaces."""
        table = str.maketrans({"'": None, '"': None, "\n": " ", ",": None})
        return raw.translate(table)

    def remove_repetitions(self, message):
        """Remove repeated words from ``message``.

        Every whitespace-separated word is kept only the first time it
        appears, anywhere in the message; word order is otherwise preserved.

        :param message: the text to clean
        :return: the words joined by single spaces, without duplicates
        """
        sprint(self.script_name, "yellow", "remove_repetitions start for: \n" + message)
        cleaned = self._dedupe_words(message)
        sprint(self.script_name, "green", "remove_repetitions cleaned text : \n" + cleaned)
        return cleaned

    def print_allsubtitle(self):
        """Clear the terminal and print every stored subtitle."""
        os.system('clear')
        sprint(self.script_name, "yellow", "\n\n All subtitle Print\n ")
        for key, text in self.subtitle.items():
            sprint(self.script_name, "yellow", " time " + str(key) + " = " + text + " \n\n")

    def get_lasttext(self):
        """Return the most recently stored subtitle, or ``""`` when none exist."""
        if not self.subtitle:
            sprint(self.script_name, "red", "Aucun sous-titre disponible.")
            return ""
        # Dicts keep insertion order, so the last item is the newest entry.
        last_time, last_text = list(self.subtitle.items())[-1]
        sprint(self.script_name, "magenta", f"Dernier sous-titre à {last_time} = {last_text}")
        return last_text

    def auto_create_subtitle(self, file):
        """Run whisper on ``file`` and store the cleaned transcript.

        ``file`` is a file name inside ``self.dir_record``. The transcript is
        read from ``<file without extension>.txt`` in the current directory.
        """
        sprint(self.script_name, "green", "start auto_create_subtitle")
        # NOTE(review): these patterns remove EVERY matching file in the
        # current directory, including any unrelated *.json or *.txt stored
        # there - confirm nothing else lives in the working directory.
        del_file("", "*.txt")
        del_file("", "*.srt")
        del_file("", "*.vtt")
        del_file("", "*.tsv")
        del_file("", "*.json")
        sprint(self.script_name, "green", "translation : " + file + " ")
        record_twitch = ['whisper', '--language', self.language, './' + self.dir_record + "/" + file, "--device", "cuda", "--model", "large-v3"]
        console.print("[bold yellow] command : " + str(record_twitch) + " [/bold yellow]")
        subprocess.run(record_twitch)
        file_noext = file.rsplit('.', 1)[0]
        # Explicit UTF-8 so accented text does not depend on the locale.
        with open(file_noext + ".txt", "r", encoding="utf-8") as thisfile:
            spead_find = self._clean_transcript(thisfile.read())
        spead_find = self.remove_repetitions(spead_find)
        self.all_subtitle += spead_find + "\n"
        num_ext = get_current_time()
        self.subtitle[str(num_ext)] = spead_find
        sprint(self.script_name, "yellow", "parole du streamer : \n" + spead_find)
        sprint(self.script_name, "green", "finish create_subtitle")

    def main_loop(self):
        """Scan for new recordings every ``self.loop_timer`` seconds.

        Waits 45 seconds before the first scan. Any exception is logged and
        ends the loop.
        """
        time.sleep(45)
        sprint(self.script_name, "green", "start main boucle_traitement record")
        try:
            while self.is_running:
                sprint(self.script_name, "blue", f"wait {self.loop_timer}s loop main translation.")
                self.verif_file_transcribe()
                # Wait before the next scan.
                time.sleep(self.loop_timer)
        except Exception as e:
            sprint(self.script_name, "red", "STOP main_loop Error : " + str(e))

    def start_main_loop(self):
        """Start main_loop in its own thread so the caller is not blocked."""
        # Re-arm the flag so the loop can be started again after stop().
        self.is_running = True
        self.loop_thread = threading.Thread(target=self.main_loop)
        self.loop_thread.start()
################### Subtitle FIN ########################
################### IA_GENERATOR TEXT ########################
class IA_generator:
    """Generate chat replies from the streamer's words with the ``tgpt`` CLI.

    The text to react to is stored in ``self.streamer_word``; each generated
    reply is stored in ``self.response_generation`` keyed by generation time.
    """

    def __init__(self, pathjson):
        """Load the JSON configuration at ``pathjson`` and set default state."""
        self.script_name = "IA_generator"
        self.pathjson = pathjson
        self.reload_json()
        self.subtitle = {}
        self.all_subtitle = ""
        self.ia_running = True
        self.loop_timer_ia = 20
        self.streamer_word = ""
        self.last_streamer_word = None
        self.index_prompt = 0
        self.response_generation = {}
        self.list_prompt = get_value_json("list_prompt", self.config)
        self.bad_answer = get_value_json("bad_answer", self.config)
        # Created by start_main_loop(); None until then so stop() is safe.
        self.loop_thread = None

    def stop(self):
        """Ask the generation loop to stop; does not wait for the thread."""
        sprint(self.script_name, "yellow", "Arrêt loop subtitle")
        self.ia_running = False
        # getattr: stop() may be called before start_main_loop() ever ran.
        thread = getattr(self, "loop_thread", None)
        if thread is not None and thread.is_alive():
            sprint(self.script_name, "red", "Boucle de traitement arrêtée.")
        else:
            sprint(self.script_name, "yellow", "loop subtitle déja arreté")

    def change_prompt(self):
        """Select the last prompt of ``self.list_prompt`` for the next call."""
        self.index_prompt = len(self.list_prompt) - 1

    def reload_json(self):
        """(Re)read the configuration file into ``self.config``."""
        with open(self.pathjson, 'r', encoding="utf-8") as file:
            self.config = json.load(file)

    def get_last_values(self, n):
        """Return the last ``n`` values of ``self.subtitle`` concatenated.

        Because the slice is ``values[-n:]``, ``n == 0`` returns all values.
        """
        values = list(self.subtitle.values())
        return ''.join(values[-n:])

    def print_allgeneration(self):
        """Clear the terminal and print every stored generated reply."""
        os.system('clear')
        sprint(self.script_name, "yellow", "\n\n Print All response_generation \n ")
        for key, text in self.response_generation.items():
            sprint(self.script_name, "yellow", " key = " + str(key) + " = " + text + " \n\n")

    def getlast_generation(self):
        """Return the most recently generated reply, or ``""`` when none exist."""
        if not self.response_generation:
            sprint(self.script_name, "red", "Aucun response_generation disponible.")
            return ""
        # Dicts keep insertion order, so the last item is the newest entry.
        key, last_generation = list(self.response_generation.items())[-1]
        sprint(self.script_name, "magenta", f"Dernier response_generation key : {key} = {last_generation}")
        return last_generation

    def clear_response(self, text):
        """Return what follows the first ``:`` in ``text``, stripped.

        ``text`` is returned unchanged when it contains no ``:``.
        """
        _, separator, remainder = text.partition(":")
        return remainder.strip() if separator else text

    def imagine_response(self):
        """Ask tgpt for a reply to ``self.streamer_word`` and store it.

        Does nothing when no streamer text is available yet.
        """
        sprint(self.script_name, "green", "start imagine_response\n")
        if str(self.streamer_word) == "":
            sprint(self.script_name, "magenta", "pas encore de sous titre on quitte génération")
            return
        # NOTE(review): the command is passed as a list (no shell), so the
        # double quotes below reach tgpt as literal characters - confirm.
        prompt_gpt = ['tgpt', '-q', '-w', '"' + self.list_prompt[self.index_prompt] + ' "' + str(self.streamer_word) + '"']
        # "dark_cyan" is the Rich colour name; "DARKCYAN" is not recognised.
        sprint(self.script_name, "dark_cyan", "index preprompt : " + str(self.index_prompt) + "\n")
        sprint(self.script_name, "dark_cyan", "\nPrompt demandé : " + str(prompt_gpt) + "\n")
        process = subprocess.run(prompt_gpt, capture_output=True, text=True, check=False)
        text_generation = process.stdout.replace("'", "").replace('"', "").replace("\n", "")
        textfinal = self.clear_response(text_generation)
        # Remember the answered text so main_loop_ia does not answer it twice.
        self.last_streamer_word = self.streamer_word
        sprint(self.script_name, "CYAN", "réponse imaginé : \n" + textfinal + "\n\n")
        self.change_prompt()
        key = get_current_time()
        self.response_generation[str(key)] = textfinal

    def setnew_streamer_word_text(self, streamer_word_text):
        """Set the text the next generation should react to."""
        self.streamer_word = streamer_word_text

    def main_ask(self, streamer_word_text):
        """Store ``streamer_word_text`` and generate a reply in a new thread."""
        sprint(self.script_name, "blue", "imagine_response start")
        self.streamer_word = streamer_word_text
        imagine_response_thread = Thread(target=self.imagine_response)
        imagine_response_thread.start()

    def main_loop_ia(self):
        """Generate a reply whenever the streamer text changed since last time.

        Checks every ``self.loop_timer_ia`` seconds after an initial 20 second
        wait. Any exception is logged and ends the loop.
        """
        sprint(self.script_name, "blue", "main_loop_ia imagine_response start")
        time.sleep(20)
        try:
            while self.ia_running:
                time.sleep(self.loop_timer_ia)
                sprint(self.script_name, "yellow", f"wait {self.loop_timer_ia}s main_loop_ia imagine_response.")
                if self.last_streamer_word != self.streamer_word:
                    self.imagine_response()
                else:
                    sprint(self.script_name, "yellow", "génération déja créer")
        except Exception as e:
            sprint(self.script_name, "red", "STOP main_loop_ia Error : " + str(e))

    def start_main_loop(self):
        """Start main_loop_ia in its own thread so the caller is not blocked."""
        self.ia_running = True
        self.loop_thread = threading.Thread(target=self.main_loop_ia)
        self.loop_thread.start()
################### IA_GENERATOR TEXT FIN ########################
################### sendmessageTwitch ########################
class messageTwitch:
    """Send chat messages to a Twitch channel through ``send_message.py``.

    Accounts are read from ``../config/user.json``; the active account's
    pseudo, token and ``charactere`` prefix are cached on the instance.
    """

    def __init__(self, channel_name):
        """Load the account list and select the first account."""
        self.channel_name = channel_name
        self.indexuser = 0
        with open("../config/user.json", 'r', encoding="utf-8") as file:
            self.userjson = json.load(file)
        self.totaluser = len(self.userjson)
        self._load_user()

    def _load_user(self):
        """Refresh pseudo, token and charactere from the account at ``self.indexuser``."""
        self.tw_acc_pseudo = get_value_json_list(self.indexuser, "tw_acc_pseudo", self.userjson)
        self.tw_acc_token = get_value_json_list(self.indexuser, "tw_acc_token", self.userjson)
        self.charactere = get_value_json_list(self.indexuser, "charactere", self.userjson)

    @staticmethod
    def _next_user_index(current, total):
        """Return the index after ``current``, wrapping to 0 past the last account."""
        if total == 1:
            # Single account: nothing to rotate to.
            return current
        following = current + 1
        return following if following < total else 0

    def set_user(self, index_user):
        """Make the account at ``index_user`` the active one."""
        self.indexuser = index_user
        self._load_user()

    def change_user(self):
        """Switch to the next account, going back to the first after the last.

        The index never moves past the final account, so the cached fields
        are not set to ``None`` by an out-of-range lookup.
        """
        self.indexuser = self._next_user_index(self.indexuser, self.totaluser)
        self._load_user()

    def conversation(self):
        """Have accounts 1 and 2 exchange a short greeting, 10 seconds apart."""
        user1 = 1
        user2 = 2
        pseudo1 = get_value_json_list(user1, "tw_acc_pseudo", self.userjson)
        pseudo2 = get_value_json_list(user2, "tw_acc_pseudo", self.userjson)
        if pseudo1 is None or pseudo2 is None:
            hprint("red", "conversation needs accounts at index 1 and 2 in user.json")
            return
        self.send_message_user(user1, "yo @" + pseudo2 + " ^^")
        time.sleep(10)
        # The reply comes from the second account and mentions the first.
        self.send_message_user(user2, "Heyyy cool ? @" + pseudo1 + "")

    def _build_command(self, Message_text):
        """Return the argv list that runs send_message.py for the active account."""
        return [
            'python', 'send_message.py',
            '-pseudo', self.tw_acc_pseudo,
            '-token', self.tw_acc_token,
            '-twitchname', self.channel_name,
            '-message', ' ' + self.charactere + ' ' + Message_text,
        ]

    def _run_send(self, Message_text):
        """Log the command with the token masked, then run it without a shell."""
        argv = self._build_command(Message_text)
        masked = [
            '***' if position > 0 and argv[position - 1] == '-token' else part
            for position, part in enumerate(argv)
        ]
        hprint("yellow", "send message = " + " ".join(masked))
        # A list with no shell: the message text cannot inject shell commands.
        subprocess.run(argv)

    def send_message_user(self, index_user, Message_text):
        """Select the account at ``index_user`` and send ``Message_text`` with it."""
        hprint("green", "start send_message_user")
        self.set_user(index_user)
        self._run_send(Message_text)
        hprint("yellow", "Finnish send send_message_user index :" + str(index_user))

    def send_message(self, Message_text):
        """Send ``Message_text`` with the currently active account."""
        hprint("green", "start send_message")
        self._run_send(Message_text)
        hprint("yellow", "Finnish send_message message")
################### sendmessageTwitch FIN ########################