
"""Build and cache GCP API objects and handle authentication."""
# OAuth 2.0 scope identifiers; presumably passed when requesting user
# credentials (the consumer is not visible in this part of the file).
AUTH_SCOPES = [
    'openid',
    'https://www.googleapis.com/auth/cloud-platform',
    'https://www.googleapis.com/auth/userinfo.email',
]
def set_credentials(cred_json):
  """Install or clear the module-level user credentials.

  Args:
    cred_json: JSON text with authorized-user info. Any falsy value
      (for example `None` or an empty string) clears the stored credentials.
  """
  global _credentials
  if cred_json:
    user_info = json.loads(cred_json)
    _credentials = oauth2_credentials.Credentials.from_authorized_user_info(
        user_info)
    return
  _credentials = None
 92def get_credentials():
 93  if _auth_method() == 'adc':
 94    return _get_credentials_adc()
 95  elif _auth_method() == 'key':
 96    return _get_credentials_key()
 97  else:
 98    raise AssertionError(
 99        'BUG: AUTH_METHOD method should be one of `adc` or `key`, '
100        f'but got `{_auth_method()}` instead.'
101        ' Please report at https://gcpdiag.dev/issues/')
def login():
  """Obtain GCP credentials right away.

  Without this call the credentials are only fetched lazily, on the first
  `get_api` call.
  """
  # Only the side effect matters here; the credentials object is discarded.
  _ = get_credentials()

def get_user_email() -> str:
  """Return the e-mail address of the authenticated user.

  When the configured universe domain is not `googleapis.com`, the
  placeholder string 'TPC user' is returned and no request is made.

  Raises:
    RuntimeError: if the userinfo endpoint answers with a status other
      than 200.
  """
  in_default_universe = config.get('universe_domain') == 'googleapis.com'
  if not in_default_universe:
    return 'TPC user'

  # The userinfo call is made with the quota project cleared.
  user_credentials = get_credentials().with_quota_project(None)
  authed_http = google_auth_httplib2.AuthorizedHttp(user_credentials,
                                                    http=httplib2.Http())
  response, body = authed_http.request(
      'https://www.googleapis.com/userinfo/v2/me')
  if response['status'] != '200':
    raise RuntimeError(
        f"can't determine user email. status={response['status']}")

  email = json.loads(body)['email']
  logging.debug('determined my email address: %s', email)
  return email
def get_api(service_name: str,
            version: str,
            project_id: Optional[str] = None,
            region: Optional[str] = None):
  """Get an API object, as returned by googleapiclient.discovery.build.

  If project_id is specified, this will be used as the billed project, and
  usually you should put there the project id of the project that you are
  inspecting.

  Args:
    service_name: API name, used both for discovery and as the host name
      component of the endpoint (e.g. `compute`).
    version: API version passed to `discovery.build`.
    project_id: optional project; when set, every request carries an
      `x-goog-user-project` header derived from it.
    region: optional region; when set, the endpoint becomes
      `https://{region}-{service_name}.{universe_domain}`.

  Returns:
    The object built by `discovery.build`.

  Raises:
    ValueError: if the credentials' universe domain differs from the
      configured `universe_domain`.
  """
  credentials = get_credentials()

  def _request_builder(http, *args, **kwargs):
    # The http object handed in by the client library is deliberately unused;
    # a fresh one is created per request below.
    del http

    # thread safety: always work on a private copy so that the caller's
    # dictionary isn't modified. This also makes sure the user-agent and
    # billing headers are set when `headers` is missing or None.
    caller_headers = kwargs.get('headers')
    headers = caller_headers.copy() if caller_headers else {}
    headers['user-agent'] = f'gcpdiag/{config.VERSION} (gzip)'
    if project_id:
      headers['x-goog-user-project'] = _get_project_or_billing_id(project_id)
    kwargs['headers'] = headers

    hooks.request_builder_hook(*args, **kwargs)

    # thread safety: create a new AuthorizedHttp object for every request
    # https://github.com/googleapis/google-api-python-client/blob/master/docs/thread_safety.md
    new_http = google_auth_httplib2.AuthorizedHttp(credentials,
                                                   http=httplib2.Http())
    return googleapiclient.http.HttpRequest(new_http, *args, **kwargs)

  universe_domain = config.get('universe_domain')
  # Credentials without a universe_domain attribute are treated as belonging
  # to the default universe.
  cred_universe = getattr(credentials, 'universe_domain', 'googleapis.com')
  if cred_universe != universe_domain:
    raise ValueError('credential universe_domain mismatch '
                     f'{cred_universe} != {universe_domain}')

  client_options = ClientOptions()
  if universe_domain != 'googleapis.com':
    client_options.universe_domain = universe_domain
  if region:
    client_options.api_endpoint = f'https://{region}-{service_name}.{universe_domain}'
  else:
    client_options.api_endpoint = f'https://{service_name}.{universe_domain}'
  # For the listed services the endpoint also carries a
  # `/{service_name}/{version}` path suffix.
  if service_name in ['compute']:
    client_options.api_endpoint += f'/{service_name}/{version}'

  api = discovery.build(service_name,
                        version,
                        cache_discovery=False,
                        credentials=credentials,
                        requestBuilder=_request_builder,
                        client_options=client_options)
  return api

def is_enabled(project_id: str, service_name: str) -> bool:
  """Tell whether a service is enabled in the given project.

  Args:
    project_id: project whose enabled APIs are consulted.
    service_name: short service name (e.g. `iam`); the configured universe
      domain is appended before the lookup.

  Returns:
    True if `{service_name}.{universe_domain}` is among the enabled APIs.
  """
  universe_domain = config.get('universe_domain')
  qualified_name = f'{service_name}.{universe_domain}'
  enabled_apis = _list_enabled_apis(project_id)
  return qualified_name in enabled_apis
def is_all_enabled(project_id: str, services: list) -> Dict[str, str]:
  """Report the enablement state of several services at once.

  Args:
    project_id: project to check.
    services: short service names, each checked with `is_enabled`.

  Returns:
    A dict mapping every given service name to 'ENABLED' or 'DISABLED'.
  """
  return {
      name: 'ENABLED' if is_enabled(project_id, name) else 'DISABLED'
      for name in services
  }
def list_services_with_state(project_id: str) -> Dict[str, str]:
  """List the services reported by the Service Usage API with their state.

  Args:
    project_id: project whose services are listed (also used as the billed
      project for the API call).

  Returns:
    A dict mapping each service's `config.name` to its `state`. If a name
    shows up more than once, the first state seen is kept.

  Raises:
    utils.GcpApiError: if the API call fails with an HTTP error.
  """
  logging.debug('listing all APIs with their state')
  serviceusage = get_api('serviceusage', 'v1', project_id)
  request = serviceusage.services().list(parent=f'projects/{project_id}')
  apis_state: Dict[str, str] = {}
  try:
    while request is not None:
      response = request.execute(num_retries=config.API_RETRIES)
      # A page without any service may omit the 'services' key entirely;
      # treat that as an empty page instead of failing with KeyError.
      for service in response.get('services', []):
        apis_state.setdefault(service['config']['name'], service['state'])
      request = serviceusage.services().list_next(request, response)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
  return apis_state
def verify_access(project_id: str):
  """Check that the project is usable with the current credentials.

  The Cloud Resource Manager, IAM and Cloud Logging APIs are checked in that
  order; the first one found disabled is reported through an exception.
  Afterwards `hooks.verify_access_hook` is run for additional checks.

  Args:
    project_id: project to verify.

  Raises:
    utils.GcpApiError: if one of the checked APIs is disabled, or if an API
      error occurs while checking.
    exceptions.GoogleAuthError: re-raised after logging when authentication
      fails.
  """

  def _service_disabled(service_fqdn, text):
    # Build (but do not raise) the error describing a disabled service.
    return utils.GcpApiError(response=text,
                             service=service_fqdn,
                             reason='SERVICE_DISABLED')

  try:
    if not is_enabled(project_id, 'cloudresourcemanager'):
      service = f'cloudresourcemanager.{config.get("universe_domain")}'
      raise _service_disabled(
          service,
          'Cloud Resource Manager API must be enabled. To enable, execute:\n'
          f'gcloud services enable {service} --project={project_id}')

    if not is_enabled(project_id, 'iam'):
      service = f'iam.{config.get("universe_domain")}'
      raise _service_disabled(
          service,
          'Identity and Access Management (IAM) API must be enabled. To enable, execute:\n'
          f'gcloud services enable iam.{config.get("universe_domain")} --project={project_id}'
      )

    if not is_enabled(project_id, 'logging'):
      service = f'logging.{config.get("universe_domain")}'
      raise _service_disabled(
          service,
          'Cloud Logging API is not enabled (related rules will be skipped).'
          ' To enable, execute:\n'
          f'gcloud services enable logging.{config.get("universe_domain")} --project={project_id}\n'
      )
  except utils.GcpApiError as api_err:
    if api_err.reason != 'SERVICE_DISABLED':
      logging.error('can\'t access project %s: %s', project_id,
                    api_err.message)
    elif api_err.service == f'serviceusage.{config.get("universe_domain")}':
      # The enablement check itself needs the Service Usage API, so add the
      # command that enables it to the error text.
      api_err.response += (
          '\nService Usage API must be enabled. To enable, execute:\n'
          f'gcloud services enable serviceusage.{config.get("universe_domain")} '
          f'--project={project_id}')
    raise api_err
  except exceptions.GoogleAuthError as auth_err:
    logging.error(auth_err)
    if _auth_method() == 'adc':
      logging.error('Error using application default credentials. '
                    'Try running: gcloud auth login --update-adc')
    raise auth_err

  # Plug-in additional authorization verifications
  hooks.verify_access_hook(project_id)

