gcpdiag.queries.iam

Queries related to GCP Identity and Access Management.
class Role(gcpdiag.models.Resource):
class Role(models.Resource):
  """Represents an IAM role.

  Wraps the role resource dictionary and exposes its name and the
  permissions it includes.
  """

  def __init__(self, resource_data):
    # If no project can be derived from the role name (ValueError), the
    # role is stored without a project id.
    try:
      owning_project = utils.get_project_by_res_name(resource_data['name'])
    except ValueError:
      owning_project = None

    super().__init__(project_id=owning_project)
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """The 'name' field of the role resource."""
    return self._resource_data['name']

  @property
  def full_path(self) -> str:
    """Full path of the role; identical to its name."""
    return self.name

  @property
  def permissions(self) -> List[str]:
    """Permissions included in the role ('includedPermissions' field)."""
    # roles should usually include one or more permissions, but the field
    # may be absent, in which case an empty list is returned
    included = self._resource_data.get('includedPermissions', [])
    return included

Represents an IAM role

Role(resource_data)
36  def __init__(self, resource_data):
37    try:
38      project_id = utils.get_project_by_res_name(resource_data['name'])
39    except ValueError:
40      project_id = None
41
42    super().__init__(project_id=project_id)
43    self._resource_data = resource_data
name: str
45  @property
46  def name(self) -> str:
47    return self._resource_data['name']
full_path: str
49  @property
50  def full_path(self) -> str:
51    return self.name

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

permissions: List[str]
53  @property
54  def permissions(self) -> List[str]:
55    # roles should usually include one or more permissions
56    return self._resource_data.get('includedPermissions', [])
class RoleNotFoundError(builtins.Exception):
class RoleNotFoundError(Exception):
  """Signals that an IAM role could not be found."""

Raised when a requested IAM role cannot be found.

class BaseIAMPolicy(gcpdiag.models.Resource):
class BaseIAMPolicy(models.Resource):
  """Common class for IAM policies.

  Wraps the response of a `getIamPolicy` call and indexes its role bindings
  by member, so that roles and (lazily expanded) permissions can be looked
  up per member. Only direct bindings are evaluated; conditions attached to
  a binding are ignored (a warning is logged).
  """

  _name: str
  # Maps an IAM member (e.g. 'user:mike@example.com') to a dict holding a
  # 'roles' set and, once expanded, a 'permissions' set.
  _policy_by_member: Dict[str, Any]

  @property
  def full_path(self):
    """Name of the resource this policy belongs to, as given to __init__."""
    return self._name

  @abc.abstractmethod
  def _is_resource_permission(self, permission: str) -> bool:
    """Checks that a permission is applicable to the resource

    Any role can be assigned on a resource level but only a subset of
    permissions will be relevant to a resource
    Irrelevant permissions are ignored in `has_role_permissions` method
    """
    pass

  def _expand_policy(self, resource_data: Dict[str, Any]) -> Dict[str, Any]:
    """Groups `getIamPolicy` bindings by member

    API response contains a list of bindings of a role to members:
    {
      "bindings": [
      {
        "role": "roles/resourcemanager.organizationAdmin",
        "members": [
          "user:mike@example.com",
          "serviceAccount:my-project-id@appspot.gserviceaccount.com"
        ]
      },
      ...
    }

    This method will convert those bindings into the following structure:
    {
      "user:mike@example.com": {
        "roles": { "roles/resourcemanager.organizationAdmin" },
      },
      "serviceAccount:my-project-id@appspot.gserviceaccount.com": {
        "roles": { "roles/resourcemanager.organizationAdmin" },
      },
    }

    As a side effect the roles and service accounts referenced by the policy
    are fetched so that later lookups hit the cache.

    Raises:
      utils.GcpApiError: if fetching one of the roles fails with an API error.
    """

    policy_roles = set()
    policy_by_member: Dict[str, Any] = defaultdict(dict)

    # Empty lists are omitted in GCP API responses
    for binding in resource_data.get('bindings', []):
      if 'condition' in binding:
        logging.warning(
            'IAM binding contains a condition, which would be ignored: %s',
            binding)

      # IAM binding should always have a role and at least one member
      role = binding['role']
      policy_roles.add(role)
      for member in binding['members']:
        policy_by_member[member].setdefault('roles', set()).add(role)

    # Populate cache for IAM roles used in the policy
    # Unlike `has_role_permissions` this part will be executed inside
    # `prefetch_rule` and will benefit from multi-threading execution
    for role in policy_roles:
      try:
        _get_iam_role(role, self.project_id)
      except utils.GcpApiError as err:
        # API failures (for example lack of permissions) are propagated
        logging.error('API failure getting IAM roles: %s', err)
        raise utils.GcpApiError(err) from err
      except RoleNotFoundError as err:
        # A missing role is tolerated - there could be no rules involving it
        logging.warning("Unable to get IAM role '%s', ignoring: %s", role,
                        err)

    # Populate cache for service accounts used in the policy
    sa_emails = set()
    for member in policy_by_member:
      # Note: not matching / makes sure that we don't match for example fleet
      # workload identities:
      # https://cloud.google.com/anthos/multicluster-management/fleets/workload-identity
      m = re.match(r'serviceAccount:([^/]+)$', member)
      if m:
        sa_emails.add(m.group(1))
    _batch_fetch_service_accounts(list(sa_emails), self.project_id)

    return policy_by_member

  def _expand_member_policy(self, member: str):
    """Expands member roles into set of permissions

    Permissions are using "lazy" initialization and only expanded if needed.
    Does nothing if the member is unknown or was already expanded.

    Raises:
      utils.GcpApiError: if fetching one of the roles fails with an API error.
    """
    member_policy = self._policy_by_member.get(member)
    if not member_policy or 'permissions' in member_policy:
      return

    permissions = set()
    for role in member_policy['roles']:
      try:
        permissions.update(_get_iam_role(role, self.project_id).permissions)
      except utils.GcpApiError as err:
        logging.error('API failure getting IAM roles: %s', err)
        raise utils.GcpApiError(err) from err
      except RoleNotFoundError as err:
        # Roles that cannot be found contribute no permissions
        logging.warning("Unable to find IAM role '%s', ignoring: %s", role,
                        err)
    member_policy['permissions'] = permissions

  def _is_active_member(self, member: str) -> bool:
    """Checks that the member isn't disabled

    Currently supports only service accounts and not other account types
    Used in `has_role_permissions` and similar methods to ensure that
    the member isn't disabled and permissions are effectively working
    """

    # If this is a service account, make sure that the service account is enabled.
    # Note: not matching / makes sure that we don't match for example fleet
    # workload identities:
    # https://cloud.google.com/anthos/multicluster-management/fleets/workload-identity
    m = re.match(r'serviceAccount:([^/]+)$', member)
    if m and not is_service_account_enabled(m.group(1), self.project_id):
      logging.info('service account %s is disabled', m.group(1))
      return False

    return True

  def __init__(self, project_id: str, name: str, resource_data: Dict[str, Any]):
    """Build the policy from a `getIamPolicy` response.

    Args:
      project_id: project passed to the Resource base class and used when
        fetching roles and service accounts.
      name: resource name returned by `full_path`.
      resource_data: the `getIamPolicy` response dictionary.
    """
    super().__init__(project_id)
    self._name = name
    self._policy_by_member = self._expand_policy(resource_data)

  def get_member_permissions(self, member: str) -> List[str]:
    """Return permissions for a member (either a user or serviceAccount).

    The "member" can be a user or a service account and must be specified with
    the IAM member syntax, i.e. using the prefixes `user:` or `serviceAccount:`.
    Returns a sorted list, empty if the member has no bindings in the policy.
    """

    if member not in self._policy_by_member:
      return []

    self._expand_member_policy(member)
    return sorted(self._policy_by_member[member]['permissions'])

  def get_members(self) -> List[str]:
    """Returns the IAM members of the project.

    The "member" can be a user or a service account and is specified with
    the IAM member syntax, i.e. using the prefixes `user:` or `serviceAccount:`.
    """
    return list(self._policy_by_member)

  def get_member_type(self, member) -> Optional[str]:
    """Returns the type prefix of a policy member, or None if not found.

    The "member" is given without its prefix (for example an e-mail address);
    the returned value is the prefix of the first policy member whose
    identifier equals it, e.g. `user` or `serviceAccount`. Members without a
    prefix (such as `allUsers`) never match.
    """
    for policy_member in self._policy_by_member:
      # partition() tolerates members that contain no ':' at all
      prefix, separator, identifier = policy_member.partition(':')
      if separator and identifier == member:
        return prefix
    return None

  def has_permission(self, member: str, permission: str) -> bool:
    """Return true if user or service account member has this permission.

    Note that any indirect bindings, for example through group membership,
    aren't supported and only direct bindings to this member are checked
    """

    if member not in self._policy_by_member:
      return False

    self._expand_member_policy(member)
    if permission not in self._policy_by_member[member]['permissions']:
      return False
    return self._is_active_member(member)

  def has_any_permission(self, member: str, permission: set[str]) -> bool:
    """Return true if user or service account member has any of these permission.

    Note that any indirect bindings, for example through group membership,
    aren't supported and only direct bindings to this member are checked
    """

    if member not in self._policy_by_member:
      return False

    self._expand_member_policy(member)
    granted = self._policy_by_member[member]['permissions']
    # Without a single matching permission the answer is False regardless of
    # the member status; with a match the member must also be active.
    if not any(p in granted for p in permission):
      return False
    return self._is_active_member(member)

  def _has_role(self, member: str, role: str) -> bool:
    """Checks that the member has this role

    It performs exact match and doesn't expand role to list of permissions.
    Note that this method is not public because users of this module should
    use has_role_permissions(), i.e. verify effective permissions instead of
    roles."""

    if member not in self._policy_by_member:
      return False

    if role not in self._policy_by_member[member]['roles']:
      return False
    return self._is_active_member(member)

  def has_role_permissions(self, member: str, role: str) -> bool:
    """Checks that this member has all the permissions defined by this role"""

    if member not in self._policy_by_member:
      return False

    # Avoid expanding roles to permissions
    if self._has_role(member, role):
      # member status was already checked in `_has_role`
      return True

    self._expand_member_policy(member)
    # Only permissions applicable to this resource type are required
    role_permissions = {
        p for p in _get_iam_role(role, self.project_id).permissions
        if self._is_resource_permission(p)
    }

    missing_permissions = role_permissions - self._policy_by_member[member][
        'permissions']
    if missing_permissions:
      logging.debug('member \'%s\' doesn\'t have permissions %s', member,
                    ','.join(missing_permissions))
      return False
    return self._is_active_member(member)

Common class for IAM policies

full_path
168  @property
169  def full_path(self):
170    return self._name

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

def get_member_permissions(self, member: str) -> List[str]:
306  def get_member_permissions(self, member: str) -> List[str]:
307    """Return permissions for a member (either a user or serviceAccount).
308
309    The "member" can be a user or a service account and must be specified with
310    the IAM member syntax, i.e. using the prefixes `user:` or `serviceAccount:`.
311    """
312
313    if member not in self._policy_by_member:
314      return []
315
316    self._expand_member_policy(member)
317    return sorted(self._policy_by_member[member]['permissions'])

Return permissions for a member (either a user or serviceAccount).

The "member" can be a user or a service account and must be specified with the IAM member syntax, i.e. using the prefixes user: or serviceAccount:.

def get_members(self) -> List[str]:
319  def get_members(self) -> List[str]:
320    """Returns the IAM members of the project.
321
322    The "member" can be a user or a service account and is specified with
323    the IAM member syntax, i.e. using the prefixes `user:` or `serviceAccount:`.
324    """
325    return list(self._policy_by_member.keys())

Returns the IAM members of the project.

The "member" can be a user or a service account and is specified with the IAM member syntax, i.e. using the prefixes user: or serviceAccount:.

def get_member_type(self, member) -> Optional[str]:
327  def get_member_type(self, member) -> Optional[str]:
328    """Returns the IAM members of the project.
329
330    The "member" can be a user or a service account and is specified with
331    the IAM member syntax, i.e. using the prefixes `user:` or `serviceAccount:`.
332    """
333    for m in self._policy_by_member.keys():
334      parts = m.split(':')
335      if member == parts[1]:
336        return parts[0]
337    return None

Returns the type prefix of a policy member, or None if no member matches.

The "member" is the identifier without its prefix (for example an e-mail address); the returned type is the prefix used in the IAM member syntax, such as user or serviceAccount.

def has_permission(self, member: str, permission: str) -> bool:
339  def has_permission(self, member: str, permission: str) -> bool:
340    """Return true if user or service account member has this permission.
341
342    Note that any indirect bindings, for example through group membership,
343    aren't supported and only direct bindings to this member are checked
344    """
345
346    if member not in self._policy_by_member:
347      return False
348
349    self._expand_member_policy(member)
350    if permission not in self._policy_by_member[member]['permissions']:
351      return False
352    return self._is_active_member(member)

Return true if user or service account member has this permission.

Note that any indirect bindings, for example through group membership, aren't supported and only direct bindings to this member are checked

def has_any_permission(self, member: str, permission: set[str]) -> bool:
354  def has_any_permission(self, member: str, permission: set[str]) -> bool:
355    """Return true if user or service account member has any of these permission.
356
357    Note that any indirect bindings, for example through group membership,
358    aren't supported and only direct bindings to this member are checked
359    """
360
361    if member not in self._policy_by_member:
362      return False
363
364    self._expand_member_policy(member)
365    if any(
366        p in self._policy_by_member[member]['permissions'] for p in permission):
367      return True
368    return self._is_active_member(member)

Return true if user or service account member has any of these permission.

Note that any indirect bindings, for example through group membership, aren't supported and only direct bindings to this member are checked

def has_role_permissions(self, member: str, role: str) -> bool:
385  def has_role_permissions(self, member: str, role: str) -> bool:
386    """Checks that this member has all the permissions defined by this role"""
387
388    if member not in self._policy_by_member:
389      return False
390
391    # Avoid expanding roles to permissions
392    if self._has_role(member, role):
393      # member status was already checked in `has_role`
394      return True
395
396    self._expand_member_policy(member)
397    role_permissions = {
398        p for p in _get_iam_role(role, self.project_id).permissions
399        if self._is_resource_permission(p)
400    }
401
402    missing_roles = role_permissions - self._policy_by_member[member][
403        'permissions']
404    if missing_roles:
405      logging.debug('member \'%s\' doesn\'t have permissions %s', member,
406                    ','.join(missing_roles))
407      return False
408    return self._is_active_member(member)

Checks that this member has all the permissions defined by this role

def fetch_iam_policy( request, resource_class: Type[BaseIAMPolicy], project_id: str, name: str):
def fetch_iam_policy(request, resource_class: Type[BaseIAMPolicy],
                     project_id: str, name: str):
  """Run a `getIamPolicy` request and wrap the response in a policy class.

  Intended for the `get_*_policy` functions in gcpdiag.queries.*; the caller
  builds the (not yet executed) API request and passes it in.

  A typical caller looks like:
    class ResourcePolicy(BaseIAMPolicy):
      pass

    def get_resource_policy(name):
      api_request = get_api(..).resources().get(name=name)
      ...
      return fetch_iam_policy(api_request, ResourcePolicy, project_id, name)

  Note: nothing is cached here; caching must be done by the caller.

  Raises:
    utils.GcpApiError: if executing the request raises an HttpError.
  """

  logging.info("fetching IAM policy of '%s'", name)
  try:
    policy_data = request.execute(num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError as http_err:
    raise utils.GcpApiError(http_err) from http_err
  else:
    return resource_class(project_id, name, policy_data)

Executes getIamPolicy request and converts into a resource class

Supposed to be used by get_*_policy functions in gcpdiag.queries.* and requires an API request, which can be executed, to be passed in parameters

An abstract policy request should look like:

class ResourcePolicy(BaseIAMPolicy): pass

def get_resource_policy(name): api_request = get_api(..).resources().get(name=name) ... return fetch_iam_policy(api_request, ResourcePolicy, project_id, name)

Note: API calls aren't cached and it should be done externally

class ProjectPolicy(BaseIAMPolicy):
class ProjectPolicy(BaseIAMPolicy):
  """Represents the IAM policy of a single project.

  Use get_project_policy() to obtain instances, so that the objects are
  cached and the project policy isn't fetched again.

  See also the API documentation:
  https://cloud.google.com/resource-manager/reference/rest/v1/projects/getIamPolicy
  """

  def _is_resource_permission(self, permission: str) -> bool:
    """Return False for permissions that are skipped on project level."""
    # Filter out permissions that can be granted only on organization or folders
    # It also excludes some permissions that aren't supported in custom roles
    #
    # https://cloud.google.com/resource-manager/docs/access-control-proj#permissions
    # https://cloud.google.com/monitoring/access-control#custom_roles
    skipped_prefixes = ('resourcemanager.projects.', 'stackdriver.projects.')
    return not permission.startswith(skipped_prefixes)

Represents the IAM policy of a single project.

Note that you should use the get_project_policy() method so that the objects are cached and you don't re-fetch the project policy.

See also the API documentation: https://cloud.google.com/resource-manager/reference/rest/v1/projects/getIamPolicy

@caching.cached_api_call(in_memory=True)
def get_project_policy(project_id: str) -> ProjectPolicy:
@caching.cached_api_call(in_memory=True)
def get_project_policy(project_id: str) -> ProjectPolicy:
  """Fetch the IAM policy of a project as a ProjectPolicy (result is cached)."""

  resource_name = f'projects/{project_id}'

  # Cloud Resource Manager v3 serves getIamPolicy for projects
  policy_request = apis.get_api('cloudresourcemanager', 'v3',
                                project_id).projects().getIamPolicy(
                                    resource=resource_name)

  return fetch_iam_policy(policy_request, ProjectPolicy, project_id,
                          resource_name)

Return the ProjectPolicy object for a project, caching the result.

class ServiceAccount(gcpdiag.models.Resource):
class ServiceAccount(models.Resource):
  """Represents a service account.

  Thin wrapper around the service account resource dictionary. Add more
  fields as needed from the declaration:
  https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts#ServiceAccount
  """
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """The 'name' field of the resource."""
    return self._resource_data['name']

  @property
  def email(self) -> str:
    """The 'email' field of the resource."""
    return self._resource_data['email']

  @property
  def unique_id(self) -> str:
    """The 'uniqueId' field of the resource."""
    return self._resource_data['uniqueId']

  @property
  def disabled(self) -> bool:
    """The 'disabled' field of the resource; False when absent."""
    return self._resource_data.get('disabled', False)

  @property
  def full_path(self) -> str:
    """Full path of the service account; identical to its name."""
    # example: "name":
    # "projects/skanzhelev-gke-dev/serviceAccounts/test-service-account-1
    #                               @skanzhelev-gke-dev.iam.gserviceaccount.com"
    return self.name

  @property
  def short_path(self) -> str:
    """Full path without 'projects/' prefix and '/serviceAccounts/' part."""
    without_prefix = re.sub(r'^projects/', '', self.full_path)
    return re.sub(r'/serviceAccounts/', '/', without_prefix)

Class represents the service account.

Add more fields as needed from the declaration: https://cloud.google.com/iam/docs/reference/rest/v1/projects.serviceAccounts#ServiceAccount

ServiceAccount(project_id, resource_data)
480  def __init__(self, project_id, resource_data):
481    super().__init__(project_id=project_id)
482    self._resource_data = resource_data
name: str
484  @property
485  def name(self) -> str:
486    return self._resource_data['name']
email: str
488  @property
489  def email(self) -> str:
490    return self._resource_data['email']
unique_id: str
492  @property
493  def unique_id(self) -> str:
494    return self._resource_data['uniqueId']
disabled: bool
496  @property
497  def disabled(self) -> bool:
498    return self._resource_data.get('disabled', False)
full_path: str
500  @property
501  def full_path(self) -> str:
502    # example: "name":
503    # "projects/skanzhelev-gke-dev/serviceAccounts/test-service-account-1
504    #                               @skanzhelev-gke-dev.iam.gserviceaccount.com"
505    return self.name

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
507  @property
508  def short_path(self) -> str:
509    path = self.full_path
510    path = re.sub(r'^projects/', '', path)
511    path = re.sub(r'/serviceAccounts/', '/', path)
512    return path

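Returns the short path of this service account: the full path with the leading 'projects/' removed and '/serviceAccounts/' collapsed to '/'.

Note that it isn't clear from this name what kind of resource it is.

Example: 'my-project/my-sa@my-project.iam.gserviceaccount.com'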

# Domains of service account e-mail addresses listed as service agents.
# Each domain appears exactly once.
SERVICE_AGENT_DOMAINS = (
    'cloudservices.gserviceaccount.com',
    'cloudbuild.gserviceaccount.com',
    'cloudcomposer-accounts.iam.gserviceaccount.com',
    'cloud-filer.iam.gserviceaccount.com',
    'cloud-memcache-sa.iam.gserviceaccount.com',
    'cloud-ml.google.com.iam.gserviceaccount.com',
    'cloud-redis.iam.gserviceaccount.com',
    'cloud-tpu.iam.gserviceaccount.com',
    'compute-system.iam.gserviceaccount.com',
    'container-analysis.iam.gserviceaccount.com',
    'container-engine-robot.iam.gserviceaccount.com',
    'containerregistry.iam.gserviceaccount.com',
    'dataflow-service-producer-prod.iam.gserviceaccount.com',
    'dataproc-accounts.iam.gserviceaccount.com',
    'dlp-api.iam.gserviceaccount.com',
    'endpoints-portal.iam.gserviceaccount.com',
    'firebase-rules.iam.gserviceaccount.com',
    'gae-api-prod.google.com.iam.gserviceaccount.com',
    'gcf-admin-robot.iam.gserviceaccount.com',
    'gcp-gae-service.iam.gserviceaccount.com',
    'genomics-api.google.com.iam.gserviceaccount.com',
    'remotebuildexecution.iam.gserviceaccount.com',
    'serverless-robot-prod.iam.gserviceaccount.com',
    'service-consumer-management.iam.gserviceaccount.com',
    'service-networking.iam.gserviceaccount.com',
    'sourcerepo-service-accounts.iam.gserviceaccount.com',
    'appspot.gserviceaccount.com',
    'crashlytics-bigquery-prod.iam.gserviceaccount.com',
    'fcm-bq-export-prod.iam.gserviceaccount.com',
    'firebase-sa-management.iam.gserviceaccount.com',
    'performance-bq-export-prod.iam.gserviceaccount.com',
    'predictions-bq-export-prod.iam.gserviceaccount.com',
    'system.gserviceaccount.com',
)

# Domains of service account e-mail addresses listed as default service
# accounts.
DEFAULT_SERVICE_ACCOUNT_DOMAINS = (
    'appspot.gserviceaccount.com',
    'developer.gserviceaccount.com',
)
def is_service_account_existing(email: str, billing_project_id: str) -> bool:
def is_service_account_existing(email: str, billing_project_id: str) -> bool:
  """Tell whether a service account exists.

  Only accounts recorded as not found count as missing. As the original
  notes state, a non-404 API error while retrieving the account leads to
  the account being assumed to exist, to avoid false positives (a warning
  is printed in that case).
  """
  # Make sure that the service account is fetched (this is also
  # called by get_project_policy).
  _batch_fetch_service_accounts([email], billing_project_id)
  is_missing = email in _service_account_cache_is_not_found
  return not is_missing

Verify that a service account exists.

If we get a non-404 API error when retrieving the service account, we will assume that the service account exists, not to throw false positives (but a warning will be printed out).

def is_service_account_enabled(email: str, billing_project_id: str) -> bool:
def is_service_account_enabled(email: str, billing_project_id: str) -> bool:
  """Tell whether a service account exists and is not disabled.

  As the original notes state, an API error while retrieving the account
  leads to the account being assumed enabled, to avoid false positives (a
  warning is printed in that case).
  """
  _batch_fetch_service_accounts([email], billing_project_id)

  # Accounts recorded as not found are never considered enabled
  if email in _service_account_cache_is_not_found:
    return False
  # An account that was fetched is enabled unless flagged as disabled
  if email in _service_account_cache and _service_account_cache[email].disabled:
    return False
  return True

Verify that a service account exists and is enabled.

If we get an API error when retrieving the service account, we will assume that the service account is enabled, not to throw false positives (but a warning will be printed out).

class ServiceAccountIAMPolicy(BaseIAMPolicy):
class ServiceAccountIAMPolicy(BaseIAMPolicy):
  """IAM policy attached to a service account."""

  def _is_resource_permission(self, permission):
    # Every permission is treated as applicable to a service account
    return True

IAM policy of a service account; every permission is treated as applicable to the resource.

@caching.cached_api_call(in_memory=True)
def get_service_account_iam_policy( project_id: str, service_account: str) -> ServiceAccountIAMPolicy:
@caching.cached_api_call(in_memory=True)
def get_service_account_iam_policy(
    project_id: str, service_account: str) -> ServiceAccountIAMPolicy:
  """Fetch the IAM policy attached to a service account (result is cached)."""

  resource_name = f'projects/{project_id}/serviceAccounts/{service_account}'

  service_accounts_api = apis.get_api('iam', 'v1',
                                      project_id).projects().serviceAccounts()
  policy_request = service_accounts_api.getIamPolicy(resource=resource_name)

  return fetch_iam_policy(policy_request, ServiceAccountIAMPolicy, project_id,
                          resource_name)

Returns an IAM policy for a service account

@caching.cached_api_call(in_memory=True)
def get_service_account_list(project_id: str) -> List[ServiceAccount]:
@caching.cached_api_call(in_memory=True)
def get_service_account_list(project_id: str) -> List[ServiceAccount]:
  """List the service accounts of a project (result is cached).

  Raises:
    utils.GcpApiError: if executing the list request raises an HttpError.
  """

  service_accounts_api = apis.get_api('iam', 'v1',
                                      project_id).projects().serviceAccounts()
  list_request = service_accounts_api.list(name=f'projects/{project_id}')
  try:
    listing = list_request.execute(num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError as http_err:
    raise utils.GcpApiError(http_err) from http_err

  # The 'accounts' key is absent when the project has no service accounts
  accounts = []
  for account_data in listing.get('accounts', []):
    accounts.append(ServiceAccount(project_id, account_data))
  return accounts

Returns list of service accounts