gcpdiag.queries.iam
class Role(models.Resource):
  """Represents an IAM role, backed by the raw role resource dictionary."""

  def __init__(self, resource_data):
    role_name = resource_data['name']
    try:
      owner_project = utils.get_project_by_res_name(role_name)
    except ValueError:
      # No project could be derived from the role name, so the role is
      # registered without a project association.
      owner_project = None

    super().__init__(project_id=owner_project)
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """Role name, taken verbatim from the `name` field of the resource."""
    return self._resource_data['name']

  @property
  def full_path(self) -> str:
    """Full path of the role; identical to its name."""
    return self.name

  @property
  def permissions(self) -> List[str]:
    """Permissions listed under `includedPermissions` (empty if absent)."""
    # roles should usually include one or more permissions
    included = self._resource_data.get('includedPermissions', [])
    return included
Represents an IAM role
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
Inherited Members
- gcpdiag.models.Resource
- project_id
- short_path
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
class BaseIAMPolicy(models.Resource):
  """Common class for IAM policies

  The policy returned by `getIamPolicy` is stored grouped by member, see
  `_expand_policy`. Permissions of a member are computed lazily from the
  member's roles the first time they are needed.
  """

  _name: str
  _policy_by_member: Dict[str, Any]

  @property
  def full_path(self):
    return self._name

  @abc.abstractmethod
  def _is_resource_permission(self, permission: str) -> bool:
    """Checks that a permission is applicable to the resource

    Any role can be assigned on a resource level but only a subset of
    permissions will be relevant to a resource
    Irrelevant permissions are ignored in `has_role_permissions` method
    """
    pass

  def _expand_policy(self, resource_data: Dict[str, Any]) -> Dict[str, Any]:
    """Groups `getIamPolicy` bindings by member

    API response contains a list of bindings of a role to members:
    {
      "bindings": [
      {
        "role": "roles/resourcemanager.organizationAdmin",
        "members": [
          "user:mike@example.com",
          "serviceAccount:my-project-id@appspot.gserviceaccount.com"
        ]
      },
      ...
    }

    This method will convert those bindings into the following structure:
    {
      "user:mike@example.com": {
        "roles": { "roles/resourcemanager.organizationAdmin" },
      },
      "serviceAccount:my-project-id@appspot.gserviceaccount.com": {
        "roles": { "roles/resourcemanager.organizationAdmin" },
      },
    }
    """

    policy_roles = set()
    policy_by_member: Dict[str, Any] = defaultdict(dict)

    # Empty lists are omitted in GCP API responses
    for binding in resource_data.get('bindings', []):
      if 'condition' in binding:
        logging.warning(
            'IAM binding contains a condition, which would be ignored: %s',
            binding)

      # IAM binding should always have a role and at least one member
      policy_roles.add(binding['role'])
      for member in binding['members']:
        member_policy = policy_by_member[member]
        member_policy.setdefault('roles', set()).add(binding['role'])

    # Populate cache for IAM roles used in the policy
    # Unlike `has_role_permissions` this part will be executed inside
    # `prefetch_rule` and will benefit from multi-threading execution
    for role in policy_roles:
      # Ignore all errors - there could be no rules involving this role
      try:
        _get_iam_role(role, self.project_id)
      except (RoleNotFoundError, utils.GcpApiError):
        pass

    # Populate cache for service accounts used in the policy
    # Note: not implemented as a generator expression because
    # it looks ugly without assignment expressions, available
    # only with Python >= 3.8.
    sa_emails = set()
    for member in policy_by_member:
      # Note: not matching / makes sure that we don't match for example fleet
      # workload identities:
      # https://cloud.google.com/anthos/multicluster-management/fleets/workload-identity
      m = re.match(r'serviceAccount:([^/]+)$', member)
      if m:
        sa_emails.add(m.group(1))
    _batch_fetch_service_accounts(list(sa_emails), self.project_id)

    return policy_by_member

  def _expand_member_policy(self, member: str):
    """Expands member roles into set of permissions

    Permissions are using "lazy" initialization and only expanded if needed
    """
    member_policy = self._policy_by_member.get(member)
    if not member_policy or \
        'permissions' in member_policy:
      return

    permissions = set()
    for role in member_policy['roles']:
      try:
        permissions.update(_get_iam_role(role, self.project_id).permissions)
      except (RoleNotFoundError, utils.GcpApiError) as err:
        # Ignore roles if cannot retrieve a role
        # For example, due to lack of permissions in an organization
        logging.warning('Unable to find IAM role \'%s\', ignoring: %s', role,
                        err)
    member_policy['permissions'] = permissions

  def _is_active_member(self, member: str) -> bool:
    """Checks that the member isn't disabled

    Currently supports only service accounts and not other account types
    Used in `has_role_permissions` and similar methods to ensure that
    the member isn't disabled and permissions are effectively working
    """

    # If this is a service account, make sure that the service account is enabled.
    # Note: not matching / makes sure that we don't match for example fleet
    # workload identities:
    # https://cloud.google.com/anthos/multicluster-management/fleets/workload-identity
    m = re.match(r'serviceAccount:([^/]+)$', member)
    if m:
      if not is_service_account_enabled(m.group(1), self.project_id):
        logging.info('service account %s is disabled', m.group(1))
        return False

    return True

  def __init__(self, project_id: str, name: str, resource_data: Dict[str, Any]):
    super().__init__(project_id)
    self._name = name
    self._policy_by_member = self._expand_policy(resource_data)

  def get_member_permissions(self, member: str) -> List[str]:
    """Return permissions for a member (either a user or serviceAccount).

    The "member" can be a user or a service account and must be specified with
    the IAM member syntax, i.e. using the prefixes `user:` or `serviceAccount:`.
    An empty list is returned for members that aren't part of the policy.
    """

    if member not in self._policy_by_member:
      return []

    self._expand_member_policy(member)
    return sorted(self._policy_by_member[member]['permissions'])

  def get_members(self) -> List[str]:
    """Returns the IAM members of the policy.

    The "member" can be a user or a service account and is specified with
    the IAM member syntax, i.e. using the prefixes `user:` or `serviceAccount:`.
    """
    return list(self._policy_by_member)

  def get_member_type(self, member) -> Optional[str]:
    """Returns the type prefix of a member of this policy.

    Unlike the other methods, "member" is given *without* the IAM prefix
    (e.g. `mike@example.com`); the returned value is the prefix under which
    it appears in the policy (e.g. `user` or `serviceAccount`), or None if no
    member of the policy matches.
    """
    for m in self._policy_by_member:
      # Split on the first colon only, and skip members which have no prefix
      # at all (e.g. `allUsers`) instead of failing on them.
      member_type, separator, member_id = m.partition(':')
      if separator and member == member_id:
        return member_type
    return None

  def has_permission(self, member: str, permission: str) -> bool:
    """Return true if user or service account member has this permission.

    Note that any indirect bindings, for example through group membership,
    aren't supported and only direct bindings to this member are checked
    """

    if member not in self._policy_by_member:
      return False

    self._expand_member_policy(member)
    if permission not in self._policy_by_member[member]['permissions']:
      return False
    return self._is_active_member(member)

  def has_any_permission(self, member: str, permission: set[str]) -> bool:
    """Return true if user or service account member has any of these permissions.

    Note that any indirect bindings, for example through group membership,
    aren't supported and only direct bindings to this member are checked.
    As in `has_permission`, a disabled member is reported as not having the
    permissions.
    """

    if member not in self._policy_by_member:
      return False

    self._expand_member_policy(member)
    member_permissions = self._policy_by_member[member]['permissions']
    if not any(p in member_permissions for p in permission):
      return False
    return self._is_active_member(member)

  def _has_role(self, member: str, role: str) -> bool:
    """Checks that the member has this role

    It performs exact match and doesn't expand role to list of permissions.
    Note that this method is not public because users of this module should
    use has_role_permissions(), i.e. verify effective permissions instead of
    roles."""

    if member not in self._policy_by_member:
      return False

    if role not in self._policy_by_member[member]['roles']:
      return False
    return self._is_active_member(member)

  def has_role_permissions(self, member: str, role: str) -> bool:
    """Checks that this member has all the permissions defined by this role"""

    if member not in self._policy_by_member:
      return False

    # Avoid expanding roles to permissions
    if self._has_role(member, role):
      # member status was already checked in `_has_role`
      return True

    self._expand_member_policy(member)
    role_permissions = {
        p for p in _get_iam_role(role, self.project_id).permissions
        if self._is_resource_permission(p)
    }

    missing_permissions = role_permissions - self._policy_by_member[member][
        'permissions']
    if missing_permissions:
      logging.debug('member \'%s\' doesn\'t have permissions %s', member,
                    ','.join(missing_permissions))
      return False
    return self._is_active_member(member)
Common class for IAM policies
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
def get_member_permissions(self, member: str) -> List[str]:
  """Return the sorted permissions of a member (user or serviceAccount).

  The "member" must be given in the IAM member syntax, i.e. with the
  `user:` or `serviceAccount:` prefix. Members that aren't part of the
  policy yield an empty list.
  """
  if member in self._policy_by_member:
    # Permissions are computed lazily from the member's roles.
    self._expand_member_policy(member)
    granted = self._policy_by_member[member]['permissions']
    return sorted(granted)
  return []
Return permissions for a member (either a user or serviceAccount).
The "member" can be a user or a service account and must be specified with
the IAM member syntax, i.e. using the prefixes `user:` or `serviceAccount:`.
def get_members(self) -> List[str]:
  """Return every member that appears in this policy.

  Members are reported in the IAM member syntax, i.e. including their
  `user:` or `serviceAccount:` prefix.
  """
  members = [*self._policy_by_member]
  return members
Returns the IAM members of the project.
The "member" can be a user or a service account and is specified with
the IAM member syntax, i.e. using the prefixes `user:` or `serviceAccount:`.
def get_member_type(self, member) -> Optional[str]:
  """Returns the type prefix of a member of this policy.

  Unlike the other methods, "member" is given *without* the IAM prefix
  (e.g. `mike@example.com`); the returned value is the prefix under which
  it appears in the policy (e.g. `user` or `serviceAccount`), or None if no
  member of the policy matches.
  """
  for m in self._policy_by_member:
    # Split on the first colon only, and skip members which have no prefix
    # at all (e.g. `allUsers`) instead of failing on them.
    member_type, separator, member_id = m.partition(':')
    if separator and member == member_id:
      return member_type
  return None
Returns the member type (the IAM prefix, e.g. `user` or `serviceAccount`) of a member of this policy.
The "member" argument is the identifier without its prefix (e.g. an e-mail address);
None is returned if no member of the policy matches.
def has_permission(self, member: str, permission: str) -> bool:
  """Tell whether a user or service account member holds a permission.

  Only bindings made directly to this member are considered; indirect
  bindings, for example through group membership, aren't supported.
  """
  if member not in self._policy_by_member:
    return False

  # Permissions are computed lazily from the member's roles.
  self._expand_member_policy(member)
  granted = self._policy_by_member[member]['permissions']
  # The activity check only runs when the permission is actually granted.
  return permission in granted and self._is_active_member(member)
Return true if user or service account member has this permission.
Note that any indirect bindings, for example through group membership, aren't supported and only direct bindings to this member are checked
def has_any_permission(self, member: str, permission: set[str]) -> bool:
  """Return true if user or service account member has any of these permissions.

  Note that any indirect bindings, for example through group membership,
  aren't supported and only direct bindings to this member are checked.
  As in `has_permission`, a disabled member is reported as not having the
  permissions.
  """

  if member not in self._policy_by_member:
    return False

  self._expand_member_policy(member)
  member_permissions = self._policy_by_member[member]['permissions']
  # No match means no permission, regardless of the member being active.
  if not any(p in member_permissions for p in permission):
    return False
  return self._is_active_member(member)
Return true if user or service account member has any of these permissions.
Note that any indirect bindings, for example through group membership, aren't supported and only direct bindings to this member are checked
def has_role_permissions(self, member: str, role: str) -> bool:
  """Check that the member holds every permission defined by the role.

  Permissions of the role for which `_is_resource_permission` returns False
  are not required.
  """
  if member not in self._policy_by_member:
    return False

  # Shortcut: an exact role binding makes expanding permissions unnecessary,
  # and `_has_role` has already verified the member status.
  if self._has_role(member, role):
    return True

  self._expand_member_policy(member)
  required = {
      perm for perm in _get_iam_role(role, self.project_id).permissions
      if self._is_resource_permission(perm)
  }
  granted = self._policy_by_member[member]['permissions']

  not_granted = required - granted
  if not_granted:
    logging.debug('member \'%s\' doesn\'t have permissions %s', member,
                  ','.join(not_granted))
    return False
  return self._is_active_member(member)
Checks that this member has all the permissions defined by this role
Inherited Members
- gcpdiag.models.Resource
- project_id
- short_path
def fetch_iam_policy(request, resource_class: Type[BaseIAMPolicy],
                     project_id: str, name: str):
  """Executes `getIamPolicy` request and converts into a resource class

  Meant to be used by `get_*_policy` functions in gcpdiag.queries.*; the
  caller passes in an API request that is ready to be executed.

  An abstract policy request should look like:
    class ResourcePolicy(BaseIAMPolicy):
      pass

    def get_resource_policy(name):
      api_request = get_api(..).resources().get(name=name)
      ...
      return fetch_iam_policy(api_request, ResourcePolicy, project_id, name)

  An HTTP error of the request is re-raised as `utils.GcpApiError`.

  Note: API calls aren't cached and it should be done externally
  """
  logging.info('fetching IAM policy of \'%s\'', name)
  try:
    policy_data = request.execute(num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
  else:
    return resource_class(project_id, name, policy_data)
Executes `getIamPolicy` request and converts into a resource class
Supposed to be used by `get_*_policy` functions in gcpdiag.queries.* and
requires an API request, which can be executed, to be passed in parameters
An abstract policy request should look like:
    class ResourcePolicy(BaseIAMPolicy):
      pass

    def get_resource_policy(name):
      api_request = get_api(..).resources().get(name=name)
      ...
      return fetch_iam_policy(api_request, ResourcePolicy, project_id, name)
Note: API calls aren't cached and it should be done externally
class ProjectPolicy(BaseIAMPolicy):
  """Represents the IAM policy of a single project.

  Use get_project_policy() to obtain instances, so that the objects are
  cached and the project policy isn't fetched again.

  See also the API documentation:
  https://cloud.google.com/resource-manager/reference/rest/v1/projects/getIamPolicy
  """

  def _is_resource_permission(self, permission: str) -> bool:
    # Filter out permissions that can be granted only on organization or folders
    # It also excludes some permissions that aren't supported in custom roles
    #
    # https://cloud.google.com/resource-manager/docs/access-control-proj#permissions
    # https://cloud.google.com/monitoring/access-control#custom_roles
    ignored_prefixes = ('resourcemanager.projects.', 'stackdriver.projects.')
    return not permission.startswith(ignored_prefixes)
Represents the IAM policy of a single project.
Note that you should use the get_project_policy() method so that the objects are cached and you don't re-fetch the project policy.
See also the API documentation: https://cloud.google.com/resource-manager/reference/rest/v1/projects/getIamPolicy
Inherited Members
- BaseIAMPolicy
- BaseIAMPolicy
- full_path
- get_member_permissions
- get_members
- get_member_type
- has_permission
- has_any_permission
- has_role_permissions
- gcpdiag.models.Resource
- project_id
- short_path
@caching.cached_api_call(in_memory=True)
def get_project_policy(project_id: str) -> ProjectPolicy:
  """Build the ProjectPolicy of a project; results are cached by the decorator."""
  policy_request = apis.get_api('cloudresourcemanager', 'v1',
                                project_id).projects().getIamPolicy(
                                    resource=project_id)
  # The policy is named after the project resource it belongs to.
  return fetch_iam_policy(policy_request, ProjectPolicy, project_id,
                          f'projects/{project_id}')
Return the ProjectPolicy object for a project, caching the result.
class ServiceAccount(models.Resource):
  """Represents a service account, backed by the raw API resource.

  Add more fields as needed from the declaration:
  https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts#ServiceAccount
  """
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """Value of the `name` field of the resource."""
    return self._resource_data['name']

  @property
  def email(self) -> str:
    """Value of the `email` field of the resource."""
    return self._resource_data['email']

  @property
  def unique_id(self) -> str:
    """Value of the `uniqueId` field of the resource."""
    return self._resource_data['uniqueId']

  @property
  def disabled(self) -> bool:
    """Value of the `disabled` field; False when the field is absent."""
    return self._resource_data.get('disabled', False)

  @property
  def full_path(self) -> str:
    """Full path of the service account; identical to its name."""
    # example: "name":
    # "projects/skanzhelev-gke-dev/serviceAccounts/test-service-account-1
    # @skanzhelev-gke-dev.iam.gserviceaccount.com"
    return self.name

  @property
  def short_path(self) -> str:
    """Full path without the leading `projects/` and `/serviceAccounts/`."""
    without_prefix = re.sub(r'^projects/', '', self.full_path)
    return re.sub(r'/serviceAccounts/', '/', without_prefix)
Class represents the service account.
Add more fields as needed from the declaration: https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts#ServiceAccount
@property
def full_path(self) -> str:
  """Full path of the service account; identical to its name."""
  # The name already has the full-path form, for example:
  # "projects/skanzhelev-gke-dev/serviceAccounts/test-service-account-1
  # @skanzhelev-gke-dev.iam.gserviceaccount.com"
  resource_name = self.name
  return resource_name
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Full path without the leading `projects/` and `/serviceAccounts/`."""
  without_prefix = re.sub(r'^projects/', '', self.full_path)
  return re.sub(r'/serviceAccounts/', '/', without_prefix)
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
Inherited Members
- gcpdiag.models.Resource
- project_id
def is_service_account_existing(email: str, billing_project_id: str) -> bool:
  """Verify that a service account exists.

  Only accounts recorded as "not found" are reported as missing. On any other
  (non-404) API error the account is assumed to exist, to avoid false
  positives (but a warning will be printed out).
  """
  # Make sure that the service account is fetched (this is also
  # called by get_project_policy).
  _batch_fetch_service_accounts([email], billing_project_id)
  known_missing = email in _service_account_cache_is_not_found
  return not known_missing
Verify that a service account exists.
If we get a non-404 API error when retrieving the service account, we will assume that the service account exists, not to throw false positives (but a warning will be printed out).
def is_service_account_enabled(email: str, billing_project_id: str) -> bool:
  """Verify that a service account exists and is enabled.

  An account that is neither recorded as "not found" nor present in the
  cache (e.g. after an API error) is assumed to be enabled, to avoid false
  positives (but a warning will be printed out).
  """
  _batch_fetch_service_accounts([email], billing_project_id)
  if email in _service_account_cache_is_not_found:
    return False
  if email in _service_account_cache and _service_account_cache[email].disabled:
    return False
  return True
Verify that a service account exists and is enabled.
If we get an API error when retrieving the service account, we will assume that the service account is enabled, not to throw false positives (but a warning will be printed out).
class ServiceAccountIAMPolicy(BaseIAMPolicy):
  """IAM policy attached to a service account."""

  def _is_resource_permission(self, permission):
    # No permission is filtered out for this resource type.
    return True
Common class for IAM policies
Inherited Members
- BaseIAMPolicy
- BaseIAMPolicy
- full_path
- get_member_permissions
- get_members
- get_member_type
- has_permission
- has_any_permission
- has_role_permissions
- gcpdiag.models.Resource
- project_id
- short_path
@caching.cached_api_call(in_memory=True)
def get_service_account_iam_policy(
    project_id: str, service_account: str) -> ServiceAccountIAMPolicy:
  """Fetch the IAM policy attached to a service account of a project."""
  sa_resource = f'projects/{project_id}/serviceAccounts/{service_account}'
  service_accounts_api = apis.get_api('iam', 'v1',
                                      project_id).projects().serviceAccounts()
  policy_request = service_accounts_api.getIamPolicy(resource=sa_resource)
  return fetch_iam_policy(policy_request, ServiceAccountIAMPolicy, project_id,
                          sa_resource)
Returns an IAM policy for a service account
@caching.cached_api_call(in_memory=True)
def get_service_account_list(project_id: str) -> List[ServiceAccount]:
  """Returns list of service accounts of a project.

  The listing is paginated by the API, so every page is fetched; reading only
  the first response would silently drop accounts in larger projects.

  Raises:
    utils.GcpApiError: if any of the list requests fails with an HTTP error.
  """

  iam_api = apis.get_api('iam', 'v1', project_id)
  service_accounts_api = iam_api.projects().serviceAccounts()
  request = service_accounts_api.list(name=f'projects/{project_id}')

  accounts: List[ServiceAccount] = []
  # `list_next` returns None once there is no further page to fetch.
  while request is not None:
    try:
      response = request.execute(num_retries=config.API_RETRIES)
    except googleapiclient.errors.HttpError as err:
      raise utils.GcpApiError(err) from err
    # Empty lists are omitted in API responses, hence the default.
    accounts.extend(
        ServiceAccount(project_id, service_account)
        for service_account in response.get('accounts', []))
    request = service_accounts_api.list_next(previous_request=request,
                                             previous_response=response)
  return accounts
Returns list of service accounts