@@ -31,14 +31,21 @@ async def run_blend_app() -> None:
3131
3232 Steps:
3333 1. Determine blend reference time (t0)
34- 2. Fetch full location map from Data Platform (national + regional)
34+ 2. Fetch full location map from Data Platform
3535 3. Load the MAE scorecard
36- 4. Calculate national blend weights (used for national location)
37- 5. Calculate regional blend weights (used for all regional locations)
38- 6. For each location: fetch + blend + save
36+ 4. Calculate blend weights and run blend for main models
37+ 5. Save main forecast under {forecaster_name}
38+ 6. If use_adjuster=True:
39+ - Calculate blend weights and run blend for adjuster models
40+ ({model_name}_adjust) — full pipeline runs unchanged
41+ - Save adjuster blend under {forecaster_name}_adjust
3942 """
4043 _cfg = load_blend_config ()
41- logger .info ("Starting NL Blend execution." )
44+ logger .info (
45+ f"Starting NL Blend execution. "
46+ f"use_adjuster={ _cfg .use_adjuster } , "
47+ f"forecaster='{ _cfg .forecaster_name } '" ,
48+ )
4249
4350 # ------------------------------------------------------------------ #
4451 # Determine blend reference time - floor to 15-min boundary #
@@ -94,67 +101,139 @@ async def run_blend_app() -> None:
94101 max_horizon = df_mae .index .max ()
95102
96103 # -------------------------------------------------------------- #
97- # Calculate national blend weights for the national location #
104+ # Main blend #
98105 # -------------------------------------------------------------- #
99- logger .info ("Calculating national blend weights." )
100- try :
101- national_weights_df = await get_blend_weights (
106+ await _run_blend_pass (
107+ client = client ,
108+ t0 = t0 ,
109+ location_uuid = national_location_uuid ,
110+ location_key = NL_NATIONAL_LOCATION_KEY ,
111+ df_mae = df_mae ,
112+ max_horizon = max_horizon ,
113+ forecaster_name = _cfg .forecaster_name ,
114+ )
115+
116+ # -------------------------------------------------------------- #
117+ # Adjuster blend (only if use_adjuster=True in config) #
118+ # Weights are computed from the same module-level constants. #
119+ # Weight column names are suffixed with '_adjust' so that #
120+ # get_blend_forecast_values_latest fetches {model}_adjust from #
121+ # the Data Platform instead of the base model forecasters. #
122+ # -------------------------------------------------------------- #
123+ if _cfg .use_adjuster :
124+ logger .info ("use_adjuster=True: running adjuster blend pass." )
125+ await _run_blend_pass (
126+ client = client ,
102127 t0 = t0 ,
103128 location_uuid = national_location_uuid ,
129+ location_key = NL_NATIONAL_LOCATION_KEY ,
104130 df_mae = df_mae ,
105131 max_horizon = max_horizon ,
106- client = client ,
132+ forecaster_name = _cfg .adjuster_forecaster_name ,
133+ use_adjuster = True ,
107134 )
108- logger .info (f"National blend weights calculated:\n { national_weights_df .head (10 )} " )
109- except Exception :
110- logger .exception ("Failed to calculate national blend weights." )
111- return
112135
113136
114- # -------------------------------------------------------------- #
115- # Blend and save for the national location only #
116- # -------------------------------------------------------------- #
117- location_key = NL_NATIONAL_LOCATION_KEY
118- location_uuid = national_location_uuid
119- logger .info (
120- f"Blending forecasts for national location '{ location_key } ' "
121- f"(uuid={ location_uuid } )" ,
def rename_columns_with_adjuster(weights_df: pd.DataFrame) -> pd.DataFrame:
    """Returns a copy of *weights_df* with '_adjust' appended to every column name.

    Used by the adjuster blend pass so that downstream fetching resolves
    ``{model}_adjust`` forecasters instead of the base model forecasters.

    Args:
        weights_df: (horizon x model) blend-weight frame; not mutated.

    Returns:
        A new DataFrame with each column label suffixed with ``_adjust``.
    """
    # pandas' built-in add_suffix is the idiomatic equivalent of renaming
    # every column with an f-string comprehension; it also returns a copy.
    return weights_df.add_suffix("_adjust")
142+
143+
async def _run_blend_pass(
    client: dp.DataPlatformDataServiceStub,
    t0: pd.Timestamp,
    location_uuid: str,
    location_key: str,
    df_mae: pd.DataFrame,
    max_horizon: pd.Timedelta,
    forecaster_name: str,
    use_adjuster: bool = False,
) -> None:
    """Runs the full blend pipeline for one set of models and saves the result.

    Shared by the main blend pass and the adjuster blend pass.

    Blend weights are always computed from the module-level constants in
    ``weights.py`` (NL_BACKUP_MODEL / NL_NATIONAL_CANDIDATE_MODELS).
    When *use_adjuster* is True, the weight column names are renamed with an
    ``_adjust`` suffix before fetching, so that
    :func:`get_blend_forecast_values_latest` fetches ``{model}_adjust``
    forecasters from the Data Platform instead of the base model forecasters.

    Errors in any stage are logged and abort the pass (the function returns
    early rather than raising), so one failing pass cannot break the other.

    Args:
        client: Active Data Platform gRPC client stub.
        t0: Blend reference time (UTC).
        location_uuid: DP location UUID to blend and save for.
        location_key: Human-readable location identifier (for logging).
        df_mae: (horizon x model) MAE scorecard.
        max_horizon: Maximum scorecard horizon.
        forecaster_name: Forecaster tag to save under.
        use_adjuster: When True, fetches {model}_adjust forecasters and
            saves under {forecaster_name} (caller sets the
            correct adjuster forecaster name).
    """
    log_prefix = "adjuster" if use_adjuster else "blend"
    # Lazy %-style args: interpolation (and DataFrame rendering below) only
    # happens when the record is actually emitted at the configured level.
    logger.info(
        "[%s] Starting blend pass for '%s' (forecaster='%s', use_adjuster=%s)",
        log_prefix,
        location_key,
        forecaster_name,
        use_adjuster,
    )

    # Weights are always computed from the module-level constants.
    try:
        weights_df = await get_blend_weights(
            t0=t0,
            location_uuid=location_uuid,
            df_mae=df_mae,
            max_horizon=max_horizon,
            client=client,
        )
        logger.info("[%s] Blend weights calculated:\n%s", log_prefix, weights_df.head(10))
    except Exception:
        logger.exception("[%s] Failed to calculate blend weights.", log_prefix)
        return

    # For the adjuster pass: rename columns so DP fetches {model}_adjust.
    if use_adjuster:
        weights_df = rename_columns_with_adjuster(weights_df)
        logger.info(
            "[%s] Weight columns renamed with '_adjust' suffix: %s",
            log_prefix,
            list(weights_df.columns),
        )

    # Fetch and blend
    try:
        blended_df = await get_blend_forecast_values_latest(
            location_uuid=location_uuid,
            weights_df=weights_df,
            client=client,
            start_datetime=t0,
        )
    except Exception:
        logger.exception("[%s] Failed to fetch or blend forecast timeseries.", log_prefix)
        return

    if blended_df.empty:
        logger.warning(
            "[%s] Blended timeseries is empty for '%s'. "
            "This is expected in dev when no forecast megawatts are stored.",
            log_prefix,
            location_key,
        )
        return

    logger.info(
        "[%s] Blended timeseries for '%s' (first 5 rows):\n%s",
        log_prefix,
        location_key,
        blended_df.head(5),
    )

    await _save_forecasts(
        client=client,
        t0=t0,
        location_uuid=location_uuid,
        location_key=location_key,
        blended_df=blended_df,
        forecaster_name=forecaster_name,
    )
158237
159238
160239async def _save_forecasts (
@@ -168,14 +247,14 @@ async def _save_forecasts(
168247 """Persists the blended forecast to the Data Platform.
169248
170249 Args:
171- client: Active Data Platform gRPC client stub.
172- t0: Blend reference time (UTC); used as the forecast init_time .
173- location_uuid: DP location UUID to write forecasts under.
174- location_key: Human-readable location identifier.
175- blended_df: DataFrame with columns [target_time,
176- expected_power_generation_megawatts, p10_mw (opt),
177- p90_mw (opt)].
178- forecaster_name: Forecaster tag written to the Data Platform.
250+ client: Active Data Platform gRPC client stub.
251+ t0: Blend reference time (UTC).
252+ location_uuid: DP location UUID to write forecasts under.
253+ location_key: Human-readable location identifier (for logging only) .
254+ blended_df: DataFrame with columns [target_time,
255+ expected_power_generation_megawatts, p10_mw (opt),
256+ p90_mw (opt)].
257+ forecaster_name: Forecaster tag written to the Data Platform.
179258 """
180259 n_rows = len (blended_df )
181260 has_p10 = "p10_mw" in blended_df .columns
@@ -188,29 +267,27 @@ async def _save_forecasts(
188267 f"p50={ n_rows } | p10={ n_p10 } | p90={ n_p90 } rows with valid values." ,
189268 )
190269
191- # ------------------------------------------------------------------ #
192- # Build the DP value objects #
193- # ------------------------------------------------------------------ #
270+ # Build DP value objects
194271 try :
195272 forecast_values = build_forecast_value_objects (
196273 blended_df = blended_df ,
197274 init_time_utc = t0 .to_pydatetime (),
198275 )
199276 except Exception :
200277 logger .exception (
201- f"Failed to build DP forecast value objects for '{ location_key } ' - skipping save." ,
278+ f"Failed to build DP forecast value objects for "
279+ f"'{ location_key } ' - skipping save." ,
202280 )
203281 return
204282
205283 if not forecast_values :
206284 logger .warning (
207- f"No forecast value objects produced for '{ location_key } ' - skipping save." ,
285+ f"No forecast value objects produced for "
286+ f"'{ location_key } ' - skipping save." ,
208287 )
209288 return
210289
211- # ------------------------------------------------------------------ #
212- # Resolve / create the forecaster record #
213- # ------------------------------------------------------------------ #
290+ # Resolve / create forecaster record
214291 try :
215292 forecaster = await create_forecaster_if_not_exists (
216293 client = client ,
@@ -222,28 +299,27 @@ async def _save_forecasts(
222299 )
223300 except Exception :
224301 logger .exception (
225- f"Failed to resolve/create blend forecaster for '{ location_key } ' - skipping save." ,
302+ f"Failed to resolve/create forecaster '{ forecaster_name } ' "
303+ f"for '{ location_key } ' - skipping save." ,
226304 )
227305 return
228306
229- base_request = dp .CreateForecastRequest (
230- forecaster = forecaster ,
231- location_uuid = location_uuid ,
232- energy_source = dp .EnergySource .SOLAR ,
233- init_time_utc = t0 .to_pydatetime (),
234- values = forecast_values ,
235- )
236-
237- # ------------------------------------------------------------------ #
238- # Write to Data Platform #
239- # ------------------------------------------------------------------ #
307+ # Write to Data Platform
240308 logger .info (
241309 f"Saving { n_rows } rows to Data Platform "
242- f"(forecaster='nl_blend ', t0={ t0 } , location='{ location_key } ') - "
310+ f"(forecaster='{ forecaster_name } ', t0={ t0 } , location='{ location_key } ') - "
243311 f"p50={ n_rows } , p10={ n_p10 } , p90={ n_p90 } valid rows." ,
244312 )
245313 try :
246- await client .create_forecast (base_request )
314+ await client .create_forecast (
315+ dp .CreateForecastRequest (
316+ forecaster = forecaster ,
317+ location_uuid = location_uuid ,
318+ energy_source = dp .EnergySource .SOLAR ,
319+ init_time_utc = t0 .to_pydatetime (),
320+ values = forecast_values ,
321+ ),
322+ )
247323 logger .info (f"Forecast write succeeded for '{ location_key } '." )
248324 except Exception :
249325 logger .exception (f"Failed to write forecast for '{ location_key } '." )
0 commit comments