Files
2024-11-30 15:53:25 +01:00

269 lines
7.9 KiB
Python

import os
import signal
import sys
import argparse
import json
import time
import pprint
import random
import requests
import datetime
import glob
import threading
# import keyboard
from pynput import keyboard
from threading import Thread, Semaphore
from streamlink import Streamlink
from fake_useragent import UserAgent
from rich.console import Console
from rich.live import Live
from rich.prompt import Prompt
from rich.spinner import Spinner
from rich.table import Table
from rich.text import Text
import subprocess
import pty
console = Console()
class MessageTwitch:
def __init__(self, channel_name):
with open("config.json", 'r') as file:
self.config = json.load(file)
self.channel_name = channel_name
self.channel_url = "https://www.twitch.tv/" + self.channel_name
self.dir_twitchrecord = "twitch-recordAudio"
self.dir_whisperX = "whisperX"
self.subtitle = {}
self.all_subtitle = ""
self.response_stream = "Hello !"
self.current_record = ""
self.language = self.get_value_json("language")
self.list_prompt = self.get_value_json("list_prompt")
self.bad_answer = self.get_value_json("bad_answer")
self.tw_acc_pseudo = self.get_value_json("tw_acc_pseudo")
self.tw_acc_token = self.get_value_json("tw_acc_token")
def get_value_json(self, var_name):
return self.config.get(var_name)
def hprint(self,color, texte):
timestamp = datetime.datetime.now().strftime("%Hh %Mm %Ss")
console.print("[bold "+color+"] ["+timestamp+"] "+texte+" [/bold "+color+"]")
def del_file(self, dossier, file):
motif = os.path.join(dossier, file)
for file in glob.glob(motif):
os.remove(file)
self.hprint("",f"file deleted : {file}")
# Fonction pour convertir l'heure en secondes
# def time_to_seconds(time_str):
# h, m = map(int, time_str.split(':'))
# return h * 3600 + m * 60
def time_to_seconds(time_str):
h, m, s = map(int, time_str.split(':'))
return h * 3600 + m * 60 + s
# Fonction pour obtenir l'heure actuelle sous forme de chaîne
def get_current_time(self):
current_time = time.time()
return time.strftime('%H:%M:%S', time.localtime(current_time))
# now = datetime.now()
# return now.strftime("%H:%M")
# Fonction pour rechercher dans le dictionnaire
def search_dict(d, key):
return d.get(key, "Key not found")
def get_last_values(self, n):
values = list(self.subtitle.values())
return ''.join(values[-n:])
def get_value_by_index(self, index):
values = list(self.subtitle.values())
if index < len(values):
return values[index]
else:
return None
def verif_file_transcribe(self, dir):
for file in os.listdir(dir):
file_path = os.path.join(dir, file)
if os.path.isfile(file_path):
self.hprint("blue",f"File find -> transcribe.")
# Exécuter une fonction sur le fichier
self.auto_create_subtitle(file, dir)
# Supprimer le fichier après le traitement
os.remove(file_path)
print(f"Fichier {file_path} supprimé.")
def boucle_traitement(self, intervalle):
count_loop = 0
try:
while True:
self.hprint("blue",f"wait {intervalle} loop main.")
self.verif_file_transcribe("record")
if (count_loop >= 4):
self.hprint("blue",f"count_loop ==4 on lance gpt + message")
count_loop = 0
imagine_response = Thread(target=self.imagine_response)
imagine_response.start()
imagine_response.join() # Wait for the recording to finish
time.sleep(intervalle) # Attend 'intervalle' secondes avant la prochaine itération
count_loop += 1
except KeyboardInterrupt:
print("Arrêt du script.")
def imagine_response(self):
self.hprint("green",f"start imagine_response\n")
streamer_word = ""
self.hprint("blue",f"word in streamer_word : \n"+str(len(streamer_word)))
if len(self.subtitle) >= 6:
index = 6
streamer_word = self.get_last_values(index)
while len(streamer_word) < 100:
self.hprint("yellow",f"streamer_word to short add text\n")
index=index+1
if len(self.subtitle) >= index:
streamer_word = self.get_last_values(index)
else:
break
elif len(self.subtitle) >= 4:
streamer_word = self.get_last_values(4)
elif len(self.subtitle) >= 2:
streamer_word = self.get_last_values(2)
else :
streamer_word = self.all_subtitle
values = list(self.subtitle.values())
index = len(values)
while len(streamer_word) < 100 and index > 0:
index -= 1
streamer_word += values[index]
self.hprint("magenta","streamer_word : "+str(streamer_word)+"\n")
load_reponse = True
well_response = True
countload = 0
while load_reponse:
prompt_gpt = ['tgpt','-q','-w', '"'+self.list_prompt[countload]+' "'+ streamer_word+'"']
# self.hprint("DARKCYAN","\nPrompt demandé : "+str(prompt_gpt)+"\n")
process = subprocess.run(prompt_gpt, capture_output=True, text=True, check=False)
self.response_stream = process.stdout.replace("'", "").replace('"', "").replace("\n", "")
# console.print("réponse imaginé : "+self.response_stream)
nombre_de_mots = len(self.response_stream.split())
nombre_de_caracteres = len(self.response_stream)
self.hprint("CYAN","réponse imaginé : "+self.response_stream+"\n\n")
# load_reponse = False
# if (self.string_contient(self.bad_answer, self.response_stream)):
# console.print("[bold red] contient mot incorect [/bold red]\n")
# countload+=1
# continue
if (nombre_de_mots < 22):
load_reponse = False
console.print("[bold green]result create : "+self.response_stream+"[/bold green]\n\n")
else:
self.hprint("red","incorect result\n")
if (countload > 3):
self.hprint("red","tentative de load trop grosse\n")
self.response_stream = "Kappa"
load_reponse = False
countload+=1
console.print("finish imagine_response")
if (well_response):
self.hprint("green","send message")
send_message = Thread(target=self.send_message)
send_message.start()
send_message.join() # Wait for the recording to finish
def send_message(self):
self.hprint("green","startsend_message")
command = '-pseudo "'+self.tw_acc_pseudo+'" -token "'+self.tw_acc_token+'" -twitchname "'+self.channel_name+'" -message " Kappa '+self.response_stream+'"'
message_tosend = 'python send_message.py '+command
console.print("[bold yellow]"+str(message_tosend)+"[/bold yellow]")
subprocess.run(message_tosend, shell=True)
console.print("finish send_message")
def main(self):
self.hprint("blue", "wait 30")
time.sleep(30)
self.hprint("blue", "start loop all")
boucle_traitement = Thread(target=self.boucle_traitement(30))
boucle_traitement.start()
# boucle_traitement.join() # Wait for the recording to finish
# #console.print("[bold green]Enregistrement terminé, le programme va se terminer.[/bold green]")