Source code for T-reX.ExchangeEditor

"""
ExchangeEditor Module
=====================

This module is responsible for editing exchanges with wurst and Brightway2.
It appends relevant exchanges from the `db_T_reX` (database containing waste and material exchange details)
to activities identified by `WasteAndMaterialSearch()` in the specified project's database (`db_name`).
Each appended exchange replicates the same amount and unit as the original technosphere waste and material exchange.

"""

# Imports
import os
from datetime import datetime
import bw2data as bd
import pandas as pd
from tqdm import tqdm


[docs] def ExchangeEditor(project_T_reX, db_name, db_T_reX_name): """ Append relevant exchanges from `db_T_reX` to each activity in `db_name` identified by `WasteAndMaterialSearch()`. This function modifies the specified project's database by appending exchanges from the `db_T_reX` to activities identified by `WasteAndMaterialSearch()`. The appended exchanges mirror the quantity and unit of the original technosphere waste and material exchange. :param str project_T_reX: Name of the Brightway2 project to be modified. :param str db_name: Name of the database within the project where activities and exchanges are stored. :param str db_T_reX_name: Name of the database containing waste and material exchange details. :returns: None. Modifies the given Brightway2 project by appending exchanges and logs statistics about the added exchanges. :rtype: None :raises Exception: If any specified process or exchange is not found in the database. """ # Import user settings and directory paths from config.user_settings import ( dir_logs, dir_searchmaterial_results, dir_searchwaste_results, ) # Set the current project to project_T_reX bd.projects.set_current(project_T_reX) # Get database objects db = bd.Database(db_name) db_T_reX = bd.Database(db_T_reX_name) # Define directories dir_searchwaste_results = dir_searchwaste_results / db_name dir_searchmaterial_results_grouped = ( dir_searchmaterial_results / db_name / "grouped" ) # Create a dictionary of files produced by SearchWaste() and SearchMaterial() file_dict = { os.path.splitext(f)[0]: os.path.join(dir_searchwaste_results, f) for f in os.listdir(dir_searchwaste_results) } file_dict.update( { os.path.splitext(f)[0]: os.path.join(dir_searchmaterial_results_grouped, f) for f in os.listdir(dir_searchmaterial_results_grouped) } ) sorted_items = sorted(file_dict.items()) file_dict = dict(sorted_items) # Create a DataFrame for each file and store it in the dictionary for key, f_path in file_dict.items(): df = pd.read_csv(f_path, sep=";", header=0, index_col=0) df.reset_index(inplace=True) df = df[ [ "code", "name", "location", "ex_name", "ex_amount", "ex_unit", "ex_location", "database", ] ] file_dict[key] = df # Start adding exchanges print("\n\n*** ExchangeEditor() is running for " + db_name + " ***\n") print(f"* Appending waste and material exchanges in {db_T_reX_name}\n ") countNAME = 0 # Calculate the maximum length of NAME max_name_length = max(len(name) for name in file_dict.keys()) # Calculate the maximum length of progress_db max_progress_length = ( len(str(len(file_dict.items()))) * 2 + 1 ) # times 2 for countNAME and len(file_dict.items()), plus 1 for the slash # Define the bar format with a fixed width for desc bar_format = f"{{desc:<{max_progress_length + max_name_length + 10}}} | {{bar:30}} | {{percentage:3.1f}}% | Progress: {{n:>5}} of {{total:<5}} | Elapsed: {{elapsed:<5}} | Remaining: {{remaining:<5}}" start = datetime.now() # Iterate over each category (NAME) for NAME, df in sorted(file_dict.items(), reverse=False): countNAME += 1 progress_db = f"{countNAME:2}/{len(file_dict.items())}" count = 0 # For each exchange in the current category's DataFrame for exc in tqdm( df.to_dict("records"), desc=f" - {progress_db} : {NAME} ", bar_format=bar_format, colour="magenta", smoothing=0.01, ): # Extract details of the exchange code, name, location, ex_name, amount, unit, ex_location, database = ( exc["code"], exc["name"], exc["location"], exc["ex_name"], exc["ex_amount"], exc["ex_unit"], exc["ex_location"], db_name, ) KEY = (database, code) T_reX_KEY = ( db_T_reX_name, NAME.split("_")[1] .capitalize() .replace("_", " ") .replace("-", " ") .replace("kilogram", "(kg)") .replace("cubicmeter", "(m3)"), ) # Retrieve the process and T_reX exchange from the databases try: process = bd.get_activity(KEY) T_reX_ex = bd.get_activity(T_reX_KEY) before = len(process.exchanges()) #! TODO: Check if the exchange already exists in the process, and if so, skip it # Create a new exchange in the process process.new_exchange( input=T_reX_ex, amount=amount, unit=unit, type="biosphere", ).save() after = len(process.exchanges()) # If the exchange was successfully added, increment the counter if (after - before) == 1: count += 1 except Exception as e: print(e) print(f"Process {name}, {ex_location} not found in {db_name}") continue # Log the time taken and the number of additions for this category end = datetime.now() duration = end - start log_entry = ( end.strftime("%m/%d/%Y, %H:%M:%S"), db_name, NAME, "additions", count, "duration:", str(duration), ) log_file = os.path.join( dir_logs, f'{datetime.now().strftime("%Y-%m-%d")}_ExchangeEditor.txt' ) with open(log_file, "a") as l: l.write(str(log_entry) + "\n") print(f'{"*"*100}') print( f"\n*** ExchangeEditor() completed for {db_name} in {str(duration).split('.')[0]} (h:m:s) ***\n" ) print(f'{"*"*100}') return None