gcpdiag.queries.iam
class Role(models.Resource):
  """An IAM role, backed by the raw role resource dictionary."""

  def __init__(self, resource_data):
    """Stores the role data and derives the owning project, if any.

    Args:
      resource_data: role resource dictionary; must contain a 'name' key.
    """
    role_name = resource_data['name']
    try:
      owner_project = utils.get_project_by_res_name(role_name)
    except ValueError:
      # The name could not be mapped to a project
      # (presumably a role that isn't project-scoped - verify against utils).
      owner_project = None

    super().__init__(project_id=owner_project)
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """Role name exactly as stored under the 'name' key."""
    return self._resource_data['name']

  @property
  def full_path(self) -> str:
    """Full path of the role, which is simply its name."""
    return self.name

  @property
  def permissions(self) -> List[str]:
    """Permissions included in the role (empty list when the key is absent)."""
    # roles should usually include one or more permissions
    included = self._resource_data.get('includedPermissions', [])
    return included
Represents an IAM role
Common base class for all non-exit exceptions.
class BaseIAMPolicy(models.Resource):
  """Common class for IAM policies

  Wraps a `getIamPolicy` response regrouped by member and answers role and
  permission questions for members that are bound directly in that policy.
  Subclasses implement `_is_resource_permission` to declare which permissions
  are relevant for their resource type.
  """

  _name: str
  # member -> {'roles': set of role names, 'permissions': set of permissions}
  # The 'permissions' entry is added lazily by `_expand_member_policy`.
  _policy_by_member: Dict[str, Any]

  def __init__(self, project_id: Optional[str], name: str,
               resource_data: Dict[str, Any], context: models.Context):
    """Builds the policy from a raw `getIamPolicy` response.

    Args:
      project_id: project used to resolve roles, may be None.
      name: resource name, returned as `full_path`.
      resource_data: raw `getIamPolicy` response.
      context: context passed on to the service account lookups.
    """
    super().__init__(project_id)
    self._name = name
    # context must be set before `_expand_policy`, which reads it
    self.context = context
    self._policy_by_member = self._expand_policy(resource_data)

  @property
  def full_path(self):
    return self._name

  @abc.abstractmethod
  def _is_resource_permission(self, permission: str) -> bool:
    """Checks that a permission is applicable to the resource

    Any role can be assigned on a resource level but only a subset of
    permissions will be relevant to a resource
    Irrelevant permissions are ignored in `has_role_permissions` method
    """
    pass

  @staticmethod
  def _service_account_email(member: str) -> Optional[str]:
    """Returns the email of a `serviceAccount:` member, None for other members

    Note: not matching / makes sure that we don't match for example fleet
    workload identities:
    https://cloud.google.com/anthos/multicluster-management/fleets/workload-identity
    """
    matched = re.match(r'serviceAccount:([^/]+)$', member)
    return matched.group(1) if matched else None

  def _expand_policy(self, resource_data: Dict[str, Any]) -> Dict[str, Any]:
    """Groups `getIamPolicy` bindings by member

    API response contains a list of bindings of a role to members:
    {
      "bindings": [
        {
          "role": "roles/resourcemanager.organizationAdmin",
          "members": [
            "user:mike@example.com",
            "serviceAccount:my-project-id@appspot.gserviceaccount.com"
          ]
        },
        ...
    }

    This method will convert those bindings into the following structure:
    {
      "user:mike@example.com": {
        "roles": { "roles/resourcemanager.organizationAdmin" },
      },
      "serviceAccount:my-project-id@appspot.gserviceaccount.com": {
        "roles": { "roles/resourcemanager.organizationAdmin" },
      },
    }

    Raises:
      utils.GcpApiError: if retrieving one of the roles fails with an API error.
    """

    policy_roles = set()
    policy_by_member: Dict[str, Any] = collections.defaultdict(dict)

    # Empty lists are omitted in GCP API responses
    for binding in resource_data.get('bindings', []):
      if 'condition' in binding:
        logging.warning(
            'IAM binding contains a condition, which would be ignored: %s',
            binding,
        )

      # IAM binding should always have a role and at least one member
      policy_roles.add(binding['role'])
      for member in binding['members']:
        member_policy = policy_by_member[member]
        member_policy.setdefault('roles', set()).add(binding['role'])

    # Populate cache for IAM roles used in the policy
    # Unlike `has_role_permissions` this part will be executed inside
    # `prefetch_rule` and will benefit from multi-threading execution
    for role in policy_roles:
      try:
        _get_iam_role(role, self.project_id)
      except utils.GcpApiError as err:
        # API failures are not ignored: they are logged and re-raised
        logging.error('API failure getting IAM roles: %s', err)
        raise utils.GcpApiError(err) from err
      except RoleNotFoundError as err:
        # Unknown roles are ignored - there could be no rules involving them
        logging.warning("Unable to get IAM role '%s', ignoring: %s", role,
                        err)

    # Populate cache for service accounts used in the policy
    sa_emails = set()
    for member in policy_by_member:
      email = self._service_account_email(member)
      if email is not None:
        sa_emails.add(email)
    _batch_fetch_service_accounts(list(sa_emails), self.context)

    return policy_by_member

  def _expand_member_policy(self, member: str):
    """Expands member roles into set of permissions

    Permissions are using "lazy" initialization and only expanded if needed

    Raises:
      utils.GcpApiError: if retrieving one of the roles fails with an API error.
    """
    member_policy = self._policy_by_member.get(member)
    if not member_policy or 'permissions' in member_policy:
      return

    permissions = set()
    for role in member_policy['roles']:
      try:
        permissions.update(_get_iam_role(role, self.project_id).permissions)
      except utils.GcpApiError as err:
        logging.error('API failure getting IAM roles: %s', err)
        raise utils.GcpApiError(err) from err
      except RoleNotFoundError as err:
        # Roles which can't be found contribute no permissions
        logging.warning("Unable to find IAM role '%s', ignoring: %s", role,
                        err)
    member_policy['permissions'] = permissions

  def _is_active_member(self, member: str) -> bool:
    """Checks that the member isn't disabled

    Currently supports only service accounts and not other account types
    Used in `has_role_permissions` and similar methods to ensure that
    the member isn't disabled and permissions are effectively working
    """

    # If this is a service account, make sure that the service account is
    # enabled. Any other member type is considered active.
    email = self._service_account_email(member)
    if email is not None and not is_service_account_enabled(
        email, self.context):
      logging.info('service account %s is disabled', email)
      return False

    return True

  def get_member_permissions(self, member: str) -> List[str]:
    """Return permissions for a member (either a user or serviceAccount).

    The "member" can be a user or a service account and must be specified with
    the IAM member syntax, i.e. using the prefixes `user:` or `serviceAccount:`.

    Returns a sorted list, which is empty if the member isn't in the policy.
    """

    if member not in self._policy_by_member:
      return []

    self._expand_member_policy(member)
    return sorted(self._policy_by_member[member]['permissions'])

  def get_members(self) -> List[str]:
    """Returns the IAM members of the project.

    The "member" can be a user or a service account and is specified with
    the IAM member syntax, i.e. using the prefixes `user:` or `serviceAccount:`.
    """
    return list(self._policy_by_member)

  def get_member_type(self, member) -> Optional[str]:
    """Returns the type prefix of a member, e.g. `user` or `serviceAccount`.

    Unlike other methods, the "member" is specified WITHOUT the type prefix of
    the IAM member syntax, e.g. `mike@example.com`. The first policy member
    whose identifier (the part after the first `:`) equals `member` wins.

    Returns None if no member of the policy matches.
    """
    for policy_member in self._policy_by_member:
      # Members such as `allUsers` have no `:` separator and thus no
      # identifier to compare against - skip them instead of failing.
      member_type, separator, identifier = policy_member.partition(':')
      if separator and member == identifier:
        return member_type
    return None

  def has_permission(self, member: str, permission: str) -> bool:
    """Return true if user or service account member has this permission.

    Note that any indirect bindings, for example through group membership,
    aren't supported and only direct bindings to this member are checked
    """

    if member not in self._policy_by_member:
      return False

    self._expand_member_policy(member)
    if permission not in self._policy_by_member[member]['permissions']:
      return False
    return self._is_active_member(member)

  def has_any_permission(self, member: str, permission: set[str]) -> bool:
    """Return true if user or service account member has any of these permissions.

    Note that any indirect bindings, for example through group membership,
    aren't supported and only direct bindings to this member are checked
    """

    if member not in self._policy_by_member:
      return False

    self._expand_member_policy(member)
    granted = self._policy_by_member[member]['permissions']
    if not any(p in granted for p in permission):
      # None of the requested permissions is granted to the member
      return False
    # Same as in `has_permission`: a permission only counts if the member
    # is active
    return self._is_active_member(member)

  def _has_role(self, member: str, role: str) -> bool:
    """Checks that the member has this role

    It performs exact match and doesn't expand role to list of permissions.
    Note that this method is not public because users of this module should
    use has_role_permissions(), i.e. verify effective permissions instead of
    roles.
    """

    if member not in self._policy_by_member:
      return False

    if role not in self._policy_by_member[member]['roles']:
      return False
    return self._is_active_member(member)

  def has_role_permissions(self, member: str, role: str) -> bool:
    """Checks that this member has all the permissions defined by this role

    Only permissions accepted by `_is_resource_permission` are considered.
    """

    if member not in self._policy_by_member:
      return False

    # Avoid expanding roles to permissions
    if self._has_role(member, role):
      # member status was already checked in `_has_role`
      return True

    self._expand_member_policy(member)
    role_permissions = {
        p for p in _get_iam_role(role, self.project_id).permissions
        if self._is_resource_permission(p)
    }

    missing_permissions = (role_permissions -
                           self._policy_by_member[member]['permissions'])
    if missing_permissions:
      logging.debug(
          "member '%s' doesn't have permissions %s",
          member,
          ','.join(missing_permissions),
      )
      return False
    return self._is_active_member(member)
Common class for IAM policies
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
def get_member_permissions(self, member: str) -> List[str]:
  """Return the sorted permissions of a member (user or serviceAccount).

  The "member" must be given in the IAM member syntax, i.e. including the
  `user:` or `serviceAccount:` prefix. Members which are not part of the
  policy yield an empty list.
  """
  if member in self._policy_by_member:
    # Permissions are expanded lazily, make sure they are present
    self._expand_member_policy(member)
    granted = self._policy_by_member[member]['permissions']
    return sorted(granted)
  return []
Return permissions for a member (either a user or serviceAccount).
The "member" can be a user or a service account and must be specified with
the IAM member syntax, i.e. using the prefixes user: or serviceAccount:.
def get_members(self) -> List[str]:
  """Returns all IAM members present in the policy.

  Each member is given in the IAM member syntax, i.e. including a prefix
  such as `user:` or `serviceAccount:`.
  """
  members = [*self._policy_by_member]
  return members
Returns the IAM members of the project.
The "member" can be a user or a service account and is specified with
the IAM member syntax, i.e. using the prefixes user: or serviceAccount:.
def get_member_type(self, member) -> Optional[str]:
  """Returns the type prefix of a member, e.g. `user` or `serviceAccount`.

  The "member" is specified WITHOUT the type prefix of the IAM member syntax,
  e.g. `mike@example.com`. The first policy member whose identifier (the part
  after the first `:`) equals `member` wins.

  Returns None if no member of the policy matches.
  """
  for policy_member in self._policy_by_member:
    # Members such as `allUsers` have no `:` separator and thus no identifier
    # to compare against - skip them instead of failing with an IndexError.
    member_type, separator, identifier = policy_member.partition(':')
    if separator and member == identifier:
      return member_type
  return None
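Returns the type of an IAM member of the project, i.e. the prefix before the `:` in the IAM member syntax (for example user or serviceAccount), or None if no member matches.
The "member" is specified without its type prefix, i.e. as the bare identifier
that follows user: or serviceAccount: in the IAM member syntax.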
def has_permission(self, member: str, permission: str) -> bool:
  """Tells whether a user or service account member holds the permission.

  Only bindings made directly to this member are checked; indirect bindings,
  for example through group membership, aren't supported.
  """
  if member not in self._policy_by_member:
    return False

  self._expand_member_policy(member)
  granted = self._policy_by_member[member]['permissions']
  # The member status is only looked up when the permission is granted
  return permission in granted and self._is_active_member(member)
Return true if user or service account member has this permission.
Note that any indirect bindings, for example through group membership, aren't supported and only direct bindings to this member are checked
def has_any_permission(self, member: str, permission: set[str]) -> bool:
  """Return true if user or service account member has any of these permissions.

  Note that any indirect bindings, for example through group membership,
  aren't supported and only direct bindings to this member are checked.

  Returns False if the member isn't part of the policy, if none of the
  permissions is granted to it, or if the member is not active.
  """

  if member not in self._policy_by_member:
    return False

  self._expand_member_policy(member)
  granted = self._policy_by_member[member]['permissions']
  if not any(p in granted for p in permission):
    # None of the requested permissions is granted to the member
    return False
  # A permission only counts if the member is active
  return self._is_active_member(member)
Return true if user or service account member has any of these permissions.
Note that any indirect bindings, for example through group membership, aren't supported and only direct bindings to this member are checked
def has_role_permissions(self, member: str, role: str) -> bool:
  """Tells whether the member holds every relevant permission of the role.

  A role bound directly to the member is accepted without expanding it.
  Otherwise the permissions of the role, filtered by
  `_is_resource_permission`, are compared with the member's permissions.
  """
  if member not in self._policy_by_member:
    return False

  if self._has_role(member, role):
    # Exact role match, no need to expand roles to permissions;
    # the member status was already checked in `_has_role`
    return True

  self._expand_member_policy(member)
  required = set()
  for candidate in _get_iam_role(role, self.project_id).permissions:
    if self._is_resource_permission(candidate):
      required.add(candidate)

  missing = required - self._policy_by_member[member]['permissions']
  if not missing:
    return self._is_active_member(member)

  logging.debug(
      "member '%s' doesn't have permissions %s",
      member,
      ','.join(missing),
  )
  return False
Checks that this member has all the permissions defined by this role
def fetch_iam_policy(
    request,
    resource_class: Type[BaseIAMPolicy],
    project_id: Optional[str],
    name: str,
    context: models.Context,
    raise_error_if_fails=True,
):
  """Runs a `getIamPolicy` request and wraps the response in a resource class

  Meant for the `get_*_policy` functions in gcpdiag.queries.*, which build an
  executable API request and pass it in.

  A typical policy getter looks like:
    class ResourcePolicy(BaseIAMPolicy):
      pass

    def get_resource_policy(name):
      api_request = get_api(..).resources().get(name=name)
      ...
      return fetch_iam_policy(api_request, ResourcePolicy, project_id, name,
                              context)

  Note: API calls aren't cached here, callers have to do it externally

  Args:
    request: API request object providing `execute(num_retries=...)`.
    resource_class: policy class instantiated with the response.
    project_id: project id handed over to `resource_class`, may be None.
    name: resource name, used for logging and handed over to `resource_class`.
    context: context handed over to `resource_class`.
    raise_error_if_fails: whether HTTP errors are raised or swallowed.

  Returns:
    A `resource_class` instance, or None if the request failed with an HTTP
    error and `raise_error_if_fails` is false.

  Raises:
    utils.GcpApiError: on HTTP errors when `raise_error_if_fails` is true.
  """

  logging.info("fetching IAM policy of '%s'", name)
  try:
    policy_data = request.execute(num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError as err:
    if not raise_error_if_fails:
      return None
    raise utils.GcpApiError(err) from err
  return resource_class(project_id, name, policy_data, context)
Executes getIamPolicy request and converts into a resource class
Supposed to be used by get_*_policy functions in gcpdiag.queries.* and
requires an API request, which can be executed, to be passed in parameters
An abstract policy request should look like:
class ResourcePolicy(BaseIAMPolicy): pass
def get_resource_policy(name): api_request = get_api(..).resources().get(name=name) ... return fetch_iam_policy(api_request, ResourcePolicy, project_id, name)
Note: API calls aren't cached and it should be done externally
class ProjectPolicy(BaseIAMPolicy):
  """IAM policy of a single project.

  Use get_project_policy() to obtain instances, so that the objects are
  cached and the project policy isn't fetched again.

  See also the API documentation:
  https://cloud.google.com/resource-manager/reference/rest/v1/projects/getIamPolicy
  """

  def _is_resource_permission(self, permission: str) -> bool:
    """Tells whether the permission is taken into account for projects."""
    # Filter out permissions that can be granted only on organization or folders
    # It also excludes some permissions that aren't supported in custom roles
    #
    # https://cloud.google.com/resource-manager/docs/access-control-proj#permissions
    # https://cloud.google.com/monitoring/access-control#custom_roles
    ignored_prefixes = ('resourcemanager.projects.', 'stackdriver.projects.')
    return not permission.startswith(ignored_prefixes)
Represents the IAM policy of a single project.
Note that you should use the get_project_policy() method so that the objects are cached and you don't re-fetch the project policy.
See also the API documentation: https://cloud.google.com/resource-manager/reference/rest/v1/projects/getIamPolicy
@caching.cached_api_call(in_memory=True)
def get_project_policy(context: models.Context,
                       raise_error_if_fails=True) -> ProjectPolicy:
  """Fetches the ProjectPolicy of the context's project, caching the result.

  Args:
    context: context whose `project_id` selects the project.
    raise_error_if_fails: passed on to `fetch_iam_policy`.
  """
  project_id = context.project_id
  resource_name = f'projects/{project_id}'

  projects_api = apis.get_api('cloudresourcemanager', 'v3',
                              project_id).projects()
  policy_request = projects_api.getIamPolicy(resource='projects/' + project_id)
  return fetch_iam_policy(
      request=policy_request,
      resource_class=ProjectPolicy,
      project_id=project_id,
      name=resource_name,
      context=context,
      raise_error_if_fails=raise_error_if_fails,
  )
Return the ProjectPolicy object for a project, caching the result.
class OrganizationPolicy(BaseIAMPolicy):
  """IAM policy of a single organization, retrieved with the v1 API.

  See also the API documentation:
  https://cloud.google.com/resource-manager/reference/rest/v1/organizations/getIamPolicy
  """

  def _is_resource_permission(self, permission: str) -> bool:
    """Tells whether the permission is taken into account for organizations."""
    # Filter out permissions that can be granted only on projects or folders
    ignored_prefixes = ('resourcemanager.projects.', 'resourcemanager.folders.')
    return not permission.startswith(ignored_prefixes)
Represents the IAM policy of a single organization using v1 API.
See also the API documentation: https://cloud.google.com/resource-manager/reference/rest/v1/organizations/getIamPolicy
@caching.cached_api_call(in_memory=True)
def get_organization_policy(context: models.Context,
                            organization_id: str,
                            raise_error_if_fails=True) -> OrganizationPolicy:
  """Fetches the OrganizationPolicy of an organization, caching the result.

  Args:
    context: context handed over to the policy object.
    organization_id: id used to build the `organizations/<id>` resource name.
    raise_error_if_fails: passed on to `fetch_iam_policy`.
  """
  resource_name = f'organizations/{organization_id}'

  organizations_api = apis.get_api('cloudresourcemanager',
                                   'v1').organizations()
  policy_request = organizations_api.getIamPolicy(resource=resource_name)
  # Organization policies aren't tied to a project, hence project_id=None
  return fetch_iam_policy(
      request=policy_request,
      resource_class=OrganizationPolicy,
      project_id=None,
      name=resource_name,
      context=context,
      raise_error_if_fails=raise_error_if_fails,
  )
Return the OrganizationPolicy object for an organization, caching the result.
class ServiceAccount(models.Resource):
  """A service account, backed by the raw API resource dictionary.

  Add more fields as needed from the declaration:
  https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts#ServiceAccount
  """

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Stores the service account data for the given project."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """Value of the 'name' field."""
    return self._resource_data['name']

  @property
  def email(self) -> str:
    """Value of the 'email' field."""
    return self._resource_data['email']

  @property
  def unique_id(self) -> str:
    """Value of the 'uniqueId' field."""
    return self._resource_data['uniqueId']

  @property
  def disabled(self) -> bool:
    """Value of the 'disabled' field, False when the field is absent."""
    return self._resource_data.get('disabled', False)

  @property
  def full_path(self) -> str:
    """Full path of the service account, identical to its name.

    Example:
    'projects/skanzhelev-gke-dev/serviceAccounts/test-service-account-1@skanzhelev-gke-dev.iam.gserviceaccount.com'
    """
    return self.name

  @property
  def short_path(self) -> str:
    """Full path without the 'projects/' prefix and 'serviceAccounts' part."""
    without_prefix = re.sub(r'^projects/', '', self.full_path)
    return re.sub(r'/serviceAccounts/', '/', without_prefix)
Class represents the service account.
Add more fields as needed from the declaration: https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts#ServiceAccount
@property
def full_path(self) -> str:
  """Full path of the service account, identical to its name.

  Example:
  'projects/skanzhelev-gke-dev/serviceAccounts/test-service-account-1@skanzhelev-gke-dev.iam.gserviceaccount.com'
  """
  resource_name = self.name
  return resource_name
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Full path without the 'projects/' prefix and 'serviceAccounts' part.

  For example 'projects/p1/serviceAccounts/sa@example.com' becomes
  'p1/sa@example.com'.
  """
  without_prefix = re.sub(r'^projects/', '', self.full_path)
  return re.sub(r'/serviceAccounts/', '/', without_prefix)
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
def is_service_account_existing(email: str, context: models.Context) -> bool:
  """Tells whether a service account exists.

  The account counts as existing unless it is recorded in the "not found"
  cache. As originally documented: if a non-404 API error occurs while
  retrieving the service account, it is assumed to exist, not to throw false
  positives (but a warning will be printed out).
  """
  # Make sure that the service account is fetched (this is also
  # called by get_project_policy).
  _batch_fetch_service_accounts([email], context)
  known_missing = email in _service_account_cache_is_not_found
  return not known_missing
Verify that a service account exists.
If we get a non-404 API error when retrieving the service account, we will assume that the service account exists, not to throw false positives (but a warning will be printed out).
def is_service_account_enabled(email: str, context: models.Context) -> bool:
  """Tells whether a service account exists and is enabled.

  Returns False if the account is recorded as not found or if the cached
  account is disabled, and True otherwise. As originally documented: if an
  API error occurs while retrieving the service account, it is assumed to be
  enabled, not to throw false positives (but a warning will be printed out).
  """
  _batch_fetch_service_accounts([email], context)
  if email in _service_account_cache_is_not_found:
    return False
  if email in _service_account_cache:
    return not _service_account_cache[email].disabled
  # Nothing cached for this account: treat it as enabled
  return True
Verify that a service account exists and is enabled.
If we get an API error when retrieving the service account, we will assume that the service account is enabled, not to throw false positives (but a warning will be printed out).
class ServiceAccountIAMPolicy(BaseIAMPolicy):
  """IAM policy attached to a service account."""

  def _is_resource_permission(self, permission):
    """Accepts every permission: none is filtered out for service accounts."""
    return True
Common class for IAM policies
@caching.cached_api_call(in_memory=True)
def get_service_account_iam_policy(
    context: models.Context, service_account: str) -> ServiceAccountIAMPolicy:
  """Fetches the IAM policy of a service account, caching the result.

  Args:
    context: context whose `project_id` selects the project.
    service_account: last component of the resource name
      `projects/<project>/serviceAccounts/<service_account>`.
  """
  project_id = context.project_id
  resource_name = f'projects/{project_id}/serviceAccounts/{service_account}'

  accounts_api = apis.get_api('iam', 'v1',
                              project_id).projects().serviceAccounts()
  policy_request = accounts_api.getIamPolicy(resource=resource_name)
  return fetch_iam_policy(
      request=policy_request,
      resource_class=ServiceAccountIAMPolicy,
      project_id=project_id,
      name=resource_name,
      context=context,
  )
Returns an IAM policy for a service account
@caching.cached_api_call(in_memory=True)
def get_service_account_list(project_id: str) -> List[ServiceAccount]:
  """Returns list of service accounts

  All result pages are retrieved, so projects with more service accounts than
  fit into a single page (100) are listed completely.

  Args:
    project_id: id of the project whose service accounts are listed.

  Raises:
    utils.GcpApiError: if one of the list requests fails with an HTTP error.
  """

  iam_api = apis.get_api('iam', 'v1', project_id)
  project_name = f'projects/{project_id}'
  accounts_api = iam_api.projects().serviceAccounts()
  request = accounts_api.list(name=project_name, pageSize=100)

  service_accounts: List[ServiceAccount] = []
  # `list_next` returns None once there is no further page to fetch
  while request is not None:
    try:
      response = request.execute(num_retries=config.API_RETRIES)
    except googleapiclient.errors.HttpError as err:
      raise utils.GcpApiError(err) from err
    # Empty lists are omitted in GCP API responses
    service_accounts.extend(
        ServiceAccount(project_id, service_account)
        for service_account in response.get('accounts', []))
    request = accounts_api.list_next(previous_request=request,
                                     previous_response=response)
  return service_accounts
Returns list of service accounts