gcpdiag.queries.looker
Queries related to GCP Looker Core.
class
Instance(gcpdiag.models.Resource):
28class Instance(models.Resource): 29 """Represents a Looker Core Instance. 30 31 https://cloud.google.com/looker/docs/reference/rest/v1/projects.locations.instances#Instance 32 """ 33 34 _resource_data: Dict 35 master_version: Version 36 37 def __init__(self, project_id, resource_data): 38 super().__init__(project_id=project_id) 39 self._resource_data = resource_data 40 41 @property 42 def name(self) -> str: 43 return self._resource_data['name'] 44 45 @property 46 def create_time(self) -> str: 47 return self._resource_data['createTime'] 48 49 @property 50 def update_time(self) -> str: 51 return self._resource_data['updateTime'] 52 53 @property 54 def status(self) -> str: 55 return self._resource_data['state'] 56 57 @property 58 def platform_edition(self) -> str: 59 return self._resource_data['platformEdition'] 60 61 @property 62 def looker_version(self) -> str: 63 return self._resource_data['lookerVersion'] 64 65 @property 66 def egress_public_ip(self) -> str: 67 return self._resource_data['egressPublicIp'] 68 69 @property 70 def ingress_public_ip(self) -> str: 71 return self._resource_data['ingressPublicIp'] 72 73 @property 74 def looker_uri(self) -> str: 75 return self._resource_data['lookerUri'] 76 77 @property 78 def public_ip_enabled(self) -> str: 79 return self._resource_data['publicIpEnabled'] 80 81 @property 82 def full_path(self) -> str: 83 return self._resource_data['name']
Represents a Looker Core Instance.
https://cloud.google.com/looker/docs/reference/rest/v1/projects.locations.instances#Instance
@caching.cached_api_call
def
get_instances( context: gcpdiag.models.Context) -> Dict[str, Instance]:
86@caching.cached_api_call 87def get_instances(context: models.Context) -> Dict[str, Instance]: 88 """Get a list of Instances from the given GCP project""" 89 instances: Dict[str, Instance] = {} 90 if not apis.is_enabled(context.project_id, 'looker'): 91 return instances 92 looker_api = apis.get_api('looker', 'v1', context.project_id) 93 l_filter = 'projects/' + context.project_id 94 try: 95 logging.info('fetching list of all locations in given project %s', 96 context.project_id) 97 for l in apis_utils.list_all( 98 request=looker_api.projects().locations().list(name=l_filter), 99 next_function=looker_api.projects().locations().list_next, 100 response_keyword='locations'): 101 logging.info('fetching list of all locations in given location %s', 102 l['locationId']) 103 i_filter = 'projects/' + context.project_id + '/locations/' + l[ 104 'locationId'] 105 for i in apis_utils.list_all( 106 request=looker_api.projects().locations().instances().list( 107 parent=i_filter), 108 next_function=looker_api.projects().locations().instances().list_next, 109 response_keyword='instances'): 110 111 if not context.match_project_resource(resource=i.get('name', '')): 112 continue 113 i = Instance(project_id=context.project_id, resource_data=i) 114 instances[i.name] = i 115 except googleapiclient.errors.HttpError as err: 116 logging.error('failed to list instances: %s', err) 117 raise utils.GcpApiError(err) from err 118 return instances
Get a list of Instances from the given GCP project
@caching.cached_api_call
def
get_only_instances( context: gcpdiag.models.Context) -> Dict[str, Instance]:
121@caching.cached_api_call 122def get_only_instances(context: models.Context) -> Dict[str, Instance]: 123 """Get a list of Instances from the given GCP project""" 124 instances: Dict[str, Instance] = {} 125 if not apis.is_enabled(context.project_id, 'looker'): 126 return instances 127 looker_api = apis.get_api('looker', 'v1', context.project_id) 128 129 logging.info('fetching list of all locations in given project %s', 130 context.project_id) 131 try: 132 i_filter = 'projects/' + context.project_id + '/locations/us-central1' 133 for i in apis_utils.list_all( 134 request=looker_api.projects().locations().instances().list( 135 parent=i_filter), 136 next_function=looker_api.projects().locations().instances().list_next, 137 response_keyword='instances'): 138 139 if not context.match_project_resource(resource=i.get('name', '')): 140 continue 141 i = Instance(project_id=context.project_id, resource_data=i) 142 instances[i.name] = i 143 except googleapiclient.errors.HttpError as err: 144 logging.error('failed to list instances: %s', err) 145 raise utils.GcpApiError(err) from err 146 return instances
Get a list of Instances from the given GCP project