gcpdiag.queries.gce
class InstanceTemplate(models.Resource):
  """Represents a GCE Instance Template.

  Thin read-only wrapper around the instance template resource dict passed
  to the constructor.
  """

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def self_link(self) -> str:
    """The 'selfLink' field of the resource."""
    return self._resource_data['selfLink']

  @property
  def full_path(self) -> str:
    """selfLink with the Compute v1 URL prefix stripped.

    If the selfLink does not start with the expected prefix, the selfLink
    itself is returned, prefixed with '>> '.
    """
    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                      self.self_link)
    if result:
      return result.group(1)
    return f'>> {self.self_link}'

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>'."""
    return self.project_id + '/' + self.name

  @property
  def name(self) -> str:
    return self._resource_data['name']

  @property
  def tags(self) -> List[str]:
    """Network tags of the template, or an empty list if none are set."""
    return self._resource_data['properties'].get('tags', {}).get('items', [])

  @property
  def service_account(self) -> Optional[str]:
    """Email of the first service account, or None if there is none.

    The special value 'default' is expanded to
    '<project number>-compute@developer.gserviceaccount.com'.
    """
    sa_list = self._resource_data['properties'].get('serviceAccounts', [])
    if not sa_list:
      return None
    email = sa_list[0]['email']
    if email == 'default':
      project_nr = crm.get_project(self._project_id).number
      return f'{project_nr}-compute@developer.gserviceaccount.com'
    return email

  @property
  def network(self) -> network_q.Network:
    """Network of the first network interface."""
    return network_q.get_network_from_url(
        self._resource_data['properties']['networkInterfaces'][0]['network'])

  @property
  def subnetwork(self) -> network_q.Subnetwork:
    """Subnetwork of the first network interface."""
    subnet_url = self._resource_data['properties']['networkInterfaces'][0][
        'subnetwork']
    return self.network.subnetworks[subnet_url]

  def get_metadata(self, key: str) -> str:
    """Return the template metadata value for `key`, or '' if not set.

    Templates without a 'metadata' section, or with metadata but no 'items',
    are treated as having no metadata rather than raising KeyError.
    """
    metadata = self._resource_data['properties'].get('metadata', {})
    for item in metadata.get('items', []):
      if item['key'] == key:
        return item['value']
    return ''
Represents a GCE Instance Template.
@property
def full_path(self) -> str:
  """selfLink with the Compute v1 URL prefix stripped.

  Falls back to '>> <selfLink>' when the prefix does not match.
  """
  link = self.self_link
  match = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
  return match.group(1) if match else f'>> {link}'
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Project id and resource name joined by '/'."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
@property
def service_account(self) -> Optional[str]:
  """Email of the first service account of the template, or None.

  'default' is replaced by the project's
  '<number>-compute@developer.gserviceaccount.com' address.
  """
  accounts = self._resource_data['properties'].get('serviceAccounts', [])
  if not accounts:
    return None
  email = accounts[0]['email']
  if email != 'default':
    return email
  project_nr = crm.get_project(self._project_id).number
  return f'{project_nr}-compute@developer.gserviceaccount.com'
Inherited Members
- gcpdiag.models.Resource
- project_id
class InstanceGroup(models.Resource):
  """Represents a GCE instance group."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """selfLink without the Compute v1 URL prefix ('>> <selfLink>' if unmatched)."""
    link = self._resource_data['selfLink']
    match = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    return match.group(1) if match else '>> ' + link

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>'."""
    return '/'.join((self.project_id, self.name))

  @property
  def self_link(self) -> str:
    return self._resource_data['selfLink']

  @property
  def name(self) -> str:
    return self._resource_data['name']

  @property
  def named_ports(self) -> List[dict]:
    """The 'namedPorts' entries, or an empty list when the field is absent."""
    return self._resource_data.get('namedPorts', [])

  def has_named_ports(self) -> bool:
    """True when the resource carries a 'namedPorts' field."""
    return 'namedPorts' in self._resource_data
Represents a GCE instance group.
@property
def full_path(self) -> str:
  """selfLink without the Compute v1 URL prefix ('>> <selfLink>' if unmatched)."""
  link = self._resource_data['selfLink']
  match = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
  return match.group(1) if match else '>> ' + link
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Project id and group name joined by '/'."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
Inherited Members
- gcpdiag.models.Resource
- project_id
class ManagedInstanceGroup(models.Resource):
  """Represents a GCE managed instance group."""

  _resource_data: dict
  _region: Optional[str]

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    # Lazily computed by the `region` property.
    self._region = None

  @property
  def full_path(self) -> str:
    """selfLink without the Compute v1 URL prefix ('>> <selfLink>' if unmatched)."""
    result = re.match(
        r'https://www.googleapis.com/compute/v1/(.*)',
        self._resource_data['selfLink'],
    )
    if result:
      return result.group(1)
    return '>> ' + self._resource_data['selfLink']

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>'."""
    return self.project_id + '/' + self.name

  def is_gke(self) -> bool:
    """Is this managed instance group part of a GKE cluster?

    Note that the results are based on heuristics (the mig name),
    which is not ideal.
    """

    # gke- is normal GKE, gk3- is GKE autopilot
    return self.name.startswith(('gke-', 'gk3-'))

  @property
  def self_link(self) -> str:
    return self._resource_data['selfLink']

  @property
  def name(self) -> str:
    return self._resource_data['name']

  @property
  def region(self) -> str:
    """Region of the MIG, derived from its 'region' or 'zone' field.

    The result is cached on the instance after the first successful lookup.

    Raises:
      RuntimeError: neither field is set, or the field that is set cannot be
        parsed.
    """
    if self._region is None:
      if 'region' in self._resource_data:
        m = re.search(r'/regions/([^/]+)$', self._resource_data['region'])
        if not m:
          raise RuntimeError("can't determine region of mig %s (%s)" %
                             (self.name, self._resource_data['region']))
        self._region = m.group(1)
      elif 'zone' in self._resource_data:
        m = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
        if not m:
          # Report the zone value: 'region' is absent in this branch, so
          # reading it here would raise KeyError instead of RuntimeError.
          raise RuntimeError("can't determine region of mig %s (%s)" %
                             (self.name, self._resource_data['zone']))
        zone = m.group(1)
        self._region = utils.zone_region(zone)
      else:
        raise RuntimeError(
            f"can't determine region of mig {self.name}, both region and zone"
            " aren't set!")
    return self._region

  def count_no_action_instances(self) -> int:
    """number of instances in the mig that are running and have no scheduled actions."""
    return self._resource_data['currentActions']['none']

  def is_instance_member(self, project_id: str, region: str,
                         instance_name: str):
    """Given the project_id, region and instance name, is it a member of this MIG?"""
    return (self.project_id == project_id and self.region == region and
            instance_name.startswith(self._resource_data['baseInstanceName']))

  @property
  def template(self) -> InstanceTemplate:
    """InstanceTemplate referenced by the MIG's 'instanceTemplate' field.

    Raises:
      RuntimeError: the field is missing, cannot be parsed, or names a
        template that get_instance_templates() does not return.
    """
    if 'instanceTemplate' not in self._resource_data:
      raise RuntimeError(f'instanceTemplate not set for MIG {self.name}')
    m = re.match(
        r'https://www.googleapis.com/compute/v1/projects/([^/]+)/global/instanceTemplates/([^/]+)',
        self._resource_data['instanceTemplate'],
    )
    if not m:
      raise RuntimeError("can't parse instanceTemplate: %s" %
                         self._resource_data['instanceTemplate'])
    (project_id, template_name) = (m.group(1), m.group(2))
    templates = get_instance_templates(project_id)
    if template_name not in templates:
      raise RuntimeError(
          f'instanceTemplate {template_name} for MIG {self.name} not found')
    return templates[template_name]
Represents a GCE managed instance group.
@property
def full_path(self) -> str:
  """selfLink without the Compute v1 URL prefix ('>> <selfLink>' if unmatched)."""
  link = self._resource_data['selfLink']
  match = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
  return match.group(1) if match else '>> ' + link
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Project id and MIG name joined by '/'."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
def is_gke(self) -> bool:
  """Is this managed instance group part of a GKE cluster?

  The answer is a heuristic based only on the mig name prefix, which is not
  ideal.
  """
  # gke- is normal GKE, gk3- is GKE autopilot
  gke_prefixes = ('gke-', 'gk3-')
  return self.name.startswith(gke_prefixes)
Is this managed instance group part of a GKE cluster?
Note that the results are based on heuristics (the mig name), which is not ideal.
@property
def region(self) -> str:
  """Region of the MIG, derived from its 'region' or 'zone' field.

  The result is cached in self._region after the first successful lookup.

  Raises:
    RuntimeError: neither field is set, or the field that is set cannot be
      parsed.
  """
  if self._region is None:
    if 'region' in self._resource_data:
      m = re.search(r'/regions/([^/]+)$', self._resource_data['region'])
      if not m:
        raise RuntimeError("can't determine region of mig %s (%s)" %
                           (self.name, self._resource_data['region']))
      self._region = m.group(1)
    elif 'zone' in self._resource_data:
      m = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
      if not m:
        # Report the zone value: 'region' is absent in this branch, so
        # reading it here would raise KeyError instead of RuntimeError.
        raise RuntimeError("can't determine region of mig %s (%s)" %
                           (self.name, self._resource_data['zone']))
      zone = m.group(1)
      self._region = utils.zone_region(zone)
    else:
      raise RuntimeError(
          f"can't determine region of mig {self.name}, both region and zone"
          " aren't set!")
  return self._region
def count_no_action_instances(self) -> int:
  """Number of instances in the mig that are running and have no scheduled actions.

  Read from the 'none' counter of the resource's 'currentActions' field.
  """
  current_actions = self._resource_data['currentActions']
  return current_actions['none']
number of instances in the mig that are running and have no scheduled actions.
def is_instance_member(self, project_id: str, region: str,
                       instance_name: str):
  """Given the project_id, region and instance name, is it a member of this MIG?

  Membership means: same project, same region, and the instance name starts
  with the MIG's 'baseInstanceName'.
  """
  if self.project_id != project_id or self.region != region:
    return False
  base_name = self._resource_data['baseInstanceName']
  return instance_name.startswith(base_name)
Given the project_id, region and instance name, is it a member of this MIG?
@property
def template(self) -> 'InstanceTemplate':
  """InstanceTemplate referenced by the MIG's 'instanceTemplate' field.

  Raises:
    RuntimeError: the field is missing, cannot be parsed, or names a template
      that get_instance_templates() does not return.
  """
  if 'instanceTemplate' not in self._resource_data:
    # The original message lacked the f-prefix and printed '{self.name}'
    # literally.
    raise RuntimeError(f'instanceTemplate not set for MIG {self.name}')
  m = re.match(
      r'https://www.googleapis.com/compute/v1/projects/([^/]+)/global/instanceTemplates/([^/]+)',
      self._resource_data['instanceTemplate'],
  )
  if not m:
    raise RuntimeError("can't parse instanceTemplate: %s" %
                       self._resource_data['instanceTemplate'])
  (project_id, template_name) = (m.group(1), m.group(2))
  templates = get_instance_templates(project_id)
  if template_name not in templates:
    raise RuntimeError(
        f'instanceTemplate {template_name} for MIG {self.name} not found')
  return templates[template_name]
Inherited Members
- gcpdiag.models.Resource
- project_id
class SerialPortOutput:
  """Represents the full Serial Port Output (/dev/ttyS0 or COM1) of an instance.

  contents is the full 1MB of the instance.
  """

  _project_id: str
  _instance_id: str
  _contents: List[str]

  def __init__(self, project_id, instance_id, contents):
    self._project_id, self._instance_id, self._contents = (project_id,
                                                           instance_id,
                                                           contents)

  @property
  def instance_id(self) -> str:
    """Id of the instance, as passed to the constructor."""
    return self._instance_id

  @property
  def contents(self) -> List[str]:
    """The stored serial output, as passed to the constructor."""
    return self._contents
Represents the full Serial Port Output (/dev/ttyS0 or COM1) of an instance.
contents is the full 1MB of the instance.
class Instance(models.Resource):
  """Represents a GCE instance."""

  _resource_data: dict
  _region: Optional[str]

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    # Both are computed lazily, by get_metadata() and `region` respectively.
    self._metadata_dict = None
    self._region = None

  @property
  def id(self) -> str:
    return self._resource_data['id']

  @property
  def name(self) -> str:
    return self._resource_data['name']

  @property
  def full_path(self) -> str:
    """selfLink without the Compute v1 URL prefix ('>> <selfLink>' if unmatched)."""
    result = re.match(
        r'https://www.googleapis.com/compute/v1/(.*)',
        self._resource_data['selfLink'],
    )
    if result:
      return result.group(1)
    return '>> ' + self._resource_data['selfLink']

  @property
  def short_path(self) -> str:
    # Note: instance names must be unique per project, so no need to add the zone.
    return self.project_id + '/' + self.name

  @property
  def creation_timestamp(self) -> datetime:
    """VM creation time, as a *naive* `datetime` object."""
    return (datetime.fromisoformat(
        self._resource_data['creationTimestamp']).astimezone(
            timezone.utc).replace(tzinfo=None))

  @property
  def region(self) -> str:
    """Region of the instance, derived from its 'zone' field (cached).

    Raises:
      RuntimeError: the zone is missing or cannot be parsed.
    """
    if self._region is None:
      if 'zone' in self._resource_data:
        m = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
        if not m:
          # Report the zone value; this branch never checks for a 'region'
          # key, so reading it here could raise KeyError instead.
          raise RuntimeError("can't determine region of instance %s (%s)" %
                             (self.name, self._resource_data['zone']))
        zone = m.group(1)
        self._region = utils.zone_region(zone)
      else:
        raise RuntimeError(
            f"can't determine region of instance {self.name}, zone isn't set!")
    return self._region

  @property
  def zone(self) -> str:
    """Zone name parsed from the 'zone' URL; RuntimeError if unparseable."""
    zone_uri = self._resource_data['zone']
    m = re.search(r'/zones/([^/]+)$', zone_uri)
    if m:
      return m.group(1)
    raise RuntimeError(f"can't determine zone of instance {self.name}")

  @property
  def disks(self) -> List[str]:
    """The 'disks' field of the resource, or an empty list."""
    if 'disks' in self._resource_data:
      return self._resource_data['disks']
    return []

  @property
  def startrestricted(self) -> bool:
    return self._resource_data['startRestricted']

  def is_serial_port_logging_enabled(self) -> bool:
    return self.is_metadata_enabled('serial-port-logging-enable')

  def is_oslogin_enabled(self) -> bool:
    return self.is_metadata_enabled('enable-oslogin')

  def is_metadata_enabled(self, metadata_name) -> bool:
    """Use to check for common boolean metadata value"""
    value = self.get_metadata(metadata_name)
    return bool(value and value.upper() in POSITIVE_BOOL_VALUES)

  def has_label(self, label) -> bool:
    return label in self.labels

  def is_dataproc_instance(self) -> bool:
    return self.has_label(DATAPROC_LABEL)

  def is_gke_node(self) -> bool:
    return self.has_label(GKE_LABEL)

  def is_preemptible_vm(self) -> bool:
    return ('scheduling' in self._resource_data and
            'preemptible' in self._resource_data['scheduling'] and
            self._resource_data['scheduling']['preemptible'])

  def is_windows_machine(self) -> bool:
    """True if the first disk lists a 'WINDOWS' guest OS feature.

    Only the first entry of 'disks' is inspected. A missing or empty 'disks'
    list yields False (the empty case previously raised StopIteration).
    """
    first_disk = next(iter(self._resource_data.get('disks', [])), None)
    if first_disk is None:
      return False
    return any(feature['type'] == 'WINDOWS'
               for feature in first_disk.get('guestOsFeatures', []))

  def is_public_machine(self) -> bool:
    """True if the string 'natIP' occurs in the network interfaces data."""
    if 'networkInterfaces' in self._resource_data:
      return 'natIP' in str(self._resource_data['networkInterfaces'])
    return False

  def machine_type(self):
    if 'machineType' in self._resource_data:
      return self._resource_data['machineType']
    return None

  def check_license(self, licenses: List[str]) -> bool:
    """Checks that a license is contained in a given license list.

    Returns True if any license attached to any disk, reduced to the part
    after '/global/licenses/', equals one of `licenses`. Disks without a
    'licenses' field are skipped.
    """
    for disk in self._resource_data.get('disks', []):
      for attached_license in disk.get('licenses', []):
        if attached_license.partition('/global/licenses/')[2] in licenses:
          return True
    return False

  @property
  def network(self) -> network_q.Network:
    # 'https://www.googleapis.com/compute/v1/projects/gcpdiag-gce1-aaaa/global/networks/default'
    network_string = self._resource_data['networkInterfaces'][0]['network']
    m = re.match(r'^.+/projects/([^/]+)/global/networks/([^/]+)$',
                 network_string)
    if not m:
      raise RuntimeError("can't parse network string: %s" % network_string)
    return network_q.get_network(m.group(1), m.group(2))

  @property
  def network_ips(self) -> List[network_q.IPv4AddrOrIPv6Addr]:
    return [
        ipaddress.ip_address(nic['networkIP'])
        for nic in self._resource_data['networkInterfaces']
    ]

  @property
  def get_network_interfaces(self):
    return self._resource_data['networkInterfaces']

  @property
  def subnetworks(self) -> List[network_q.Subnetwork]:
    return [
        network_q.get_subnetwork_from_url(nic['subnetwork'])
        for nic in self._resource_data['networkInterfaces']
    ]

  @property
  def routes(self) -> List[network_q.Route]:
    """Routes on the instance's networks that are untagged or share a tag with it."""
    routes = []
    for nic in self._resource_data['networkInterfaces']:
      for route in network_q.get_routes(self.project_id):
        if nic['network'] == route.network:
          if route.tags == []:
            routes.append(route)
            continue
          else:
            temp = [x for x in self.tags if x in route.tags]
            if len(temp) > 0:
              routes.append(route)
    return routes

  def get_network_ip_for_instance_interface(
      self, network: str) -> Optional[network_q.IPv4NetOrIPv6Net]:
    """Get the network ip for a nic given a network name"""
    for nic in self._resource_data['networkInterfaces']:
      if nic.get('network') == network:
        return ipaddress.ip_network(nic.get('networkIP'))
    return None

  def secure_boot_enabled(self) -> bool:
    if 'shieldedInstanceConfig' in self._resource_data:
      return self._resource_data['shieldedInstanceConfig']['enableSecureBoot']
    return False

  @property
  def access_scopes(self) -> List[str]:
    """Scopes of the first service account, or an empty list."""
    if 'serviceAccounts' in self._resource_data:
      saccts = self._resource_data['serviceAccounts']
      if isinstance(saccts, list) and len(saccts) >= 1:
        return saccts[0].get('scopes', [])
    return []

  @property
  def service_account(self) -> Optional[str]:
    """Email of the first service account, or None."""
    if 'serviceAccounts' in self._resource_data:
      saccts = self._resource_data['serviceAccounts']
      if isinstance(saccts, list) and len(saccts) >= 1:
        return saccts[0]['email']
    return None

  @property
  def tags(self) -> List[str]:
    if 'tags' in self._resource_data:
      if 'items' in self._resource_data['tags']:
        return self._resource_data['tags']['items']
    return []

  def get_metadata(self, key: str) -> str:
    """Return the metadata value for `key`, preferring instance metadata.

    Falls back to the project metadata; returns None when the key is set in
    neither (callers such as `mig` rely on that).
    """
    # `is None` rather than falsiness: an instance without metadata must not
    # rebuild its (empty) cache on every call.
    if self._metadata_dict is None:
      self._metadata_dict = {}
      if ('metadata' in self._resource_data and
          'items' in self._resource_data['metadata']):
        for item in self._resource_data['metadata']['items']:
          if 'key' in item and 'value' in item:
            self._metadata_dict[item['key']] = item['value']
    if key in self._metadata_dict:
      return self._metadata_dict[key]
    # Project metadata is only fetched when the instance does not set the key.
    return get_project_metadata(self.project_id).get(key)

  @property
  def status(self) -> str:
    """VM Status"""
    return self._resource_data.get('status', None)

  @property
  def is_running(self) -> bool:
    """VM Status is indicated as running"""
    return self._resource_data.get('status', False) == 'RUNNING'

  @property  # type: ignore
  @caching.cached_api_call(in_memory=True)
  def mig(self) -> ManagedInstanceGroup:
    """Return ManagedInstanceGroup that owns this instance.

    Raises AttributeError in case it isn't MIG-managed.
    """

    created_by = self.get_metadata('created-by')
    if created_by is None:
      raise AttributeError(f'instance {self.id} is not managed by a mig')

    # Example created-by:
    # pylint: disable=line-too-long
    # "projects/12340002/zones/europe-west4-a/instanceGroupManagers/gke-gke1-default-pool-e5e20a34-grp"
    # (note how it uses a project number and not a project id...)
    created_by_match = re.match(
        r'projects/([^/]+)/((?:regions|zones)/[^/]+/instanceGroupManagers/[^/]+)$',
        created_by,
    )
    if not created_by_match:
      raise AttributeError(f'instance {self.id} is not managed by a mig'
                           f' (created-by={created_by})')
    project = crm.get_project(created_by_match.group(1))

    mig_self_link = ('https://www.googleapis.com/compute/v1/'
                     f'projects/{project.id}/{created_by_match.group(2)}')

    # Try to find a matching mig.
    for mig in get_managed_instance_groups(
        models.Context(project_id=self.project_id)).values():
      if mig.self_link == mig_self_link:
        return mig

    raise AttributeError(f'instance {self.id} is not managed by a mig')

  @property
  def labels(self) -> dict:
    return self._resource_data.get('labels', {})
Represents a GCE instance.
@property
def full_path(self) -> str:
  """selfLink without the Compute v1 URL prefix ('>> <selfLink>' if unmatched)."""
  link = self._resource_data['selfLink']
  match = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
  return match.group(1) if match else '>> ' + link
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Project id and instance name joined by '/'."""
  # Note: instance names must be unique per project, so no need to add the zone.
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
@property
def creation_timestamp(self) -> datetime:
  """VM creation time, as a *naive* `datetime` object.

  The 'creationTimestamp' field is parsed, converted to UTC, and then has
  its tzinfo removed.
  """
  parsed = datetime.fromisoformat(self._resource_data['creationTimestamp'])
  in_utc = parsed.astimezone(timezone.utc)
  return in_utc.replace(tzinfo=None)
VM creation time, as a naive datetime object.
@property
def region(self) -> str:
  """Region of the instance, derived from its 'zone' field.

  The result is cached in self._region after the first successful lookup.

  Raises:
    RuntimeError: the zone is missing or cannot be parsed.
  """
  if self._region is None:
    if 'zone' in self._resource_data:
      m = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
      if not m:
        # Report the zone value; this branch never checks for a 'region'
        # key, so reading it here could raise KeyError instead.
        raise RuntimeError("can't determine region of instance %s (%s)" %
                           (self.name, self._resource_data['zone']))
      zone = m.group(1)
      self._region = utils.zone_region(zone)
    else:
      raise RuntimeError(
          f"can't determine region of instance {self.name}, zone isn't set!")
  return self._region
def is_metadata_enabled(self, metadata_name) -> bool:
  """Use to check for common boolean metadata value.

  True when the metadata value is set and its upper-cased form is one of
  POSITIVE_BOOL_VALUES.
  """
  value = self.get_metadata(metadata_name)
  if not value:
    return False
  return bool(value.upper() in POSITIVE_BOOL_VALUES)
Use to check for common boolean metadata value
def check_license(self, licenses: List[str]) -> bool:
  """Checks that a license is contained in a given license list.

  Args:
    licenses: license names to look for (the part of a license URL after
      '/global/licenses/').

  Returns:
    True if any license attached to any disk of the instance matches one of
    `licenses`. Disks without a 'licenses' field are skipped; previously a
    disk that merely contained the text 'license' (for example in another
    field name) raised KeyError.
  """
  for disk in self._resource_data.get('disks', []):
    for attached_license in disk.get('licenses', []):
      if attached_license.partition('/global/licenses/')[2] in licenses:
        return True
  return False
Checks that a license is contained in a given license list
@property
def network(self) -> network_q.Network:
  """Network of the instance's first network interface.

  Raises RuntimeError when the interface's network URL cannot be parsed.
  """
  # 'https://www.googleapis.com/compute/v1/projects/gcpdiag-gce1-aaaa/global/networks/default'
  first_nic = self._resource_data['networkInterfaces'][0]
  network_string = first_nic['network']
  parsed = re.match(r'^.+/projects/([^/]+)/global/networks/([^/]+)$',
                    network_string)
  if parsed is None:
    raise RuntimeError("can't parse network string: %s" % network_string)
  project_id, network_name = parsed.group(1), parsed.group(2)
  return network_q.get_network(project_id, network_name)
@property
def routes(self) -> List[network_q.Route]:
  """Routes that apply to this instance.

  For each network interface, a route on the same network is included when
  it has no tags, or when it shares at least one tag with the instance.
  """
  applicable = []
  for nic in self._resource_data['networkInterfaces']:
    for route in network_q.get_routes(self.project_id):
      if nic['network'] != route.network:
        continue
      if route.tags == [] or any(tag in route.tags for tag in self.tags):
        applicable.append(route)
  return applicable
def get_network_ip_for_instance_interface(
    self, network: str) -> Optional[network_q.IPv4NetOrIPv6Net]:
  """Get the network ip for a nic given a network name.

  Returns the 'networkIP' of the first interface whose 'network' equals
  `network`, wrapped by ipaddress.ip_network(), or None if none matches.
  """
  matching_ips = (nic.get('networkIP')
                  for nic in self._resource_data['networkInterfaces']
                  if nic.get('network') == network)
  for network_ip in matching_ips:
    return ipaddress.ip_network(network_ip)
  return None
Get the network ip for a nic given a network name
def get_metadata(self, key: str) -> str:
  """Return the metadata value for `key`, preferring instance metadata.

  Instance metadata items are indexed once and cached in
  self._metadata_dict; items lacking 'key' or 'value' are ignored. If the
  instance does not set `key`, the project metadata is consulted. Returns
  None when the key is set in neither.
  """
  # `is None` rather than falsiness: an instance without metadata must not
  # rebuild its (empty) cache on every call.
  if self._metadata_dict is None:
    self._metadata_dict = {}
    if ('metadata' in self._resource_data and
        'items' in self._resource_data['metadata']):
      for item in self._resource_data['metadata']['items']:
        if 'key' in item and 'value' in item:
          self._metadata_dict[item['key']] = item['value']
  if key in self._metadata_dict:
    return self._metadata_dict[key]
  # Project metadata is only fetched when the instance does not set the key.
  return get_project_metadata(self.project_id).get(key)
@property
def status(self) -> str:
  """VM Status (the 'status' field of the resource, None if absent)."""
  return self._resource_data.get('status')
VM Status
@property
def is_running(self) -> bool:
  """VM Status is indicated as running"""
  current_status = self._resource_data.get('status', False)
  return current_status == 'RUNNING'
VM Status is indicated as running
@property  # type: ignore
@caching.cached_api_call(in_memory=True)
def mig(self) -> ManagedInstanceGroup:
  """Return ManagedInstanceGroup that owns this instance.

  The owner is looked up through the 'created-by' metadata value, which is
  turned into a selfLink and compared against the MIGs of this instance's
  project.

  Raises AttributeError in case it isn't MIG-managed.
  """
  created_by = self.get_metadata('created-by')
  if created_by is None:
    raise AttributeError(f'instance {self.id} is not managed by a mig')

  # Example created-by:
  # pylint: disable=line-too-long
  # "projects/12340002/zones/europe-west4-a/instanceGroupManagers/gke-gke1-default-pool-e5e20a34-grp"
  # (note how it uses a project number and not a project id...)
  parsed = re.match(
      r'projects/([^/]+)/((?:regions|zones)/[^/]+/instanceGroupManagers/[^/]+)$',
      created_by,
  )
  if not parsed:
    raise AttributeError(f'instance {self.id} is not managed by a mig'
                         f' (created-by={created_by})')
  project_ref, location_path = parsed.group(1), parsed.group(2)
  project = crm.get_project(project_ref)

  expected_self_link = ('https://www.googleapis.com/compute/v1/'
                        f'projects/{project.id}/{location_path}')

  # Try to find a matching mig.
  context = models.Context(project_id=self.project_id)
  for candidate in get_managed_instance_groups(context).values():
    if candidate.self_link == expected_self_link:
      return candidate

  raise AttributeError(f'instance {self.id} is not managed by a mig')
Return ManagedInstanceGroup that owns this instance.
Raises AttributeError in case it isn't MIG-managed.
Inherited Members
- gcpdiag.models.Resource
- project_id
class Disk(models.Resource):
  """Represents a GCE disk."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def id(self) -> str:
    return self._resource_data['id']

  @property
  def name(self) -> str:
    return self._resource_data['name']

  @property
  def type(self) -> str:
    """Disk type name: the last path segment of the 'type' URL."""
    type_url = self._resource_data['type']
    match = re.search(r'/diskTypes/([^/]+)$', type_url)
    if match:
      return match.group(1)
    raise RuntimeError("can't determine type of the disk %s (%s)" %
                       (self.name, type_url))

  @property
  def users(self) -> list:
    """Instance names parsed from the 'users' URLs; unparseable URLs are skipped."""
    pattern = r'/instances/(.+)$'
    # Extracting the instances
    matches = (
        re.search(pattern, user) for user in self._resource_data.get('users', []))
    return [m.group(1) for m in matches if m]

  @property
  def zone(self) -> str:
    """Zone name: the last path segment of the 'zone' URL."""
    zone_url = self._resource_data['zone']
    match = re.search(r'/zones/([^/]+)$', zone_url)
    if match:
      return match.group(1)
    raise RuntimeError("can't determine zone of disk %s (%s)" %
                       (self.name, zone_url))

  @property
  def full_path(self) -> str:
    """selfLink without the Compute v1 URL prefix ('>> <selfLink>' if unmatched)."""
    link = self._resource_data['selfLink']
    match = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    return match.group(1) if match else '>> ' + link

  @property
  def short_path(self) -> str:
    return f'{self.project_id}/{self.name}'

  @property
  def bootable(self) -> bool:
    """True when the resource has a 'guestOsFeatures' field."""
    return 'guestOsFeatures' in self._resource_data

  @property
  def in_use(self) -> bool:
    """True when the resource has a 'users' field."""
    return 'users' in self._resource_data

  @property
  def has_snapshot_schedule(self) -> bool:
    """True when the resource has a 'resourcePolicies' field."""
    return 'resourcePolicies' in self._resource_data
Represents a GCE disk.
@property
def full_path(self) -> str:
  """selfLink without the Compute v1 URL prefix ('>> <selfLink>' if unmatched)."""
  link = self._resource_data['selfLink']
  match = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
  return match.group(1) if match else '>> ' + link
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
Inherited Members
- gcpdiag.models.Resource
- project_id
@caching.cached_api_call(in_memory=True)
def get_gce_zones(project_id: str) -> Set[str]:
  """Return the names of the GCE zones listed for the given project.

  An empty set is returned when the response is empty or has no 'items'.
  HttpError from the API client is re-raised as utils.GcpApiError.
  """
  try:
    gce_api = apis.get_api('compute', 'v1', project_id)
    logging.info('listing gce zones of project %s', project_id)
    response = gce_api.zones().list(project=project_id).execute(
        num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
  if not response or 'items' not in response:
    return set()
  zone_names = set()
  for zone_item in response['items']:
    if 'name' in zone_item:
      zone_names.add(zone_item['name'])
  return zone_names
def get_gce_public_licences(project_id: str) -> List[str]:
  """Returns a list of licenses based on publicly available image project.

  Pages through the licenses list of `project_id` and returns, for each
  license, the part of its selfLink after '/global/licenses/'.

  A response page without 'items' contributes nothing instead of raising
  KeyError. Requests are retried config.API_RETRIES times, like the other
  API calls in this module.
  """
  licenses = []
  gce_api = apis.get_api('compute', 'v1', project_id)
  logging.info('listing licenses of project %s', project_id)
  request = gce_api.licenses().list(project=project_id)
  while request is not None:
    response = request.execute(num_retries=config.API_RETRIES)
    for license_ in response.get('items', []):
      formatted_license = license_['selfLink'].partition('/global/licenses/')[2]
      licenses.append(formatted_license)
    # list_next() returns None once there are no further pages.
    request = gce_api.licenses().list_next(previous_request=request,
                                           previous_response=response)
  return licenses
Returns a list of licenses based on publicly available image project
def get_instance(project_id: str, zone: str, instance_name: str) -> Instance:
  """Returns instance object matching instance name and zone.

  Performs a single instances().get() call (retried config.API_RETRIES
  times) and wraps the response in an Instance.
  """
  compute = apis.get_api('compute', 'v1', project_id)
  lookup = {'project': project_id, 'zone': zone, 'instance': instance_name}
  request = compute.instances().get(**lookup)
  instance_data = request.execute(num_retries=config.API_RETRIES)
  return Instance(project_id, resource_data=instance_data)
Returns instance object matching instance name and zone
@caching.cached_api_call(in_memory=True)
def get_instances(context: models.Context) -> Mapping[str, Instance]:
  """Get a list of Instance matching the given context, indexed by instance id.

  Returns an empty mapping when the compute API is not enabled. Instances
  whose selfLink does not have the expected shape are logged and skipped.
  """

  instances: Dict[str, Instance] = {}
  if not apis.is_enabled(context.project_id, 'compute'):
    return instances
  gce_api = apis.get_api('compute', 'v1', context.project_id)
  # One list request per zone of the project.
  requests = []
  for zone_name in get_gce_zones(context.project_id):
    requests.append(gce_api.instances().list(project=context.project_id,
                                             zone=zone_name))
  items = apis_utils.batch_list_all(
      api=gce_api,
      requests=requests,
      next_function=gce_api.instances().list_next,
      log_text=f'listing gce instances of project {context.project_id}',
  )
  for item in items:
    link_match = re.match(
        r'https://www.googleapis.com/compute/v1/projects/[^/]+/zones/([^/]+)/',
        item['selfLink'],
    )
    if link_match is None:
      logging.error("instance %s selfLink didn't match regexp: %s", item['id'],
                    item['selfLink'])
      continue
    if context.match_project_resource(location=link_match.group(1),
                                      labels=item.get('labels', {}),
                                      resource=item.get('name', '')):
      instances[item['id']] = Instance(project_id=context.project_id,
                                       resource_data=item)
  return instances
Get a list of Instance matching the given context, indexed by instance id.
@caching.cached_api_call(in_memory=True)
def get_instance_groups(context: models.Context) -> Mapping[str, InstanceGroup]:
  """Get a list of InstanceGroups matching the given context, indexed by name.

  An empty mapping is returned when the compute API is not enabled for the
  project. Groups are listed in every zone returned by get_gce_zones() and
  kept only when context.match_project_resource() accepts their zone, labels
  and name.
  """
  groups: Dict[str, InstanceGroup] = {}
  if not apis.is_enabled(context.project_id, 'compute'):
    return groups
  gce_api = apis.get_api('compute', 'v1', context.project_id)
  requests = [
      gce_api.instanceGroups().list(project=context.project_id, zone=zone)
      for zone in get_gce_zones(context.project_id)
  ]
  items = apis_utils.batch_list_all(
      api=gce_api,
      requests=requests,
      next_function=gce_api.instanceGroups().list_next,
      # Name the resource actually being listed (was "gce instances").
      log_text=f'listing gce instance groups of project {context.project_id}',
  )
  for i in items:
    # The zone is only available as part of the selfLink URL.
    result = re.match(
        r'https://www.googleapis.com/compute/v1/projects/[^/]+/zones/([^/]+)',
        i['selfLink'],
    )
    if not result:
      logging.error("instance group %s selfLink didn't match regexp: %s",
                    i['id'], i['selfLink'])
      continue
    zone = result.group(1)
    labels = i.get('labels', {})
    resource = i.get('name', '')
    if not context.match_project_resource(
        location=zone, labels=labels, resource=resource):
      continue
    groups[i['name']] = InstanceGroup(context.project_id, i)
  return groups
Get a list of InstanceGroups matching the given context, indexed by name.
@caching.cached_api_call(in_memory=True)
def get_managed_instance_groups(
    context: models.Context,) -> Mapping[int, ManagedInstanceGroup]:
  """Return the zonal ManagedInstanceGroups selected by the context, by mig id.

  An empty mapping is returned when the compute API is not enabled for the
  project. MIGs are listed in every zone returned by get_gce_zones() and kept
  only when context.match_project_resource() accepts their location, labels
  and name.
  """
  project_id = context.project_id
  selected: Dict[int, ManagedInstanceGroup] = {}
  if not apis.is_enabled(project_id, 'compute'):
    return selected

  gce_api = apis.get_api('compute', 'v1', project_id)
  # One list request per zone; pagination is handled by batch_list_all.
  zone_requests = []
  for zone_name in get_gce_zones(project_id):
    zone_requests.append(gce_api.instanceGroupManagers().list(
        project=project_id, zone=zone_name))

  for mig in apis_utils.batch_list_all(
      api=gce_api,
      requests=zone_requests,
      next_function=gce_api.instanceGroupManagers().list_next,
      log_text=('listing zonal managed instance groups of project'
                f' {project_id}'),
  ):
    # The location is only available as part of the selfLink URL.
    link_match = re.match(
        r'https://www.googleapis.com/compute/v1/projects/[^/]+/(?:regions|zones)/([^/]+)/',
        mig['selfLink'],
    )
    if link_match is None:
      logging.error("mig %s selfLink didn't match regexp: %s", mig['name'],
                    mig['selfLink'])
      continue
    if context.match_project_resource(
        location=link_match.group(1),
        labels=mig.get('labels', {}),
        resource=mig.get('name', ''),
    ):
      selected[mig['id']] = ManagedInstanceGroup(project_id=project_id,
                                                 resource_data=mig)
  return selected
Get a list of zonal ManagedInstanceGroups matching the given context, indexed by mig id.
@caching.cached_api_call(in_memory=True)
def get_region_managed_instance_groups(
    context: models.Context,) -> Mapping[int, ManagedInstanceGroup]:
  """Return the regional ManagedInstanceGroups selected by the context, by mig id.

  An empty mapping is returned when the compute API is not enabled for the
  project. MIGs are listed in every region returned by get_all_regions() and
  kept only when context.match_project_resource() accepts their region,
  labels and name.
  """
  project_id = context.project_id
  selected: Dict[int, ManagedInstanceGroup] = {}
  if not apis.is_enabled(project_id, 'compute'):
    return selected

  gce_api = apis.get_api('compute', 'v1', project_id)
  # One list request per region; pagination is handled by batch_list_all.
  region_requests = []
  for region in get_all_regions(project_id):
    region_requests.append(gce_api.regionInstanceGroupManagers().list(
        project=project_id, region=region.name))

  for mig in apis_utils.batch_list_all(
      api=gce_api,
      requests=region_requests,
      next_function=gce_api.regionInstanceGroupManagers().list_next,
      log_text=('listing regional managed instance groups of project'
                f' {project_id}'),
  ):
    # The region is only available as part of the selfLink URL.
    link_match = re.match(
        r'https://www.googleapis.com/compute/v1/projects/[^/]+/(?:regions)/([^/]+)/',
        mig['selfLink'],
    )
    if link_match is None:
      logging.error("mig %s selfLink didn't match regexp: %s", mig['name'],
                    mig['selfLink'])
      continue
    if context.match_project_resource(
        location=link_match.group(1),
        labels=mig.get('labels', {}),
        resource=mig.get('name', ''),
    ):
      selected[mig['id']] = ManagedInstanceGroup(project_id=project_id,
                                                 resource_data=mig)
  return selected
Get a list of regional ManagedInstanceGroups matching the given context, indexed by mig id.
@caching.cached_api_call
def get_instance_templates(project_id: str) -> Mapping[str, InstanceTemplate]:
  """Return the instance templates of the project, indexed by template name.

  Only the name, tags, network interfaces, service accounts and metadata of
  each template are requested from the API.
  """
  logging.info('fetching instance templates')
  gce_api = apis.get_api('compute', 'v1', project_id)
  # Fetch only a subset of the fields to improve performance.
  wanted_fields = ('items/name, items/properties/tags,'
                   ' items/properties/networkInterfaces,'
                   ' items/properties/serviceAccounts, items/properties/metadata')
  list_request = gce_api.instanceTemplates().list(
      project=project_id,
      returnPartialSuccess=True,
      fields=wanted_fields,
  )
  return {
      template['name']: InstanceTemplate(project_id, template)
      for template in apis_utils.list_all(
          list_request, next_function=gce_api.instanceTemplates().list_next)
  }
@caching.cached_api_call
def get_project_metadata(project_id) -> Mapping[str, str]:
  """Return the project's commonInstanceMetadata items as a key/value mapping.

  Raises:
    utils.GcpApiError: if the projects.get request fails with an HttpError.
  """
  gce_api = apis.get_api('compute', 'v1', project_id)
  logging.info('fetching metadata of project %s\n', project_id)
  query = gce_api.projects().get(project=project_id)
  try:
    response = query.execute(num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err

  common_metadata = response.get('commonInstanceMetadata')
  # No metadata block, or a block without items, yields an empty mapping.
  if not common_metadata or 'items' not in common_metadata:
    return {}
  return {
      entry.get('key'): entry.get('value')
      for entry in common_metadata['items']
  }
@caching.cached_api_call
def get_instances_serial_port_output(context: models.Context):
  """Get the serial port output of the instances matching the given context.

  Only instances that are running and do not export their serial port output
  to cloud logging are queried.

  Args:
    context: selects the project and, through get_instances(), the instances.

  Returns:
    The temporary deque obtained from caching.get_tmp_deque(), holding one
    SerialPortOutput per response received. It is empty when the compute API
    is not enabled for the project.

  Raises:
    utils.GcpApiError: if a batched request failed with an HttpError. Any
      other exception reported for a request is re-raised unchanged.
  """
  # Create temp storage (diskcache.Deque) for output
  deque = caching.get_tmp_deque('tmp-gce-serial-output-')
  if not apis.is_enabled(context.project_id, 'compute'):
    return deque
  gce_api = apis.get_api('compute', 'v1', context.project_id)

  # Serial port output are rolled over on day 7 and limited to 1MB.
  # Fetching serial outputs are very expensive so optimize to fetch.
  # Only relevant instances as storage size can grow drastically for
  # massive projects. Think 1MB * N where N is some large number.
  requests = [
      gce_api.instances().getSerialPortOutput(
          project=i.project_id,
          zone=i.zone,
          instance=i.id,
          # To get all 1mb output
          start=-1000000,
      )
      for i in get_instances(context).values()
      # fetch running instances that do not export to cloud logging
      if not i.is_serial_port_logging_enabled() and i.is_running
  ]
  requests_start_time = datetime.now()
  # Note: We are limited to 1000 calls in a single batch request.
  # We have to use multiple batch requests in batches of 1000
  # https://github.com/googleapis/google-api-python-client/blob/main/docs/batch.md
  batch_size = 1000
  for i in range(0, len(requests), batch_size):
    batch_requests = requests[i:i + batch_size]
    for _, response, exception in apis_utils.batch_execute_all(
        api=gce_api, requests=batch_requests):
      if exception:
        if isinstance(exception, googleapiclient.errors.HttpError):
          raise utils.GcpApiError(exception) from exception
        else:
          raise exception

      if response:
        result = re.match(
            r'https://www.googleapis.com/compute/v1/projects/([^/]+)/zones/[^/]+/instances/([^/]+)',
            response['selfLink'],
        )
        if not result:
          logging.error("instance selfLink didn't match regexp: %s",
                        response['selfLink'])
          # Skip only this response: a bare return here would hand None to
          # the caller and discard the output that was already collected.
          continue

        project_id = result.group(1)
        instance_id = result.group(2)
        deque.appendleft(
            SerialPortOutput(
                project_id=project_id,
                instance_id=instance_id,
                contents=response['contents'].splitlines(),
            ))
  requests_end_time = datetime.now()
  logging.debug(
      'total serial logs processing time: %s, number of instances: %s',
      requests_end_time - requests_start_time,
      len(requests),
  )
  return deque
Get a list of serial port outputs for instances
that match the given context, are running, and do not export their serial output to cloud logging.
def get_instance_serial_port_output(
    project_id, zone, instance_name) -> Optional[SerialPortOutput]:
  """Get the serial port output of a single instance.

  Args:
    project_id: project the instance belongs to.
    zone: zone of the instance.
    instance_name: value passed as `instance` to getSerialPortOutput.

  Returns:
    A SerialPortOutput built from the response, or None when the compute API
    is not enabled, the request fails with an HttpError, the response is
    empty, or the response selfLink does not have the expected format.
  """
  if not apis.is_enabled(project_id, 'compute'):
    return None

  gce_api = apis.get_api('compute', 'v1', project_id)
  request = gce_api.instances().getSerialPortOutput(
      project=project_id,
      zone=zone,
      instance=instance_name,
      # To get all 1mb output
      start=-1000000,
  )
  try:
    response = request.execute(num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError:
    return None
  if not response:
    return None

  # Project and instance are taken from the selfLink of the response.
  link_match = re.match(
      r'https://www.googleapis.com/compute/v1/projects/([^/]+)/zones/[^/]+/instances/([^/]+)',
      response['selfLink'],
  )
  if link_match is None:
    logging.error("instance selfLink didn't match regexp: %s",
                  response['selfLink'])
    return None

  return SerialPortOutput(
      link_match.group(1),
      instance_id=link_match.group(2),
      contents=response['contents'].splitlines(),
  )
Get the serial port output of a single instance
identified by project, zone and instance name. Returns None if the compute API is not enabled, the request fails, or the response selfLink cannot be parsed.
class Region(models.Resource):
  """Represents a GCE Region."""

  # Raw region resource as passed to the constructor.
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    return self._resource_data['selfLink']

  @property
  def full_path(self) -> str:
    """selfLink without the compute v1 URL prefix, or '>> <selfLink>'."""
    link_match = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                          self.self_link)
    return link_match.group(1) if link_match else f'>> {self.self_link}'
Represents a GCE Region.
@property
def full_path(self) -> str:
  """selfLink without the compute v1 URL prefix, or '>> <selfLink>'."""
  link_match = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                        self.self_link)
  return link_match.group(1) if link_match else f'>> {self.self_link}'
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
Inherited Members
- gcpdiag.models.Resource
- project_id
- short_path
@caching.cached_api_call
def get_all_regions(project_id: str) -> Iterable[Region]:
  """Return the set of all regions of the project.

  Args:
    project_id (str): project id for this request

  Raises:
    utils.GcpApiError: Raises GcpApiError in case of query issues

  Returns:
    Iterable[Region]: one Region per listed item that has a 'name'; an empty
      set when the response is empty or has no 'items'.
  """
  try:
    gce_api = apis.get_api('compute', 'v1', project_id)
    response = gce_api.regions().list(project=project_id).execute(
        num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err

  if not response or 'items' not in response:
    return set()
  return {
      Region(project_id, region_data)
      for region_data in response['items']
      if 'name' in region_data
  }
Return list of all regions
Arguments:
- project_id (str): project id for this request
Raises:
- utils.GcpApiError: Raises GcpApiError in case of query issues
Returns:
Iterable[Region]: Return list of all regions
def get_regions_with_instances(context: models.Context) -> Iterable[Region]:
  """Return the regions that contain at least one instance of the context.

  Args:
    context (models.Context): context for this request

  Returns:
    Iterable[Region]: set of regions whose name equals the region of an
      instance returned by get_instances(context)
  """
  instance_regions = set()
  for instance in get_instances(context).values():
    instance_regions.add(instance.region)

  known_regions = get_all_regions(context.project_id)
  if known_regions:
    return {
        region for region in known_regions if region.name in instance_regions
    }
  return set()
Return list of regions with instances
Arguments:
- context (models.Context): context for this request
Returns:
Iterable[Region]: Return list of regions which contain instances
@caching.cached_api_call
def get_all_disks(project_id: str) -> Iterable[Disk]:
  """Return the set of disks found in all zones of the project.

  Raises:
    utils.GcpApiError: if a request fails with an HttpError.
  """
  try:
    gce_api = apis.get_api('compute', 'v1', project_id)
    # One list request per zone; pagination is handled by batch_list_all.
    zone_requests = []
    for zone_name in get_gce_zones(project_id):
      zone_requests.append(gce_api.disks().list(project=project_id,
                                                zone=zone_name))
    disk_items = apis_utils.batch_list_all(
        api=gce_api,
        requests=zone_requests,
        next_function=gce_api.disks().list_next,
        log_text=f'listing gce disks of project {project_id}',
    )
    return {Disk(project_id, disk_data) for disk_data in disk_items}
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
class InstanceEffectiveFirewalls(network_q.EffectiveFirewalls):
  """Effective firewall rules for a network interface on a VM instance.

  Includes org/folder firewall policies.
  """

  # Instance and network interface name the rules were fetched for.
  _instance: Instance
  _nic: str

  def __init__(self, instance, nic, resource_data):
    super().__init__(resource_data)
    self._nic = nic
    self._instance = instance
Effective firewall rules for a network interface on a VM instance.
Includes org/folder firewall policies.
@caching.cached_api_call(in_memory=True)
def get_instance_interface_effective_firewalls(
    instance: Instance, nic: str) -> InstanceEffectiveFirewalls:
  """Return effective firewalls for a network interface on the instance.

  Args:
    instance: instance whose project, zone and name identify the VM.
    nic: value passed as `networkInterface` to getEffectiveFirewalls.

  Returns:
    InstanceEffectiveFirewalls built from the API response and bound to the
    given instance and interface.
  """
  compute = apis.get_api('compute', 'v1', instance.project_id)
  request = compute.instances().getEffectiveFirewalls(
      project=instance.project_id,
      zone=instance.zone,
      instance=instance.name,
      networkInterface=nic,
  )
  response = request.execute(num_retries=config.API_RETRIES)
  # Pass the instance object itself; the Instance class was passed before.
  return InstanceEffectiveFirewalls(instance, nic, response)
Return effective firewalls for a network interface on the instance
def is_project_serial_port_logging_enabled(project_id: str) -> bool:
  """Tell whether project metadata enables serial port logging.

  Returns False when the compute API is not enabled or when the
  'serial-port-logging-enable' metadata value is missing or empty; otherwise
  True if its upper-cased value is one of POSITIVE_BOOL_VALUES.
  """
  if not apis.is_enabled(project_id, 'compute'):
    return False

  project_metadata = get_project_metadata(project_id=project_id)
  flag = project_metadata.get('serial-port-logging-enable')
  if not flag:
    return False
  return flag.upper() in POSITIVE_BOOL_VALUES
class SerialOutputQuery:
  """A serial output job that was started with prefetch_logs()."""

  # Job whose `future` yields the fetched serial output entries.
  job: _SerialOutputJob

  def __init__(self, job):
    self.job = job

  @property
  def entries(self) -> Sequence:
    """Result of the job's future; raises RuntimeError if it has no future."""
    future = self.job.future
    if not future:
      raise RuntimeError("Fetching serial logs wasn't executed. did you call"
                         ' execute_get_serial_port_output()?')
    if future.running():
      logging.info(
          'waiting for serial output results for project: %s',
          self.job.context.project_id,
      )
    return future.result()
A serial output job that was started with prefetch_logs().
@property
def entries(self) -> Sequence:
  """Result of the job's future; raises RuntimeError if it has no future."""
  future = self.job.future
  if not future:
    raise RuntimeError("Fetching serial logs wasn't executed. did you call"
                       ' execute_get_serial_port_output()?')
  if future.running():
    logging.info(
        'waiting for serial output results for project: %s',
        self.job.context.project_id,
    )
  return future.result()
def execute_fetch_serial_port_outputs(executor: concurrent.futures.Executor):
  """Submit every pending serial output job to the executor.

  The module-level jobs_todo mapping is replaced by an empty dict, and each
  job that was in it gets its `future` set to the submitted call of
  get_instances_serial_port_output(job.context).
  """
  # Start a thread to fetch serial logs; processing logs can be large
  # depending on the number of instances in the project which aren't logging
  # to cloud logging. Currently expects only one job, but implemented so that
  # support for multiple projects is possible.
  global jobs_todo
  pending_jobs, jobs_todo = jobs_todo, {}
  for pending in pending_jobs.values():
    pending.future = executor.submit(get_instances_serial_port_output,
                                     pending.context)
class HealthCheck(models.Resource):
  """A Health Check resource."""

  # Raw health check resource as passed to the constructor.
  _resource_data: dict
  _type: str

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    return self._resource_data['selfLink']

  @property
  def full_path(self) -> str:
    """selfLink without the compute v1 URL prefix, or '>> <selfLink>'."""
    link_match = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                          self.self_link)
    return link_match.group(1) if link_match else f'>> {self.self_link}'

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>'."""
    return self.project_id + '/' + self.name

  @property
  def is_log_enabled(self) -> bool:
    """True only if a logConfig is present and its 'enable' value is truthy."""
    try:
      log_config = self._resource_data.get('logConfig', False)
      return bool(log_config and log_config['enable'])
    except KeyError:
      # logConfig present but without an 'enable' key.
      return False
A Health Check resource.
@property
def full_path(self) -> str:
  """selfLink without the compute v1 URL prefix, or '>> <selfLink>'."""
  link_match = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                        self.self_link)
  return link_match.group(1) if link_match else f'>> {self.self_link}'
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """'<project_id>/<name>'."""
  return self.project_id + '/' + self.name
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
Inherited Members
- gcpdiag.models.Resource
- project_id
@caching.cached_api_call(in_memory=True)
def get_health_check(project_id: str, health_check: str) -> HealthCheck:
  """Fetch a health check of the project.

  Args:
    project_id: project that owns the health check.
    health_check: value passed as `healthCheck` to healthChecks().get().

  Returns:
    HealthCheck wrapping the API response.
  """
  logging.info('fetching health check: %s', health_check)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.healthChecks().get(project=project_id,
                                       healthCheck=health_check)
  response = request.execute(num_retries=config.API_RETRIES)
  return HealthCheck(project_id, response)