gcpdiag.queries.gce

Queries related to GCP Compute Engine.
# Upper-cased metadata values that are interpreted as "enabled"/true.
POSITIVE_BOOL_VALUES = {'Y', 'YES', '1', 'TRUE'}
# Label key whose presence marks an instance as a Dataproc instance.
DATAPROC_LABEL = 'goog-dataproc-cluster-name'
# Label key whose presence marks an instance as a GKE node.
GKE_LABEL = 'goog-gke-node'
class InstanceTemplate(gcpdiag.models.Resource):
 40class InstanceTemplate(models.Resource):
 41  """Represents a GCE Instance Template."""
 42
 43  _resource_data: dict
 44
 45  def __init__(self, project_id, resource_data):
 46    super().__init__(project_id=project_id)
 47    self._resource_data = resource_data
 48
 49  @property
 50  def self_link(self) -> str:
 51    return self._resource_data['selfLink']
 52
 53  @property
 54  def full_path(self) -> str:
 55    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
 56                      self.self_link)
 57    if result:
 58      return result.group(1)
 59    else:
 60      return f'>> {self.self_link}'
 61
 62  @property
 63  def short_path(self) -> str:
 64    path = self.project_id + '/' + self.name
 65    return path
 66
 67  @property
 68  def name(self) -> str:
 69    return self._resource_data['name']
 70
 71  @property
 72  def tags(self) -> List[str]:
 73    return self._resource_data['properties'].get('tags', {}).get('items', [])
 74
 75  @property
 76  def service_account(self) -> Optional[str]:
 77    sa_list = self._resource_data['properties'].get('serviceAccounts', [])
 78    if not sa_list:
 79      return None
 80    email = sa_list[0]['email']
 81    if email == 'default':
 82      project_nr = crm.get_project(self._project_id).number
 83      return f'{project_nr}-compute@developer.gserviceaccount.com'
 84    return email
 85
 86  @property
 87  def network(self) -> network_q.Network:
 88    return network_q.get_network_from_url(
 89        self._resource_data['properties']['networkInterfaces'][0]['network'])
 90
 91  @property
 92  def subnetwork(self) -> network_q.Subnetwork:
 93    subnet_url = self._resource_data['properties']['networkInterfaces'][0][
 94        'subnetwork']
 95    return self.network.subnetworks[subnet_url]
 96
 97  def get_metadata(self, key: str) -> str:
 98    for item in self._resource_data['properties']['metadata']['items']:
 99      if item['key'] == key:
100        return item['value']
101    return ''

Represents a GCE Instance Template.

InstanceTemplate(project_id, resource_data)
45  def __init__(self, project_id, resource_data):
46    super().__init__(project_id=project_id)
47    self._resource_data = resource_data
full_path: str
53  @property
54  def full_path(self) -> str:
55    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
56                      self.self_link)
57    if result:
58      return result.group(1)
59    else:
60      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
62  @property
63  def short_path(self) -> str:
64    path = self.project_id + '/' + self.name
65    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gcpdiag-gke-1-9b90/gke1' (this class returns '<project_id>/<name>')

name: str
67  @property
68  def name(self) -> str:
69    return self._resource_data['name']
tags: List[str]
71  @property
72  def tags(self) -> List[str]:
73    return self._resource_data['properties'].get('tags', {}).get('items', [])
service_account: Optional[str]
75  @property
76  def service_account(self) -> Optional[str]:
77    sa_list = self._resource_data['properties'].get('serviceAccounts', [])
78    if not sa_list:
79      return None
80    email = sa_list[0]['email']
81    if email == 'default':
82      project_nr = crm.get_project(self._project_id).number
83      return f'{project_nr}-compute@developer.gserviceaccount.com'
84    return email
network: gcpdiag.queries.network.Network
86  @property
87  def network(self) -> network_q.Network:
88    return network_q.get_network_from_url(
89        self._resource_data['properties']['networkInterfaces'][0]['network'])
subnetwork: gcpdiag.queries.network.Subnetwork
91  @property
92  def subnetwork(self) -> network_q.Subnetwork:
93    subnet_url = self._resource_data['properties']['networkInterfaces'][0][
94        'subnetwork']
95    return self.network.subnetworks[subnet_url]
def get_metadata(self, key: str) -> str:
 97  def get_metadata(self, key: str) -> str:
 98    for item in self._resource_data['properties']['metadata']['items']:
 99      if item['key'] == key:
100        return item['value']
101    return ''
class InstanceGroup(gcpdiag.models.Resource):
class InstanceGroup(models.Resource):
  """Represents a GCE instance group.

  Read-only view over the instance group resource dict given to the
  constructor.
  """

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Keeps the raw resource dict; project_id is handed to the base class."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """selfLink without the Compute API URL prefix.

    Falls back to '>> ' followed by the complete selfLink when the prefix is
    not present.
    """
    link = self._resource_data['selfLink']
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    if matched is None:
      return '>> ' + link
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>' of this instance group."""
    return '/'.join((self.project_id, self.name))

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field of the resource."""
    return self._resource_data['selfLink']

  @property
  def name(self) -> str:
    """Value of the 'name' field of the resource."""
    return self._resource_data['name']

  @property
  def named_ports(self) -> List[dict]:
    """The 'namedPorts' entries of the group, or [] when the field is absent."""
    return self._resource_data.get('namedPorts', [])

  def has_named_ports(self) -> bool:
    """True when the resource carries a 'namedPorts' field."""
    return 'namedPorts' in self._resource_data

Represents a GCE instance group.

InstanceGroup(project_id, resource_data)
109  def __init__(self, project_id, resource_data):
110    super().__init__(project_id=project_id)
111    self._resource_data = resource_data
full_path: str
113  @property
114  def full_path(self) -> str:
115    result = re.match(
116        r'https://www.googleapis.com/compute/v1/(.*)',
117        self._resource_data['selfLink'],
118    )
119    if result:
120      return result.group(1)
121    else:
122      return '>> ' + self._resource_data['selfLink']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
124  @property
125  def short_path(self) -> str:
126    path = self.project_id + '/' + self.name
127    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gcpdiag-gke-1-9b90/gke1' (this class returns '<project_id>/<name>')

name: str
133  @property
134  def name(self) -> str:
135    return self._resource_data['name']
named_ports: List[dict]
137  @property
138  def named_ports(self) -> List[dict]:
139    if 'namedPorts' in self._resource_data:
140      return self._resource_data['namedPorts']
141    return []
def has_named_ports(self) -> bool:
143  def has_named_ports(self) -> bool:
144    if 'namedPorts' in self._resource_data:
145      return True
146    return False
class ManagedInstanceGroup(gcpdiag.models.Resource):
class ManagedInstanceGroup(models.Resource):
  """Represents a GCE managed instance group.

  Read-only view over the managed instance group (MIG) resource dict given to
  the constructor. The region is computed on first access and cached.
  """

  _resource_data: dict
  _region: Optional[str]

  def __init__(self, project_id, resource_data):
    """Keeps the raw resource dict; project_id is handed to the base class."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    # Filled in lazily by the `region` property.
    self._region = None

  @property
  def full_path(self) -> str:
    """selfLink without the Compute API URL prefix.

    Falls back to '>> ' followed by the complete selfLink when the prefix is
    not present.
    """
    result = re.match(
        r'https://www.googleapis.com/compute/v1/(.*)',
        self._resource_data['selfLink'],
    )
    if result:
      return result.group(1)
    return '>> ' + self._resource_data['selfLink']

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>' of this MIG."""
    path = self.project_id + '/' + self.name
    return path

  def is_gke(self) -> bool:
    """Is this managed instance group part of a GKE cluster?

    Note that the results are based on heuristics (the mig name),
    which is not ideal.

    Returns:
        bool: True if this managed instance group is part of a GKE cluster.
    """

    # gke- is normal GKE, gk3- is GKE autopilot
    return self.name.startswith(('gke-', 'gk3-'))

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field of the resource."""
    return self._resource_data['selfLink']

  @property
  def name(self) -> str:
    """Value of the 'name' field of the resource."""
    return self._resource_data['name']

  @property
  def region(self) -> str:
    """Region of the MIG, cached after the first successful lookup.

    Taken from the 'region' URL when present; otherwise derived from the
    'zone' URL through utils.zone_region().

    Raises:
        RuntimeError: neither field is set, or the URL can't be parsed.
    """
    if self._region is None:
      if 'region' in self._resource_data:
        m = re.search(r'/regions/([^/]+)$', self._resource_data['region'])
        if not m:
          raise RuntimeError("can't determine region of mig %s (%s)" %
                             (self.name, self._resource_data['region']))
        self._region = m.group(1)
      elif 'zone' in self._resource_data:
        m = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
        if not m:
          # Report the zone URL: 'region' is not set in this branch, so
          # indexing it would raise KeyError instead of this error.
          raise RuntimeError("can't determine region of mig %s (%s)" %
                             (self.name, self._resource_data['zone']))
        zone = m.group(1)
        self._region = utils.zone_region(zone)
      else:
        raise RuntimeError(
            f"can't determine region of mig {self.name}, both region and zone"
            " aren't set!")
    return self._region

  def count_no_action_instances(self) -> int:
    """Number of instances in the mig that are running and have no scheduled actions."""
    return self._resource_data['currentActions']['none']

  def is_instance_member(self, project_id: str, region: str,
                         instance_name: str) -> bool:
    """Given the project_id, region and instance name, is it a member of this MIG?

    Membership is decided by matching project and region and by checking that
    the instance name starts with the MIG's 'baseInstanceName'.
    """
    return (self.project_id == project_id and self.region == region and
            instance_name.startswith(self._resource_data['baseInstanceName']))

  @property
  def template(self) -> InstanceTemplate:
    """Instance template referenced by the 'instanceTemplate' field.

    Raises:
        RuntimeError: the field is missing, its URL can't be parsed, or the
          template is not among get_instance_templates() for this project.
    """
    if 'instanceTemplate' not in self._resource_data:
      raise RuntimeError(f'instanceTemplate not set for MIG {self.name}')

    m = re.match(
        r'https://www.googleapis.com/compute/v1/(.*)',
        self._resource_data['instanceTemplate'],
    )

    if not m:
      raise RuntimeError("can't parse instanceTemplate: %s" %
                         self._resource_data['instanceTemplate'])
    template_self_link = m.group(1)
    templates = get_instance_templates(self.project_id)
    if template_self_link not in templates:
      raise RuntimeError(
          f'instanceTemplate {template_self_link} for MIG {self.name} not found'
      )
    return templates[template_self_link]

  @property
  def version_target_reached(self) -> bool:
    """Value found at 'status.versionTarget.isReached' via get_path()."""
    return get_path(self._resource_data,
                    ('status', 'versionTarget', 'isReached'))

Represents a GCE managed instance group.

ManagedInstanceGroup(project_id, resource_data)
155  def __init__(self, project_id, resource_data):
156    super().__init__(project_id=project_id)
157    self._resource_data = resource_data
158    self._region = None
full_path: str
160  @property
161  def full_path(self) -> str:
162    result = re.match(
163        r'https://www.googleapis.com/compute/v1/(.*)',
164        self._resource_data['selfLink'],
165    )
166    if result:
167      return result.group(1)
168    else:
169      return '>> ' + self._resource_data['selfLink']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
171  @property
172  def short_path(self) -> str:
173    path = self.project_id + '/' + self.name
174    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gcpdiag-gke-1-9b90/gke1' (this class returns '<project_id>/<name>')

def is_gke(self) -> bool:
176  def is_gke(self) -> bool:
177    """Is this managed instance group part of a GKE cluster?
178
179    Note that the results are based on heuristics (the mig name),
180    which is not ideal.
181
182    Returns:
183        bool: True if this managed instance group is part of a GKE cluster.
184    """
185
186    # gke- is normal GKE, gk3- is GKE autopilot
187    return self.name.startswith('gke-') or self.name.startswith('gk3-')

Is this managed instance group part of a GKE cluster?

Note that the results are based on heuristics (the mig name), which is not ideal.

Returns:

bool: True if this managed instance group is part of a GKE cluster.

name: str
193  @property
194  def name(self) -> str:
195    return self._resource_data['name']
region: str
197  @property
198  def region(self) -> str:
199    if self._region is None:
200      if 'region' in self._resource_data:
201        m = re.search(r'/regions/([^/]+)$', self._resource_data['region'])
202        if not m:
203          raise RuntimeError("can't determine region of mig %s (%s)" %
204                             (self.name, self._resource_data['region']))
205        self._region = m.group(1)
206      elif 'zone' in self._resource_data:
207        m = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
208        if not m:
209          raise RuntimeError("can't determine region of mig %s (%s)" %
210                             (self.name, self._resource_data['region']))
211        zone = m.group(1)
212        self._region = utils.zone_region(zone)
213      else:
214        raise RuntimeError(
215            f"can't determine region of mig {self.name}, both region and zone"
216            " aren't set!")
217    return self._region
def count_no_action_instances(self) -> int:
219  def count_no_action_instances(self) -> int:
220    """number of instances in the mig that are running and have no scheduled actions."""
221    return self._resource_data['currentActions']['none']

Number of instances in the MIG that are running and have no scheduled actions.

def is_instance_member(self, project_id: str, region: str, instance_name: str):
223  def is_instance_member(self, project_id: str, region: str,
224                         instance_name: str):
225    """Given the project_id, region and instance name, is it a member of this MIG?"""
226    return (self.project_id == project_id and self.region == region and
227            instance_name.startswith(self._resource_data['baseInstanceName']))

Given the project_id, region and instance name, is it a member of this MIG?

template: InstanceTemplate
229  @property
230  def template(self) -> InstanceTemplate:
231    if 'instanceTemplate' not in self._resource_data:
232      raise RuntimeError('instanceTemplate not set for MIG {self.name}')
233
234    m = re.match(
235        r'https://www.googleapis.com/compute/v1/(.*)',
236        self._resource_data['instanceTemplate'],
237    )
238
239    if not m:
240      raise RuntimeError("can't parse instanceTemplate: %s" %
241                         self._resource_data['instanceTemplate'])
242    template_self_link = m.group(1)
243    templates = get_instance_templates(self.project_id)
244    if template_self_link not in templates:
245      raise RuntimeError(
246          f'instanceTemplate {template_self_link} for MIG {self.name} not found'
247      )
248    return templates[template_self_link]
version_target_reached: bool
250  @property
251  def version_target_reached(self) -> bool:
252    return get_path(self._resource_data,
253                    ('status', 'versionTarget', 'isReached'))
class SerialPortOutput:
class SerialPortOutput:
  """Represents the full Serial Port Output (/dev/ttyS0 or COM1) of an instance.

  `contents` holds the output as a list of strings (described upstream as the
  full 1MB of output of the instance). The project id is stored but has no
  public accessor.
  """

  _project_id: str
  _instance_id: str
  _contents: List[str]

  def __init__(self, project_id, instance_id, contents):
    """Stores the three values as given, without copying or validation."""
    self._contents = contents
    self._instance_id = instance_id
    self._project_id = project_id

  @property
  def instance_id(self) -> str:
    """Id of the instance the output belongs to, as passed to the constructor."""
    return self._instance_id

  @property
  def contents(self) -> List[str]:
    """The stored serial port output (the same list object that was passed in)."""
    return self._contents

Represents the full Serial Port Output (/dev/ttyS0 or COM1) of an instance.

`contents` is the full 1MB of serial port output of the instance.

SerialPortOutput(project_id, instance_id, contents)
266  def __init__(self, project_id, instance_id, contents):
267    self._project_id = project_id
268    self._instance_id = instance_id
269    self._contents = contents
contents: List[str]
271  @property
272  def contents(self) -> List[str]:
273    return self._contents
instance_id: str
275  @property
276  def instance_id(self) -> str:
277    return self._instance_id
class Instance(gcpdiag.models.Resource):
class Instance(models.Resource):
  """Represents a GCE instance.

  Read-only view over the instance resource dict given to the constructor.
  The region and the instance metadata dict are computed on first use and
  cached on the object.
  """

  _resource_data: dict
  _region: Optional[str]

  def __init__(self, project_id, resource_data):
    """Keeps the raw resource dict; project_id is handed to the base class."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    # Both are filled in lazily (see get_metadata() and region).
    self._metadata_dict = None
    self._region = None

  @property
  def id(self) -> str:
    """Value of the 'id' field of the resource."""
    return self._resource_data['id']

  @property
  def name(self) -> str:
    """Value of the 'name' field of the resource."""
    return self._resource_data['name']

  @property
  def full_path(self) -> str:
    """selfLink without the Compute API URL prefix.

    Falls back to '>> ' followed by the complete selfLink when the prefix is
    not present.
    """
    result = re.match(
        r'https://www.googleapis.com/compute/v1/(.*)',
        self._resource_data['selfLink'],
    )
    if result:
      return result.group(1)
    return '>> ' + self._resource_data['selfLink']

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>' of this instance."""
    # Note: instance names must be unique per project,
    # so no need to add the zone.
    path = self.project_id + '/' + self.name
    return path

  @property
  def creation_timestamp(self) -> datetime:
    """VM creation time, as a *naive* `datetime` object.

    'creationTimestamp' is parsed, converted to UTC, and then stripped of its
    timezone information.
    """
    return (datetime.fromisoformat(
        self._resource_data['creationTimestamp']).astimezone(
            timezone.utc).replace(tzinfo=None))

  @property
  def region(self) -> str:
    """Region of the instance, derived from its 'zone' URL and then cached.

    Raises:
        RuntimeError: 'zone' is not set or its URL can't be parsed.
    """
    if self._region is None:
      if 'zone' in self._resource_data:
        m = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
        if not m:
          # Report the zone URL: instances are located through 'zone', and
          # indexing a missing 'region' key would raise KeyError instead.
          raise RuntimeError("can't determine region of instance %s (%s)" %
                             (self.name, self._resource_data['zone']))
        zone = m.group(1)
        self._region = utils.zone_region(zone)
      else:
        raise RuntimeError(
            f"can't determine region of instance {self.name}, zone isn't set!")
    return self._region

  @property
  def zone(self) -> str:
    """Zone name, i.e. the last path segment of the 'zone' URL.

    Raises:
        RuntimeError: the URL does not end in '/zones/<name>'.
    """
    zone_uri = self._resource_data['zone']
    m = re.search(r'/zones/([^/]+)$', zone_uri)
    if m:
      return m.group(1)
    raise RuntimeError(f"can't determine zone of instance {self.name}")

  @property
  def disks(self) -> List[dict]:
    """The 'disks' entries of the resource, or [] when the field is absent."""
    if 'disks' in self._resource_data:
      return self._resource_data['disks']
    return []

  @property
  def startrestricted(self) -> bool:
    """Value of the 'startRestricted' field."""
    return self._resource_data['startRestricted']

  def laststarttimestamp(self) -> str:
    """Value of the 'lastStartTimestamp' field (KeyError when absent)."""
    return self._resource_data['lastStartTimestamp']

  def laststoptimestamp(self) -> str:
    """Value of the 'lastStopTimestamp' field, or '' when absent."""
    if 'lastStopTimestamp' in self._resource_data:
      return self._resource_data['lastStopTimestamp']
    return ''

  def is_serial_port_logging_enabled(self) -> bool:
    """True when metadata 'serial-port-logging-enable' has a positive value."""
    return self.is_metadata_enabled('serial-port-logging-enable')

  def is_oslogin_enabled(self) -> bool:
    """True when metadata 'enable-oslogin' has a positive value."""
    return self.is_metadata_enabled('enable-oslogin')

  def is_metadata_enabled(self, metadata_name) -> bool:
    """Use to check for common boolean metadata value.

    The value is looked up with get_metadata() and counts as enabled when its
    upper-cased form is one of POSITIVE_BOOL_VALUES.
    """
    value = self.get_metadata(metadata_name)
    return bool(value and value.upper() in POSITIVE_BOOL_VALUES)

  def has_label(self, label) -> bool:
    """True when `label` is a key of the instance labels."""
    return label in self.labels

  def is_dataproc_instance(self) -> bool:
    """True when the instance carries the DATAPROC_LABEL label."""
    return self.has_label(DATAPROC_LABEL)

  def is_gke_node(self) -> bool:
    """True when the instance carries the GKE_LABEL label."""
    return self.has_label(GKE_LABEL)

  def is_preemptible_vm(self) -> bool:
    """True when 'scheduling.preemptible' is present and truthy."""
    scheduling = self._resource_data.get('scheduling', {})
    # bool() so that the declared return type holds for any stored value.
    return bool(scheduling.get('preemptible', False))

  @property
  def created_by_mig(self) -> bool:
    """Return a bool indicating whether the instance was created by a MIG.

    Only the 'created-by' metadata value is inspected, so an instance that was
    part of a MIG which has since been removed or terminated still returns
    True.
    """
    created_by = self.get_metadata('created-by')
    if created_by is None:
      return False

    created_by_match = re.match(
        r'projects/([^/]+)/((?:regions|zones)/[^/]+/instanceGroupManagers/[^/]+)$',
        created_by,
    )
    if not created_by_match:
      return False
    return True

  def is_windows_machine(self) -> bool:
    """True when the first disk lists a guest OS feature of type 'WINDOWS'.

    Only the first entry of 'disks' is inspected. A missing or empty 'disks'
    list yields False.
    """
    first_disk = next(iter(self._resource_data.get('disks', [])), None)
    if first_disk is None:
      return False
    return any(feature['type'] == 'WINDOWS'
               for feature in first_disk.get('guestOsFeatures', []))

  def is_public_machine(self) -> bool:
    """True when any network interface mentions 'natIP'.

    Implemented as a substring search in the string form of
    'networkInterfaces'.
    """
    if 'networkInterfaces' in self._resource_data:
      return 'natIP' in str(self._resource_data['networkInterfaces'])
    return False

  def machine_type(self):
    """Machine type name (last segment of the 'machineType' URL).

    Returns None when 'machineType' is not set.

    Raises:
        RuntimeError: the URL does not end in '/machineTypes/<name>'.
    """
    if 'machineType' in self._resource_data:
      machine_type_uri = self._resource_data['machineType']
      mt = re.search(r'/machineTypes/([^/]+)$', machine_type_uri)
      if mt:
        return mt.group(1)
      raise RuntimeError(
          f"can't determine machineType of instance {self.name}")
    return None

  def check_license(self, licenses: List[str]) -> bool:
    """Checks that a license is contained in a given license list.

    Returns True when the part after '/global/licenses/' of any license
    attached to any disk equals one of `licenses`. Disks without a 'licenses'
    field are skipped.
    """
    for disk in self._resource_data.get('disks', []):
      for attached_license in disk.get('licenses', []):
        if attached_license.partition('/global/licenses/')[2] in licenses:
          return True
    return False

  def get_boot_disk_image(self) -> str:
    """Get VM's boot disk image.

    Returns the source image of the first disk flagged as 'boot' (looked up
    through get_disk()), or '' when no disk is flagged as boot disk.

    Raises:
        RuntimeError: the boot disk's 'source' URL can't be parsed.
    """
    boot_disk_image: str = ''
    for disk in self.disks:
      if disk.get('boot', False):
        disk_source = disk.get('source', '')
        m = re.search(r'/disks/([^/]+)$', disk_source)
        if not m:
          raise RuntimeError(f"can't determine name of boot disk {disk_source}")
        disk_name = m.group(1)
        gce_disk: Disk = get_disk(self.project_id,
                                  zone=self.zone,
                                  disk_name=disk_name)
        return gce_disk.source_image
    return boot_disk_image

  @property
  def is_sole_tenant_vm(self) -> bool:
    """True when 'scheduling' contains 'nodeAffinities'."""
    return 'nodeAffinities' in self._resource_data.get('scheduling', {})

  @property
  def network(self) -> network_q.Network:
    """Network of the first network interface.

    Raises:
        RuntimeError: the network URL can't be parsed.
    """
    # 'https://www.googleapis.com/compute/v1/projects/gcpdiag-gce1-aaaa/global/networks/default'
    network_string = self._resource_data['networkInterfaces'][0]['network']
    m = re.match(r'^.+/projects/([^/]+)/global/networks/([^/]+)$',
                 network_string)
    if not m:
      raise RuntimeError("can't parse network string: %s" % network_string)
    return network_q.get_network(m.group(1), m.group(2))

  @property
  def network_ips(self) -> List[network_q.IPv4AddrOrIPv6Addr]:
    """'networkIP' of every network interface, parsed with ipaddress."""
    return [
        ipaddress.ip_address(nic['networkIP'])
        for nic in self._resource_data['networkInterfaces']
    ]

  @property
  def get_network_interfaces(self):
    """The raw 'networkInterfaces' list of the resource."""
    return self._resource_data['networkInterfaces']

  @property
  def subnetworks(self) -> List[network_q.Subnetwork]:
    """Subnetwork of every network interface, resolved from its URL."""
    return [
        network_q.get_subnetwork_from_url(nic['subnetwork'])
        for nic in self._resource_data['networkInterfaces']
    ]

  @property
  def routes(self) -> List[network_q.Route]:
    """Routes of the project that apply to this instance.

    A route applies when it belongs to the network of one of the interfaces
    and either has no tags or shares at least one tag with the instance.
    """
    routes = []
    for nic in self._resource_data['networkInterfaces']:
      for route in network_q.get_routes(self.project_id):
        if nic['network'] != route.network:
          continue
        if route.tags == [] or any(tag in route.tags for tag in self.tags):
          routes.append(route)
    return routes

  def get_network_ip_for_instance_interface(
      self, network: str) -> Optional[network_q.IPv4NetOrIPv6Net]:
    """Get the network ip for a nic given a network name.

    `network` is compared against the 'network' field of each interface;
    None is returned when no interface matches.
    """
    for nic in self._resource_data['networkInterfaces']:
      if nic.get('network') == network:
        return ipaddress.ip_network(nic.get('networkIP'))
    return None

  def secure_boot_enabled(self) -> bool:
    """'shieldedInstanceConfig.enableSecureBoot', False without that config."""
    if 'shieldedInstanceConfig' in self._resource_data:
      return self._resource_data['shieldedInstanceConfig']['enableSecureBoot']
    return False

  @property
  def access_scopes(self) -> List[str]:
    """'scopes' of the first service account, or [] when there is none."""
    if 'serviceAccounts' in self._resource_data:
      saccts = self._resource_data['serviceAccounts']
      if isinstance(saccts, list) and len(saccts) >= 1:
        return saccts[0].get('scopes', [])
    return []

  @property
  def service_account(self) -> Optional[str]:
    """'email' of the first service account, or None when there is none."""
    if 'serviceAccounts' in self._resource_data:
      saccts = self._resource_data['serviceAccounts']
      if isinstance(saccts, list) and len(saccts) >= 1:
        return saccts[0]['email']
    return None

  @property
  def tags(self) -> List[str]:
    """Items of the 'tags' field, or [] when no tags are set."""
    if 'tags' in self._resource_data:
      if 'items' in self._resource_data['tags']:
        return self._resource_data['tags']['items']
    return []

  def get_metadata(self, key: str) -> str:
    """Returns the metadata value for `key`.

    Instance metadata takes precedence; project metadata (from
    get_project_metadata()) is the fallback. NOTE: despite the annotation,
    None is returned when the key is set in neither place (created_by_mig and
    mig rely on that).
    """
    # `is None` rather than falsiness, so that an instance without metadata
    # doesn't rebuild the (empty) cache on every call.
    if self._metadata_dict is None:
      self._metadata_dict = {}
      if ('metadata' in self._resource_data and
          'items' in self._resource_data['metadata']):
        for item in self._resource_data['metadata']['items']:
          if 'key' in item and 'value' in item:
            self._metadata_dict[item['key']] = item['value']
    project_metadata = get_project_metadata(self.project_id)
    return self._metadata_dict.get(key, project_metadata.get(key))

  @property
  def status(self) -> str:
    """VM Status (None when the resource has no 'status' field)."""
    return self._resource_data.get('status', None)

  @property
  def is_running(self) -> bool:
    """VM Status is indicated as running."""
    return self._resource_data.get('status', False) == 'RUNNING'

  @property  # type: ignore
  @caching.cached_api_call(in_memory=True)
  def mig(self) -> ManagedInstanceGroup:
    """Return ManagedInstanceGroup that owns this instance.

    Throws AttributeError in case it isn't MIG-managed.
    """

    created_by = self.get_metadata('created-by')
    if created_by is None:
      raise AttributeError(f'instance {self.id} is not managed by a mig')

    # Example created-by:
    # pylint: disable=line-too-long
    # "projects/12340002/zones/europe-west4-a/instanceGroupManagers/gke-gke1-default-pool-e5e20a34-grp"
    # (note how it uses a project number and not a project id...)
    created_by_match = re.match(
        r'projects/([^/]+)/((?:regions|zones)/[^/]+/instanceGroupManagers/[^/]+)$',
        created_by,
    )
    if not created_by_match:
      raise AttributeError(f'instance {self.id} is not managed by a mig'
                           f' (created-by={created_by})')
    project = crm.get_project(created_by_match.group(1))

    # Rebuild the self link with the project id so it can be compared with
    # the self links of the MIGs.
    mig_self_link = ('https://www.googleapis.com/compute/v1/'
                     f'projects/{project.id}/{created_by_match.group(2)}')

    # Try to find a matching mig.
    for mig in get_managed_instance_groups(
        models.Context(project_id=self.project_id)).values():
      if mig.self_link == mig_self_link:
        return mig

    raise AttributeError(f'instance {self.id} is not managed by a mig')

  @property
  def labels(self) -> dict:
    """The 'labels' dict of the resource, or {} when the field is absent."""
    return self._resource_data.get('labels', {})

Represents a GCE instance.

Instance(project_id, resource_data)
286  def __init__(self, project_id, resource_data):
287    super().__init__(project_id=project_id)
288    self._resource_data = resource_data
289    self._metadata_dict = None
290    self._region = None
id: str
292  @property
293  def id(self) -> str:
294    return self._resource_data['id']
name: str
296  @property
297  def name(self) -> str:
298    return self._resource_data['name']
full_path: str
300  @property
301  def full_path(self) -> str:
302    result = re.match(
303        r'https://www.googleapis.com/compute/v1/(.*)',
304        self._resource_data['selfLink'],
305    )
306    if result:
307      return result.group(1)
308    else:
309      return '>> ' + self._resource_data['selfLink']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
311  @property
312  def short_path(self) -> str:
313    # Note: instance names must be unique per project,
314    # so no need to add the zone.
315    path = self.project_id + '/' + self.name
316    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gcpdiag-gke-1-9b90/gke1' (this class returns '<project_id>/<name>')

creation_timestamp: datetime.datetime
318  @property
319  def creation_timestamp(self) -> datetime:
320    """VM creation time, as a *naive* `datetime` object."""
321    return (datetime.fromisoformat(
322        self._resource_data['creationTimestamp']).astimezone(
323            timezone.utc).replace(tzinfo=None))

VM creation time, as a naive datetime object.

region: str
325  @property
326  def region(self) -> str:
327    if self._region is None:
328      if 'zone' in self._resource_data:
329        m = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
330        if not m:
331          raise RuntimeError("can't determine region of instance %s (%s)" %
332                             (self.name, self._resource_data['region']))
333        zone = m.group(1)
334        self._region = utils.zone_region(zone)
335      else:
336        raise RuntimeError(
337            f"can't determine region of instance {self.name}, zone isn't set!")
338    return self._region
zone: str
340  @property
341  def zone(self) -> str:
342    zone_uri = self._resource_data['zone']
343    m = re.search(r'/zones/([^/]+)$', zone_uri)
344    if m:
345      return m.group(1)
346    else:
347      raise RuntimeError(f"can't determine zone of instance {self.name}")
disks: List[dict]
349  @property
350  def disks(self) -> List[dict]:
351    if 'disks' in self._resource_data:
352      return self._resource_data['disks']
353    return []
startrestricted: bool
355  @property
356  def startrestricted(self) -> bool:
357    return self._resource_data['startRestricted']
def laststarttimestamp(self) -> str:
359  def laststarttimestamp(self) -> str:
360    return self._resource_data['lastStartTimestamp']
def laststoptimestamp(self) -> str:
362  def laststoptimestamp(self) -> str:
363    if 'lastStopTimestamp' in self._resource_data:
364      return self._resource_data['lastStopTimestamp']
365    return ''
def is_serial_port_logging_enabled(self) -> bool:
367  def is_serial_port_logging_enabled(self) -> bool:
368    value = self.get_metadata('serial-port-logging-enable')
369    return bool(value and value.upper() in POSITIVE_BOOL_VALUES)
def is_oslogin_enabled(self) -> bool:
371  def is_oslogin_enabled(self) -> bool:
372    value = self.get_metadata('enable-oslogin')
373    return bool(value and value.upper() in POSITIVE_BOOL_VALUES)
def is_metadata_enabled(self, metadata_name) -> bool:
375  def is_metadata_enabled(self, metadata_name) -> bool:
376    """Use to check for common boolean metadata value"""
377    value = self.get_metadata(metadata_name)
378    return bool(value and value.upper() in POSITIVE_BOOL_VALUES)

Used to check whether a common boolean metadata value is enabled.

def has_label(self, label) -> bool:
380  def has_label(self, label) -> bool:
381    return label in self.labels
def is_dataproc_instance(self) -> bool:
383  def is_dataproc_instance(self) -> bool:
384    return self.has_label(DATAPROC_LABEL)
def is_gke_node(self) -> bool:
386  def is_gke_node(self) -> bool:
387    return self.has_label(GKE_LABEL)
def is_preemptible_vm(self) -> bool:
389  def is_preemptible_vm(self) -> bool:
390    return ('scheduling' in self._resource_data and
391            'preemptible' in self._resource_data['scheduling'] and
392            self._resource_data['scheduling']['preemptible'])
created_by_mig: bool
394  @property
395  def created_by_mig(self) -> bool:
396    """Return bool indicating if the instance part of a mig.
397
398    MIG which were part of MIG however have been removed or terminated will return True.
399    """
400    created_by = self.get_metadata('created-by')
401    if created_by is None:
402      return False
403
404    created_by_match = re.match(
405        r'projects/([^/]+)/((?:regions|zones)/[^/]+/instanceGroupManagers/[^/]+)$',
406        created_by,
407    )
408    if not created_by_match:
409      return False
410    return True

Return a bool indicating whether the instance is part of a MIG.

Instances that were part of a MIG but have since been removed or terminated will still return True.

def is_windows_machine(self) -> bool:
412  def is_windows_machine(self) -> bool:
413    if 'disks' in self._resource_data:
414      disks = next(iter(self._resource_data['disks']))
415      if 'guestOsFeatures' in disks:
416        if 'WINDOWS' in [t['type'] for t in iter(disks['guestOsFeatures'])]:
417          return True
418    return False
def is_public_machine(self) -> bool:
420  def is_public_machine(self) -> bool:
421    if 'networkInterfaces' in self._resource_data:
422      return 'natIP' in str(self._resource_data['networkInterfaces'])
423    return False
def machine_type(self):
425  def machine_type(self):
426    if 'machineType' in self._resource_data:
427      # return self._resource_data['machineType']
428      machine_type_uri = self._resource_data['machineType']
429      mt = re.search(r'/machineTypes/([^/]+)$', machine_type_uri)
430      if mt:
431        return mt.group(1)
432      else:
433        raise RuntimeError(
434            f"can't determine machineType of instance {self.name}")
435    return None
def check_license(self, licenses: List[str]) -> bool:
437  def check_license(self, licenses: List[str]) -> bool:
438    """Checks that a license is contained in a given license list."""
439    if 'disks' in self._resource_data:
440      for disk in self._resource_data['disks']:
441        if 'license' in str(disk):
442          for license_ in licenses:
443            for attached_license in disk['licenses']:
444              if license_ == attached_license.partition('/global/licenses/')[2]:
445                return True
446    return False

Checks that a license is contained in a given license list.

def get_boot_disk_image(self) -> str:
448  def get_boot_disk_image(self) -> str:
449    """Get VM's boot disk image."""
450    boot_disk_image: str = ''
451    for disk in self.disks:
452      if disk.get('boot', False):
453        disk_source = disk.get('source', '')
454        m = re.search(r'/disks/([^/]+)$', disk_source)
455        if not m:
456          raise RuntimeError(f"can't determine name of boot disk {disk_source}")
457        disk_name = m.group(1)
458        gce_disk: Disk = get_disk(self.project_id,
459                                  zone=self.zone,
460                                  disk_name=disk_name)
461        return gce_disk.source_image
462    return boot_disk_image

Get VM's boot disk image.

is_sole_tenant_vm: bool
464  @property
465  def is_sole_tenant_vm(self) -> bool:
466    return bool('nodeAffinities' in self._resource_data['scheduling'])
network: gcpdiag.queries.network.Network
468  @property
469  def network(self) -> network_q.Network:
470    # 'https://www.googleapis.com/compute/v1/projects/gcpdiag-gce1-aaaa/global/networks/default'
471    network_string = self._resource_data['networkInterfaces'][0]['network']
472    m = re.match(r'^.+/projects/([^/]+)/global/networks/([^/]+)$',
473                 network_string)
474    if not m:
475      raise RuntimeError("can't parse network string: %s" % network_string)
476    return network_q.get_network(m.group(1), m.group(2))
network_ips: List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]
478  @property
479  def network_ips(self) -> List[network_q.IPv4AddrOrIPv6Addr]:
480    return [
481        ipaddress.ip_address(nic['networkIP'])
482        for nic in self._resource_data['networkInterfaces']
483    ]
get_network_interfaces
485  @property
486  def get_network_interfaces(self):
487    return self._resource_data['networkInterfaces']
subnetworks: List[gcpdiag.queries.network.Subnetwork]
489  @property
490  def subnetworks(self) -> List[network_q.Subnetwork]:
491    subnetworks = []
492    for nic in self._resource_data['networkInterfaces']:
493      subnetworks.append(network_q.get_subnetwork_from_url(nic['subnetwork']))
494    return subnetworks
routes: List[gcpdiag.queries.network.Route]
496  @property
497  def routes(self) -> List[network_q.Route]:
498    routes = []
499    for nic in self._resource_data['networkInterfaces']:
500      for route in network_q.get_routes(self.project_id):
501        if nic['network'] == route.network:
502          if route.tags == []:
503            routes.append(route)
504            continue
505          else:
506            temp = [x for x in self.tags if x in route.tags]
507            if len(temp) > 0:
508              routes.append(route)
509    return routes
def get_network_ip_for_instance_interface( self, network: str) -> Union[ipaddress.IPv4Network, ipaddress.IPv6Network, NoneType]:
511  def get_network_ip_for_instance_interface(
512      self, network: str) -> Optional[network_q.IPv4NetOrIPv6Net]:
513    """Get the network ip for a nic given a network name."""
514    for nic in self._resource_data['networkInterfaces']:
515      if nic.get('network') == network:
516        return ipaddress.ip_network(nic.get('networkIP'))
517    return None

Get the network ip for a nic given a network name.

def secure_boot_enabled(self) -> bool:
519  def secure_boot_enabled(self) -> bool:
520    if 'shieldedInstanceConfig' in self._resource_data:
521      return self._resource_data['shieldedInstanceConfig']['enableSecureBoot']
522    return False
access_scopes: List[str]
524  @property
525  def access_scopes(self) -> List[str]:
526    if 'serviceAccounts' in self._resource_data:
527      saccts = self._resource_data['serviceAccounts']
528      if isinstance(saccts, list) and len(saccts) >= 1:
529        return saccts[0].get('scopes', [])
530    return []
service_account: Optional[str]
532  @property
533  def service_account(self) -> Optional[str]:
534    if 'serviceAccounts' in self._resource_data:
535      saccts = self._resource_data['serviceAccounts']
536      if isinstance(saccts, list) and len(saccts) >= 1:
537        return saccts[0]['email']
538    return None
tags: List[str]
540  @property
541  def tags(self) -> List[str]:
542    if 'tags' in self._resource_data:
543      if 'items' in self._resource_data['tags']:
544        return self._resource_data['tags']['items']
545    return []
def get_metadata(self, key: str) -> str:
547  def get_metadata(self, key: str) -> str:
548    if not self._metadata_dict:
549      self._metadata_dict = {}
550      if ('metadata' in self._resource_data and
551          'items' in self._resource_data['metadata']):
552        for item in self._resource_data['metadata']['items']:
553          if 'key' in item and 'value' in item:
554            self._metadata_dict[item['key']] = item['value']
555    project_metadata = get_project_metadata(self.project_id)
556    return self._metadata_dict.get(key, project_metadata.get(key))
status: str
558  @property
559  def status(self) -> str:
560    """VM Status."""
561    return self._resource_data.get('status', None)

VM Status.

is_running: bool
563  @property
564  def is_running(self) -> bool:
565    """VM Status is indicated as running."""
566    return self._resource_data.get('status', False) == 'RUNNING'

VM Status is indicated as running.

mig: ManagedInstanceGroup
  @property  # type: ignore
  @caching.cached_api_call(in_memory=True)
  def mig(self) -> ManagedInstanceGroup:
    """Return ManagedInstanceGroup that owns this instance.

    The owner is identified from the `created-by` metadata value, which is
    turned into a self link and compared against the self links of the groups
    returned by `get_managed_instance_groups` for this instance's project.

    Raises:
      AttributeError: if `created-by` is unset, does not name an instance
        group manager, or no group with the matching self link is found.
    """

    created_by = self.get_metadata('created-by')
    if created_by is None:
      raise AttributeError(f'instance {self.id} is not managed by a mig')

    # Example created-by:
    # pylint: disable=line-too-long
    # "projects/12340002/zones/europe-west4-a/instanceGroupManagers/gke-gke1-default-pool-e5e20a34-grp"
    # (note how it uses a project number and not a project id...)
    created_by_match = re.match(
        r'projects/([^/]+)/((?:regions|zones)/[^/]+/instanceGroupManagers/[^/]+)$',
        created_by,
    )
    if not created_by_match:
      raise AttributeError(f'instance {self.id} is not managed by a mig'
                           f' (created-by={created_by})')
    # Resolve the project referenced by created-by so that the self link can
    # be built with the project id.
    project = crm.get_project(created_by_match.group(1))

    mig_self_link = ('https://www.googleapis.com/compute/v1/'
                     f'projects/{project.id}/{created_by_match.group(2)}')

    # Try to find a matching mig.
    # NOTE(review): the lookup lists groups of self.project_id, not of the
    # project named in created-by -- confirm these are always the same.
    for mig in get_managed_instance_groups(
        models.Context(project_id=self.project_id)).values():
      if mig.self_link == mig_self_link:
        return mig

    raise AttributeError(f'instance {self.id} is not managed by a mig')

Return ManagedInstanceGroup that owns this instance.

Throws AttributeError in case it isn't MIG-managed.

labels: dict
604  @property
605  def labels(self) -> dict:
606    return self._resource_data.get('labels', {})
class Disk(gcpdiag.models.Resource):
class Disk(models.Resource):
  """A GCE disk, wrapping the disk resource dict returned by the API."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Store the project id (via the base class) and the raw disk resource."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def id(self) -> str:
    """The `id` field of the disk resource."""
    return self._resource_data['id']

  @property
  def name(self) -> str:
    """The `name` field of the disk resource."""
    return self._resource_data['name']

  @property
  def type(self) -> str:
    """Disk type name, i.e. the last path component after `/diskTypes/`.

    Raises:
      RuntimeError: if the `type` URI does not end in `/diskTypes/<name>`.
    """
    type_uri = self._resource_data['type']
    match = re.search(r'/diskTypes/([^/]+)$', type_uri)
    if match:
      return match.group(1)
    raise RuntimeError("can't determine type of the disk %s (%s)" %
                       (self.name, self._resource_data['type']))

  @property
  def users(self) -> list:
    """Whatever follows `/instances/` in each `users` URL.

    Entries that do not contain `/instances/` are skipped.
    """
    matches = (re.search(r'/instances/(.+)$', user_url)
               for user_url in self._resource_data.get('users', []))
    return [m.group(1) for m in matches if m]

  @property
  def zone(self) -> str:
    """Zone name, i.e. the last path component after `/zones/`.

    Raises:
      RuntimeError: if the `zone` URI does not end in `/zones/<zone>`.
    """
    zone_uri = self._resource_data['zone']
    match = re.search(r'/zones/([^/]+)$', zone_uri)
    if match:
      return match.group(1)
    raise RuntimeError("can't determine zone of disk %s (%s)" %
                       (self.name, self._resource_data['zone']))

  @property
  def source_image(self) -> str:
    """The `sourceImage` field, or '' when the disk has none."""
    return self._resource_data.get('sourceImage', '')

  @property
  def full_path(self) -> str:
    """The self link without the Compute API v1 URL prefix.

    A self link that does not start with the prefix is returned whole,
    prefixed with '>> '.
    """
    self_link = self._resource_data['selfLink']
    stripped = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                        self_link)
    if not stripped:
      return '>> ' + self_link
    return stripped.group(1)

  @property
  def short_path(self) -> str:
    """Short identifier in the form `<project id>/<disk name>`."""
    return f'{self.project_id}/{self.name}'

  @property
  def bootable(self) -> bool:
    """Whether the disk resource has a `guestOsFeatures` field."""
    has_os_features = 'guestOsFeatures' in self._resource_data
    return has_os_features

  @property
  def in_use(self) -> bool:
    """Whether the disk resource has a `users` field."""
    has_users = 'users' in self._resource_data
    return has_users

  @property
  def size(self) -> int:
    """The `sizeGb` field, returned exactly as stored in the resource."""
    # NOTE(review): the value is not converted here; confirm it is an int.
    return self._resource_data['sizeGb']

  @property
  def provisionediops(self) -> Optional[int]:
    """The `provisionedIops` field, or None when it is not set."""
    return self._resource_data.get('provisionedIops')

  @property
  def has_snapshot_schedule(self) -> bool:
    """Whether the disk resource has a `resourcePolicies` field."""
    has_policies = 'resourcePolicies' in self._resource_data
    return has_policies

Represents a GCE disk.

Disk(project_id, resource_data)
614  def __init__(self, project_id, resource_data):
615    super().__init__(project_id=project_id)
616    self._resource_data = resource_data
id: str
618  @property
619  def id(self) -> str:
620    return self._resource_data['id']
name: str
622  @property
623  def name(self) -> str:
624    return self._resource_data['name']
type: str
626  @property
627  def type(self) -> str:
628    disk_type = re.search(r'/diskTypes/([^/]+)$', self._resource_data['type'])
629    if not disk_type:
630      raise RuntimeError("can't determine type of the disk %s (%s)" %
631                         (self.name, self._resource_data['type']))
632    return disk_type.group(1)
users: list
634  @property
635  def users(self) -> list:
636    pattern = r'/instances/(.+)$'
637    # Extracting the instances
638    instances = []
639    for i in self._resource_data.get('users', []):
640      m = re.search(pattern, i)
641      if m:
642        instances.append(m.group(1))
643    return instances
zone: str
645  @property
646  def zone(self) -> str:
647    m = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
648    if not m:
649      raise RuntimeError("can't determine zone of disk %s (%s)" %
650                         (self.name, self._resource_data['zone']))
651    return m.group(1)
source_image: str
653  @property
654  def source_image(self) -> str:
655    return self._resource_data.get('sourceImage', '')
full_path: str
657  @property
658  def full_path(self) -> str:
659    result = re.match(
660        r'https://www.googleapis.com/compute/v1/(.*)',
661        self._resource_data['selfLink'],
662    )
663    if result:
664      return result.group(1)
665    else:
666      return '>> ' + self._resource_data['selfLink']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
668  @property
669  def short_path(self) -> str:
670    return f'{self.project_id}/{self.name}'

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

bootable: bool
672  @property
673  def bootable(self) -> bool:
674    return 'guestOsFeatures' in self._resource_data
in_use: bool
676  @property
677  def in_use(self) -> bool:
678    return 'users' in self._resource_data
size: int
680  @property
681  def size(self) -> int:
682    return self._resource_data['sizeGb']
provisionediops: Optional[int]
684  @property
685  def provisionediops(self) -> Optional[int]:
686    return self._resource_data.get('provisionedIops')
has_snapshot_schedule: bool
688  @property
689  def has_snapshot_schedule(self) -> bool:
690    return 'resourcePolicies' in self._resource_data
@caching.cached_api_call(in_memory=True)
def get_gce_zones(project_id: str) -> Set[str]:
@caching.cached_api_call(in_memory=True)
def get_gce_zones(project_id: str) -> Set[str]:
  """Return the names of the GCE zones listed for a project.

  Only the response of a single `zones.list` call is read.

  Raises:
    utils.GcpApiError: if the API call fails with an HTTP error.
  """
  try:
    compute = apis.get_api('compute', 'v1', project_id)
    logging.debug('listing gce zones of project %s', project_id)
    response = compute.zones().list(project=project_id).execute(
        num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
  if not response or 'items' not in response:
    return set()
  zone_names = set()
  for zone in response['items']:
    if 'name' in zone:
      zone_names.add(zone['name'])
  return zone_names
def get_gce_public_licences(project_id: str) -> List[str]:
def get_gce_public_licences(project_id: str) -> List[str]:
  """Return the licenses defined in a (publicly available image) project.

  Args:
    project_id: project whose licenses are listed.

  Returns:
    For every license of the project, the part of its self link that follows
    `/global/licenses/`. All result pages are read.
  """
  licenses = []
  gce_api = apis.get_api('compute', 'v1', project_id)
  logging.debug('listing licenses of project %s', project_id)
  request = gce_api.licenses().list(project=project_id)
  while request is not None:
    # Retry like every other query in this module.
    response = request.execute(num_retries=config.API_RETRIES)
    # A page without licenses has no 'items' key.
    for license_ in response.get('items', []):
      formatted_license = license_['selfLink'].partition('/global/licenses/')[2]
      licenses.append(formatted_license)
    request = gce_api.licenses().list_next(previous_request=request,
                                           previous_response=response)
  return licenses

Returns a list of licenses based on publicly available image project

def get_instance( project_id: str, zone: str, instance_name: str) -> Instance:
def get_instance(project_id: str, zone: str, instance_name: str) -> Instance:
  """Fetch a single instance by project, zone and name.

  Returns:
    An Instance wrapping the `instances.get` response.
  """
  gce_api = apis.get_api('compute', 'v1', project_id)
  resource = gce_api.instances().get(
      project=project_id, zone=zone,
      instance=instance_name).execute(num_retries=config.API_RETRIES)
  return Instance(project_id, resource_data=resource)

Returns instance object matching instance name and zone

@caching.cached_api_call(in_memory=True)
def get_global_operations( project: str, filter_str: Optional[str] = None, order_by: Optional[str] = None, max_results: Optional[int] = None, service_project_number: Optional[int] = None) -> List[Dict[str, Any]]:
@caching.cached_api_call(in_memory=True)
def get_global_operations(
    project: str,
    filter_str: Optional[str] = None,
    order_by: Optional[str] = None,
    max_results: Optional[int] = None,
    service_project_number: Optional[int] = None,
) -> List[Dict[str, Any]]:
  """Return the operations of a project from `globalOperations.aggregatedList`.

  Args:
    project: project id to query.
    filter_str: passed as the `filter` request parameter.
    order_by: passed as the `orderBy` request parameter.
    max_results: passed as the `maxResults` request parameter.
    service_project_number: passed as the `serviceProjectNumber` request
      parameter.

  Returns:
    The operation dicts of every scope, across all result pages, as one flat
    list.
  """
  compute = apis.get_api('compute', 'v1', project)
  # The two adjacent literals previously joined without a space, logging
  # "operationslogs".
  logging.debug(('searching compute global operations '
                 'logs in project %s with filter %s'), project, filter_str)
  operations: List[Dict[str, Any]] = []
  request = compute.globalOperations().aggregatedList(
      project=project,
      filter=filter_str,
      orderBy=order_by,
      maxResults=max_results,
      serviceProjectNumber=service_project_number,
      returnPartialSuccess=True,
  )
  while request:
    response = request.execute(num_retries=config.API_RETRIES)
    # 'items' maps each scope to a dict that may hold an 'operations' list.
    for data in response.get('items', {}).values():
      if 'operations' not in data:
        continue
      operations.extend(data['operations'])
    request = compute.globalOperations().aggregatedList_next(
        previous_request=request, previous_response=response)
  return operations

Returns global operations object matching project id.

@caching.cached_api_call(in_memory=True)
def get_disk(project_id: str, zone: str, disk_name: str) -> Disk:
@caching.cached_api_call(in_memory=True)
def get_disk(project_id: str, zone: str, disk_name: str) -> Disk:
  """Fetch a single disk by project, zone and name.

  Returns:
    A Disk wrapping the `disks.get` response.
  """
  gce_api = apis.get_api('compute', 'v1', project_id)
  resource = gce_api.disks().get(
      project=project_id, zone=zone,
      disk=disk_name).execute(num_retries=config.API_RETRIES)
  return Disk(project_id, resource_data=resource)

Returns disk object matching disk name and zone.

@caching.cached_api_call(in_memory=True)
def get_instances( context: gcpdiag.models.Context) -> Mapping[str, Instance]:
@caching.cached_api_call(in_memory=True)
def get_instances(context: models.Context) -> Mapping[str, Instance]:
  """Get the instances matching the given context, indexed by instance id.

  Returns an empty mapping when the compute API is not enabled for the
  project. Instances whose self link has no zone component are logged as
  errors and skipped.
  """
  found: Dict[str, Instance] = {}
  project_id = context.project_id
  if not apis.is_enabled(project_id, 'compute'):
    return found
  gce_api = apis.get_api('compute', 'v1', project_id)
  request = gce_api.instances().aggregatedList(project=project_id,
                                               returnPartialSuccess=True)
  logging.debug('listing gce instances of project %s', project_id)
  while request:  # One iteration per result page.
    response = request.execute(num_retries=config.API_RETRIES)
    for scoped_data in response.get('items', {}).values():
      if 'instances' not in scoped_data:
        continue
      for instance in scoped_data['instances']:
        zone_match = re.match(
            r'https://www.googleapis.com/compute/v1/projects/[^/]+/zones/([^/]+)/',
            instance['selfLink'],
        )
        if not zone_match:
          logging.error(
              "instance %s selfLink didn't match regexp: %s",
              instance['id'],
              instance['selfLink'],
          )
          continue
        # Keep only instances selected by the context's location, label and
        # resource-name filters.
        if not context.match_project_resource(
            location=zone_match.group(1),
            labels=instance.get('labels', {}),
            resource=instance.get('name', '')):
          continue
        found[instance['id']] = Instance(project_id=context.project_id,
                                         resource_data=instance)
    request = gce_api.instances().aggregatedList_next(
        previous_request=request, previous_response=response)
  return found

Get a list of Instance matching the given context, indexed by instance id.

@caching.cached_api_call(in_memory=True)
def get_instance_groups( context: gcpdiag.models.Context) -> Mapping[str, InstanceGroup]:
@caching.cached_api_call(in_memory=True)
def get_instance_groups(context: models.Context) -> Mapping[str, InstanceGroup]:
  """Get the InstanceGroups matching the given context, indexed by full path.

  Returns an empty mapping when the compute API is not enabled for the
  project. Groups whose self link has no zone component are logged as errors
  and skipped.
  """
  groups: Dict[str, InstanceGroup] = {}
  if not apis.is_enabled(context.project_id, 'compute'):
    return groups
  gce_api = apis.get_api('compute', 'v1', context.project_id)
  request = gce_api.instanceGroups().aggregatedList(project=context.project_id,
                                                    returnPartialSuccess=True)
  logging.debug('listing gce instance groups of project %s', context.project_id)
  while request:  # Continue as long as there are pages
    response = request.execute(num_retries=config.API_RETRIES)
    for data_ in response.get('items', {}).values():
      if 'instanceGroups' not in data_:
        continue
      for group in data_['instanceGroups']:
        # NOTE(review): only zonal self links match; a regional group would
        # be reported below and skipped -- confirm this is intended.
        result = re.match(
            r'https://www.googleapis.com/compute/v1/projects/[^/]+/zones/([^/]+)',
            group['selfLink'],
        )
        if not result:
          # The resource is an instance group, so say so in the log.
          logging.error(
              "instance group %s selfLink didn't match regexp: %s",
              group['id'],
              group['selfLink'],
          )
          continue
        zone = result.group(1)
        labels = group.get('labels', {})
        resource = group.get('name', '')
        if not context.match_project_resource(
            location=zone, labels=labels, resource=resource):
          continue
        instance_group = InstanceGroup(context.project_id, resource_data=group)
        groups[instance_group.full_path] = instance_group
    request = gce_api.instanceGroups().aggregatedList_next(
        previous_request=request, previous_response=response)
  return groups

Get the InstanceGroups matching the given context, indexed by full path.

@caching.cached_api_call(in_memory=True)
def get_managed_instance_groups( context: gcpdiag.models.Context) -> Mapping[int, ManagedInstanceGroup]:
@caching.cached_api_call(in_memory=True)
def get_managed_instance_groups(
    context: models.Context,) -> Mapping[int, ManagedInstanceGroup]:
  """Get the ManagedInstanceGroups matching the given context, indexed by mig id.

  Groups come from `instanceGroupManagers.aggregatedList`; the location used
  for context matching is the region or zone found in each self link. Returns
  an empty mapping when the compute API is not enabled for the project.
  """
  found: Dict[int, ManagedInstanceGroup] = {}
  project_id = context.project_id
  if not apis.is_enabled(project_id, 'compute'):
    return found
  gce_api = apis.get_api('compute', 'v1', project_id)
  request = gce_api.instanceGroupManagers().aggregatedList(
      project=project_id, returnPartialSuccess=True)
  logging.debug('listing zonal managed instance groups of project %s',
                project_id)
  while request:  # One iteration per result page.
    response = request.execute(num_retries=config.API_RETRIES)
    for scoped_data in response.get('items', {}).values():
      if 'instanceGroupManagers' not in scoped_data:
        continue
      for mig in scoped_data['instanceGroupManagers']:
        location_match = re.match(
            r'https://www.googleapis.com/compute/v1/projects/[^/]+/(?:regions|zones)/([^/]+)/',
            mig['selfLink'],
        )
        if not location_match:
          logging.error(
              "mig %s selfLink didn't match regexp: %s",
              mig['name'],
              mig['selfLink'],
          )
          continue
        if not context.match_project_resource(
            location=location_match.group(1),
            labels=mig.get('labels', {}),
            resource=mig.get('name', '')):
          continue
        found[mig['id']] = ManagedInstanceGroup(project_id=context.project_id,
                                                resource_data=mig)
    request = gce_api.instanceGroupManagers().aggregatedList_next(
        previous_request=request, previous_response=response)
  return found

Get a list of zonal ManagedInstanceGroups matching the given context, indexed by mig id.

@caching.cached_api_call(in_memory=True)
def get_region_managed_instance_groups( context: gcpdiag.models.Context) -> Mapping[int, ManagedInstanceGroup]:
@caching.cached_api_call(in_memory=True)
def get_region_managed_instance_groups(
    context: models.Context,) -> Mapping[int, ManagedInstanceGroup]:
  """Get the regional ManagedInstanceGroups matching the context, by mig id.

  One `regionInstanceGroupManagers.list` request is issued per region
  returned by `get_all_regions`. Returns an empty mapping when the compute
  API is not enabled for the project.
  """
  found: Dict[int, ManagedInstanceGroup] = {}
  project_id = context.project_id
  if not apis.is_enabled(project_id, 'compute'):
    return found
  gce_api = apis.get_api('compute', 'v1', project_id)
  requests = []
  for region in get_all_regions(project_id):
    requests.append(gce_api.regionInstanceGroupManagers().list(
        project=project_id, region=region.name))
  logging.debug(
      'listing regional managed instance groups of project %s',
      project_id,
  )
  all_migs = apis_utils.multi_list_all(
      requests=requests,
      next_function=gce_api.regionInstanceGroupManagers().list_next,
  )
  for mig in all_migs:
    region_match = re.match(
        r'https://www.googleapis.com/compute/v1/projects/[^/]+/(?:regions)/([^/]+)/',
        mig['selfLink'],
    )
    if not region_match:
      logging.error("mig %s selfLink didn't match regexp: %s", mig['name'],
                    mig['selfLink'])
      continue
    if not context.match_project_resource(location=region_match.group(1),
                                          labels=mig.get('labels', {}),
                                          resource=mig.get('name', '')):
      continue
    found[mig['id']] = ManagedInstanceGroup(project_id=context.project_id,
                                            resource_data=mig)
  return found

Get a list of regional ManagedInstanceGroups matching the given context, indexed by mig id.

@caching.cached_api_call
def get_instance_templates(project_id: str) -> Mapping[str, InstanceTemplate]:
@caching.cached_api_call
def get_instance_templates(project_id: str) -> Mapping[str, InstanceTemplate]:
  """Get the instance templates of a project, indexed by full path.

  Only a subset of each template's fields is requested from the API.
  """
  logging.info('fetching instance templates')
  templates = {}
  gce_api = apis.get_api('compute', 'v1', project_id)
  request = gce_api.instanceTemplates().list(
      project=project_id,
      returnPartialSuccess=True,
      # Fetch only a subset of the fields to improve performance.
      # selfLink is required because templates are keyed by full_path, which
      # is derived from it. nextPageToken is required so that list_next can
      # request the following pages.
      fields=('items/name, items/selfLink, items/properties/tags,'
              ' items/properties/networkInterfaces,'
              ' items/properties/serviceAccounts, items/properties/metadata,'
              ' nextPageToken'),
  )
  for t in apis_utils.list_all(
      request, next_function=gce_api.instanceTemplates().list_next):
    instance_template = InstanceTemplate(project_id, t)
    templates[instance_template.full_path] = instance_template
  return templates
@caching.cached_api_call
def get_project_metadata(project_id) -> Mapping[str, str]:
@caching.cached_api_call
def get_project_metadata(project_id) -> Mapping[str, str]:
  """Return the project's `commonInstanceMetadata` items as a key/value dict.

  Raises:
    utils.GcpApiError: if the `projects.get` call fails with an HTTP error.
  """
  gce_api = apis.get_api('compute', 'v1', project_id)
  logging.debug('fetching metadata of project %s\n', project_id)
  query = gce_api.projects().get(project=project_id)
  try:
    response = query.execute(num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err

  common_metadata = response.get('commonInstanceMetadata')
  if not common_metadata or 'items' not in common_metadata:
    return {}
  return {
      entry.get('key'): entry.get('value')
      for entry in common_metadata['items']
  }
@caching.cached_api_call
def get_instances_serial_port_output(context: gcpdiag.models.Context):
 985@caching.cached_api_call
 986def get_instances_serial_port_output(context: models.Context):
 987  """Get a list of serial port output for instances
 988
 989  which matches the given context, running and is not
 990  exported to cloud logging.
 991  """
 992  # Create temp storage (diskcache.Deque) for output
 993  deque = caching.get_tmp_deque('tmp-gce-serial-output-')
 994  if not apis.is_enabled(context.project_id, 'compute'):
 995    return deque
 996  gce_api = apis.get_api('compute', 'v1', context.project_id)
 997
 998  # Serial port output are rolled over on day 7 and limited to 1MB.
 999  # Fetching serial outputs are very expensive so optimize to fetch.
1000  # Only relevant instances as storage size can grow drastically for
1001  # massive projects. Think 1MB * N where N is some large number.
1002  requests = [
1003      gce_api.instances().getSerialPortOutput(
1004          project=i.project_id,
1005          zone=i.zone,
1006          instance=i.id,
1007          # To get all 1mb output
1008          start=-1000000,
1009      )
1010      for i in get_instances(context).values()
1011      # fetch running instances that do not export to cloud logging
1012      if not i.is_serial_port_logging_enabled() and i.is_running
1013  ]
1014  requests_start_time = datetime.now()
1015  # Note: We are limited to 1000 calls in a single batch request.
1016  # We have to use multiple batch requests in batches of 1000
1017  # https://github.com/googleapis/google-api-python-client/blob/main/docs/batch.md
1018  batch_size = 1000
1019  for i in range(0, len(requests), batch_size):
1020    batch_requests = requests[i:i + batch_size]
1021    for _, response, exception in apis_utils.batch_execute_all(
1022        api=gce_api, requests=batch_requests):
1023      if exception:
1024        if isinstance(exception, googleapiclient.errors.HttpError):
1025          raise utils.GcpApiError(exception) from exception
1026        else:
1027          raise exception
1028
1029      if response:
1030        result = re.match(
1031            r'https://www.googleapis.com/compute/v1/projects/([^/]+)/zones/[^/]+/instances/([^/]+)',
1032            response['selfLink'],
1033        )
1034        if not result:
1035          logging.error("instance selfLink didn't match regexp: %s",
1036                        response['selfLink'])
1037          return
1038
1039        project_id = result.group(1)
1040        instance_id = result.group(2)
1041        deque.appendleft(
1042            SerialPortOutput(
1043                project_id=project_id,
1044                instance_id=instance_id,
1045                contents=response['contents'].splitlines(),
1046            ))
1047  requests_end_time = datetime.now()
1048  logging.debug(
1049      'total serial logs processing time: %s, number of instances: %s',
1050      requests_end_time - requests_start_time,
1051      len(requests),
1052  )
1053  return deque

Get the serial port output of the instances

that match the given context, are running, and do not export their serial port output to cloud logging.

@caching.cached_api_call
def get_instance_serial_port_output( project_id, zone, instance_name) -> Optional[SerialPortOutput]:
@caching.cached_api_call
def get_instance_serial_port_output(
    project_id, zone, instance_name) -> Optional[SerialPortOutput]:
  """Get the serial port output of a single instance.

  Args:
    project_id: ID of the project that owns the instance.
    zone: zone of the instance.
    instance_name: value passed as the 'instance' parameter of the
      getSerialPortOutput request.

  Returns:
    A SerialPortOutput whose contents are the output split into lines, or
    None when the compute API is not enabled, the request fails with an
    HttpError, the response is empty, or its selfLink cannot be parsed.
  """
  if not apis.is_enabled(project_id, 'compute'):
    return None
  gce_api = apis.get_api('compute', 'v1', project_id)

  request = gce_api.instances().getSerialPortOutput(
      project=project_id,
      zone=zone,
      instance=instance_name,
      # To get all 1mb output
      start=-1000000,
  )
  try:
    response = request.execute(num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError:
    # API errors are reported to the caller as "no output".
    return None

  if not response:
    return None

  match = re.match(
      r'https://www.googleapis.com/compute/v1/projects/([^/]+)/zones/[^/]+/instances/([^/]+)',
      response['selfLink'],
  )
  if not match:
    logging.error("instance selfLink didn't match regexp: %s",
                  response['selfLink'])
    return None

  # Project and instance identifiers are taken from the response selfLink.
  return SerialPortOutput(
      match.group(1),
      instance_id=match.group(2),
      contents=response['contents'].splitlines(),
  )

Get the serial port output of a single instance.

Returns None when the compute API is not enabled, the request fails, the response is empty, or the instance selfLink in the response cannot be parsed.

class Region(gcpdiag.models.Resource):
class Region(models.Resource):
  """Represents a GCE Region."""

  # Raw region resource as returned by the API.
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def self_link(self) -> str:
    """The 'selfLink' value of the region resource."""
    return self._resource_data['selfLink']

  @property
  def full_path(self) -> str:
    """The selfLink without the compute v1 URL prefix.

    If the selfLink does not start with that prefix, the whole selfLink is
    returned, prefixed with '>> '.
    """
    match = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                     self.self_link)
    return match.group(1) if match else f'>> {self.self_link}'

  @property
  def name(self) -> str:
    """The 'name' value of the region resource."""
    return self._resource_data['name']

Represents a GCE Region.

Region(project_id, resource_data)
1106  def __init__(self, project_id, resource_data):
1107    super().__init__(project_id=project_id)
1108    self._resource_data = resource_data
full_path: str
1114  @property
1115  def full_path(self) -> str:
1116    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
1117                      self.self_link)
1118    if result:
1119      return result.group(1)
1120    else:
1121      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

name: str
1123  @property
1124  def name(self) -> str:
1125    return self._resource_data['name']
@caching.cached_api_call
def get_all_regions(project_id: str) -> Iterable[Region]:
@caching.cached_api_call
def get_all_regions(project_id: str) -> Iterable[Region]:
  """Return all regions listed for the project.

  Args:
      project_id (str): project id for this request

  Raises:
      utils.GcpApiError: Raises GcpApiError in case of query issues

  Returns:
      Iterable[Region]: set of Region objects, one per listed item that has a
      'name'; an empty set when the response has no items
  """
  try:
    gce_api = apis.get_api('compute', 'v1', project_id)
    request = gce_api.regions().list(project=project_id)
    response = request.execute(num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err

  if not response or 'items' not in response:
    return set()

  regions = set()
  for item in response['items']:
    # Items without a name are left out.
    if 'name' in item:
      regions.add(Region(project_id, item))
  return regions

Return list of all regions

Arguments:
  • project_id (str): project id for this request
Raises:
  • utils.GcpApiError: Raises GcpApiError in case of query issues
Returns:

Iterable[Region]: Return list of all regions

def get_regions_with_instances(context: gcpdiag.models.Context) -> Iterable[Region]:
def get_regions_with_instances(context: models.Context) -> Iterable[Region]:
  """Return the regions that contain at least one matching instance.

  Args:
      context (models.Context): context for this request

  Returns:
      Iterable[Region]: set of the project's regions whose name equals the
      region of one of the instances returned by get_instances(context)
  """
  instance_regions = set()
  for instance in get_instances(context).values():
    instance_regions.add(instance.region)

  all_regions = get_all_regions(context.project_id)
  if not all_regions:
    return set()

  return {
      region for region in all_regions if region.name in instance_regions
  }

Return list of regions with instances

Arguments:
  • context (models.Context): context for this request
Returns:

Iterable[Region]: Return list of regions which contains instances

@caching.cached_api_call
def get_all_disks(project_id: str) -> Iterable[Disk]:
@caching.cached_api_call
def get_all_disks(project_id: str) -> Iterable[Disk]:
  """Return the zonal disks of a project (regional disks are not fetched).

  Args:
    project_id: ID of the project whose disks are listed.

  Returns:
    A set with one Disk per item listed across the zones returned by
    get_gce_zones(project_id).

  Raises:
    utils.GcpApiError: if an HttpError is raised while listing.
  """
  try:
    gce_api = apis.get_api('compute', 'v1', project_id)
    # One list request per zone.
    requests = []
    for zone in get_gce_zones(project_id):
      requests.append(gce_api.disks().list(project=project_id, zone=zone))
    logging.debug('listing gce disks of project %s', project_id)
    items = apis_utils.multi_list_all(
        requests=requests,
        next_function=gce_api.disks().list_next,
    )
    disks = set()
    for item in items:
      disks.add(Disk(project_id, item))
    return disks
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
@caching.cached_api_call
def get_all_disks_of_instance(project_id: str, zone: str, instance_name: str) -> dict:
@caching.cached_api_call
def get_all_disks_of_instance(project_id: str, zone: str,
                              instance_name: str) -> dict:
  """Return the zonal disks attached to one instance, indexed by disk name.

  Regional disks are not fetched.

  Args:
    project_id: ID of the project that owns the instance.
    zone: zone whose disks are listed.
    instance_name: name of the instance the disks must be attached to.

  Returns:
    A dict mapping disk name to Disk for every disk of the zone whose
    'users' equals [instance_name].

  Raises:
    utils.GcpApiError: if an HttpError is raised while listing.
  """
  try:
    gce_api = apis.get_api('compute', 'v1', project_id)
    zone_request = gce_api.disks().list(project=project_id, zone=zone)
    logging.debug(
        'listing gce disks attached to instance %s in project %s',
        instance_name,
        project_id,
    )
    items = apis_utils.multi_list_all(
        requests=[zone_request],
        next_function=gce_api.disks().list_next,
    )
    zonal_disks = {Disk(project_id, item) for item in items}
    # NOTE(review): the equality test only keeps disks whose sole user is
    # this instance; a disk shared with other instances is left out.
    # Confirm this is intended.
    return {
        disk.name: disk
        for disk in zonal_disks
        if disk.users == [instance_name]
    }
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
class InstanceEffectiveFirewalls(gcpdiag.queries.network.EffectiveFirewalls):
class InstanceEffectiveFirewalls(network_q.EffectiveFirewalls):
  """Effective firewall rules for a network interface on a VM instance.

  (Includes org/folder firewall policies.)
  """

  # Instance and network interface the rules were fetched for.
  _instance: Instance
  _nic: str

  def __init__(self, instance, nic, resource_data):
    super().__init__(resource_data)
    self._nic = nic
    self._instance = instance

Effective firewall rules for a network interface on a VM instance.

(Includes org/folder firewall policies.)

InstanceEffectiveFirewalls(instance, nic, resource_data)
1231  def __init__(self, instance, nic, resource_data):
1232    super().__init__(resource_data)
1233    self._instance = instance
1234    self._nic = nic
@caching.cached_api_call(in_memory=True)
def get_instance_interface_effective_firewalls( instance: Instance, nic: str) -> InstanceEffectiveFirewalls:
@caching.cached_api_call(in_memory=True)
def get_instance_interface_effective_firewalls(
    instance: Instance, nic: str) -> InstanceEffectiveFirewalls:
  """Return effective firewalls for a network interface on the instance.

  Args:
    instance: instance whose interface is queried.
    nic: value passed as the 'networkInterface' parameter of the
      getEffectiveFirewalls request.

  Returns:
    An InstanceEffectiveFirewalls built from the API response and bound to
    the given instance and interface.
  """
  compute = apis.get_api('compute', 'v1', instance.project_id)
  request = compute.instances().getEffectiveFirewalls(
      project=instance.project_id,
      zone=instance.zone,
      instance=instance.name,
      networkInterface=nic,
  )
  response = request.execute(num_retries=config.API_RETRIES)
  # Pass the instance object itself, not the Instance class.
  return InstanceEffectiveFirewalls(instance, nic, response)

Return effective firewalls for a network interface on the instance.

def is_project_serial_port_logging_enabled(project_id: str) -> bool:
def is_project_serial_port_logging_enabled(project_id: str) -> bool:
  """Tell whether serial port logging is enabled in the project metadata.

  Args:
    project_id: ID of the project to check.

  Returns:
    True if the 'serial-port-logging-enable' project metadata value,
    upper-cased, is one of POSITIVE_BOOL_VALUES. False otherwise, including
    when the compute API is not enabled.
  """
  if not apis.is_enabled(project_id, 'compute'):
    return False

  project_metadata = get_project_metadata(project_id=project_id)
  value = project_metadata.get('serial-port-logging-enable')
  if not value:
    return False
  return value.upper() in POSITIVE_BOOL_VALUES
def is_serial_port_buffer_enabled():
def is_serial_port_buffer_enabled():
  """Return the value of the 'enable_gce_serial_buffer' config setting."""
  buffer_setting = config.get('enable_gce_serial_buffer')
  return buffer_setting
class SerialOutputQuery:
class SerialOutputQuery:
  """Handle on a serial output job created by fetch_serial_port_outputs()."""

  # Job whose future provides the results.
  job: _SerialOutputJob

  def __init__(self, job):
    self.job = job

  @property
  def entries(self) -> Sequence:
    """Result of the job's future; blocks until the result is available.

    Raises:
      RuntimeError: if the job has no future, i.e. it was never submitted.
    """
    future = self.job.future
    if not future:
      raise RuntimeError("Fetching serial logs wasn't executed. did you call"
                         ' execute_get_serial_port_output()?')
    if future.running():
      logging.debug(
          'waiting for serial output results for project: %s',
          self.job.context.project_id,
      )
    return future.result()

A serial output job that was created with fetch_serial_port_outputs().

SerialOutputQuery(job)
1284  def __init__(self, job):
1285    self.job = job
job: gcpdiag.queries.gce._SerialOutputJob
entries: Sequence
1287  @property
1288  def entries(self) -> Sequence:
1289    if not self.job.future:
1290      raise RuntimeError("Fetching serial logs wasn't executed. did you call"
1291                         ' execute_get_serial_port_output()?')
1292    elif self.job.future.running():
1293      logging.debug(
1294          'waiting for serial output results for project: %s',
1295          self.job.context.project_id,
1296      )
1297    return self.job.future.result()
jobs_todo: Dict[gcpdiag.models.Context, gcpdiag.queries.gce._SerialOutputJob] = {}
def execute_fetch_serial_port_outputs(executor: concurrent.futures._base.Executor):
def execute_fetch_serial_port_outputs(executor: concurrent.futures.Executor):
  """Submit every queued serial output job to the executor.

  The serial logs are fetched through the executor because processing them
  can be large, depending on the number of instances in the project which
  aren't logging to cloud logging. Currently only one job is expected, but
  the queue is keyed by context so that multiple projects can be supported.

  Args:
    executor: executor that runs get_instances_serial_port_output per job.
  """
  global jobs_todo
  # Take over the queued jobs and leave an empty queue behind.
  jobs_executing, jobs_todo = jobs_todo, {}
  for job in jobs_executing.values():
    job.future = executor.submit(get_instances_serial_port_output, job.context)
def fetch_serial_port_outputs(context: gcpdiag.models.Context) -> SerialOutputQuery:
def fetch_serial_port_outputs(context: models.Context) -> SerialOutputQuery:
  """Queue a serial output job for the context and return a query for it.

  Jobs are aggregated by context: if a job is already queued for this
  context, the returned query refers to that existing job.
  """
  candidate_job = _SerialOutputJob(context=context)
  job = jobs_todo.setdefault(context, candidate_job)
  return SerialOutputQuery(job=job)
class HealthCheck(gcpdiag.models.Resource):
class HealthCheck(models.Resource):
  """A Health Check resource."""

  # Raw health check resource as returned by the API.
  _resource_data: dict
  _type: str

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """The 'name' value of the health check resource."""
    return self._resource_data['name']

  @property
  def full_path(self) -> str:
    """The selfLink without the compute v1 URL prefix.

    If the selfLink does not start with that prefix, the whole selfLink is
    returned, prefixed with '>> '.
    """
    match = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                     self.self_link)
    return match.group(1) if match else f'>> {self.self_link}'

  @property
  def short_path(self) -> str:
    """Project id and health check name joined by '/'."""
    return self.project_id + '/' + self.name

  @property
  def self_link(self) -> str:
    """The 'selfLink' value of the health check resource."""
    return self._resource_data['selfLink']

  @property
  def is_log_enabled(self) -> bool:
    """True if the resource has a 'logConfig' with a truthy 'enable'."""
    log_config = self._resource_data.get('logConfig', False)
    if not log_config:
      return False
    try:
      return bool(log_config['enable'])
    except KeyError:
      return False

  @property
  def region(self):
    """Last path segment of the 'region' value, or None if there is none."""
    region_url = self._resource_data.get('region', '')
    match = re.search(r'/([^/]+)/?$', region_url)
    return match.group(1) if match else None

  @property
  def type(self) -> str:
    """The 'type' value of the health check resource."""
    return self._resource_data['type']

  @property
  def request_path(self) -> str:
    """'requestPath' of the type-specific settings; '/' when not set."""
    return self.get_health_check_property('requestPath', '/')

  @property
  def request(self) -> str:
    """'request' of the type-specific settings; None when not set."""
    return self.get_health_check_property('request')

  @property
  def response(self) -> str:
    """'response' of the type-specific settings; None when not set."""
    return self.get_health_check_property('response')

  @property
  def port(self) -> int:
    """'port' of the type-specific settings; None when not set."""
    return self.get_health_check_property('port')

  @property
  def port_specification(self) -> str:
    """'portSpecification' of the type-specific settings; None if not set."""
    return self.get_health_check_property('portSpecification')

  @property
  def timeout_sec(self) -> int:
    """The 'timeoutSec' value of the resource; 5 when absent."""
    return self._resource_data.get('timeoutSec', 5)

  def get_health_check_property(self, property_name: str, default_value=None):
    """Look up a property in the settings block matching the check type.

    Args:
      property_name: key to read from the type-specific settings block.
      default_value: returned when the type is unknown, the settings block is
        missing or empty, or the property value is missing or falsy.

    Returns:
      The property value, or default_value.
    """
    # Maps the health check type to the key of its settings block.
    health_check_types = {
        'HTTP': 'httpHealthCheck',
        'HTTPS': 'httpsHealthCheck',
        'HTTP2': 'http2HealthCheck',
        'TCP': 'tcpHealthCheck',
        'SSL': 'sslHealthCheck',
        'GRPC': 'grpcHealthCheck',
    }
    settings_key = health_check_types.get(self.type)
    if settings_key is None:
      return default_value
    health_check_data = self._resource_data.get(settings_key)
    if not health_check_data:
      return default_value
    return health_check_data.get(property_name) or default_value

A Health Check resource.

HealthCheck(project_id, resource_data)
1328  def __init__(self, project_id, resource_data):
1329    super().__init__(project_id=project_id)
1330    self._resource_data = resource_data
name: str
1332  @property
1333  def name(self) -> str:
1334    return self._resource_data['name']
full_path: str
1336  @property
1337  def full_path(self) -> str:
1338    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
1339                      self.self_link)
1340    if result:
1341      return result.group(1)
1342    else:
1343      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
1345  @property
1346  def short_path(self) -> str:
1347    path = self.project_id + '/' + self.name
1348    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

is_log_enabled: bool
1354  @property
1355  def is_log_enabled(self) -> bool:
1356    try:
1357      log_config = self._resource_data.get('logConfig', False)
1358      if log_config and log_config['enable']:
1359        return True
1360    except KeyError:
1361      return False
1362    return False
region
1364  @property
1365  def region(self):
1366    url = self._resource_data.get('region', '')
1367    match = re.search(r'/([^/]+)/?$', url)
1368    if match:
1369      region = match.group(1)
1370      return region
1371    return None
type: str
1373  @property
1374  def type(self) -> str:
1375    return self._resource_data['type']
request_path: str
1377  @property
1378  def request_path(self) -> str:
1379    return self.get_health_check_property('requestPath', '/')
request: str
1381  @property
1382  def request(self) -> str:
1383    return self.get_health_check_property('request')
response: str
1385  @property
1386  def response(self) -> str:
1387    return self.get_health_check_property('response')
port: int
1389  @property
1390  def port(self) -> int:
1391    return self.get_health_check_property('port')
port_specification: str
1393  @property
1394  def port_specification(self) -> str:
1395    return self.get_health_check_property('portSpecification')
timeout_sec: int
1397  @property
1398  def timeout_sec(self) -> int:
1399    return self._resource_data.get('timeoutSec', 5)
def get_health_check_property(self, property_name: str, default_value=None):
1401  def get_health_check_property(self, property_name: str, default_value=None):
1402    health_check_types = {
1403        'HTTP': 'httpHealthCheck',
1404        'HTTPS': 'httpsHealthCheck',
1405        'HTTP2': 'http2HealthCheck',
1406        'TCP': 'tcpHealthCheck',
1407        'SSL': 'sslHealthCheck',
1408        'GRPC': 'grpcHealthCheck',
1409    }
1410    if self.type in health_check_types:
1411      health_check_data = self._resource_data.get(health_check_types[self.type])
1412      if health_check_data:
1413        return health_check_data.get(property_name) or default_value
1414    return default_value
@caching.cached_api_call(in_memory=True)
def get_health_check(project_id: str, health_check: str, region: str = None) -> object:
@caching.cached_api_call(in_memory=True)
def get_health_check(project_id: str,
                     health_check: str,
                     region: str = None) -> object:
  """Fetch a single health check and wrap it in a HealthCheck object.

  Args:
    project_id: ID of the project that owns the health check.
    health_check: value passed as the 'healthCheck' request parameter.
    region: when given, the regionHealthChecks API is queried for that
      region; otherwise the healthChecks API is used.

  Returns:
    A HealthCheck built from the API response.
  """
  logging.debug('fetching health check: %s', health_check)
  compute = apis.get_api('compute', 'v1', project_id)
  if region:
    request = compute.regionHealthChecks().get(project=project_id,
                                               healthCheck=health_check,
                                               region=region)
  else:
    request = compute.healthChecks().get(project=project_id,
                                         healthCheck=health_check)
  response = request.execute(num_retries=config.API_RETRIES)
  return HealthCheck(project_id, response)
class NetworkEndpointGroup(gcpdiag.models.Resource):
class NetworkEndpointGroup(models.Resource):
  """A Network Endpoint Group resource."""

  # Raw network endpoint group resource as returned by the API.
  _resource_data: dict
  _type: str

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """The 'name' value of the resource."""
    return self._resource_data['name']

  @property
  def id(self) -> str:
    """The 'id' value of the resource."""
    return self._resource_data['id']

  @property
  def full_path(self) -> str:
    """The selfLink without the compute v1 URL prefix.

    If the selfLink does not start with that prefix, the whole selfLink is
    returned, prefixed with '>> '.
    """
    match = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                     self.self_link)
    return match.group(1) if match else f'>> {self.self_link}'

  @property
  def short_path(self) -> str:
    """Project id and resource name joined by '/'."""
    return self.project_id + '/' + self.name

  @property
  def self_link(self) -> str:
    """The 'selfLink' value of the resource."""
    return self._resource_data['selfLink']

A Network Endpoint Group resource.

NetworkEndpointGroup(project_id, resource_data)
1440  def __init__(self, project_id, resource_data):
1441    super().__init__(project_id=project_id)
1442    self._resource_data = resource_data
name: str
1444  @property
1445  def name(self) -> str:
1446    return self._resource_data['name']
id: str
1448  @property
1449  def id(self) -> str:
1450    return self._resource_data['id']
full_path: str
1452  @property
1453  def full_path(self) -> str:
1454    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
1455                      self.self_link)
1456    if result:
1457      return result.group(1)
1458    else:
1459      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
1461  @property
1462  def short_path(self) -> str:
1463    path = self.project_id + '/' + self.name
1464    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

@caching.cached_api_call(in_memory=True)
def get_zonal_network_endpoint_groups( context: gcpdiag.models.Context) -> Mapping[str, NetworkEndpointGroup]:
@caching.cached_api_call(in_memory=True)
def get_zonal_network_endpoint_groups(
    context: models.Context,) -> Mapping[str, NetworkEndpointGroup]:
  """Returns the zonal Network Endpoint Groups matching the context.

  Args:
    context: context whose project is queried and whose
      match_project_resource filter is applied to every group.

  Returns:
    A mapping from full path to NetworkEndpointGroup. It is empty when the
    compute API is not enabled for the project.
  """
  groups: Dict[str, NetworkEndpointGroup] = {}
  if not apis.is_enabled(context.project_id, 'compute'):
    return groups
  gce_api = apis.get_api('compute', 'v1', context.project_id)
  # One list request per zone.
  requests = [
      gce_api.networkEndpointGroups().list(project=context.project_id,
                                           zone=zone)
      for zone in get_gce_zones(context.project_id)
  ]
  logging.debug('listing gce networkEndpointGroups of project %s',
                context.project_id)
  items = apis_utils.multi_list_all(
      requests=requests,
      next_function=gce_api.networkEndpointGroups().list_next,
  )

  for i in items:
    # The zone is not read from a dedicated field; it is parsed from the
    # selfLink.
    result = re.match(
        r'https://www.googleapis.com/compute/v1/projects/[^/]+/zones/([^/]+)',
        i['selfLink'],
    )
    if not result:
      # The resource is a network endpoint group, not an instance.
      logging.error(
          "network endpoint group %s selfLink didn't match regexp: %s",
          i['id'], i['selfLink'])
      continue
    zone = result.group(1)
    labels = i.get('labels', {})
    resource = i.get('name', '')
    if not context.match_project_resource(
        location=zone, labels=labels, resource=resource):
      continue
    data = NetworkEndpointGroup(context.project_id, i)
    groups[data.full_path] = data
  return groups

Returns the zonal Network Endpoint Groups in the project that match the context, indexed by full path.