# Files
# 2025-01-10 15:07:14 +01:00
#
# 233 lines
# 8.0 KiB
# Python

import os
import signal
import sys
import shutil
import argparse
import json
import time
import pprint
import random
import requests
import datetime
import glob
import threading
import traceback
# import keyboard
from pynput import keyboard
from threading import Thread, Semaphore
from streamlink import Streamlink
from fake_useragent import UserAgent
from rich.console import Console
from rich.live import Live
from rich.prompt import Prompt
from rich.spinner import Spinner
from rich.table import Table
from rich.text import Text
import subprocess
import pty
# Shared Rich console; the logging helpers and the script entry point below
# all print through this single instance.
console = Console()
def hprint(color, texte):
    """Print ``texte`` on the shared console, in bold ``color``, with a time prefix.

    The message is wrapped in Rich markup tags built from ``color`` and is
    preceded by the current local time formatted as ``HHh MMm SSs``.

    Args:
        color: Rich colour name appended to ``bold`` in the markup tag
            (callers in this file also pass an empty string).
        texte: Message to display; it is inserted into the markup as-is.
    """
    style_tag = "bold " + color
    stamp = datetime.datetime.now().strftime("%Hh %Mm %Ss")
    # Assemble "[bold <color>] [<time>] <message> [/bold <color>]".
    pieces = ("[", style_tag, "] [", stamp, "] ", texte, " [/", style_tag, "]")
    console.print("".join(pieces))
def del_file(dossier, file):
    """Delete every path inside ``dossier`` that matches the glob pattern ``file``.

    Args:
        dossier: Directory in which to look.
        file: File name or glob pattern, joined to ``dossier``.

    Each removed path is reported through ``hprint``.
    """
    pattern = os.path.join(dossier, file)
    matches = glob.glob(pattern)
    for path in matches:
        os.remove(path)
        hprint("", f"file deleted : {path}")
def get_value_json(var_name, config):
    """Return the value stored under ``var_name`` in ``config``.

    ``config`` must provide a ``get`` method (e.g. a dict); a missing key
    yields ``None``.
    """
    value = config.get(var_name)
    return value
def get_value_json_list(index, key, json_data):
    """Return ``json_data[index][key]``, or ``None`` when that lookup fails.

    An out-of-range index, a missing key, or a container that does not
    support the lookup (``IndexError``, ``KeyError``, ``TypeError``) all
    result in ``None``.
    """
    try:
        entry = json_data[index]
        value = entry[key]
    except (IndexError, KeyError, TypeError):
        return None
    return value
def get_current_time():
    """Return the current local time as an ``HH:MM:SS`` string."""
    now = time.localtime(time.time())
    return time.strftime('%H:%M:%S', now)
class RecordTwitch:
    """Record the audio of a Twitch channel into fixed-length MP3 segments.

    ffmpeg writes numbered segments into a working directory (``in_record``)
    while a background loop moves finished segments into the final directory
    (``record``). ``self.running`` is the shared flag that keeps the counter
    and the scan loop alive; it is cleared when the recording ends or when
    ``stop()`` is called.
    """

    def __init__(self, channel_name, record_time):
        """Store the channel to record and the recorder settings.

        Args:
            channel_name: Twitch channel name, appended to the twitch.tv URL.
            record_time: Requested recording time; ``-1`` is replaced by
                ``9999``. The value is only stored: no method of this class
                reads it.
        """
        self.channel_name = channel_name
        self.channel_url = "https://www.twitch.tv/" + self.channel_name
        self.record_time = 9999 if record_time == -1 else record_time
        # Length of one segment in seconds (passed to ffmpeg -segment_time).
        self.max_timerecordfile = 60
        self.running = True
        self.script_name = "twitch_record"
        # Handle of the running ffmpeg child, so stop() can terminate it.
        self.process = None

    def hprint(self, color, texte):
        """Print ``texte`` in bold ``color`` with a time and script-name prefix."""
        timestamp = datetime.datetime.now().strftime("%Hh %Mm %Ss")
        console.print("[bold "+color+"] ["+timestamp+"] ("+self.script_name+") "+texte+" [/bold "+color+"]")

    def stop(self):
        """Request a stop: clear the running flag and terminate ffmpeg if active."""
        hprint("magenta","STOPING RecordTwitch")
        self.running = False
        # Take a local reference: the recorder thread may reset the attribute.
        proc = getattr(self, "process", None)
        if proc is not None and proc.poll() is None:
            # Without this the ffmpeg child keeps recording after stop().
            proc.terminate()

    def del_mp3(self, dossier):
        """Delete every ``*.mp3`` file found directly inside ``dossier``."""
        motif = os.path.join(dossier, '*.mp3')
        for path in glob.glob(motif):
            os.remove(path)
            hprint("",f"file deleted : {path}")

    def clear_diretory(self, dir_inrecord="in_record", dir_record="record"):
        """Remove leftover MP3 files from the working and final directories.

        Each directory that exists is cleaned on its own, so stale segments
        are removed even when the other directory is missing.
        """
        existing = [d for d in (dir_inrecord, dir_record) if os.path.isdir(d)]
        if len(existing) == 2:
            hprint('green', f"dir {dir_inrecord} and {dir_record} exist clearing")
        for directory in existing:
            self.del_mp3(directory)

    def create_session(self):
        """Create the Streamlink session and set the HTTP headers it sends."""
        self.ua = UserAgent()
        self.session = Streamlink()
        self.session.set_option("http-headers", {
            "Accept-Language": "en-US,en;q=0.5",
            "Connection": "keep-alive",
            "DNT": "1",
            "Upgrade-Insecure-Requests": "1",
            "User-Agent": self.ua.random,
            # NOTE(review): placeholder value sent as-is; replace it with
            # your actual Client-ID.
            "Client-ID": "your_client_id",
            "Referer": "https://www.google.com/"
        })

    def get_url(self):
        """Return the stream URL for the channel, or ``""`` on failure.

        The ``audio_only`` stream is preferred; ``worst`` is the fallback.
        Any error raised while resolving the streams is logged and results
        in an empty string (best-effort lookup).
        """
        hprint("green","try to get url")
        url = ""
        self.create_session()
        try:
            streams = self.session.streams(self.channel_url)
            if not streams:
                # Explicit message instead of the cryptic KeyError 'worst'.
                hprint("red", "Error on get url : no stream available for " + self.channel_name)
                return url
            chosen = streams['audio_only'] if 'audio_only' in streams else streams['worst']
            url = chosen.url
            hprint("cyan","url generate : " + url)
        except Exception as e:
            hprint("red","Error on get url : "+str(e))
        return url

    def loop_run(self, intervalle):
        """Scan for finished segments every ``intervalle`` seconds while running.

        The wait is sliced into steps of at most one second so the loop ends
        promptly once ``self.running`` is cleared.
        """
        try:
            while self.running:
                self.hprint("green", "Start scan files...")
                self.verif_record_move()
                self.hprint("yellow", f"wait {intervalle}s for next scan file completed.")
                remaining = intervalle
                while self.running and remaining > 0:
                    step = min(1, remaining)
                    time.sleep(step)
                    remaining -= step
        except KeyboardInterrupt:
            self.hprint("red", "STOP LOOP.")
        finally:
            # Make sure the flag is cleared whatever ended the loop.
            self.running = False

    @staticmethod
    def _segment_index(filename):
        """Return the numeric suffix of a segment file name, or ``None``.

        The index is the text after the last ``_`` up to the first ``.``
        (e.g. ``2025-01-10_003.mp3`` -> ``3``).
        """
        try:
            return int(filename.split('_')[-1].split('.')[0])
        except ValueError:
            return None

    @staticmethod
    def _completed_segments(filenames):
        """Return segment names sorted by index, without the newest one.

        Names that carry no numeric index are ignored. The highest index is
        left out because ffmpeg may still be writing that file.
        """
        indexed = []
        for name in filenames:
            idx = RecordTwitch._segment_index(name)
            if idx is not None:
                indexed.append((idx, name))
        indexed.sort()
        return [name for _, name in indexed[:-1]]

    def verif_record_move(self, dir_inrecord="in_record", dir_record="record"):
        """Move finished segments from ``dir_inrecord`` to ``dir_record``.

        ``dir_record`` is created when missing. A missing ``dir_inrecord``
        (the recorder thread may not have created it yet) is treated as
        "nothing to move", and files without a numeric segment suffix are
        skipped instead of aborting the scan.
        """
        hprint("green", "start verif_record_move")
        if not os.path.exists(dir_record):
            os.makedirs(dir_record)
            hprint("green", "création du dossier : "+dir_record)
        if not os.path.isdir(dir_inrecord):
            return
        fichiers = [
            f for f in os.listdir(dir_inrecord)
            if os.path.isfile(os.path.join(dir_inrecord, f))
        ]
        for fileto_move in self._completed_segments(fichiers):
            chemin_source = os.path.join(dir_inrecord, fileto_move)
            chemin_destination = os.path.join(dir_record, fileto_move)
            shutil.move(chemin_source, chemin_destination)
            hprint("green",f"File moved: {fileto_move}")

    def compteur(self):
        """Display a one-line recording timer, refreshed every second.

        ``seconds`` wraps to zero and the displayed file number increases
        each time ``max_timerecordfile`` seconds have elapsed. The loop ends
        when ``self.running`` is cleared.
        """
        seconds = 0
        loop = 0
        while self.running:
            time.sleep(1)
            # '\r' keeps the timer on a single terminal line.
            print(f"\033[94mRecording time: {seconds:02d}s | file : {loop}\033[0m", end='\r', flush=True)
            seconds += 1
            if seconds == self.max_timerecordfile:
                loop += 1
                seconds = 0

    def record_audio(self):
        """Record the channel audio with ffmpeg into segmented MP3 files.

        Blocks until ffmpeg exits. Whatever the outcome (no URL, ffmpeg
        missing, stream ended, ``stop()`` called), ``self.running`` is
        cleared afterwards so the counter thread and the scan loop finish
        instead of waiting forever.
        """
        output_directory = "record"
        in_record_directory = "in_record"
        os.makedirs(output_directory, exist_ok=True)
        os.makedirs(in_record_directory, exist_ok=True)
        thread_compteur = None
        try:
            stream_url = self.get_url()
            if not stream_url:
                hprint("red","Impossible de récupérer l'URL du flux")
                return
            if not self.running:
                # stop() was requested while the URL was being resolved.
                return
            timestamp = datetime.datetime.now().strftime("%Y-%m-%d")
            output_file_path = os.path.join(in_record_directory, f"{timestamp}_%03d.mp3")
            hprint("green", f"start record")
            command = ['ffmpeg','-i', stream_url,'-vn','-acodec','libmp3lame','-ar','44100','-ac','2','-map','0:a','-f','segment','-segment_time',str(self.max_timerecordfile),'-segment_format','mp3',output_file_path,'-loglevel', 'error']
            try:
                self.process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
            except OSError as exc:
                # Typically ffmpeg is not installed or not on PATH.
                hprint("red", "Unable to start ffmpeg : " + str(exc))
                return
            # Start the timer only once ffmpeg is actually running.
            thread_compteur = Thread(target=self.compteur, daemon=True)
            thread_compteur.start()
            _, error = self.process.communicate()
            if error and error.strip():
                # ffmpeg prefixes its messages with "[component @ addr]",
                # which Rich would otherwise parse as markup tags.
                from rich.markup import escape
                hprint("red", "ffmpeg error : " + escape(error.strip()))
        finally:
            # Recording is over: release the counter and the scan loop.
            self.running = False
            self.process = None
            if thread_compteur is not None:
                thread_compteur.join()

    def main(self):
        """Run a full session: clean up, start recording, move segments.

        Returns once the scan loop has ended. A ``KeyboardInterrupt`` stops
        the recorder and is re-raised so the caller can handle it.
        """
        self.running = True
        self.clear_diretory()
        record_thread = Thread(target=self.record_audio)
        record_thread.start()
        loop_run = Thread(target=self.loop_run, args=(10,), daemon=True)
        loop_run.start()
        try:
            loop_run.join()  # ends when the running flag is cleared
        except KeyboardInterrupt:
            # Make sure ffmpeg and the helper threads do not outlive Ctrl-C.
            self.stop()
            raise
        hprint("cyan","Enregistrement terminé, le programme va se terminer.")
if __name__ == '__main__':
    # Command-line entry point: build the recorder from the CLI options and
    # run it until it finishes or the user interrupts it.
    cli = argparse.ArgumentParser()
    cli.add_argument('-recordtime', type=int, default=60, help='Time to record')
    cli.add_argument('-twitchname', type=str, required=True, help='Twitch channel name')
    options = cli.parse_args()

    recorder = RecordTwitch(
        channel_name=options.twitchname,
        record_time=options.recordtime,
    )
    try:
        recorder.main()
    except KeyboardInterrupt:
        console.print("[bold red]ERREUR RECORDING[/bold red]")