Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 153 additions & 82 deletions site_forecast_app/blend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,21 @@ async def run_blend_app() -> None:

Steps:
1. Determine blend reference time (t0)
2. Fetch full location map from Data Platform (national + regional)
2. Fetch full location map from Data Platform
3. Load the MAE scorecard
4. Calculate national blend weights (used for national location)
5. Calculate regional blend weights (used for all regional locations)
6. For each location: fetch + blend + save
4. Calculate blend weights and run blend for main models
5. Save main forecast under {forecaster_name}
6. If use_adjuster=True:
- Calculate blend weights and run blend for adjuster models
({model_name}_adjust) — full pipeline runs unchanged
- Save adjuster blend under {forecaster_name}_adjust
"""
_cfg = load_blend_config()
logger.info("Starting NL Blend execution.")
logger.info(
f"Starting NL Blend execution. "
f"use_adjuster={_cfg.use_adjuster}, "
f"forecaster='{_cfg.forecaster_name}'",
)

# ------------------------------------------------------------------ #
# Determine blend reference time - floor to 15-min boundary #
Expand Down Expand Up @@ -94,67 +101,134 @@ async def run_blend_app() -> None:
max_horizon = df_mae.index.max()

# -------------------------------------------------------------- #
# Calculate national blend weights for the national location #
# Main blend #
# -------------------------------------------------------------- #
logger.info("Calculating national blend weights.")
try:
national_weights_df = await get_blend_weights(
await _run_blend_pass(
client=client,
t0=t0,
location_uuid=national_location_uuid,
location_key=NL_NATIONAL_LOCATION_KEY,
df_mae=df_mae,
max_horizon=max_horizon,
forecaster_name=_cfg.forecaster_name,
)

# -------------------------------------------------------------- #
# Adjuster blend (only if use_adjuster=True in config) #
# Weights are computed from the same module-level constants. #
# Weight column names are suffixed with '_adjust' so that #
# get_blend_forecast_values_latest fetches {model}_adjust from #
# the Data Platform instead of the base model forecasters. #
# -------------------------------------------------------------- #
if _cfg.use_adjuster:
logger.info("use_adjuster=True: running adjuster blend pass.")
await _run_blend_pass(
client=client,
t0=t0,
location_uuid=national_location_uuid,
location_key=NL_NATIONAL_LOCATION_KEY,
df_mae=df_mae,
max_horizon=max_horizon,
client=client,
forecaster_name=_cfg.adjuster_forecaster_name,
use_adjuster=True,
)
logger.info(f"National blend weights calculated:\n{national_weights_df.head(10)}")
except Exception:
logger.exception("Failed to calculate national blend weights.")
return


# -------------------------------------------------------------- #
# Blend and save for the national location only #
# -------------------------------------------------------------- #
location_key = NL_NATIONAL_LOCATION_KEY
location_uuid = national_location_uuid
logger.info(
f"Blending forecasts for national location '{location_key}' "
f"(uuid={location_uuid})",
async def _run_blend_pass(
    client: dp.DataPlatformDataServiceStub,
    t0: pd.Timestamp,
    location_uuid: str,
    location_key: str,
    df_mae: pd.DataFrame,
    max_horizon: pd.Timedelta,
    forecaster_name: str,
    use_adjuster: bool = False,
) -> None:
    """Compute weights, blend and persist one forecast for a single location.

    Used for both the main blend pass and the adjuster blend pass. The
    pass runs three stages in order and stops (after logging) as soon as
    one of them cannot proceed; it never raises to the caller for a
    weights or fetch/blend failure:

    1. Blend weights are requested from :func:`get_blend_weights`. They are
       always derived from the module-level constants in ``weights.py``
       (NL_BACKUP_MODEL / NL_NATIONAL_CANDIDATE_MODELS), regardless of
       *use_adjuster*.
    2. When *use_adjuster* is True, every weight column label gets an
       ``_adjust`` suffix so that
       :func:`get_blend_forecast_values_latest` fetches the
       ``{model}_adjust`` forecasters instead of the base ones.
    3. The blended timeseries is fetched; if it is non-empty it is handed
       to :func:`_save_forecasts` under *forecaster_name*.

    Args:
        client: Active Data Platform gRPC client stub.
        t0: Blend reference time (UTC).
        location_uuid: DP location UUID to blend and save for.
        location_key: Human-readable location identifier (for logging).
        df_mae: (horizon x model) MAE scorecard.
        max_horizon: Maximum scorecard horizon.
        forecaster_name: Forecaster tag to save under. The caller is
            responsible for passing the adjuster forecaster name when
            *use_adjuster* is True.
        use_adjuster: When True, blend the ``{model}_adjust`` forecasters
            rather than the base model forecasters.
    """
    # Same text the messages have always carried: "[adjuster]" or "[blend]".
    tag = "[adjuster]" if use_adjuster else "[blend]"
    logger.info(
        f"{tag} Starting blend pass for '{location_key}' "
        f"(forecaster='{forecaster_name}', use_adjuster={use_adjuster})",
    )

    # Stage 1: weights. The preview log stays inside the try so that a
    # failure while rendering it is handled like a weights failure.
    try:
        weights_df = await get_blend_weights(
            t0=t0,
            location_uuid=location_uuid,
            df_mae=df_mae,
            max_horizon=max_horizon,
            client=client,
        )
        weights_preview = weights_df.head(10)
        logger.info(f"{tag} Blend weights calculated:\n{weights_preview}")
    except Exception:
        logger.exception(f"{tag} Failed to calculate blend weights.")
        return

    # Stage 2: point the fetch at the {model}_adjust forecasters.
    if use_adjuster:
        adjusted_labels = {
            label: f"{label}_adjust" for label in weights_df.columns
        }
        weights_df = weights_df.rename(columns=adjusted_labels)
        logger.info(
            f"{tag} Weight columns renamed with '_adjust' suffix: "
            f"{list(weights_df.columns)}",
        )

    # Stage 3: fetch and blend.
    try:
        blended_df = await get_blend_forecast_values_latest(
            location_uuid=location_uuid,
            weights_df=weights_df,
            client=client,
            start_datetime=t0,
        )
    except Exception:
        logger.exception(f"{tag} Failed to fetch or blend forecast timeseries.")
        return

    # Nothing to persist: warn and stop before touching the Data Platform.
    if blended_df.empty:
        logger.warning(
            f"{tag} Blended timeseries is empty for '{location_key}'. "
            "This is expected in dev when no forecast megawatts are stored.",
        )
        return

    blended_preview = blended_df.head(5)
    logger.info(
        f"{tag} Blended timeseries for '{location_key}' "
        f"(first 5 rows):\n{blended_preview}",
    )

    await _save_forecasts(
        client=client,
        t0=t0,
        location_uuid=location_uuid,
        location_key=location_key,
        blended_df=blended_df,
        forecaster_name=forecaster_name,
    )


async def _save_forecasts(
Expand All @@ -168,14 +242,14 @@ async def _save_forecasts(
"""Persists the blended forecast to the Data Platform.

Args:
client: Active Data Platform gRPC client stub.
t0: Blend reference time (UTC); used as the forecast init_time.
location_uuid: DP location UUID to write forecasts under.
location_key: Human-readable location identifier.
blended_df: DataFrame with columns [target_time,
expected_power_generation_megawatts, p10_mw (opt),
p90_mw (opt)].
forecaster_name: Forecaster tag written to the Data Platform.
client: Active Data Platform gRPC client stub.
t0: Blend reference time (UTC).
location_uuid: DP location UUID to write forecasts under.
location_key: Human-readable location identifier (for logging only).
blended_df: DataFrame with columns [target_time,
expected_power_generation_megawatts, p10_mw (opt),
p90_mw (opt)].
forecaster_name: Forecaster tag written to the Data Platform.
"""
n_rows = len(blended_df)
has_p10 = "p10_mw" in blended_df.columns
Expand All @@ -188,29 +262,27 @@ async def _save_forecasts(
f"p50={n_rows} | p10={n_p10} | p90={n_p90} rows with valid values.",
)

# ------------------------------------------------------------------ #
# Build the DP value objects #
# ------------------------------------------------------------------ #
# Build DP value objects
try:
forecast_values = build_forecast_value_objects(
blended_df=blended_df,
init_time_utc=t0.to_pydatetime(),
)
except Exception:
logger.exception(
f"Failed to build DP forecast value objects for '{location_key}' - skipping save.",
f"Failed to build DP forecast value objects for "
f"'{location_key}' - skipping save.",
)
return

if not forecast_values:
logger.warning(
f"No forecast value objects produced for '{location_key}' - skipping save.",
f"No forecast value objects produced for "
f"'{location_key}' - skipping save.",
)
return

# ------------------------------------------------------------------ #
# Resolve / create the forecaster record #
# ------------------------------------------------------------------ #
# Resolve / create forecaster record
try:
forecaster = await create_forecaster_if_not_exists(
client=client,
Expand All @@ -222,28 +294,27 @@ async def _save_forecasts(
)
except Exception:
logger.exception(
f"Failed to resolve/create blend forecaster for '{location_key}' - skipping save.",
f"Failed to resolve/create forecaster '{forecaster_name}' "
f"for '{location_key}' - skipping save.",
)
return

base_request = dp.CreateForecastRequest(
forecaster=forecaster,
location_uuid=location_uuid,
energy_source=dp.EnergySource.SOLAR,
init_time_utc=t0.to_pydatetime(),
values=forecast_values,
)

# ------------------------------------------------------------------ #
# Write to Data Platform #
# ------------------------------------------------------------------ #
# Write to Data Platform
logger.info(
f"Saving {n_rows} rows to Data Platform "
f"(forecaster='nl_blend', t0={t0}, location='{location_key}') - "
f"(forecaster='{forecaster_name}', t0={t0}, location='{location_key}') - "
f"p50={n_rows}, p10={n_p10}, p90={n_p90} valid rows.",
)
try:
await client.create_forecast(base_request)
await client.create_forecast(
dp.CreateForecastRequest(
forecaster=forecaster,
location_uuid=location_uuid,
energy_source=dp.EnergySource.SOLAR,
init_time_utc=t0.to_pydatetime(),
values=forecast_values,
),
)
logger.info(f"Forecast write succeeded for '{location_key}'.")
except Exception:
logger.exception(f"Failed to write forecast for '{location_key}'.")
Expand Down
19 changes: 19 additions & 0 deletions site_forecast_app/blend/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,20 @@ class NlBlendConfig(BaseModel):
description="Minimum forecast horizon emitted in any blended forecast.",
)

# ------------------------------------------------------------------
# Adjuster
# ------------------------------------------------------------------
use_adjuster: bool = Field(
False,
title="Use Adjuster",
description=(
"Whether to run a second blend pass using adjuster model forecasts "
"({model_name}_adjust) and save the result under {forecaster_name}_adjust. "
"The full blend pipeline runs unchanged on the adjuster model names. "
"Set to false to skip the adjuster blend entirely."
),
)

# ------------------------------------------------------------------
# Infrastructure / naming
# ------------------------------------------------------------------
Expand All @@ -79,6 +93,11 @@ def min_forecast_horizon(self) -> pd.Timedelta:
"""Minimum forecast horizon as a pd.Timedelta."""
return pd.Timedelta(minutes=self.min_forecast_horizon_minutes)

@property
def adjuster_forecaster_name(self) -> str:
    """Return the forecaster tag under which the adjusted blend is saved.

    The tag is ``forecaster_name`` followed by the literal suffix
    ``_adjust``.
    """
    return "{}_adjust".format(self.forecaster_name)


class NlBlendConfigWrapper(BaseModel):
"""Wrapper for the NL Blend configuration."""
Expand Down
7 changes: 6 additions & 1 deletion site_forecast_app/blend/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ blend:
- nl_national_pv_ecmwf_sat_small

# Candidate models for regional blends (subset of national candidates).
# These values are not currently being used
# These values are not currently being used.
regional_candidate_models:
- nl_regional_48h_pv_ecmwf
- nl_regional_pv_ecmwf_mo_sat
Expand All @@ -37,6 +37,11 @@ blend:
# Minimum forecast horizon (minutes) emitted in any blended forecast.
min_forecast_horizon_minutes: 15

# ------------------------------------------------------------------ #
# Adjuster #
# ------------------------------------------------------------------ #
use_adjuster: true

# ------------------------------------------------------------------ #
# Infrastructure / naming #
# ------------------------------------------------------------------ #
Expand Down
Loading
Loading