ajout de mémoire avec fichier

This commit is contained in:
Foufure13
2025-01-10 23:13:37 +01:00
parent b1ac5c7506
commit 08c63b576c
15 changed files with 292 additions and 465 deletions
+103
View File
@@ -0,0 +1,103 @@
import os
import json
class PersistentStorage:
def __init__(self, storage_dir="storage"):
"""
Initialise l'environnement de stockage avec un répertoire dédié.
:param storage_dir: Chemin du répertoire de stockage.
"""
self.storage_dir = storage_dir
if not os.path.exists(storage_dir):
os.makedirs(storage_dir)
def _get_file_path(self, filename):
"""Renvoie le chemin complet d'un fichier dans le répertoire de stockage."""
return os.path.join(self.storage_dir, filename + ".json")
def read(self, filename):
"""
Lit et retourne le contenu d'un fichier sous forme de dictionnaire.
:param filename: Nom du fichier (sans extension).
:return: Dictionnaire des données du fichier ou {} si le fichier est vide/inexistant.
"""
file_path = self._get_file_path(filename)
try:
with open(file_path, 'r', encoding='utf-8') as file:
return json.load(file)
except (FileNotFoundError, json.JSONDecodeError):
return {}
def write(self, filename, key, value):
"""
Ajoute ou met à jour une paire clé-valeur dans un fichier.
:param filename: Nom du fichier (sans extension).
:param key: Clé à ajouter ou mettre à jour.
:param value: Valeur associée.
"""
data = self.read(filename)
data[key] = value
file_path = self._get_file_path(filename)
with open(file_path, 'w', encoding='utf-8') as file:
json.dump(data, file, indent=4, ensure_ascii=False)
return data
def delete(self, filename, key):
"""
Supprime une clé d'un fichier si elle existe.
:param filename: Nom du fichier (sans extension).
:param key: Clé à supprimer.
"""
data = self.read(filename)
if key in data:
del data[key]
file_path = self._get_file_path(filename)
with open(file_path, 'w', encoding='utf-8') as file:
json.dump(data, file, indent=4, ensure_ascii=False)
return data
def list_keys(self, filename):
"""
Liste toutes les clés présentes dans un fichier.
:param filename: Nom du fichier (sans extension).
:return: Liste des clés.
"""
data = self.read(filename)
return list(data.keys())
def query_all(self):
"""
Interroge tous les fichiers dans le répertoire de stockage et retourne leurs contenus.
:return: Dictionnaire de tous les fichiers et leurs données.
"""
all_data = {}
for file in os.listdir(self.storage_dir):
if file.endswith(".json"):
filename = os.path.splitext(file)[0]
all_data[filename] = self.read(filename)
return all_data
# Exemple d'utilisation
if __name__ == "__main__":
# Initialisation de l'environnement
storage = PersistentStorage()
# Écriture de données
storage.write("session1", "key1", "Valeur 1")
storage.write("session1", "key2", "Valeur 2")
storage.write("session2", "keyA", "Autre valeur")
# Lecture des données
print("Contenu de session1 :", storage.read("session1"))
print("Contenu de session2 :", storage.read("session2"))
# Suppression d'une clé
#storage.delete("session1", "key1")
print("Après suppression de key1 :", storage.read("session1"))
# Liste des clés
print("Clés dans session1 :", storage.list_keys("session1"))
# Interrogation globale
print("Contenu global :", storage.query_all())
-238
View File
@@ -1,238 +0,0 @@
import os
import signal
import sys
import argparse
import json
import time
import pprint
import random
import requests
import datetime
import glob
import threading
import traceback
# import keyboard
from pynput import keyboard
from threading import Thread, Semaphore
from streamlink import Streamlink
from fake_useragent import UserAgent
from rich.console import Console
from rich.live import Live
from rich.prompt import Prompt
from rich.spinner import Spinner
from rich.table import Table
from rich.text import Text
import subprocess
import pty
console = Console()
def hprint(color, texte):
timestamp = datetime.datetime.now().strftime("%Hh %Mm %Ss")
console.print("[bold "+color+"] ["+timestamp+"] "+texte+" [/bold "+color+"]")
def del_file(dossier, file):
motif = os.path.join(dossier, file)
for file in glob.glob(motif):
os.remove(file)
hprint("",f"file deleted : {file}")
def get_value_json(var_name, config):
return config.get(var_name)
def get_value_json_list(index, key, json_data):
try:
return json_data[index][key]
except (IndexError, KeyError, TypeError):
return None
# Fonction pour obtenir l'heure actuelle sous forme de chaîne
def get_current_time():
current_time = time.time()
return time.strftime('%H:%M:%S', time.localtime(current_time))
# now = datetime.now()
# return now.strftime("%H:%M")
class RecordTwitch:
def __init__(self, channel_name, record_time):
self.channel_name = channel_name
self.request_count = 0
self.channel_url = "https://www.twitch.tv/" + self.channel_name
self.active_threads = 0
self.record_time = record_time
self.max_timerecordfile = 60
self.running = False
self.script_name = "twitch_record"
if (self.record_time == -1):
self.record_time = 9999
def hprint(self,color, texte):
timestamp = datetime.datetime.now().strftime("%Hh %Mm %Ss")
console.print("[bold "+color+"] ["+timestamp+"] ("+self.script_name+") "+texte+" [/bold "+color+"]")
def stop(self, signum, frame):
print("\STOPING...")
self.running = False
def del_mp3(self, dossier):
motif = os.path.join(dossier, '*.mp3')
for file in glob.glob(motif):
os.remove(file)
self.hprint("",f"file deleted : {file}")
def clear_diretory(self, dir_inrecord="in_record", dir_record="record"):
# self.hprint('green', 'clear_diretory start')
if os.path.exists(dir_inrecord) and os.path.exists(dir_record):
self.hprint('green', f"dir {dir_inrecord} and {dir_record} exist clearing")
self.del_mp3(dir_inrecord)
self.del_mp3(dir_record)
def create_session(self):
# Session creating for request
self.ua = UserAgent()
self.session = Streamlink()
self.session.set_option("http-headers", {
"Accept-Language": "en-US,en;q=0.5",
"Connection": "keep-alive",
"DNT": "1",
"Upgrade-Insecure-Requests": "1",
"User-Agent": self.ua.random,
"Client-ID": "your_client_id", # Replace with your actual Client-ID
"Referer": "https://www.google.com/"
})
def get_url(self):
hprint("green","try to get url")
url = ""
self.create_session()
try:
streams = self.session.streams(self.channel_url)
url = streams['audio_only'].url if 'audio_only' in streams else streams['worst'].url
hprint("cyan","url generate : " + url)
except Exception as e:
hprint("red","Error on get url : "+str(e))
# traceback.print_exc() # Affiche les détails de l'erreur
pass
return url
def loop_run(self, intervalle):
try:
while True:
self.hprint("green","Start scan files...")
self.verif_record_move()
# self.hprint("yellow",f"wait {intervalle}s for next scan file completed.")
time.sleep(intervalle)
except KeyboardInterrupt:
self.hprint("red","STOP LOOP.")
def verif_record_move(self, dir_inrecord="in_record", dir_record="record"):
if not os.path.exists(dir_record):
os.makedirs(dir_record)
fichiers = [f for f in os.listdir(dir_inrecord) if os.path.isfile(os.path.join(dir_inrecord, f))]
if len(fichiers) > 1:
# sort file
sort_file = sorted(fichiers, key=lambda x: int(x.split('_')[-1].split('.')[0]))
fileto_move = sort_file[0]
chemin_source = os.path.join(dir_inrecord, fileto_move)
chemin_destination = os.path.join(dir_record, fileto_move)
# move file
shutil.move(chemin_source, chemin_destination)
self.hprint("green",f"File moved: {fileto_move}")
# else:
# self.hprint("yellow","Not enough files to compare.")
def compteur(self ):
seconds = 0
loop = 0
self.running = True
while True:
time.sleep(1) # Attend une seconde
# self.hprint("magenta", f"self.running = {self.running}")
if (seconds < 10):
print(f"\033[94mRecording time: 0{seconds}s | file : {loop}\033[0m", end='\r', flush=True) # Réinitialise la ligne à chaque fois
else:
print(f"\033[94mRecording time: {seconds}s | file : {loop}\033[0m", end='\r', flush=True) # Réinitialise la ligne à chaque fois
seconds += 1
if seconds == self.max_timerecordfile:
loop +=1
seconds = 0 # Réinitialise le compteur après 60 seconds
def record_audio(self):
output_directory = "record"
in_record_directory = "in_record"
os.makedirs(output_directory, exist_ok=True)
os.makedirs(in_record_directory, exist_ok=True)
stream_url = self.get_url()
if not stream_url:
console.print("[bold red]Impossible de récupérer l'URL du flux[/bold red]")
return
timestamp_complet = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
timestamp = datetime.datetime.now().strftime("%Y-%m-%d")
output_file_path = os.path.join(in_record_directory, f"{timestamp}_%03d.mp3")
self.hprint("green", f"start record")
thread_compteur = Thread(target=self.compteur, daemon=True)
thread_compteur.start()
command = ['ffmpeg','-i', stream_url,'-vn','-acodec','libmp3lame','-ar','44100','-ac','2','-map','0:a','-f','segment','-segment_time',str(self.max_timerecordfile),'-segment_format','mp3',output_file_path,'-loglevel', 'error']
# self.hprint('yellow',str(command))
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
output, error = process.communicate()
# thread_compteur.do_run = False
thread_compteur.join()
def main(self):
self.running = True
self.clear_diretory()
record_thread = Thread(target=self.record_audio)
record_thread.start()
# record_thread.join() # Wait for the recording to finish
loop_run = Thread(target=self.loop_run, args=(10,), daemon=True)
loop_run.start()
loop_run.join() # Wait for the recording to finish
console.print("[bold magenta]Enregistrement terminé, le programme va se terminer.[/bold magenta]")
def start_recording(self):
"""Lance main_record_twitch dans un thread pour ne pas bloquer le code principal."""
self.thread = threading.Thread(target=self.main)
self.thread.start() # Démarre l'enregistrement dans un thread séparé
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-recordtime', type=int, default=60, help='Time to record')
parser.add_argument('-twitchname', type=str, required=True, help='Twitch channel name')
args = parser.parse_args()
bot = RecordTwitch(channel_name=args.twitchname, record_time=args.recordtime)
try:
bot.main()
except KeyboardInterrupt:
console.print("[bold red]ERREUR RECORDING[/bold red]")
+4
View File
@@ -0,0 +1,4 @@
{
"key2": "Valeur 2",
"key1": "Valeur 1"
}
+3
View File
@@ -0,0 +1,3 @@
{
"keyA": "Autre valeur"
}