gcpdiag.queries.apis_utils

GCP API-related utility functions.
def execute_concurrently(
    api: Any, requests: List[Any], context: models.Context
) -> Iterator[Tuple[Any, Optional[Any], Optional[Exception]]]:
  """Run a list of API requests and yield one result tuple per request.

  When `context.context_provider` is set (API server context), every request
  is submitted to the executor returned by `executor.get_executor(context)`
  and results are yielded in completion order. Otherwise (CLI context) the
  work is delegated to `batch_execute_all`.

  Args:
    api: API client object; only used by the CLI (batch) path.
    requests: requests to execute. Nothing is yielded for an empty list.
    context: context whose `context_provider` attribute selects the path.

  Yields:
    (request, response, exception) tuples.
  """
  if not requests:
    return

  if not context.context_provider:
    # CLI context: keep using the batch implementation.
    yield from batch_execute_all(api, requests)
    return

  # API server context: fan the requests out on the shared executor and
  # remember which future belongs to which request.
  pool = executor.get_executor(context)
  request_by_future = {}
  for req in requests:
    request_by_future[pool.submit(execute_single_request, req)] = req

  for finished in concurrent.futures.as_completed(request_by_future):
    origin = request_by_future[finished]
    try:
      response, exception = finished.result()
      yield (origin, response, exception)
    except googleapiclient.errors.HttpError as http_err:
      yield (origin, None, http_err)

Executes a list of API requests concurrently: uses a ThreadPoolExecutor in the API server context and batch_execute_all in the CLI context. Yields (request, response, exception) tuples.

def execute_concurrently_with_pagination(
    api: Any,
    requests: List[Any],
    next_function: Callable,
    context: models.Context,
    log_text: Optional[str] = None,
    response_keyword: str = 'items') -> Iterator[Any]:
  """Execute and paginate a list of API 'list' requests.

  Dispatches on `context.context_provider`: when it is set, the work goes to
  `_execute_with_pagination_in_api_context`; otherwise it goes to
  `batch_list_all`.

  Args:
    api: API client object forwarded to the selected implementation.
    requests: the initial 'list' requests.
    next_function: callable forwarded to the selected implementation to
      obtain follow-up (next page) requests.
    context: context whose `context_provider` attribute selects the path.
    log_text: optional text, only forwarded to `batch_list_all`.
    response_keyword: key under which results are found in each response.

  Yields:
    Whatever the selected implementation yields.
  """
  if context.context_provider:
    yield from _execute_with_pagination_in_api_context(api, requests,
                                                       next_function, context,
                                                       response_keyword)
  else:
    yield from batch_list_all(api, requests, next_function, log_text,
                              response_keyword)

Executes and paginates a list of API 'list' requests concurrently.

def list_all(request,
             next_function: Callable,
             response_keyword='items') -> Iterator[Any]:
  """Execute a GCP API list `request` and follow it through all its pages.

  After each page, `next_function` is called with the previous request and
  response to obtain the request for the following page; iteration stops once
  it returns None.

  Args:
    request: the first request; must provide `execute(num_retries=...)`.
    next_function: called as `next_function(previous_request=...,
      previous_response=...)`; returns the next request or None.
    response_keyword: key in each response that holds the results.

  Yields:
    Each element found under `response_keyword` in every response.

  Raises:
    utils.GcpApiError: if executing a request raises
      googleapiclient.errors.HttpError.
  """
  current = request
  while True:
    try:
      page = current.execute(num_retries=config.API_RETRIES)
    except googleapiclient.errors.HttpError as http_err:
      raise utils.GcpApiError(http_err) from http_err

    # GCP API responses omit the key entirely when the list is empty.
    if response_keyword in page:
      yield from page[response_keyword]

    current = next_function(previous_request=current, previous_response=page)
    if current is None:
      return

Execute the GCP API `request` and then repeatedly call `next_function` until there are no more results. Assumes that it is a list method and that the results are under the `response_keyword` key (`items` by default).

def multi_list_all(
    requests: list,
    next_function: Callable,
) -> Iterator[Any]:
  """Yield the results of `list_all` for every request, one after another.

  Args:
    requests: list requests, processed sequentially in the given order.
    next_function: pagination callable handed to `list_all` for each request.

  Yields:
    The items produced by `list_all` for each request in turn.
  """
  for single_request in requests:
    yield from list_all(single_request, next_function)
def batch_list_all(api,
                   requests: list,
                   next_function: Callable,
                   log_text: Optional[str] = None,
                   response_keyword='items'):
  """Similar to list_all but using batch API except in TPC environment.

  Args:
    api: API client object, forwarded to the batch implementation.
    requests: list requests to execute; an empty list yields nothing.
    next_function: pagination callable forwarded to the implementation used.
    log_text: optional text, only forwarded to the batch implementation.
    response_keyword: key under which results are found in each response.

  Yields:
    The items produced for all requests by the implementation used.
  """
  # Nothing to execute. This also protects the requests[0] lookup below,
  # which would otherwise raise IndexError for an empty list.
  if not requests:
    return

  if 'googleapis.com' not in requests[0].uri:
    #  the api client library does not handle batch api calls for TPC yet, so
    #  the batch is processed and collected one at a time in that case
    for req in requests:
      # Forward response_keyword so that a non-default key is honoured here
      # just like in the batch branch.
      yield from list_all(req, next_function, response_keyword)
  else:
    yield from _original_batch(api, requests, next_function, log_text,
                               response_keyword)

Similar to list_all but using batch API except in TPC environment.

def should_retry(resp_status):
  """Return True if an HTTP status code warrants a retry.

  Retries are indicated for any status of 500 or above and for
  429 (too many requests); every other status returns False.
  """
  return resp_status >= 500 or resp_status == 429
def get_nth_exponential_random_retry(n, random_pct, multiplier, random_fn=None):
  """Compute the delay for the n-th retry with exponential growth and jitter.

  Args:
    n: exponent applied to `multiplier` (the retry number).
    random_pct: fraction of the delay that may be randomly subtracted.
    multiplier: base of the exponential progression.
    random_fn: zero-argument callable returning the random factor; falls back
      to `random.random` when not given.

  Returns:
    (1 - random_fn() * random_pct) * multiplier**n
  """
  if not random_fn:
    random_fn = random.random
  jitter_factor = 1 - random_fn() * random_pct
  exponential_delay = multiplier**n
  return jitter_factor * exponential_delay
def batch_execute_all(api, requests: list):
  """Execute all `requests` using the batch API and yield
  (request, response, exception) tuples.

  Requests are sent through `api.new_batch_http_request()`. Individual
  requests that fail with a retryable HTTP status, and whole batches that
  fail with a retryable error, are retried after a randomized exponential
  delay for as long as fewer than `config.API_RETRIES` retries have been
  made.

  Args:
    api: API client object providing `new_batch_http_request()`.
    requests: requests to execute. The list itself is not modified.

  Yields:
    (request, response, None) for each successful request with a non-empty
    response, and (request, None, utils.GcpApiError) for each request that
    failed and is not retried. Requests that succeed with an empty response
    produce no tuple.

  Raises:
    utils.GcpApiError: if the batch call itself fails and is not retried.
  """
  # (request, response, exception) tuples gathered by the callback during the
  # current round.
  collected: List[Tuple[Any, Optional[Any], Optional[Exception]]] = []
  pending = requests
  in_flight: List = []
  attempt = 0

  def on_batch_response(request_id, response, exception):
    # request_id is the index (as a string) of the request within in_flight.
    try:
      request = in_flight[int(request_id)]
    except (IndexError, ValueError, TypeError):
      logging.debug(
          'BUG: Cannot find request %r in list of pending requests, dropping request.',
          request_id)
      return

    if not exception:
      # Successful requests with an empty response are silently skipped.
      if response:
        collected.append((request, response, None))
      return

    retryable = (isinstance(exception, googleapiclient.errors.HttpError) and
                 should_retry(exception.status_code) and
                 attempt < config.API_RETRIES)
    if retryable:
      logging.debug('received HTTP error status code %d from API, retrying',
                    exception.status_code)
      pending.append(request)
    else:
      collected.append((request, None, utils.GcpApiError(exception)))

  while True:
    # Start a new round: what was pending is now in flight, and the callback
    # refills `pending` with the requests that need another attempt.
    in_flight, pending = pending, []
    collected = []

    try:
      batch = api.new_batch_http_request()
      for index, req in enumerate(in_flight):
        batch.add(req, callback=on_batch_response, request_id=str(index))
      batch.execute()
    except (googleapiclient.errors.HttpError, httplib2.HttpLib2Error) as err:
      is_http_error = isinstance(err, googleapiclient.errors.HttpError)
      if is_http_error:
        error_msg = f'received HTTP error status code {err.status_code} from Batch API, retrying'
      else:
        error_msg = f'received exception from Batch API: {err}, retrying'
      # Non-HTTP errors are always retry candidates; HTTP errors only for
      # retryable status codes.
      retry_candidate = not is_http_error or should_retry(err.status_code)
      if not (retry_candidate and attempt < config.API_RETRIES):
        raise utils.GcpApiError(err) from err
      logging.debug(error_msg)
      # The whole batch failed: retry every request of this round and drop
      # any partial results.
      pending = in_flight
      collected = []

    yield from collected

    # Nothing left to retry means we are done.
    if not pending:
      break

    # e.g. with 20% randomness the delay progresses 1, 1.4, 2.0, 2.7, ...
    sleep_time = get_nth_exponential_random_retry(
        n=attempt,
        random_pct=config.API_RETRY_SLEEP_RANDOMNESS_PCT,
        multiplier=config.API_RETRY_SLEEP_MULTIPLIER)
    logging.debug('sleeping %.2f seconds before retry #%d', sleep_time,
                  attempt + 1)
    time.sleep(sleep_time)
    attempt += 1

Execute all requests using the batch API and yield (request,response,exception) tuples.

def execute_single_request(
    request: Any) -> Tuple[Optional[Any], Optional[Exception]]:
  """Execute one API request and report the outcome as a pair.

  Args:
    request: object providing `execute(num_retries=...)`.

  Returns:
    (response, None) on success, or (None, error) when executing the request
    raises googleapiclient.errors.HttpError. Other exceptions propagate.
  """
  try:
    result = request.execute(num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError as http_err:
    return None, http_err
  return result, None

Executes a single API request and returns the response and exception.