gcpdiag.queries.artifact_registry

Queries related to GCP Artifact Registry
class ArtifactRegistryIAMPolicy(gcpdiag.queries.iam.BaseIAMPolicy):
26class ArtifactRegistryIAMPolicy(iam.BaseIAMPolicy):
27
28  def _is_resource_permission(self, permission):
29    return True

Common class for IAM policies

@dataclasses.dataclass
class ProjectSettings:
32@dataclasses.dataclass
33class ProjectSettings:
34  legacy_redirect: bool
ProjectSettings(legacy_redirect: bool)
legacy_redirect: bool
@caching.cached_api_call(in_memory=True)
def get_registry_iam_policy( context: gcpdiag.models.Context, location: str, registry_name: str) -> ArtifactRegistryIAMPolicy:
37@caching.cached_api_call(in_memory=True)
38def get_registry_iam_policy(context: models.Context, location: str,
39                            registry_name: str) -> ArtifactRegistryIAMPolicy:
40  project_id = context.project_id
41  ar_api = apis.get_api('artifactregistry', 'v1', project_id)
42  registry_id = 'projects/{}/locations/{}/repositories/{}'.format(
43      project_id, location, registry_name)
44  request = ar_api.projects().locations().repositories().getIamPolicy(
45      resource=registry_id)
46  return iam.fetch_iam_policy(request, ArtifactRegistryIAMPolicy, project_id,
47                              registry_id, context)
@caching.cached_api_call(in_memory=True)
def get_project_settings(project_id: str) -> ProjectSettings:
50@caching.cached_api_call(in_memory=True)
51def get_project_settings(project_id: str) -> ProjectSettings:
52  ar_api = apis.get_api('artifactregistry', 'v1', project_id)
53  response = ar_api.projects().getProjectSettings(
54      name=f'projects/{project_id}/projectSettings').execute(
55          num_retries=config.API_RETRIES)
56  return ProjectSettings(legacy_redirect=response.get('legacyRedirectionState')
57                         == 'REDIRECTION_FROM_GCR_IO_ENABLED')