Files
Foufure13 91c8ac5a3a upate
2025-04-07 13:26:43 +02:00

232 lines
7.7 KiB
Python

import os
import sys
import argparse
import json
import time
import pprint
import random
import requests
import datetime
import glob
import threading
# import keyboard
from pynput import keyboard
from threading import Thread, Semaphore
from streamlink import Streamlink
from fake_useragent import UserAgent
from rich.console import Console
from rich.live import Live
from rich.prompt import Prompt
from rich.spinner import Spinner
from rich.table import Table
from rich.text import Text
import subprocess
import pty
# import win32console
import multiprocessing
#import from local file 4 main class
from fonction.first_class import RecordTwitch
from fonction.first_class import Subtitle_translation
from fonction.first_class import IA_generator
from fonction.first_class import messageTwitch
from fonction.first_class import TwitchChatBot
#import from local file secondary fonction
from fonction.second_fonction import *
# def rh():
# os.system('killall python')
# sys.exit(0)
console = Console()
def stop_all():
hprint("blue", "Arrêt du bot twitch.")
if 'recordTw' in globals() and recordTw:
recordTw.stop()
if 'bot' in globals() and bot:
bot.stop()
if 'sb_translation' in globals() and sb_translation:
sb_translation.stop()
if 'ask_text' in globals() and ask_text:
ask_text.stop()
if 'controluser' in globals() and controluser:
controluser.stop()
def print_help():
hprint("PURPLE", "Touche 'F5'BOT Active l'écoute clavier")
hprint("PURPLE", "Touche 'F6'BOT Désactive l'écoute clavier")
hprint("PURPLE", "Touche h : pour afficher les commandes")
hprint("PURPLE", "Touche q : pour quitter le sript")
hprint("PURPLE", "Touche c : Clear le terminal")
hprint("PURPLE", "Touche s : envoie un message dans le tchat twitch avec la derniere génération")
hprint("PURPLE", "Touche u : pour stopper lenregistrement twitch")
# hprint("PURPLE", "Touche j : pour start lenregistrement twitch") # pas set
# hprint("PURPLE", "Touche i : pour start la traduction en text du record") # pas set
hprint("PURPLE", "Touche p : pour stopper la traduction en text du record")
hprint("PURPLE", "Touche o : liste tout les sous titre créé")
hprint("PURPLE", "Touche k : lancer une generation de reponse avec ia depuis les traduction")
hprint("PURPLE", "Touche l : liste toute les génération")
hprint("PURPLE", "Touche m : change le prompt a donner a l'ia")
def del_pathfile(file_path):
try:
os.remove(file_path)
hprint("green", "Fichier Suprimé : " +file_path)
except Exception as e:
hprint("red", "del_file Error : "+ str(e))
# Supprimer le fichier après le traitement
def start_keyboard_listener():
global listening_keyboard # Déclare que tu utilises la variable globale
listening_keyboard = True # Initialisation de la variable globale
with keyboard.Listener(on_press=on_press) as listener:
listener.join()
def stop_keyboard_listener():
global listening_keyboard # Utilisation de la variable globale
listening_keyboard = False # Changer la valeur de la variable globale
# D'autres opérations pour arrêter le listener si nécessaire
def on_press(key):
global listening_keyboard
global main_loop
try:
if key == keyboard.Key.f5:
hprint("yellow", "Touche 'F5'BOT Active l'écoute clavier")
listening_keyboard = True
elif key == keyboard.Key.f6:
hprint("yellow", "Touche 'F6'BOT Désactive l'écoute clavier")
listening_keyboard = False
elif key.char == 'h':
hprint("cyan", "Touche 'h' détectée. Help")
print_help()
if listening_keyboard:
if key.char == 'q': # Si la touche 'q' est pressée, arrête l'écoute
hprint("cyan", "Touche 'q' détectée. Fin du programme.")
stop_all()
main_loop = False
return False # Cela arrêtera l'écouteur
elif key.char == 'c':
hprint("cyan", "Touche 'c' clear terminal")
clear_screen()
elif key.char == 'u':
hprint("cyan", "Touche 'u' détectée. Arrêt de l'enregistrement.")
recordTw.stop() # Arrêter l'enregistrement
elif key.char == 'p':
hprint("cyan", "Touche 'p' détectée. Arrêt de la traduction en text.")
sb_translation.stop() # Arrêter l'enregistrement
elif key.char == 'k':
hprint("cyan", "Touche 'k' imagination réponse")
generation_responce()
elif key.char == 's':
hprint("cyan", "Touche 's' Envoie du message")
Message_text = ask_text.getlast_generation()
controluser.send_message(Message_text)
elif key.char == 'o':
hprint("cyan", "Touche 'o'liste sous titre")
sb_translation.print_allsubtitle()
elif key.char == 'l':
hprint("cyan", "Touche 'l' liste génération")
ask_text.print_allgeneration()
elif key.char == 'x':
hprint("cyan", "Touche 'x' reload json")
ask_text.reload_json()
sb_translation.reload_json()
elif key.char == 'm':
hprint("cyan", "Touche 'm' change prompt ia")
ask_text.change_prompt()
except AttributeError:
pass
def generation_responce():
text_streamer = sb_translation.get_lasttext()
hprint("blue", "start IA_generator")
ask_text.main_ask(text_streamer)
def print_Status_bar():
print(f"\033[94mRecording time: 0{self.seconds}s | file : {self.loop}\033[0m", end='\r', flush=True) # Réinitialise la ligne à chaque fois
#----------------------------------------------------------------------------------------------------
if __name__ == '__main__':
global main_loop
main_loop = True
del_pathfile("./working_bot/storage/IA_generator.json")
del_pathfile("./working_bot/storage/subtitle_data.json")
#configuration du dossier de travaille
work_directory = "working_bot"
if not os.path.exists(work_directory):
os.makedirs(work_directory)
hprint("green", "création du dossier : "+work_directory)
os.chdir(work_directory)
#récupération du fichier de configuration
with open("../config/config.json", 'r') as file:
config = json.load(file)
#récupération des arguments
parser = argparse.ArgumentParser()
parser.add_argument('-threads', type=int, default=10, help='Number of threads')
parser.add_argument('-recordtime', type=int, default=60, help='Time to record')
parser.add_argument('-twitchname', type=str, required=True, help='Twitch channel name')
args = parser.parse_args()
#lancement de la class d'enregistrement audio du stream
hprint("blue", "start loop RecordTwitch")
recordTw = RecordTwitch(channel_name=args.twitchname, record_time=args.recordtime)
recordTw.main()
#lancement d'enregistrement chat du stream
# hprint("blue", "start loop TwitchChatBot")
# bot = TwitchChatBot(args.twitchname)
# bot.start_background()
hprint("blue", "start loop Subtitle_translation")
sb_translation = Subtitle_translation("../config/config.json")
sb_translation.start_main_loop()
hprint("blue", "start loop IA_generator")
ask_text = IA_generator("../config/config.json")
ask_text.start_main_loop()
controluser = messageTwitch("../config/user.json",args.twitchname)
controluser.start_loop_respond()
print_help()
listener_thread = threading.Thread(target=start_keyboard_listener)
listener_thread.start() # mise en thread de l'écoute des appuye touche