gcpdiag.queries.iam

Queries related to GCP Identity and Access Management.
class Role(gcpdiag.models.Resource):
33class Role(models.Resource):
34  """Represents an IAM role"""
35
36  def __init__(self, resource_data):
37    try:
38      project_id = utils.get_project_by_res_name(resource_data['name'])
39    except ValueError:
40      project_id = None
41
42    super().__init__(project_id=project_id)
43    self._resource_data = resource_data
44
45  @property
46  def name(self) -> str:
47    return self._resource_data['name']
48
49  @property
50  def full_path(self) -> str:
51    return self.name
52
53  @property
54  def permissions(self) -> List[str]:
55    # roles should usually include one or more permissions
56    return self._resource_data.get('includedPermissions', [])

Represents an IAM role

Role(resource_data)
36  def __init__(self, resource_data):
37    try:
38      project_id = utils.get_project_by_res_name(resource_data['name'])
39    except ValueError:
40      project_id = None
41
42    super().__init__(project_id=project_id)
43    self._resource_data = resource_data
name: str
45  @property
46  def name(self) -> str:
47    return self._resource_data['name']
full_path: str
49  @property
50  def full_path(self) -> str:
51    return self.name

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

permissions: List[str]
53  @property
54  def permissions(self) -> List[str]:
55    # roles should usually include one or more permissions
56    return self._resource_data.get('includedPermissions', [])
class RoleNotFoundError(builtins.Exception):
59class RoleNotFoundError(Exception):
60  pass

Common base class for all non-exit exceptions.

class BaseIAMPolicy(gcpdiag.models.Resource):
162class BaseIAMPolicy(models.Resource):
163  """Common class for IAM policies"""
164
165  _name: str
166  _policy_by_member: Dict[str, Any]
167
168  @property
169  def full_path(self):
170    return self._name
171
172  @abc.abstractmethod
173  def _is_resource_permission(self, permission: str) -> bool:
174    """Checks that a permission is applicable to the resource
175
176    Any role can be assigned on a resource level but only a subset of
177    permissions will be relevant to a resource
178    Irrelevant permissions are ignored in `has_role_permissions` method
179    """
180    pass
181
182  def _expand_policy(self, resource_data: Dict[str, Any]) -> Dict[str, Any]:
183    """Groups `getIamPolicy` bindings by member
184
185    API response contains a list of bindings of a role to members:
186    {
187      "bindings": [
188      {
189        "role": "roles/resourcemanager.organizationAdmin",
190        "members": [
191          "user:mike@example.com",
192          "serviceAccount:my-project-id@appspot.gserviceaccount.com"
193        ]
194      },
195      ...
196    }
197
198    This method will convert those bindings into the following structure:
199    {
200      "user:mike@example.com": {
201        "roles": { "roles/resourcemanager.organizationAdmin" },
202      },
203      "serviceAccount:my-project-id@appspot.gserviceaccount.com": {
204        "roles": { "roles/resourcemanager.organizationAdmin" },
205      },
206    }
207    """
208
209    policy_roles = set()
210    policy_by_member: Dict[str, Any] = collections.defaultdict(dict)
211
212    # Empty lists are omitted in GCP API responses
213    for binding in resource_data.get('bindings', []):
214      if 'condition' in binding:
215        logging.warning(
216            'IAM binding contains a condition, which would be ignored: %s',
217            binding,
218        )
219
220      # IAM binding should always have a role and at least one member
221      policy_roles.add(binding['role'])
222      for member in binding['members']:
223        member_policy = policy_by_member[member]
224        member_policy.setdefault('roles', set()).add(binding['role'])
225
226    # Populate cache for IAM roles used in the policy
227    # Unlike `has_role_permissions` this part will be executed inside
228    # `prefetch_rule` and will benefit from multi-threading execution
229    for role in policy_roles:
230      # Ignore all errors - there could be no rules involving this role
231      try:
232        _get_iam_role(role, self.project_id)
233      except (RoleNotFoundError, utils.GcpApiError) as err:
234        # Ignore roles if cannot retrieve a role
235        # For example, due to lack of permissions
236        if isinstance(err, utils.GcpApiError):
237          logging.error('API failure getting IAM roles: %s', err)
238          raise utils.GcpApiError(err) from err
239        elif isinstance(err, RoleNotFoundError):
240          logging.warning("Unable to get IAM role '%s', ignoring: %s", role,
241                          err)
242
243    # Populate cache for service accounts used in the policy
244    # Note: not implemented as a generator expression because
245    # it looks ugly without assignment expressions, available
246    # only with Python >= 3.8.
247    sa_emails = set()
248    for member in policy_by_member.keys():
249      # Note: not matching / makes sure that we don't match for example fleet
250      # workload identities:
251      # https://cloud.google.com/anthos/multicluster-management/fleets/workload-identity
252      m = re.match(r'serviceAccount:([^/]+)$', member)
253      if m:
254        sa_emails.add(m.group(1))
255    _batch_fetch_service_accounts(list(sa_emails), self.context)
256
257    return policy_by_member
258
259  def _expand_member_policy(self, member: str):
260    """Expands member roles into set of permissions
261
262    Permissions are using "lazy" initialization and only expanded if needed
263    """
264    member_policy = self._policy_by_member.get(member)
265    if not member_policy or 'permissions' in member_policy:
266      return
267
268    permissions = set()
269    for role in member_policy['roles']:
270      try:
271        permissions.update(_get_iam_role(role, self.project_id).permissions)
272      except (RoleNotFoundError, utils.GcpApiError) as err:
273        if isinstance(err, utils.GcpApiError):
274          logging.error('API failure getting IAM roles: %s', err)
275          raise utils.GcpApiError(err) from err
276        elif isinstance(err, RoleNotFoundError):
277          logging.warning("Unable to find IAM role '%s', ignoring: %s", role,
278                          err)
279    member_policy['permissions'] = permissions
280
281  def _is_active_member(self, member: str) -> bool:
282    """Checks that the member isn't disabled
283
284    Currently supports only service accounts and not other account types
285    Used in `has_role_permissions` and similar methods to ensure that
286    the member isn't disabled and permissions are effectively working
287    """
288
289    # If this is a service account, make sure that the service account is enabled.
290    # Note: not matching / makes sure that we don't match for example fleet
291    # workload identities:
292    # https://cloud.google.com/anthos/multicluster-management/fleets/workload-identity
293    m = re.match(r'serviceAccount:([^/]+)$', member)
294    if m:
295      if not is_service_account_enabled(m.group(1), self.context):
296        logging.info('service account %s is disabled', m.group(1))
297        return False
298
299    return True
300
301  def __init__(self, project_id: Optional[str], name: str,
302               resource_data: Dict[str, Any], context: models.Context):
303    super().__init__(project_id)
304    self._name = name
305    self.context = context
306    self._policy_by_member = self._expand_policy(resource_data)
307
308  def get_member_permissions(self, member: str) -> List[str]:
309    """Return permissions for a member (either a user or serviceAccount).
310
311    The "member" can be a user or a service account and must be specified with
312    the IAM member syntax, i.e. using the prefixes `user:` or `serviceAccount:`.
313    """
314
315    if member not in self._policy_by_member:
316      return []
317
318    self._expand_member_policy(member)
319    return sorted(self._policy_by_member[member]['permissions'])
320
321  def get_members(self) -> List[str]:
322    """Returns the IAM members of the project.
323
324    The "member" can be a user or a service account and is specified with
325    the IAM member syntax, i.e. using the prefixes `user:` or `serviceAccount:`.
326    """
327    return list(self._policy_by_member.keys())
328
329  def get_member_type(self, member) -> Optional[str]:
330    """Returns the IAM members of the project.
331
332    The "member" can be a user or a service account and is specified with
333    the IAM member syntax, i.e. using the prefixes `user:` or `serviceAccount:`.
334    """
335    for m in self._policy_by_member.keys():
336      parts = m.split(':')
337      if member == parts[1]:
338        return parts[0]
339    return None
340
341  def has_permission(self, member: str, permission: str) -> bool:
342    """Return true if user or service account member has this permission.
343
344    Note that any indirect bindings, for example through group membership,
345    aren't supported and only direct bindings to this member are checked
346    """
347
348    if member not in self._policy_by_member:
349      return False
350
351    self._expand_member_policy(member)
352    if permission not in self._policy_by_member[member]['permissions']:
353      return False
354    return self._is_active_member(member)
355
356  def has_any_permission(self, member: str, permission: set[str]) -> bool:
357    """Return true if user or service account member has any of these permission.
358
359    Note that any indirect bindings, for example through group membership,
360    aren't supported and only direct bindings to this member are checked
361    """
362
363    if member not in self._policy_by_member:
364      return False
365
366    self._expand_member_policy(member)
367    if any(
368        p in self._policy_by_member[member]['permissions'] for p in permission):
369      return True
370    return self._is_active_member(member)
371
372  def _has_role(self, member: str, role: str) -> bool:
373    """Checks that the member has this role
374
375    It performs exact match and doesn't expand role to list of permissions.
376    Note that this method is not public because users of this module should
377    use has_role_permissions(), i.e. verify effective permissions instead of
378    roles.
379    """
380
381    if member not in self._policy_by_member:
382      return False
383
384    if role not in self._policy_by_member[member]['roles']:
385      return False
386    return self._is_active_member(member)
387
388  def has_role_permissions(self, member: str, role: str) -> bool:
389    """Checks that this member has all the permissions defined by this role"""
390
391    if member not in self._policy_by_member:
392      return False
393
394    # Avoid expanding roles to permissions
395    if self._has_role(member, role):
396      # member status was already checked in `has_role`
397      return True
398
399    self._expand_member_policy(member)
400    role_permissions = {
401        p for p in _get_iam_role(role, self.project_id).permissions
402        if self._is_resource_permission(p)
403    }
404
405    missing_roles = (role_permissions -
406                     self._policy_by_member[member]['permissions'])
407    if missing_roles:
408      logging.debug(
409          "member '%s' doesn't have permissions %s",
410          member,
411          ','.join(missing_roles),
412      )
413      return False
414    return self._is_active_member(member)

Common class for IAM policies

full_path
168  @property
169  def full_path(self):
170    return self._name

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

context
def get_member_permissions(self, member: str) -> List[str]:
308  def get_member_permissions(self, member: str) -> List[str]:
309    """Return permissions for a member (either a user or serviceAccount).
310
311    The "member" can be a user or a service account and must be specified with
312    the IAM member syntax, i.e. using the prefixes `user:` or `serviceAccount:`.
313    """
314
315    if member not in self._policy_by_member:
316      return []
317
318    self._expand_member_policy(member)
319    return sorted(self._policy_by_member[member]['permissions'])

Return permissions for a member (either a user or serviceAccount).

The "member" can be a user or a service account and must be specified with the IAM member syntax, i.e. using the prefixes user: or serviceAccount:.

def get_members(self) -> List[str]:
321  def get_members(self) -> List[str]:
322    """Returns the IAM members of the project.
323
324    The "member" can be a user or a service account and is specified with
325    the IAM member syntax, i.e. using the prefixes `user:` or `serviceAccount:`.
326    """
327    return list(self._policy_by_member.keys())

Returns the IAM members of the project.

The "member" can be a user or a service account and is specified with the IAM member syntax, i.e. using the prefixes user: or serviceAccount:.

def get_member_type(self, member) -> Optional[str]:
329  def get_member_type(self, member) -> Optional[str]:
330    """Returns the IAM members of the project.
331
332    The "member" can be a user or a service account and is specified with
333    the IAM member syntax, i.e. using the prefixes `user:` or `serviceAccount:`.
334    """
335    for m in self._policy_by_member.keys():
336      parts = m.split(':')
337      if member == parts[1]:
338        return parts[0]
339    return None

Returns the IAM members of the project.

The "member" can be a user or a service account and is specified with the IAM member syntax, i.e. using the prefixes user: or serviceAccount:.

def has_permission(self, member: str, permission: str) -> bool:
341  def has_permission(self, member: str, permission: str) -> bool:
342    """Return true if user or service account member has this permission.
343
344    Note that any indirect bindings, for example through group membership,
345    aren't supported and only direct bindings to this member are checked
346    """
347
348    if member not in self._policy_by_member:
349      return False
350
351    self._expand_member_policy(member)
352    if permission not in self._policy_by_member[member]['permissions']:
353      return False
354    return self._is_active_member(member)

Return true if user or service account member has this permission.

Note that any indirect bindings, for example through group membership, aren't supported and only direct bindings to this member are checked

def has_any_permission(self, member: str, permission: set[str]) -> bool:
356  def has_any_permission(self, member: str, permission: set[str]) -> bool:
357    """Return true if user or service account member has any of these permission.
358
359    Note that any indirect bindings, for example through group membership,
360    aren't supported and only direct bindings to this member are checked
361    """
362
363    if member not in self._policy_by_member:
364      return False
365
366    self._expand_member_policy(member)
367    if any(
368        p in self._policy_by_member[member]['permissions'] for p in permission):
369      return True
370    return self._is_active_member(member)

Return true if user or service account member has any of these permission.

Note that any indirect bindings, for example through group membership, aren't supported and only direct bindings to this member are checked

def has_role_permissions(self, member: str, role: str) -> bool:
388  def has_role_permissions(self, member: str, role: str) -> bool:
389    """Checks that this member has all the permissions defined by this role"""
390
391    if member not in self._policy_by_member:
392      return False
393
394    # Avoid expanding roles to permissions
395    if self._has_role(member, role):
396      # member status was already checked in `has_role`
397      return True
398
399    self._expand_member_policy(member)
400    role_permissions = {
401        p for p in _get_iam_role(role, self.project_id).permissions
402        if self._is_resource_permission(p)
403    }
404
405    missing_roles = (role_permissions -
406                     self._policy_by_member[member]['permissions'])
407    if missing_roles:
408      logging.debug(
409          "member '%s' doesn't have permissions %s",
410          member,
411          ','.join(missing_roles),
412      )
413      return False
414    return self._is_active_member(member)

Checks that this member has all the permissions defined by this role

def fetch_iam_policy( request, resource_class: Type[BaseIAMPolicy], project_id: Optional[str], name: str, context: gcpdiag.models.Context, raise_error_if_fails=True):
417def fetch_iam_policy(
418    request,
419    resource_class: Type[BaseIAMPolicy],
420    project_id: Optional[str],
421    name: str,
422    context: models.Context,
423    raise_error_if_fails=True,
424):
425  """Executes `getIamPolicy` request and converts into a resource class
426
427  Supposed to be used by `get_*_policy` functions in gcpdiag.queries.* and
428  requires an API request, which can be executed, to be passed in parameters
429
430  An abstract policy request should look like:
431    class ResourcePolicy(BaseIAMPolicy):
432      pass
433
434    def get_resource_policy(name):
435      api_request = get_api(..).resources().get(name=name)
436      ...
437      return fetch_iam_policy(api_request, ResourcePolicy, project_id, name)
438
439  Note: API calls aren't cached and it should be done externally
440  """
441
442  logging.info("fetching IAM policy of '%s'", name)
443  try:
444    response = request.execute(num_retries=config.API_RETRIES)
445  except googleapiclient.errors.HttpError as err:
446    if raise_error_if_fails:
447      raise utils.GcpApiError(err) from err
448    else:
449      return
450  return resource_class(project_id, name, response, context)

Executes getIamPolicy request and converts into a resource class

Supposed to be used by get_*_policy functions in gcpdiag.queries.* and requires an API request, which can be executed, to be passed in parameters

An abstract policy request should look like:

class ResourcePolicy(BaseIAMPolicy): pass

def get_resource_policy(name): api_request = get_api(..).resources().get(name=name) ... return fetch_iam_policy(api_request, ResourcePolicy, project_id, name)

Note: API calls aren't cached and it should be done externally

class ProjectPolicy(BaseIAMPolicy):
453class ProjectPolicy(BaseIAMPolicy):
454  """Represents the IAM policy of a single project.
455
456  Note that you should use the get_project_policy() method so that the
457  objects are cached and you don't re-fetch the project policy.
458
459  See also the API documentation:
460  https://cloud.google.com/resource-manager/reference/rest/v1/projects/getIamPolicy
461  """
462
463  def _is_resource_permission(self, permission: str) -> bool:
464    # Filter out permissions that can be granted only on organization or folders
465    # It also excludes some permissions that aren't supported in custom roles
466    #
467    # https://cloud.google.com/resource-manager/docs/access-control-proj#permissions
468    # https://cloud.google.com/monitoring/access-control#custom_roles
469    if permission.startswith('resourcemanager.projects.'
470                            ) or permission.startswith('stackdriver.projects.'):
471      return False
472    return True

Represents the IAM policy of a single project.

Note that you should use the get_project_policy() method so that the objects are cached and you don't re-fetch the project policy.

See also the API documentation: https://cloud.google.com/resource-manager/reference/rest/v1/projects/getIamPolicy

@caching.cached_api_call(in_memory=True)
def get_project_policy( context: gcpdiag.models.Context, raise_error_if_fails=True) -> ProjectPolicy:
475@caching.cached_api_call(in_memory=True)
476def get_project_policy(context: models.Context,
477                       raise_error_if_fails=True) -> ProjectPolicy:
478  """Return the ProjectPolicy object for a project, caching the result."""
479  project_id = context.project_id
480  resource_name = f'projects/{project_id}'
481
482  crm_api = apis.get_api('cloudresourcemanager', 'v3', project_id)
483  request = crm_api.projects().getIamPolicy(resource='projects/' + project_id)
484  return fetch_iam_policy(request, ProjectPolicy, project_id, resource_name,
485                          context, raise_error_if_fails)

Return the ProjectPolicy object for a project, caching the result.

class OrganizationPolicy(BaseIAMPolicy):
488class OrganizationPolicy(BaseIAMPolicy):
489  """Represents the IAM policy of a single organization using v1 API.
490
491  See also the API documentation:
492  https://cloud.google.com/resource-manager/reference/rest/v1/organizations/getIamPolicy
493  """
494
495  def _is_resource_permission(self, permission: str) -> bool:
496    # Filter out permissions that can be granted only on projects or folders
497    if permission.startswith(
498        'resourcemanager.projects.') or permission.startswith(
499            'resourcemanager.folders.'):
500      return False
501    return True

Represents the IAM policy of a single organization using v1 API.

See also the API documentation: https://cloud.google.com/resource-manager/reference/rest/v1/organizations/getIamPolicy

@caching.cached_api_call(in_memory=True)
def get_organization_policy( context: gcpdiag.models.Context, organization_id: str, raise_error_if_fails=True) -> OrganizationPolicy:
504@caching.cached_api_call(in_memory=True)
505def get_organization_policy(context: models.Context,
506                            organization_id: str,
507                            raise_error_if_fails=True) -> OrganizationPolicy:
508  """Return the OrganizationPolicy object for an organization, caching the result."""
509
510  resource_name = f'organizations/{organization_id}'
511
512  crm_api = apis.get_api('cloudresourcemanager', 'v1')
513  request = crm_api.organizations().getIamPolicy(resource=resource_name)
514  return fetch_iam_policy(request, OrganizationPolicy, None, resource_name,
515                          context, raise_error_if_fails)

Return the OrganizationPolicy object for an organization, caching the result.

class ServiceAccount(gcpdiag.models.Resource):
518class ServiceAccount(models.Resource):
519  """Class represents the service account.
520
521  Add more fields as needed from the declaration:
522  https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts#ServiceAccount
523  """
524
525  _resource_data: dict
526
527  def __init__(self, project_id, resource_data):
528    super().__init__(project_id=project_id)
529    self._resource_data = resource_data
530
531  @property
532  def name(self) -> str:
533    return self._resource_data['name']
534
535  @property
536  def email(self) -> str:
537    return self._resource_data['email']
538
539  @property
540  def unique_id(self) -> str:
541    return self._resource_data['uniqueId']
542
543  @property
544  def disabled(self) -> bool:
545    return self._resource_data.get('disabled', False)
546
547  @property
548  def full_path(self) -> str:
549    # example: "name":
550    # "projects/skanzhelev-gke-dev/serviceAccounts/test-service-account-1
551    #                               @skanzhelev-gke-dev.iam.gserviceaccount.com"
552    return self.name
553
554  @property
555  def short_path(self) -> str:
556    path = self.full_path
557    path = re.sub(r'^projects/', '', path)
558    path = re.sub(r'/serviceAccounts/', '/', path)
559    return path

Class represents the service account.

Add more fields as needed from the declaration: https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts#ServiceAccount

ServiceAccount(project_id, resource_data)
527  def __init__(self, project_id, resource_data):
528    super().__init__(project_id=project_id)
529    self._resource_data = resource_data
name: str
531  @property
532  def name(self) -> str:
533    return self._resource_data['name']
email: str
535  @property
536  def email(self) -> str:
537    return self._resource_data['email']
unique_id: str
539  @property
540  def unique_id(self) -> str:
541    return self._resource_data['uniqueId']
disabled: bool
543  @property
544  def disabled(self) -> bool:
545    return self._resource_data.get('disabled', False)
full_path: str
547  @property
548  def full_path(self) -> str:
549    # example: "name":
550    # "projects/skanzhelev-gke-dev/serviceAccounts/test-service-account-1
551    #                               @skanzhelev-gke-dev.iam.gserviceaccount.com"
552    return self.name

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
554  @property
555  def short_path(self) -> str:
556    path = self.full_path
557    path = re.sub(r'^projects/', '', path)
558    path = re.sub(r'/serviceAccounts/', '/', path)
559    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

SERVICE_AGENT_DOMAINS = ('cloudservices.gserviceaccount.com', 'cloudbuild.gserviceaccount.com', 'cloudcomposer-accounts.iam.gserviceaccount.com', 'cloud-filer.iam.gserviceaccount.com', 'cloud-memcache-sa.iam.gserviceaccount.com', 'cloud-ml.google.com.iam.gserviceaccount.com', 'cloud-redis.iam.gserviceaccount.com', 'cloud-tpu.iam.gserviceaccount.com', 'compute-system.iam.gserviceaccount.com', 'container-analysis.iam.gserviceaccount.com', 'container-engine-robot.iam.gserviceaccount.com', 'containerregistry.iam.gserviceaccount.com', 'dataflow-service-producer-prod.iam.gserviceaccount.com', 'dataproc-accounts.iam.gserviceaccount.com', 'dlp-api.iam.gserviceaccount.com', 'endpoints-portal.iam.gserviceaccount.com', 'firebase-rules.iam.gserviceaccount.com', 'gae-api-prod.google.com.iam.gserviceaccount.com', 'gcf-admin-robot.iam.gserviceaccount.com', 'gcp-gae-service.iam.gserviceaccount.com', 'genomics-api.google.com.iam.gserviceaccount.com', 'remotebuildexecution.iam.gserviceaccount.com', 'serverless-robot-prod.iam.gserviceaccount.com', 'service-consumer-management.iam.gserviceaccount.com', 'service-networking.iam.gserviceaccount.com', 'sourcerepo-service-accounts.iam.gserviceaccount.com', 'appspot.gserviceaccount.com', 'cloudservices.gserviceaccount.com', 'crashlytics-bigquery-prod.iam.gserviceaccount.com', 'fcm-bq-export-prod.iam.gserviceaccount.com', 'firebase-sa-management.iam.gserviceaccount.com', 'performance-bq-export-prod.iam.gserviceaccount.com', 'predictions-bq-export-prod.iam.gserviceaccount.com', 'system.gserviceaccount.com')
DEFAULT_SERVICE_ACCOUNT_DOMAINS = ('appspot.gserviceaccount.com', 'developer.gserviceaccount.com')
def is_service_account_existing(email: str, context: gcpdiag.models.Context) -> bool:
760def is_service_account_existing(email: str, context: models.Context) -> bool:
761  """Verify that a service account exists.
762
763  If we get a non-404 API error when retrieving the service account, we will
764  assume
765  that the service account exists, not to throw false positives (but
766  a warning will be printed out).
767  """
768  # Make sure that the service account is fetched (this is also
769  # called by get_project_policy).
770  _batch_fetch_service_accounts([email], context)
771  return email not in _service_account_cache_is_not_found

Verify that a service account exists.

If we get a non-404 API error when retrieving the service account, we will assume that the service account exists, not to throw false positives (but a warning will be printed out).

def is_service_account_enabled(email: str, context: gcpdiag.models.Context) -> bool:
774def is_service_account_enabled(email: str, context: models.Context) -> bool:
775  """Verify that a service account exists and is enabled.
776
777  If we get an API error when retrieving the service account, we will assume
778  that the service account is enabled, not to throw false positives (but
779  a warning will be printed out).
780  """
781  _batch_fetch_service_accounts([email], context)
782  return (email not in _service_account_cache_is_not_found
783         ) and not (email in _service_account_cache and
784                    _service_account_cache[email].disabled)

Verify that a service account exists and is enabled.

If we get an API error when retrieving the service account, we will assume that the service account is enabled, not to throw false positives (but a warning will be printed out).

class ServiceAccountIAMPolicy(BaseIAMPolicy):
787class ServiceAccountIAMPolicy(BaseIAMPolicy):
788
789  def _is_resource_permission(self, permission):
790    return True

Common class for IAM policies

@caching.cached_api_call(in_memory=True)
def get_service_account_iam_policy( context: gcpdiag.models.Context, service_account: str) -> ServiceAccountIAMPolicy:
793@caching.cached_api_call(in_memory=True)
794def get_service_account_iam_policy(
795    context: models.Context, service_account: str) -> ServiceAccountIAMPolicy:
796  """Returns an IAM policy for a service account"""
797  project_id = context.project_id
798  resource_name = f'projects/{project_id}/serviceAccounts/{service_account}'
799
800  iam_api = apis.get_api('iam', 'v1', project_id)
801  request = (iam_api.projects().serviceAccounts().getIamPolicy(
802      resource=resource_name))
803  return fetch_iam_policy(request, ServiceAccountIAMPolicy, project_id,
804                          resource_name, context)

Returns an IAM policy for a service account

@caching.cached_api_call(in_memory=True)
def get_service_account_list(project_id: str) -> List[ServiceAccount]:
807@caching.cached_api_call(in_memory=True)
808def get_service_account_list(project_id: str) -> List[ServiceAccount]:
809  """Returns list of service accounts"""
810
811  iam_api = apis.get_api('iam', 'v1', project_id)
812  project_name = f'projects/{project_id}'
813  request = iam_api.projects().serviceAccounts().list(name=project_name,
814                                                      pageSize=100)
815  try:
816    response = request.execute(num_retries=config.API_RETRIES)
817  except googleapiclient.errors.HttpError as err:
818    raise utils.GcpApiError(err) from err
819  return [
820      ServiceAccount(project_id, service_account)
821      for service_account in response.get('accounts', [])
822  ]

Returns list of service accounts