gcpdiag.queries.gce

Queries related to GCP Compute Engine.
# Upper-cased metadata values that the is_*_enabled() helpers treat as "true".
POSITIVE_BOOL_VALUES = {'Y', 'TRUE', 'YES', '1'}
# Label key checked by Instance.is_dataproc_instance().
DATAPROC_LABEL = 'goog-dataproc-cluster-name'
# Label key checked by Instance.is_gke_node().
GKE_LABEL = 'goog-gke-node'
class InstanceTemplate(gcpdiag.models.Resource):
 39class InstanceTemplate(models.Resource):
 40  """Represents a GCE Instance Template."""
 41
 42  _resource_data: dict
 43
 44  def __init__(self, project_id, resource_data):
 45    super().__init__(project_id=project_id)
 46    self._resource_data = resource_data
 47
 48  @property
 49  def self_link(self) -> str:
 50    return self._resource_data['selfLink']
 51
 52  @property
 53  def full_path(self) -> str:
 54    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
 55                      self.self_link)
 56    if result:
 57      return result.group(1)
 58    else:
 59      return f'>> {self.self_link}'
 60
 61  @property
 62  def short_path(self) -> str:
 63    path = self.project_id + '/' + self.name
 64    return path
 65
 66  @property
 67  def name(self) -> str:
 68    return self._resource_data['name']
 69
 70  @property
 71  def tags(self) -> List[str]:
 72    return self._resource_data['properties'].get('tags', {}).get('items', [])
 73
 74  @property
 75  def service_account(self) -> Optional[str]:
 76    sa_list = self._resource_data['properties'].get('serviceAccounts', [])
 77    if not sa_list:
 78      return None
 79    email = sa_list[0]['email']
 80    if email == 'default':
 81      project_nr = crm.get_project(self._project_id).number
 82      return f'{project_nr}-compute@developer.gserviceaccount.com'
 83    return email
 84
 85  @property
 86  def network(self) -> network_q.Network:
 87    return network_q.get_network_from_url(
 88        self._resource_data['properties']['networkInterfaces'][0]['network'])
 89
 90  @property
 91  def subnetwork(self) -> network_q.Subnetwork:
 92    subnet_url = self._resource_data['properties']['networkInterfaces'][0][
 93        'subnetwork']
 94    return self.network.subnetworks[subnet_url]
 95
 96  def get_metadata(self, key: str) -> str:
 97    for item in self._resource_data['properties']['metadata']['items']:
 98      if item['key'] == key:
 99        return item['value']
100    return ''

Represents a GCE Instance Template.

InstanceTemplate(project_id, resource_data)
44  def __init__(self, project_id, resource_data):
45    super().__init__(project_id=project_id)
46    self._resource_data = resource_data
full_path: str
52  @property
53  def full_path(self) -> str:
54    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
55                      self.self_link)
56    if result:
57      return result.group(1)
58    else:
59      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
61  @property
62  def short_path(self) -> str:
63    path = self.project_id + '/' + self.name
64    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
66  @property
67  def name(self) -> str:
68    return self._resource_data['name']
tags: List[str]
70  @property
71  def tags(self) -> List[str]:
72    return self._resource_data['properties'].get('tags', {}).get('items', [])
service_account: Optional[str]
74  @property
75  def service_account(self) -> Optional[str]:
76    sa_list = self._resource_data['properties'].get('serviceAccounts', [])
77    if not sa_list:
78      return None
79    email = sa_list[0]['email']
80    if email == 'default':
81      project_nr = crm.get_project(self._project_id).number
82      return f'{project_nr}-compute@developer.gserviceaccount.com'
83    return email
network: gcpdiag.queries.network.Network
85  @property
86  def network(self) -> network_q.Network:
87    return network_q.get_network_from_url(
88        self._resource_data['properties']['networkInterfaces'][0]['network'])
subnetwork: gcpdiag.queries.network.Subnetwork
90  @property
91  def subnetwork(self) -> network_q.Subnetwork:
92    subnet_url = self._resource_data['properties']['networkInterfaces'][0][
93        'subnetwork']
94    return self.network.subnetworks[subnet_url]
def get_metadata(self, key: str) -> str:
 96  def get_metadata(self, key: str) -> str:
 97    for item in self._resource_data['properties']['metadata']['items']:
 98      if item['key'] == key:
 99        return item['value']
100    return ''
class InstanceGroup(gcpdiag.models.Resource):
class InstanceGroup(models.Resource):
  """Represents a GCE instance group.

  Read-only view over the instance group resource dict.
  """

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """Value of the resource's 'name' field."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of the resource's 'selfLink' field."""
    return self._resource_data['selfLink']

  @property
  def full_path(self) -> str:
    """selfLink without the Compute API prefix ('>> <selfLink>' if no match)."""
    link = self._resource_data['selfLink']
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    if matched:
      return matched.group(1)
    return '>> ' + link

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>'."""
    return self.project_id + '/' + self.name

  @property
  def named_ports(self) -> List[dict]:
    """The 'namedPorts' entries of the group, or [] when the field is absent."""
    return self._resource_data.get('namedPorts', [])

  def has_named_ports(self) -> bool:
    """True when the resource carries a 'namedPorts' field."""
    return 'namedPorts' in self._resource_data

Represents a GCE instance group.

InstanceGroup(project_id, resource_data)
108  def __init__(self, project_id, resource_data):
109    super().__init__(project_id=project_id)
110    self._resource_data = resource_data
full_path: str
112  @property
113  def full_path(self) -> str:
114    result = re.match(
115        r'https://www.googleapis.com/compute/v1/(.*)',
116        self._resource_data['selfLink'],
117    )
118    if result:
119      return result.group(1)
120    else:
121      return '>> ' + self._resource_data['selfLink']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
123  @property
124  def short_path(self) -> str:
125    path = self.project_id + '/' + self.name
126    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
132  @property
133  def name(self) -> str:
134    return self._resource_data['name']
named_ports: List[dict]
136  @property
137  def named_ports(self) -> List[dict]:
138    if 'namedPorts' in self._resource_data:
139      return self._resource_data['namedPorts']
140    return []
def has_named_ports(self) -> bool:
142  def has_named_ports(self) -> bool:
143    if 'namedPorts' in self._resource_data:
144      return True
145    return False
class ManagedInstanceGroup(gcpdiag.models.Resource):
class ManagedInstanceGroup(models.Resource):
  """Represents a GCE managed instance group."""

  _resource_data: dict
  _region: Optional[str]

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    # Filled in lazily by the `region` property.
    self._region = None

  @property
  def full_path(self) -> str:
    """selfLink without the Compute API prefix ('>> <selfLink>' if no match)."""
    result = re.match(
        r'https://www.googleapis.com/compute/v1/(.*)',
        self._resource_data['selfLink'],
    )
    if result:
      return result.group(1)
    return '>> ' + self._resource_data['selfLink']

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>'."""
    return self.project_id + '/' + self.name

  def is_gke(self) -> bool:
    """Is this managed instance group part of a GKE cluster?

    Note that the results are based on heuristics (the mig name),
    which is not ideal.
    """
    # gke- is normal GKE, gk3- is GKE autopilot
    return self.name.startswith(('gke-', 'gk3-'))

  @property
  def self_link(self) -> str:
    """Value of the resource's 'selfLink' field."""
    return self._resource_data['selfLink']

  @property
  def name(self) -> str:
    """Value of the resource's 'name' field."""
    return self._resource_data['name']

  @property
  def region(self) -> str:
    """Region of the MIG, computed once and cached.

    Taken from the 'region' URL when present, otherwise derived from the
    'zone' URL via utils.zone_region().

    Raises:
      RuntimeError: neither field is set, or the field that is set does not
        end in '/regions/<name>' or '/zones/<name>' respectively.
    """
    if self._region is None:
      if 'region' in self._resource_data:
        region_url = self._resource_data['region']
        m = re.search(r'/regions/([^/]+)$', region_url)
        if not m:
          raise RuntimeError("can't determine region of mig %s (%s)" %
                             (self.name, region_url))
        self._region = m.group(1)
      elif 'zone' in self._resource_data:
        zone_url = self._resource_data['zone']
        m = re.search(r'/zones/([^/]+)$', zone_url)
        if not m:
          # 'region' is not set in this branch, so report the zone value;
          # reading the 'region' key here would raise KeyError instead.
          raise RuntimeError("can't determine region of mig %s (%s)" %
                             (self.name, zone_url))
        self._region = utils.zone_region(m.group(1))
      else:
        raise RuntimeError(
            f"can't determine region of mig {self.name}, both region and zone"
            " aren't set!")
    return self._region

  def count_no_action_instances(self) -> int:
    """number of instances in the mig that are running and have no scheduled actions."""
    return self._resource_data['currentActions']['none']

  def is_instance_member(self, project_id: str, region: str,
                         instance_name: str) -> bool:
    """Given the project_id, region and instance name, is it a member of this MIG?

    Membership is decided by matching project and region and by the
    instance name starting with the MIG's 'baseInstanceName'.
    """
    return (self.project_id == project_id and self.region == region and
            instance_name.startswith(self._resource_data['baseInstanceName']))

  @property
  def template(self) -> InstanceTemplate:
    """Instance template referenced by this MIG.

    Raises:
      RuntimeError: 'instanceTemplate' is not set, cannot be parsed, or is not
        among the templates returned by get_instance_templates().
    """
    if 'instanceTemplate' not in self._resource_data:
      raise RuntimeError(f'instanceTemplate not set for MIG {self.name}')

    template_url = self._resource_data['instanceTemplate']
    m = re.match(r'https://www.googleapis.com/compute/v1/(.*)', template_url)
    if not m:
      raise RuntimeError("can't parse instanceTemplate: %s" % template_url)

    template_self_link = m.group(1)
    templates = get_instance_templates(self.project_id)
    if template_self_link not in templates:
      raise RuntimeError(
          f'instanceTemplate {template_self_link} for MIG {self.name} not found'
      )
    return templates[template_self_link]

Represents a GCE managed instance group.

ManagedInstanceGroup(project_id, resource_data)
154  def __init__(self, project_id, resource_data):
155    super().__init__(project_id=project_id)
156    self._resource_data = resource_data
157    self._region = None
full_path: str
159  @property
160  def full_path(self) -> str:
161    result = re.match(
162        r'https://www.googleapis.com/compute/v1/(.*)',
163        self._resource_data['selfLink'],
164    )
165    if result:
166      return result.group(1)
167    else:
168      return '>> ' + self._resource_data['selfLink']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
170  @property
171  def short_path(self) -> str:
172    path = self.project_id + '/' + self.name
173    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

def is_gke(self) -> bool:
175  def is_gke(self) -> bool:
176    """Is this managed instance group part of a GKE cluster?
177
178    Note that the results are based on heuristics (the mig name),
179    which is not ideal.
180    """
181
182    # gke- is normal GKE, gk3- is GKE autopilot
183    return self.name.startswith('gke-') or self.name.startswith('gk3-')

Is this managed instance group part of a GKE cluster?

Note that the results are based on heuristics (the mig name), which is not ideal.

name: str
189  @property
190  def name(self) -> str:
191    return self._resource_data['name']
region: str
193  @property
194  def region(self) -> str:
195    if self._region is None:
196      if 'region' in self._resource_data:
197        m = re.search(r'/regions/([^/]+)$', self._resource_data['region'])
198        if not m:
199          raise RuntimeError("can't determine region of mig %s (%s)" %
200                             (self.name, self._resource_data['region']))
201        self._region = m.group(1)
202      elif 'zone' in self._resource_data:
203        m = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
204        if not m:
205          raise RuntimeError("can't determine region of mig %s (%s)" %
206                             (self.name, self._resource_data['region']))
207        zone = m.group(1)
208        self._region = utils.zone_region(zone)
209      else:
210        raise RuntimeError(
211            f"can't determine region of mig {self.name}, both region and zone"
212            " aren't set!")
213    return self._region
def count_no_action_instances(self) -> int:
215  def count_no_action_instances(self) -> int:
216    """number of instances in the mig that are running and have no scheduled actions."""
217    return self._resource_data['currentActions']['none']

Number of instances in the MIG that are running and have no scheduled actions.

def is_instance_member(self, project_id: str, region: str, instance_name: str):
219  def is_instance_member(self, project_id: str, region: str,
220                         instance_name: str):
221    """Given the project_id, region and instance name, is it a member of this MIG?"""
222    return (self.project_id == project_id and self.region == region and
223            instance_name.startswith(self._resource_data['baseInstanceName']))

Given the project_id, region and instance name, is it a member of this MIG?

template: InstanceTemplate
225  @property
226  def template(self) -> InstanceTemplate:
227    if 'instanceTemplate' not in self._resource_data:
228      raise RuntimeError('instanceTemplate not set for MIG {self.name}')
229
230    m = re.match(
231        r'https://www.googleapis.com/compute/v1/(.*)',
232        self._resource_data['instanceTemplate'],
233    )
234
235    if not m:
236      raise RuntimeError("can't parse instanceTemplate: %s" %
237                         self._resource_data['instanceTemplate'])
238    template_self_link = m.group(1)
239    templates = get_instance_templates(self.project_id)
240    if template_self_link not in templates:
241      raise RuntimeError(
242          f'instanceTemplate {template_self_link} for MIG {self.name} not found'
243      )
244    return templates[template_self_link]
class SerialPortOutput:
class SerialPortOutput:
  """Holds the serial port output (/dev/ttyS0 or COM1) of one instance.

  The project id, instance id and output lines are stored exactly as passed
  in; `contents` and `instance_id` are exposed read-only.
  """

  _project_id: str
  _instance_id: str
  _contents: List[str]

  def __init__(self, project_id, instance_id, contents):
    self._project_id, self._instance_id, self._contents = (project_id,
                                                           instance_id,
                                                           contents)

  @property
  def instance_id(self) -> str:
    """Id of the instance the output belongs to."""
    return self._instance_id

  @property
  def contents(self) -> List[str]:
    """The stored serial port output."""
    return self._contents

Represents the full Serial Port Output (/dev/ttyS0 or COM1) of an instance.

`contents` holds the full serial port output of the instance (1MB).

SerialPortOutput(project_id, instance_id, contents)
257  def __init__(self, project_id, instance_id, contents):
258    self._project_id = project_id
259    self._instance_id = instance_id
260    self._contents = contents
contents: List[str]
262  @property
263  def contents(self) -> List[str]:
264    return self._contents
instance_id: str
266  @property
267  def instance_id(self) -> str:
268    return self._instance_id
class Instance(gcpdiag.models.Resource):
class Instance(models.Resource):
  """Represents a GCE instance.

  Wraps the instance resource dict and exposes commonly needed fields as
  properties and helper predicates.
  """

  _resource_data: dict
  _region: Optional[str]

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    # Lazily built key -> value view of the instance metadata items.
    self._metadata_dict = None
    # Lazily computed by the `region` property.
    self._region = None

  @property
  def id(self) -> str:
    """Value of the resource's 'id' field."""
    return self._resource_data['id']

  @property
  def name(self) -> str:
    """Value of the resource's 'name' field."""
    return self._resource_data['name']

  @property
  def full_path(self) -> str:
    """selfLink without the Compute API prefix ('>> <selfLink>' if no match)."""
    result = re.match(
        r'https://www.googleapis.com/compute/v1/(.*)',
        self._resource_data['selfLink'],
    )
    if result:
      return result.group(1)
    return '>> ' + self._resource_data['selfLink']

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>'."""
    # Note: instance names must be unique per project, so no need to add the zone.
    return self.project_id + '/' + self.name

  @property
  def creation_timestamp(self) -> datetime:
    """VM creation time, as a *naive* `datetime` object."""
    return (datetime.fromisoformat(
        self._resource_data['creationTimestamp']).astimezone(
            timezone.utc).replace(tzinfo=None))

  @property
  def region(self) -> str:
    """Region of the instance, derived from its zone and cached.

    Raises:
      RuntimeError: 'zone' is not set or does not end in '/zones/<name>'.
    """
    if self._region is None:
      if 'zone' in self._resource_data:
        zone_url = self._resource_data['zone']
        m = re.search(r'/zones/([^/]+)$', zone_url)
        if not m:
          # Report the zone value; the 'region' key is never checked for
          # here, and reading it would raise KeyError instead.
          raise RuntimeError("can't determine region of instance %s (%s)" %
                             (self.name, zone_url))
        self._region = utils.zone_region(m.group(1))
      else:
        raise RuntimeError(
            f"can't determine region of instance {self.name}, zone isn't set!")
    return self._region

  @property
  def zone(self) -> str:
    """Zone name, i.e. the last path segment of the 'zone' URL.

    Raises:
      RuntimeError: the 'zone' value does not end in '/zones/<name>'.
    """
    zone_uri = self._resource_data['zone']
    m = re.search(r'/zones/([^/]+)$', zone_uri)
    if m:
      return m.group(1)
    raise RuntimeError(f"can't determine zone of instance {self.name}")

  @property
  def disks(self) -> List[dict]:
    """The 'disks' entries of the instance, or [] when the field is absent."""
    return self._resource_data.get('disks', [])

  @property
  def startrestricted(self) -> bool:
    """Value of the resource's 'startRestricted' field."""
    return self._resource_data['startRestricted']

  def laststarttimestamp(self) -> str:
    """Value of the resource's 'lastStartTimestamp' field."""
    return self._resource_data['lastStartTimestamp']

  def laststoptimestamp(self) -> str:
    """Value of 'lastStopTimestamp', or '' when the field is absent."""
    return self._resource_data.get('lastStopTimestamp', '')

  def is_serial_port_logging_enabled(self) -> bool:
    """True when metadata 'serial-port-logging-enable' holds a positive value."""
    return self.is_metadata_enabled('serial-port-logging-enable')

  def is_oslogin_enabled(self) -> bool:
    """True when metadata 'enable-oslogin' holds a positive value."""
    return self.is_metadata_enabled('enable-oslogin')

  def is_metadata_enabled(self, metadata_name) -> bool:
    """Check whether a boolean-like metadata entry is switched on.

    The value is compared case-insensitively against POSITIVE_BOOL_VALUES;
    a missing or empty value counts as disabled.
    """
    value = self.get_metadata(metadata_name)
    return bool(value and value.upper() in POSITIVE_BOOL_VALUES)

  def has_label(self, label) -> bool:
    """True when `label` is one of the instance's label keys."""
    return label in self.labels

  def is_dataproc_instance(self) -> bool:
    """True when the instance carries the DATAPROC_LABEL label."""
    return self.has_label(DATAPROC_LABEL)

  def is_gke_node(self) -> bool:
    """True when the instance carries the GKE_LABEL label."""
    return self.has_label(GKE_LABEL)

  def is_preemptible_vm(self) -> bool:
    """True when scheduling.preemptible is set to a truthy value."""
    scheduling = self._resource_data.get('scheduling', {})
    return bool(scheduling.get('preemptible', False))

  def is_windows_machine(self) -> bool:
    """Whether the first attached disk advertises the WINDOWS guest OS feature.

    Only the first entry of 'disks' is inspected. Returns False when the
    instance has no disks at all.
    """
    disks = self._resource_data.get('disks')
    if not disks:
      return False
    features = disks[0].get('guestOsFeatures', [])
    return any(feature['type'] == 'WINDOWS' for feature in features)

  def is_public_machine(self) -> bool:
    """True when 'natIP' appears anywhere in the network interface data."""
    if 'networkInterfaces' in self._resource_data:
      return 'natIP' in str(self._resource_data['networkInterfaces'])
    return False

  def machine_type(self):
    """Machine type name (last segment of the 'machineType' URL), or None.

    Raises:
      RuntimeError: 'machineType' is set but does not end in
        '/machineTypes/<name>'.
    """
    if 'machineType' in self._resource_data:
      machine_type_uri = self._resource_data['machineType']
      mt = re.search(r'/machineTypes/([^/]+)$', machine_type_uri)
      if mt:
        return mt.group(1)
      raise RuntimeError(
          f"can't determine machineType of instance {self.name}")
    return None

  def check_license(self, licenses: List[str]) -> bool:
    """Checks that a license is contained in a given license list.

    Returns True when any license attached to any disk, reduced to the part
    after '/global/licenses/', is one of `licenses`. Disks without a
    'licenses' field are skipped.
    """
    for disk in self._resource_data.get('disks', []):
      for attached_license in disk.get('licenses', []):
        if attached_license.partition('/global/licenses/')[2] in licenses:
          return True
    return False

  def get_boot_disk_image(self) -> str:
    """Get VM's boot disk image.

    Returns the source image of the first disk flagged as boot disk, or ''
    when no disk is flagged.

    Raises:
      RuntimeError: the boot disk 'source' does not end in '/disks/<name>'.
    """
    for disk in self.disks:
      if disk.get('boot', False):
        disk_source = disk.get('source', '')
        m = re.search(r'/disks/([^/]+)$', disk_source)
        if not m:
          raise RuntimeError(f"can't determine name of boot disk {disk_source}")
        gce_disk: Disk = get_disk(self.project_id,
                                  zone=self.zone,
                                  disk_name=m.group(1))
        return gce_disk.source_image
    return ''

  @property
  def is_sole_tenant_vm(self) -> bool:
    """True when scheduling.nodeAffinities is present."""
    return 'nodeAffinities' in self._resource_data.get('scheduling', {})

  @property
  def network(self) -> network_q.Network:
    """Network of the first network interface.

    Raises:
      RuntimeError: the network URL cannot be parsed.
    """
    # 'https://www.googleapis.com/compute/v1/projects/gcpdiag-gce1-aaaa/global/networks/default'
    network_string = self._resource_data['networkInterfaces'][0]['network']
    m = re.match(r'^.+/projects/([^/]+)/global/networks/([^/]+)$',
                 network_string)
    if not m:
      raise RuntimeError("can't parse network string: %s" % network_string)
    return network_q.get_network(m.group(1), m.group(2))

  @property
  def network_ips(self) -> List[network_q.IPv4AddrOrIPv6Addr]:
    """The 'networkIP' of every network interface, parsed as IP addresses."""
    return [
        ipaddress.ip_address(nic['networkIP'])
        for nic in self._resource_data['networkInterfaces']
    ]

  @property
  def get_network_interfaces(self):
    """The raw 'networkInterfaces' list of the instance."""
    return self._resource_data['networkInterfaces']

  @property
  def subnetworks(self) -> List[network_q.Subnetwork]:
    """Subnetwork of every network interface, in interface order."""
    return [
        network_q.get_subnetwork_from_url(nic['subnetwork'])
        for nic in self._resource_data['networkInterfaces']
    ]

  @property
  def routes(self) -> List[network_q.Route]:
    """Project routes on the instance's networks that match its tags.

    A route on one of the instance's networks is included when it has no tags
    or shares at least one tag with the instance.
    """
    routes = []
    for nic in self._resource_data['networkInterfaces']:
      for route in network_q.get_routes(self.project_id):
        if nic['network'] != route.network:
          continue
        if route.tags == [] or any(tag in route.tags for tag in self.tags):
          routes.append(route)
    return routes

  def get_network_ip_for_instance_interface(
      self, network: str) -> Optional[network_q.IPv4NetOrIPv6Net]:
    """Get the network ip for a nic given a network name"""
    for nic in self._resource_data['networkInterfaces']:
      if nic.get('network') == network:
        return ipaddress.ip_network(nic.get('networkIP'))
    return None

  def secure_boot_enabled(self) -> bool:
    """shieldedInstanceConfig.enableSecureBoot, or False without that config."""
    if 'shieldedInstanceConfig' in self._resource_data:
      return self._resource_data['shieldedInstanceConfig']['enableSecureBoot']
    return False

  @property
  def access_scopes(self) -> List[str]:
    """Scopes of the first service account, or [] when there is none."""
    saccts = self._resource_data.get('serviceAccounts')
    if isinstance(saccts, list) and saccts:
      return saccts[0].get('scopes', [])
    return []

  @property
  def service_account(self) -> Optional[str]:
    """Email of the first service account, or None when there is none."""
    saccts = self._resource_data.get('serviceAccounts')
    if isinstance(saccts, list) and saccts:
      return saccts[0]['email']
    return None

  @property
  def tags(self) -> List[str]:
    """The tag items of the instance, or [] when none are set."""
    if 'tags' in self._resource_data:
      if 'items' in self._resource_data['tags']:
        return self._resource_data['tags']['items']
    return []

  def get_metadata(self, key: str) -> str:
    """Return the metadata value for `key`.

    Instance metadata takes precedence; otherwise the project metadata value
    is returned. NOTE(review): when the key is in neither, the result is
    whatever `project_metadata.get(key)` yields (None for a plain dict),
    which `mig` relies on - confirm before tightening the annotation.
    """
    # Compare against None so that an instance without any metadata items
    # builds the (empty) lookup dict only once.
    if self._metadata_dict is None:
      self._metadata_dict = {}
      if ('metadata' in self._resource_data and
          'items' in self._resource_data['metadata']):
        for item in self._resource_data['metadata']['items']:
          if 'key' in item and 'value' in item:
            self._metadata_dict[item['key']] = item['value']
    project_metadata = get_project_metadata(self.project_id)
    return self._metadata_dict.get(key, project_metadata.get(key))

  @property
  def status(self) -> str:
    """VM Status"""
    return self._resource_data.get('status', None)

  @property
  def is_running(self) -> bool:
    """VM Status is indicated as running"""
    return self._resource_data.get('status', False) == 'RUNNING'

  @property  # type: ignore
  @caching.cached_api_call(in_memory=True)
  def mig(self) -> ManagedInstanceGroup:
    """Return ManagedInstanceGroup that owns this instance.

    Throws AttributeError in case it isn't MIG-managed.
    """

    created_by = self.get_metadata('created-by')
    if created_by is None:
      raise AttributeError(f'instance {self.id} is not managed by a mig')

    # Example created-by:
    # pylint: disable=line-too-long
    # "projects/12340002/zones/europe-west4-a/instanceGroupManagers/gke-gke1-default-pool-e5e20a34-grp"
    # (note how it uses a project number and not a project id...)
    created_by_match = re.match(
        r'projects/([^/]+)/((?:regions|zones)/[^/]+/instanceGroupManagers/[^/]+)$',
        created_by,
    )
    if not created_by_match:
      raise AttributeError(f'instance {self.id} is not managed by a mig'
                           f' (created-by={created_by})')
    project = crm.get_project(created_by_match.group(1))

    mig_self_link = ('https://www.googleapis.com/compute/v1/'
                     f'projects/{project.id}/{created_by_match.group(2)}')

    # Try to find a matching mig.
    for mig in get_managed_instance_groups(
        models.Context(project_id=self.project_id)).values():
      if mig.self_link == mig_self_link:
        return mig

    raise AttributeError(f'instance {self.id} is not managed by a mig')

  @property
  def labels(self) -> dict:
    """The labels of the instance, or {} when none are set."""
    return self._resource_data.get('labels', {})

Represents a GCE instance.

Instance(project_id, resource_data)
277  def __init__(self, project_id, resource_data):
278    super().__init__(project_id=project_id)
279    self._resource_data = resource_data
280    self._metadata_dict = None
281    self._region = None
id: str
283  @property
284  def id(self) -> str:
285    return self._resource_data['id']
name: str
287  @property
288  def name(self) -> str:
289    return self._resource_data['name']
full_path: str
291  @property
292  def full_path(self) -> str:
293    result = re.match(
294        r'https://www.googleapis.com/compute/v1/(.*)',
295        self._resource_data['selfLink'],
296    )
297    if result:
298      return result.group(1)
299    else:
300      return '>> ' + self._resource_data['selfLink']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
302  @property
303  def short_path(self) -> str:
304    # Note: instance names must be unique per project, so no need to add the zone.
305    path = self.project_id + '/' + self.name
306    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

creation_timestamp: datetime.datetime
308  @property
309  def creation_timestamp(self) -> datetime:
310    """VM creation time, as a *naive* `datetime` object."""
311    return (datetime.fromisoformat(
312        self._resource_data['creationTimestamp']).astimezone(
313            timezone.utc).replace(tzinfo=None))

VM creation time, as a naive datetime object.

region: str
315  @property
316  def region(self) -> str:
317    if self._region is None:
318      if 'zone' in self._resource_data:
319        m = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
320        if not m:
321          raise RuntimeError("can't determine region of instance %s (%s)" %
322                             (self.name, self._resource_data['region']))
323        zone = m.group(1)
324        self._region = utils.zone_region(zone)
325      else:
326        raise RuntimeError(
327            f"can't determine region of instance {self.name}, zone isn't set!")
328    return self._region
zone: str
330  @property
331  def zone(self) -> str:
332    zone_uri = self._resource_data['zone']
333    m = re.search(r'/zones/([^/]+)$', zone_uri)
334    if m:
335      return m.group(1)
336    else:
337      raise RuntimeError(f"can't determine zone of instance {self.name}")
disks: List[dict]
339  @property
340  def disks(self) -> List[dict]:
341    if 'disks' in self._resource_data:
342      return self._resource_data['disks']
343    return []
startrestricted: bool
345  @property
346  def startrestricted(self) -> bool:
347    return self._resource_data['startRestricted']
def laststarttimestamp(self) -> str:
349  def laststarttimestamp(self) -> str:
350    return self._resource_data['lastStartTimestamp']
def laststoptimestamp(self) -> str:
352  def laststoptimestamp(self) -> str:
353    if 'lastStopTimestamp' in self._resource_data:
354      return self._resource_data['lastStopTimestamp']
355    return ''
def is_serial_port_logging_enabled(self) -> bool:
357  def is_serial_port_logging_enabled(self) -> bool:
358    value = self.get_metadata('serial-port-logging-enable')
359    return bool(value and value.upper() in POSITIVE_BOOL_VALUES)
def is_oslogin_enabled(self) -> bool:
361  def is_oslogin_enabled(self) -> bool:
362    value = self.get_metadata('enable-oslogin')
363    return bool(value and value.upper() in POSITIVE_BOOL_VALUES)
def is_metadata_enabled(self, metadata_name) -> bool:
365  def is_metadata_enabled(self, metadata_name) -> bool:
366    """Use to check for common boolen metadata value"""
367    value = self.get_metadata(metadata_name)
368    return bool(value and value.upper() in POSITIVE_BOOL_VALUES)

Used to check a common boolean metadata value.

def has_label(self, label) -> bool:
370  def has_label(self, label) -> bool:
371    return label in self.labels
def is_dataproc_instance(self) -> bool:
373  def is_dataproc_instance(self) -> bool:
374    return self.has_label(DATAPROC_LABEL)
def is_gke_node(self) -> bool:
376  def is_gke_node(self) -> bool:
377    return self.has_label(GKE_LABEL)
def is_preemptible_vm(self) -> bool:
379  def is_preemptible_vm(self) -> bool:
380    return ('scheduling' in self._resource_data and
381            'preemptible' in self._resource_data['scheduling'] and
382            self._resource_data['scheduling']['preemptible'])
def is_windows_machine(self) -> bool:
384  def is_windows_machine(self) -> bool:
385    if 'disks' in self._resource_data:
386      disks = next(iter(self._resource_data['disks']))
387      if 'guestOsFeatures' in disks:
388        if 'WINDOWS' in [t['type'] for t in iter(disks['guestOsFeatures'])]:
389          return True
390    return False
def is_public_machine(self) -> bool:
392  def is_public_machine(self) -> bool:
393    if 'networkInterfaces' in self._resource_data:
394      return 'natIP' in str(self._resource_data['networkInterfaces'])
395    return False
def machine_type(self):
397  def machine_type(self):
398    if 'machineType' in self._resource_data:
399      #return self._resource_data['machineType']
400      machine_type_uri = self._resource_data['machineType']
401      mt = re.search(r'/machineTypes/([^/]+)$', machine_type_uri)
402      if mt:
403        return mt.group(1)
404      else:
405        raise RuntimeError(
406            f"can't determine machineType of instance {self.name}")
407    return None
def check_license(self, licenses: List[str]) -> bool:
409  def check_license(self, licenses: List[str]) -> bool:
410    """Checks that a license is contained in a given license list"""
411    if 'disks' in self._resource_data:
412      for disk in self._resource_data['disks']:
413        if 'license' in str(disk):
414          for license_ in licenses:
415            for attached_license in disk['licenses']:
416              if license_ == attached_license.partition('/global/licenses/')[2]:
417                return True
418    return False

Checks that a license is contained in a given license list

def get_boot_disk_image(self) -> str:
420  def get_boot_disk_image(self) -> str:
421    """Get VM's boot disk image"""
422    boot_disk_image: str = ''
423    for disk in self.disks:
424      if disk.get('boot', False):
425        disk_source = disk.get('source', '')
426        m = re.search(r'/disks/([^/]+)$', disk_source)
427        if not m:
428          raise RuntimeError(f"can't determine name of boot disk {disk_source}")
429        disk_name = m.group(1)
430        gce_disk: Disk = get_disk(self.project_id,
431                                  zone=self.zone,
432                                  disk_name=disk_name)
433        return gce_disk.source_image
434    return boot_disk_image

Get VM's boot disk image

is_sole_tenant_vm: bool
436  @property
437  def is_sole_tenant_vm(self) -> bool:
438    return bool('nodeAffinities' in self._resource_data['scheduling'])
network: gcpdiag.queries.network.Network
440  @property
441  def network(self) -> network_q.Network:
442    # 'https://www.googleapis.com/compute/v1/projects/gcpdiag-gce1-aaaa/global/networks/default'
443    network_string = self._resource_data['networkInterfaces'][0]['network']
444    m = re.match(r'^.+/projects/([^/]+)/global/networks/([^/]+)$',
445                 network_string)
446    if not m:
447      raise RuntimeError("can't parse network string: %s" % network_string)
448    return network_q.get_network(m.group(1), m.group(2))
network_ips: List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]
450  @property
451  def network_ips(self) -> List[network_q.IPv4AddrOrIPv6Addr]:
452    return [
453        ipaddress.ip_address(nic['networkIP'])
454        for nic in self._resource_data['networkInterfaces']
455    ]
get_network_interfaces
457  @property
458  def get_network_interfaces(self):
459    return self._resource_data['networkInterfaces']
subnetworks: List[gcpdiag.queries.network.Subnetwork]
461  @property
462  def subnetworks(self) -> List[network_q.Subnetwork]:
463    subnetworks = []
464    for nic in self._resource_data['networkInterfaces']:
465      subnetworks.append(network_q.get_subnetwork_from_url(nic['subnetwork']))
466    return subnetworks
routes: List[gcpdiag.queries.network.Route]
468  @property
469  def routes(self) -> List[network_q.Route]:
470    routes = []
471    for nic in self._resource_data['networkInterfaces']:
472      for route in network_q.get_routes(self.project_id):
473        if nic['network'] == route.network:
474          if route.tags == []:
475            routes.append(route)
476            continue
477          else:
478            temp = [x for x in self.tags if x in route.tags]
479            if len(temp) > 0:
480              routes.append(route)
481    return routes
def get_network_ip_for_instance_interface( self, network: str) -> Union[ipaddress.IPv4Network, ipaddress.IPv6Network, NoneType]:
483  def get_network_ip_for_instance_interface(
484      self, network: str) -> Optional[network_q.IPv4NetOrIPv6Net]:
485    """Get the network ip for a nic given a network name"""
486    for nic in self._resource_data['networkInterfaces']:
487      if nic.get('network') == network:
488        return ipaddress.ip_network(nic.get('networkIP'))
489    return None

Get the network ip for a nic given a network name

def secure_boot_enabled(self) -> bool:
491  def secure_boot_enabled(self) -> bool:
492    if 'shieldedInstanceConfig' in self._resource_data:
493      return self._resource_data['shieldedInstanceConfig']['enableSecureBoot']
494    return False
access_scopes: List[str]
496  @property
497  def access_scopes(self) -> List[str]:
498    if 'serviceAccounts' in self._resource_data:
499      saccts = self._resource_data['serviceAccounts']
500      if isinstance(saccts, list) and len(saccts) >= 1:
501        return saccts[0].get('scopes', [])
502    return []
service_account: Optional[str]
504  @property
505  def service_account(self) -> Optional[str]:
506    if 'serviceAccounts' in self._resource_data:
507      saccts = self._resource_data['serviceAccounts']
508      if isinstance(saccts, list) and len(saccts) >= 1:
509        return saccts[0]['email']
510    return None
tags: List[str]
512  @property
513  def tags(self) -> List[str]:
514    if 'tags' in self._resource_data:
515      if 'items' in self._resource_data['tags']:
516        return self._resource_data['tags']['items']
517    return []
def get_metadata(self, key: str) -> str:
519  def get_metadata(self, key: str) -> str:
520    if not self._metadata_dict:
521      self._metadata_dict = {}
522      if ('metadata' in self._resource_data and
523          'items' in self._resource_data['metadata']):
524        for item in self._resource_data['metadata']['items']:
525          if 'key' in item and 'value' in item:
526            self._metadata_dict[item['key']] = item['value']
527    project_metadata = get_project_metadata(self.project_id)
528    return self._metadata_dict.get(key, project_metadata.get(key))
status: str
530  @property
531  def status(self) -> str:
532    """VM Status"""
533    return self._resource_data.get('status', None)

VM Status

is_running: bool
535  @property
536  def is_running(self) -> bool:
537    """VM Status is indicated as running"""
538    return self._resource_data.get('status', False) == 'RUNNING'

VM Status is indicated as running

mig: ManagedInstanceGroup
540  @property  # type: ignore
541  @caching.cached_api_call(in_memory=True)
542  def mig(self) -> ManagedInstanceGroup:
543    """Return ManagedInstanceGroup that owns this instance.
544
545    Throws AttributeError in case it isn't MIG-managed.
546    """
547
548    created_by = self.get_metadata('created-by')
549    if created_by is None:
550      raise AttributeError(f'instance {self.id} is not managed by a mig')
551
552    # Example created-by:
553    # pylint: disable=line-too-long
554    # "projects/12340002/zones/europe-west4-a/instanceGroupManagers/gke-gke1-default-pool-e5e20a34-grp"
555    # (note how it uses a project number and not a project id...)
556    created_by_match = re.match(
557        r'projects/([^/]+)/((?:regions|zones)/[^/]+/instanceGroupManagers/[^/]+)$',
558        created_by,
559    )
560    if not created_by_match:
561      raise AttributeError(f'instance {self.id} is not managed by a mig'
562                           f' (created-by={created_by})')
563    project = crm.get_project(created_by_match.group(1))
564
565    mig_self_link = ('https://www.googleapis.com/compute/v1/'
566                     f'projects/{project.id}/{created_by_match.group(2)}')
567
568    # Try to find a matching mig.
569    for mig in get_managed_instance_groups(
570        models.Context(project_id=self.project_id)).values():
571      if mig.self_link == mig_self_link:
572        return mig
573
574    raise AttributeError(f'instance {self.id} is not managed by a mig')

Return ManagedInstanceGroup that owns this instance.

Throws AttributeError in case it isn't MIG-managed.

labels: dict
576  @property
577  def labels(self) -> dict:
578    return self._resource_data.get('labels', {})
class Disk(gcpdiag.models.Resource):
class Disk(models.Resource):
  """Represents a GCE disk.

  Thin read-only wrapper around the disk resource dict.
  """

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def id(self) -> str:
    """The resource's 'id' field."""
    return self._resource_data['id']

  @property
  def name(self) -> str:
    """The resource's 'name' field."""
    return self._resource_data['name']

  @property
  def type(self) -> str:
    """Disk type name: the last path component of the 'type' URL.

    Raises:
      RuntimeError: the 'type' value doesn't end in '/diskTypes/<name>'.
    """
    type_url = self._resource_data['type']
    type_match = re.search(r'/diskTypes/([^/]+)$', type_url)
    if type_match:
      return type_match.group(1)
    raise RuntimeError("can't determine type of the disk %s (%s)" %
                       (self.name, self._resource_data['type']))

  @property
  def users(self) -> list:
    """Instance names extracted from the entries of the 'users' list.

    Entries without an '/instances/<name>' suffix are ignored.
    """
    matches = (re.search(r'/instances/(.+)$', user_url)
               for user_url in self._resource_data.get('users', []))
    return [found.group(1) for found in matches if found]

  @property
  def zone(self) -> str:
    """Zone name: the last path component of the 'zone' URL.

    Raises:
      RuntimeError: the 'zone' value doesn't end in '/zones/<name>'.
    """
    zone_url = self._resource_data['zone']
    zone_match = re.search(r'/zones/([^/]+)$', zone_url)
    if zone_match:
      return zone_match.group(1)
    raise RuntimeError("can't determine zone of disk %s (%s)" %
                       (self.name, self._resource_data['zone']))

  @property
  def source_image(self) -> str:
    """The 'sourceImage' field, or '' when the disk has none."""
    return self._resource_data.get('sourceImage', '')

  @property
  def full_path(self) -> str:
    """The selfLink with the compute v1 API prefix removed.

    A selfLink that doesn't start with that prefix is returned prefixed
    with '>> '.
    """
    self_link = self._resource_data['selfLink']
    prefix_match = re.match(
        r'https://www.googleapis.com/compute/v1/(.*)',
        self_link,
    )
    if not prefix_match:
      return '>> ' + self._resource_data['selfLink']
    return prefix_match.group(1)

  @property
  def short_path(self) -> str:
    """'<project id>/<disk name>'."""
    return f'{self.project_id}/{self.name}'

  @property
  def bootable(self) -> bool:
    """True when the resource has a 'guestOsFeatures' field."""
    return 'guestOsFeatures' in self._resource_data

  @property
  def in_use(self) -> bool:
    """True when the resource has a 'users' field."""
    return 'users' in self._resource_data

  @property
  def size(self) -> int:
    """The resource's 'sizeGb' field."""
    return self._resource_data['sizeGb']

  @property
  def provisionediops(self) -> Optional[int]:
    """The 'provisionedIops' field, or None when it is not set."""
    return self._resource_data.get('provisionedIops')

  @property
  def has_snapshot_schedule(self) -> bool:
    """True when the resource has a 'resourcePolicies' field."""
    return 'resourcePolicies' in self._resource_data

Represents a GCE disk.

Disk(project_id, resource_data)
586  def __init__(self, project_id, resource_data):
587    super().__init__(project_id=project_id)
588    self._resource_data = resource_data
id: str
590  @property
591  def id(self) -> str:
592    return self._resource_data['id']
name: str
594  @property
595  def name(self) -> str:
596    return self._resource_data['name']
type: str
598  @property
599  def type(self) -> str:
600    disk_type = re.search(r'/diskTypes/([^/]+)$', self._resource_data['type'])
601    if not disk_type:
602      raise RuntimeError("can't determine type of the disk %s (%s)" %
603                         (self.name, self._resource_data['type']))
604    return disk_type.group(1)
users: list
606  @property
607  def users(self) -> list:
608    pattern = r'/instances/(.+)$'
609    # Extracting the instances
610    instances = []
611    for i in self._resource_data.get('users', []):
612      m = re.search(pattern, i)
613      if m:
614        instances.append(m.group(1))
615    return instances
zone: str
617  @property
618  def zone(self) -> str:
619    m = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
620    if not m:
621      raise RuntimeError("can't determine zone of disk %s (%s)" %
622                         (self.name, self._resource_data['zone']))
623    return m.group(1)
source_image: str
625  @property
626  def source_image(self) -> str:
627    return self._resource_data.get('sourceImage', '')
full_path: str
629  @property
630  def full_path(self) -> str:
631    result = re.match(
632        r'https://www.googleapis.com/compute/v1/(.*)',
633        self._resource_data['selfLink'],
634    )
635    if result:
636      return result.group(1)
637    else:
638      return '>> ' + self._resource_data['selfLink']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
640  @property
641  def short_path(self) -> str:
642    return f'{self.project_id}/{self.name}'

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

bootable: bool
644  @property
645  def bootable(self) -> bool:
646    return 'guestOsFeatures' in self._resource_data
in_use: bool
648  @property
649  def in_use(self) -> bool:
650    return 'users' in self._resource_data
size: int
652  @property
653  def size(self) -> int:
654    return self._resource_data['sizeGb']
provisionediops: Optional[int]
656  @property
657  def provisionediops(self) -> Optional[int]:
658    return self._resource_data.get('provisionedIops')
has_snapshot_schedule: bool
660  @property
661  def has_snapshot_schedule(self) -> bool:
662    return 'resourcePolicies' in self._resource_data
@caching.cached_api_call(in_memory=True)
def get_gce_zones(project_id: str) -> Set[str]:
@caching.cached_api_call(in_memory=True)
def get_gce_zones(project_id: str) -> Set[str]:
  """Return the names of the GCE zones listed for the project.

  Raises:
    utils.GcpApiError: the zones.list request failed with an HttpError.
  """
  try:
    gce_api = apis.get_api('compute', 'v1', project_id)
    logging.info('listing gce zones of project %s', project_id)
    zones_request = gce_api.zones().list(project=project_id)
    zones_response = zones_request.execute(num_retries=config.API_RETRIES)
    if zones_response and 'items' in zones_response:
      zone_names = set()
      for item in zones_response['items']:
        if 'name' in item:
          zone_names.add(item['name'])
      return zone_names
    return set()
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
def get_gce_public_licences(project_id: str) -> List[str]:
def get_gce_public_licences(project_id: str) -> List[str]:
  """Returns a list of licenses based on publicly available image project

  Args:
    project_id: project whose licenses are listed.

  Returns:
    For every listed license, the part of its selfLink that follows
    '/global/licenses/'. Empty when the project has no licenses.
  """
  licenses = []
  gce_api = apis.get_api('compute', 'v1', project_id)
  logging.info('listing licenses of project %s', project_id)
  request = gce_api.licenses().list(project=project_id)
  while request is not None:
    response = request.execute(num_retries=config.API_RETRIES)
    # A page (or a whole project) without licenses carries no 'items' key.
    for license_ in response.get('items', []):
      formatted_license = license_['selfLink'].partition('/global/licenses/')[2]
      licenses.append(formatted_license)
    request = gce_api.licenses().list_next(previous_request=request,
                                           previous_response=response)
  return licenses

Returns a list of licenses based on publicly available image project

def get_instance( project_id: str, zone: str, instance_name: str) -> Instance:
def get_instance(project_id: str, zone: str, instance_name: str) -> Instance:
  """Fetch a single instance by project, zone and name and wrap it in Instance."""
  instances_api = apis.get_api('compute', 'v1', project_id).instances()
  instance_data = instances_api.get(
      project=project_id,
      zone=zone,
      instance=instance_name,
  ).execute(num_retries=config.API_RETRIES)
  return Instance(project_id, resource_data=instance_data)

Returns instance object matching instance name and zone

@caching.cached_api_call(in_memory=True)
def get_disk(project_id: str, zone: str, disk_name: str) -> Disk:
@caching.cached_api_call(in_memory=True)
def get_disk(project_id: str, zone: str, disk_name: str) -> Disk:
  """Fetch a single disk by project, zone and name and wrap it in Disk."""
  disks_api = apis.get_api('compute', 'v1', project_id).disks()
  disk_data = disks_api.get(
      project=project_id,
      zone=zone,
      disk=disk_name,
  ).execute(num_retries=config.API_RETRIES)
  return Disk(project_id, resource_data=disk_data)

Returns disk object matching disk name and zone

@caching.cached_api_call(in_memory=True)
def get_instances( context: gcpdiag.models.Context) -> Mapping[str, Instance]:
@caching.cached_api_call(in_memory=True)
def get_instances(context: models.Context) -> Mapping[str, Instance]:
  """Get the instances matching the given context, indexed by instance id.

  Returns an empty mapping when the compute API is not enabled. Instances are
  listed per zone and kept when context.match_project_resource() accepts
  their zone, labels and name.
  """
  by_id: Dict[str, Instance] = {}
  if not apis.is_enabled(context.project_id, 'compute'):
    return by_id

  gce_api = apis.get_api('compute', 'v1', context.project_id)
  zone_requests = []
  for zone_name in get_gce_zones(context.project_id):
    zone_requests.append(gce_api.instances().list(project=context.project_id,
                                                  zone=zone_name))
  logging.info('listing gce instances of project %s', context.project_id)
  listed = apis_utils.multi_list_all(
      requests=zone_requests,
      next_function=gce_api.instances().list_next,
  )
  for item in listed:
    zone_match = re.match(
        r'https://www.googleapis.com/compute/v1/projects/[^/]+/zones/([^/]+)/',
        item['selfLink'],
    )
    if not zone_match:
      logging.error("instance %s selfLink didn't match regexp: %s", item['id'],
                    item['selfLink'])
      continue
    if context.match_project_resource(location=zone_match.group(1),
                                      labels=item.get('labels', {}),
                                      resource=item.get('name', '')):
      by_id[item['id']] = Instance(project_id=context.project_id,
                                   resource_data=item)
  return by_id

Get a list of Instance matching the given context, indexed by instance id.

@caching.cached_api_call(in_memory=True)
def get_instance_groups( context: gcpdiag.models.Context) -> Mapping[str, InstanceGroup]:
@caching.cached_api_call(in_memory=True)
def get_instance_groups(context: models.Context) -> Mapping[str, InstanceGroup]:
  """Get the InstanceGroups matching the given context, indexed by full path.

  Returns an empty mapping when the compute API is not enabled. Groups are
  listed per zone and kept when context.match_project_resource() accepts
  their zone, labels and name.
  """
  groups: Dict[str, InstanceGroup] = {}
  if not apis.is_enabled(context.project_id, 'compute'):
    return groups
  gce_api = apis.get_api('compute', 'v1', context.project_id)
  requests = [
      gce_api.instanceGroups().list(project=context.project_id, zone=zone)
      for zone in get_gce_zones(context.project_id)
  ]
  logging.info('listing gce instance groups of project %s', context.project_id)
  items = apis_utils.multi_list_all(
      requests=requests,
      next_function=gce_api.instanceGroups().list_next,
  )
  for i in items:
    result = re.match(
        r'https://www.googleapis.com/compute/v1/projects/[^/]+/zones/([^/]+)',
        i['selfLink'],
    )
    if not result:
      # These items are instance groups, not instances; say so in the log.
      logging.error("instance group %s selfLink didn't match regexp: %s",
                    i['id'], i['selfLink'])
      continue
    zone = result.group(1)
    labels = i.get('labels', {})
    resource = i.get('name', '')
    if not context.match_project_resource(
        location=zone, labels=labels, resource=resource):
      continue
    instance_group = InstanceGroup(context.project_id, i)
    groups[instance_group.full_path] = instance_group
  return groups

Get a list of InstanceGroups matching the given context, indexed by name.

@caching.cached_api_call(in_memory=True)
def get_managed_instance_groups( context: gcpdiag.models.Context) -> Mapping[int, ManagedInstanceGroup]:
@caching.cached_api_call(in_memory=True)
def get_managed_instance_groups(
    context: models.Context,) -> Mapping[int, ManagedInstanceGroup]:
  """Get the zonal ManagedInstanceGroups matching the given context, indexed by mig id.

  Returns an empty mapping when the compute API is not enabled.
  """
  by_id: Dict[int, ManagedInstanceGroup] = {}
  if not apis.is_enabled(context.project_id, 'compute'):
    return by_id

  gce_api = apis.get_api('compute', 'v1', context.project_id)
  zone_requests = []
  for zone_name in get_gce_zones(context.project_id):
    zone_requests.append(gce_api.instanceGroupManagers().list(
        project=context.project_id, zone=zone_name))
  logging.info('listing zonal managed instance groups of project %s',
               context.project_id)
  listed = apis_utils.multi_list_all(
      requests=zone_requests,
      next_function=gce_api.instanceGroupManagers().list_next,
  )
  for item in listed:
    location_match = re.match(
        r'https://www.googleapis.com/compute/v1/projects/[^/]+/(?:regions|zones)/([^/]+)/',
        item['selfLink'],
    )
    if not location_match:
      logging.error("mig %s selfLink didn't match regexp: %s", item['name'],
                    item['selfLink'])
      continue
    if context.match_project_resource(location=location_match.group(1),
                                      labels=item.get('labels', {}),
                                      resource=item.get('name', '')):
      by_id[item['id']] = ManagedInstanceGroup(project_id=context.project_id,
                                               resource_data=item)
  return by_id

Get a list of zonal ManagedInstanceGroups matching the given context, indexed by mig id.

@caching.cached_api_call(in_memory=True)
def get_region_managed_instance_groups( context: gcpdiag.models.Context) -> Mapping[int, ManagedInstanceGroup]:
@caching.cached_api_call(in_memory=True)
def get_region_managed_instance_groups(
    context: models.Context,) -> Mapping[int, ManagedInstanceGroup]:
  """Get the regional ManagedInstanceGroups matching the given context, indexed by mig id.

  Returns an empty mapping when the compute API is not enabled.
  """
  by_id: Dict[int, ManagedInstanceGroup] = {}
  if not apis.is_enabled(context.project_id, 'compute'):
    return by_id

  gce_api = apis.get_api('compute', 'v1', context.project_id)
  region_requests = []
  for region in get_all_regions(context.project_id):
    region_requests.append(gce_api.regionInstanceGroupManagers().list(
        project=context.project_id, region=region.name))
  logging.info('listing regional managed instance groups of project %s',
               context.project_id)
  listed = apis_utils.multi_list_all(
      requests=region_requests,
      next_function=gce_api.regionInstanceGroupManagers().list_next,
  )
  for item in listed:
    region_match = re.match(
        r'https://www.googleapis.com/compute/v1/projects/[^/]+/(?:regions)/([^/]+)/',
        item['selfLink'],
    )
    if not region_match:
      logging.error("mig %s selfLink didn't match regexp: %s", item['name'],
                    item['selfLink'])
      continue
    if context.match_project_resource(location=region_match.group(1),
                                      labels=item.get('labels', {}),
                                      resource=item.get('name', '')):
      by_id[item['id']] = ManagedInstanceGroup(project_id=context.project_id,
                                               resource_data=item)
  return by_id

Get a list of regional ManagedInstanceGroups matching the given context, indexed by mig id.

@caching.cached_api_call
def get_instance_templates(project_id: str) -> Mapping[str, InstanceTemplate]:
@caching.cached_api_call
def get_instance_templates(project_id: str) -> Mapping[str, InstanceTemplate]:
  """Return the instance templates of the project, indexed by full path.

  Only a subset of the template fields is requested from the API.
  """
  logging.info('fetching instance templates')
  templates: Dict[str, InstanceTemplate] = {}
  gce_api = apis.get_api('compute', 'v1', project_id)
  request = gce_api.instanceTemplates().list(
      project=project_id,
      returnPartialSuccess=True,
      # Fetch only a subset of the fields to improve performance.
      # selfLink is needed by InstanceTemplate.full_path (the key used
      # below) and nextPageToken is needed to follow result pages.
      fields=('items/name, items/selfLink, items/properties/tags,'
              ' items/properties/networkInterfaces,'
              ' items/properties/serviceAccounts, items/properties/metadata,'
              ' nextPageToken'),
  )
  for t in apis_utils.list_all(
      request, next_function=gce_api.instanceTemplates().list_next):
    instance_template = InstanceTemplate(project_id, t)
    templates[instance_template.full_path] = instance_template
  return templates
@caching.cached_api_call
def get_project_metadata(project_id) -> Mapping[str, str]:
@caching.cached_api_call
def get_project_metadata(project_id) -> Mapping[str, str]:
  """Return the project's 'commonInstanceMetadata' items as a key -> value mapping.

  Raises:
    utils.GcpApiError: the projects.get request failed with an HttpError.
  """
  gce_api = apis.get_api('compute', 'v1', project_id)
  logging.info('fetching metadata of project %s\n', project_id)
  query = gce_api.projects().get(project=project_id)
  try:
    project_data = query.execute(num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err

  common_metadata = project_data.get('commonInstanceMetadata')
  if not common_metadata or 'items' not in common_metadata:
    return {}
  by_key: Dict[str, str] = {}
  for entry in common_metadata['items']:
    by_key[entry.get('key')] = entry.get('value')
  return by_key
@caching.cached_api_call
def get_instances_serial_port_output(context: gcpdiag.models.Context):
@caching.cached_api_call
def get_instances_serial_port_output(context: models.Context):
  """Get a list of serial port output for instances

  which matches the given context, running and is not
  exported to cloud logging.

  Returns:
    A temporary deque of SerialPortOutput objects, one per successfully
    fetched and parsed response. Empty when the compute API is not enabled.

  Raises:
    utils.GcpApiError: a batched request failed with an HttpError.
  """
  # Create temp storage (diskcache.Deque) for output
  deque = caching.get_tmp_deque('tmp-gce-serial-output-')
  if not apis.is_enabled(context.project_id, 'compute'):
    return deque
  gce_api = apis.get_api('compute', 'v1', context.project_id)

  # Serial port output are rolled over on day 7 and limited to 1MB.
  # Fetching serial outputs are very expensive so optimize to fetch.
  # Only relevant instances as storage size can grow drastically for
  # massive projects. Think 1MB * N where N is some large number.
  requests = [
      gce_api.instances().getSerialPortOutput(
          project=i.project_id,
          zone=i.zone,
          instance=i.id,
          # To get all 1mb output
          start=-1000000,
      )
      for i in get_instances(context).values()
      # fetch running instances that do not export to cloud logging
      if not i.is_serial_port_logging_enabled() and i.is_running
  ]
  requests_start_time = datetime.now()
  # Note: We are limited to 1000 calls in a single batch request.
  # We have to use multiple batch requests in batches of 1000
  # https://github.com/googleapis/google-api-python-client/blob/main/docs/batch.md
  batch_size = 1000
  for batch_start in range(0, len(requests), batch_size):
    batch_requests = requests[batch_start:batch_start + batch_size]
    for _, response, exception in apis_utils.batch_execute_all(
        api=gce_api, requests=batch_requests):
      if exception:
        if isinstance(exception, googleapiclient.errors.HttpError):
          raise utils.GcpApiError(exception) from exception
        else:
          raise exception

      if not response:
        continue

      result = re.match(
          r'https://www.googleapis.com/compute/v1/projects/([^/]+)/zones/[^/]+/instances/([^/]+)',
          response['selfLink'],
      )
      if not result:
        logging.error("instance selfLink didn't match regexp: %s",
                      response['selfLink'])
        # Skip only this response; returning here would drop the output
        # already collected and hand callers None instead of the deque.
        continue

      deque.appendleft(
          SerialPortOutput(
              project_id=result.group(1),
              instance_id=result.group(2),
              contents=response['contents'].splitlines(),
          ))
  requests_end_time = datetime.now()
  logging.debug(
      'total serial logs processing time: %s, number of instances: %s',
      requests_end_time - requests_start_time,
      len(requests),
  )
  return deque

Get a list of serial port output for instances

which matches the given context, running and is not exported to cloud logging.

@caching.cached_api_call
def get_instance_serial_port_output( project_id, zone, instance_name) -> Optional[SerialPortOutput]:
 977@caching.cached_api_call
 978def get_instance_serial_port_output(
 979    project_id, zone, instance_name) -> Optional[SerialPortOutput]:
 980  """Get a list of serial port output for instances
 981
 982  which matches the given context, running and is not
 983  exported to cloud logging.
 984  """
 985  # Create temp storage (diskcache.Deque) for output
 986  if not apis.is_enabled(project_id, 'compute'):
 987    return None
 988  gce_api = apis.get_api('compute', 'v1', project_id)
 989
 990  request = gce_api.instances().getSerialPortOutput(
 991      project=project_id,
 992      zone=zone,
 993      instance=instance_name,
 994      # To get all 1mb output
 995      start=-1000000,
 996  )
 997  try:
 998    response = request.execute(num_retries=config.API_RETRIES)
 999  except googleapiclient.errors.HttpError:
1000    return None
1001
1002  if response:
1003    result = re.match(
1004        r'https://www.googleapis.com/compute/v1/projects/([^/]+)/zones/[^/]+/instances/([^/]+)',
1005        response['selfLink'],
1006    )
1007  if not result:
1008    logging.error("instance selfLink didn't match regexp: %s",
1009                  response['selfLink'])
1010    return None
1011
1012  project_id = result.group(1)
1013  instance_id = result.group(2)
1014  return SerialPortOutput(
1015      project_id,
1016      instance_id=instance_id,
1017      contents=response['contents'].splitlines(),
1018  )

Get the serial port output of a single instance.

Returns None when the compute API is not enabled, the request fails, or the response selfLink cannot be parsed.

class Region(gcpdiag.models.Resource):
class Region(models.Resource):
  """Represents a GCE Region.

  Thin read-only wrapper around the region resource dict.
  """

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def self_link(self) -> str:
    """The resource's 'selfLink' field."""
    return self._resource_data['selfLink']

  @property
  def full_path(self) -> str:
    """The selfLink with the compute v1 API prefix removed.

    A selfLink that doesn't start with that prefix is returned prefixed
    with '>> '.
    """
    prefix_match = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                            self.self_link)
    if not prefix_match:
      return f'>> {self.self_link}'
    return prefix_match.group(1)

  @property
  def name(self) -> str:
    """The resource's 'name' field."""
    return self._resource_data['name']

Represents a GCE Region.

Region(project_id, resource_data)
1026  def __init__(self, project_id, resource_data):
1027    super().__init__(project_id=project_id)
1028    self._resource_data = resource_data
full_path: str
1034  @property
1035  def full_path(self) -> str:
1036    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
1037                      self.self_link)
1038    if result:
1039      return result.group(1)
1040    else:
1041      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

name: str
1043  @property
1044  def name(self) -> str:
1045    return self._resource_data['name']
@caching.cached_api_call
def get_all_regions(project_id: str) -> Iterable[Region]:
@caching.cached_api_call
def get_all_regions(project_id: str) -> Iterable[Region]:
  """Fetch the Compute Engine regions listed for a project.

  Args:
      project_id (str): project id for this request

  Raises:
      utils.GcpApiError: if the request fails with an HttpError

  Returns:
      Iterable[Region]: a set with one Region per response item that has a
      'name' key; empty when the response is empty or has no 'items'
  """
  try:
    compute = apis.get_api('compute', 'v1', project_id)
    listing = compute.regions().list(project=project_id).execute(
        num_retries=config.API_RETRIES)

    regions = set()
    if listing and 'items' in listing:
      for entry in listing['items']:
        # Items without a name are silently dropped.
        if 'name' in entry:
          regions.add(Region(project_id, entry))
    return regions
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err

Return list of all regions

Arguments:
  • project_id (str): project id for this request
Raises:
  • utils.GcpApiError: Raises GcpApiError in case of query issues
Returns:

Iterable[Region]: Return list of all regions

def get_regions_with_instances(context: gcpdiag.models.Context) -> Iterable[Region]:
def get_regions_with_instances(context: models.Context) -> Iterable[Region]:
  """Return the regions of the project that contain at least one instance.

  Args:
      context (models.Context): context for this request

  Returns:
      Iterable[Region]: set of the regions whose name equals the `region` of
      an instance returned by get_instances(context)
  """
  # Collect the region of every instance first; instances are looked up before
  # the region list, as in the original call order.
  used_region_names = set()
  for instance in get_instances(context).values():
    used_region_names.add(instance.region)

  regions = get_all_regions(context.project_id)
  if not regions:
    return set()

  return set(
      filter(lambda region: region.name in used_region_names, regions))

Return list of regions with instances

Arguments:
  • context (models.Context): context for this request
Returns:

Iterable[Region]: Return list of regions which contains instances

@caching.cached_api_call
def get_all_disks(project_id: str) -> Iterable[Disk]:
@caching.cached_api_call
def get_all_disks(project_id: str) -> Iterable[Disk]:
  """Return the zonal disks of a project as a set of Disk objects.

  Only the zonal `disks` collection is queried, once per zone returned by
  get_gce_zones(); regional disks are not included.

  Args:
    project_id: project whose disks are listed.

  Raises:
    utils.GcpApiError: if a request fails with an HttpError.
  """
  try:
    gce_api = apis.get_api('compute', 'v1', project_id)
    zone_requests = []
    for zone in get_gce_zones(project_id):
      zone_requests.append(gce_api.disks().list(project=project_id, zone=zone))
    logging.info('listing gce disks of project %s', project_id)

    disks = set()
    for item in apis_utils.multi_list_all(
        requests=zone_requests,
        next_function=gce_api.disks().list_next,
    ):
      disks.add(Disk(project_id, item))
    return disks

  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
@caching.cached_api_call
def get_all_disks_of_instance(project_id: str, zone: str, instance_name: str) -> dict:
@caching.cached_api_call
def get_all_disks_of_instance(project_id: str, zone: str,
                              instance_name: str) -> dict:
  """Return the zonal disks of one instance, keyed by disk name.

  All disks of the given zone are listed (regional disks are not queried) and
  those whose `users` property equals `[instance_name]` are kept.

  Args:
    project_id: project that owns the instance.
    zone: zone whose disks are listed.
    instance_name: name compared against each disk's `users`.

  Returns:
    dict mapping disk name to Disk.

  Raises:
    utils.GcpApiError: if the request fails with an HttpError.
  """
  try:
    gce_api = apis.get_api('compute', 'v1', project_id)
    zone_request = gce_api.disks().list(project=project_id, zone=zone)
    logging.info('listing gce disks attached to instance %s in project %s',
                 instance_name, project_id)
    zone_disks = {
        Disk(project_id, item) for item in apis_utils.multi_list_all(
            requests=[zone_request],
            next_function=gce_api.disks().list_next,
        )
    }
    # NOTE(review): exact list equality drops any disk whose `users` holds
    # more than this one entry -- confirm that is intended.
    sole_user = [instance_name]
    return {
        disk.name: disk for disk in zone_disks if disk.users == sole_user
    }

  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
class InstanceEffectiveFirewalls(gcpdiag.queries.network.EffectiveFirewalls):
class InstanceEffectiveFirewalls(network_q.EffectiveFirewalls):
  """Effective firewall rules of one network interface of a VM instance.

  Includes org/folder firewall policies.
  """

  _instance: Instance
  _nic: str

  def __init__(self, instance, nic, resource_data):
    """Remember which instance and interface the rules belong to.

    Args:
      instance: the VM instance the rules were fetched for.
      nic: the network interface the rules were fetched for.
      resource_data: response data handed to the EffectiveFirewalls base class.
    """
    super().__init__(resource_data)
    self._nic = nic
    self._instance = instance

Effective firewall rules for a network interface on a VM instance.

Includes org/folder firewall policies.

InstanceEffectiveFirewalls(instance, nic, resource_data)
1148  def __init__(self, instance, nic, resource_data):
1149    super().__init__(resource_data)
1150    self._instance = instance
1151    self._nic = nic
@caching.cached_api_call(in_memory=True)
def get_instance_interface_effective_firewalls( instance: Instance, nic: str) -> InstanceEffectiveFirewalls:
@caching.cached_api_call(in_memory=True)
def get_instance_interface_effective_firewalls(
    instance: Instance, nic: str) -> InstanceEffectiveFirewalls:
  """Return effective firewalls for a network interface on the instance.

  Args:
    instance: VM instance that owns the interface; its project_id, zone and
      name identify the API request.
    nic: value passed as the `networkInterface` request parameter.

  Returns:
    An InstanceEffectiveFirewalls built from the getEffectiveFirewalls
    response and bound to `instance` and `nic`.
  """
  compute = apis.get_api('compute', 'v1', instance.project_id)
  request = compute.instances().getEffectiveFirewalls(
      project=instance.project_id,
      zone=instance.zone,
      instance=instance.name,
      networkInterface=nic,
  )
  response = request.execute(num_retries=config.API_RETRIES)
  # Hand over the instance object itself; passing the `Instance` class here
  # would leave the result without a reference to the queried VM.
  return InstanceEffectiveFirewalls(instance, nic, response)

Return effective firewalls for a network interface on the instance

def is_project_serial_port_logging_enabled(project_id: str) -> bool:
def is_project_serial_port_logging_enabled(project_id: str) -> bool:
  """Tell whether the project metadata enables serial port logging.

  Returns False when the compute API is not enabled for the project.
  Otherwise the 'serial-port-logging-enable' project metadata value is
  upper-cased and checked against POSITIVE_BOOL_VALUES.
  """
  if not apis.is_enabled(project_id, 'compute'):
    return False

  metadata = get_project_metadata(project_id=project_id)
  flag = metadata.get('serial-port-logging-enable')
  if not flag:
    # Missing or empty metadata value counts as disabled.
    return False
  return flag.upper() in POSITIVE_BOOL_VALUES
def is_serial_port_buffer_enabled():
def is_serial_port_buffer_enabled():
  """Return the value of the 'enable_gce_serial_buffer' config option."""
  buffer_setting = config.get('enable_gce_serial_buffer')
  return buffer_setting
class SerialOutputQuery:
class SerialOutputQuery:
  """Handle for a serial output job registered by fetch_serial_port_outputs().

  The job only yields results after execute_fetch_serial_port_outputs() has
  submitted it to an executor and stored the resulting future on the job.
  """

  job: '_SerialOutputJob'

  def __init__(self, job):
    self.job = job

  @property
  def entries(self) -> Sequence:
    """Result of the job's future, waiting for it if it is still running.

    Raises:
      RuntimeError: if the job has no future, i.e. it was never submitted.
    """
    future = self.job.future
    if not future:
      # Name the function that actually submits the jobs in this module.
      raise RuntimeError("Fetching serial logs wasn't executed. did you call"
                         ' execute_fetch_serial_port_outputs()?')
    if future.running():
      logging.info(
          'waiting for serial output results for project: %s',
          self.job.context.project_id,
      )
    return future.result()

A serial output job that was registered with fetch_serial_port_outputs().

SerialOutputQuery(job)
1201  def __init__(self, job):
1202    self.job = job
job: gcpdiag.queries.gce._SerialOutputJob
entries: Sequence
1204  @property
1205  def entries(self) -> Sequence:
1206    if not self.job.future:
1207      raise RuntimeError("Fetching serial logs wasn't executed. did you call"
1208                         ' execute_get_serial_port_output()?')
1209    elif self.job.future.running():
1210      logging.info(
1211          'waiting for serial output results for project: %s',
1212          self.job.context.project_id,
1213      )
1214    return self.job.future.result()
jobs_todo: Dict[gcpdiag.models.Context, gcpdiag.queries.gce._SerialOutputJob] = {}
def execute_fetch_serial_port_outputs(executor: concurrent.futures._base.Executor):
def execute_fetch_serial_port_outputs(executor: concurrent.futures.Executor):
  """Submit every pending serial output job to the executor.

  The module-level `jobs_todo` mapping is replaced by an empty one, and each
  job taken from it gets its `future` set to the submitted
  get_instances_serial_port_output(job.context) call.

  Args:
    executor: executor the fetch calls are submitted to.
  """
  # Fetching runs on the executor because the serial logs can be large,
  # depending on the number of instances in the project which aren't logging
  # to cloud logging. Only one job is expected at the moment, but iterating
  # over all of them keeps support for multiple projects possible.
  global jobs_todo
  pending_jobs, jobs_todo = jobs_todo, {}
  for pending in pending_jobs.values():
    pending.future = executor.submit(get_instances_serial_port_output,
                                     pending.context)
def fetch_serial_port_outputs(context: gcpdiag.models.Context) -> SerialOutputQuery:
def fetch_serial_port_outputs(context: models.Context) -> SerialOutputQuery:
  """Register a serial output job for the context and return its query.

  Jobs are aggregated by context: if `jobs_todo` already holds a job under
  this context key, that job is reused instead of the newly built one.
  """
  candidate = _SerialOutputJob(context=context)
  shared_job = jobs_todo.setdefault(context, candidate)
  return SerialOutputQuery(job=shared_job)
class HealthCheck(gcpdiag.models.Resource):
class HealthCheck(models.Resource):
  """A Compute Engine health check, wrapping the raw API resource dictionary."""

  _resource_data: dict
  _type: str

  def __init__(self, project_id, resource_data):
    """Store the health check's API representation.

    Args:
      project_id: project id handed to the models.Resource constructor.
      resource_data: health check resource dictionary read by the properties.
    """
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """The 'name' value of the underlying resource."""
    check_name = self._resource_data['name']
    return check_name

  @property
  def full_path(self) -> str:
    """The self link without the Compute API v1 URL prefix.

    When the self link does not start with that prefix, the whole link is
    returned, prefixed with '>> '.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    if not matched:
      return f'>> {link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """Project id and name joined by '/'."""
    return '/'.join((self.project_id, self.name))

  @property
  def self_link(self) -> str:
    """The 'selfLink' value of the underlying resource."""
    link = self._resource_data['selfLink']
    return link

  @property
  def is_log_enabled(self) -> bool:
    """True only if 'logConfig' is present and its 'enable' entry is truthy."""
    log_config = self._resource_data.get('logConfig', False)
    if not log_config:
      return False
    try:
      enabled = log_config['enable']
    except KeyError:
      return False
    return bool(enabled)

  @property
  def type(self) -> str:
    """The 'type' value of the underlying resource."""
    check_type = self._resource_data['type']
    return check_type

  @property
  def request_path(self) -> str:
    """The 'requestPath' of the type-specific settings, '/' if unset."""
    return self.get_health_check_property('requestPath', '/')

  @property
  def request(self) -> str:
    """The 'request' of the type-specific settings (None if unset)."""
    return self.get_health_check_property('request')

  @property
  def response(self) -> str:
    """The 'response' of the type-specific settings (None if unset)."""
    return self.get_health_check_property('response')

  @property
  def port(self) -> int:
    """The 'port' of the type-specific settings (None if unset)."""
    return self.get_health_check_property('port')

  @property
  def port_specification(self) -> str:
    """The 'portSpecification' of the type-specific settings (None if unset)."""
    return self.get_health_check_property('portSpecification')

  @property
  def timeout_sec(self) -> int:
    """The 'timeoutSec' value, 5 when the key is absent."""
    return self._resource_data.get('timeoutSec', 5)

  def get_health_check_property(self, property_name: str, default_value=None):
    """Look up a property in the settings block matching the check's type.

    Args:
      property_name: key to read from the type-specific settings dictionary.
      default_value: returned when the type is unknown, the settings block is
        missing or empty, or the stored value is falsy (e.g. 0 or '').

    Returns:
      The stored value, or default_value as described above.
    """
    # Maps the resource 'type' to the key holding that type's settings.
    settings_key_by_type = {
        'HTTP': 'httpHealthCheck',
        'HTTPS': 'httpsHealthCheck',
        'HTTP2': 'http2HealthCheck',
        'TCP': 'tcpHealthCheck',
        'SSL': 'sslHealthCheck',
        'GRPC': 'grpcHealthCheck',
    }
    settings_key = settings_key_by_type.get(self.type)
    if settings_key is None:
      return default_value
    settings = self._resource_data.get(settings_key)
    if not settings:
      return default_value
    return settings.get(property_name) or default_value

A Health Check resource.

HealthCheck(project_id, resource_data)
1244  def __init__(self, project_id, resource_data):
1245    super().__init__(project_id=project_id)
1246    self._resource_data = resource_data
name: str
1248  @property
1249  def name(self) -> str:
1250    return self._resource_data['name']
full_path: str
1252  @property
1253  def full_path(self) -> str:
1254    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
1255                      self.self_link)
1256    if result:
1257      return result.group(1)
1258    else:
1259      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
1261  @property
1262  def short_path(self) -> str:
1263    path = self.project_id + '/' + self.name
1264    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

is_log_enabled: bool
1270  @property
1271  def is_log_enabled(self) -> bool:
1272    try:
1273      log_config = self._resource_data.get('logConfig', False)
1274      if log_config and log_config['enable']:
1275        return True
1276    except KeyError:
1277      return False
1278    return False
type: str
1280  @property
1281  def type(self) -> str:
1282    return self._resource_data['type']
request_path: str
1284  @property
1285  def request_path(self) -> str:
1286    return self.get_health_check_property('requestPath', '/')
request: str
1288  @property
1289  def request(self) -> str:
1290    return self.get_health_check_property('request')
response: str
1292  @property
1293  def response(self) -> str:
1294    return self.get_health_check_property('response')
port: int
1296  @property
1297  def port(self) -> int:
1298    return self.get_health_check_property('port')
port_specification: str
1300  @property
1301  def port_specification(self) -> str:
1302    return self.get_health_check_property('portSpecification')
timeout_sec: int
1304  @property
1305  def timeout_sec(self) -> int:
1306    return self._resource_data.get('timeoutSec', 5)
def get_health_check_property(self, property_name: str, default_value=None):
1308  def get_health_check_property(self, property_name: str, default_value=None):
1309    health_check_types = {
1310        'HTTP': 'httpHealthCheck',
1311        'HTTPS': 'httpsHealthCheck',
1312        'HTTP2': 'http2HealthCheck',
1313        'TCP': 'tcpHealthCheck',
1314        'SSL': 'sslHealthCheck',
1315        'GRPC': 'grpcHealthCheck',
1316    }
1317    if self.type in health_check_types:
1318      health_check_data = self._resource_data.get(health_check_types[self.type])
1319      if health_check_data:
1320        return health_check_data.get(property_name) or default_value
1321    return default_value
@caching.cached_api_call(in_memory=True)
def get_health_check(project_id: str, health_check: str, region: str = None) -> object:
@caching.cached_api_call(in_memory=True)
def get_health_check(project_id: str,
                     health_check: str,
                     region: str = None) -> object:
  """Fetch a single health check and wrap it in a HealthCheck object.

  Args:
    project_id: project that owns the health check.
    health_check: value passed as the `healthCheck` request parameter.
    region: when set, the `regionHealthChecks` collection is queried for this
      region; otherwise the `healthChecks` collection is used.

  Returns:
    A HealthCheck built from the API response.
  """
  logging.info('fetching health check: %s', health_check)
  compute = apis.get_api('compute', 'v1', project_id)
  if region:
    lookup = compute.regionHealthChecks().get(project=project_id,
                                              healthCheck=health_check,
                                              region=region)
  else:
    lookup = compute.healthChecks().get(project=project_id,
                                        healthCheck=health_check)
  payload = lookup.execute(num_retries=config.API_RETRIES)
  return HealthCheck(project_id, payload)
class NetworkEndpointGroup(gcpdiag.models.Resource):
class NetworkEndpointGroup(models.Resource):
  """A Network Endpoint Group, wrapping the raw API resource dictionary."""

  _resource_data: dict
  _type: str

  def __init__(self, project_id, resource_data):
    """Store the group's API representation.

    Args:
      project_id: project id handed to the models.Resource constructor.
      resource_data: resource dictionary; the properties of this class read
        its 'name', 'id' and 'selfLink' keys.
    """
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """The 'name' value of the underlying resource."""
    group_name = self._resource_data['name']
    return group_name

  @property
  def id(self) -> str:
    """The 'id' value of the underlying resource."""
    group_id = self._resource_data['id']
    return group_id

  @property
  def full_path(self) -> str:
    """The self link without the Compute API v1 URL prefix.

    When the self link does not start with that prefix, the whole link is
    returned, prefixed with '>> '.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    if not matched:
      return f'>> {link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """Project id and name joined by '/'."""
    return '/'.join((self.project_id, self.name))

  @property
  def self_link(self) -> str:
    """The 'selfLink' value of the underlying resource."""
    link = self._resource_data['selfLink']
    return link

A Network Endpoint Group resource.

NetworkEndpointGroup(project_id, resource_data)
1347  def __init__(self, project_id, resource_data):
1348    super().__init__(project_id=project_id)
1349    self._resource_data = resource_data
name: str
1351  @property
1352  def name(self) -> str:
1353    return self._resource_data['name']
id: str
1355  @property
1356  def id(self) -> str:
1357    return self._resource_data['id']
full_path: str
1359  @property
1360  def full_path(self) -> str:
1361    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
1362                      self.self_link)
1363    if result:
1364      return result.group(1)
1365    else:
1366      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
1368  @property
1369  def short_path(self) -> str:
1370    path = self.project_id + '/' + self.name
1371    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

@caching.cached_api_call(in_memory=True)
def get_zonal_network_endpoint_groups( context: gcpdiag.models.Context) -> Mapping[str, NetworkEndpointGroup]:
@caching.cached_api_call(in_memory=True)
def get_zonal_network_endpoint_groups(
    context: models.Context,) -> Mapping[str, NetworkEndpointGroup]:
  """Return the zonal Network Endpoint Groups of the project, by full path.

  The groups of every zone returned by get_gce_zones() are listed, and only
  those accepted by context.match_project_resource() (zone, labels, name) are
  kept.

  Args:
    context: context providing the project id and the resource filter.

  Returns:
    Mapping from each group's full_path to its NetworkEndpointGroup; empty
    when the compute API is not enabled for the project.
  """
  groups: Dict[str, NetworkEndpointGroup] = {}
  if not apis.is_enabled(context.project_id, 'compute'):
    return groups
  gce_api = apis.get_api('compute', 'v1', context.project_id)
  requests = [
      gce_api.networkEndpointGroups().list(project=context.project_id,
                                           zone=zone)
      for zone in get_gce_zones(context.project_id)
  ]
  logging.info('listing gce networkEndpointGroups of project %s',
               context.project_id)
  items = apis_utils.multi_list_all(
      requests=requests,
      next_function=gce_api.networkEndpointGroups().list_next,
  )

  # The zone is taken from the self link; compile the pattern once for all
  # items instead of once per iteration.
  zone_pattern = re.compile(
      r'https://www.googleapis.com/compute/v1/projects/[^/]+/zones/([^/]+)')
  for i in items:
    result = zone_pattern.match(i['selfLink'])
    if not result:
      # These items are network endpoint groups, not instances.
      logging.error(
          "network endpoint group %s selfLink didn't match regexp: %s",
          i['id'], i['selfLink'])
      continue
    zone = result.group(1)
    labels = i.get('labels', {})
    resource = i.get('name', '')
    if not context.match_project_resource(
        location=zone, labels=labels, resource=resource):
      continue
    data = NetworkEndpointGroup(context.project_id, i)
    groups[data.full_path] = data
  return groups

Returns the zonal Network Endpoint Groups in the project that match the context, as a mapping keyed by each group's full path.