gcpdiag.queries.cloudasset

Queries related to GCP Cloud Asset Inventory.
class AssetResource(gcpdiag.models.Resource):
class AssetResource(models.Resource):
  """A single resource as returned by the Cloud Asset Inventory.

  Wraps the raw result dictionary and exposes selected fields of it as
  read-only properties.
  """
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Stores the raw asset dictionary alongside the owning project id."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """The text following the first '//' in the raw asset name.

    Raises:
      RuntimeError: if the raw name has no '//' followed by any text.
    """
    raw_name = self._resource_data['name']
    match = re.search(r'//(.+)', raw_name)
    if match:
      return match.group(1)
    raise RuntimeError('can\'t determine name of service %s' % (raw_name))

  @property
  def full_path(self) -> str:
    """Same value as `name`."""
    return self.name

  @property
  def asset_type(self) -> str:
    """The 'assetType' value from the raw asset data."""
    data = self._resource_data
    return data['assetType']

Represents a resource retrieved from the Cloud Asset Inventory.

AssetResource(project_id, resource_data)
33  def __init__(self, project_id, resource_data):
34    super().__init__(project_id=project_id)
35    self._resource_data = resource_data
name: str
37  @property
38  def name(self) -> str:
39    m = re.search(r'//(.+)', self._resource_data['name'])
40    if not m:
41      raise RuntimeError('can\'t determine name of service %s' %
42                         (self._resource_data['name']))
43    return m.group(1)
full_path: str
45  @property
46  def full_path(self) -> str:
47    return self.name

Returns the full path of this resource. For an AssetResource this is the same value as `name`.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

asset_type: str
49  @property
50  def asset_type(self) -> str:
51    return self._resource_data['assetType']
@caching.cached_api_call
def search_all_resources( project_id: str, asset_type: Optional[str] = None, query: Optional[str] = None) -> Mapping[str, AssetResource]:
@caching.cached_api_call
def search_all_resources(
    project_id: str,
    asset_type: Optional[str] = None,
    query: Optional[str] = None,
) -> Mapping[str, AssetResource]:
  """Searches all resources in the project.

  Args:
    project_id: project to search; used to build the 'projects/<project_id>'
      search scope.
    asset_type: passed through as the `assetTypes` request parameter.
    query: passed through as the `query` request parameter.

  Returns:
    Mapping from the raw asset name (the result's 'name' field) to its
    AssetResource. Empty when the 'cloudasset' API is not enabled for the
    project or the response carries no 'results'.

  Raises:
    utils.GcpApiError: if executing the request raises an HttpError.
  """
  resources: Dict[str, AssetResource] = {}

  if not apis.is_enabled(project_id, 'cloudasset'):
    return resources
  cloudasset_api = apis.get_api('cloudasset', 'v1', project_id)
  logging.info('fetching list of resources in the project %s', project_id)
  request = cloudasset_api.v1().searchAllResources(
      scope=f'projects/{project_id}', assetTypes=asset_type, query=query)
  try:
    # execute() is the call that performs the HTTP request, so it must sit
    # inside the try for the HttpError -> GcpApiError conversion to apply.
    response = request.execute(num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
  # NOTE(review): only this single response is read; if the API paginates
  # results, later pages are not fetched here -- confirm whether intended.
  if 'results' in response:
    for resource in response['results']:
      resources[resource['name']] = AssetResource(project_id, resource)
  return resources

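Searches all resources in the project and returns them as a mapping from asset name to AssetResource. Returns an empty mapping when the Cloud Asset API is not enabled for the project.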