gcpdiag.queries.apis

Build and cache GCP APIs + handle authentication.
# OAuth 2.0 scopes: OpenID, cloud-platform and userinfo.email.
# NOTE(review): presumably requested when obtaining credentials; the code
# that consumes this list is not visible here -- confirm.
AUTH_SCOPES = [
    'openid',
    'https://www.googleapis.com/auth/cloud-platform',
    'https://www.googleapis.com/auth/userinfo.email',
]
def set_credentials(cred_json):
  """Set or clear the module-wide `_credentials` object.

  Args:
    cred_json: JSON text that is parsed and handed to
      `Credentials.from_authorized_user_info`. A falsy value (None or an
      empty string) resets `_credentials` to None instead.
  """
  global _credentials
  if not cred_json:
    _credentials = None
    return
  _credentials = oauth2_credentials.Credentials.from_authorized_user_info(
      json.loads(cred_json))
def get_credentials():
  """Return credentials for the configured authentication method.

  Returns:
    The result of `_get_credentials_adc()` when `_auth_method()` is 'adc',
    or of `_get_credentials_key()` when it is 'key'.

  Raises:
    AssertionError: if `_auth_method()` returns any other value.
  """
  # Evaluate once so that the value reported in the error message is exactly
  # the value that was tested.
  auth_method = _auth_method()
  if auth_method == 'adc':
    return _get_credentials_adc()
  if auth_method == 'key':
    return _get_credentials_key()
  raise AssertionError(
      'BUG: AUTH_METHOD method should be one of `adc` or `key`, '
      f'but got `{auth_method}` instead.'
      ' Please report at https://gcpdiag.dev/issues/')
def login():
  """Force GCP login (this otherwise happens on the first get_api call).

  Calls `get_credentials()` and discards the result.
  """
  get_credentials()

Force GCP login (this otherwise happens on the first get_api call).

def get_user_email() -> str:
  """Return the email address reported by the userinfo endpoint.

  Returns:
    The literal string 'TPC user' when the configured universe domain is not
    'googleapis.com'; otherwise the 'email' field of the JSON document
    returned by https://www.googleapis.com/userinfo/v2/me.

  Raises:
    RuntimeError: if the userinfo request does not return status '200'.
  """
  if config.get('universe_domain') != 'googleapis.com':
    return 'TPC user'

  # NOTE(review): the quota project is cleared for this request, presumably
  # so the userinfo call is not billed to a project -- confirm.
  credentials = get_credentials().with_quota_project(None)
  authorized_http = google_auth_httplib2.AuthorizedHttp(credentials,
                                                        http=httplib2.Http())
  resp, content = authorized_http.request(
      'https://www.googleapis.com/userinfo/v2/me')
  if resp['status'] != '200':
    raise RuntimeError(f"can't determine user email. status={resp['status']}")

  email = json.loads(content)['email']
  logging.debug('determined my email address: %s', email)
  return email
@caching.cached_api_call(in_memory=True)
def get_api(service_name: str,
            version: str,
            project_id: Optional[str] = None,
            region: Optional[str] = None):
  """Get an API object, as returned by googleapiclient.discovery.build.

  Args:
    service_name: API name, e.g. 'compute'; also used to build the endpoint
      host name.
    version: API version passed to `discovery.build`.
    project_id: if specified, this is used as the billed project (sent as the
      `x-goog-user-project` header after being mapped through
      `_get_project_or_billing_id`); usually this should be the ID of the
      project that you are inspecting.
    region: if specified, the endpoint host is prefixed with `<region>-`.

  Returns:
    The object returned by `discovery.build`.

  Raises:
    ValueError: if the universe domain of the credentials differs from the
      configured `universe_domain`.
  """
  credentials = get_credentials()

  def _request_builder(http, *args, **kwargs):
    del http  # a fresh AuthorizedHttp is created for every request below

    # thread safety: always work on a copy so that the caller's dictionary
    # isn't modified. Building the dict unconditionally also ensures the
    # headers below are sent when the caller passed no (or None) headers.
    headers = dict(kwargs.get('headers') or {})
    headers['user-agent'] = f'gcpdiag/{config.VERSION} (gzip)'
    if project_id:
      headers['x-goog-user-project'] = _get_project_or_billing_id(project_id)
    kwargs['headers'] = headers

    hooks.request_builder_hook(*args, **kwargs)

    # thread safety: create a new AuthorizedHttp object for every request
    # https://github.com/googleapis/google-api-python-client/blob/master/docs/thread_safety.md
    new_http = google_auth_httplib2.AuthorizedHttp(credentials,
                                                   http=httplib2.Http())
    return googleapiclient.http.HttpRequest(new_http, *args, **kwargs)

  universe_domain = config.get('universe_domain')
  # Credentials without a universe_domain attribute are treated as belonging
  # to the default 'googleapis.com' universe.
  cred_universe = getattr(credentials, 'universe_domain', 'googleapis.com')
  if cred_universe != universe_domain:
    raise ValueError('credential universe_domain mismatch '
                     f'{cred_universe} != {universe_domain}')

  client_options = ClientOptions()
  if universe_domain != 'googleapis.com':
    client_options.universe_domain = universe_domain

  host = f'{region}-{service_name}' if region else service_name
  client_options.api_endpoint = f'https://{host}.{universe_domain}'
  # These services get '/<service>/<version>' appended to the endpoint.
  if service_name in ('compute', 'bigquery', 'storage', 'dns'):
    client_options.api_endpoint += f'/{service_name}/{version}'

  return discovery.build(service_name,
                         version,
                         cache_discovery=False,
                         credentials=credentials,
                         requestBuilder=_request_builder,
                         client_options=client_options)

Get an API object, as returned by googleapiclient.discovery.build.

If project_id is specified, it is used as the billed project; usually this should be the ID of the project that you are inspecting.

def is_enabled(project_id: str, service_name: str) -> bool:
  """Check whether an API is in the project's list of enabled APIs.

  Args:
    project_id: project whose enabled APIs are looked up.
    service_name: short API name without the domain, e.g. 'iam'; the
      configured universe domain is appended before the lookup.

  Returns:
    True if `<service_name>.<universe_domain>` is contained in
    `_list_enabled_apis(project_id)`.
  """
  full_service_name = f"{service_name}.{config.get('universe_domain')}"
  return full_service_name in _list_enabled_apis(project_id)
def is_all_enabled(project_id: str, services: list) -> Dict[str, str]:
  """Report the enablement state of several APIs in one project.

  Args:
    project_id: project to check.
    services: short API names, each passed to `is_enabled`.

  Returns:
    A dict mapping every entry of `services` to 'ENABLED' or 'DISABLED';
    empty if `services` is empty.
  """
  return {
      service: 'ENABLED' if is_enabled(project_id, service) else 'DISABLED'
      for service in services
  }
@caching.cached_api_call(in_memory=True)
def list_services_with_state(project_id: str) -> Dict[str, str]:
  """List the services of a project together with their state.

  Args:
    project_id: project whose services are listed through the Service Usage
      API (`services.list` with parent `projects/<project_id>`).

  Returns:
    A dict mapping each service's `config.name` to its `state`. If a name
    appears more than once, the first state seen is kept.

  Raises:
    utils.GcpApiError: if the API call fails with an HttpError.
  """
  logging.debug('listing all APIs with their state')
  serviceusage = get_api('serviceusage', 'v1', project_id)
  request = serviceusage.services().list(parent=f'projects/{project_id}')
  apis_state: Dict[str, str] = {}
  try:
    while request is not None:
      response = request.execute(num_retries=config.API_RETRIES)
      # A page without any entries may omit the 'services' field entirely;
      # treat that as an empty page instead of failing with KeyError.
      for service in response.get('services', []):
        apis_state.setdefault(service['config']['name'], service['state'])
      request = serviceusage.services().list_next(request, response)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
  return apis_state
def verify_access(project_id: str):
  """Verify that the user has access to the project; raise otherwise.

  Checks, in order, that the Cloud Resource Manager, IAM and Cloud Logging
  APIs are enabled for the project, and finally calls
  `hooks.verify_access_hook`.

  Args:
    project_id: ID of the project to verify.

  Raises:
    utils.GcpApiError: with reason 'SERVICE_DISABLED' if one of the APIs
      above is not enabled. Any GcpApiError raised while checking is
      re-raised: a disabled Service Usage API gets an enable hint appended
      to its response, and errors with another reason are logged first.
    exceptions.GoogleAuthError: re-raised after being logged.
  """
  universe_domain = config.get('universe_domain')

  try:
    if not is_enabled(project_id, 'cloudresourcemanager'):
      service = f'cloudresourcemanager.{universe_domain}'
      error_msg = (
          'Cloud Resource Manager API must be enabled. To enable, execute:\n'
          f'gcloud services enable {service} --project={project_id}')
      raise utils.GcpApiError(response=error_msg,
                              service=service,
                              reason='SERVICE_DISABLED')
    if not is_enabled(project_id, 'iam'):
      service = f'iam.{universe_domain}'
      error_msg = (
          'Identity and Access Management (IAM) API must be enabled. To enable, execute:\n'
          f'gcloud services enable {service} --project={project_id}')
      raise utils.GcpApiError(response=error_msg,
                              service=service,
                              reason='SERVICE_DISABLED')
    if not is_enabled(project_id, 'logging'):
      service = f'logging.{universe_domain}'
      warning_msg = (
          'Cloud Logging API is not enabled (related rules will be skipped).'
          ' To enable, execute:\n'
          f'gcloud services enable {service} --project={project_id}\n')
      raise utils.GcpApiError(response=warning_msg,
                              service=service,
                              reason='SERVICE_DISABLED')
  except utils.GcpApiError as err:
    if err.reason == 'SERVICE_DISABLED':
      # NOTE(review): a disabled Service Usage API is presumably reported by
      # the is_enabled() lookup itself -- confirm against _list_enabled_apis.
      if err.service == f'serviceusage.{universe_domain}':
        err.response += (
            '\nService Usage API must be enabled. To enable, execute:\n'
            f'gcloud services enable serviceusage.{universe_domain} '
            f'--project={project_id}')
    else:
      logging.error('can\'t access project %s: %s', project_id, err.message)
    raise
  except exceptions.GoogleAuthError as err:
    logging.error(err)
    if _auth_method() == 'adc':
      logging.error('Error using application default credentials. '
                    'Try running: gcloud auth login --update-adc')
    raise

  # Plug-in additional authorization verifications
  hooks.verify_access_hook(project_id)

Verify that the user has access to the project; raise an error otherwise.