gcpdiag.queries.iam
33class Role(models.Resource): 34 """Represents an IAM role""" 35 36 def __init__(self, resource_data): 37 try: 38 project_id = utils.get_project_by_res_name(resource_data['name']) 39 except ValueError: 40 project_id = None 41 42 super().__init__(project_id=project_id) 43 self._resource_data = resource_data 44 45 @property 46 def name(self) -> str: 47 return self._resource_data['name'] 48 49 @property 50 def full_path(self) -> str: 51 return self.name 52 53 @property 54 def permissions(self) -> List[str]: 55 # roles should usually include one or more permissions 56 return self._resource_data.get('includedPermissions', [])
Represents an IAM role
Common base class for all non-exit exceptions.
162class BaseIAMPolicy(models.Resource): 163 """Common class for IAM policies""" 164 165 _name: str 166 _policy_by_member: Dict[str, Any] 167 168 @property 169 def full_path(self): 170 return self._name 171 172 @abc.abstractmethod 173 def _is_resource_permission(self, permission: str) -> bool: 174 """Checks that a permission is applicable to the resource 175 176 Any role can be assigned on a resource level but only a subset of 177 permissions will be relevant to a resource 178 Irrelevant permissions are ignored in `has_role_permissions` method 179 """ 180 pass 181 182 def _expand_policy(self, resource_data: Dict[str, Any]) -> Dict[str, Any]: 183 """Groups `getIamPolicy` bindings by member 184 185 API response contains a list of bindings of a role to members: 186 { 187 "bindings": [ 188 { 189 "role": "roles/resourcemanager.organizationAdmin", 190 "members": [ 191 "user:mike@example.com", 192 "serviceAccount:my-project-id@appspot.gserviceaccount.com" 193 ] 194 }, 195 ... 196 } 197 198 This method will convert those bindings into the following structure: 199 { 200 "user:mike@example.com": { 201 "roles": { "roles/resourcemanager.organizationAdmin" }, 202 }, 203 "serviceAccount:my-project-id@appspot.gserviceaccount.com": { 204 "roles": { "roles/resourcemanager.organizationAdmin" }, 205 }, 206 } 207 """ 208 209 policy_roles = set() 210 policy_by_member: Dict[str, Any] = defaultdict(dict) 211 212 # Empty lists are omitted in GCP API responses 213 for binding in resource_data.get('bindings', []): 214 if 'condition' in binding: 215 logging.warning( 216 'IAM binding contains a condition, which would be ignored: %s', 217 binding) 218 219 # IAM binding should always have a role and at least one member 220 policy_roles.add(binding['role']) 221 for member in binding['members']: 222 member_policy = policy_by_member[member] 223 member_policy.setdefault('roles', set()).add(binding['role']) 224 225 # Populate cache for IAM roles used in the policy 226 # Unlike `has_role_permissions` this part will be executed inside 227 # `prefetch_rule` and will benefit from multi-threading execution 228 for role in policy_roles: 229 # Ignore all errors - there could be no rules involving this role 230 try: 231 _get_iam_role(role, self.project_id) 232 except (RoleNotFoundError, utils.GcpApiError) as err: 233 # Ignore roles if cannot retrieve a role 234 # For example, due to lack of permissions 235 if isinstance(err, utils.GcpApiError): 236 logging.error('API failure getting IAM roles: %s', err) 237 raise utils.GcpApiError(err) from err 238 elif isinstance(err, RoleNotFoundError): 239 logging.warning("Unable to get IAM role '%s', ignoring: %s", role, 240 err) 241 242 # Populate cache for service accounts used in the policy 243 # Note: not implemented as a generator expression because 244 # it looks ugly without assignment expressions, available 245 # only with Python >= 3.8. 246 sa_emails = set() 247 for member in policy_by_member.keys(): 248 # Note: not matching / makes sure that we don't match for example fleet 249 # workload identities: 250 # https://cloud.google.com/anthos/multicluster-management/fleets/workload-identity 251 m = re.match(r'serviceAccount:([^/]+)$', member) 252 if m: 253 sa_emails.add(m.group(1)) 254 _batch_fetch_service_accounts(list(sa_emails), self.project_id) 255 256 return policy_by_member 257 258 def _expand_member_policy(self, member: str): 259 """Expands member roles into set of permissions 260 261 Permissions are using "lazy" initialization and only expanded if needed 262 """ 263 member_policy = self._policy_by_member.get(member) 264 if not member_policy or \ 265 'permissions' in member_policy: 266 return 267 268 permissions = set() 269 for role in member_policy['roles']: 270 try: 271 permissions.update(_get_iam_role(role, self.project_id).permissions) 272 except (RoleNotFoundError, utils.GcpApiError) as err: 273 if isinstance(err, utils.GcpApiError): 274 logging.error('API failure getting IAM roles: %s', err) 275 raise utils.GcpApiError(err) from err 276 elif isinstance(err, RoleNotFoundError): 277 logging.warning("Unable to find IAM role '%s', ignoring: %s", role, 278 err) 279 member_policy['permissions'] = permissions 280 281 def _is_active_member(self, member: str) -> bool: 282 """Checks that the member isn't disabled 283 284 Currently supports only service accounts and not other account types 285 Used in `has_role_permissions` and similar methods to ensure that 286 the member isn't disabled and permissions are effectively working 287 """ 288 289 # If this is a service account, make sure that the service account is enabled. 290 # Note: not matching / makes sure that we don't match for example fleet 291 # workload identities: 292 # https://cloud.google.com/anthos/multicluster-management/fleets/workload-identity 293 m = re.match(r'serviceAccount:([^/]+)$', member) 294 if m: 295 if not is_service_account_enabled(m.group(1), self.project_id): 296 logging.info('service account %s is disabled', m.group(1)) 297 return False 298 299 return True 300 301 def __init__(self, project_id: str, name: str, resource_data: Dict[str, Any]): 302 super().__init__(project_id) 303 self._name = name 304 self._policy_by_member = self._expand_policy(resource_data) 305 306 def get_member_permissions(self, member: str) -> List[str]: 307 """Return permissions for a member (either a user or serviceAccount). 308 309 The "member" can be a user or a service account and must be specified with 310 the IAM member syntax, i.e. using the prefixes `user:` or `serviceAccount:`. 311 """ 312 313 if member not in self._policy_by_member: 314 return [] 315 316 self._expand_member_policy(member) 317 return sorted(self._policy_by_member[member]['permissions']) 318 319 def get_members(self) -> List[str]: 320 """Returns the IAM members of the project. 321 322 The "member" can be a user or a service account and is specified with 323 the IAM member syntax, i.e. using the prefixes `user:` or `serviceAccount:`. 324 """ 325 return list(self._policy_by_member.keys()) 326 327 def get_member_type(self, member) -> Optional[str]: 328 """Returns the IAM members of the project. 329 330 The "member" can be a user or a service account and is specified with 331 the IAM member syntax, i.e. using the prefixes `user:` or `serviceAccount:`. 332 """ 333 for m in self._policy_by_member.keys(): 334 parts = m.split(':') 335 if member == parts[1]: 336 return parts[0] 337 return None 338 339 def has_permission(self, member: str, permission: str) -> bool: 340 """Return true if user or service account member has this permission. 341 342 Note that any indirect bindings, for example through group membership, 343 aren't supported and only direct bindings to this member are checked 344 """ 345 346 if member not in self._policy_by_member: 347 return False 348 349 self._expand_member_policy(member) 350 if permission not in self._policy_by_member[member]['permissions']: 351 return False 352 return self._is_active_member(member) 353 354 def has_any_permission(self, member: str, permission: set[str]) -> bool: 355 """Return true if user or service account member has any of these permission. 356 357 Note that any indirect bindings, for example through group membership, 358 aren't supported and only direct bindings to this member are checked 359 """ 360 361 if member not in self._policy_by_member: 362 return False 363 364 self._expand_member_policy(member) 365 if any( 366 p in self._policy_by_member[member]['permissions'] for p in permission): 367 return True 368 return self._is_active_member(member) 369 370 def _has_role(self, member: str, role: str) -> bool: 371 """Checks that the member has this role 372 373 It performs exact match and doesn't expand role to list of permissions. 374 Note that this method is not public because users of this module should 375 use has_role_permissions(), i.e. verify effective permissions instead of 376 roles.""" 377 378 if member not in self._policy_by_member: 379 return False 380 381 if role not in self._policy_by_member[member]['roles']: 382 return False 383 return self._is_active_member(member) 384 385 def has_role_permissions(self, member: str, role: str) -> bool: 386 """Checks that this member has all the permissions defined by this role""" 387 388 if member not in self._policy_by_member: 389 return False 390 391 # Avoid expanding roles to permissions 392 if self._has_role(member, role): 393 # member status was already checked in `has_role` 394 return True 395 396 self._expand_member_policy(member) 397 role_permissions = { 398 p for p in _get_iam_role(role, self.project_id).permissions 399 if self._is_resource_permission(p) 400 } 401 402 missing_roles = role_permissions - self._policy_by_member[member][ 403 'permissions'] 404 if missing_roles: 405 logging.debug('member \'%s\' doesn\'t have permissions %s', member, 406 ','.join(missing_roles)) 407 return False 408 return self._is_active_member(member)
Common class for IAM policies
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
306 def get_member_permissions(self, member: str) -> List[str]: 307 """Return permissions for a member (either a user or serviceAccount). 308 309 The "member" can be a user or a service account and must be specified with 310 the IAM member syntax, i.e. using the prefixes `user:` or `serviceAccount:`. 311 """ 312 313 if member not in self._policy_by_member: 314 return [] 315 316 self._expand_member_policy(member) 317 return sorted(self._policy_by_member[member]['permissions'])
Return permissions for a member (either a user or serviceAccount).
The "member" can be a user or a service account and must be specified with
the IAM member syntax, i.e. using the prefixes user:
or serviceAccount:
.
319 def get_members(self) -> List[str]: 320 """Returns the IAM members of the project. 321 322 The "member" can be a user or a service account and is specified with 323 the IAM member syntax, i.e. using the prefixes `user:` or `serviceAccount:`. 324 """ 325 return list(self._policy_by_member.keys())
Returns the IAM members of the project.
The "member" can be a user or a service account and is specified with
the IAM member syntax, i.e. using the prefixes user:
or serviceAccount:
.
327 def get_member_type(self, member) -> Optional[str]: 328 """Returns the IAM members of the project. 329 330 The "member" can be a user or a service account and is specified with 331 the IAM member syntax, i.e. using the prefixes `user:` or `serviceAccount:`. 332 """ 333 for m in self._policy_by_member.keys(): 334 parts = m.split(':') 335 if member == parts[1]: 336 return parts[0] 337 return None
Returns the IAM members of the project.
The "member" can be a user or a service account and is specified with
the IAM member syntax, i.e. using the prefixes user:
or serviceAccount:
.
339 def has_permission(self, member: str, permission: str) -> bool: 340 """Return true if user or service account member has this permission. 341 342 Note that any indirect bindings, for example through group membership, 343 aren't supported and only direct bindings to this member are checked 344 """ 345 346 if member not in self._policy_by_member: 347 return False 348 349 self._expand_member_policy(member) 350 if permission not in self._policy_by_member[member]['permissions']: 351 return False 352 return self._is_active_member(member)
Return true if user or service account member has this permission.
Note that any indirect bindings, for example through group membership, aren't supported and only direct bindings to this member are checked
354 def has_any_permission(self, member: str, permission: set[str]) -> bool: 355 """Return true if user or service account member has any of these permission. 356 357 Note that any indirect bindings, for example through group membership, 358 aren't supported and only direct bindings to this member are checked 359 """ 360 361 if member not in self._policy_by_member: 362 return False 363 364 self._expand_member_policy(member) 365 if any( 366 p in self._policy_by_member[member]['permissions'] for p in permission): 367 return True 368 return self._is_active_member(member)
Return true if user or service account member has any of these permission.
Note that any indirect bindings, for example through group membership, aren't supported and only direct bindings to this member are checked
385 def has_role_permissions(self, member: str, role: str) -> bool: 386 """Checks that this member has all the permissions defined by this role""" 387 388 if member not in self._policy_by_member: 389 return False 390 391 # Avoid expanding roles to permissions 392 if self._has_role(member, role): 393 # member status was already checked in `has_role` 394 return True 395 396 self._expand_member_policy(member) 397 role_permissions = { 398 p for p in _get_iam_role(role, self.project_id).permissions 399 if self._is_resource_permission(p) 400 } 401 402 missing_roles = role_permissions - self._policy_by_member[member][ 403 'permissions'] 404 if missing_roles: 405 logging.debug('member \'%s\' doesn\'t have permissions %s', member, 406 ','.join(missing_roles)) 407 return False 408 return self._is_active_member(member)
Checks that this member has all the permissions defined by this role
411def fetch_iam_policy(request, resource_class: Type[BaseIAMPolicy], 412 project_id: str, name: str): 413 """Executes `getIamPolicy` request and converts into a resource class 414 415 Supposed to be used by `get_*_policy` functions in gcpdiag.queries.* and 416 requires an API request, which can be executed, to be passed in parameters 417 418 An abstract policy request should look like: 419 class ResourcePolicy(BaseIAMPolicy): 420 pass 421 422 def get_resource_policy(name): 423 api_request = get_api(..).resources().get(name=name) 424 ... 425 return fetch_iam_policy(api_request, ResourcePolicy, project_id, name) 426 427 Note: API calls aren't cached and it should be done externally 428 """ 429 430 logging.info('fetching IAM policy of \'%s\'', name) 431 try: 432 response = request.execute(num_retries=config.API_RETRIES) 433 except googleapiclient.errors.HttpError as err: 434 raise utils.GcpApiError(err) from err 435 return resource_class(project_id, name, response)
Executes getIamPolicy
request and converts into a resource class
Supposed to be used by get_*_policy
functions in gcpdiag.queries.* and
requires an API request, which can be executed, to be passed in parameters
An abstract policy request should look like:
class ResourcePolicy(BaseIAMPolicy): pass
def get_resource_policy(name): api_request = get_api(..).resources().get(name=name) ... return fetch_iam_policy(api_request, ResourcePolicy, project_id, name)
Note: API calls aren't cached and it should be done externally
438class ProjectPolicy(BaseIAMPolicy): 439 """Represents the IAM policy of a single project. 440 441 Note that you should use the get_project_policy() method so that the 442 objects are cached and you don't re-fetch the project policy. 443 444 See also the API documentation: 445 https://cloud.google.com/resource-manager/reference/rest/v1/projects/getIamPolicy 446 """ 447 448 def _is_resource_permission(self, permission: str) -> bool: 449 # Filter out permissions that can be granted only on organization or folders 450 # It also excludes some permissions that aren't supported in custom roles 451 # 452 # https://cloud.google.com/resource-manager/docs/access-control-proj#permissions 453 # https://cloud.google.com/monitoring/access-control#custom_roles 454 if permission.startswith('resourcemanager.projects.') or \ 455 permission.startswith('stackdriver.projects.'): 456 return False 457 return True
Represents the IAM policy of a single project.
Note that you should use the get_project_policy() method so that the objects are cached and you don't re-fetch the project policy.
See also the API documentation: https://cloud.google.com/resource-manager/reference/rest/v1/projects/getIamPolicy
460@caching.cached_api_call(in_memory=True) 461def get_project_policy(project_id: str) -> ProjectPolicy: 462 """Return the ProjectPolicy object for a project, caching the result.""" 463 464 resource_name = f'projects/{project_id}' 465 466 crm_api = apis.get_api('cloudresourcemanager', 'v3', project_id) 467 request = crm_api.projects().getIamPolicy(resource='projects/' + project_id) 468 469 return fetch_iam_policy(request, ProjectPolicy, project_id, resource_name)
Return the ProjectPolicy object for a project, caching the result.
472class ServiceAccount(models.Resource): 473 """ Class represents the service account. 474 475 Add more fields as needed from the declaration: 476 https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts#ServiceAccount 477 """ 478 _resource_data: dict 479 480 def __init__(self, project_id, resource_data): 481 super().__init__(project_id=project_id) 482 self._resource_data = resource_data 483 484 @property 485 def name(self) -> str: 486 return self._resource_data['name'] 487 488 @property 489 def email(self) -> str: 490 return self._resource_data['email'] 491 492 @property 493 def unique_id(self) -> str: 494 return self._resource_data['uniqueId'] 495 496 @property 497 def disabled(self) -> bool: 498 return self._resource_data.get('disabled', False) 499 500 @property 501 def full_path(self) -> str: 502 # example: "name": 503 # "projects/skanzhelev-gke-dev/serviceAccounts/test-service-account-1 504 # @skanzhelev-gke-dev.iam.gserviceaccount.com" 505 return self.name 506 507 @property 508 def short_path(self) -> str: 509 path = self.full_path 510 path = re.sub(r'^projects/', '', path) 511 path = re.sub(r'/serviceAccounts/', '/', path) 512 return path
Class represents the service account.
Add more fields as needed from the declaration: https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts#ServiceAccount
500 @property 501 def full_path(self) -> str: 502 # example: "name": 503 # "projects/skanzhelev-gke-dev/serviceAccounts/test-service-account-1 504 # @skanzhelev-gke-dev.iam.gserviceaccount.com" 505 return self.name
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
507 @property 508 def short_path(self) -> str: 509 path = self.full_path 510 path = re.sub(r'^projects/', '', path) 511 path = re.sub(r'/serviceAccounts/', '/', path) 512 return path
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
688def is_service_account_existing(email: str, billing_project_id: str) -> bool: 689 """Verify that a service account exists. 690 691 If we get a non-404 API error when retrieving the service account, we will assume 692 that the service account exists, not to throw false positives (but 693 a warning will be printed out). 694 """ 695 # Make sure that the service account is fetched (this is also 696 # called by get_project_policy). 697 _batch_fetch_service_accounts([email], billing_project_id) 698 return email not in _service_account_cache_is_not_found
Verify that a service account exists.
If we get a non-404 API error when retrieving the service account, we will assume that the service account exists, not to throw false positives (but a warning will be printed out).
701def is_service_account_enabled(email: str, billing_project_id: str) -> bool: 702 """Verify that a service account exists and is enabled. 703 704 If we get an API error when retrieving the service account, we will assume 705 that the service account is enabled, not to throw false positives (but 706 a warning will be printed out). 707 """ 708 _batch_fetch_service_accounts([email], billing_project_id) 709 return (email not in _service_account_cache_is_not_found) and \ 710 not (email in _service_account_cache and _service_account_cache[email].disabled)
Verify that a service account exists and is enabled.
If we get an API error when retrieving the service account, we will assume that the service account is enabled, not to throw false positives (but a warning will be printed out).
713class ServiceAccountIAMPolicy(BaseIAMPolicy): 714 715 def _is_resource_permission(self, permission): 716 return True
Common class for IAM policies
719@caching.cached_api_call(in_memory=True) 720def get_service_account_iam_policy( 721 project_id: str, service_account: str) -> ServiceAccountIAMPolicy: 722 """Returns an IAM policy for a service account""" 723 724 resource_name = f'projects/{project_id}/serviceAccounts/{service_account}' 725 726 iam_api = apis.get_api('iam', 'v1', project_id) 727 request = iam_api.projects().serviceAccounts().getIamPolicy( 728 resource=resource_name) 729 730 return fetch_iam_policy(request, ServiceAccountIAMPolicy, project_id, 731 resource_name)
Returns an IAM policy for a service account
734@caching.cached_api_call(in_memory=True) 735def get_service_account_list(project_id: str) -> List[ServiceAccount]: 736 """Returns list of service accounts""" 737 738 iam_api = apis.get_api('iam', 'v1', project_id) 739 project_name = f'projects/{project_id}' 740 request = iam_api.projects().serviceAccounts().list(name=project_name) 741 try: 742 response = request.execute(num_retries=config.API_RETRIES) 743 except googleapiclient.errors.HttpError as err: 744 raise utils.GcpApiError(err) from err 745 return [ 746 ServiceAccount(project_id, service_account) 747 for service_account in response.get('accounts', []) 748 ]
Returns list of service accounts