big debug amélioration enregistrement

This commit is contained in:
Foufure13
2025-01-10 15:07:14 +01:00
parent 096c605213
commit af359e9c70
13 changed files with 842 additions and 128 deletions
+238
View File
@@ -0,0 +1,238 @@
import os
import signal
import sys
import argparse
import json
import time
import pprint
import random
import requests
import datetime
import glob
import threading
import traceback
# import keyboard
from pynput import keyboard
from threading import Thread, Semaphore
from streamlink import Streamlink
from fake_useragent import UserAgent
from rich.console import Console
from rich.live import Live
from rich.prompt import Prompt
from rich.spinner import Spinner
from rich.table import Table
from rich.text import Text
import subprocess
import pty
console = Console()
def hprint(color, texte):
timestamp = datetime.datetime.now().strftime("%Hh %Mm %Ss")
console.print("[bold "+color+"] ["+timestamp+"] "+texte+" [/bold "+color+"]")
def del_file(dossier, file):
motif = os.path.join(dossier, file)
for file in glob.glob(motif):
os.remove(file)
hprint("",f"file deleted : {file}")
def get_value_json(var_name, config):
return config.get(var_name)
def get_value_json_list(index, key, json_data):
try:
return json_data[index][key]
except (IndexError, KeyError, TypeError):
return None
# Fonction pour obtenir l'heure actuelle sous forme de chaîne
def get_current_time():
current_time = time.time()
return time.strftime('%H:%M:%S', time.localtime(current_time))
# now = datetime.now()
# return now.strftime("%H:%M")
class RecordTwitch:
def __init__(self, channel_name, record_time):
self.channel_name = channel_name
self.request_count = 0
self.channel_url = "https://www.twitch.tv/" + self.channel_name
self.active_threads = 0
self.record_time = record_time
self.max_timerecordfile = 60
self.running = False
self.script_name = "twitch_record"
if (self.record_time == -1):
self.record_time = 9999
def hprint(self,color, texte):
timestamp = datetime.datetime.now().strftime("%Hh %Mm %Ss")
console.print("[bold "+color+"] ["+timestamp+"] ("+self.script_name+") "+texte+" [/bold "+color+"]")
def stop(self, signum, frame):
print("\STOPING...")
self.running = False
def del_mp3(self, dossier):
motif = os.path.join(dossier, '*.mp3')
for file in glob.glob(motif):
os.remove(file)
self.hprint("",f"file deleted : {file}")
def clear_diretory(self, dir_inrecord="in_record", dir_record="record"):
# self.hprint('green', 'clear_diretory start')
if os.path.exists(dir_inrecord) and os.path.exists(dir_record):
self.hprint('green', f"dir {dir_inrecord} and {dir_record} exist clearing")
self.del_mp3(dir_inrecord)
self.del_mp3(dir_record)
def create_session(self):
# Session creating for request
self.ua = UserAgent()
self.session = Streamlink()
self.session.set_option("http-headers", {
"Accept-Language": "en-US,en;q=0.5",
"Connection": "keep-alive",
"DNT": "1",
"Upgrade-Insecure-Requests": "1",
"User-Agent": self.ua.random,
"Client-ID": "your_client_id", # Replace with your actual Client-ID
"Referer": "https://www.google.com/"
})
def get_url(self):
hprint("green","try to get url")
url = ""
self.create_session()
try:
streams = self.session.streams(self.channel_url)
url = streams['audio_only'].url if 'audio_only' in streams else streams['worst'].url
hprint("cyan","url generate : " + url)
except Exception as e:
hprint("red","Error on get url : "+str(e))
# traceback.print_exc() # Affiche les détails de l'erreur
pass
return url
def loop_run(self, intervalle):
try:
while True:
self.hprint("green","Start scan files...")
self.verif_record_move()
# self.hprint("yellow",f"wait {intervalle}s for next scan file completed.")
time.sleep(intervalle)
except KeyboardInterrupt:
self.hprint("red","STOP LOOP.")
def verif_record_move(self, dir_inrecord="in_record", dir_record="record"):
if not os.path.exists(dir_record):
os.makedirs(dir_record)
fichiers = [f for f in os.listdir(dir_inrecord) if os.path.isfile(os.path.join(dir_inrecord, f))]
if len(fichiers) > 1:
# sort file
sort_file = sorted(fichiers, key=lambda x: int(x.split('_')[-1].split('.')[0]))
fileto_move = sort_file[0]
chemin_source = os.path.join(dir_inrecord, fileto_move)
chemin_destination = os.path.join(dir_record, fileto_move)
# move file
shutil.move(chemin_source, chemin_destination)
self.hprint("green",f"File moved: {fileto_move}")
# else:
# self.hprint("yellow","Not enough files to compare.")
def compteur(self ):
seconds = 0
loop = 0
self.running = True
while True:
time.sleep(1) # Attend une seconde
# self.hprint("magenta", f"self.running = {self.running}")
if (seconds < 10):
print(f"\033[94mRecording time: 0{seconds}s | file : {loop}\033[0m", end='\r', flush=True) # Réinitialise la ligne à chaque fois
else:
print(f"\033[94mRecording time: {seconds}s | file : {loop}\033[0m", end='\r', flush=True) # Réinitialise la ligne à chaque fois
seconds += 1
if seconds == self.max_timerecordfile:
loop +=1
seconds = 0 # Réinitialise le compteur après 60 seconds
def record_audio(self):
output_directory = "record"
in_record_directory = "in_record"
os.makedirs(output_directory, exist_ok=True)
os.makedirs(in_record_directory, exist_ok=True)
stream_url = self.get_url()
if not stream_url:
console.print("[bold red]Impossible de récupérer l'URL du flux[/bold red]")
return
timestamp_complet = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
timestamp = datetime.datetime.now().strftime("%Y-%m-%d")
output_file_path = os.path.join(in_record_directory, f"{timestamp}_%03d.mp3")
self.hprint("green", f"start record")
thread_compteur = Thread(target=self.compteur, daemon=True)
thread_compteur.start()
command = ['ffmpeg','-i', stream_url,'-vn','-acodec','libmp3lame','-ar','44100','-ac','2','-map','0:a','-f','segment','-segment_time',str(self.max_timerecordfile),'-segment_format','mp3',output_file_path,'-loglevel', 'error']
# self.hprint('yellow',str(command))
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
output, error = process.communicate()
# thread_compteur.do_run = False
thread_compteur.join()
def main(self):
self.running = True
self.clear_diretory()
record_thread = Thread(target=self.record_audio)
record_thread.start()
# record_thread.join() # Wait for the recording to finish
loop_run = Thread(target=self.loop_run, args=(10,), daemon=True)
loop_run.start()
loop_run.join() # Wait for the recording to finish
console.print("[bold magenta]Enregistrement terminé, le programme va se terminer.[/bold magenta]")
def start_recording(self):
"""Lance main_record_twitch dans un thread pour ne pas bloquer le code principal."""
self.thread = threading.Thread(target=self.main)
self.thread.start() # Démarre l'enregistrement dans un thread séparé
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-recordtime', type=int, default=60, help='Time to record')
parser.add_argument('-twitchname', type=str, required=True, help='Twitch channel name')
args = parser.parse_args()
bot = RecordTwitch(channel_name=args.twitchname, record_time=args.recordtime)
try:
bot.main()
except KeyboardInterrupt:
console.print("[bold red]ERREUR RECORDING[/bold red]")
+232
View File
@@ -0,0 +1,232 @@
import os
import signal
import sys
import shutil
import argparse
import json
import time
import pprint
import random
import requests
import datetime
import glob
import threading
import traceback
# import keyboard
from pynput import keyboard
from threading import Thread, Semaphore
from streamlink import Streamlink
from fake_useragent import UserAgent
from rich.console import Console
from rich.live import Live
from rich.prompt import Prompt
from rich.spinner import Spinner
from rich.table import Table
from rich.text import Text
import subprocess
import pty
console = Console()
def hprint(color, texte):
timestamp = datetime.datetime.now().strftime("%Hh %Mm %Ss")
console.print("[bold "+color+"] ["+timestamp+"] "+texte+" [/bold "+color+"]")
def del_file(dossier, file):
motif = os.path.join(dossier, file)
for file in glob.glob(motif):
os.remove(file)
hprint("",f"file deleted : {file}")
def get_value_json(var_name, config):
return config.get(var_name)
def get_value_json_list(index, key, json_data):
try:
return json_data[index][key]
except (IndexError, KeyError, TypeError):
return None
# Fonction pour obtenir l'heure actuelle sous forme de chaîne
def get_current_time():
current_time = time.time()
return time.strftime('%H:%M:%S', time.localtime(current_time))
# now = datetime.now()
# return now.strftime("%H:%M")
class RecordTwitch:
def __init__(self, channel_name, record_time):
self.channel_name = channel_name
self.channel_url = "https://www.twitch.tv/" + self.channel_name
self.record_time = record_time
self.max_timerecordfile = 60
self.running = True
self.script_name = "twitch_record"
if (self.record_time == -1):
self.record_time = 9999
def hprint(self,color, texte):
timestamp = datetime.datetime.now().strftime("%Hh %Mm %Ss")
console.print("[bold "+color+"] ["+timestamp+"] ("+self.script_name+") "+texte+" [/bold "+color+"]")
def stop(self):
hprint("magenta","STOPING RecordTwitch")
self.running = False
def del_mp3(self, dossier):
motif = os.path.join(dossier, '*.mp3')
for file in glob.glob(motif):
os.remove(file)
hprint("",f"file deleted : {file}")
def clear_diretory(self, dir_inrecord="in_record", dir_record="record"):
# hprint('green', 'clear_diretory start')
if os.path.exists(dir_inrecord) and os.path.exists(dir_record):
hprint('green', f"dir {dir_inrecord} and {dir_record} exist clearing")
self.del_mp3(dir_inrecord)
self.del_mp3(dir_record)
def create_session(self):
# Session creating for request
self.ua = UserAgent()
self.session = Streamlink()
self.session.set_option("http-headers", {
"Accept-Language": "en-US,en;q=0.5",
"Connection": "keep-alive",
"DNT": "1",
"Upgrade-Insecure-Requests": "1",
"User-Agent": self.ua.random,
"Client-ID": "your_client_id", # Replace with your actual Client-ID
"Referer": "https://www.google.com/"
})
def get_url(self):
hprint("green","try to get url")
url = ""
self.create_session()
try:
streams = self.session.streams(self.channel_url)
url = streams['audio_only'].url if 'audio_only' in streams else streams['worst'].url
hprint("cyan","url generate : " + url)
except Exception as e:
hprint("red","Error on get url : "+str(e))
# traceback.print_exc() # Affiche les détails de l'erreur
pass
return url
def loop_run(self, intervalle):
try:
while self.running:
self.hprint("green", "Start scan files...")
self.verif_record_move()
self.hprint("yellow", f"wait {intervalle}s for next scan file completed.")
time.sleep(intervalle)
except KeyboardInterrupt:
self.hprint("red", "STOP LOOP.")
finally:
self.running = False # Assurez-vous que le drapeau est désactivé
def verif_record_move(self, dir_inrecord="in_record", dir_record="record"):
hprint("green", "start verif_record_move")
if not os.path.exists(dir_record):
os.makedirs(dir_record)
hprint("green", "création du dossier : "+dir_record)
fichiers = [f for f in os.listdir(dir_inrecord) if os.path.isfile(os.path.join(dir_inrecord, f))]
if len(fichiers) > 1:
# sort file
sort_file = sorted(fichiers, key=lambda x: int(x.split('_')[-1].split('.')[0]))
fileto_move = sort_file[0]
chemin_source = os.path.join(dir_inrecord, fileto_move)
chemin_destination = os.path.join(dir_record, fileto_move)
# move file
shutil.move(chemin_source, chemin_destination)
hprint("green",f"File moved: {fileto_move}")
# else:
# hprint("yellow","Not enough files to compare.")
def compteur(self ):
seconds = 0
loop = 0
while self.running:
time.sleep(1) # Attend une seconde
# hprint("magenta", f"self.running = {self.running}")
if (seconds < 10):
print(f"\033[94mRecording time: 0{seconds}s | file : {loop}\033[0m", end='\r', flush=True) # Réinitialise la ligne à chaque fois
else:
print(f"\033[94mRecording time: {seconds}s | file : {loop}\033[0m", end='\r', flush=True) # Réinitialise la ligne à chaque fois
seconds += 1
if seconds == self.max_timerecordfile:
loop +=1
seconds = 0 # Réinitialise le compteur après 60 seconds
def record_audio(self):
output_directory = "record"
in_record_directory = "in_record"
os.makedirs(output_directory, exist_ok=True)
os.makedirs(in_record_directory, exist_ok=True)
stream_url = self.get_url()
if not stream_url:
hprint("red","Impossible de récupérer l'URL du flux")
return
timestamp_complet = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
timestamp = datetime.datetime.now().strftime("%Y-%m-%d")
output_file_path = os.path.join(in_record_directory, f"{timestamp}_%03d.mp3")
hprint("green", f"start record")
thread_compteur = Thread(target=self.compteur, daemon=True)
thread_compteur.start()
command = ['ffmpeg','-i', stream_url,'-vn','-acodec','libmp3lame','-ar','44100','-ac','2','-map','0:a','-f','segment','-segment_time',str(self.max_timerecordfile),'-segment_format','mp3',output_file_path,'-loglevel', 'error']
# hprint('yellow',str(command))
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
output, error = process.communicate()
# thread_compteur.do_run = False
thread_compteur.join()
def main(self):
self.running = True
self.clear_diretory()
record_thread = Thread(target=self.record_audio)
record_thread.start()
# record_thread.join() # Wait for the recording to finish
loop_run = Thread(target=self.loop_run, args=(10,), daemon=True)
loop_run.start()
loop_run.join() # Wait for the recording to finish
hprint("cyan","Enregistrement terminé, le programme va se terminer.")
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-recordtime', type=int, default=60, help='Time to record')
parser.add_argument('-twitchname', type=str, required=True, help='Twitch channel name')
args = parser.parse_args()
bot = RecordTwitch(channel_name=args.twitchname, record_time=args.recordtime)
try:
bot.main()
except KeyboardInterrupt:
console.print("[bold red]ERREUR RECORDING[/bold red]")