gcpdiag.queries.apis_utils

GCP API-related utility functions.
def list_all( request, next_function: Callable, response_keyword='items') -> Iterator[Any]:
def list_all(request,
             next_function: Callable,
             response_keyword='items') -> Iterator[Any]:
  """Run a paginated GCP API list call and yield every returned entry.

  `request` is executed first; after each page, `next_function` is called
  with the request and response just processed to obtain the request for
  the following page. Iteration ends when `next_function` returns None.

  Args:
    request: the initial API request object (must provide `execute`).
    next_function: callable accepting `previous_request` and
      `previous_response` keyword arguments and returning the next request,
      or None when there are no more pages.
    response_keyword: key of the response under which the entries of a
      page are stored.

  Yields:
    Each entry found under `response_keyword`, page after page.

  Raises:
    utils.GcpApiError: if executing a request fails with an HttpError.
  """
  current = request
  while True:
    try:
      page = current.execute(num_retries=config.API_RETRIES)
    except googleapiclient.errors.HttpError as http_error:
      raise utils.GcpApiError(http_error) from http_error

    # GCP API responses leave the key out altogether when the list is empty.
    entries = page[response_keyword] if response_keyword in page else ()
    yield from entries

    current = next_function(previous_request=current, previous_response=page)
    if current is None:
      return

Execute GCP API `request` and subsequently call `next_function` until there are no more results. Assumes that it is a list method and that the results are under the `response_keyword` key (`items` by default).

def multi_list_all(requests: list, next_function: Callable) -> Iterator[Any]:
def multi_list_all(
    requests: list,
    next_function: Callable,
) -> Iterator[Any]:
  """Yield the entries of several list requests, one request at a time.

  Each request in `requests` is passed to list_all together with
  `next_function`, in the order given, and everything list_all yields is
  passed through unchanged.
  """
  for request in requests:
    yield from list_all(request, next_function)
def batch_list_all( api, requests: list, next_function: Callable, log_text: str, response_keyword='items'):
def batch_list_all(api,
                   requests: list,
                   next_function: Callable,
                   log_text: str,
                   response_keyword='items'):
  """Similar to list_all but using batch API except in TPC environment.

  Args:
    api: API client object, forwarded to the batch implementation.
    requests: list requests to run; the `uri` of the first request decides
      which code path is used for all of them.
    next_function: callable returning the follow-up request of a page, as
      expected by list_all.
    log_text: text forwarded to the batch implementation.
    response_keyword: key of a response under which its entries are stored.

  Yields:
    Whatever list_all (non-batch path) or _original_batch (batch path)
    yields for the given requests. Nothing is yielded when `requests` is
    empty.
  """
  # Without any request there is nothing to fetch, and requests[0] below
  # would raise IndexError.
  if not requests:
    return

  if 'googleapis.com' not in requests[0].uri:
    #  the api client library does not handle batch api calls for TPC yet, so
    #  the batch is processed and collected one at a time in that case
    for req in requests:
      # Forward response_keyword so that both code paths read the results
      # from the same key.
      yield from list_all(req, next_function, response_keyword)
  else:
    yield from _original_batch(api, requests, next_function, log_text,
                               response_keyword)

Similar to list_all but using batch API except in TPC environment.

def should_retry(resp_status):
def should_retry(resp_status):
  """Tell whether a request with HTTP status `resp_status` should be retried.

  Status codes of 500 and above, as well as 429 (too many requests), are
  retryable; any other status is not.
  """
  return resp_status >= 500 or resp_status == 429
def get_nth_exponential_random_retry(n, random_pct, mutiplier, random_fn=None):
def get_nth_exponential_random_retry(n, random_pct, mutiplier, random_fn=None):
  """Compute the delay for retry number `n` with exponential growth and jitter.

  The base delay is `mutiplier**n`. It is scaled by a factor of
  `1 - random_fn() * random_pct`, i.e. reduced by a random share of at most
  `random_pct`.

  Args:
    n: zero-based retry number, used as the exponent.
    random_pct: maximum fraction of the delay that may be randomly removed.
    mutiplier: base of the exponential progression.
    random_fn: zero-argument callable providing the random value; defaults
      to random.random.

  Returns:
    The computed delay.
  """
  if not random_fn:
    random_fn = random.random
  jitter_factor = 1 - random_fn() * random_pct
  return jitter_factor * mutiplier**n
def batch_execute_all(api, requests: list):
def batch_execute_all(api, requests: list):
  """Execute all `requests` using the batch API and yield (request,response,exception)
  tuples.

  The requests are submitted together through `api.new_batch_http_request()`.
  Requests that fail with a retryable HTTP error (see should_retry) are
  collected and submitted again in a further round, with a randomized
  exponential sleep between rounds, for as long as the number of retries
  done so far is below config.API_RETRIES.

  Args:
    api: API client object providing `new_batch_http_request()`.
    requests: the requests to execute.

  Yields:
    (request, response, None) for every request that returned a non-empty
    response, and (request, None, utils.GcpApiError) for every request that
    failed and is not retried. Requests with an empty response yield
    nothing.

  Raises:
    utils.GcpApiError: if the batch call itself fails and is not retried.
  """
  # results: (request, result, exception) tuples
  results: List[Tuple[Any, Optional[Any], Optional[Exception]]] = []
  requests_todo = requests
  requests_in_flight: List = []
  retry_count = 0

  # Callback registered for each request of a batch. It reads
  # requests_in_flight, requests_todo, results and retry_count from the
  # enclosing scope, so it always works on the lists of the current round
  # (the loop below rebinds them at the start of every round).
  def fetch_all_cb(request_id, response, exception):
    # request_id is the index (as string) of the request in
    # requests_in_flight, as assigned when the batch was built.
    try:
      request = requests_in_flight[int(request_id)]
    except (IndexError, ValueError, TypeError):
      logging.debug(
          'BUG: Cannot find request %r in list of pending requests, dropping request.',
          request_id)
      return

    if exception:
      # Retryable HTTP errors are queued for the next round while retries
      # remain; every other failure is reported as a result tuple.
      if isinstance(exception, googleapiclient.errors.HttpError) and \
        should_retry(exception.status_code) and \
        retry_count < config.API_RETRIES:
        logging.debug('received HTTP error status code %d from API, retrying',
                      exception.status_code)
        requests_todo.append(request)
      else:
        results.append((request, None, utils.GcpApiError(exception)))
      return

    # Empty responses produce no result tuple at all.
    if not response:
      return

    results.append((request, response, None))

  while True:
    # Start a new round: whatever was queued becomes the in-flight batch,
    # and fresh lists collect this round's retries and results.
    requests_in_flight = requests_todo
    requests_todo = []
    results = []

    # Do the batch API request
    try:
      batch = api.new_batch_http_request()
      for i, req in enumerate(requests_in_flight):
        batch.add(req, callback=fetch_all_cb, request_id=str(i))
      batch.execute()
    except (googleapiclient.errors.HttpError, httplib2.HttpLib2Error) as err:
      if isinstance(err, googleapiclient.errors.HttpError):
        error_msg = f'received HTTP error status code {err.status_code} from Batch API, retrying'
      else:
        error_msg = f'received exception from Batch API: {err}, retrying'
      # Failure of the batch call as a whole: non-HTTP errors are always
      # considered retryable, HTTP errors only per should_retry. On retry,
      # the complete in-flight batch is queued again and the results
      # gathered in this round are discarded.
      if (not isinstance(err, googleapiclient.errors.HttpError) or \
          should_retry(err.status_code)) \
          and retry_count < config.API_RETRIES:
        logging.debug(error_msg)
        requests_todo = requests_in_flight
        results = []
      else:
        raise utils.GcpApiError(err) from err

    # Yield results
    yield from results

    # If no requests_todo, means we are done.
    if not requests_todo:
      break

    # for example: retry delay: 20% is random, progression: 1, 1.4, 2.0, 2.7, ... 28.9 (10 retries)
    sleep_time = get_nth_exponential_random_retry(
        n=retry_count,
        random_pct=config.API_RETRY_SLEEP_RANDOMNESS_PCT,
        mutiplier=config.API_RETRY_SLEEP_MULTIPLIER)
    logging.debug('sleeping %.2f seconds before retry #%d', sleep_time,
                  retry_count + 1)
    time.sleep(sleep_time)
    retry_count += 1

Execute all `requests` using the batch API and yield `(request, response, exception)` tuples.