1+ import json
2+ import logging
3+ import time
4+ from contextlib import contextmanager
5+ from datetime import timedelta
6+ from functools import wraps
7+ from typing import (
8+ Callable ,
9+ Generator ,
10+ Literal ,
11+ Optional ,
12+ TypeVar ,
13+ Union ,
14+ )
15+
116import httpx
17+ from pydantic import BaseModel , Field , ValidationError
18+ from typing_extensions import Annotated , ParamSpec
219
320from fastapi_cloud_cli import __version__
421from fastapi_cloud_cli .config import Settings
522from fastapi_cloud_cli .utils .auth import get_auth_token
23+ from fastapi_cloud_cli .utils .pydantic_compat import TypeAdapter
24+
25+ logger = logging .getLogger (__name__ )
26+
27+ BUILD_LOG_MAX_RETRIES = 3
28+ BUILD_LOG_TIMEOUT = timedelta (minutes = 5 )
29+
30+
31+ class BuildLogError (Exception ):
32+ pass
33+
34+
35+ class TooManyRetriesError (Exception ):
36+ pass
37+
38+
39+ class BuildLogLineGeneric (BaseModel ):
40+ type : Literal ["complete" , "failed" , "timeout" , "heartbeat" ]
41+ id : Optional [str ] = None
42+
43+
44+ class BuildLogLineMessage (BaseModel ):
45+ type : Literal ["message" ] = "message"
46+ message : str
47+ id : Optional [str ] = None
48+
49+
50+ BuildLogLine = Union [BuildLogLineMessage , BuildLogLineGeneric ]
51+ BuildLogAdapter = TypeAdapter [BuildLogLine ](
52+ Annotated [BuildLogLine , Field (discriminator = "type" )] # type: ignore
53+ )
54+
55+
56+ @contextmanager
57+ def attempt (attempt_number : int ) -> Generator [None , None , None ]:
58+ def _backoff () -> None :
59+ backoff_seconds = min (2 ** attempt_number , 30 )
60+ logger .debug (
61+ "Retrying in %ds (attempt %d)" ,
62+ backoff_seconds ,
63+ attempt_number ,
64+ )
65+ time .sleep (backoff_seconds )
66+
67+ try :
68+ yield
69+
70+ except (
71+ httpx .TimeoutException ,
72+ httpx .NetworkError ,
73+ httpx .RemoteProtocolError ,
74+ ) as error :
75+ logger .debug ("Network error (will retry): %s" , error )
76+
77+ _backoff ()
78+
79+ except httpx .HTTPStatusError as error :
80+ if error .response .status_code >= 500 :
81+ logger .debug (
82+ "Server error %d (will retry): %s" ,
83+ error .response .status_code ,
84+ error ,
85+ )
86+ _backoff ()
87+ else :
88+ # Try to get response text, but handle streaming responses gracefully
89+ try :
90+ error_detail = error .response .text
91+ except Exception :
92+ error_detail = "(response body unavailable)"
93+ raise BuildLogError (
94+ f"HTTP { error .response .status_code } : { error_detail } "
95+ ) from error
96+
97+
98+ P = ParamSpec ("P" )
99+ T = TypeVar ("T" )
100+
101+
102+ def attempts (
103+ total_attempts : int = 3 , timeout : timedelta = timedelta (minutes = 5 )
104+ ) -> Callable [
105+ [Callable [P , Generator [T , None , None ]]], Callable [P , Generator [T , None , None ]]
106+ ]:
107+ def decorator (
108+ func : Callable [P , Generator [T , None , None ]],
109+ ) -> Callable [P , Generator [T , None , None ]]:
110+ @wraps (func )
111+ def wrapper (* args : P .args , ** kwargs : P .kwargs ) -> Generator [T , None , None ]:
112+ start = time .monotonic ()
113+
114+ for attempt_number in range (total_attempts ):
115+ if time .monotonic () - start > timeout .total_seconds ():
116+ raise TimeoutError (
117+ "Build log streaming timed out after %ds" ,
118+ timeout .total_seconds (),
119+ )
120+
121+ with attempt (attempt_number ):
122+ yield from func (* args , ** kwargs )
123+ # If we get here without exception, the generator completed successfully
124+ return
125+
126+ raise TooManyRetriesError (f"Failed after { total_attempts } attempts" )
127+
128+ return wrapper
129+
130+ return decorator
6131
7132
8133class APIClient (httpx .Client ):
@@ -19,3 +144,52 @@ def __init__(self) -> None:
19144 "User-Agent" : f"fastapi-cloud-cli/{ __version__ } " ,
20145 },
21146 )
147+
148+ @attempts (BUILD_LOG_MAX_RETRIES , BUILD_LOG_TIMEOUT )
149+ def stream_build_logs (
150+ self , deployment_id : str
151+ ) -> Generator [BuildLogLine , None , None ]:
152+ last_id = None
153+
154+ while True :
155+ params = {"last_id" : last_id } if last_id else None
156+
157+ with self .stream (
158+ "GET" ,
159+ f"/deployments/{ deployment_id } /build-logs" ,
160+ timeout = 60 ,
161+ params = params ,
162+ ) as response :
163+ response .raise_for_status ()
164+
165+ for line in response .iter_lines ():
166+ if not line or not line .strip ():
167+ continue
168+
169+ if log_line := self ._parse_log_line (line ):
170+ if log_line .id :
171+ last_id = log_line .id
172+
173+ if log_line .type == "message" :
174+ yield log_line
175+
176+ if log_line .type in ("complete" , "failed" ):
177+ yield log_line
178+ return
179+
180+ if log_line .type == "timeout" :
181+ logger .debug ("Received timeout; reconnecting" )
182+ break # Breaks for loop to reconnect
183+ else :
184+ logger .debug ("Connection closed by server unexpectedly; will retry" )
185+
186+ raise httpx .NetworkError ("Connection closed without terminal state" )
187+
188+ time .sleep (0.5 )
189+
190+ def _parse_log_line (self , line : str ) -> Optional [BuildLogLine ]:
191+ try :
192+ return BuildLogAdapter .validate_json (line )
193+ except (ValidationError , json .JSONDecodeError ) as e :
194+ logger .debug ("Skipping malformed log: %s (error: %s)" , line [:100 ], e )
195+ return None
0 commit comments