gcpdiag.queries.apis_utils
GCP API-related utility functions.
def
execute_concurrently( api: Any, requests: List[Any], context: gcpdiag.models.Context) -> Iterator[Tuple[Any, Optional[Any], Optional[Exception]]]:
def execute_concurrently(
    api: Any, requests: List[Any], context: models.Context
) -> Iterator[Tuple[Any, Optional[Any], Optional[Exception]]]:
  """Run every request in `requests` and yield one result tuple per request.

  The execution strategy depends on `context.context_provider`:

  * falsy (CLI context): the work is delegated to `batch_execute_all`.
  * truthy (API server context): each request is submitted to the executor
    returned by `executor.get_executor(context)` and results are yielded in
    completion order, not submission order.

  Args:
    api: API client object, only used by the batch (CLI) path.
    requests: request objects to execute; an empty list yields nothing.
    context: execution context used to select the strategy and the executor.

  Yields:
    (request, response, exception) tuples.
  """
  if not requests:
    return

  if not context.context_provider:
    # CLI context: keep using the batch API helper.
    yield from batch_execute_all(api, requests)
    return

  # API server context: fan the requests out on the shared executor and
  # remember which future belongs to which request.
  pool = executor.get_executor(context)
  pending = {}
  for req in requests:
    pending[pool.submit(execute_single_request, req)] = req

  for done in concurrent.futures.as_completed(pending):
    origin = pending[done]
    try:
      response, exception = done.result()
    except googleapiclient.errors.HttpError as err:
      yield (origin, None, err)
    else:
      yield (origin, response, exception)
Executes a list of API requests concurrently. Uses ThreadPoolExecutor in API server context, batch_execute_all in CLI context. Yields: (request, response, exception)
def
execute_concurrently_with_pagination( api: Any, requests: List[Any], next_function: Callable, context: gcpdiag.models.Context, log_text: Optional[str] = None, response_keyword: str = 'items') -> Iterator[Any]:
def execute_concurrently_with_pagination(
    api: Any,
    requests: List[Any],
    next_function: Callable,
    context: models.Context,
    log_text: Optional[str] = None,
    response_keyword: str = 'items') -> Iterator[Any]:
  """Execute a list of API 'list' requests and yield their paginated items.

  Dispatches on `context.context_provider`: when it is truthy the work is
  handed to `_execute_with_pagination_in_api_context`, otherwise to
  `batch_list_all`.

  Args:
    api: API client object.
    requests: the initial 'list' requests.
    next_function: callable used to obtain the follow-up request of a page.
    context: execution context used to select the strategy.
    log_text: optional text, forwarded to `batch_list_all` only.
    response_keyword: response key under which the items are stored.

  Yields:
    Whatever the selected helper yields.
  """
  if context.context_provider:
    paginated = _execute_with_pagination_in_api_context(api, requests,
                                                        next_function, context,
                                                        response_keyword)
  else:
    paginated = batch_list_all(api, requests, next_function, log_text,
                               response_keyword)

  yield from paginated
Executes and paginates a list of API 'list' requests concurrently.
def
list_all( request, next_function: Callable, response_keyword='items') -> Iterator[Any]:
def list_all(request,
             next_function: Callable,
             response_keyword='items') -> Iterator[Any]:
  """Execute a GCP API list `request` and follow its pages until exhausted.

  After every page, `next_function(previous_request=..., previous_response=...)`
  is called to obtain the request for the following page; iteration stops
  when it returns None.

  Args:
    request: the first list request; must provide `execute(num_retries=...)`.
    next_function: callable returning the next-page request, or None.
    response_keyword: response key under which the results are stored.

  Yields:
    Each element found under `response_keyword` in every page.

  Raises:
    utils.GcpApiError: if executing a request raises
      googleapiclient.errors.HttpError.
  """
  current = request
  while True:
    try:
      page = current.execute(num_retries=config.API_RETRIES)
    except googleapiclient.errors.HttpError as err:
      raise utils.GcpApiError(err) from err

    # GCP API responses omit the key entirely when the list is empty.
    if response_keyword in page:
      for item in page[response_keyword]:
        yield item

    current = next_function(previous_request=current, previous_response=page)
    if current is None:
      return
Execute GCP API request and subsequently call next_function until
there are no more results. Assumes that it is a list method and that
the results are under the `response_keyword` key (`items` by default).
def
multi_list_all(requests: list, next_function: Callable) -> Iterator[Any]:
def
batch_list_all( api, requests: list, next_function: Callable, log_text: Optional[str] = None, response_keyword='items'):
def batch_list_all(api,
                   requests: list,
                   next_function: Callable,
                   log_text: Optional[str] = None,
                   response_keyword='items'):
  """Similar to list_all but using batch API except in TPC environment.

  Args:
    api: API client object, used by the batch path.
    requests: the initial list requests; an empty list yields nothing.
    next_function: callable used to obtain the follow-up request of a page.
    log_text: optional text forwarded to the batch path.
    response_keyword: response key under which the results are stored.

  Yields:
    The items produced by `list_all` (non-batch path) or `_original_batch`
    (batch path).
  """
  # Nothing to do; also avoids an IndexError on requests[0] below.
  if not requests:
    return

  if 'googleapis.com' not in requests[0].uri:
    # the api client library does not handle batch api calls for TPC yet, so
    # the batch is processed and collected one at a time in that case
    for req in requests:
      # Forward response_keyword so that both paths read the same key.
      yield from list_all(req, next_function, response_keyword)
  else:
    yield from _original_batch(api, requests, next_function, log_text,
                               response_keyword)
Similar to list_all but using batch API except in TPC environment.
def
should_retry(resp_status):
def
get_nth_exponential_random_retry(n, random_pct, multiplier, random_fn=None):
def
batch_execute_all(api, requests: list):
def batch_execute_all(api, requests: list):
  """Execute all `requests` using the batch API and yield (request,response,exception)
  tuples.

  Requests are sent with `api.new_batch_http_request()`. Individual requests
  that fail with an HttpError for which `should_retry` is true are re-queued,
  and a failed batch call is retried as a whole, as long as fewer than
  `config.API_RETRIES` retry rounds have been done. The generator sleeps
  between rounds for the delay computed by `get_nth_exponential_random_retry`.

  Args:
    api: API client object providing `new_batch_http_request()`.
    requests: list of request objects to execute.

  Yields:
    (request, response, None) for successful requests and
    (request, None, utils.GcpApiError) for requests that failed and were not
    retried. Requests whose response is falsy yield nothing.

  Raises:
    utils.GcpApiError: if the batch call itself fails and is not retried.
  """
  # results: (request, result, exception) tuples
  results: List[Tuple[Any, Optional[Any], Optional[Exception]]] = []
  requests_todo = requests
  requests_in_flight: List = []
  retry_count = 0

  # Per-request callback registered with the batch. It reads results,
  # requests_todo, requests_in_flight and retry_count from the enclosing
  # scope, so it always sees the lists bound for the current round.
  def fetch_all_cb(request_id, response, exception):
    # request_id is the stringified index into requests_in_flight (see the
    # batch.add call below).
    try:
      request = requests_in_flight[int(request_id)]
    except (IndexError, ValueError, TypeError):
      logging.debug(
          'BUG: Cannot find request %r in list of pending requests, dropping request.',
          request_id)
      return

    if exception:
      if isinstance(exception, googleapiclient.errors.HttpError) and \
        should_retry(exception.status_code) and \
        retry_count < config.API_RETRIES:
        logging.debug('received HTTP error status code %d from API, retrying',
                      exception.status_code)
        requests_todo.append(request)
      else:
        results.append((request, None, utils.GcpApiError(exception)))
      return

    # Falsy responses are dropped: no tuple is recorded for this request.
    if not response:
      return

    results.append((request, response, None))

  while True:
    # Start a new round: everything queued becomes in-flight, and the queue
    # and result list are rebound to fresh lists for the callback to fill.
    requests_in_flight = requests_todo
    requests_todo = []
    results = []

    # Do the batch API request
    try:
      batch = api.new_batch_http_request()
      for i, req in enumerate(requests_in_flight):
        batch.add(req, callback=fetch_all_cb, request_id=str(i))
      batch.execute()
    except (googleapiclient.errors.HttpError, httplib2.HttpLib2Error) as err:
      if isinstance(err, googleapiclient.errors.HttpError):
        error_msg = f'received HTTP error status code {err.status_code} from Batch API, retrying'
      else:
        error_msg = f'received exception from Batch API: {err}, retrying'
      # Retry the whole batch for any httplib2 error, or for an HttpError
      # whose status should_retry accepts, while retry rounds remain.
      if (not isinstance(err, googleapiclient.errors.HttpError) or \
          should_retry(err.status_code)) \
          and retry_count < config.API_RETRIES:
        logging.debug(error_msg)
        # Re-queue every in-flight request and discard partial results so
        # that nothing is yielded twice.
        requests_todo = requests_in_flight
        results = []
      else:
        raise utils.GcpApiError(err) from err

    # Yield results
    yield from results

    # If no requests_todo, means we are done.
    if not requests_todo:
      break

    # for example: retry delay: 20% is random, progression: 1, 1.4, 2.0, 2.7, ... 28.9 (10 retries)
    sleep_time = get_nth_exponential_random_retry(
        n=retry_count,
        random_pct=config.API_RETRY_SLEEP_RANDOMNESS_PCT,
        multiplier=config.API_RETRY_SLEEP_MULTIPLIER)
    logging.debug('sleeping %.2f seconds before retry #%d', sleep_time,
                  retry_count + 1)
    time.sleep(sleep_time)
    retry_count += 1
Execute all requests using the batch API and yield (request,response,exception)
tuples.
def
execute_single_request(request: Any) -> Tuple[Optional[Any], Optional[Exception]]:
def execute_single_request(
    request: Any) -> Tuple[Optional[Any], Optional[Exception]]:
  """Execute one API request, returning any HttpError as a value.

  Args:
    request: request object providing `execute(num_retries=...)`.

  Returns:
    `(response, None)` on success, or `(None, error)` when the call raised
    googleapiclient.errors.HttpError. Other exceptions propagate.
  """
  try:
    result = request.execute(num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError as http_err:
    return None, http_err
  return result, None
Executes a single API request and returns the response and exception.