# pyfwg/iterator.py
import re
import pandas as pd
import inspect
import logging
from typing import Type, List, Dict, Any, Union, Optional
# Import the base class to use its type hint
from .workflow import _MorphingWorkflowBase
from .constants import ALL_POSSIBLE_YEARS
class MorphingIterator:
    """Automates running multiple morphing configurations from a structured input.

    This class is designed to perform parametric analysis by iterating over
    different sets of parameters for a given morphing workflow. It uses a
    Pandas DataFrame to define the different runs.

    The typical usage is a structured, multi-step process that provides
    clarity and control at each stage:

    **Step 1: Initialization**
        Instantiate the iterator with the desired workflow class.

        ```python
        iterator = MorphingIterator(workflow_class=MorphingWorkflowGlobal)
        ```

    **Step 2: Define Common Parameters (Optional)**
        Use the `set_default_values()` method to define parameters that will be
        the same for all runs in the batch, such as `fwg_jar_path`.

    **Step 3: Define the Runs DataFrame**
        Create a DataFrame that specifies what changes between each run. This
        can be done in two ways:

        * **A) Programmatically with Pandas:**
          Use `get_template_dataframe()` to get a blank template, then add
          rows for each run.

          ```python
          runs_df = iterator.get_template_dataframe()
          runs_df.loc[0] = {'epw_paths': 'file1.epw', 'fwg_gcms': ['CanESM5']}
          runs_df.loc[1] = {'epw_paths': 'file2.epw', 'fwg_gcms': ['MIROC6']}
          ```

        * **B) Using an Excel Template:**
          Use the utility functions to export a template, edit it in Excel,
          and then load it back.

          ```python
          from pyfwg import export_template_to_excel, load_runs_from_excel
          export_template_to_excel(iterator, 'my_runs.xlsx')
          # (User edits the Excel file here)
          runs_df = load_runs_from_excel('my_runs.xlsx')
          ```

    **Step 4: Generate the Full Execution Plan**
        Call `generate_morphing_workflows()` with the DataFrame of runs. This
        method applies all defaults (from the class and from `set_default_values`),
        parses filenames, prepares all the underlying workflow instances, and
        stores the complete plan for review.

    **Step 5: Inspect and Verify (Optional)**
        Before running, you can inspect the `iterator.morphing_workflows_plan_df`
        DataFrame and the `iterator.prepared_workflows` list to ensure
        everything is configured as expected.

    **Step 6: Execute the Batch Run**
        Call `run_morphing_workflows()` to execute the entire batch of prepared
        simulations.

    Attributes:
        workflow_class (Type[_MorphingWorkflowBase]): The workflow class that
            will be used for each iteration.
        custom_defaults (Dict[str, Any]): A dictionary of default parameters
            set by the user via `set_default_values`.
        prepared_workflows (List[_MorphingWorkflowBase]): A list of fully
            configured, ready-to-run workflow instances. Populated by
            `generate_morphing_workflows`.
        morphing_workflows_plan_df (Optional[pd.DataFrame]): A detailed
            DataFrame showing the complete configuration for every run in the
            batch. Populated by `generate_morphing_workflows`.
    """

    # Prefix of the informational columns holding categories parsed from the
    # EPW filenames. These columns are never passed to the workflow itself.
    _CATEGORY_PREFIX = 'cat_'

    def __init__(self, workflow_class: Type["_MorphingWorkflowBase"]):
        """Initializes the iterator with a specific workflow class.

        Args:
            workflow_class: The workflow class (not an instance) that is
                instantiated once per run.
        """
        self.workflow_class = workflow_class
        self.custom_defaults: Dict[str, Any] = {}
        self.prepared_workflows: List["_MorphingWorkflowBase"] = []
        self.morphing_workflows_plan_df: Optional[pd.DataFrame] = None
        logging.info(f"MorphingIterator initialized for {workflow_class.__name__}.")

    def set_default_values(self, *,
                           # --- All possible workflow and FWG arguments are listed here ---
                           final_output_dir: Optional[str] = None,
                           output_filename_pattern: Optional[str] = None,
                           scenario_mapping: Optional[Dict[str, str]] = None,
                           fwg_jar_path: Optional[str] = None,
                           run_incomplete_files: Optional[bool] = None,
                           delete_temp_files: Optional[bool] = None,
                           temp_base_dir: Optional[str] = None,
                           fwg_show_tool_output: Optional[bool] = None,
                           fwg_params: Optional[Dict[str, Any]] = None,
                           # --- Model-specific arguments ---
                           fwg_gcms: Optional[List[str]] = None,
                           fwg_rcm_pairs: Optional[List[str]] = None,
                           # --- Common FWG arguments ---
                           fwg_create_ensemble: Optional[bool] = None,
                           fwg_winter_sd_shift: Optional[float] = None,
                           fwg_summer_sd_shift: Optional[float] = None,
                           fwg_month_transition_hours: Optional[int] = None,
                           fwg_use_multithreading: Optional[bool] = None,
                           fwg_interpolation_method_id: Optional[int] = None,
                           fwg_limit_variables: Optional[bool] = None,
                           fwg_solar_hour_adjustment: Optional[int] = None,
                           fwg_diffuse_irradiation_model: Optional[int] = None,
                           fwg_add_uhi: Optional[bool] = None,
                           fwg_epw_original_lcz: Optional[int] = None,
                           fwg_target_uhi_lcz: Optional[int] = None,
                           fwg_version: Optional[Union[str, int]] = None):
        """Sets default parameter values for all runs in the batch.

        This method is a convenient way to define parameters that are common
        to all runs in a parametric study, such as `fwg_jar_path` or a shared
        `output_filename_pattern`.

        Any parameter set here will be used for every row in the DataFrame
        unless a different value is specified for that parameter in the row
        itself. This follows a clear priority order:

        1. (Lowest) Hardcoded defaults from the workflow class.
        2. (Medium) Defaults set with this method.
        3. (Highest) Values specified directly in the runs DataFrame.

        Each call replaces the previously stored custom defaults; arguments
        left as `None` are treated as "not provided". A model-specific
        argument that does not apply to the chosen workflow (e.g.
        `fwg_rcm_pairs` when the workflow's `model_arg_name` is `gcms`) is
        dropped with a warning.

        Args:
            * (keyword-only): All arguments must be specified by name.
                All arguments are optional and correspond to the parameters
                of the workflow's `configure_and_preview` method.
        """
        provided_args = {
            'final_output_dir': final_output_dir,
            'output_filename_pattern': output_filename_pattern,
            'scenario_mapping': scenario_mapping,
            'fwg_jar_path': fwg_jar_path,
            'run_incomplete_files': run_incomplete_files,
            'delete_temp_files': delete_temp_files,
            'temp_base_dir': temp_base_dir,
            'fwg_show_tool_output': fwg_show_tool_output,
            'fwg_params': fwg_params,
            'fwg_gcms': fwg_gcms,
            'fwg_rcm_pairs': fwg_rcm_pairs,
            'fwg_create_ensemble': fwg_create_ensemble,
            'fwg_winter_sd_shift': fwg_winter_sd_shift,
            'fwg_summer_sd_shift': fwg_summer_sd_shift,
            'fwg_month_transition_hours': fwg_month_transition_hours,
            'fwg_use_multithreading': fwg_use_multithreading,
            'fwg_interpolation_method_id': fwg_interpolation_method_id,
            'fwg_limit_variables': fwg_limit_variables,
            'fwg_solar_hour_adjustment': fwg_solar_hour_adjustment,
            'fwg_diffuse_irradiation_model': fwg_diffuse_irradiation_model,
            'fwg_add_uhi': fwg_add_uhi,
            'fwg_epw_original_lcz': fwg_epw_original_lcz,
            'fwg_target_uhi_lcz': fwg_target_uhi_lcz,
            'fwg_version': fwg_version,
        }
        # Keep only the arguments the caller actually supplied.
        self.custom_defaults = {
            key: value for key, value in provided_args.items() if value is not None
        }

        # --- Validation and Warning Logic ---
        # Exactly one of the two model arguments applies to a given workflow.
        correct_model_arg = f"fwg_{self.workflow_class.model_arg_name}"
        incorrect_model_arg = 'fwg_rcm_pairs' if correct_model_arg == 'fwg_gcms' else 'fwg_gcms'
        if incorrect_model_arg in self.custom_defaults:
            logging.warning(
                f"Argument '{incorrect_model_arg}' is not applicable for "
                f"{self.workflow_class.__name__} and will be ignored."
            )
            # Remove the inapplicable argument so it's not used later.
            self.custom_defaults.pop(incorrect_model_arg)

        logging.info(f"Custom default values have been set for the iterator: {self.custom_defaults}")

    def get_template_dataframe(self) -> pd.DataFrame:
        """Generates an empty Pandas DataFrame with the correct parameter columns.

        The columns are derived by inspecting the signature of the workflow
        class's `configure_and_preview` method: every keyword-only parameter
        (other than `self`/`kwargs`) becomes a column. They are preceded by
        the iterator-specific columns `epw_paths`, `input_filename_pattern`
        and `keyword_mapping`.

        Returns:
            pd.DataFrame: An empty DataFrame with one column per configurable
            parameter of a batch run.
        """
        sig = inspect.signature(self.workflow_class.configure_and_preview)
        param_names = [
            p.name for p in sig.parameters.values()
            if p.name not in ('self', 'kwargs') and p.kind == p.KEYWORD_ONLY
        ]
        final_columns = ['epw_paths', 'input_filename_pattern', 'keyword_mapping'] + param_names
        return pd.DataFrame(columns=final_columns)

    @staticmethod
    def _is_missing(value: Any) -> bool:
        """(Private) Returns True only for scalar missing markers.

        `pd.isnull` returns an *array* for list-like input, which cannot be
        used in a boolean context. Cells of the runs DataFrame routinely hold
        lists and dicts, so containers are reported as "present" up front and
        any non-scalar result of `pd.isnull` is treated as "not missing".
        """
        if value is None:
            return True
        if isinstance(value, (str, bytes, list, tuple, dict, set, frozenset)):
            return False
        result = pd.isnull(value)
        if hasattr(result, '__len__'):
            # Array-like answer => the value itself was array-like, not missing.
            return False
        return bool(result)

    def _apply_defaults(self, runs_df: pd.DataFrame) -> pd.DataFrame:
        """(Private) Fills missing values and adds missing columns with defaults.

        Priority order (lowest to highest):

        1. Defaults declared in the signature of `configure_and_preview`.
        2. Custom defaults set via `set_default_values`.
        3. Values already present in `runs_df`.

        For each parameter whose final default is not `None`:

        - if the column does not exist, it is created and filled with the default;
        - if the column exists, only its missing cells (None/NaN) are filled.

        Parameters whose final default is `None` are left untouched (no column
        is created for them).

        Args:
            runs_df (pd.DataFrame): The user's DataFrame of runs, potentially
                with missing values or columns. It is not modified.

        Returns:
            pd.DataFrame: A new DataFrame with all defaults applied.
        """
        sig = inspect.signature(self.workflow_class.configure_and_preview)
        hardcoded_defaults = {
            p.name: p.default
            for p in sig.parameters.values()
            if p.default is not inspect.Parameter.empty
        }
        # custom_defaults override the signature defaults.
        final_defaults = {**hardcoded_defaults, **self.custom_defaults}

        completed_df = runs_df.copy()
        for col, default_val in final_defaults.items():
            if default_val is None:
                # Nothing to fill with.
                continue

            if col not in completed_df.columns:
                # Wrap in a list of the DataFrame's length so that list/dict
                # defaults are stored one-per-row instead of being unpacked.
                completed_df[col] = [default_val] * len(completed_df)
                continue

            def fill(cell, _default=default_val):
                # Scalar-safe check: cells holding multi-element lists must
                # not go through a bare `pd.isnull` truth test.
                return _default if self._is_missing(cell) else cell

            completed_df[col] = completed_df[col].apply(fill)

        return completed_df

    def generate_morphing_workflows(self,
                                    runs_df: pd.DataFrame,
                                    input_filename_pattern: Optional[str] = None,
                                    keyword_mapping: Optional[Dict] = None,
                                    raise_on_overwrite: bool = True):
        """Generates a detailed execution plan and prepares all workflow instances.

        The method performs these steps in order:

        1. **Applies Defaults**: builds a fully populated copy of `runs_df`
           using the signature defaults and `set_default_values`.
        2. **Maps Categories**: performs a "dry run" of the file mapping for
           each run to extract categories from the EPW filenames.
        3. **Enriches the Plan**: adds `cat_*` columns showing the extracted
           categories for each run.
        4. **Validates for Filename Overwrites**: first warns about varying
           parameters that are absent from the output pattern, then simulates
           every final filename to detect direct collisions.
        5. **Stores the Plan** in `self.morphing_workflows_plan_df`.
        6. **Prepares Workflows**: instantiates and configures one workflow
           per run and stores them in `self.prepared_workflows`. A run whose
           preparation fails is logged and skipped.

        Args:
            runs_df (pd.DataFrame): The user's DataFrame of runs. Each row
                represents a unique configuration to be executed.
            input_filename_pattern (Optional[str], optional): A regex pattern for
                filename mapping, applied as a default to *every* run unless
                overridden in the DataFrame. Defaults to None.
            keyword_mapping (Optional[Dict], optional): A dictionary of keyword
                rules for filename mapping, applied as a default to *every* run
                unless overridden in the DataFrame.
            raise_on_overwrite (bool, optional): If True (default), raises a
                ValueError if a definitive filename overwrite is detected in
                Layer 2 validation. Layer 1 validation will always only warn.

        Raises:
            ValueError: If `raise_on_overwrite` is True and a definitive file
                overwrite is detected. In that case neither the plan nor the
                prepared workflows are updated.
        """
        logging.info("Generating detailed execution plan and preparing workflows...")

        # --- Part 1: Generate the Detailed DataFrame Plan ---
        plan_df = self._apply_defaults(runs_df)

        # Make sure the mapping-strategy columns exist so the plan is complete.
        if 'input_filename_pattern' not in plan_df.columns or plan_df['input_filename_pattern'].isnull().all():
            plan_df['input_filename_pattern'] = input_filename_pattern
        if 'keyword_mapping' not in plan_df.columns or plan_df['keyword_mapping'].isnull().all():
            # Dicts must be wrapped in a list to be stored one-per-row.
            plan_df['keyword_mapping'] = [keyword_mapping] * len(plan_df)

        plan_df = self._add_category_columns(plan_df, input_filename_pattern, keyword_mapping)

        # --- Part 2: Validate for potential filename overwrites ---
        self._validate_output_filenames(plan_df, raise_on_overwrite)

        # --- Part 3: Store the plan and prepare workflows ---
        self.morphing_workflows_plan_df = plan_df
        self._prepare_workflows(plan_df, input_filename_pattern, keyword_mapping)

    def _add_category_columns(self,
                              plan_df: pd.DataFrame,
                              input_filename_pattern: Optional[str],
                              keyword_mapping: Optional[Dict]) -> pd.DataFrame:
        """(Private) Dry-runs the category mapping and adds the `cat_*` columns.

        A throw-away workflow instance is created per row purely to call
        `map_categories`; its `epw_categories` and `incomplete_epw_categories`
        are merged and summarised into one `cat_<key>` column per category key.
        The new columns are placed right after the mapping-strategy columns.
        """
        prefix = self._CATEGORY_PREFIX
        categories_per_run = []
        all_category_keys = set()

        for _, row in plan_df.iterrows():
            epw_paths = row.get('epw_paths')
            epw_files = [epw_paths] if isinstance(epw_paths, str) else epw_paths

            # Row values win; the method-level arguments are the fallback.
            row_pattern = row.get('input_filename_pattern')
            run_input_pattern = input_filename_pattern if self._is_missing(row_pattern) else row_pattern
            row_map = row.get('keyword_mapping')
            run_keyword_map = keyword_mapping if self._is_missing(row_map) else row_map

            temp_workflow = self.workflow_class()
            temp_workflow.map_categories(
                epw_files=epw_files,
                input_filename_pattern=run_input_pattern,
                keyword_mapping=run_keyword_map
            )
            run_categories = {**temp_workflow.epw_categories, **temp_workflow.incomplete_epw_categories}
            categories_per_run.append(run_categories)
            for cat_dict in run_categories.values():
                all_category_keys.update(cat_dict.keys())

        sorted_cat_keys = sorted(all_category_keys)
        for key in sorted_cat_keys:
            # Unique truthy values per run, sorted so the plan is reproducible
            # (plain set iteration order is not stable between interpreter runs).
            plan_df[f'{prefix}{key}'] = [
                sorted(
                    {cat_dict.get(key) for cat_dict in run_cats.values() if cat_dict.get(key)},
                    key=str,
                )
                for run_cats in categories_per_run
            ]

        cat_cols = [f'{prefix}{key}' for key in sorted_cat_keys]
        non_cat_cols = [c for c in plan_df.columns if not c.startswith(prefix)]
        insert_pos = 0
        for anchor in ('keyword_mapping', 'input_filename_pattern', 'epw_paths'):
            if anchor in non_cat_cols:
                insert_pos = non_cat_cols.index(anchor) + 1
                break
        final_cols_order = non_cat_cols[:insert_pos] + cat_cols + non_cat_cols[insert_pos:]
        return plan_df.reindex(columns=final_cols_order)

    @staticmethod
    def _pattern_placeholders(pattern: str) -> set:
        """(Private) Returns the field names used in a `str.format` pattern.

        Conversion flags, format specs and attribute/index access are stripped,
        so `{year:04d}` yields `year`.
        """
        return {
            re.split(r'[!:.\[]', field, maxsplit=1)[0]
            for field in re.findall(r'{(.*?)}', pattern)
        }

    def _validate_output_filenames(self, plan_df: pd.DataFrame, raise_on_overwrite: bool) -> None:
        """(Private) Runs both layers of the filename-overwrite validation.

        Layer 1 warns when a column varies between runs but is not a
        placeholder of the output pattern. Layer 2 simulates every filename
        (all years x all tool scenarios, per run) and reports exact collisions.

        Raises:
            ValueError: If a collision is found and `raise_on_overwrite` is True.
        """
        logging.info("Validating for potential filename overwrites...")
        prefix = self._CATEGORY_PREFIX
        overwrite_detected = False

        # Each run is validated against its own pattern; non-string cells
        # (None/NaN) mean the run has no usable pattern.
        if 'output_filename_pattern' in plan_df.columns:
            row_patterns = [p if isinstance(p, str) else None for p in plan_df['output_filename_pattern']]
        else:
            row_patterns = [None] * len(plan_df)
        distinct_patterns = {p for p in row_patterns if p is not None}
        if not distinct_patterns:
            logging.warning("Filename validation skipped: no 'output_filename_pattern' is defined for any run.")
            return

        # A placeholder only disambiguates runs if every pattern in use has it.
        pattern_placeholders = set.intersection(
            *(self._pattern_placeholders(p) for p in distinct_patterns)
        )

        # --- Validation Layer 1: Proactive check for varying parameters ---
        # These columns are expected to differ per run without being part of
        # the filename pattern.
        ignore_cols = {
            'epw_paths', 'final_output_dir', 'input_filename_pattern',
            'keyword_mapping', 'output_filename_pattern',
        }
        varying_columns = []
        for col in plan_df.columns:
            if col in ignore_cols:
                continue
            try:
                # Lists are unhashable; compare them as tuples.
                comparable = plan_df[col].dropna().apply(
                    lambda x: tuple(x) if isinstance(x, list) else x
                )
                if comparable.nunique() > 1:
                    varying_columns.append(col)
            except TypeError:
                # Other unhashable cell types (e.g. dicts) cannot be compared
                # this way and are ignored by this layer.
                continue

        # Convert column names to placeholder names (e.g., 'cat_uhi' -> 'uhi').
        varying_placeholders = {
            col[len(prefix):] if col.startswith(prefix) else col
            for col in varying_columns
        }
        missing_placeholders = varying_placeholders - pattern_placeholders
        if missing_placeholders:
            message = (
                f"Potential file overwrite detected! The following parameters or categories vary "
                f"between runs but are not included as placeholders in the 'output_filename_pattern': "
                f"{list(missing_placeholders)}. Please add them to the pattern to ensure unique filenames."
            )
            # This layer ONLY warns, it never raises an error.
            logging.warning(message)
            overwrite_detected = True

        # --- Validation Layer 2: Definitive simulation of all filenames ---
        generated_filenames = set()
        colliding_filenames = set()
        # Run numbers come from enumerate(): DataFrame index labels are not
        # guaranteed to be integers.
        for run_number, ((_, row), pattern) in enumerate(zip(plan_df.iterrows(), row_patterns), start=1):
            if pattern is None:
                logging.warning(
                    f"Could not simulate filenames for run {run_number}: "
                    f"no valid 'output_filename_pattern'."
                )
                continue
            for final_filename in self._simulate_filenames(row, pattern, run_number):
                if final_filename in generated_filenames:
                    colliding_filenames.add(final_filename)
                else:
                    generated_filenames.add(final_filename)

        if colliding_filenames:
            sorted_collisions = sorted(colliding_filenames)
            message = (
                f"Definitive file overwrite(s) detected! The following {len(sorted_collisions)} filename(s) are generated by more than one run:\n"
                + "\n".join(f"- {fname}.epw" for fname in sorted_collisions)
                + "\n\nPlease review your `output_filename_pattern` and `keyword_mapping` to ensure unique filenames."
            )
            if raise_on_overwrite:
                raise ValueError(message)
            logging.warning(message)
            logging.warning("Execution will continue, but output files will be overwritten.")
            overwrite_detected = True

        if not overwrite_detected:
            logging.info("Filename validation passed. No overwrites detected.")
        else:
            logging.info("Filename validation complete. Potential overwrites were detected (see warnings above).")

    def _simulate_filenames(self, row: pd.Series, output_pattern: str, run_number: int) -> List[str]:
        """(Private) Formats `output_pattern` for every year/scenario of one run.

        Placeholders available to the pattern: every `fwg_*` column, every
        `cat_*` column (without the prefix; single-element lists are unwrapped),
        `year`, and the workflow's scenario placeholder.

        Returns:
            The filenames generated before the first formatting failure. A
            failure is logged once and ends the simulation for this run, since
            the same pattern would fail for every remaining combination.
        """
        prefix = self._CATEGORY_PREFIX
        fwg_placeholders = {key: value for key, value in row.items() if key.startswith('fwg_')}
        cat_placeholders = {
            key[len(prefix):]: value[0] if isinstance(value, list) and len(value) == 1 else value
            for key, value in row.items()
            if key.startswith(prefix)
        }

        scenario_map = row.get('scenario_mapping')
        if not isinstance(scenario_map, dict):
            scenario_map = {}
        scenario_key = self.workflow_class.scenario_placeholder_name

        filenames = []
        for year in ALL_POSSIBLE_YEARS:
            for scenario in self.workflow_class.tool_scenarios:
                filename_data = {
                    **cat_placeholders,
                    **fwg_placeholders,
                    'year': year,
                    scenario_key: scenario_map.get(scenario, scenario),
                }
                try:
                    filenames.append(output_pattern.format(**filename_data))
                except KeyError as e:
                    logging.warning(f"Could not generate filename for run {run_number} due to missing key: {e}")
                    return filenames
                except (IndexError, ValueError) as e:
                    # Positional fields ('{}', '{0}') or a malformed pattern.
                    logging.warning(f"Could not generate filename for run {run_number} due to an invalid pattern: {e}")
                    return filenames
        return filenames

    def _prepare_workflows(self,
                           plan_df: pd.DataFrame,
                           input_filename_pattern: Optional[str],
                           keyword_mapping: Optional[Dict]) -> None:
        """(Private) Instantiates and configures one workflow per plan row.

        Successfully configured instances are appended to
        `self.prepared_workflows`; a failing run is logged and skipped so that
        one bad row does not abort the whole batch.
        """
        logging.info(f"Preparing {len(plan_df)} workflow instances...")
        self.prepared_workflows = []
        prefix = self._CATEGORY_PREFIX

        for run_number, (_, row) in enumerate(plan_df.iterrows(), start=1):
            try:
                run_params = row.dropna().to_dict()

                # Parameters consumed by the iterator, not by the workflow config.
                epw_paths = run_params.pop('epw_paths')
                run_input_pattern = run_params.pop('input_filename_pattern', None) or input_filename_pattern
                run_keyword_map = run_params.pop('keyword_mapping', None) or keyword_mapping

                # The informational 'cat_*' columns are not workflow arguments.
                run_params = {
                    key: value for key, value in run_params.items()
                    if not key.startswith(prefix)
                }
                epw_files = [epw_paths] if isinstance(epw_paths, str) else epw_paths

                workflow = self.workflow_class()
                workflow.map_categories(
                    epw_files=epw_files,
                    input_filename_pattern=run_input_pattern,
                    keyword_mapping=run_keyword_map
                )
                workflow.configure_and_preview(**run_params)
                self.prepared_workflows.append(workflow)
            except Exception as e:
                # Deliberate best-effort boundary: keep preparing other runs.
                logging.error(f"Failed to prepare workflow for run {run_number}: {e}")

        logging.info(f"Execution plan generated and {len(self.prepared_workflows)} workflows prepared.")

    def run_morphing_workflows(self, show_tool_output: Optional[bool] = None):
        """Executes the batch of prepared morphing workflows.

        Iterates over `self.prepared_workflows` (populated by
        `generate_morphing_workflows`) and calls `execute_morphing` on each
        instance whose `is_config_valid` flag is true. Invalid runs are logged
        and skipped; an exception in one run is logged and the batch continues.

        Args:
            show_tool_output (Optional[bool], optional): A flag to globally
                override the console output setting for all workflows in this
                specific batch execution.

                - If `True` or `False`, the value is written to each
                  workflow's `inputs['show_tool_output']` before it runs.
                - If `None` (the default), each workflow keeps the setting it
                  was configured with.

        Raises:
            RuntimeError: If there are no prepared workflows, i.e.
                `generate_morphing_workflows()` has not been run (or prepared
                nothing).
        """
        if not self.prepared_workflows:
            raise RuntimeError("No workflows have been prepared. Please run generate_morphing_workflows() first.")

        total_runs = len(self.prepared_workflows)
        logging.info(f"Starting execution of {total_runs} prepared runs...")

        for i, workflow in enumerate(self.prepared_workflows):
            logging.info(f"--- Running Run {i + 1}/{total_runs} ---")
            try:
                if show_tool_output is not None:
                    workflow.inputs['show_tool_output'] = show_tool_output

                if workflow.is_config_valid:
                    workflow.execute_morphing()
                else:
                    logging.error(f"Run {i + 1} skipped due to invalid configuration detected during preparation.")
            except Exception as e:
                # Deliberate best-effort boundary: a single failing run must
                # not stop the rest of the batch.
                logging.error(f"An unexpected error occurred in run {i + 1}: {e}")
                logging.error("Moving to the next run.")

        logging.info("Batch run complete.")