gcpdiag.queries.kms

Queries related to GCP Cloud Key Management.
class CryptoKey(gcpdiag.models.Resource):
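class CryptoKey(models.Resource):
  """Represents a KMS Crypto Key.

  See also the API documentation:
  https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys

  The state helpers only inspect the key's *primary* version. Per the API
  reference above, 'primary' is populated only for keys that have a primary
  version (symmetric ENCRYPT_DECRYPT keys), so other keys report neither
  "enabled" nor "destroyed". Callers must not read "not destroyed" as
  "usable".
  """

  def __init__(self, project_id, resource_data):
    """Wrap a cryptoKeys API response.

    Args:
      project_id: id of the project that owns the key.
      resource_data: the cryptoKeys resource as returned by the API; it is
        stored by reference, not copied.
    """
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Resource name of the key, taken from the 'name' field."""
    return self._resource_data['name']

  @property
  def name(self) -> str:
    """Same value as full_path: the 'name' field of the API response."""
    return self._resource_data['name']

  def _primary_state(self):
    """Return the primary version's state, or None if there is no primary."""
    # 'primary' may be absent (asymmetric/MAC keys, or no primary version
    # set); indexing it directly would raise KeyError.
    primary = self._resource_data.get('primary') or {}
    return primary.get('state')

  def is_destroyed(self) -> bool:
    """True only if the primary version exists and is in state DESTROYED."""
    return self._primary_state() == 'DESTROYED'

  def is_enabled(self) -> bool:
    """True only if the primary version exists and is in state ENABLED."""
    return self._primary_state() == 'ENABLED'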

Represents a KMS Crypto Key.

See also the API documentation: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys

CryptoKey(project_id, resource_data)
def __init__(self, project_id, resource_data):
  """Wrap a cryptoKeys API response for the given project.

  Args:
    project_id: id of the project that owns the key; forwarded to the
      base class constructor.
    resource_data: the cryptoKeys resource as returned by the API.
  """
  super().__init__(project_id=project_id)
  # Stored by reference (no copy): later changes to the dict made by the
  # caller are visible through this object.
  self._resource_data = resource_data
full_path: str
@property
def full_path(self) -> str:
  """Resource name of the key, read from the 'name' field of the response.

  Raises:
    KeyError: if the stored response has no 'name' field.
  """
  data = self._resource_data
  return data['name']

Returns the full path of this resource.

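Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1' (a generic example, apparently inherited from the base resource class; a crypto key's path instead follows the pattern projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY)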

name: str
@property
def name(self) -> str:
  """Return the 'name' field of the response.

  This is the same value that full_path returns, i.e. the complete
  resource name rather than a short key id.
  """
  data = self._resource_data
  return data['name']
def is_destroyed(self) -> bool:
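def is_destroyed(self) -> bool:
  """Return True only if the key's primary version is in state DESTROYED.

  Only the primary version is inspected. A key without a 'primary' field
  (asymmetric/MAC keys, or no primary version set) yields False, so a
  False result does not mean the key is usable.
  """
  # 'primary' is optional in the API response; indexing it directly would
  # raise KeyError for keys that have no primary version.
  primary = self._resource_data.get('primary') or {}
  return primary.get('state') == 'DESTROYED'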
def is_enabled(self) -> bool:
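def is_enabled(self) -> bool:
  """Return True only if the key's primary version is in state ENABLED.

  Only the primary version is inspected. A key without a 'primary' field
  (asymmetric/MAC keys, or no primary version set) yields False.
  """
  # 'primary' is optional in the API response; indexing it directly would
  # raise KeyError for keys that have no primary version.
  primary = self._resource_data.get('primary') or {}
  return primary.get('state') == 'ENABLED'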
class KMSCryptoKeyIAMPolicy(gcpdiag.queries.iam.BaseIAMPolicy):
class KMSCryptoKeyIAMPolicy(iam.BaseIAMPolicy):
  """IAM policy fetched for a single KMS crypto key."""

  def _is_resource_permission(self, permission):
    """Accept every permission as applicable to this resource.

    NOTE(review): the argument is ignored, so no permission is filtered
    out here; presumably the base class uses this hook to decide which
    permissions to evaluate at resource level -- confirm in iam.py.
    """
    return True

Common class for IAM policies

@caching.cached_api_call
def get_crypto_key(key_name: str) -> CryptoKey:
@caching.cached_api_call
def get_crypto_key(key_name: str) -> CryptoKey:
  """Get a Crypto Key object by its resource name, caching the result.

  Args:
    key_name: resource name of the key; the project id is derived from it.

  Returns:
    A CryptoKey wrapping the cryptoKeys.get response.

  Raises:
    GcpApiError: if the API call fails with an HttpError.
  """
  # NOTE(review): because the result is cached, key state read from the
  # returned object presumably reflects the first fetch, not the current
  # state -- confirm the cache lifetime in caching.cached_api_call.
  project_id = utils.get_project_by_res_name(key_name)
  crypto_keys = apis.get_api(
      'cloudkms', 'v1', project_id).projects().locations().keyRings().cryptoKeys()
  request = crypto_keys.get(name=key_name)

  short_name = utils.extract_value_from_res_name(key_name, 'cryptoKeys')
  logging.info('fetching KMS Key %s in project %s', short_name, project_id)

  try:
    response = request.execute(num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError as err:
    # Re-raise as the project's own error type, keeping the original cause.
    raise GcpApiError(err) from err
  return CryptoKey(project_id, response)

Get a Crypto Key object by its resource name, caching the result.

@caching.cached_api_call
def get_crypto_key_iam_policy(key_name: str) -> KMSCryptoKeyIAMPolicy:
@caching.cached_api_call
def get_crypto_key_iam_policy(key_name: str) -> KMSCryptoKeyIAMPolicy:
  """Get the IAM policy of a Crypto Key by its resource name.

  Args:
    key_name: resource name of the key; the project id is derived from it.

  Returns:
    Whatever iam.fetch_iam_policy builds for KMSCryptoKeyIAMPolicy.
  """
  # NOTE(review): the decorator name suggests the policy is cached per
  # key_name, so later binding changes may not be seen -- confirm in
  # caching.cached_api_call.
  project_id = utils.get_project_by_res_name(key_name)
  crypto_keys = apis.get_api(
      'cloudkms', 'v1', project_id).projects().locations().keyRings().cryptoKeys()
  request = crypto_keys.getIamPolicy(resource=key_name)
  return iam.fetch_iam_policy(request, KMSCryptoKeyIAMPolicy, project_id,
                              key_name)