gcpdiag.queries.gce

Queries related to GCP Compute Engine.
POSITIVE_BOOL_VALUES = {'1', 'Y', 'YES', 'TRUE'}
DATAPROC_LABEL = 'goog-dataproc-cluster-name'
GKE_LABEL = 'goog-gke-node'
class InstanceTemplate(gcpdiag.models.Resource):
 39class InstanceTemplate(models.Resource):
 40  """Represents a GCE Instance Template."""
 41
 42  _resource_data: dict
 43
 44  def __init__(self, project_id, resource_data):
 45    super().__init__(project_id=project_id)
 46    self._resource_data = resource_data
 47
 48  @property
 49  def self_link(self) -> str:
 50    return self._resource_data['selfLink']
 51
 52  @property
 53  def full_path(self) -> str:
 54    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
 55                      self.self_link)
 56    if result:
 57      return result.group(1)
 58    else:
 59      return f'>> {self.self_link}'
 60
 61  @property
 62  def short_path(self) -> str:
 63    path = self.project_id + '/' + self.name
 64    return path
 65
 66  @property
 67  def name(self) -> str:
 68    return self._resource_data['name']
 69
 70  @property
 71  def tags(self) -> List[str]:
 72    return self._resource_data['properties'].get('tags', {}).get('items', [])
 73
 74  @property
 75  def service_account(self) -> Optional[str]:
 76    sa_list = self._resource_data['properties'].get('serviceAccounts', [])
 77    if not sa_list:
 78      return None
 79    email = sa_list[0]['email']
 80    if email == 'default':
 81      project_nr = crm.get_project(self._project_id).number
 82      return f'{project_nr}-compute@developer.gserviceaccount.com'
 83    return email
 84
 85  @property
 86  def network(self) -> network_q.Network:
 87    return network_q.get_network_from_url(
 88        self._resource_data['properties']['networkInterfaces'][0]['network'])
 89
 90  @property
 91  def subnetwork(self) -> network_q.Subnetwork:
 92    subnet_url = self._resource_data['properties']['networkInterfaces'][0][
 93        'subnetwork']
 94    return self.network.subnetworks[subnet_url]
 95
 96  def get_metadata(self, key: str) -> str:
 97    for item in self._resource_data['properties']['metadata']['items']:
 98      if item['key'] == key:
 99        return item['value']
100    return ''

Represents a GCE Instance Template.

InstanceTemplate(project_id, resource_data)
44  def __init__(self, project_id, resource_data):
45    super().__init__(project_id=project_id)
46    self._resource_data = resource_data
full_path: str
52  @property
53  def full_path(self) -> str:
54    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
55                      self.self_link)
56    if result:
57      return result.group(1)
58    else:
59      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
61  @property
62  def short_path(self) -> str:
63    path = self.project_id + '/' + self.name
64    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
66  @property
67  def name(self) -> str:
68    return self._resource_data['name']
tags: List[str]
70  @property
71  def tags(self) -> List[str]:
72    return self._resource_data['properties'].get('tags', {}).get('items', [])
service_account: Optional[str]
74  @property
75  def service_account(self) -> Optional[str]:
76    sa_list = self._resource_data['properties'].get('serviceAccounts', [])
77    if not sa_list:
78      return None
79    email = sa_list[0]['email']
80    if email == 'default':
81      project_nr = crm.get_project(self._project_id).number
82      return f'{project_nr}-compute@developer.gserviceaccount.com'
83    return email
network: gcpdiag.queries.network.Network
85  @property
86  def network(self) -> network_q.Network:
87    return network_q.get_network_from_url(
88        self._resource_data['properties']['networkInterfaces'][0]['network'])
subnetwork: gcpdiag.queries.network.Subnetwork
90  @property
91  def subnetwork(self) -> network_q.Subnetwork:
92    subnet_url = self._resource_data['properties']['networkInterfaces'][0][
93        'subnetwork']
94    return self.network.subnetworks[subnet_url]
def get_metadata(self, key: str) -> str:
 96  def get_metadata(self, key: str) -> str:
 97    for item in self._resource_data['properties']['metadata']['items']:
 98      if item['key'] == key:
 99        return item['value']
100    return ''
class InstanceGroup(gcpdiag.models.Resource):
103class InstanceGroup(models.Resource):
104  """Represents a GCE instance group."""
105
106  _resource_data: dict
107
108  def __init__(self, project_id, resource_data):
109    super().__init__(project_id=project_id)
110    self._resource_data = resource_data
111
112  @property
113  def full_path(self) -> str:
114    result = re.match(
115        r'https://www.googleapis.com/compute/v1/(.*)',
116        self._resource_data['selfLink'],
117    )
118    if result:
119      return result.group(1)
120    else:
121      return '>> ' + self._resource_data['selfLink']
122
123  @property
124  def short_path(self) -> str:
125    path = self.project_id + '/' + self.name
126    return path
127
128  @property
129  def self_link(self) -> str:
130    return self._resource_data['selfLink']
131
132  @property
133  def name(self) -> str:
134    return self._resource_data['name']
135
136  @property
137  def named_ports(self) -> List[dict]:
138    if 'namedPorts' in self._resource_data:
139      return self._resource_data['namedPorts']
140    return []
141
142  def has_named_ports(self) -> bool:
143    if 'namedPorts' in self._resource_data:
144      return True
145    return False

Represents a GCE instance group.

InstanceGroup(project_id, resource_data)
108  def __init__(self, project_id, resource_data):
109    super().__init__(project_id=project_id)
110    self._resource_data = resource_data
full_path: str
112  @property
113  def full_path(self) -> str:
114    result = re.match(
115        r'https://www.googleapis.com/compute/v1/(.*)',
116        self._resource_data['selfLink'],
117    )
118    if result:
119      return result.group(1)
120    else:
121      return '>> ' + self._resource_data['selfLink']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
123  @property
124  def short_path(self) -> str:
125    path = self.project_id + '/' + self.name
126    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
132  @property
133  def name(self) -> str:
134    return self._resource_data['name']
named_ports: List[dict]
136  @property
137  def named_ports(self) -> List[dict]:
138    if 'namedPorts' in self._resource_data:
139      return self._resource_data['namedPorts']
140    return []
def has_named_ports(self) -> bool:
142  def has_named_ports(self) -> bool:
143    if 'namedPorts' in self._resource_data:
144      return True
145    return False
class ManagedInstanceGroup(gcpdiag.models.Resource):
148class ManagedInstanceGroup(models.Resource):
149  """Represents a GCE managed instance group."""
150
151  _resource_data: dict
152  _region: Optional[str]
153
154  def __init__(self, project_id, resource_data):
155    super().__init__(project_id=project_id)
156    self._resource_data = resource_data
157    self._region = None
158
159  @property
160  def full_path(self) -> str:
161    result = re.match(
162        r'https://www.googleapis.com/compute/v1/(.*)',
163        self._resource_data['selfLink'],
164    )
165    if result:
166      return result.group(1)
167    else:
168      return '>> ' + self._resource_data['selfLink']
169
170  @property
171  def short_path(self) -> str:
172    path = self.project_id + '/' + self.name
173    return path
174
175  def is_gke(self) -> bool:
176    """Is this managed instance group part of a GKE cluster?
177
178    Note that the results are based on heuristics (the mig name),
179    which is not ideal.
180    """
181
182    # gke- is normal GKE, gk3- is GKE autopilot
183    return self.name.startswith('gke-') or self.name.startswith('gk3-')
184
185  @property
186  def self_link(self) -> str:
187    return self._resource_data['selfLink']
188
189  @property
190  def name(self) -> str:
191    return self._resource_data['name']
192
193  @property
194  def region(self) -> str:
195    if self._region is None:
196      if 'region' in self._resource_data:
197        m = re.search(r'/regions/([^/]+)$', self._resource_data['region'])
198        if not m:
199          raise RuntimeError("can't determine region of mig %s (%s)" %
200                             (self.name, self._resource_data['region']))
201        self._region = m.group(1)
202      elif 'zone' in self._resource_data:
203        m = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
204        if not m:
205          raise RuntimeError("can't determine region of mig %s (%s)" %
206                             (self.name, self._resource_data['region']))
207        zone = m.group(1)
208        self._region = utils.zone_region(zone)
209      else:
210        raise RuntimeError(
211            f"can't determine region of mig {self.name}, both region and zone"
212            " aren't set!")
213    return self._region
214
215  def count_no_action_instances(self) -> int:
216    """number of instances in the mig that are running and have no scheduled actions."""
217    return self._resource_data['currentActions']['none']
218
219  def is_instance_member(self, project_id: str, region: str,
220                         instance_name: str):
221    """Given the project_id, region and instance name, is it a member of this MIG?"""
222    return (self.project_id == project_id and self.region == region and
223            instance_name.startswith(self._resource_data['baseInstanceName']))
224
225  @property
226  def template(self) -> InstanceTemplate:
227    if 'instanceTemplate' not in self._resource_data:
228      raise RuntimeError('instanceTemplate not set for MIG {self.name}')
229
230    m = re.match(
231        r'https://www.googleapis.com/compute/v1/(.*)',
232        self._resource_data['instanceTemplate'],
233    )
234
235    if not m:
236      raise RuntimeError("can't parse instanceTemplate: %s" %
237                         self._resource_data['instanceTemplate'])
238    template_self_link = m.group(1)
239    templates = get_instance_templates(self.project_id)
240    if template_self_link not in templates:
241      raise RuntimeError(
242          f'instanceTemplate {template_self_link} for MIG {self.name} not found'
243      )
244    return templates[template_self_link]

Represents a GCE managed instance group.

ManagedInstanceGroup(project_id, resource_data)
154  def __init__(self, project_id, resource_data):
155    super().__init__(project_id=project_id)
156    self._resource_data = resource_data
157    self._region = None
full_path: str
159  @property
160  def full_path(self) -> str:
161    result = re.match(
162        r'https://www.googleapis.com/compute/v1/(.*)',
163        self._resource_data['selfLink'],
164    )
165    if result:
166      return result.group(1)
167    else:
168      return '>> ' + self._resource_data['selfLink']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
170  @property
171  def short_path(self) -> str:
172    path = self.project_id + '/' + self.name
173    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

def is_gke(self) -> bool:
175  def is_gke(self) -> bool:
176    """Is this managed instance group part of a GKE cluster?
177
178    Note that the results are based on heuristics (the mig name),
179    which is not ideal.
180    """
181
182    # gke- is normal GKE, gk3- is GKE autopilot
183    return self.name.startswith('gke-') or self.name.startswith('gk3-')

Is this managed instance group part of a GKE cluster?

Note that the results are based on heuristics (the mig name), which is not ideal.

name: str
189  @property
190  def name(self) -> str:
191    return self._resource_data['name']
region: str
193  @property
194  def region(self) -> str:
195    if self._region is None:
196      if 'region' in self._resource_data:
197        m = re.search(r'/regions/([^/]+)$', self._resource_data['region'])
198        if not m:
199          raise RuntimeError("can't determine region of mig %s (%s)" %
200                             (self.name, self._resource_data['region']))
201        self._region = m.group(1)
202      elif 'zone' in self._resource_data:
203        m = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
204        if not m:
205          raise RuntimeError("can't determine region of mig %s (%s)" %
206                             (self.name, self._resource_data['region']))
207        zone = m.group(1)
208        self._region = utils.zone_region(zone)
209      else:
210        raise RuntimeError(
211            f"can't determine region of mig {self.name}, both region and zone"
212            " aren't set!")
213    return self._region
def count_no_action_instances(self) -> int:
215  def count_no_action_instances(self) -> int:
216    """number of instances in the mig that are running and have no scheduled actions."""
217    return self._resource_data['currentActions']['none']

number of instances in the mig that are running and have no scheduled actions.

def is_instance_member(self, project_id: str, region: str, instance_name: str):
219  def is_instance_member(self, project_id: str, region: str,
220                         instance_name: str):
221    """Given the project_id, region and instance name, is it a member of this MIG?"""
222    return (self.project_id == project_id and self.region == region and
223            instance_name.startswith(self._resource_data['baseInstanceName']))

Given the project_id, region and instance name, is it a member of this MIG?

template: InstanceTemplate
225  @property
226  def template(self) -> InstanceTemplate:
227    if 'instanceTemplate' not in self._resource_data:
228      raise RuntimeError('instanceTemplate not set for MIG {self.name}')
229
230    m = re.match(
231        r'https://www.googleapis.com/compute/v1/(.*)',
232        self._resource_data['instanceTemplate'],
233    )
234
235    if not m:
236      raise RuntimeError("can't parse instanceTemplate: %s" %
237                         self._resource_data['instanceTemplate'])
238    template_self_link = m.group(1)
239    templates = get_instance_templates(self.project_id)
240    if template_self_link not in templates:
241      raise RuntimeError(
242          f'instanceTemplate {template_self_link} for MIG {self.name} not found'
243      )
244    return templates[template_self_link]
class SerialPortOutput:
247class SerialPortOutput:
248  """Represents the full Serial Port Output (/dev/ttyS0 or COM1) of an instance.
249
250  contents is the full 1MB of the instance.
251  """
252
253  _project_id: str
254  _instance_id: str
255  _contents: List[str]
256
257  def __init__(self, project_id, instance_id, contents):
258    self._project_id = project_id
259    self._instance_id = instance_id
260    self._contents = contents
261
262  @property
263  def contents(self) -> List[str]:
264    return self._contents
265
266  @property
267  def instance_id(self) -> str:
268    return self._instance_id

Represents the full Serial Port Output (/dev/ttyS0 or COM1) of an instance.

contents is the full 1MB of the instance.

SerialPortOutput(project_id, instance_id, contents)
257  def __init__(self, project_id, instance_id, contents):
258    self._project_id = project_id
259    self._instance_id = instance_id
260    self._contents = contents
contents: List[str]
262  @property
263  def contents(self) -> List[str]:
264    return self._contents
instance_id: str
266  @property
267  def instance_id(self) -> str:
268    return self._instance_id
class Instance(gcpdiag.models.Resource):
271class Instance(models.Resource):
272  """Represents a GCE instance."""
273
274  _resource_data: dict
275  _region: Optional[str]
276
277  def __init__(self, project_id, resource_data):
278    super().__init__(project_id=project_id)
279    self._resource_data = resource_data
280    self._metadata_dict = None
281    self._region = None
282
283  @property
284  def id(self) -> str:
285    return self._resource_data['id']
286
287  @property
288  def name(self) -> str:
289    return self._resource_data['name']
290
291  @property
292  def full_path(self) -> str:
293    result = re.match(
294        r'https://www.googleapis.com/compute/v1/(.*)',
295        self._resource_data['selfLink'],
296    )
297    if result:
298      return result.group(1)
299    else:
300      return '>> ' + self._resource_data['selfLink']
301
302  @property
303  def short_path(self) -> str:
304    # Note: instance names must be unique per project, so no need to add the zone.
305    path = self.project_id + '/' + self.name
306    return path
307
308  @property
309  def creation_timestamp(self) -> datetime:
310    """VM creation time, as a *naive* `datetime` object."""
311    return (datetime.fromisoformat(
312        self._resource_data['creationTimestamp']).astimezone(
313            timezone.utc).replace(tzinfo=None))
314
315  @property
316  def region(self) -> str:
317    if self._region is None:
318      if 'zone' in self._resource_data:
319        m = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
320        if not m:
321          raise RuntimeError("can't determine region of instance %s (%s)" %
322                             (self.name, self._resource_data['region']))
323        zone = m.group(1)
324        self._region = utils.zone_region(zone)
325      else:
326        raise RuntimeError(
327            f"can't determine region of instance {self.name}, zone isn't set!")
328    return self._region
329
330  @property
331  def zone(self) -> str:
332    zone_uri = self._resource_data['zone']
333    m = re.search(r'/zones/([^/]+)$', zone_uri)
334    if m:
335      return m.group(1)
336    else:
337      raise RuntimeError(f"can't determine zone of instance {self.name}")
338
339  @property
340  def disks(self) -> List[dict]:
341    if 'disks' in self._resource_data:
342      return self._resource_data['disks']
343    return []
344
345  @property
346  def startrestricted(self) -> bool:
347    return self._resource_data['startRestricted']
348
349  def laststarttimestamp(self) -> str:
350    return self._resource_data['lastStartTimestamp']
351
352  def laststoptimestamp(self) -> str:
353    if 'lastStopTimestamp' in self._resource_data:
354      return self._resource_data['lastStopTimestamp']
355    return ''
356
357  def is_serial_port_logging_enabled(self) -> bool:
358    value = self.get_metadata('serial-port-logging-enable')
359    return bool(value and value.upper() in POSITIVE_BOOL_VALUES)
360
361  def is_oslogin_enabled(self) -> bool:
362    value = self.get_metadata('enable-oslogin')
363    return bool(value and value.upper() in POSITIVE_BOOL_VALUES)
364
365  def is_metadata_enabled(self, metadata_name) -> bool:
366    """Use to check for common boolen metadata value"""
367    value = self.get_metadata(metadata_name)
368    return bool(value and value.upper() in POSITIVE_BOOL_VALUES)
369
370  def has_label(self, label) -> bool:
371    return label in self.labels
372
373  def is_dataproc_instance(self) -> bool:
374    return self.has_label(DATAPROC_LABEL)
375
376  def is_gke_node(self) -> bool:
377    return self.has_label(GKE_LABEL)
378
379  def is_preemptible_vm(self) -> bool:
380    return ('scheduling' in self._resource_data and
381            'preemptible' in self._resource_data['scheduling'] and
382            self._resource_data['scheduling']['preemptible'])
383
384  def is_windows_machine(self) -> bool:
385    if 'disks' in self._resource_data:
386      disks = next(iter(self._resource_data['disks']))
387      if 'guestOsFeatures' in disks:
388        if 'WINDOWS' in [t['type'] for t in iter(disks['guestOsFeatures'])]:
389          return True
390    return False
391
392  def is_public_machine(self) -> bool:
393    if 'networkInterfaces' in self._resource_data:
394      return 'natIP' in str(self._resource_data['networkInterfaces'])
395    return False
396
397  def machine_type(self):
398    if 'machineType' in self._resource_data:
399      #return self._resource_data['machineType']
400      machine_type_uri = self._resource_data['machineType']
401      mt = re.search(r'/machineTypes/([^/]+)$', machine_type_uri)
402      if mt:
403        return mt.group(1)
404      else:
405        raise RuntimeError(
406            f"can't determine machineType of instance {self.name}")
407    return None
408
409  def check_license(self, licenses: List[str]) -> bool:
410    """Checks that a license is contained in a given license list"""
411    if 'disks' in self._resource_data:
412      for disk in self._resource_data['disks']:
413        if 'license' in str(disk):
414          for license_ in licenses:
415            for attached_license in disk['licenses']:
416              if license_ == attached_license.partition('/global/licenses/')[2]:
417                return True
418    return False
419
420  def get_boot_disk_image(self) -> str:
421    """Get VM's boot disk image"""
422    boot_disk_image: str = ''
423    for disk in self.disks:
424      if disk.get('boot', False):
425        disk_source = disk.get('source', '')
426        m = re.search(r'/disks/([^/]+)$', disk_source)
427        if not m:
428          raise RuntimeError(f"can't determine name of boot disk {disk_source}")
429        disk_name = m.group(1)
430        gce_disk: Disk = get_disk(self.project_id,
431                                  zone=self.zone,
432                                  disk_name=disk_name)
433        return gce_disk.source_image
434    return boot_disk_image
435
436  @property
437  def is_sole_tenant_vm(self) -> bool:
438    return bool('nodeAffinities' in self._resource_data['scheduling'])
439
440  @property
441  def network(self) -> network_q.Network:
442    # 'https://www.googleapis.com/compute/v1/projects/gcpdiag-gce1-aaaa/global/networks/default'
443    network_string = self._resource_data['networkInterfaces'][0]['network']
444    m = re.match(r'^.+/projects/([^/]+)/global/networks/([^/]+)$',
445                 network_string)
446    if not m:
447      raise RuntimeError("can't parse network string: %s" % network_string)
448    return network_q.get_network(m.group(1), m.group(2))
449
450  @property
451  def network_ips(self) -> List[network_q.IPv4AddrOrIPv6Addr]:
452    return [
453        ipaddress.ip_address(nic['networkIP'])
454        for nic in self._resource_data['networkInterfaces']
455    ]
456
457  @property
458  def get_network_interfaces(self):
459    return self._resource_data['networkInterfaces']
460
461  @property
462  def subnetworks(self) -> List[network_q.Subnetwork]:
463    subnetworks = []
464    for nic in self._resource_data['networkInterfaces']:
465      subnetworks.append(network_q.get_subnetwork_from_url(nic['subnetwork']))
466    return subnetworks
467
468  @property
469  def routes(self) -> List[network_q.Route]:
470    routes = []
471    for nic in self._resource_data['networkInterfaces']:
472      for route in network_q.get_routes(self.project_id):
473        if nic['network'] == route.network:
474          if route.tags == []:
475            routes.append(route)
476            continue
477          else:
478            temp = [x for x in self.tags if x in route.tags]
479            if len(temp) > 0:
480              routes.append(route)
481    return routes
482
483  def get_network_ip_for_instance_interface(
484      self, network: str) -> Optional[network_q.IPv4NetOrIPv6Net]:
485    """Get the network ip for a nic given a network name"""
486    for nic in self._resource_data['networkInterfaces']:
487      if nic.get('network') == network:
488        return ipaddress.ip_network(nic.get('networkIP'))
489    return None
490
491  def secure_boot_enabled(self) -> bool:
492    if 'shieldedInstanceConfig' in self._resource_data:
493      return self._resource_data['shieldedInstanceConfig']['enableSecureBoot']
494    return False
495
496  @property
497  def access_scopes(self) -> List[str]:
498    if 'serviceAccounts' in self._resource_data:
499      saccts = self._resource_data['serviceAccounts']
500      if isinstance(saccts, list) and len(saccts) >= 1:
501        return saccts[0].get('scopes', [])
502    return []
503
504  @property
505  def service_account(self) -> Optional[str]:
506    if 'serviceAccounts' in self._resource_data:
507      saccts = self._resource_data['serviceAccounts']
508      if isinstance(saccts, list) and len(saccts) >= 1:
509        return saccts[0]['email']
510    return None
511
512  @property
513  def tags(self) -> List[str]:
514    if 'tags' in self._resource_data:
515      if 'items' in self._resource_data['tags']:
516        return self._resource_data['tags']['items']
517    return []
518
519  def get_metadata(self, key: str) -> str:
520    if not self._metadata_dict:
521      self._metadata_dict = {}
522      if ('metadata' in self._resource_data and
523          'items' in self._resource_data['metadata']):
524        for item in self._resource_data['metadata']['items']:
525          if 'key' in item and 'value' in item:
526            self._metadata_dict[item['key']] = item['value']
527    project_metadata = get_project_metadata(self.project_id)
528    return self._metadata_dict.get(key, project_metadata.get(key))
529
530  @property
531  def status(self) -> str:
532    """VM Status"""
533    return self._resource_data.get('status', None)
534
535  @property
536  def is_running(self) -> bool:
537    """VM Status is indicated as running"""
538    return self._resource_data.get('status', False) == 'RUNNING'
539
540  @property  # type: ignore
541  @caching.cached_api_call(in_memory=True)
542  def mig(self) -> ManagedInstanceGroup:
543    """Return ManagedInstanceGroup that owns this instance.
544
545    Throws AttributeError in case it isn't MIG-managed.
546    """
547
548    created_by = self.get_metadata('created-by')
549    if created_by is None:
550      raise AttributeError(f'instance {self.id} is not managed by a mig')
551
552    # Example created-by:
553    # pylint: disable=line-too-long
554    # "projects/12340002/zones/europe-west4-a/instanceGroupManagers/gke-gke1-default-pool-e5e20a34-grp"
555    # (note how it uses a project number and not a project id...)
556    created_by_match = re.match(
557        r'projects/([^/]+)/((?:regions|zones)/[^/]+/instanceGroupManagers/[^/]+)$',
558        created_by,
559    )
560    if not created_by_match:
561      raise AttributeError(f'instance {self.id} is not managed by a mig'
562                           f' (created-by={created_by})')
563    project = crm.get_project(created_by_match.group(1))
564
565    mig_self_link = ('https://www.googleapis.com/compute/v1/'
566                     f'projects/{project.id}/{created_by_match.group(2)}')
567
568    # Try to find a matching mig.
569    for mig in get_managed_instance_groups(
570        models.Context(project_id=self.project_id)).values():
571      if mig.self_link == mig_self_link:
572        return mig
573
574    raise AttributeError(f'instance {self.id} is not managed by a mig')
575
576  @property
577  def labels(self) -> dict:
578    return self._resource_data.get('labels', {})

Represents a GCE instance.

Instance(project_id, resource_data)
277  def __init__(self, project_id, resource_data):
278    super().__init__(project_id=project_id)
279    self._resource_data = resource_data
280    self._metadata_dict = None
281    self._region = None
id: str
283  @property
284  def id(self) -> str:
285    return self._resource_data['id']
name: str
287  @property
288  def name(self) -> str:
289    return self._resource_data['name']
full_path: str
291  @property
292  def full_path(self) -> str:
293    result = re.match(
294        r'https://www.googleapis.com/compute/v1/(.*)',
295        self._resource_data['selfLink'],
296    )
297    if result:
298      return result.group(1)
299    else:
300      return '>> ' + self._resource_data['selfLink']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
302  @property
303  def short_path(self) -> str:
304    # Note: instance names must be unique per project, so no need to add the zone.
305    path = self.project_id + '/' + self.name
306    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

creation_timestamp: datetime.datetime
308  @property
309  def creation_timestamp(self) -> datetime:
310    """VM creation time, as a *naive* `datetime` object."""
311    return (datetime.fromisoformat(
312        self._resource_data['creationTimestamp']).astimezone(
313            timezone.utc).replace(tzinfo=None))

VM creation time, as a naive datetime object.

region: str
315  @property
316  def region(self) -> str:
317    if self._region is None:
318      if 'zone' in self._resource_data:
319        m = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
320        if not m:
321          raise RuntimeError("can't determine region of instance %s (%s)" %
322                             (self.name, self._resource_data['region']))
323        zone = m.group(1)
324        self._region = utils.zone_region(zone)
325      else:
326        raise RuntimeError(
327            f"can't determine region of instance {self.name}, zone isn't set!")
328    return self._region
zone: str
330  @property
331  def zone(self) -> str:
332    zone_uri = self._resource_data['zone']
333    m = re.search(r'/zones/([^/]+)$', zone_uri)
334    if m:
335      return m.group(1)
336    else:
337      raise RuntimeError(f"can't determine zone of instance {self.name}")
disks: List[dict]
339  @property
340  def disks(self) -> List[dict]:
341    if 'disks' in self._resource_data:
342      return self._resource_data['disks']
343    return []
startrestricted: bool
345  @property
346  def startrestricted(self) -> bool:
347    return self._resource_data['startRestricted']
def laststarttimestamp(self) -> str:
349  def laststarttimestamp(self) -> str:
350    return self._resource_data['lastStartTimestamp']
def laststoptimestamp(self) -> str:
352  def laststoptimestamp(self) -> str:
353    if 'lastStopTimestamp' in self._resource_data:
354      return self._resource_data['lastStopTimestamp']
355    return ''
def is_serial_port_logging_enabled(self) -> bool:
357  def is_serial_port_logging_enabled(self) -> bool:
358    value = self.get_metadata('serial-port-logging-enable')
359    return bool(value and value.upper() in POSITIVE_BOOL_VALUES)
def is_oslogin_enabled(self) -> bool:
361  def is_oslogin_enabled(self) -> bool:
362    value = self.get_metadata('enable-oslogin')
363    return bool(value and value.upper() in POSITIVE_BOOL_VALUES)
def is_metadata_enabled(self, metadata_name) -> bool:
365  def is_metadata_enabled(self, metadata_name) -> bool:
366    """Use to check for common boolen metadata value"""
367    value = self.get_metadata(metadata_name)
368    return bool(value and value.upper() in POSITIVE_BOOL_VALUES)

Use to check for common boolen metadata value

def has_label(self, label) -> bool:
370  def has_label(self, label) -> bool:
371    return label in self.labels
def is_dataproc_instance(self) -> bool:
373  def is_dataproc_instance(self) -> bool:
374    return self.has_label(DATAPROC_LABEL)
def is_gke_node(self) -> bool:
376  def is_gke_node(self) -> bool:
377    return self.has_label(GKE_LABEL)
def is_preemptible_vm(self) -> bool:
379  def is_preemptible_vm(self) -> bool:
380    return ('scheduling' in self._resource_data and
381            'preemptible' in self._resource_data['scheduling'] and
382            self._resource_data['scheduling']['preemptible'])
def is_windows_machine(self) -> bool:
384  def is_windows_machine(self) -> bool:
385    if 'disks' in self._resource_data:
386      disks = next(iter(self._resource_data['disks']))
387      if 'guestOsFeatures' in disks:
388        if 'WINDOWS' in [t['type'] for t in iter(disks['guestOsFeatures'])]:
389          return True
390    return False
def is_public_machine(self) -> bool:
392  def is_public_machine(self) -> bool:
393    if 'networkInterfaces' in self._resource_data:
394      return 'natIP' in str(self._resource_data['networkInterfaces'])
395    return False
def machine_type(self):
397  def machine_type(self):
398    if 'machineType' in self._resource_data:
399      #return self._resource_data['machineType']
400      machine_type_uri = self._resource_data['machineType']
401      mt = re.search(r'/machineTypes/([^/]+)$', machine_type_uri)
402      if mt:
403        return mt.group(1)
404      else:
405        raise RuntimeError(
406            f"can't determine machineType of instance {self.name}")
407    return None
def check_license(self, licenses: List[str]) -> bool:
409  def check_license(self, licenses: List[str]) -> bool:
410    """Checks that a license is contained in a given license list"""
411    if 'disks' in self._resource_data:
412      for disk in self._resource_data['disks']:
413        if 'license' in str(disk):
414          for license_ in licenses:
415            for attached_license in disk['licenses']:
416              if license_ == attached_license.partition('/global/licenses/')[2]:
417                return True
418    return False

Checks that a license is contained in a given license list

def get_boot_disk_image(self) -> str:
420  def get_boot_disk_image(self) -> str:
421    """Get VM's boot disk image"""
422    boot_disk_image: str = ''
423    for disk in self.disks:
424      if disk.get('boot', False):
425        disk_source = disk.get('source', '')
426        m = re.search(r'/disks/([^/]+)$', disk_source)
427        if not m:
428          raise RuntimeError(f"can't determine name of boot disk {disk_source}")
429        disk_name = m.group(1)
430        gce_disk: Disk = get_disk(self.project_id,
431                                  zone=self.zone,
432                                  disk_name=disk_name)
433        return gce_disk.source_image
434    return boot_disk_image

Get VM's boot disk image

is_sole_tenant_vm: bool
436  @property
437  def is_sole_tenant_vm(self) -> bool:
438    return bool('nodeAffinities' in self._resource_data['scheduling'])
network: gcpdiag.queries.network.Network
440  @property
441  def network(self) -> network_q.Network:
442    # 'https://www.googleapis.com/compute/v1/projects/gcpdiag-gce1-aaaa/global/networks/default'
443    network_string = self._resource_data['networkInterfaces'][0]['network']
444    m = re.match(r'^.+/projects/([^/]+)/global/networks/([^/]+)$',
445                 network_string)
446    if not m:
447      raise RuntimeError("can't parse network string: %s" % network_string)
448    return network_q.get_network(m.group(1), m.group(2))
network_ips: List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]
450  @property
451  def network_ips(self) -> List[network_q.IPv4AddrOrIPv6Addr]:
452    return [
453        ipaddress.ip_address(nic['networkIP'])
454        for nic in self._resource_data['networkInterfaces']
455    ]
get_network_interfaces
457  @property
458  def get_network_interfaces(self):
459    return self._resource_data['networkInterfaces']
subnetworks: List[gcpdiag.queries.network.Subnetwork]
461  @property
462  def subnetworks(self) -> List[network_q.Subnetwork]:
463    subnetworks = []
464    for nic in self._resource_data['networkInterfaces']:
465      subnetworks.append(network_q.get_subnetwork_from_url(nic['subnetwork']))
466    return subnetworks
routes: List[gcpdiag.queries.network.Route]
468  @property
469  def routes(self) -> List[network_q.Route]:
470    routes = []
471    for nic in self._resource_data['networkInterfaces']:
472      for route in network_q.get_routes(self.project_id):
473        if nic['network'] == route.network:
474          if route.tags == []:
475            routes.append(route)
476            continue
477          else:
478            temp = [x for x in self.tags if x in route.tags]
479            if len(temp) > 0:
480              routes.append(route)
481    return routes
def get_network_ip_for_instance_interface( self, network: str) -> Union[ipaddress.IPv4Network, ipaddress.IPv6Network, NoneType]:
483  def get_network_ip_for_instance_interface(
484      self, network: str) -> Optional[network_q.IPv4NetOrIPv6Net]:
485    """Get the network ip for a nic given a network name"""
486    for nic in self._resource_data['networkInterfaces']:
487      if nic.get('network') == network:
488        return ipaddress.ip_network(nic.get('networkIP'))
489    return None

Get the network ip for a nic given a network name

def secure_boot_enabled(self) -> bool:
491  def secure_boot_enabled(self) -> bool:
492    if 'shieldedInstanceConfig' in self._resource_data:
493      return self._resource_data['shieldedInstanceConfig']['enableSecureBoot']
494    return False
access_scopes: List[str]
496  @property
497  def access_scopes(self) -> List[str]:
498    if 'serviceAccounts' in self._resource_data:
499      saccts = self._resource_data['serviceAccounts']
500      if isinstance(saccts, list) and len(saccts) >= 1:
501        return saccts[0].get('scopes', [])
502    return []
service_account: Optional[str]
504  @property
505  def service_account(self) -> Optional[str]:
506    if 'serviceAccounts' in self._resource_data:
507      saccts = self._resource_data['serviceAccounts']
508      if isinstance(saccts, list) and len(saccts) >= 1:
509        return saccts[0]['email']
510    return None
tags: List[str]
512  @property
513  def tags(self) -> List[str]:
514    if 'tags' in self._resource_data:
515      if 'items' in self._resource_data['tags']:
516        return self._resource_data['tags']['items']
517    return []
def get_metadata(self, key: str) -> str:
519  def get_metadata(self, key: str) -> str:
520    if not self._metadata_dict:
521      self._metadata_dict = {}
522      if ('metadata' in self._resource_data and
523          'items' in self._resource_data['metadata']):
524        for item in self._resource_data['metadata']['items']:
525          if 'key' in item and 'value' in item:
526            self._metadata_dict[item['key']] = item['value']
527    project_metadata = get_project_metadata(self.project_id)
528    return self._metadata_dict.get(key, project_metadata.get(key))
status: str
530  @property
531  def status(self) -> str:
532    """VM Status"""
533    return self._resource_data.get('status', None)

VM Status

is_running: bool
535  @property
536  def is_running(self) -> bool:
537    """VM Status is indicated as running"""
538    return self._resource_data.get('status', False) == 'RUNNING'

VM Status is indicated as running

mig: ManagedInstanceGroup
540  @property  # type: ignore
541  @caching.cached_api_call(in_memory=True)
542  def mig(self) -> ManagedInstanceGroup:
543    """Return ManagedInstanceGroup that owns this instance.
544
545    Throws AttributeError in case it isn't MIG-managed.
546    """
547
548    created_by = self.get_metadata('created-by')
549    if created_by is None:
550      raise AttributeError(f'instance {self.id} is not managed by a mig')
551
552    # Example created-by:
553    # pylint: disable=line-too-long
554    # "projects/12340002/zones/europe-west4-a/instanceGroupManagers/gke-gke1-default-pool-e5e20a34-grp"
555    # (note how it uses a project number and not a project id...)
556    created_by_match = re.match(
557        r'projects/([^/]+)/((?:regions|zones)/[^/]+/instanceGroupManagers/[^/]+)$',
558        created_by,
559    )
560    if not created_by_match:
561      raise AttributeError(f'instance {self.id} is not managed by a mig'
562                           f' (created-by={created_by})')
563    project = crm.get_project(created_by_match.group(1))
564
565    mig_self_link = ('https://www.googleapis.com/compute/v1/'
566                     f'projects/{project.id}/{created_by_match.group(2)}')
567
568    # Try to find a matching mig.
569    for mig in get_managed_instance_groups(
570        models.Context(project_id=self.project_id)).values():
571      if mig.self_link == mig_self_link:
572        return mig
573
574    raise AttributeError(f'instance {self.id} is not managed by a mig')

Return ManagedInstanceGroup that owns this instance.

Throws AttributeError in case it isn't MIG-managed.

labels: dict
576  @property
577  def labels(self) -> dict:
578    return self._resource_data.get('labels', {})
class Disk(gcpdiag.models.Resource):
581class Disk(models.Resource):
582  """Represents a GCE disk."""
583
584  _resource_data: dict
585
586  def __init__(self, project_id, resource_data):
587    super().__init__(project_id=project_id)
588    self._resource_data = resource_data
589
590  @property
591  def id(self) -> str:
592    return self._resource_data['id']
593
594  @property
595  def name(self) -> str:
596    return self._resource_data['name']
597
598  @property
599  def type(self) -> str:
600    disk_type = re.search(r'/diskTypes/([^/]+)$', self._resource_data['type'])
601    if not disk_type:
602      raise RuntimeError("can't determine type of the disk %s (%s)" %
603                         (self.name, self._resource_data['type']))
604    return disk_type.group(1)
605
606  @property
607  def users(self) -> list:
608    pattern = r'/instances/(.+)$'
609    # Extracting the instances
610    instances = []
611    for i in self._resource_data.get('users', []):
612      m = re.search(pattern, i)
613      if m:
614        instances.append(m.group(1))
615    return instances
616
617  @property
618  def zone(self) -> str:
619    m = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
620    if not m:
621      raise RuntimeError("can't determine zone of disk %s (%s)" %
622                         (self.name, self._resource_data['zone']))
623    return m.group(1)
624
625  @property
626  def source_image(self) -> str:
627    return self._resource_data.get('sourceImage', '')
628
629  @property
630  def full_path(self) -> str:
631    result = re.match(
632        r'https://www.googleapis.com/compute/v1/(.*)',
633        self._resource_data['selfLink'],
634    )
635    if result:
636      return result.group(1)
637    else:
638      return '>> ' + self._resource_data['selfLink']
639
640  @property
641  def short_path(self) -> str:
642    return f'{self.project_id}/{self.name}'
643
644  @property
645  def bootable(self) -> bool:
646    return 'guestOsFeatures' in self._resource_data
647
648  @property
649  def in_use(self) -> bool:
650    return 'users' in self._resource_data
651
652  @property
653  def size(self) -> int:
654    return self._resource_data['sizeGb']
655
656  @property
657  def provisionediops(self) -> Optional[int]:
658    return self._resource_data.get('provisionedIops')
659
660  @property
661  def has_snapshot_schedule(self) -> bool:
662    return 'resourcePolicies' in self._resource_data

Represents a GCE disk.

Disk(project_id, resource_data)
586  def __init__(self, project_id, resource_data):
587    super().__init__(project_id=project_id)
588    self._resource_data = resource_data
id: str
590  @property
591  def id(self) -> str:
592    return self._resource_data['id']
name: str
594  @property
595  def name(self) -> str:
596    return self._resource_data['name']
type: str
598  @property
599  def type(self) -> str:
600    disk_type = re.search(r'/diskTypes/([^/]+)$', self._resource_data['type'])
601    if not disk_type:
602      raise RuntimeError("can't determine type of the disk %s (%s)" %
603                         (self.name, self._resource_data['type']))
604    return disk_type.group(1)
users: list
606  @property
607  def users(self) -> list:
608    pattern = r'/instances/(.+)$'
609    # Extracting the instances
610    instances = []
611    for i in self._resource_data.get('users', []):
612      m = re.search(pattern, i)
613      if m:
614        instances.append(m.group(1))
615    return instances
zone: str
617  @property
618  def zone(self) -> str:
619    m = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
620    if not m:
621      raise RuntimeError("can't determine zone of disk %s (%s)" %
622                         (self.name, self._resource_data['zone']))
623    return m.group(1)
source_image: str
625  @property
626  def source_image(self) -> str:
627    return self._resource_data.get('sourceImage', '')
full_path: str
629  @property
630  def full_path(self) -> str:
631    result = re.match(
632        r'https://www.googleapis.com/compute/v1/(.*)',
633        self._resource_data['selfLink'],
634    )
635    if result:
636      return result.group(1)
637    else:
638      return '>> ' + self._resource_data['selfLink']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
640  @property
641  def short_path(self) -> str:
642    return f'{self.project_id}/{self.name}'

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

bootable: bool
644  @property
645  def bootable(self) -> bool:
646    return 'guestOsFeatures' in self._resource_data
in_use: bool
648  @property
649  def in_use(self) -> bool:
650    return 'users' in self._resource_data
size: int
652  @property
653  def size(self) -> int:
654    return self._resource_data['sizeGb']
provisionediops: Optional[int]
656  @property
657  def provisionediops(self) -> Optional[int]:
658    return self._resource_data.get('provisionedIops')
has_snapshot_schedule: bool
660  @property
661  def has_snapshot_schedule(self) -> bool:
662    return 'resourcePolicies' in self._resource_data
@caching.cached_api_call(in_memory=True)
def get_gce_zones(project_id: str) -> Set[str]:
665@caching.cached_api_call(in_memory=True)
666def get_gce_zones(project_id: str) -> Set[str]:
667  try:
668    gce_api = apis.get_api('compute', 'v1', project_id)
669    logging.info('listing gce zones of project %s', project_id)
670    request = gce_api.zones().list(project=project_id)
671    response = request.execute(num_retries=config.API_RETRIES)
672    if not response or 'items' not in response:
673      return set()
674    return {item['name'] for item in response['items'] if 'name' in item}
675  except googleapiclient.errors.HttpError as err:
676    raise utils.GcpApiError(err) from err
def get_gce_public_licences(project_id: str) -> List[str]:
679def get_gce_public_licences(project_id: str) -> List[str]:
680  """Returns a list of licenses based on publicly available image project"""
681  licenses = []
682  gce_api = apis.get_api('compute', 'v1', project_id)
683  logging.info('listing licenses of project %s', project_id)
684  request = gce_api.licenses().list(project=project_id)
685  while request is not None:
686    response = request.execute()
687    for license_ in response['items']:
688      formatted_license = license_['selfLink'].partition('/global/licenses/')[2]
689      licenses.append(formatted_license)
690    request = gce_api.licenses().list_next(previous_request=request,
691                                           previous_response=response)
692  return licenses

Returns a list of licenses based on publicly available image project

def get_instance( project_id: str, zone: str, instance_name: str) -> Instance:
695def get_instance(project_id: str, zone: str, instance_name: str) -> Instance:
696  """Returns instance object matching instance name and zone"""
697  compute = apis.get_api('compute', 'v1', project_id)
698  request = compute.instances().get(project=project_id,
699                                    zone=zone,
700                                    instance=instance_name)
701
702  response = request.execute(num_retries=config.API_RETRIES)
703  return Instance(project_id, resource_data=response)

Returns instance object matching instance name and zone

@caching.cached_api_call(in_memory=True)
def get_disk(project_id: str, zone: str, disk_name: str) -> Disk:
706@caching.cached_api_call(in_memory=True)
707def get_disk(project_id: str, zone: str, disk_name: str) -> Disk:
708  """Returns disk object matching disk name and zone"""
709  compute = apis.get_api('compute', 'v1', project_id)
710  request = compute.disks().get(project=project_id, zone=zone, disk=disk_name)
711  response = request.execute(num_retries=config.API_RETRIES)
712  return Disk(project_id, resource_data=response)

Returns disk object matching disk name and zone

@caching.cached_api_call(in_memory=True)
def get_instances( context: gcpdiag.models.Context) -> Mapping[str, Instance]:
715@caching.cached_api_call(in_memory=True)
716def get_instances(context: models.Context) -> Mapping[str, Instance]:
717  """Get a list of Instance matching the given context, indexed by instance id."""
718
719  instances: Dict[str, Instance] = {}
720  if not apis.is_enabled(context.project_id, 'compute'):
721    return instances
722  gce_api = apis.get_api('compute', 'v1', context.project_id)
723  requests = [
724      gce_api.instances().list(project=context.project_id, zone=zone)
725      for zone in get_gce_zones(context.project_id)
726  ]
727  logging.info('listing gce instances of project %s', context.project_id)
728  items = apis_utils.multi_list_all(
729      requests=requests,
730      next_function=gce_api.instances().list_next,
731  )
732  for i in items:
733    result = re.match(
734        r'https://www.googleapis.com/compute/v1/projects/[^/]+/zones/([^/]+)/',
735        i['selfLink'],
736    )
737    if not result:
738      logging.error("instance %s selfLink didn't match regexp: %s", i['id'],
739                    i['selfLink'])
740      continue
741    zone = result.group(1)
742    labels = i.get('labels', {})
743    resource = i.get('name', '')
744    if not context.match_project_resource(
745        location=zone, labels=labels, resource=resource):
746      continue
747    instances[i['id']] = Instance(project_id=context.project_id,
748                                  resource_data=i)
749  return instances

Get a list of Instance matching the given context, indexed by instance id.

@caching.cached_api_call(in_memory=True)
def get_instance_groups( context: gcpdiag.models.Context) -> Mapping[str, InstanceGroup]:
752@caching.cached_api_call(in_memory=True)
753def get_instance_groups(context: models.Context) -> Mapping[str, InstanceGroup]:
754  """Get a list of InstanceGroups matching the given context, indexed by name."""
755  groups: Dict[str, InstanceGroup] = {}
756  if not apis.is_enabled(context.project_id, 'compute'):
757    return groups
758  gce_api = apis.get_api('compute', 'v1', context.project_id)
759  requests = [
760      gce_api.instanceGroups().list(project=context.project_id, zone=zone)
761      for zone in get_gce_zones(context.project_id)
762  ]
763  logging.info('listing gce instance groups of project %s', context.project_id)
764  items = apis_utils.multi_list_all(
765      requests=requests,
766      next_function=gce_api.instanceGroups().list_next,
767  )
768  for i in items:
769    result = re.match(
770        r'https://www.googleapis.com/compute/v1/projects/[^/]+/zones/([^/]+)',
771        i['selfLink'],
772    )
773    if not result:
774      logging.error("instance %s selfLink didn't match regexp: %s", i['id'],
775                    i['selfLink'])
776      continue
777    zone = result.group(1)
778    labels = i.get('labels', {})
779    resource = i.get('name', '')
780    if not context.match_project_resource(
781        location=zone, labels=labels, resource=resource):
782      continue
783    instance_group = InstanceGroup(context.project_id, i)
784    groups[instance_group.full_path] = instance_group
785  return groups

Get a list of InstanceGroups matching the given context, indexed by name.

@caching.cached_api_call(in_memory=True)
def get_managed_instance_groups( context: gcpdiag.models.Context) -> Mapping[int, ManagedInstanceGroup]:
788@caching.cached_api_call(in_memory=True)
789def get_managed_instance_groups(
790    context: models.Context,) -> Mapping[int, ManagedInstanceGroup]:
791  """Get a list of zonal ManagedInstanceGroups matching the given context, indexed by mig id."""
792
793  migs: Dict[int, ManagedInstanceGroup] = {}
794  if not apis.is_enabled(context.project_id, 'compute'):
795    return migs
796  gce_api = apis.get_api('compute', 'v1', context.project_id)
797  requests = [
798      gce_api.instanceGroupManagers().list(project=context.project_id,
799                                           zone=zone)
800      for zone in get_gce_zones(context.project_id)
801  ]
802  logging.info('listing zonal managed instance groups of project %s',
803               context.project_id)
804  items = apis_utils.multi_list_all(
805      requests=requests,
806      next_function=gce_api.instanceGroupManagers().list_next,
807  )
808  for i in items:
809    result = re.match(
810        r'https://www.googleapis.com/compute/v1/projects/[^/]+/(?:regions|zones)/([^/]+)/',
811        i['selfLink'],
812    )
813    if not result:
814      logging.error("mig %s selfLink didn't match regexp: %s", i['name'],
815                    i['selfLink'])
816      continue
817    location = result.group(1)
818    labels = i.get('labels', {})
819    resource = i.get('name', '')
820    if not context.match_project_resource(
821        location=location, labels=labels, resource=resource):
822      continue
823    migs[i['id']] = ManagedInstanceGroup(project_id=context.project_id,
824                                         resource_data=i)
825  return migs

Get a list of zonal ManagedInstanceGroups matching the given context, indexed by mig id.

@caching.cached_api_call(in_memory=True)
def get_region_managed_instance_groups( context: gcpdiag.models.Context) -> Mapping[int, ManagedInstanceGroup]:
828@caching.cached_api_call(in_memory=True)
829def get_region_managed_instance_groups(
830    context: models.Context,) -> Mapping[int, ManagedInstanceGroup]:
831  """Get a list of regional ManagedInstanceGroups matching the given context, indexed by mig id."""
832
833  migs: Dict[int, ManagedInstanceGroup] = {}
834  if not apis.is_enabled(context.project_id, 'compute'):
835    return migs
836  gce_api = apis.get_api('compute', 'v1', context.project_id)
837  requests = [
838      gce_api.regionInstanceGroupManagers().list(project=context.project_id,
839                                                 region=r.name)
840      for r in get_all_regions(context.project_id)
841  ]
842  logging.info('listing regional managed instance groups of project %s',
843               context.project_id)
844  items = apis_utils.multi_list_all(
845      requests=requests,
846      next_function=gce_api.regionInstanceGroupManagers().list_next,
847  )
848  for i in items:
849    result = re.match(
850        r'https://www.googleapis.com/compute/v1/projects/[^/]+/(?:regions)/([^/]+)/',
851        i['selfLink'],
852    )
853    if not result:
854      logging.error("mig %s selfLink didn't match regexp: %s", i['name'],
855                    i['selfLink'])
856      continue
857    location = result.group(1)
858    labels = i.get('labels', {})
859    name = i.get('name', '')
860    if not context.match_project_resource(
861        location=location, labels=labels, resource=name):
862      continue
863    migs[i['id']] = ManagedInstanceGroup(project_id=context.project_id,
864                                         resource_data=i)
865  return migs

Get a list of regional ManagedInstanceGroups matching the given context, indexed by mig id.

@caching.cached_api_call
def get_instance_templates(project_id: str) -> Mapping[str, InstanceTemplate]:
868@caching.cached_api_call
869def get_instance_templates(project_id: str) -> Mapping[str, InstanceTemplate]:
870  logging.info('fetching instance templates')
871  templates = {}
872  gce_api = apis.get_api('compute', 'v1', project_id)
873  request = gce_api.instanceTemplates().list(
874      project=project_id,
875      returnPartialSuccess=True,
876      # Fetch only a subset of the fields to improve performance.
877      fields=('items/name, items/properties/tags,'
878              ' items/properties/networkInterfaces,'
879              ' items/properties/serviceAccounts, items/properties/metadata'),
880  )
881  for t in apis_utils.list_all(
882      request, next_function=gce_api.instanceTemplates().list_next):
883    instance_template = InstanceTemplate(project_id, t)
884    templates[instance_template.full_path] = instance_template
885  return templates
@caching.cached_api_call
def get_project_metadata(project_id) -> Mapping[str, str]:
888@caching.cached_api_call
889def get_project_metadata(project_id) -> Mapping[str, str]:
890  gce_api = apis.get_api('compute', 'v1', project_id)
891  logging.info('fetching metadata of project %s\n', project_id)
892  query = gce_api.projects().get(project=project_id)
893  try:
894    response = query.execute(num_retries=config.API_RETRIES)
895  except googleapiclient.errors.HttpError as err:
896    raise utils.GcpApiError(err) from err
897
898  mapped_metadata: Dict[str, str] = {}
899  metadata = response.get('commonInstanceMetadata')
900  if metadata and 'items' in metadata:
901    for m_item in metadata['items']:
902      mapped_metadata[m_item.get('key')] = m_item.get('value')
903  return mapped_metadata
@caching.cached_api_call
def get_instances_serial_port_output(context: gcpdiag.models.Context):
906@caching.cached_api_call
907def get_instances_serial_port_output(context: models.Context):
908  """Get a list of serial port output for instances
909
910  which matches the given context, running and is not
911  exported to cloud logging.
912  """
913  # Create temp storage (diskcache.Deque) for output
914  deque = caching.get_tmp_deque('tmp-gce-serial-output-')
915  if not apis.is_enabled(context.project_id, 'compute'):
916    return deque
917  gce_api = apis.get_api('compute', 'v1', context.project_id)
918
919  # Serial port output are rolled over on day 7 and limited to 1MB.
920  # Fetching serial outputs are very expensive so optimize to fetch.
921  # Only relevant instances as storage size can grow drastically for
922  # massive projects. Think 1MB * N where N is some large number.
923  requests = [
924      gce_api.instances().getSerialPortOutput(
925          project=i.project_id,
926          zone=i.zone,
927          instance=i.id,
928          # To get all 1mb output
929          start=-1000000,
930      )
931      for i in get_instances(context).values()
932      # fetch running instances that do not export to cloud logging
933      if not i.is_serial_port_logging_enabled() and i.is_running
934  ]
935  requests_start_time = datetime.now()
936  # Note: We are limited to 1000 calls in a single batch request.
937  # We have to use multiple batch requests in batches of 1000
938  # https://github.com/googleapis/google-api-python-client/blob/main/docs/batch.md
939  batch_size = 1000
940  for i in range(0, len(requests), batch_size):
941    batch_requests = requests[i:i + batch_size]
942    for _, response, exception in apis_utils.batch_execute_all(
943        api=gce_api, requests=batch_requests):
944      if exception:
945        if isinstance(exception, googleapiclient.errors.HttpError):
946          raise utils.GcpApiError(exception) from exception
947        else:
948          raise exception
949
950      if response:
951        result = re.match(
952            r'https://www.googleapis.com/compute/v1/projects/([^/]+)/zones/[^/]+/instances/([^/]+)',
953            response['selfLink'],
954        )
955        if not result:
956          logging.error("instance selfLink didn't match regexp: %s",
957                        response['selfLink'])
958          return
959
960        project_id = result.group(1)
961        instance_id = result.group(2)
962        deque.appendleft(
963            SerialPortOutput(
964                project_id=project_id,
965                instance_id=instance_id,
966                contents=response['contents'].splitlines(),
967            ))
968  requests_end_time = datetime.now()
969  logging.debug(
970      'total serial logs processing time: %s, number of instances: %s',
971      requests_end_time - requests_start_time,
972      len(requests),
973  )
974  return deque

Get a list of serial port output for instances

which matches the given context, running and is not exported to cloud logging.

@caching.cached_api_call
def get_instance_serial_port_output( project_id, zone, instance_name) -> Optional[SerialPortOutput]:
 977@caching.cached_api_call
 978def get_instance_serial_port_output(
 979    project_id, zone, instance_name) -> Optional[SerialPortOutput]:
 980  """Get a list of serial port output for instances
 981
 982  which matches the given context, running and is not
 983  exported to cloud logging.
 984  """
 985  # Create temp storage (diskcache.Deque) for output
 986  if not apis.is_enabled(project_id, 'compute'):
 987    return None
 988  gce_api = apis.get_api('compute', 'v1', project_id)
 989
 990  request = gce_api.instances().getSerialPortOutput(
 991      project=project_id,
 992      zone=zone,
 993      instance=instance_name,
 994      # To get all 1mb output
 995      start=-1000000,
 996  )
 997  try:
 998    response = request.execute(num_retries=config.API_RETRIES)
 999  except googleapiclient.errors.HttpError:
1000    return None
1001
1002  if response:
1003    result = re.match(
1004        r'https://www.googleapis.com/compute/v1/projects/([^/]+)/zones/[^/]+/instances/([^/]+)',
1005        response['selfLink'],
1006    )
1007    if not result:
1008      logging.error("instance selfLink didn't match regexp: %s",
1009                    response['selfLink'])
1010      return None
1011
1012    project_id = result.group(1)
1013    instance_id = result.group(2)
1014    return SerialPortOutput(
1015        project_id,
1016        instance_id=instance_id,
1017        contents=response['contents'].splitlines(),
1018    )
1019  return None

Get a list of serial port output for instances

which matches the given context, running and is not exported to cloud logging.

class Region(gcpdiag.models.Resource):
1022class Region(models.Resource):
1023  """Represents a GCE Region."""
1024
1025  _resource_data: dict
1026
1027  def __init__(self, project_id, resource_data):
1028    super().__init__(project_id=project_id)
1029    self._resource_data = resource_data
1030
1031  @property
1032  def self_link(self) -> str:
1033    return self._resource_data['selfLink']
1034
1035  @property
1036  def full_path(self) -> str:
1037    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
1038                      self.self_link)
1039    if result:
1040      return result.group(1)
1041    else:
1042      return f'>> {self.self_link}'
1043
1044  @property
1045  def name(self) -> str:
1046    return self._resource_data['name']

Represents a GCE Region.

Region(project_id, resource_data)
1027  def __init__(self, project_id, resource_data):
1028    super().__init__(project_id=project_id)
1029    self._resource_data = resource_data
full_path: str
1035  @property
1036  def full_path(self) -> str:
1037    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
1038                      self.self_link)
1039    if result:
1040      return result.group(1)
1041    else:
1042      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

name: str
1044  @property
1045  def name(self) -> str:
1046    return self._resource_data['name']
@caching.cached_api_call
def get_all_regions(project_id: str) -> Iterable[Region]:
1049@caching.cached_api_call
1050def get_all_regions(project_id: str) -> Iterable[Region]:
1051  """Return list of all regions
1052
1053  Args:
1054      project_id (str): project id for this request
1055
1056  Raises:
1057      utils.GcpApiError: Raises GcpApiError in case of query issues
1058
1059  Returns:
1060      Iterable[Region]: Return list of all regions
1061  """
1062  try:
1063    gce_api = apis.get_api('compute', 'v1', project_id)
1064    request = gce_api.regions().list(project=project_id)
1065    response = request.execute(num_retries=config.API_RETRIES)
1066    if not response or 'items' not in response:
1067      return set()
1068
1069    return {
1070        Region(project_id, item) for item in response['items'] if 'name' in item
1071    }
1072  except googleapiclient.errors.HttpError as err:
1073    raise utils.GcpApiError(err) from err

Return list of all regions

Arguments:
  • project_id (str): project id for this request
Raises:
  • utils.GcpApiError: Raises GcpApiError in case of query issues
Returns:

Iterable[Region]: Return list of all regions

def get_regions_with_instances(context: gcpdiag.models.Context) -> Iterable[Region]:
1076def get_regions_with_instances(context: models.Context) -> Iterable[Region]:
1077  """Return list of regions with instances
1078
1079  Args:
1080      context (models.Context): context for this request
1081
1082  Returns:
1083      Iterable[Region]: Return list of regions which contains instances
1084  """
1085
1086  regions_of_instances = {i.region for i in get_instances(context).values()}
1087
1088  all_regions = get_all_regions(context.project_id)
1089  if not all_regions:
1090    return set()
1091
1092  return {r for r in all_regions if r.name in regions_of_instances}

Return list of regions with instances

Arguments:
  • context (models.Context): context for this request
Returns:

Iterable[Region]: Return list of regions which contains instances

@caching.cached_api_call
def get_all_disks(project_id: str) -> Iterable[Disk]:
1095@caching.cached_api_call
1096def get_all_disks(project_id: str) -> Iterable[Disk]:
1097  # Fetching only Zonal Disks(Regional disks exempted)
1098  try:
1099    gce_api = apis.get_api('compute', 'v1', project_id)
1100    requests = [
1101        gce_api.disks().list(project=project_id, zone=zone)
1102        for zone in get_gce_zones(project_id)
1103    ]
1104    logging.info('listing gce disks of project %s', project_id)
1105    items = apis_utils.multi_list_all(
1106        requests=requests,
1107        next_function=gce_api.disks().list_next,
1108    )
1109
1110    return {Disk(project_id, item) for item in items}
1111
1112  except googleapiclient.errors.HttpError as err:
1113    raise utils.GcpApiError(err) from err
@caching.cached_api_call
def get_all_disks_of_instance(project_id: str, zone: str, instance_name: str) -> dict:
1116@caching.cached_api_call
1117def get_all_disks_of_instance(project_id: str, zone: str,
1118                              instance_name: str) -> dict:
1119  # Fetching only Zonal Disks(Regional disks exempted) attached to an instance
1120  try:
1121    gce_api = apis.get_api('compute', 'v1', project_id)
1122    requests = [gce_api.disks().list(project=project_id, zone=zone)]
1123    logging.info('listing gce disks attached to instance %s in project %s',
1124                 instance_name, project_id)
1125    items = apis_utils.multi_list_all(
1126        requests=requests,
1127        next_function=gce_api.disks().list_next,
1128    )
1129    all_disk_list = {Disk(project_id, item) for item in items}
1130    disk_list = {}
1131    for disk in all_disk_list:
1132      if disk.users == [instance_name]:
1133        disk_list[disk.name] = disk
1134    return disk_list
1135
1136  except googleapiclient.errors.HttpError as err:
1137    raise utils.GcpApiError(err) from err
class InstanceEffectiveFirewalls(gcpdiag.queries.network.EffectiveFirewalls):
1140class InstanceEffectiveFirewalls(network_q.EffectiveFirewalls):
1141  """Effective firewall rules for a network interface on a VM instance.
1142
1143  Includes org/folder firewall policies).
1144  """
1145
1146  _instance: Instance
1147  _nic: str
1148
1149  def __init__(self, instance, nic, resource_data):
1150    super().__init__(resource_data)
1151    self._instance = instance
1152    self._nic = nic

Effective firewall rules for a network interface on a VM instance.

Includes org/folder firewall policies).

InstanceEffectiveFirewalls(instance, nic, resource_data)
1149  def __init__(self, instance, nic, resource_data):
1150    super().__init__(resource_data)
1151    self._instance = instance
1152    self._nic = nic
@caching.cached_api_call(in_memory=True)
def get_instance_interface_effective_firewalls( instance: Instance, nic: str) -> InstanceEffectiveFirewalls:
1155@caching.cached_api_call(in_memory=True)
1156def get_instance_interface_effective_firewalls(
1157    instance: Instance, nic: str) -> InstanceEffectiveFirewalls:
1158  """Return effective firewalls for a network interface on the instance"""
1159  compute = apis.get_api('compute', 'v1', instance.project_id)
1160  request = compute.instances().getEffectiveFirewalls(
1161      project=instance.project_id,
1162      zone=instance.zone,
1163      instance=instance.name,
1164      networkInterface=nic,
1165  )
1166  response = request.execute(num_retries=config.API_RETRIES)
1167  return InstanceEffectiveFirewalls(Instance, nic, response)

Return effective firewalls for a network interface on the instance

def is_project_serial_port_logging_enabled(project_id: str) -> bool:
1170def is_project_serial_port_logging_enabled(project_id: str) -> bool:
1171  if not apis.is_enabled(project_id, 'compute'):
1172    return False
1173
1174  value = get_project_metadata(
1175      project_id=project_id).get('serial-port-logging-enable')
1176  return bool(value and value.upper() in POSITIVE_BOOL_VALUES)
def is_serial_port_buffer_enabled():
1179def is_serial_port_buffer_enabled():
1180  return config.get('enable_gce_serial_buffer')
class SerialOutputQuery:
1197class SerialOutputQuery:
1198  """A serial output job that was started with prefetch_logs()."""
1199
1200  job: _SerialOutputJob
1201
1202  def __init__(self, job):
1203    self.job = job
1204
1205  @property
1206  def entries(self) -> Sequence:
1207    if not self.job.future:
1208      raise RuntimeError("Fetching serial logs wasn't executed. did you call"
1209                         ' execute_get_serial_port_output()?')
1210    elif self.job.future.running():
1211      logging.info(
1212          'waiting for serial output results for project: %s',
1213          self.job.context.project_id,
1214      )
1215    return self.job.future.result()

A serial output job that was started with prefetch_logs().

SerialOutputQuery(job)
1202  def __init__(self, job):
1203    self.job = job
job: gcpdiag.queries.gce._SerialOutputJob
entries: Sequence
1205  @property
1206  def entries(self) -> Sequence:
1207    if not self.job.future:
1208      raise RuntimeError("Fetching serial logs wasn't executed. did you call"
1209                         ' execute_get_serial_port_output()?')
1210    elif self.job.future.running():
1211      logging.info(
1212          'waiting for serial output results for project: %s',
1213          self.job.context.project_id,
1214      )
1215    return self.job.future.result()
jobs_todo: Dict[gcpdiag.models.Context, gcpdiag.queries.gce._SerialOutputJob] = {}
def execute_fetch_serial_port_outputs(executor: concurrent.futures._base.Executor):
1221def execute_fetch_serial_port_outputs(executor: concurrent.futures.Executor):
1222  # start a thread to fetch serial log; processing logs can be large
1223  # depending on he number of instances in the project which aren't logging to cloud logging
1224  # currently expects only one job but implementing it so support for multiple projects is possible.
1225  global jobs_todo
1226  jobs_executing = jobs_todo
1227  jobs_todo = {}
1228  for job in jobs_executing.values():
1229    job.future = executor.submit(get_instances_serial_port_output, job.context)
def fetch_serial_port_outputs(context: gcpdiag.models.Context) -> SerialOutputQuery:
1232def fetch_serial_port_outputs(context: models.Context) -> SerialOutputQuery:
1233  # Aggregate by context
1234  job = jobs_todo.setdefault(context, _SerialOutputJob(context=context))
1235  return SerialOutputQuery(job=job)
class HealthCheck(gcpdiag.models.Resource):
1239class HealthCheck(models.Resource):
1240  """A Health Check resource."""
1241
1242  _resource_data: dict
1243  _type: str
1244
1245  def __init__(self, project_id, resource_data):
1246    super().__init__(project_id=project_id)
1247    self._resource_data = resource_data
1248
1249  @property
1250  def name(self) -> str:
1251    return self._resource_data['name']
1252
1253  @property
1254  def full_path(self) -> str:
1255    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
1256                      self.self_link)
1257    if result:
1258      return result.group(1)
1259    else:
1260      return f'>> {self.self_link}'
1261
1262  @property
1263  def short_path(self) -> str:
1264    path = self.project_id + '/' + self.name
1265    return path
1266
1267  @property
1268  def self_link(self) -> str:
1269    return self._resource_data['selfLink']
1270
1271  @property
1272  def is_log_enabled(self) -> bool:
1273    try:
1274      log_config = self._resource_data.get('logConfig', False)
1275      if log_config and log_config['enable']:
1276        return True
1277    except KeyError:
1278      return False
1279    return False
1280
1281  @property
1282  def region(self):
1283    url = self._resource_data.get('region', '')
1284    match = re.search(r'/([^/]+)/?$', url)
1285    if match:
1286      region = match.group(1)
1287      return region
1288    return None
1289
1290  @property
1291  def type(self) -> str:
1292    return self._resource_data['type']
1293
1294  @property
1295  def request_path(self) -> str:
1296    return self.get_health_check_property('requestPath', '/')
1297
1298  @property
1299  def request(self) -> str:
1300    return self.get_health_check_property('request')
1301
1302  @property
1303  def response(self) -> str:
1304    return self.get_health_check_property('response')
1305
1306  @property
1307  def port(self) -> int:
1308    return self.get_health_check_property('port')
1309
1310  @property
1311  def port_specification(self) -> str:
1312    return self.get_health_check_property('portSpecification')
1313
1314  @property
1315  def timeout_sec(self) -> int:
1316    return self._resource_data.get('timeoutSec', 5)
1317
1318  def get_health_check_property(self, property_name: str, default_value=None):
1319    health_check_types = {
1320        'HTTP': 'httpHealthCheck',
1321        'HTTPS': 'httpsHealthCheck',
1322        'HTTP2': 'http2HealthCheck',
1323        'TCP': 'tcpHealthCheck',
1324        'SSL': 'sslHealthCheck',
1325        'GRPC': 'grpcHealthCheck',
1326    }
1327    if self.type in health_check_types:
1328      health_check_data = self._resource_data.get(health_check_types[self.type])
1329      if health_check_data:
1330        return health_check_data.get(property_name) or default_value
1331    return default_value

A Health Check resource.

HealthCheck(project_id, resource_data)
1245  def __init__(self, project_id, resource_data):
1246    super().__init__(project_id=project_id)
1247    self._resource_data = resource_data
name: str
1249  @property
1250  def name(self) -> str:
1251    return self._resource_data['name']
full_path: str
1253  @property
1254  def full_path(self) -> str:
1255    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
1256                      self.self_link)
1257    if result:
1258      return result.group(1)
1259    else:
1260      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
1262  @property
1263  def short_path(self) -> str:
1264    path = self.project_id + '/' + self.name
1265    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

is_log_enabled: bool
1271  @property
1272  def is_log_enabled(self) -> bool:
1273    try:
1274      log_config = self._resource_data.get('logConfig', False)
1275      if log_config and log_config['enable']:
1276        return True
1277    except KeyError:
1278      return False
1279    return False
region
1281  @property
1282  def region(self):
1283    url = self._resource_data.get('region', '')
1284    match = re.search(r'/([^/]+)/?$', url)
1285    if match:
1286      region = match.group(1)
1287      return region
1288    return None
type: str
1290  @property
1291  def type(self) -> str:
1292    return self._resource_data['type']
request_path: str
1294  @property
1295  def request_path(self) -> str:
1296    return self.get_health_check_property('requestPath', '/')
request: str
1298  @property
1299  def request(self) -> str:
1300    return self.get_health_check_property('request')
response: str
1302  @property
1303  def response(self) -> str:
1304    return self.get_health_check_property('response')
port: int
1306  @property
1307  def port(self) -> int:
1308    return self.get_health_check_property('port')
port_specification: str
1310  @property
1311  def port_specification(self) -> str:
1312    return self.get_health_check_property('portSpecification')
timeout_sec: int
1314  @property
1315  def timeout_sec(self) -> int:
1316    return self._resource_data.get('timeoutSec', 5)
def get_health_check_property(self, property_name: str, default_value=None):
1318  def get_health_check_property(self, property_name: str, default_value=None):
1319    health_check_types = {
1320        'HTTP': 'httpHealthCheck',
1321        'HTTPS': 'httpsHealthCheck',
1322        'HTTP2': 'http2HealthCheck',
1323        'TCP': 'tcpHealthCheck',
1324        'SSL': 'sslHealthCheck',
1325        'GRPC': 'grpcHealthCheck',
1326    }
1327    if self.type in health_check_types:
1328      health_check_data = self._resource_data.get(health_check_types[self.type])
1329      if health_check_data:
1330        return health_check_data.get(property_name) or default_value
1331    return default_value
@caching.cached_api_call(in_memory=True)
def get_health_check(project_id: str, health_check: str, region: str = None) -> object:
1334@caching.cached_api_call(in_memory=True)
1335def get_health_check(project_id: str,
1336                     health_check: str,
1337                     region: str = None) -> object:
1338  logging.info('fetching health check: %s', health_check)
1339  compute = apis.get_api('compute', 'v1', project_id)
1340  if not region:
1341    request = compute.healthChecks().get(project=project_id,
1342                                         healthCheck=health_check)
1343  else:
1344    request = compute.regionHealthChecks().get(project=project_id,
1345                                               healthCheck=health_check,
1346                                               region=region)
1347  response = request.execute(num_retries=config.API_RETRIES)
1348  return HealthCheck(project_id, response)
class NetworkEndpointGroup(gcpdiag.models.Resource):
1351class NetworkEndpointGroup(models.Resource):
1352  """A Network Endpoint Group resource."""
1353
1354  _resource_data: dict
1355  _type: str
1356
1357  def __init__(self, project_id, resource_data):
1358    super().__init__(project_id=project_id)
1359    self._resource_data = resource_data
1360
1361  @property
1362  def name(self) -> str:
1363    return self._resource_data['name']
1364
1365  @property
1366  def id(self) -> str:
1367    return self._resource_data['id']
1368
1369  @property
1370  def full_path(self) -> str:
1371    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
1372                      self.self_link)
1373    if result:
1374      return result.group(1)
1375    else:
1376      return f'>> {self.self_link}'
1377
1378  @property
1379  def short_path(self) -> str:
1380    path = self.project_id + '/' + self.name
1381    return path
1382
1383  @property
1384  def self_link(self) -> str:
1385    return self._resource_data['selfLink']

A Network Endpoint Group resource.

NetworkEndpointGroup(project_id, resource_data)
1357  def __init__(self, project_id, resource_data):
1358    super().__init__(project_id=project_id)
1359    self._resource_data = resource_data
name: str
1361  @property
1362  def name(self) -> str:
1363    return self._resource_data['name']
id: str
1365  @property
1366  def id(self) -> str:
1367    return self._resource_data['id']
full_path: str
1369  @property
1370  def full_path(self) -> str:
1371    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
1372                      self.self_link)
1373    if result:
1374      return result.group(1)
1375    else:
1376      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
1378  @property
1379  def short_path(self) -> str:
1380    path = self.project_id + '/' + self.name
1381    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

@caching.cached_api_call(in_memory=True)
def get_zonal_network_endpoint_groups( context: gcpdiag.models.Context) -> Mapping[str, NetworkEndpointGroup]:
1388@caching.cached_api_call(in_memory=True)
1389def get_zonal_network_endpoint_groups(
1390    context: models.Context,) -> Mapping[str, NetworkEndpointGroup]:
1391  """Returns a list of Network Endpoint Groups in the project."""
1392  groups: Dict[str, NetworkEndpointGroup] = {}
1393  if not apis.is_enabled(context.project_id, 'compute'):
1394    return groups
1395  gce_api = apis.get_api('compute', 'v1', context.project_id)
1396  requests = [
1397      gce_api.networkEndpointGroups().list(project=context.project_id,
1398                                           zone=zone)
1399      for zone in get_gce_zones(context.project_id)
1400  ]
1401  logging.info('listing gce networkEndpointGroups of project %s',
1402               context.project_id)
1403  items = apis_utils.multi_list_all(
1404      requests=requests,
1405      next_function=gce_api.networkEndpointGroups().list_next,
1406  )
1407
1408  for i in items:
1409    result = re.match(
1410        r'https://www.googleapis.com/compute/v1/projects/[^/]+/zones/([^/]+)',
1411        i['selfLink'],
1412    )
1413    if not result:
1414      logging.error("instance %s selfLink didn't match regexp: %s", i['id'],
1415                    i['selfLink'])
1416      continue
1417    zone = result.group(1)
1418    labels = i.get('labels', {})
1419    resource = i.get('name', '')
1420    if not context.match_project_resource(
1421        location=zone, labels=labels, resource=resource):
1422      continue
1423    data = NetworkEndpointGroup(context.project_id, i)
1424    groups[data.full_path] = data
1425  return groups

Returns a list of Network Endpoint Groups in the project.