fix: swap betterproto for grpc-requests on snapshot fetches to reduce CPU#250
Conversation
devsjc
left a comment
There was a problem hiding this comment.
Nice Brad! You've beaten me to it.
| created_timestamp=dt.datetime.fromisoformat(v["created_timestamp_utc"].rstrip("Z")).replace(tzinfo=dt.UTC), | ||
| init_timestamp=dt.datetime.fromisoformat(v["initialization_timestamp_utc"].rstrip("Z")).replace(tzinfo=dt.UTC), |
There was a problem hiding this comment.
Surely there's no need for the rstrip -> replace here? Does fromisoformat not handle timezones??
There was a problem hiding this comment.
Yeah I know, it was having trouble with this for some reason! Weird error, but please fiddle if you did fancy, otherwise I can have a quick look tomo
There was a problem hiding this comment.
dt.datetime.fromisoformat(resp["timestamp_utc"]) seems to work for me
There was a problem hiding this comment.
Nice, in that case much nicer without this hacky string functions! 👍
| if forecaster_version is None: | ||
| resp = svc.ListForecasters({ | ||
| "forecaster_names_filter": [forecaster_name], | ||
| "latest_versions_only": True, | ||
| }) | ||
| forecaster_name = resp["forecasters"][0]["forecaster_name"] | ||
| forecaster_version = resp["forecasters"][0]["forecaster_version"] |
There was a problem hiding this comment.
Could save yourself some lines here and make the version non-optional. We always know it now - especially in the forecast/all call which is where this is being used.
There was a problem hiding this comment.
Ah okay good to know, just trying to stay quite defensive atm but if safe to remove then even better
|
FYI I had to remake |
| ) -> list[models.PredictedGenerationValue]: | ||
| if self._sync_client is not None: | ||
| loop = asyncio.get_running_loop() | ||
| return await loop.run_in_executor( |
There was a problem hiding this comment.
I think you can just run return self._sync_snapshot(....), unless ive missed something
There was a problem hiding this comment.
Ah I was leaving in the default branch as a fallback, but maybe this is overkill – I assume the gRPC server should always have reflection if it does atm
There was a problem hiding this comment.
sorry i meant you dont need loop = asyncio.get_running_loop(), you can juste run self._sync_snapshot(....)
* remake uv.lock, remove strip and timezone * lint * add jinja2 * TDD: add tests * dont use cache if start and end is the same * add params back in * add jinja2 * lint * also use sync for `GetForecastAsTimeseries` * remove code and tidy * dont use real host or port * lint * add mocking
| ) | ||
| client = dp.DataPlatformDataServiceStub(channel=grpc_channel) | ||
| storage = DataPlatformStorage.from_dp(dp_client=client) | ||
| storage.set_sync_client( |
There was a problem hiding this comment.
I'd imagine the type hinter might complain about this since it isn't a function on the generic interface, but I don't think it's worth quibbling over now.
| host_url = "host-url-not-set" | ||
| host_url = ${?HOST_URL} | ||
| workers = 2 | ||
| workers = 1 |
There was a problem hiding this comment.
This feels a shame but I understand the cache thing is a seperate issue
There was a problem hiding this comment.
Is there any meaningful overhead to leaving gunicorn in without actually using (multiple) workers do you know @devsjc?
|
|
||
| if gsp_ids is None and start_datetime_utc != end_datetime_utc: | ||
| if start_datetime_utc_set or end_datetime_utc_set: | ||
| raise HTTPException( |
Pull Request
Description
Reduces CPU and serialisation overhead on the /forecast/all/ endpoint, which fetches data for all 331 GSPs, as well as tweaking exactly what behaviour we want to allow on this endpoint.
gRPC client swap (original PoC — h/t @peterdudfield)
betterproto is pure Python, so deserialising 331 GSPs per snapshot was hammering the CPU. grpc-requests uses the protobuf C extension, does the same work in native C, and releases the GIL so the event loop stays free during the cache warm. CPU during cache warming dropped noticeably (~1% locally).
Serialisation
Swapped jsonable_encoder + json.dumps for Pydantic's TypeAdapter.dump_json, which uses the Rust-based serialiser.
Sync client extended (from #255)
The sync grpc-requests path now also covers GetForecastAsTimeseries, used by /{gsp_id}/forecast. Tests added with mocking.
Cache/routing logic
Default (no gsp_ids, default timestamps): served from warm cache only. Cache miss triggers a background warm and returns 503 + Retry-After: 60.
Custom gsp_ids or non-default timestamps: bypasses cache and hits the backend live.
Timestamp validation added when gsp_ids is not set.
Response ordering
compact=false response is now sorted by gsp_id ascending (sort happens at build time, before caching).
Default factories
default_now_window_start() and default_window_end() extracted as named, exportable functions so they can be called directly in route handler logic for default comparisons.
Helps with https://github.com/openclimatefix/client-private/issues/294
How Has This Been Tested?
Please describe the tests that you ran to verify your changes. Provide instructions so we can reproduce.
Please also list any relevant details for your test configuration
If your changes affect data processing, have you plotted any changes? i.e. have you done a quick sanity check?
Checklist: