"""
SearchWaste Module
==================
This script loads data from '<db name>_exploded.pickle', runs search queries,
and produces CSV files to store the results and a log entry. The search queries are
formatted as dictionaries with fields NAME, CODE, and search terms keywords_AND,
keywords_OR, and keywords_NOT. These queries are defined in `config/queries_waste.py`.
Functionality
-------------
Provides a function, :func:`SearchWaste`, that loads data from '<db name>_exploded.pickle',
runs search queries, and produces result CSVs and log entries.
"""
import os
import shutil
from datetime import datetime
import pandas as pd
from config.queries_waste import queries_waste
from config.user_settings import dir_logs, dir_searchwaste_results, dir_tmp
[docs]
def SearchWaste(db_name, dir_searchwaste_results=dir_searchwaste_results):
"""
Load data from '<db name>_exploded.pickle', run search queries, and produce
result CSVs and log entries.
This function processes waste-related data from a given database and runs
predefined queries to identify relevant waste exchanges. The results are
saved in CSV files and log entries are created for each search operation.
:param str db_name: The database name to be used in the search operation.
Note:
The queries are defined in `config/queries_waste.py`.
"""
print("\n*** Starting SearchWaste ***")
dir_searchwaste_results = os.path.join(dir_searchwaste_results, db_name)
if os.path.isdir(dir_searchwaste_results):
print("Deleting existing results directory")
shutil.rmtree(dir_searchwaste_results)
# Ensure necessary directories exist
if not os.path.exists(dir_logs):
os.makedirs(dir_logs)
if not os.path.exists(dir_searchwaste_results):
os.makedirs(dir_searchwaste_results)
# Load dataset
pickle_path = os.path.join(dir_tmp, db_name + "_exploded.pickle")
if os.path.isfile(pickle_path):
df = pd.read_pickle(pickle_path)
print("*** Loading pickle to dataframe ***")
else:
print("Pickle file does not exist.")
return
print("*** Searching for waste exchanges ***")
def search(query):
"""
Execute an individual search query on the dataset.
Parameters:
- query (dict): Search query defined in `config/queries_waste.py`.
Returns:
A CSV file with search results, saved to `data/SearchWasteResults/<db_name>` with the query name.
"""
# Extract and process query components (for readability in the code)
NAME_BASE = query["name"]
UNIT = query["unit"]
NAME = NAME_BASE + "-" + UNIT
CODE = NAME.replace(" ", "")
query.update({"code": NAME, "db_name": db_name})
AND = query["AND"]
OR = query["OR"]
NOT = query["NOT"]
DBNAME = query["db_name"]
# Apply the search terms to the dataframe
df_results = df[
(df["ex_name"].apply(lambda x: all(i in x for i in AND)))
& (df["ex_unit"] == UNIT)
# & (df["ex_amount"] != 1)
# & (df["ex_type"].isin(['technosphere', 'production']))
].copy()
if df_results.shape[0] == 0:
print(f"\t\t** No results for {NAME}")
return
if OR:
df_results = df_results[
df_results["ex_name"].apply(lambda x: any(i in x for i in OR))
]
if df_results.shape[0] == 0:
print(f"\t\t** No results for {NAME}")
return
if NOT:
df_results = df_results[
df_results["ex_name"].apply(lambda x: not any(i in x for i in NOT))
]
if df_results.shape[0] == 0:
print(f"\t\t** No results for {NAME}")
return
if "carbon dioxide" in NAME_BASE:
df_results = df_results[df_results["ex_amount"] > 0]
df_results["ex_amount"] = -df_results["ex_amount"]
else:
df_results = df_results[df_results["ex_amount"] < 0]
# Save results to CSV
T_reX_file_name = NAME.replace(" ", "")
T_reX_file = os.path.join(dir_searchwaste_results, T_reX_file_name)
df_results["database"] = DBNAME
if df_results.shape[0] != 0:
df_results.to_csv(T_reX_file + ".csv", sep=";")
# Log the results
log_entry = (
f"TIME: {datetime.now().strftime('%Y-%m-%d_%H:%M:%S')}, DB: {db_name}, RESULTS: {df_results.shape[0]}, "
f"NAME: {query['name']}, Search parameters, AND={query['AND']}, OR={query['OR']}, NOT={query['NOT']}, "
f"UNIT={query['unit']}, CODE={CODE}"
)
date = datetime.now().strftime("%Y%m%d")
log_file = os.path.join(dir_logs, f"SearchWaste_{date}.log")
with open(log_file, "a") as l:
l.write(str(log_entry) + "\n")
print(
f"\t{query['name']:<25} \t| {query['unit']:<13} \t| {df_results.shape[0]:>6}"
)
# Execute each query using the search() function defined above
for query in queries_waste:
search(query)
print("*** Finished searching for waste exchanges ***")
return None