2024-05-26 11:08:46 +02:00

223 lines
7.6 KiB
Python

# 20.05.24
import csv
import logging
from typing import List
# Internal utilities
from Src.Util._jsonConfig import config_manager
# Variable
CREATE_REPORT = config_manager.get_bool('M3U8_DOWNLOAD', 'create_report')
CREATE_JOB_DB = config_manager.get_bool('DEFAULT', 'create_job_database')
class SimpleDBManager:
def __init__(self, filename, columns):
"""
Initialize a new database manager.
Args:
- filename (str): The name of the CSV file containing the database.
- columns (list): List of database columns.
"""
self.filename = filename
self.db = []
self.columns = columns
logging.info("Database manager initialized.")
def load_database(self):
"""
Load the database from the specified CSV file.
If the file doesn't exist, initialize a new database.
"""
try:
with open(self.filename, 'r', newline='') as file:
reader = csv.reader(file)
self.db = list(reader)
logging.info(f"Database {self.filename} loaded successfully.")
except FileNotFoundError:
logging.warning(f"File {self.filename} not found, creating a new database...")
self.initialize_database()
def initialize_database(self, columns=None, rows=None):
"""
Initialize a new database with specified columns and rows.
Args:
- columns (list, optional): List of database columns. If not specified, uses the columns provided in the constructor.
- rows (list, optional): List of database rows. Each row should be a list of values. If not specified, the database will be empty.
"""
self.db = [self.columns]
if rows:
for row_data in rows:
self.add_row_to_database(row_data)
logging.info("Database initialized successfully.")
def add_row_to_database(self, *row_data):
"""
Add a new row to the database.
Args:
- row_data (list): List of values for the new row.
"""
self.db.append(list(row_data))
logging.info("New row added to the database.")
def add_column_to_database(self, column_name, default_value=''):
"""
Add a new column to the database.
Args:
- column_name (str): Name of the new column.
- default_value (str, optional): Default value to be inserted in cells of the new column. Default is an empty string.
"""
for row in self.db:
row.append(default_value)
self.db[0][-1] = column_name
logging.info(f"New column '{column_name}' added to the database.")
def update_row_in_database(self, row_index, new_row_data):
"""
Update an existing row in the database.
Args:
- row_index (int): Index of the row to update.
- new_row_data (list): List of the new values for the updated row.
"""
self.db[row_index] = new_row_data
logging.info(f"Row {row_index} of the database updated.")
def remove_row_from_database(self, column_index: int, search_value) -> list:
"""
Remove a row from the database based on a specific column value.
Args:
- column_index (int): Index of the column to search in.
- search_value: The value to search for in the specified column.
Returns:
list: The removed row from the database, if found; otherwise, an empty list.
"""
# Find the index of the row with the specified value in the specified column
row_index = None
for i, row in enumerate(self.db):
if row[column_index] == search_value:
row_index = i
break
# If the row with the specified value is found, remove it
remove_row = []
if row_index is not None:
remove_row = self.db[row_index]
del self.db[row_index]
logging.info(f"Row at index {row_index} with value {search_value} in column {column_index} removed from the database.")
else:
logging.warning(f"No row found with value {search_value} in column {column_index}. Nothing was removed from the database.")
return remove_row
def remove_column_from_database(self, col_index):
"""
Remove a column from the database.
Args:
- col_index (int): Index of the column to remove.
"""
for row in self.db:
del row[col_index]
logging.info(f"Column {col_index} of the database removed.")
def save_database(self):
"""
Save the database to the CSV file specified in the constructor.
"""
with open(self.filename, 'w', newline='') as file:
writer = csv.writer(file)
writer.writerows(self.db)
logging.info("Database saved to file.")
def print_database_as_sql(self):
"""
Print the database in SQL format to the console.
"""
max_lengths = [max(len(str(cell)) for cell in col) for col in zip(*self.db)]
line = "+-" + "-+-".join("-" * length for length in max_lengths) + "-+"
print(line)
print("| " + " | ".join(f"{cell:<{length}}" for cell, length in zip(self.db[0], max_lengths)) + " |")
print(line)
for row in self.db[1:]:
print("| " + " | ".join(f"{cell:<{length}}" for cell, length in zip(row, max_lengths)) + " |")
print(line)
logging.info("Database printed as SQL.")
def search_database(self, column_index, value):
"""
Search the database for rows matching the specified value in the given column.
Args:
- column_index (int): Index of the column to search on.
- value (str): Value to search for.
Returns:
list: List of rows matching the search.
"""
results = []
for row in self.db[1:]:
if row[column_index] == value:
results.append(row)
logging.info(f"Database searched for value '{value}' in column {column_index}. Found {len(results)} matches.")
return results
def sort_database(self, column_index):
"""
Sort the database based on values in the specified column.
Args:
- column_index (int): Index of the column to sort on.
"""
self.db[1:] = sorted(self.db[1:], key=lambda x: x[column_index])
logging.info(f"Database sorted based on column {column_index}.")
def filter_database(self, column_index, condition):
"""
Filter the database based on a condition on the specified column.
Args:
- column_index (int): Index of the column to apply the condition on.
- condition (function): Condition function to apply on the values of the column. Should return True or False.
Returns:
list: List of rows satisfying the condition.
"""
results = [self.db[0]] # Keep the header row
for row in self.db[1:]:
if condition(row[column_index]):
results.append(row)
logging.info(f"Filter applied on column {column_index}. Found {len(results) - 1} rows satisfying the condition.")
return results
# Output
if CREATE_REPORT:
report_table = SimpleDBManager("riepilogo.csv", ["Date", "Name", "Size"])
report_table.load_database()
report_table.save_database()
else:
report_table = None
if CREATE_JOB_DB:
job_database = SimpleDBManager("Job_database.csv", ["Id", "Name", "Season_n"])
job_database.load_database()
job_database.save_database()
else:
job_database = None