gcpdiag.queries.apis
Build and cache GCP APIs + handle authentication.
AUTH_SCOPES =
['openid', 'https://www.googleapis.com/auth/cloud-platform', 'https://www.googleapis.com/auth/userinfo.email']
def
set_credentials(cred_json):
def
get_credentials():
def get_credentials():
  """Return credentials for the configured authentication method.

  Dispatches on the value returned by _auth_method(): `adc` is served by
  _get_credentials_adc() and `key` by _get_credentials_key().

  Raises:
    AssertionError: _auth_method() returned something other than `adc` or
      `key`, which indicates a bug rather than a user error.
  """
  if _auth_method() == 'adc':
    return _get_credentials_adc()
  if _auth_method() == 'key':
    return _get_credentials_key()
  # Neither supported method matched: report the unexpected value.
  raise AssertionError(
      'BUG: AUTH_METHOD method should be one of `adc` or `key`, '
      f'but got `{_auth_method()}` instead.'
      ' Please report at https://gcpdiag.dev/issues/')
def
login():
def login():
  """Trigger the GCP login up front.

  Without this call the login happens lazily on the first get_api call. The
  credentials returned by get_credentials() are intentionally discarded.
  """
  get_credentials()
Force GCP login (this otherwise happens on the first get_api call).
def
get_user_email() -> str:
def get_user_email() -> str:
  """Return the e-mail address associated with the current credentials.

  When the configured universe domain is not `googleapis.com` no lookup is
  performed and the placeholder 'TPC user' is returned instead.

  Returns:
    The `email` field of the userinfo response, or 'TPC user'.

  Raises:
    RuntimeError: the userinfo endpoint answered with a status other than 200.
  """
  if config.get('universe_domain') != 'googleapis.com':
    return 'TPC user'

  # The userinfo request is sent with the quota project explicitly cleared.
  creds = get_credentials().with_quota_project(None)
  authed_http = google_auth_httplib2.AuthorizedHttp(creds,
                                                    http=httplib2.Http())
  response, payload = authed_http.request(
      'https://www.googleapis.com/userinfo/v2/me')

  status = response['status']
  if status != '200':
    raise RuntimeError(f"can't determine user email. status={status}")

  email = json.loads(payload)['email']
  logging.debug('determined my email address: %s', email)
  return email
@caching.cached_api_call(in_memory=True)
def
get_api( service_name: str, version: str, project_id: Optional[str] = None, region: Optional[str] = None):
@caching.cached_api_call(in_memory=True)
def get_api(service_name: str,
            version: str,
            project_id: Optional[str] = None,
            region: Optional[str] = None):
  """Get an API object, as returned by googleapiclient.discovery.build.

  If project_id is specified, this will be used as the billed project, and usually
  you should put there the project id of the project that you are inspecting.

  Args:
    service_name: API name passed to discovery.build (e.g. 'serviceusage').
    version: API version passed to discovery.build (e.g. 'v1').
    project_id: optional project used to fill the x-goog-user-project header.
    region: optional region; only used to build the endpoint when the
      configured universe domain is not `googleapis.com`.

  Returns:
    The object returned by discovery.build.

  Raises:
    ValueError: the credentials' universe_domain differs from the configured
      universe_domain.
  """
  credentials = get_credentials()

  def _request_builder(http, *args, **kwargs):
    del http

    # thread safety: never modify the caller's dictionary, always work on a
    # private copy. The copy is stored back into kwargs so that the headers
    # added below reach the hook and the request even when the caller did not
    # supply any headers (previously they were set on a throwaway dict).
    headers = dict(kwargs.get('headers') or {})
    kwargs['headers'] = headers

    headers['user-agent'] = f'gcpdiag/{config.VERSION} (gzip)'
    if project_id:
      headers['x-goog-user-project'] = _get_project_or_billing_id(project_id)

    hooks.request_builder_hook(*args, **kwargs)

    # thread safety: create a new AuthorizedHttp object for every request
    # https://github.com/googleapis/google-api-python-client/blob/master/docs/thread_safety.md
    new_http = google_auth_httplib2.AuthorizedHttp(credentials,
                                                   http=httplib2.Http())
    return googleapiclient.http.HttpRequest(new_http, *args, **kwargs)

  universe_domain = config.get('universe_domain')
  # Credentials without a universe_domain attribute are treated as belonging
  # to the default universe.
  cred_universe = getattr(credentials, 'universe_domain', 'googleapis.com')
  if cred_universe != universe_domain:
    raise ValueError('credential universe_domain mismatch '
                     f'{cred_universe} != {universe_domain}')
  client_options = ClientOptions()
  if universe_domain != 'googleapis.com':
    # Non-default universe: point the client at an explicitly built endpoint.
    client_options.universe_domain = universe_domain
    if region:
      client_options.api_endpoint = f'https://{region}-{service_name}.{universe_domain}'
    else:
      client_options.api_endpoint = f'https://{service_name}.{universe_domain}'
    if service_name in ['compute']:
      # For compute the service name and version are appended as a path.
      client_options.api_endpoint += f'/{service_name}/{version}'
  api = discovery.build(service_name,
                        version,
                        cache_discovery=False,
                        credentials=credentials,
                        requestBuilder=_request_builder,
                        client_options=client_options)
  return api
Get an API object, as returned by googleapiclient.discovery.build.
If project_id is specified, this will be used as the billed project, and usually you should put there the project id of the project that you are inspecting.
def
is_enabled(project_id: str, service_name: str) -> bool:
def
is_all_enabled(project_id: str, services: list) -> Dict[str, str]:
@caching.cached_api_call(in_memory=True)
def
list_services_with_state(project_id: str) -> Dict[str, str]:
@caching.cached_api_call(in_memory=True)
def list_services_with_state(project_id: str) -> Dict[str, str]:
  """List the services of a project together with their state.

  Pages through serviceusage `services().list` for the given project.

  Args:
    project_id: project whose services are listed; also passed to get_api.

  Returns:
    Mapping of each service's `config.name` to its `state`. If a name appears
    more than once, the first state seen is kept.

  Raises:
    utils.GcpApiError: wraps any googleapiclient HttpError raised while
      listing.
  """
  logging.debug('listing all APIs with their state')
  serviceusage = get_api('serviceusage', 'v1', project_id)
  request = serviceusage.services().list(parent=f'projects/{project_id}')
  apis_state: Dict[str, str] = {}
  try:
    while request is not None:
      response = request.execute(num_retries=config.API_RETRIES)
      # A page without any entries may come back without the 'services' key;
      # treat that as an empty page instead of failing with KeyError.
      for service in response.get('services', []):
        apis_state.setdefault(service['config']['name'], service['state'])
      request = serviceusage.services().list_next(request, response)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
  return apis_state
def
verify_access(project_id: str):
def verify_access(project_id: str):
  """Check that the project can be inspected with the current credentials.

  The Cloud Resource Manager, IAM and Cloud Logging APIs are probed, in that
  order, with is_enabled(). The first one found disabled is reported as a
  utils.GcpApiError with reason SERVICE_DISABLED. When all probes pass,
  hooks.verify_access_hook() is called for additional verifications.

  Raises:
    utils.GcpApiError: a required API is disabled, or a probe failed.
    exceptions.GoogleAuthError: the credentials could not be used.
  """
  # (api short name, text before the gcloud command, text after it)
  required_apis = (
      ('cloudresourcemanager',
       'Cloud Resource Manager API must be enabled. To enable, execute:\n',
       ''),
      ('iam',
       'Identity and Access Management (IAM) API must be enabled. '
       'To enable, execute:\n', ''),
      ('logging',
       'Cloud Logging API is not enabled (related rules will be skipped).'
       ' To enable, execute:\n', '\n'),
  )

  try:
    for api_name, intro, outro in required_apis:
      if is_enabled(project_id, api_name):
        continue
      service = f'{api_name}.{config.get("universe_domain")}'
      enable_cmd = f'gcloud services enable {service} --project={project_id}'
      raise utils.GcpApiError(response=intro + enable_cmd + outro,
                              service=service,
                              reason='SERVICE_DISABLED')
  except utils.GcpApiError as err:
    if err.reason != 'SERVICE_DISABLED':
      logging.error('can\'t access project %s: %s', project_id, err.message)
    elif err.service == f'serviceusage.{config.get("universe_domain")}':
      # The probe itself needs the Service Usage API: extend the message with
      # the command that enables it.
      err.response += (
          '\nService Usage API must be enabled. To enable, execute:\n'
          f'gcloud services enable serviceusage.{config.get("universe_domain")} '
          f'--project={project_id}')
    raise err
  except exceptions.GoogleAuthError as err:
    logging.error(err)
    if _auth_method() == 'adc':
      logging.error('Error using application default credentials. '
                    'Try running: gcloud auth login --update-adc')
    raise err

  # Plug-in additional authorization verifications
  hooks.verify_access_hook(project_id)
Verify that the user has access to the project, exit with an error otherwise.