gcpdiag.queries.apis
Build and cache GCP APIs + handle authentication.
AUTH_SCOPES =
['openid', 'https://www.googleapis.com/auth/cloud-platform', 'https://www.googleapis.com/auth/userinfo.email']
def
set_credentials(cred_json):
def
get_credentials():
def get_credentials():
    """Return credentials for the configured authentication method.

    Dispatches on the value reported by ``_auth_method()``: ``adc`` is served
    by ``_get_credentials_adc()`` and ``key`` by ``_get_credentials_key()``.

    Raises:
        AssertionError: if the authentication method is neither ``adc`` nor
            ``key``.
    """
    if _auth_method() == 'adc':
        return _get_credentials_adc()
    if _auth_method() == 'key':
        return _get_credentials_key()

    # Neither supported method matched: report it as an internal bug.
    unsupported = _auth_method()
    raise AssertionError(
        'BUG: AUTH_METHOD method should be one of `adc` or `key`, '
        f'but got `{unsupported}` instead.'
        ' Please report at https://gcpdiag.dev/issues/')
def
login():
def login():
    """Force GCP login now instead of on the first get_api call.

    Only obtains the credentials via ``get_credentials()``; the result is
    discarded.
    """
    _ = get_credentials()
Force GCP login (this otherwise happens on the first get_api call).
def
get_user_email() -> str:
def get_user_email() -> str:
    """Return the e-mail address associated with the current credentials.

    When the ``universe_domain`` configuration value is truthy, the fixed
    string ``'TPC user'`` is returned and no request is made. Otherwise the
    userinfo endpoint is queried with the current credentials (with the quota
    project cleared).

    Raises:
        RuntimeError: if the userinfo endpoint answers with a status other
            than 200.
    """
    if config.get('universe_domain'):
        return 'TPC user'

    creds = get_credentials().with_quota_project(None)
    authed_http = google_auth_httplib2.AuthorizedHttp(creds,
                                                      http=httplib2.Http())
    response, body = authed_http.request(
        'https://www.googleapis.com/userinfo/v2/me')

    status = response['status']
    if status != '200':
        raise RuntimeError(f"can't determine user email. status={status}")

    email = json.loads(body)['email']
    logging.debug('determined my email address: %s', email)
    return email
@caching.cached_api_call(in_memory=True)
def
get_api( service_name: str, version: str, project_id: Optional[str] = None, region: Optional[str] = None):
@caching.cached_api_call(in_memory=True)
def get_api(service_name: str,
            version: str,
            project_id: Optional[str] = None,
            region: Optional[str] = None):
    """Get an API object, as returned by googleapiclient.discovery.build.

    If project_id is specified, this will be used as the billed project, and
    usually you should put there the project id of the project that you are
    inspecting.

    Args:
        service_name: API service name, e.g. ``compute``.
        version: API version string passed to ``discovery.build``.
        project_id: optional project used to derive the
            ``x-goog-user-project`` request header.
        region: optional region; only used to build a regional endpoint when
            the configured universe domain is not ``googleapis.com``.

    Returns:
        The object returned by ``discovery.build``.

    Raises:
        ValueError: if the universe domain of the credentials differs from
            the configured ``universe_domain``.
    """
    credentials = get_credentials()

    def _request_builder(http, *args, **kwargs):
        # The http object supplied by the client library is not used; a fresh
        # one is created per request below.
        del http

        # thread safety: work on a copy so the caller's dictionary isn't
        # modified. The copy is always stored back into kwargs, so the headers
        # set here reach the request even when the caller passed no headers
        # (or passed None).
        headers = dict(kwargs.get('headers') or {})
        kwargs['headers'] = headers

        headers['user-agent'] = f'gcpdiag/{config.VERSION} (gzip)'
        if project_id:
            headers['x-goog-user-project'] = _get_project_or_billing_id(
                project_id)

        hooks.request_builder_hook(*args, **kwargs)

        # thread safety: create a new AuthorizedHttp object for every request
        # https://github.com/googleapis/google-api-python-client/blob/master/docs/thread_safety.md
        new_http = google_auth_httplib2.AuthorizedHttp(credentials,
                                                       http=httplib2.Http())
        return googleapiclient.http.HttpRequest(new_http, *args, **kwargs)

    universe_domain = config.get('universe_domain')
    cred_universe = credentials.universe_domain
    if cred_universe != universe_domain:
        raise ValueError('credential universe_domain mismatch '
                         f'{cred_universe} != {universe_domain}')

    client_options = ClientOptions()
    if universe_domain != 'googleapis.com':
        # Non-default universe: the endpoint is spelled out explicitly.
        client_options.universe_domain = universe_domain
        if region:
            client_options.api_endpoint = (
                f'https://{region}-{service_name}.{universe_domain}')
        else:
            client_options.api_endpoint = (
                f'https://{service_name}.{universe_domain}')
        if service_name == 'compute':
            client_options.api_endpoint += f'/{service_name}/{version}'

    api = discovery.build(service_name,
                          version,
                          cache_discovery=False,
                          credentials=credentials,
                          requestBuilder=_request_builder,
                          client_options=client_options)
    return api
Get an API object, as returned by googleapiclient.discovery.build.
If project_id is specified, this will be used as the billed project, and usually you should put there the project id of the project that you are inspecting.
def
is_enabled(project_id: str, service_name: str) -> bool:
@caching.cached_api_call(in_memory=True)
def
list_services_with_state(project_id: str) -> Dict[str, str]:
@caching.cached_api_call(in_memory=True)
def list_services_with_state(project_id: str) -> Dict[str, str]:
    """Return the state of every service listed for a project.

    Pages through ``serviceusage.services().list`` for the project and maps
    each service's ``config.name`` to its ``state``. If a name appears more
    than once, the first state seen is kept.

    Args:
        project_id: project whose services are listed.

    Returns:
        Mapping of service name to state string.

    Raises:
        utils.GcpApiError: if the API call fails with an HTTP error.
    """
    logging.debug('listing all APIs with their state')
    serviceusage = get_api('serviceusage', 'v1', project_id)
    request = serviceusage.services().list(parent=f'projects/{project_id}')
    apis_state: Dict[str, str] = {}
    try:
        while request is not None:
            response = request.execute(num_retries=config.API_RETRIES)
            # A page without a 'services' field is treated as empty rather
            # than failing with KeyError.
            for service in response.get('services', []):
                apis_state.setdefault(service['config']['name'],
                                      service['state'])
            request = serviceusage.services().list_next(request, response)
    except googleapiclient.errors.HttpError as err:
        raise utils.GcpApiError(err) from err
    return apis_state
def
verify_access(project_id: str):
def verify_access(project_id: str):
    """Verify that the user has access to the project, exit with an error otherwise.

    Checks that the Cloud Resource Manager and IAM APIs are enabled (exiting
    with status 1 if either is not), prints a warning if the Cloud Logging API
    is disabled, and finally runs ``hooks.verify_access_hook``. API and
    authentication errors raised by the checks are reported and also end the
    process with status 1.
    """

    def _must_enable(api_title: str, api_name: str) -> str:
        # Shared wording for the "API must be enabled" errors.
        return (f'ERROR: {api_title} API must be enabled. To enable, execute:\n'
                f'gcloud services enable {api_name}.googleapis.com'
                f' --project={project_id}')

    # Checked in this order; the first one that is disabled ends the run.
    mandatory_apis = (
        ('cloudresourcemanager', 'Cloud Resource Manager'),
        ('iam', 'Identity and Access Management (IAM)'),
    )

    try:
        for api_name, api_title in mandatory_apis:
            if not is_enabled(project_id, api_name):
                print(_must_enable(api_title, api_name), file=sys.stdout)
                sys.exit(1)

        # Logging is optional: warn and carry on.
        if not is_enabled(project_id, 'logging'):
            print(
                'WARNING: Cloud Logging API is not enabled (related rules will'
                ' be skipped). To enable, execute:\n'
                'gcloud services enable'
                f' logging.googleapis.com --project={project_id}\n',
                file=sys.stdout,
            )
    except utils.GcpApiError as err:
        service_usage_disabled = (
            err.reason == 'SERVICE_DISABLED' and
            err.service == 'serviceusage.googleapis.com')
        if service_usage_disabled:
            print(_must_enable('Service Usage', 'serviceusage'),
                  file=sys.stdout)
        else:
            print(
                f"ERROR: can't access project {project_id}: {err.message}.",
                file=sys.stdout,
            )
        sys.exit(1)
    except exceptions.GoogleAuthError as err:
        print(f'ERROR: {err}', file=sys.stdout)
        if _auth_method() == 'adc':
            print(
                'Error using application default credentials. '
                'Try running: gcloud auth login --update-adc',
                file=sys.stderr,
            )
        sys.exit(1)

    # Plug-in additional authorization verifications
    hooks.verify_access_hook(project_id)
Verify that the user has access to the project, exit with an error otherwise.
def
make_request(*args, **kwargs):
def make_request(*args, **kwargs):
    """Send an authenticated HTTP request and return the decoded JSON body.

    All positional and keyword arguments are forwarded to
    ``requests.request``. An ``Authorization: Bearer <token>`` header built
    from the current credentials is added to the request. Headers supplied by
    the caller are preserved, except for any ``Authorization`` header, which
    is replaced by the bearer token. The caller's dictionary is not modified.

    Returns:
        The parsed JSON body when the response status is 200, otherwise None.
    """
    credentials = get_credentials()

    # Start from the caller's headers instead of discarding them; drop any
    # caller-provided Authorization so the bearer token below is the only one.
    headers = {
        name: value
        for name, value in (kwargs.get('headers') or {}).items()
        if name.lower() != 'authorization'
    }
    # NOTE(review): this relies on credentials.token already being populated;
    # confirm that get_credentials() returns refreshed credentials.
    headers['Authorization'] = f'Bearer {credentials.token}'
    kwargs['headers'] = headers

    response = requests.request(*args, **kwargs)

    if response is not None and response.status_code == 200:
        return response.json()
    return None