gcpdiag.queries.gce

Queries related to GCP Compute Engine.
# Metadata values (compared after upper-casing) that count as "enabled".
POSITIVE_BOOL_VALUES = {'1', 'TRUE', 'Y', 'YES'}
# Instance label whose presence is checked by Instance.is_dataproc_instance().
DATAPROC_LABEL = 'goog-dataproc-cluster-name'
# Instance label whose presence is checked by Instance.is_gke_node().
GKE_LABEL = 'goog-gke-node'
class InstanceTemplate(gcpdiag.models.Resource):
 40class InstanceTemplate(models.Resource):
 41  """Represents a GCE Instance Template."""
 42
 43  _resource_data: dict
 44
 45  def __init__(self, project_id, resource_data):
 46    super().__init__(project_id=project_id)
 47    self._resource_data = resource_data
 48
 49  @property
 50  def self_link(self) -> str:
 51    return self._resource_data['selfLink']
 52
 53  @property
 54  def full_path(self) -> str:
 55    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
 56                      self.self_link)
 57    if result:
 58      return result.group(1)
 59    else:
 60      return f'>> {self.self_link}'
 61
 62  @property
 63  def short_path(self) -> str:
 64    path = self.project_id + '/' + self.name
 65    return path
 66
 67  @property
 68  def name(self) -> str:
 69    return self._resource_data['name']
 70
 71  @property
 72  def tags(self) -> List[str]:
 73    return self._resource_data['properties'].get('tags', {}).get('items', [])
 74
 75  @property
 76  def service_account(self) -> Optional[str]:
 77    sa_list = self._resource_data['properties'].get('serviceAccounts', [])
 78    if not sa_list:
 79      return None
 80    email = sa_list[0]['email']
 81    if email == 'default':
 82      project_nr = crm.get_project(self._project_id).number
 83      return f'{project_nr}-compute@developer.gserviceaccount.com'
 84    return email
 85
 86  @property
 87  def network(self) -> network_q.Network:
 88    return network_q.get_network_from_url(
 89        self._resource_data['properties']['networkInterfaces'][0]['network'])
 90
 91  @property
 92  def subnetwork(self) -> network_q.Subnetwork:
 93    subnet_url = self._resource_data['properties']['networkInterfaces'][0][
 94        'subnetwork']
 95    return self.network.subnetworks[subnet_url]
 96
 97  def get_metadata(self, key: str) -> str:
 98    for item in self._resource_data['properties']['metadata']['items']:
 99      if item['key'] == key:
100        return item['value']
101    return ''

Represents a GCE Instance Template.

InstanceTemplate(project_id, resource_data)
45  def __init__(self, project_id, resource_data):
46    super().__init__(project_id=project_id)
47    self._resource_data = resource_data
full_path: str
53  @property
54  def full_path(self) -> str:
55    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
56                      self.self_link)
57    if result:
58      return result.group(1)
59    else:
60      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
62  @property
63  def short_path(self) -> str:
64    path = self.project_id + '/' + self.name
65    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gcpdiag-gce1-aaaa/gce1' (the project id, a slash, and the resource name)

name: str
67  @property
68  def name(self) -> str:
69    return self._resource_data['name']
tags: List[str]
71  @property
72  def tags(self) -> List[str]:
73    return self._resource_data['properties'].get('tags', {}).get('items', [])
service_account: Optional[str]
75  @property
76  def service_account(self) -> Optional[str]:
77    sa_list = self._resource_data['properties'].get('serviceAccounts', [])
78    if not sa_list:
79      return None
80    email = sa_list[0]['email']
81    if email == 'default':
82      project_nr = crm.get_project(self._project_id).number
83      return f'{project_nr}-compute@developer.gserviceaccount.com'
84    return email
network: gcpdiag.queries.network.Network
86  @property
87  def network(self) -> network_q.Network:
88    return network_q.get_network_from_url(
89        self._resource_data['properties']['networkInterfaces'][0]['network'])
subnetwork: gcpdiag.queries.network.Subnetwork
91  @property
92  def subnetwork(self) -> network_q.Subnetwork:
93    subnet_url = self._resource_data['properties']['networkInterfaces'][0][
94        'subnetwork']
95    return self.network.subnetworks[subnet_url]
def get_metadata(self, key: str) -> str:
 97  def get_metadata(self, key: str) -> str:
 98    for item in self._resource_data['properties']['metadata']['items']:
 99      if item['key'] == key:
100        return item['value']
101    return ''
class InstanceGroup(gcpdiag.models.Resource):
class InstanceGroup(models.Resource):
  """Represents a GCE instance group.

  Read-only view over the instance group resource dictionary.
  """

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def self_link(self) -> str:
    """The 'selfLink' field of the resource."""
    return self._resource_data['selfLink']

  @property
  def name(self) -> str:
    """The 'name' field of the resource."""
    return self._resource_data['name']

  @property
  def full_path(self) -> str:
    """selfLink without the Compute API v1 prefix ('>> ' + selfLink otherwise)."""
    link = self._resource_data['selfLink']
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    if not matched:
      return '>> ' + self._resource_data['selfLink']
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """'<project id>/<group name>'."""
    return '/'.join((self.project_id, self.name))

  @property
  def named_ports(self) -> List[dict]:
    """The 'namedPorts' list of the resource, or [] when the key is absent."""
    return self._resource_data.get('namedPorts', [])

  def has_named_ports(self) -> bool:
    """True if the resource has a 'namedPorts' key."""
    return 'namedPorts' in self._resource_data

Represents a GCE instance group.

InstanceGroup(project_id, resource_data)
109  def __init__(self, project_id, resource_data):
110    super().__init__(project_id=project_id)
111    self._resource_data = resource_data
full_path: str
113  @property
114  def full_path(self) -> str:
115    result = re.match(
116        r'https://www.googleapis.com/compute/v1/(.*)',
117        self._resource_data['selfLink'],
118    )
119    if result:
120      return result.group(1)
121    else:
122      return '>> ' + self._resource_data['selfLink']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
124  @property
125  def short_path(self) -> str:
126    path = self.project_id + '/' + self.name
127    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gcpdiag-gce1-aaaa/gce1' (the project id, a slash, and the resource name)

name: str
133  @property
134  def name(self) -> str:
135    return self._resource_data['name']
named_ports: List[dict]
137  @property
138  def named_ports(self) -> List[dict]:
139    if 'namedPorts' in self._resource_data:
140      return self._resource_data['namedPorts']
141    return []
def has_named_ports(self) -> bool:
143  def has_named_ports(self) -> bool:
144    if 'namedPorts' in self._resource_data:
145      return True
146    return False
class ManagedInstanceGroup(gcpdiag.models.Resource):
class ManagedInstanceGroup(models.Resource):
  """Represents a GCE managed instance group (MIG).

  Read-only wrapper around the MIG resource dictionary. The region is derived
  lazily and cached in `_region`.
  """

  _resource_data: dict
  _region: Optional[str]

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self._region = None

  @property
  def full_path(self) -> str:
    """selfLink without the 'https://www.googleapis.com/compute/v1/' prefix.

    If selfLink doesn't start with that prefix, '>> ' followed by the
    unmodified selfLink is returned instead.
    """
    result = re.match(
        r'https://www.googleapis.com/compute/v1/(.*)',
        self._resource_data['selfLink'],
    )
    if result:
      return result.group(1)
    return '>> ' + self._resource_data['selfLink']

  @property
  def short_path(self) -> str:
    """'<project id>/<mig name>'."""
    path = self.project_id + '/' + self.name
    return path

  def is_gke(self) -> bool:
    """Is this managed instance group part of a GKE cluster?

    Note that the results are based on heuristics (the mig name),
    which is not ideal.

    Returns:
        bool: True if this managed instance group is part of a GKE cluster.
    """
    # gke- is normal GKE, gk3- is GKE autopilot
    return self.name.startswith(('gke-', 'gk3-'))

  @property
  def self_link(self) -> str:
    """The 'selfLink' field of the resource."""
    return self._resource_data['selfLink']

  @property
  def name(self) -> str:
    """The 'name' field of the resource."""
    return self._resource_data['name']

  @property
  def region(self) -> str:
    """Region of the MIG (cached after the first successful lookup).

    Taken from the 'region' field if present, otherwise derived from the
    'zone' field through utils.zone_region().

    Raises:
      RuntimeError: if neither field is set or the value can't be parsed.
    """
    if self._region is None:
      if 'region' in self._resource_data:
        m = re.search(r'/regions/([^/]+)$', self._resource_data['region'])
        if not m:
          raise RuntimeError("can't determine region of mig %s (%s)" %
                             (self.name, self._resource_data['region']))
        self._region = m.group(1)
      elif 'zone' in self._resource_data:
        m = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
        if not m:
          # Report the zone value that failed to parse. 'region' is known to
          # be absent in this branch, so formatting it would raise KeyError
          # instead of the intended RuntimeError.
          raise RuntimeError("can't determine region of mig %s (%s)" %
                             (self.name, self._resource_data['zone']))
        self._region = utils.zone_region(m.group(1))
      else:
        raise RuntimeError(
            f"can't determine region of mig {self.name}, both region and zone"
            " aren't set!")
    return self._region

  @property
  def zone(self) -> Optional[str]:
    """Zone name parsed from the 'zone' field, or None if the field is absent.

    Raises:
      RuntimeError: if the 'zone' field is present but can't be parsed.
    """
    if 'zone' in self._resource_data:
      m = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
      if not m:
        raise RuntimeError("can't determine zone of mig %s (%s)" %
                           (self.name, self._resource_data['zone']))
      return m.group(1)
    return None

  def count_no_action_instances(self) -> int:
    """Number of instances in the mig that are running and have no scheduled actions."""
    return self._resource_data['currentActions']['none']

  def is_instance_member(self, project_id: str, region: str,
                         instance_name: str):
    """Given the project_id, region and instance name, is it a member of this MIG?

    Membership is decided by matching project and region and by the instance
    name starting with the MIG's 'baseInstanceName'.
    """
    return (self.project_id == project_id and self.region == region and
            instance_name.startswith(self._resource_data['baseInstanceName']))

  @property
  def template(self) -> InstanceTemplate:
    """Instance template referenced by the 'instanceTemplate' field.

    Raises:
      RuntimeError: if the field is missing, can't be parsed, or the template
        isn't among the templates returned by get_instance_templates().
    """
    if 'instanceTemplate' not in self._resource_data:
      # f-string so that the MIG name is interpolated into the message.
      raise RuntimeError(f'instanceTemplate not set for MIG {self.name}')

    m = re.match(
        r'https://www.googleapis.com/compute/v1/(.*)',
        self._resource_data['instanceTemplate'],
    )
    if not m:
      raise RuntimeError("can't parse instanceTemplate: %s" %
                         self._resource_data['instanceTemplate'])
    template_self_link = m.group(1)
    templates = get_instance_templates(self.project_id)
    if template_self_link not in templates:
      raise RuntimeError(
          f'instanceTemplate {template_self_link} for MIG {self.name} not found'
      )
    return templates[template_self_link]

  @property
  def version_target_reached(self) -> bool:
    """Value found at status.versionTarget.isReached in the resource."""
    return get_path(self._resource_data,
                    ('status', 'versionTarget', 'isReached'))

  def get(self, path: str, default: Any = None) -> Any:
    """Gets a value from resource_data using a dot-separated path."""
    return get_path(self._resource_data,
                    tuple(path.split('.')),
                    default=default)

Represents a GCE managed instance group.

ManagedInstanceGroup(project_id, resource_data)
155  def __init__(self, project_id, resource_data):
156    super().__init__(project_id=project_id)
157    self._resource_data = resource_data
158    self._region = None
full_path: str
160  @property
161  def full_path(self) -> str:
162    result = re.match(
163        r'https://www.googleapis.com/compute/v1/(.*)',
164        self._resource_data['selfLink'],
165    )
166    if result:
167      return result.group(1)
168    else:
169      return '>> ' + self._resource_data['selfLink']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
171  @property
172  def short_path(self) -> str:
173    path = self.project_id + '/' + self.name
174    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gcpdiag-gce1-aaaa/gce1' (the project id, a slash, and the resource name)

def is_gke(self) -> bool:
176  def is_gke(self) -> bool:
177    """Is this managed instance group part of a GKE cluster?
178
179    Note that the results are based on heuristics (the mig name),
180    which is not ideal.
181
182    Returns:
183        bool: True if this managed instance group is part of a GKE cluster.
184    """
185
186    # gke- is normal GKE, gk3- is GKE autopilot
187    return self.name.startswith('gke-') or self.name.startswith('gk3-')

Is this managed instance group part of a GKE cluster?

Note that the results are based on heuristics (the mig name), which is not ideal.

Returns:

bool: True if this managed instance group is part of a GKE cluster.

name: str
193  @property
194  def name(self) -> str:
195    return self._resource_data['name']
region: str
197  @property
198  def region(self) -> str:
199    if self._region is None:
200      if 'region' in self._resource_data:
201        m = re.search(r'/regions/([^/]+)$', self._resource_data['region'])
202        if not m:
203          raise RuntimeError("can't determine region of mig %s (%s)" %
204                             (self.name, self._resource_data['region']))
205        self._region = m.group(1)
206      elif 'zone' in self._resource_data:
207        m = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
208        if not m:
209          raise RuntimeError("can't determine region of mig %s (%s)" %
210                             (self.name, self._resource_data['region']))
211        zone = m.group(1)
212        self._region = utils.zone_region(zone)
213      else:
214        raise RuntimeError(
215            f"can't determine region of mig {self.name}, both region and zone"
216            " aren't set!")
217    return self._region
zone: Optional[str]
219  @property
220  def zone(self) -> Optional[str]:
221    if 'zone' in self._resource_data:
222      m = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
223      if not m:
224        raise RuntimeError("can't determine zone of mig %s (%s)" %
225                           (self.name, self._resource_data['zone']))
226      return m.group(1)
227    return None
def count_no_action_instances(self) -> int:
229  def count_no_action_instances(self) -> int:
230    """number of instances in the mig that are running and have no scheduled actions."""
231    return self._resource_data['currentActions']['none']

Number of instances in the MIG that are running and have no scheduled actions.

def is_instance_member(self, project_id: str, region: str, instance_name: str):
233  def is_instance_member(self, project_id: str, region: str,
234                         instance_name: str):
235    """Given the project_id, region and instance name, is it a member of this MIG?"""
236    return (self.project_id == project_id and self.region == region and
237            instance_name.startswith(self._resource_data['baseInstanceName']))

Given the project_id, region and instance name, is it a member of this MIG?

template: InstanceTemplate
239  @property
240  def template(self) -> InstanceTemplate:
241    if 'instanceTemplate' not in self._resource_data:
242      raise RuntimeError('instanceTemplate not set for MIG {self.name}')
243
244    m = re.match(
245        r'https://www.googleapis.com/compute/v1/(.*)',
246        self._resource_data['instanceTemplate'],
247    )
248
249    if not m:
250      raise RuntimeError("can't parse instanceTemplate: %s" %
251                         self._resource_data['instanceTemplate'])
252    template_self_link = m.group(1)
253    templates = get_instance_templates(self.project_id)
254    if template_self_link not in templates:
255      raise RuntimeError(
256          f'instanceTemplate {template_self_link} for MIG {self.name} not found'
257      )
258    return templates[template_self_link]
version_target_reached: bool
260  @property
261  def version_target_reached(self) -> bool:
262    return get_path(self._resource_data,
263                    ('status', 'versionTarget', 'isReached'))
def get(self, path: str, default: Any = None) -> Any:
265  def get(self, path: str, default: Any = None) -> Any:
266    """Gets a value from resource_data using a dot-separated path."""
267    return get_path(self._resource_data,
268                    tuple(path.split('.')),
269                    default=default)

Gets a value from resource_data using a dot-separated path.

class Autoscaler(gcpdiag.models.Resource):
class Autoscaler(models.Resource):
  """Represents a GCE Autoscaler.

  Read-only view over the autoscaler resource dictionary.
  """

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """The 'name' field of the resource."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """The 'selfLink' field of the resource."""
    return self._resource_data['selfLink']

  @property
  def full_path(self) -> str:
    """self_link without the Compute API v1 prefix ('>> ' + link otherwise)."""
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                       self.self_link)
    if not matched:
      return f'>> {self.self_link}'
    return matched.group(1)

  def get(self, path: str, default: Any = None) -> Any:
    """Looks up a value in resource_data by a dot-separated path."""
    keys = tuple(path.split('.'))
    return get_path(self._resource_data, keys, default=default)

Represents a GCE Autoscaler.

Autoscaler(project_id, resource_data)
277  def __init__(self, project_id, resource_data):
278    super().__init__(project_id=project_id)
279    self._resource_data = resource_data
full_path: str
285  @property
286  def full_path(self) -> str:
287    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
288                      self.self_link)
289    if result:
290      return result.group(1)
291    else:
292      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

name: str
294  @property
295  def name(self) -> str:
296    return self._resource_data['name']
def get(self, path: str, default: Any = None) -> Any:
298  def get(self, path: str, default: Any = None) -> Any:
299    """Gets a value from resource_data using a dot-separated path."""
300    return get_path(self._resource_data,
301                    tuple(path.split('.')),
302                    default=default)

Gets a value from resource_data using a dot-separated path.

class SerialPortOutput:
class SerialPortOutput:
  """Serial port output (/dev/ttyS0 or COM1) of a single instance.

  `contents` holds the full 1MB of serial output of the instance.
  """

  _project_id: str
  _instance_id: str
  _contents: List[str]

  def __init__(self, project_id, instance_id, contents):
    # Plain value holder: the three arguments are stored unchanged.
    self._project_id = project_id
    self._instance_id = instance_id
    self._contents = contents

  @property
  def instance_id(self) -> str:
    """Id of the instance this output belongs to."""
    return self._instance_id

  @property
  def contents(self) -> List[str]:
    """The stored serial output, as passed to the constructor."""
    return self._contents

Represents the full Serial Port Output (/dev/ttyS0 or COM1) of an instance.

`contents` holds the full 1MB of serial port output of the instance.

SerialPortOutput(project_id, instance_id, contents)
315  def __init__(self, project_id, instance_id, contents):
316    self._project_id = project_id
317    self._instance_id = instance_id
318    self._contents = contents
contents: List[str]
320  @property
321  def contents(self) -> List[str]:
322    return self._contents
instance_id: str
324  @property
325  def instance_id(self) -> str:
326    return self._instance_id
class Instance(gcpdiag.models.Resource):
class Instance(models.Resource):
  """Represents a GCE instance.

  Read-only wrapper around the instance resource dictionary. The region and
  the instance metadata dictionary are computed lazily and cached.
  """

  _resource_data: dict
  _region: Optional[str]
  _metadata_dict: Optional[dict]

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self._metadata_dict = None
    self._region = None

  @property
  def id(self) -> str:
    """The 'id' field of the resource."""
    return self._resource_data['id']

  @property
  def name(self) -> str:
    """The 'name' field of the resource."""
    return self._resource_data['name']

  @property
  def full_path(self) -> str:
    """selfLink without the 'https://www.googleapis.com/compute/v1/' prefix.

    If selfLink doesn't start with that prefix, '>> ' followed by the
    unmodified selfLink is returned instead.
    """
    result = re.match(
        r'https://www.googleapis.com/compute/v1/(.*)',
        self._resource_data['selfLink'],
    )
    if result:
      return result.group(1)
    return '>> ' + self._resource_data['selfLink']

  @property
  def short_path(self) -> str:
    """'<project id>/<instance name>'."""
    # Note: instance names must be unique per project,
    # so no need to add the zone.
    path = self.project_id + '/' + self.name
    return path

  @property
  def creation_timestamp(self) -> datetime:
    """VM creation time, as a *naive* `datetime` object.

    The timestamp is converted to UTC before the timezone info is dropped.
    """
    return (datetime.fromisoformat(
        self._resource_data['creationTimestamp']).astimezone(
            timezone.utc).replace(tzinfo=None))

  @property
  def region(self) -> str:
    """Region derived from the instance's zone (cached after first use).

    Raises:
      RuntimeError: if the 'zone' field is missing or can't be parsed.
    """
    if self._region is None:
      if 'zone' not in self._resource_data:
        raise RuntimeError(
            f"can't determine region of instance {self.name}, zone isn't set!")
      m = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
      if not m:
        # Report the zone value that failed to parse. The message used to
        # look up a 'region' key, which raises KeyError (hiding the intended
        # RuntimeError) whenever the resource has no such key.
        raise RuntimeError("can't determine region of instance %s (%s)" %
                           (self.name, self._resource_data['zone']))
      self._region = utils.zone_region(m.group(1))
    return self._region

  @property
  def zone(self) -> str:
    """Zone name parsed from the 'zone' field.

    Raises:
      RuntimeError: if the zone URI can't be parsed.
    """
    zone_uri = self._resource_data['zone']
    m = re.search(r'/zones/([^/]+)$', zone_uri)
    if m:
      return m.group(1)
    raise RuntimeError(f"can't determine zone of instance {self.name}")

  @property
  def disks(self) -> List[dict]:
    """The 'disks' list of the resource, or [] when the key is absent."""
    return self._resource_data.get('disks', [])

  @property
  def boot_disk_licenses(self) -> List[str]:
    """Returns license names associated with boot disk."""
    for disk in self.disks:
      if disk.get('boot'):
        return [
            l.partition('/global/licenses/')[2]
            for l in disk.get('licenses', [])
        ]
    return []

  @property
  def guest_os_features(self) -> List[str]:
    """Returns guestOsFeatures types associated with boot disk."""
    for disk in self.disks:
      if disk.get('boot'):
        return [f['type'] for f in disk.get('guestOsFeatures', [])]
    return []

  @property
  def startrestricted(self) -> bool:
    """The 'startRestricted' field of the resource."""
    return self._resource_data['startRestricted']

  def laststarttimestamp(self) -> str:
    """The 'lastStartTimestamp' field of the resource."""
    return self._resource_data['lastStartTimestamp']

  def laststoptimestamp(self) -> str:
    """The 'lastStopTimestamp' field of the resource, or '' when absent."""
    return self._resource_data.get('lastStopTimestamp', '')

  def is_serial_port_logging_enabled(self) -> bool:
    """True if metadata 'serial-port-logging-enable' has a positive value."""
    return self.is_metadata_enabled('serial-port-logging-enable')

  def is_oslogin_enabled(self) -> bool:
    """True if metadata 'enable-oslogin' has a positive value."""
    return self.is_metadata_enabled('enable-oslogin')

  def is_metadata_enabled(self, metadata_name) -> bool:
    """Use to check for common boolean metadata value.

    The value is upper-cased and compared against POSITIVE_BOOL_VALUES.
    """
    value = self.get_metadata(metadata_name)
    return bool(value and value.upper() in POSITIVE_BOOL_VALUES)

  def has_label(self, label) -> bool:
    """True if `label` is one of the instance's label keys."""
    return label in self.labels

  def is_dataproc_instance(self) -> bool:
    """True if the instance carries the DATAPROC_LABEL label."""
    return self.has_label(DATAPROC_LABEL)

  def is_gke_node(self) -> bool:
    """True if the instance carries the GKE_LABEL label."""
    return self.has_label(GKE_LABEL)

  @property
  def is_preemptible_vm(self) -> bool:
    """True if scheduling.preemptible is set to a true value."""
    scheduling = self._resource_data.get('scheduling', {})
    return bool(scheduling.get('preemptible', False))

  def min_cpu_platform(self) -> str:
    """The 'minCpuPlatform' field, or the string 'None' when absent."""
    return self._resource_data.get('minCpuPlatform', 'None')

  @property
  def created_by_mig(self) -> bool:
    """Return bool indicating if the instance was created by a MIG.

    Only the 'created-by' metadata value is inspected; whether that MIG still
    exists is not checked here.
    """
    created_by = self.get_metadata('created-by')
    if created_by is None:
      return False

    created_by_match = re.match(
        r'projects/([^/]+)/((?:regions|zones)/[^/]+/instanceGroupManagers/[^/]+)$',
        created_by,
    )
    return bool(created_by_match)

  def is_windows_machine(self) -> bool:
    """True if the first disk lists a 'WINDOWS' guest OS feature.

    Returns False when the instance has no disks (an empty 'disks' list used
    to raise StopIteration).
    """
    disks = self.disks
    if not disks:
      return False
    features = disks[0].get('guestOsFeatures', [])
    return any(feature['type'] == 'WINDOWS' for feature in features)

  def is_public_machine(self) -> bool:
    """True if the text 'natIP' occurs in the network interfaces data."""
    if 'networkInterfaces' in self._resource_data:
      return 'natIP' in str(self._resource_data['networkInterfaces'])
    return False

  def machine_type(self) -> Optional[str]:
    """Machine type name parsed from 'machineType', or None when absent.

    Raises:
      RuntimeError: if the machine type URI can't be parsed.
    """
    if 'machineType' not in self._resource_data:
      return None
    machine_type_uri = self._resource_data['machineType']
    mt = re.search(r'/machineTypes/([^/]+)$', machine_type_uri)
    if mt:
      return mt.group(1)
    raise RuntimeError(f"can't determine machineType of instance {self.name}")

  def check_license(self, licenses: List[str]) -> bool:
    """Checks that a license is contained in a given license list.

    Args:
      licenses: license names (the part after '/global/licenses/').

    Returns:
      True if any disk has an attached license whose name is in `licenses`.
    """
    # Disks without a 'licenses' key are skipped. The earlier substring test
    # on str(disk) could let such a disk through and fail with KeyError.
    for disk in self.disks:
      for attached_license in disk.get('licenses', []):
        if attached_license.partition('/global/licenses/')[2] in licenses:
          return True
    return False

  def get_boot_disk_image(self) -> str:
    """Get VM's boot disk image.

    Returns:
      The source image of the first disk flagged as boot disk, or '' if there
      is no boot disk.

    Raises:
      RuntimeError: if the boot disk name can't be parsed from its source.
    """
    for disk in self.disks:
      if disk.get('boot', False):
        disk_source = disk.get('source', '')
        m = re.search(r'/disks/([^/]+)$', disk_source)
        if not m:
          raise RuntimeError(f"can't determine name of boot disk {disk_source}")
        disk_name = m.group(1)
        gce_disk: Disk = get_disk(self.project_id,
                                  zone=self.zone,
                                  disk_name=disk_name)
        return gce_disk.source_image
    return ''

  @property
  def is_sole_tenant_vm(self) -> bool:
    """True if scheduling.nodeAffinities is present.

    Returns False when the resource has no 'scheduling' field.
    """
    return 'nodeAffinities' in self._resource_data.get('scheduling', {})

  @property
  def network(self) -> network_q.Network:
    """Network of the first network interface.

    Raises:
      RuntimeError: if the network URL can't be parsed.
    """
    # 'https://www.googleapis.com/compute/v1/projects/gcpdiag-gce1-aaaa/global/networks/default'
    network_string = self._resource_data['networkInterfaces'][0]['network']
    m = re.match(r'^.+/projects/([^/]+)/global/networks/([^/]+)$',
                 network_string)
    if not m:
      raise RuntimeError("can't parse network string: %s" % network_string)
    return network_q.get_network(m.group(1),
                                 m.group(2),
                                 context=models.Context(project_id=m.group(1)))

  @property
  def network_ips(self) -> List[network_q.IPv4AddrOrIPv6Addr]:
    """The 'networkIP' of every network interface, as ip address objects."""
    return [
        ipaddress.ip_address(nic['networkIP'])
        for nic in self._resource_data['networkInterfaces']
    ]

  @property
  def get_network_interfaces(self):
    """The raw 'networkInterfaces' list of the resource."""
    return self._resource_data['networkInterfaces']

  @property
  def subnetworks(self) -> List[network_q.Subnetwork]:
    """Subnetwork of every network interface, in interface order."""
    return [
        network_q.get_subnetwork_from_url(nic['subnetwork'])
        for nic in self._resource_data['networkInterfaces']
    ]

  @property
  def routes(self) -> List[network_q.Route]:
    """Routes of the project that apply to this instance.

    A route is included for an interface when its network equals the
    interface's network and it either has no tags or shares at least one tag
    with the instance.
    """
    routes = []
    for nic in self._resource_data['networkInterfaces']:
      for route in network_q.get_routes(self.project_id):
        if nic['network'] != route.network:
          continue
        if route.tags == [] or any(tag in route.tags for tag in self.tags):
          routes.append(route)
    return routes

  def get_network_ip_for_instance_interface(
      self, network: str) -> Optional[network_q.IPv4NetOrIPv6Net]:
    """Get the network ip for a nic given a network name.

    Returns None when no interface is attached to `network`.
    """
    for nic in self._resource_data['networkInterfaces']:
      if nic.get('network') == network:
        return ipaddress.ip_network(nic.get('networkIP'))
    return None

  def secure_boot_enabled(self) -> bool:
    """shieldedInstanceConfig.enableSecureBoot, or False without that config."""
    if 'shieldedInstanceConfig' in self._resource_data:
      return self._resource_data['shieldedInstanceConfig']['enableSecureBoot']
    return False

  @property
  def access_scopes(self) -> List[str]:
    """Scopes of the first service account, or [] if there is none."""
    if 'serviceAccounts' in self._resource_data:
      saccts = self._resource_data['serviceAccounts']
      if isinstance(saccts, list) and len(saccts) >= 1:
        return saccts[0].get('scopes', [])
    return []

  @property
  def service_account(self) -> Optional[str]:
    """Email of the first service account, or None if there is none."""
    if 'serviceAccounts' in self._resource_data:
      saccts = self._resource_data['serviceAccounts']
      if isinstance(saccts, list) and len(saccts) >= 1:
        return saccts[0]['email']
    return None

  @property
  def tags(self) -> List[str]:
    """Items of the 'tags' field, or [] when tags or items are absent."""
    if 'tags' in self._resource_data:
      if 'items' in self._resource_data['tags']:
        return self._resource_data['tags']['items']
    return []

  def get_metadata(self, key: str) -> Optional[str]:
    """Returns the metadata value for `key`.

    Instance metadata takes precedence; if the key isn't set on the instance
    the project metadata is consulted.

    Returns:
      The value, or None if the key is set in neither place.
    """
    # Compare against None so that an instance without any metadata items (an
    # empty dict) is not re-parsed on every call.
    if self._metadata_dict is None:
      self._metadata_dict = {}
      if ('metadata' in self._resource_data and
          'items' in self._resource_data['metadata']):
        for item in self._resource_data['metadata']['items']:
          if 'key' in item and 'value' in item:
            self._metadata_dict[item['key']] = item['value']
    project_metadata = get_project_metadata(self.project_id)
    return self._metadata_dict.get(key, project_metadata.get(key))

  @property
  def status(self) -> str:
    """VM Status (None if the resource has no 'status' field)."""
    return self._resource_data.get('status', None)

  @property
  def is_running(self) -> bool:
    """VM Status is indicated as running."""
    return self._resource_data.get('status', False) == 'RUNNING'

  @property
  def network_interface_count(self) -> int:
    """Returns the number of network interfaces attached to the instance."""
    return len(self._resource_data.get('networkInterfaces', []))

  @property  # type: ignore
  @caching.cached_api_call(in_memory=True)
  def mig(self) -> ManagedInstanceGroup:
    """Return ManagedInstanceGroup that owns this instance.

    Throws AttributeError in case it isn't MIG-managed.
    """

    created_by = self.get_metadata('created-by')
    if created_by is None:
      raise AttributeError(f'instance {self.id} is not managed by a mig')

    # Example created-by:
    # pylint: disable=line-too-long
    # "projects/12340002/zones/europe-west4-a/instanceGroupManagers/gke-gke1-default-pool-e5e20a34-grp"
    # (note how it uses a project number and not a project id...)
    created_by_match = re.match(
        r'projects/([^/]+)/((?:regions|zones)/[^/]+/instanceGroupManagers/[^/]+)$',
        created_by,
    )
    if not created_by_match:
      raise AttributeError(f'instance {self.id} is not managed by a mig'
                           f' (created-by={created_by})')
    project = crm.get_project(created_by_match.group(1))

    mig_self_link = ('https://www.googleapis.com/compute/v1/'
                     f'projects/{project.id}/{created_by_match.group(2)}')

    # Try to find a matching mig.
    context = models.Context(project_id=self.project_id)
    all_migs = list(get_managed_instance_groups(context).values()) + list(
        get_region_managed_instance_groups(context).values())

    for mig in all_migs:
      if mig.self_link == mig_self_link:
        return mig

    raise AttributeError(f'MIG not found for instance {self.id}. '
                         f'Created by: {created_by}')

  @property
  def labels(self) -> dict:
    """The 'labels' dictionary of the resource, or {} when absent."""
    return self._resource_data.get('labels', {})
Represents a GCE instance.

Instance(project_id, resource_data)
335  def __init__(self, project_id, resource_data):
336    super().__init__(project_id=project_id)
337    self._resource_data = resource_data
338    self._metadata_dict = None
339    self._region = None
id: str
341  @property
342  def id(self) -> str:
343    return self._resource_data['id']
name: str
345  @property
346  def name(self) -> str:
347    return self._resource_data['name']
full_path: str
349  @property
350  def full_path(self) -> str:
351    result = re.match(
352        r'https://www.googleapis.com/compute/v1/(.*)',
353        self._resource_data['selfLink'],
354    )
355    if result:
356      return result.group(1)
357    else:
358      return '>> ' + self._resource_data['selfLink']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
360  @property
361  def short_path(self) -> str:
362    # Note: instance names must be unique per project,
363    # so no need to add the zone.
364    path = self.project_id + '/' + self.name
365    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

creation_timestamp: datetime.datetime
367  @property
368  def creation_timestamp(self) -> datetime:
369    """VM creation time, as a *naive* `datetime` object."""
370    return (datetime.fromisoformat(
371        self._resource_data['creationTimestamp']).astimezone(
372            timezone.utc).replace(tzinfo=None))

VM creation time, as a naive datetime object.

region: str
374  @property
375  def region(self) -> str:
376    if self._region is None:
377      if 'zone' in self._resource_data:
378        m = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
379        if not m:
380          raise RuntimeError("can't determine region of instance %s (%s)" %
381                             (self.name, self._resource_data['region']))
382        zone = m.group(1)
383        self._region = utils.zone_region(zone)
384      else:
385        raise RuntimeError(
386            f"can't determine region of instance {self.name}, zone isn't set!")
387    return self._region
zone: str
389  @property
390  def zone(self) -> str:
391    zone_uri = self._resource_data['zone']
392    m = re.search(r'/zones/([^/]+)$', zone_uri)
393    if m:
394      return m.group(1)
395    else:
396      raise RuntimeError(f"can't determine zone of instance {self.name}")
disks: List[dict]
398  @property
399  def disks(self) -> List[dict]:
400    if 'disks' in self._resource_data:
401      return self._resource_data['disks']
402    return []
boot_disk_licenses: List[str]
404  @property
405  def boot_disk_licenses(self) -> List[str]:
406    """Returns license names associated with boot disk."""
407    for disk in self.disks:
408      if disk.get('boot'):
409        return [
410            l.partition('/global/licenses/')[2]
411            for l in disk.get('licenses', [])
412        ]
413    return []

Returns license names associated with boot disk.

guest_os_features: List[str]
415  @property
416  def guest_os_features(self) -> List[str]:
417    """Returns guestOsFeatures types associated with boot disk."""
418    for disk in self.disks:
419      if disk.get('boot'):
420        return [f['type'] for f in disk.get('guestOsFeatures', [])]
421    return []

Returns guestOsFeatures types associated with boot disk.

startrestricted: bool
423  @property
424  def startrestricted(self) -> bool:
425    return self._resource_data['startRestricted']
def laststarttimestamp(self) -> str:
427  def laststarttimestamp(self) -> str:
428    return self._resource_data['lastStartTimestamp']
def laststoptimestamp(self) -> str:
430  def laststoptimestamp(self) -> str:
431    if 'lastStopTimestamp' in self._resource_data:
432      return self._resource_data['lastStopTimestamp']
433    return ''
def is_serial_port_logging_enabled(self) -> bool:
435  def is_serial_port_logging_enabled(self) -> bool:
436    value = self.get_metadata('serial-port-logging-enable')
437    return bool(value and value.upper() in POSITIVE_BOOL_VALUES)
def is_oslogin_enabled(self) -> bool:
439  def is_oslogin_enabled(self) -> bool:
440    value = self.get_metadata('enable-oslogin')
441    return bool(value and value.upper() in POSITIVE_BOOL_VALUES)
def is_metadata_enabled(self, metadata_name) -> bool:
443  def is_metadata_enabled(self, metadata_name) -> bool:
444    """Use to check for common boolean metadata value"""
445    value = self.get_metadata(metadata_name)
446    return bool(value and value.upper() in POSITIVE_BOOL_VALUES)

Checks whether a common boolean metadata value is enabled.

def has_label(self, label) -> bool:
448  def has_label(self, label) -> bool:
449    return label in self.labels
def is_dataproc_instance(self) -> bool:
451  def is_dataproc_instance(self) -> bool:
452    return self.has_label(DATAPROC_LABEL)
def is_gke_node(self) -> bool:
454  def is_gke_node(self) -> bool:
455    return self.has_label(GKE_LABEL)
is_preemptible_vm: bool
457  @property
458  def is_preemptible_vm(self) -> bool:
459    return ('scheduling' in self._resource_data and
460            'preemptible' in self._resource_data['scheduling'] and
461            self._resource_data['scheduling']['preemptible'])
def min_cpu_platform(self) -> str:
463  def min_cpu_platform(self) -> str:
464    if 'minCpuPlatform' in self._resource_data:
465      return self._resource_data['minCpuPlatform']
466    return 'None'
created_by_mig: bool
468  @property
469  def created_by_mig(self) -> bool:
470    """Return bool indicating if the instance part of a mig.
471
472    MIG which were part of MIG however have been removed or terminated will
473    return True.
474    """
475    created_by = self.get_metadata('created-by')
476    if created_by is None:
477      return False
478
479    created_by_match = re.match(
480        r'projects/([^/]+)/((?:regions|zones)/[^/]+/instanceGroupManagers/[^/]+)$',
481        created_by,
482    )
483    if not created_by_match:
484      return False
485    return True

Return a bool indicating whether the instance is part of a MIG.

Instances that were part of a MIG but have since been removed or terminated will still return True.

def is_windows_machine(self) -> bool:
487  def is_windows_machine(self) -> bool:
488    if 'disks' in self._resource_data:
489      disks = next(iter(self._resource_data['disks']))
490      if 'guestOsFeatures' in disks:
491        if 'WINDOWS' in [t['type'] for t in iter(disks['guestOsFeatures'])]:
492          return True
493    return False
def is_public_machine(self) -> bool:
495  def is_public_machine(self) -> bool:
496    if 'networkInterfaces' in self._resource_data:
497      return 'natIP' in str(self._resource_data['networkInterfaces'])
498    return False
def machine_type(self):
500  def machine_type(self):
501    if 'machineType' in self._resource_data:
502      # return self._resource_data['machineType']
503      machine_type_uri = self._resource_data['machineType']
504      mt = re.search(r'/machineTypes/([^/]+)$', machine_type_uri)
505      if mt:
506        return mt.group(1)
507      else:
508        raise RuntimeError(
509            f"can't determine machineType of instance {self.name}")
510    return None
def check_license(self, licenses: List[str]) -> bool:
512  def check_license(self, licenses: List[str]) -> bool:
513    """Checks that a license is contained in a given license list."""
514    if 'disks' in self._resource_data:
515      for disk in self._resource_data['disks']:
516        if 'license' in str(disk):
517          for license_ in licenses:
518            for attached_license in disk['licenses']:
519              if license_ == attached_license.partition('/global/licenses/')[2]:
520                return True
521    return False

Checks that a license is contained in a given license list.

def get_boot_disk_image(self) -> str:
523  def get_boot_disk_image(self) -> str:
524    """Get VM's boot disk image."""
525    boot_disk_image: str = ''
526    for disk in self.disks:
527      if disk.get('boot', False):
528        disk_source = disk.get('source', '')
529        m = re.search(r'/disks/([^/]+)$', disk_source)
530        if not m:
531          raise RuntimeError(f"can't determine name of boot disk {disk_source}")
532        disk_name = m.group(1)
533        gce_disk: Disk = get_disk(self.project_id,
534                                  zone=self.zone,
535                                  disk_name=disk_name)
536        return gce_disk.source_image
537    return boot_disk_image

Get VM's boot disk image.

is_sole_tenant_vm: bool
539  @property
540  def is_sole_tenant_vm(self) -> bool:
541    return bool('nodeAffinities' in self._resource_data['scheduling'])
network: gcpdiag.queries.network.Network
543  @property
544  def network(self) -> network_q.Network:
545    # 'https://www.googleapis.com/compute/v1/projects/gcpdiag-gce1-aaaa/global/networks/default'
546    network_string = self._resource_data['networkInterfaces'][0]['network']
547    m = re.match(r'^.+/projects/([^/]+)/global/networks/([^/]+)$',
548                 network_string)
549    if not m:
550      raise RuntimeError("can't parse network string: %s" % network_string)
551    return network_q.get_network(m.group(1),
552                                 m.group(2),
553                                 context=models.Context(project_id=m.group(1)))
network_ips: List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]
555  @property
556  def network_ips(self) -> List[network_q.IPv4AddrOrIPv6Addr]:
557    return [
558        ipaddress.ip_address(nic['networkIP'])
559        for nic in self._resource_data['networkInterfaces']
560    ]
get_network_interfaces
562  @property
563  def get_network_interfaces(self):
564    return self._resource_data['networkInterfaces']
subnetworks: List[gcpdiag.queries.network.Subnetwork]
566  @property
567  def subnetworks(self) -> List[network_q.Subnetwork]:
568    subnetworks = []
569    for nic in self._resource_data['networkInterfaces']:
570      subnetworks.append(network_q.get_subnetwork_from_url(nic['subnetwork']))
571    return subnetworks
routes: List[gcpdiag.queries.network.Route]
573  @property
574  def routes(self) -> List[network_q.Route]:
575    routes = []
576    for nic in self._resource_data['networkInterfaces']:
577      for route in network_q.get_routes(self.project_id):
578        if nic['network'] == route.network:
579          if route.tags == []:
580            routes.append(route)
581            continue
582          else:
583            temp = [x for x in self.tags if x in route.tags]
584            if len(temp) > 0:
585              routes.append(route)
586    return routes
def get_network_ip_for_instance_interface( self, network: str) -> Union[ipaddress.IPv4Network, ipaddress.IPv6Network, NoneType]:
588  def get_network_ip_for_instance_interface(
589      self, network: str) -> Optional[network_q.IPv4NetOrIPv6Net]:
590    """Get the network ip for a nic given a network name."""
591    for nic in self._resource_data['networkInterfaces']:
592      if nic.get('network') == network:
593        return ipaddress.ip_network(nic.get('networkIP'))
594    return None

Get the network ip for a nic given a network name.

def secure_boot_enabled(self) -> bool:
596  def secure_boot_enabled(self) -> bool:
597    if 'shieldedInstanceConfig' in self._resource_data:
598      return self._resource_data['shieldedInstanceConfig']['enableSecureBoot']
599    return False
access_scopes: List[str]
601  @property
602  def access_scopes(self) -> List[str]:
603    if 'serviceAccounts' in self._resource_data:
604      saccts = self._resource_data['serviceAccounts']
605      if isinstance(saccts, list) and len(saccts) >= 1:
606        return saccts[0].get('scopes', [])
607    return []
service_account: Optional[str]
609  @property
610  def service_account(self) -> Optional[str]:
611    if 'serviceAccounts' in self._resource_data:
612      saccts = self._resource_data['serviceAccounts']
613      if isinstance(saccts, list) and len(saccts) >= 1:
614        return saccts[0]['email']
615    return None
tags: List[str]
617  @property
618  def tags(self) -> List[str]:
619    if 'tags' in self._resource_data:
620      if 'items' in self._resource_data['tags']:
621        return self._resource_data['tags']['items']
622    return []
def get_metadata(self, key: str) -> str:
624  def get_metadata(self, key: str) -> str:
625    if not self._metadata_dict:
626      self._metadata_dict = {}
627      if ('metadata' in self._resource_data and
628          'items' in self._resource_data['metadata']):
629        for item in self._resource_data['metadata']['items']:
630          if 'key' in item and 'value' in item:
631            self._metadata_dict[item['key']] = item['value']
632    project_metadata = get_project_metadata(self.project_id)
633    return self._metadata_dict.get(key, project_metadata.get(key))
status: str
635  @property
636  def status(self) -> str:
637    """VM Status."""
638    return self._resource_data.get('status', None)

VM Status.

is_running: bool
640  @property
641  def is_running(self) -> bool:
642    """VM Status is indicated as running."""
643    return self._resource_data.get('status', False) == 'RUNNING'

VM Status is indicated as running.

network_interface_count: int
645  @property
646  def network_interface_count(self) -> int:
647    """Returns the number of network interfaces attached to the instance."""
648    return len(self._resource_data.get('networkInterfaces', []))

Returns the number of network interfaces attached to the instance.

mig: ManagedInstanceGroup
650  @property  # type: ignore
651  @caching.cached_api_call(in_memory=True)
652  def mig(self) -> ManagedInstanceGroup:
653    """Return ManagedInstanceGroup that owns this instance.
654
655    Throws AttributeError in case it isn't MIG-managed.
656    """
657
658    created_by = self.get_metadata('created-by')
659    if created_by is None:
660      raise AttributeError(f'instance {self.id} is not managed by a mig')
661
662    # Example created-by:
663    # pylint: disable=line-too-long
664    # "projects/12340002/zones/europe-west4-a/instanceGroupManagers/gke-gke1-default-pool-e5e20a34-grp"
665    # (note how it uses a project number and not a project id...)
666    created_by_match = re.match(
667        r'projects/([^/]+)/((?:regions|zones)/[^/]+/instanceGroupManagers/[^/]+)$',
668        created_by,
669    )
670    if not created_by_match:
671      raise AttributeError(f'instance {self.id} is not managed by a mig'
672                           f' (created-by={created_by})')
673    project = crm.get_project(created_by_match.group(1))
674
675    mig_self_link = ('https://www.googleapis.com/compute/v1/'
676                     f'projects/{project.id}/{created_by_match.group(2)}')
677
678    # Try to find a matching mig.
679    context = models.Context(project_id=self.project_id)
680    all_migs = list(get_managed_instance_groups(context).values()) + list(
681        get_region_managed_instance_groups(context).values())
682
683    for mig in all_migs:
684      if mig.self_link == mig_self_link:
685        return mig
686
687    raise AttributeError(f'MIG not found for instance {self.id}. '
688                         f'Created by: {created_by}')

Return ManagedInstanceGroup that owns this instance.

Throws AttributeError in case it isn't MIG-managed.

labels: dict
690  @property
691  def labels(self) -> dict:
692    return self._resource_data.get('labels', {})
class Disk(gcpdiag.models.Resource):
class Disk(models.Resource):
  """Represents a GCE disk, wrapping its Compute API resource dict."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Store the resource dict of a disk in project `project_id`."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def id(self) -> str:
    """The 'id' field of the disk resource."""
    disk_id = self._resource_data['id']
    return disk_id

  @property
  def name(self) -> str:
    """The 'name' field of the disk resource."""
    disk_name = self._resource_data['name']
    return disk_name

  @property
  def type(self) -> str:
    """Disk type name, i.e. the last path component of the type URL.

    Raises:
      RuntimeError: if the URL doesn't end in '/diskTypes/<name>'.
    """
    found = re.search(r'/diskTypes/([^/]+)$', self._resource_data['type'])
    if found is None:
      raise RuntimeError("can't determine type of the disk %s (%s)" %
                         (self.name, self._resource_data['type']))
    return found.group(1)

  @property
  def users(self) -> list:
    """Instance names parsed from the 'users' URLs (non-matching URLs skipped)."""
    matches = (re.search(r'/instances/(.+)$', url)
               for url in self._resource_data.get('users', []))
    return [m.group(1) for m in matches if m]

  @property
  def zone(self) -> str:
    """Zone name, i.e. the last path component of the zone URL.

    Raises:
      RuntimeError: if the URL doesn't end in '/zones/<name>'.
    """
    found = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
    if found is None:
      raise RuntimeError("can't determine zone of disk %s (%s)" %
                         (self.name, self._resource_data['zone']))
    return found.group(1)

  @property
  def source_image(self) -> str:
    """The 'sourceImage' field, or '' when the field is absent."""
    image = self._resource_data.get('sourceImage', '')
    return image

  @property
  def full_path(self) -> str:
    """Self link with the Compute API v1 URL prefix removed.

    A self link without the expected prefix is returned unchanged apart from
    a leading '>> ' marker.
    """
    self_link = self._resource_data['selfLink']
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                       self_link)
    if not matched:
      return '>> ' + self_link
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """Short identifier of the form '<project id>/<disk name>'."""
    return '{}/{}'.format(self.project_id, self.name)

  @property
  def bootable(self) -> bool:
    """Whether the resource has a 'guestOsFeatures' field."""
    has_features = 'guestOsFeatures' in self._resource_data
    return has_features

  @property
  def in_use(self) -> bool:
    """Whether the resource has a 'users' field."""
    has_users = 'users' in self._resource_data
    return has_users

  @property
  def size(self) -> int:
    """The 'sizeGb' field of the disk resource."""
    size_gb = self._resource_data['sizeGb']
    return size_gb

  @property
  def provisionediops(self) -> Optional[int]:
    """The 'provisionedIops' field, or None when it is absent."""
    iops = self._resource_data.get('provisionedIops')
    return iops

  @property
  def has_snapshot_schedule(self) -> bool:
    """Whether the resource has a 'resourcePolicies' field."""
    has_policies = 'resourcePolicies' in self._resource_data
    return has_policies

Represents a GCE disk.

Disk(project_id, resource_data)
700  def __init__(self, project_id, resource_data):
701    super().__init__(project_id=project_id)
702    self._resource_data = resource_data
id: str
704  @property
705  def id(self) -> str:
706    return self._resource_data['id']
name: str
708  @property
709  def name(self) -> str:
710    return self._resource_data['name']
type: str
712  @property
713  def type(self) -> str:
714    disk_type = re.search(r'/diskTypes/([^/]+)$', self._resource_data['type'])
715    if not disk_type:
716      raise RuntimeError("can't determine type of the disk %s (%s)" %
717                         (self.name, self._resource_data['type']))
718    return disk_type.group(1)
users: list
720  @property
721  def users(self) -> list:
722    pattern = r'/instances/(.+)$'
723    # Extracting the instances
724    instances = []
725    for i in self._resource_data.get('users', []):
726      m = re.search(pattern, i)
727      if m:
728        instances.append(m.group(1))
729    return instances
zone: str
731  @property
732  def zone(self) -> str:
733    m = re.search(r'/zones/([^/]+)$', self._resource_data['zone'])
734    if not m:
735      raise RuntimeError("can't determine zone of disk %s (%s)" %
736                         (self.name, self._resource_data['zone']))
737    return m.group(1)
source_image: str
739  @property
740  def source_image(self) -> str:
741    return self._resource_data.get('sourceImage', '')
full_path: str
743  @property
744  def full_path(self) -> str:
745    result = re.match(
746        r'https://www.googleapis.com/compute/v1/(.*)',
747        self._resource_data['selfLink'],
748    )
749    if result:
750      return result.group(1)
751    else:
752      return '>> ' + self._resource_data['selfLink']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
754  @property
755  def short_path(self) -> str:
756    return f'{self.project_id}/{self.name}'

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

bootable: bool
758  @property
759  def bootable(self) -> bool:
760    return 'guestOsFeatures' in self._resource_data
in_use: bool
762  @property
763  def in_use(self) -> bool:
764    return 'users' in self._resource_data
size: int
766  @property
767  def size(self) -> int:
768    return self._resource_data['sizeGb']
provisionediops: Optional[int]
770  @property
771  def provisionediops(self) -> Optional[int]:
772    return self._resource_data.get('provisionedIops')
has_snapshot_schedule: bool
774  @property
775  def has_snapshot_schedule(self) -> bool:
776    return 'resourcePolicies' in self._resource_data
@caching.cached_api_call(in_memory=True)
def get_gce_zones(project_id: str) -> Set[str]:
@caching.cached_api_call(in_memory=True)
def get_gce_zones(project_id: str) -> Set[str]:
  """Return the names of the GCE zones listed for the given project.

  Raises:
    utils.GcpApiError: if the API call fails with an HTTP error.
  """
  try:
    gce_api = apis.get_api('compute', 'v1', project_id)
    logging.debug('listing gce zones of project %s', project_id)
    response = gce_api.zones().list(project=project_id).execute(
        num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
  if not response or 'items' not in response:
    return set()
  zone_names = set()
  for zone in response['items']:
    if 'name' in zone:
      zone_names.add(zone['name'])
  return zone_names
def get_gce_public_licences(project_id: str) -> List[str]:
def get_gce_public_licences(project_id: str) -> List[str]:
  """Return the license names defined in a publicly available image project.

  Args:
    project_id: project whose licenses are listed.

  Returns:
    For each license, the part of its selfLink after '/global/licenses/'.
  """
  licenses = []
  gce_api = apis.get_api('compute', 'v1', project_id)
  logging.debug('listing licenses of project %s', project_id)
  request = gce_api.licenses().list(project=project_id)
  while request is not None:
    # Retry like the other queries in this module do.
    response = request.execute(num_retries=config.API_RETRIES)
    # Tolerate a response without 'items', as get_gce_zones() does.
    for license_ in response.get('items', []):
      licenses.append(license_['selfLink'].partition('/global/licenses/')[2])
    request = gce_api.licenses().list_next(previous_request=request,
                                           previous_response=response)
  return licenses

Returns a list of licenses based on publicly available image project

def get_instance( project_id: str, zone: str, instance_name: str) -> Instance:
def get_instance(project_id: str, zone: str, instance_name: str) -> Instance:
  """Fetch a single instance by project, zone and instance name.

  Args:
    project_id: project that contains the instance.
    zone: zone of the instance.
    instance_name: name of the instance.

  Returns:
    An Instance wrapping the API response.
  """
  gce_api = apis.get_api('compute', 'v1', project_id)
  lookup = gce_api.instances().get(project=project_id,
                                   zone=zone,
                                   instance=instance_name)
  resource = lookup.execute(num_retries=config.API_RETRIES)
  return Instance(project_id, resource_data=resource)

Returns instance object matching instance name and zone

@caching.cached_api_call(in_memory=True)
def get_instance_by_id( project_id: str, instance_id: str) -> Optional[Instance]:
@caching.cached_api_call(in_memory=True)
def get_instance_by_id(project_id: str, instance_id: str) -> Optional[Instance]:
  """Find an instance by its id across all zones of a project.

  Args:
    project_id: The ID of the GCP project.
    instance_id: The unique ID of the GCE instance.

  Returns:
    The matching Instance, or None if the compute API is not enabled or no
    instance with that id is listed.
  """
  if not apis.is_enabled(project_id, 'compute'):
    return None
  gce_api = apis.get_api('compute', 'v1', project_id)
  wanted_id = str(instance_id)
  # The filter narrows the aggregated listing to the requested id.
  request = gce_api.instances().aggregatedList(project=project_id,
                                               filter=f'id eq {instance_id}',
                                               returnPartialSuccess=True)

  while request:
    response = request.execute(num_retries=config.API_RETRIES)
    for scoped in response.get('items', {}).values():
      for candidate in scoped.get('instances', []):
        # Compare as strings so the id type of either side doesn't matter.
        if str(candidate.get('id')) == wanted_id:
          return Instance(project_id, candidate)

    request = gce_api.instances().aggregatedList_next(
        previous_request=request, previous_response=response)

  return None

Returns instance object matching instance id in the project.

Searches all zones.

Arguments:
  • project_id: The ID of the GCP project.
  • instance_id: The unique ID of the GCE instance.
@caching.cached_api_call(in_memory=True)
def get_global_operations( project: str, filter_str: Optional[str] = None, order_by: Optional[str] = None, max_results: Optional[int] = None, service_project_number: Optional[int] = None) -> List[Dict[str, Any]]:
@caching.cached_api_call(in_memory=True)
def get_global_operations(
    project: str,
    filter_str: Optional[str] = None,
    order_by: Optional[str] = None,
    max_results: Optional[int] = None,
    service_project_number: Optional[int] = None,
) -> List[Dict[str, Any]]:
  """Return the operations listed for a project, aggregated over all scopes.

  Args:
    project: project id to list operations for.
    filter_str: passed as the `filter` request parameter.
    order_by: passed as the `orderBy` request parameter.
    max_results: passed as the `maxResults` request parameter.
    service_project_number: passed as the `serviceProjectNumber` request
      parameter.

  Returns:
    A flat list with the operation dicts of every page and scope.
  """
  compute = apis.get_api('compute', 'v1', project)
  # The two literals are concatenated; the trailing space keeps
  # "operations" and "logs" from being fused in the message.
  logging.debug(('searching compute global operations '
                 'logs in project %s with filter %s'), project, filter_str)
  operations: List[Dict[str, Any]] = []
  request = compute.globalOperations().aggregatedList(
      project=project,
      filter=filter_str,
      orderBy=order_by,
      maxResults=max_results,
      serviceProjectNumber=service_project_number,
      returnPartialSuccess=True,
  )
  while request:
    response = request.execute(num_retries=config.API_RETRIES)
    for scoped in response.get('items', {}).values():
      # Scopes without operations carry no 'operations' key.
      operations.extend(scoped.get('operations', []))
    request = compute.globalOperations().aggregatedList_next(
        previous_request=request, previous_response=response)
  return operations

Returns global operations object matching project id.

@caching.cached_api_call(in_memory=True)
def get_disk(project_id: str, zone: str, disk_name: str) -> Disk:
@caching.cached_api_call(in_memory=True)
def get_disk(project_id: str, zone: str, disk_name: str) -> Disk:
  """Fetch a single disk by project, zone and disk name.

  Args:
    project_id: project that contains the disk.
    zone: zone of the disk.
    disk_name: name of the disk.

  Returns:
    A Disk wrapping the API response.
  """
  gce_api = apis.get_api('compute', 'v1', project_id)
  lookup = gce_api.disks().get(project=project_id, zone=zone, disk=disk_name)
  resource = lookup.execute(num_retries=config.API_RETRIES)
  return Disk(project_id, resource_data=resource)

Returns disk object matching disk name and zone.

def get_instance_group_manager( project_id: str, zone: str, instance_group_manager_name: str) -> ManagedInstanceGroup:
def get_instance_group_manager(
    project_id: str, zone: str,
    instance_group_manager_name: str) -> ManagedInstanceGroup:
  """Fetch a zonal managed instance group by name.

  Args:
    project_id: The project ID of the instance group manager.
    zone: The zone of the instance group manager.
    instance_group_manager_name: The name of the instance group manager.

  Returns:
    A ManagedInstanceGroup object.

  Raises:
    utils.GcpApiError: If the API call fails.
  """
  gce_api = apis.get_api('compute', 'v1', project_id)
  lookup = gce_api.instanceGroupManagers().get(
      project=project_id,
      zone=zone,
      instanceGroupManager=instance_group_manager_name)
  try:
    resource = lookup.execute(num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
  return ManagedInstanceGroup(project_id, resource_data=resource)

Get a zonal ManagedInstanceGroup object by name and zone.

Arguments:
  • project_id: The project ID of the instance group manager.
  • zone: The zone of the instance group manager.
  • instance_group_manager_name: The name of the instance group manager.
Returns:

A ManagedInstanceGroup object.

Raises:
  • utils.GcpApiError: If the API call fails.
def get_region_instance_group_manager( project_id: str, region: str, instance_group_manager_name: str) -> ManagedInstanceGroup:
def get_region_instance_group_manager(
    project_id: str, region: str,
    instance_group_manager_name: str) -> ManagedInstanceGroup:
  """Fetch a regional managed instance group by name.

  Args:
    project_id: The project ID of the instance group manager.
    region: The region of the instance group manager.
    instance_group_manager_name: The name of the instance group manager.

  Returns:
    A ManagedInstanceGroup object.

  Raises:
    utils.GcpApiError: If the API call fails.
  """
  gce_api = apis.get_api('compute', 'v1', project_id)
  lookup = gce_api.regionInstanceGroupManagers().get(
      project=project_id,
      region=region,
      instanceGroupManager=instance_group_manager_name,
  )
  try:
    resource = lookup.execute(num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
  return ManagedInstanceGroup(project_id, resource_data=resource)

Get a regional ManagedInstanceGroup object by name and region.

Arguments:
  • project_id: The project ID of the instance group manager.
  • region: The region of the instance group manager.
  • instance_group_manager_name: The name of the instance group manager.
Returns:

A ManagedInstanceGroup object.

Raises:
  • utils.GcpApiError: If the API call fails.
def get_autoscaler( project_id: str, zone: str, autoscaler_name: str) -> Autoscaler:
def get_autoscaler(project_id: str, zone: str,
                   autoscaler_name: str) -> Autoscaler:
  """Fetch one zonal autoscaler from the Compute API.

  Args:
    project_id: ID of the project that owns the autoscaler.
    zone: Zone in which the autoscaler is located.
    autoscaler_name: Name of the autoscaler.

  Returns:
    An Autoscaler wrapping the API response.

  Raises:
    utils.GcpApiError: If the API request fails with an HTTP error.
  """
  lookup = {
      'project': project_id,
      'zone': zone,
      'autoscaler': autoscaler_name,
  }
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.autoscalers().get(**lookup)
  try:
    autoscaler_data = request.execute(num_retries=config.API_RETRIES)
    return Autoscaler(project_id, resource_data=autoscaler_data)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err

Get a zonal Autoscaler object by name and zone.

def get_region_autoscaler( project_id: str, region: str, autoscaler_name: str) -> Autoscaler:
def get_region_autoscaler(project_id: str, region: str,
                          autoscaler_name: str) -> Autoscaler:
  """Fetch one regional autoscaler from the Compute API.

  Args:
    project_id: ID of the project that owns the autoscaler.
    region: Region in which the autoscaler is located.
    autoscaler_name: Name of the autoscaler.

  Returns:
    An Autoscaler wrapping the API response.

  Raises:
    utils.GcpApiError: If the API request fails with an HTTP error.
  """
  lookup = {
      'project': project_id,
      'region': region,
      'autoscaler': autoscaler_name,
  }
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.regionAutoscalers().get(**lookup)
  try:
    autoscaler_data = request.execute(num_retries=config.API_RETRIES)
    return Autoscaler(project_id, resource_data=autoscaler_data)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err

Get a regional Autoscaler object by name and region.

@caching.cached_api_call(in_memory=True)
def get_instances( context: gcpdiag.models.Context) -> Mapping[str, Instance]:
 980@caching.cached_api_call(in_memory=True)
 981def get_instances(context: models.Context) -> Mapping[str, Instance]:
 982  """Get a list of Instance matching the given context, indexed by instance id."""
 983
 984  instances: Dict[str, Instance] = {}
 985  if not apis.is_enabled(context.project_id, 'compute'):
 986    return instances
 987  gce_api = apis.get_api('compute', 'v1', context.project_id)
 988  request = gce_api.instances().aggregatedList(project=context.project_id,
 989                                               returnPartialSuccess=True)
 990  logging.debug('listing gce instances of project %s', context.project_id)
 991  while request:  # Continue as long as there are pages
 992    response = request.execute(num_retries=config.API_RETRIES)
 993    instances_by_zones = response.get('items', {})
 994    for _, data_ in instances_by_zones.items():
 995      if 'instances' not in data_:
 996        continue
 997      for instance in data_['instances']:
 998        result = re.match(
 999            r'https://www.googleapis.com/compute/v1/projects/[^/]+/zones/([^/]+)/',
1000            instance['selfLink'],
1001        )
1002        if not result:
1003          logging.error(
1004              "instance %s selfLink didn't match regexp: %s",
1005              instance['id'],
1006              instance['selfLink'],
1007          )
1008          continue
1009        zone = result.group(1)
1010        labels = instance.get('labels', {})
1011        if not context.match_project_resource(
1012            resource=instance.get('name'), location=zone,
1013            labels=labels) and not context.match_project_resource(
1014                resource=instance.get('id'), location=zone, labels=labels):
1015          continue
1016        instances.update({
1017            instance['id']:
1018                Instance(project_id=context.project_id, resource_data=instance)
1019        })
1020    request = gce_api.instances().aggregatedList_next(
1021        previous_request=request, previous_response=response)
1022  return instances

Get a list of Instance matching the given context, indexed by instance id.

@caching.cached_api_call(in_memory=True)
def get_instance_groups( context: gcpdiag.models.Context) -> Mapping[str, InstanceGroup]:
@caching.cached_api_call(in_memory=True)
def get_instance_groups(context: models.Context) -> Mapping[str, InstanceGroup]:
  """Get the InstanceGroups matching the given context, indexed by full path.

  Args:
    context: Context selecting the project and the resources of interest.

  Returns:
    A mapping from InstanceGroup.full_path to InstanceGroup. Empty when the
    compute API is not enabled for the project.
  """
  groups: Dict[str, InstanceGroup] = {}
  if not apis.is_enabled(context.project_id, 'compute'):
    return groups
  gce_api = apis.get_api('compute', 'v1', context.project_id)
  request = gce_api.instanceGroups().aggregatedList(project=context.project_id,
                                                    returnPartialSuccess=True)
  logging.debug('listing gce instance groups of project %s', context.project_id)
  while request:  # Continue as long as there are pages
    response = request.execute(num_retries=config.API_RETRIES)
    groups_by_zones = response.get('items', {})
    for data_ in groups_by_zones.values():
      if 'instanceGroups' not in data_:
        continue
      for group in data_['instanceGroups']:
        # The location (zone or region) is only available via the selfLink.
        result = re.match(
            r'https://www.googleapis.com/compute/v1/projects/[^/]+/(zones|regions)/([^/]+)',
            group['selfLink'],
        )
        if not result:
          # Name the right resource kind so the log is not mistaken for an
          # instance parsing problem.
          logging.error(
              "instance group %s selfLink didn't match regexp: %s",
              group['id'],
              group['selfLink'],
          )
          continue
        location = result.group(2)
        labels = group.get('labels', {})
        resource = group.get('name', '')
        if not context.match_project_resource(
            location=location, labels=labels, resource=resource):
          continue
        instance_group = InstanceGroup(context.project_id, resource_data=group)
        groups[instance_group.full_path] = instance_group
    request = gce_api.instanceGroups().aggregatedList_next(
        previous_request=request, previous_response=response)
  return groups

Get the InstanceGroups matching the given context, indexed by full path.

@caching.cached_api_call(in_memory=True)
def get_managed_instance_groups( context: gcpdiag.models.Context) -> Mapping[int, ManagedInstanceGroup]:
@caching.cached_api_call(in_memory=True)
def get_managed_instance_groups(
    context: models.Context,) -> Mapping[int, ManagedInstanceGroup]:
  """Get the zonal ManagedInstanceGroups matching the context, indexed by mig id.

  Args:
    context: Context selecting the project and the resources of interest.

  Returns:
    A mapping from MIG id to ManagedInstanceGroup. Empty when the compute API
    is not enabled for the project.
  """
  found: Dict[int, ManagedInstanceGroup] = {}
  if not apis.is_enabled(context.project_id, 'compute'):
    return found
  gce_api = apis.get_api('compute', 'v1', context.project_id)
  request = gce_api.instanceGroupManagers().aggregatedList(
      project=context.project_id, returnPartialSuccess=True)
  logging.debug('listing zonal managed instance groups of project %s',
                context.project_id)
  # aggregatedList_next returns a falsy value once the last page was read.
  while request:
    response = request.execute(num_retries=config.API_RETRIES)
    for scope_data in response.get('items', {}).values():
      # Scopes without MIGs carry no 'instanceGroupManagers' key.
      for raw in scope_data.get('instanceGroupManagers', []):
        # The location is only available as part of the selfLink URL.
        link_match = re.match(
            r'https://www.googleapis.com/compute/v1/projects/[^/]+/(?:regions|zones)/([^/]+)/',
            raw['selfLink'],
        )
        if not link_match:
          logging.error(
              "mig %s selfLink didn't match regexp: %s",
              raw['name'],
              raw['selfLink'],
          )
          continue
        wanted = context.match_project_resource(
            location=link_match.group(1),
            labels=raw.get('labels', {}),
            resource=raw.get('name', ''))
        if wanted:
          found[raw['id']] = ManagedInstanceGroup(
              project_id=context.project_id, resource_data=raw)
    request = gce_api.instanceGroupManagers().aggregatedList_next(
        previous_request=request, previous_response=response)
  return found

Get a list of zonal ManagedInstanceGroups matching the given context, indexed by mig id.

@caching.cached_api_call(in_memory=True)
def get_region_managed_instance_groups( context: gcpdiag.models.Context) -> Mapping[int, ManagedInstanceGroup]:
@caching.cached_api_call(in_memory=True)
def get_region_managed_instance_groups(
    context: models.Context,) -> Mapping[int, ManagedInstanceGroup]:
  """Get the regional ManagedInstanceGroups matching the context, indexed by mig id.

  One list request is issued per region returned by get_all_regions() and the
  requests are executed concurrently.

  Args:
    context: Context selecting the project and the resources of interest.

  Returns:
    A mapping from MIG id to ManagedInstanceGroup. Empty when the compute API
    is not enabled for the project.
  """
  found: Dict[int, ManagedInstanceGroup] = {}
  if not apis.is_enabled(context.project_id, 'compute'):
    return found
  gce_api = apis.get_api('compute', 'v1', context.project_id)
  per_region_requests = [
      gce_api.regionInstanceGroupManagers().list(project=context.project_id,
                                                 region=region.name)
      for region in get_all_regions(context.project_id)
  ]
  logging.debug(
      'listing regional managed instance groups of project %s',
      context.project_id,
  )
  raw_migs = apis_utils.execute_concurrently_with_pagination(
      api=gce_api,
      requests=per_region_requests,
      next_function=gce_api.regionInstanceGroupManagers().list_next,
      context=context,
      log_text=
      f'listing regional managed instance groups of project {context.project_id}'
  )
  for raw in raw_migs:
    # The region is only available as part of the selfLink URL.
    link_match = re.match(
        r'https://www.googleapis.com/compute/v1/projects/[^/]+/(?:regions)/([^/]+)/',
        raw['selfLink'],
    )
    if not link_match:
      logging.error("mig %s selfLink didn't match regexp: %s", raw['name'],
                    raw['selfLink'])
      continue
    wanted = context.match_project_resource(location=link_match.group(1),
                                            labels=raw.get('labels', {}),
                                            resource=raw.get('name', ''))
    if wanted:
      found[raw['id']] = ManagedInstanceGroup(project_id=context.project_id,
                                              resource_data=raw)
  return found

Get a list of regional ManagedInstanceGroups matching the given context, indexed by mig id.

@caching.cached_api_call
def get_instance_templates(project_id: str) -> Mapping[str, InstanceTemplate]:
@caching.cached_api_call
def get_instance_templates(project_id: str) -> Mapping[str, InstanceTemplate]:
  """Get the instance templates of a project, indexed by full path.

  Args:
    project_id: ID of the project whose instance templates are listed.

  Returns:
    A mapping from InstanceTemplate.full_path to InstanceTemplate.
  """
  logging.info('fetching instance templates')
  templates: Dict[str, InstanceTemplate] = {}
  gce_api = apis.get_api('compute', 'v1', project_id)
  request = gce_api.instanceTemplates().list(
      project=project_id,
      returnPartialSuccess=True,
      # Fetch only a subset of the fields to improve performance.
      # - items/selfLink is read by InstanceTemplate.full_path, which is the
      #   key of the returned mapping; without it the lookup raises KeyError.
      # - nextPageToken must be part of the mask, otherwise list_next cannot
      #   tell that more pages exist and only the first page is returned.
      fields=('nextPageToken, items/name, items/selfLink,'
              ' items/properties/tags,'
              ' items/properties/networkInterfaces,'
              ' items/properties/serviceAccounts, items/properties/metadata'),
  )
  for t in apis_utils.list_all(
      request, next_function=gce_api.instanceTemplates().list_next):
    instance_template = InstanceTemplate(project_id, t)
    templates[instance_template.full_path] = instance_template
  return templates
@caching.cached_api_call
def get_project_metadata(project_id) -> Mapping[str, str]:
@caching.cached_api_call
def get_project_metadata(project_id) -> Mapping[str, str]:
  """Return the project-wide instance metadata as a key/value mapping.

  Args:
    project_id: ID of the project to query.

  Returns:
    A dict built from the 'commonInstanceMetadata' items of the project;
    empty when the project has no such items.

  Raises:
    utils.GcpApiError: If the API request fails with an HTTP error.
  """
  gce_api = apis.get_api('compute', 'v1', project_id)
  logging.debug('fetching metadata of project %s\n', project_id)
  query = gce_api.projects().get(project=project_id)
  try:
    project_data = query.execute(num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err

  common_metadata = project_data.get('commonInstanceMetadata')
  if not common_metadata or 'items' not in common_metadata:
    return {}
  return {
      entry.get('key'): entry.get('value')
      for entry in common_metadata['items']
  }
@caching.cached_api_call
def get_instances_serial_port_output(context: gcpdiag.models.Context):
@caching.cached_api_call
def get_instances_serial_port_output(context: models.Context):
  """Fetch the serial port output of the relevant instances of a project.

  Only instances that match the context, are running and do not have serial
  port logging enabled are queried.

  Args:
    context: Context selecting the project and the instances of interest.

  Returns:
    The temporary deque obtained from caching.get_tmp_deque, holding one
    SerialPortOutput per response whose selfLink could be parsed. The deque
    is empty when the compute API is not enabled for the project.

  Raises:
    utils.GcpApiError: If one of the requests fails with an HTTP error.
  """
  # Create temp storage (diskcache.Deque) for output
  deque = caching.get_tmp_deque('tmp-gce-serial-output-')
  if not apis.is_enabled(context.project_id, 'compute'):
    return deque
  gce_api = apis.get_api('compute', 'v1', context.project_id)

  # Serial port output are rolled over on day 7 and limited to 1MB.
  # Fetching serial outputs are very expensive so optimize to fetch.
  # Only relevant instances as storage size can grow drastically for
  # massive projects. Think 1MB * N where N is some large number.
  requests = [
      gce_api.instances().getSerialPortOutput(
          project=i.project_id,
          zone=i.zone,
          instance=i.id,
          # To get all 1mb output
          start=-1000000,
      )
      for i in get_instances(context).values()
      # fetch running instances that do not export to cloud logging
      if not i.is_serial_port_logging_enabled() and i.is_running
  ]
  requests_start_time = datetime.now()
  # Note: We are limited to 1000 calls in a single batch request.
  # We have to use multiple batch requests in batches of 1000
  # https://github.com/googleapis/google-api-python-client/blob/main/docs/batch.md
  batch_size = 1000
  for batch_start in range(0, len(requests), batch_size):
    batch_requests = requests[batch_start:batch_start + batch_size]
    for _, response, exception in apis_utils.execute_concurrently(
        api=gce_api, requests=batch_requests, context=context):
      if exception:
        if isinstance(exception, googleapiclient.errors.HttpError):
          raise utils.GcpApiError(exception) from exception
        else:
          raise exception

      if response:
        result = re.match(
            r'https://www.googleapis.com/compute/v1/projects/([^/]+)/zones/[^/]+/instances/([^/]+)',
            response['selfLink'],
        )
        if not result:
          logging.error("instance selfLink didn't match regexp: %s",
                        response['selfLink'])
          # Skip only the unparsable response. A bare return here would make
          # the function return None instead of the deque and throw away the
          # output of every remaining instance.
          continue

        project_id = result.group(1)
        instance_id = result.group(2)
        deque.appendleft(
            SerialPortOutput(
                project_id=project_id,
                instance_id=instance_id,
                contents=response['contents'].splitlines(),
            ))
  requests_end_time = datetime.now()
  logging.debug(
      'total serial logs processing time: %s, number of instances: %s',
      requests_end_time - requests_start_time,
      len(requests),
  )
  return deque

Get the serial port output of the instances

that match the given context, are running, and do not export their serial output to Cloud Logging.

@caching.cached_api_call
def get_instance_serial_port_output( project_id, zone, instance_name) -> Optional[SerialPortOutput]:
@caching.cached_api_call
def get_instance_serial_port_output(
    project_id, zone, instance_name) -> Optional[SerialPortOutput]:
  """Fetch the serial port output of a single instance.

  Args:
    project_id: ID of the project that owns the instance.
    zone: Zone of the instance.
    instance_name: Value passed as the `instance` parameter of the
      getSerialPortOutput request.

  Returns:
    A SerialPortOutput built from the response, or None when the compute API
    is not enabled, the request fails with an HTTP error, the response is
    empty, or the response selfLink cannot be parsed.
  """
  if not apis.is_enabled(project_id, 'compute'):
    return None
  gce_api = apis.get_api('compute', 'v1', project_id)

  serial_request = gce_api.instances().getSerialPortOutput(
      project=project_id,
      zone=zone,
      instance=instance_name,
      # To get all 1mb output
      start=-1000000,
  )
  try:
    serial_response = serial_request.execute(num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError:
    # HTTP failures are deliberately reported as "no output".
    return None

  if not serial_response:
    return None

  # Project and instance id are taken from the selfLink of the response.
  link_match = re.match(
      r'https://www.googleapis.com/compute/v1/projects/([^/]+)/zones/[^/]+/instances/([^/]+)',
      serial_response['selfLink'],
  )
  if not link_match:
    logging.error("instance selfLink didn't match regexp: %s",
                  serial_response['selfLink'])
    return None

  return SerialPortOutput(
      link_match.group(1),
      instance_id=link_match.group(2),
      contents=serial_response['contents'].splitlines(),
  )

Get the serial port output of a single instance.

Returns None when the compute API is not enabled, the request fails, or the response cannot be parsed.

class Region(gcpdiag.models.Resource):
class Region(models.Resource):
  """Represents a GCE Region."""

  # Raw region resource as passed to the constructor.
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def self_link(self) -> str:
    """The 'selfLink' URL of the region resource."""
    return self._resource_data['selfLink']

  @property
  def full_path(self) -> str:
    """The selfLink without the Compute API v1 URL prefix.

    Falls back to '>> ' followed by the selfLink when the prefix is missing.
    """
    prefix_match = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                            self.self_link)
    if not prefix_match:
      return f'>> {self.self_link}'
    return prefix_match.group(1)

  @property
  def name(self) -> str:
    """The 'name' field of the region resource."""
    return self._resource_data['name']

Represents a GCE Region.

Region(project_id, resource_data)
1315  def __init__(self, project_id, resource_data):
1316    super().__init__(project_id=project_id)
1317    self._resource_data = resource_data
full_path: str
1323  @property
1324  def full_path(self) -> str:
1325    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
1326                      self.self_link)
1327    if result:
1328      return result.group(1)
1329    else:
1330      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

name: str
1332  @property
1333  def name(self) -> str:
1334    return self._resource_data['name']
@caching.cached_api_call
def get_all_regions(project_id: str) -> Iterable[Region]:
@caching.cached_api_call
def get_all_regions(project_id: str) -> Iterable[Region]:
  """Return all regions listed by the Compute API for a project.

  Args:
      project_id (str): project id for this request

  Raises:
      utils.GcpApiError: Raises GcpApiError in case of query issues

  Returns:
      Iterable[Region]: a set of Region objects, one per listed item that has
      a 'name'; an empty set when the response carries no items
  """
  try:
    regions_api = apis.get_api('compute', 'v1', project_id).regions()
    listing = regions_api.list(project=project_id).execute(
        num_retries=config.API_RETRIES)
    if listing and 'items' in listing:
      return {
          Region(project_id, raw_region)
          for raw_region in listing['items']
          if 'name' in raw_region
      }
    return set()
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err

Return list of all regions

Arguments:
  • project_id (str): project id for this request
Raises:
  • utils.GcpApiError: Raises GcpApiError in case of query issues
Returns:

Iterable[Region]: Return list of all regions

def get_regions_with_instances(context: gcpdiag.models.Context) -> Iterable[Region]:
def get_regions_with_instances(context: models.Context) -> Iterable[Region]:
  """Return the regions that contain at least one matching instance.

  Args:
      context (models.Context): context for this request

  Returns:
      Iterable[Region]: a set of the regions whose name equals the region of
      one of the instances returned by get_instances(context)
  """
  used_region_names = {
      instance.region for instance in get_instances(context).values()
  }
  candidates = get_all_regions(context.project_id)
  if candidates:
    return {
        region for region in candidates if region.name in used_region_names
    }
  return set()

Return list of regions with instances

Arguments:
  • context (models.Context): context for this request
Returns:

Iterable[Region]: Return list of regions which contains instances

@caching.cached_api_call
def get_all_disks(context: gcpdiag.models.Context) -> Iterable[Disk]:
@caching.cached_api_call
def get_all_disks(context: models.Context) -> Iterable[Disk]:
  """List the zonal disks of the context's project.

  One list request is issued per zone returned by get_gce_zones() and the
  requests are executed concurrently. Regional disks are not fetched.

  Args:
    context: The project context.

  Returns:
    A set of Disk objects.

  Raises:
    utils.GcpApiError: If an HTTP error is raised while querying.
  """
  project_id = context.project_id
  try:
    gce_api = apis.get_api('compute', 'v1', project_id)
    per_zone_requests = []
    for zone in get_gce_zones(project_id):
      per_zone_requests.append(gce_api.disks().list(project=project_id,
                                                    zone=zone))

    logging.debug('listing gce disks of project %s', project_id)

    raw_disks = apis_utils.execute_concurrently_with_pagination(
        api=gce_api,
        requests=per_zone_requests,
        next_function=gce_api.disks().list_next,
        context=context,
        log_text=f'listing GCE disks of project {project_id}')

    return {Disk(project_id, raw_disk) for raw_disk in raw_disks}

  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err

Get all disks in a project, matching the context.

Arguments:
  • context: The project context.
Returns:

An iterable of Disk objects.

@caching.cached_api_call
def get_all_disks_of_instance(context: gcpdiag.models.Context, zone: str, instance_name: str) -> dict:
@caching.cached_api_call
def get_all_disks_of_instance(context: models.Context, zone: str,
                              instance_name: str) -> dict:
  """Get the zonal disks used exclusively by a given instance.

  All disks of the zone are listed and only those whose `users` equals
  `[instance_name]` are kept. Regional disks are not fetched.

  Args:
    context: The project context.
    zone: The zone of the instance.
    instance_name: The name of the instance.

  Returns:
    A dict of Disk objects keyed by disk name.

  Raises:
    utils.GcpApiError: If an HTTP error is raised while querying.
  """
  project_id = context.project_id
  try:
    gce_api = apis.get_api('compute', 'v1', project_id)
    zone_requests = [gce_api.disks().list(project=project_id, zone=zone)]
    logging.debug(
        'listing gce disks attached to instance %s in project %s',
        instance_name,
        project_id,
    )

    raw_disks = apis_utils.execute_concurrently_with_pagination(
        api=gce_api,
        requests=zone_requests,
        next_function=gce_api.disks().list_next,
        context=context,
        log_text=
        f'listing gce disks attached to instance {instance_name} in project {project_id}'
    )
    zone_disks = {Disk(project_id, raw_disk) for raw_disk in raw_disks}
    expected_users = [instance_name]
    return {
        disk.name: disk for disk in zone_disks if disk.users == expected_users
    }

  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err

Get all disks of a given instance.

Arguments:
  • context: The project context.
  • zone: The zone of the instance.
  • instance_name: The name of the instance.
Returns:

A dict of Disk objects keyed by disk name.

class InstanceEffectiveFirewalls(gcpdiag.queries.network.EffectiveFirewalls):
class InstanceEffectiveFirewalls(network_q.EffectiveFirewalls):
  """Effective firewall rules for a network interface on a VM instance.

  Includes org/folder firewall policies.
  """

  # Instance the firewall rules were queried for.
  _instance: Instance
  # Name of the network interface the rules apply to.
  _nic: str

  def __init__(self, instance, nic, resource_data):
    super().__init__(resource_data)
    self._nic = nic
    self._instance = instance

Effective firewall rules for a network interface on a VM instance.

Includes org/folder firewall policies.

InstanceEffectiveFirewalls(instance, nic, resource_data)
1469  def __init__(self, instance, nic, resource_data):
1470    super().__init__(resource_data)
1471    self._instance = instance
1472    self._nic = nic
@caching.cached_api_call(in_memory=True)
def get_instance_interface_effective_firewalls( instance: Instance, nic: str) -> InstanceEffectiveFirewalls:
@caching.cached_api_call(in_memory=True)
def get_instance_interface_effective_firewalls(
    instance: Instance, nic: str) -> InstanceEffectiveFirewalls:
  """Return effective firewalls for a network interface on the instance.

  Args:
    instance: Instance whose firewall rules are queried.
    nic: Name of the network interface on that instance.

  Returns:
    An InstanceEffectiveFirewalls bound to `instance` and `nic`.
  """
  compute = apis.get_api('compute', 'v1', instance.project_id)
  request = compute.instances().getEffectiveFirewalls(
      project=instance.project_id,
      zone=instance.zone,
      instance=instance.name,
      networkInterface=nic,
  )
  response = request.execute(num_retries=config.API_RETRIES)
  # Pass the instance object itself; the Instance class was passed here
  # before, leaving the result without a usable instance reference.
  return InstanceEffectiveFirewalls(instance, nic, response)

Return effective firewalls for a network interface on the instance.

def is_project_serial_port_logging_enabled(project_id: str) -> bool:
def is_project_serial_port_logging_enabled(project_id: str) -> bool:
  """Tell whether serial port logging is enabled in the project metadata.

  Args:
    project_id: ID of the project to check.

  Returns:
    True when the 'serial-port-logging-enable' project metadata value,
    upper-cased, is one of POSITIVE_BOOL_VALUES; False otherwise, including
    when the compute API is not enabled.
  """
  if not apis.is_enabled(project_id, 'compute'):
    return False

  project_metadata = get_project_metadata(project_id=project_id)
  flag = project_metadata.get('serial-port-logging-enable')
  if not flag:
    return False
  return flag.upper() in POSITIVE_BOOL_VALUES
def is_serial_port_buffer_enabled():
def is_serial_port_buffer_enabled():
  """Return the value of the 'enable_gce_serial_buffer' config option."""
  buffer_setting = config.get('enable_gce_serial_buffer')
  return buffer_setting
class SerialOutputQuery:
class SerialOutputQuery:
  """A serial output job that was queued with fetch_serial_port_outputs()."""

  # Job shared by all queries created for the same context.
  job: _SerialOutputJob

  def __init__(self, job):
    self.job = job

  @property
  def entries(self) -> Sequence:
    """The result of the job's future, waiting for it if necessary.

    Raises:
      RuntimeError: If the job has no future yet, i.e. it was never submitted
        by execute_fetch_serial_port_outputs().
    """
    if not self.job.future:
      # Point at the function that actually submits the queued jobs.
      raise RuntimeError("Fetching serial logs wasn't executed. did you call"
                         ' execute_fetch_serial_port_outputs()?')
    elif self.job.future.running():
      logging.debug(
          'waiting for serial output results for project: %s',
          self.job.context.project_id,
      )
    return self.job.future.result()

A serial output job that was queued with fetch_serial_port_outputs().

SerialOutputQuery(job)
1522  def __init__(self, job):
1523    self.job = job
job: gcpdiag.queries.gce._SerialOutputJob
entries: Sequence
1525  @property
1526  def entries(self) -> Sequence:
1527    if not self.job.future:
1528      raise RuntimeError("Fetching serial logs wasn't executed. did you call"
1529                         ' execute_get_serial_port_output()?')
1530    elif self.job.future.running():
1531      logging.debug(
1532          'waiting for serial output results for project: %s',
1533          self.job.context.project_id,
1534      )
1535    return self.job.future.result()
jobs_todo: Dict[gcpdiag.models.Context, gcpdiag.queries.gce._SerialOutputJob] = {}
def execute_fetch_serial_port_outputs(query_executor: gcpdiag.executor.ContextAwareExecutor):
def execute_fetch_serial_port_outputs(
    query_executor: executor.ContextAwareExecutor):
  """Submit every queued serial output job to the given executor.

  The module-level queue `jobs_todo` is emptied, and each job that was in it
  gets its `future` set to the submitted get_instances_serial_port_output
  call.

  Args:
    query_executor: Executor on which the fetch calls are submitted.
  """
  # Fetching is handed to the executor because processing serial logs can be
  # large, depending on the number of instances in the project which aren't
  # logging to cloud logging. Currently only one job is expected, but the
  # queue is keyed by context so that multiple projects remain possible.
  global jobs_todo
  # Take over the queued jobs and leave a fresh, empty queue behind.
  pending_jobs, jobs_todo = jobs_todo, {}
  for pending in pending_jobs.values():
    pending.future = query_executor.submit(get_instances_serial_port_output,
                                           pending.context)
def fetch_serial_port_outputs(context: gcpdiag.models.Context) -> SerialOutputQuery:
def fetch_serial_port_outputs(context: models.Context) -> SerialOutputQuery:
  """Queue a serial output fetch for the context and return a query for it.

  Args:
    context: Context whose instances' serial output should be fetched.

  Returns:
    A SerialOutputQuery wrapping the job registered for this context.
  """
  # Aggregate by context: an already queued job for this context is reused.
  candidate_job = _SerialOutputJob(context=context)
  queued_job = jobs_todo.setdefault(context, candidate_job)
  return SerialOutputQuery(job=queued_job)
class HealthCheck(gcpdiag.models.Resource):
class HealthCheck(models.Resource):
  """A Health Check resource."""

  # Raw health check resource as passed to the constructor.
  _resource_data: dict
  _type: str

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """The 'name' field of the health check."""
    return self._resource_data['name']

  @property
  def full_path(self) -> str:
    """The selfLink without the Compute API v1 URL prefix.

    Falls back to '>> ' followed by the selfLink when the prefix is missing.
    """
    prefix_match = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                            self.self_link)
    if not prefix_match:
      return f'>> {self.self_link}'
    return prefix_match.group(1)

  @property
  def short_path(self) -> str:
    """Project id and health check name joined by '/'."""
    return '/'.join((self.project_id, self.name))

  @property
  def self_link(self) -> str:
    """The 'selfLink' URL of the health check."""
    return self._resource_data['selfLink']

  @property
  def is_log_enabled(self) -> bool:
    """True when 'logConfig' is present and its 'enable' value is truthy."""
    log_config = self._resource_data.get('logConfig', False)
    if not log_config:
      return False
    try:
      return bool(log_config['enable'])
    except KeyError:
      return False

  @property
  def region(self):
    """Last path segment of the 'region' URL, or None when there is none."""
    region_url = self._resource_data.get('region', '')
    segment_match = re.search(r'/([^/]+)/?$', region_url)
    return segment_match.group(1) if segment_match else None

  @property
  def type(self) -> str:
    """The 'type' field of the health check (e.g. 'HTTP', 'TCP')."""
    return self._resource_data['type']

  @property
  def request_path(self) -> str:
    """The 'requestPath' of the type-specific settings, '/' if unset."""
    return self.get_health_check_property('requestPath', '/')

  @property
  def request(self) -> str:
    """The 'request' of the type-specific settings, None if unset."""
    return self.get_health_check_property('request')

  @property
  def response(self) -> str:
    """The 'response' of the type-specific settings, None if unset."""
    return self.get_health_check_property('response')

  @property
  def port(self) -> int:
    """The 'port' of the type-specific settings, None if unset."""
    return self.get_health_check_property('port')

  @property
  def port_specification(self) -> str:
    """The 'portSpecification', 'USE_FIXED_PORT' if unset."""
    return self.get_health_check_property('portSpecification', 'USE_FIXED_PORT')

  @property
  def timeout_sec(self) -> int:
    """The 'timeoutSec' field, 5 if absent."""
    return self._resource_data.get('timeoutSec', 5)

  @property
  def check_interval_sec(self) -> int:
    """The 'checkIntervalSec' field, 5 if absent."""
    return self._resource_data.get('checkIntervalSec', 5)

  @property
  def unhealthy_threshold(self) -> int:
    """The 'unhealthyThreshold' field, 2 if absent."""
    return self._resource_data.get('unhealthyThreshold', 2)

  @property
  def healthy_threshold(self) -> int:
    """The 'healthyThreshold' field, 2 if absent."""
    return self._resource_data.get('healthyThreshold', 2)

  def get_health_check_property(self, property_name: str, default_value=None):
    """Read a property from the settings block that matches the check type.

    Args:
      property_name: Key to read from the type-specific settings.
      default_value: Value returned when the type is unknown, the settings
        block is missing or empty, or the property value is falsy.

    Returns:
      The property value, or `default_value` as described above.
    """
    settings_key_by_type = {
        'HTTP': 'httpHealthCheck',
        'HTTPS': 'httpsHealthCheck',
        'HTTP2': 'http2HealthCheck',
        'TCP': 'tcpHealthCheck',
        'SSL': 'sslHealthCheck',
        'GRPC': 'grpcHealthCheck',
    }
    settings_key = settings_key_by_type.get(self.type)
    if settings_key is None:
      return default_value
    settings = self._resource_data.get(settings_key)
    if not settings:
      return default_value
    return settings.get(property_name) or default_value

A Health Check resource.

HealthCheck(project_id, resource_data)
1569  def __init__(self, project_id, resource_data):
1570    super().__init__(project_id=project_id)
1571    self._resource_data = resource_data
name: str
1573  @property
1574  def name(self) -> str:
1575    return self._resource_data['name']
full_path: str
1577  @property
1578  def full_path(self) -> str:
1579    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
1580                      self.self_link)
1581    if result:
1582      return result.group(1)
1583    else:
1584      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
1586  @property
1587  def short_path(self) -> str:
1588    path = self.project_id + '/' + self.name
1589    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

is_log_enabled: bool
1595  @property
1596  def is_log_enabled(self) -> bool:
1597    try:
1598      log_config = self._resource_data.get('logConfig', False)
1599      if log_config and log_config['enable']:
1600        return True
1601    except KeyError:
1602      return False
1603    return False
region
1605  @property
1606  def region(self):
1607    url = self._resource_data.get('region', '')
1608    match = re.search(r'/([^/]+)/?$', url)
1609    if match:
1610      region = match.group(1)
1611      return region
1612    return None
type: str
1614  @property
1615  def type(self) -> str:
1616    return self._resource_data['type']
request_path: str
1618  @property
1619  def request_path(self) -> str:
1620    return self.get_health_check_property('requestPath', '/')
request: str
1622  @property
1623  def request(self) -> str:
1624    return self.get_health_check_property('request')
response: str
1626  @property
1627  def response(self) -> str:
1628    return self.get_health_check_property('response')
port: int
1630  @property
1631  def port(self) -> int:
1632    return self.get_health_check_property('port')
port_specification: str
1634  @property
1635  def port_specification(self) -> str:
1636    return self.get_health_check_property('portSpecification', 'USE_FIXED_PORT')
timeout_sec: int
1638  @property
1639  def timeout_sec(self) -> int:
1640    return self._resource_data.get('timeoutSec', 5)
check_interval_sec: int
1642  @property
1643  def check_interval_sec(self) -> int:
1644    return self._resource_data.get('checkIntervalSec', 5)
unhealthy_threshold: int
1646  @property
1647  def unhealthy_threshold(self) -> int:
1648    return self._resource_data.get('unhealthyThreshold', 2)
healthy_threshold: int
1650  @property
1651  def healthy_threshold(self) -> int:
1652    return self._resource_data.get('healthyThreshold', 2)
def get_health_check_property(self, property_name: str, default_value=None):
1654  def get_health_check_property(self, property_name: str, default_value=None):
1655    health_check_types = {
1656        'HTTP': 'httpHealthCheck',
1657        'HTTPS': 'httpsHealthCheck',
1658        'HTTP2': 'http2HealthCheck',
1659        'TCP': 'tcpHealthCheck',
1660        'SSL': 'sslHealthCheck',
1661        'GRPC': 'grpcHealthCheck',
1662    }
1663    if self.type in health_check_types:
1664      health_check_data = self._resource_data.get(health_check_types[self.type])
1665      if health_check_data:
1666        return health_check_data.get(property_name) or default_value
1667    return default_value
@caching.cached_api_call(in_memory=True)
def get_health_check(project_id: str, health_check: str, region: str = None) -> object:
@caching.cached_api_call(in_memory=True)
def get_health_check(project_id: str,
                     health_check: str,
                     region: Optional[str] = None) -> HealthCheck:
  """Fetches a single health check and wraps it in a HealthCheck.

  Args:
    project_id: project that owns the health check.
    health_check: value passed as the healthCheck argument of the API request.
    region: region to query through regionHealthChecks. When empty, None or
      'global', the global healthChecks collection is queried instead.

  Returns:
    A HealthCheck built from the API response.
  """
  compute = apis.get_api('compute', 'v1', project_id)
  if not region or region == 'global':
    request = compute.healthChecks().get(project=project_id,
                                         healthCheck=health_check)
  else:
    request = compute.regionHealthChecks().get(project=project_id,
                                               healthCheck=health_check,
                                               region=region)
  response = request.execute(num_retries=config.API_RETRIES)
  return HealthCheck(project_id, response)
class NetworkEndpointGroup(gcpdiag.models.Resource):
class NetworkEndpointGroup(models.Resource):
  """Wrapper around a Network Endpoint Group API resource dict."""

  _resource_data: dict
  _type: str

  def __init__(self, project_id, resource_data):
    """Keeps the raw resource dict; project_id goes to the base class."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """The resource's 'name' field."""
    data = self._resource_data
    return data['name']

  @property
  def id(self) -> str:
    """The resource's 'id' field."""
    data = self._resource_data
    return data['id']

  @property
  def full_path(self) -> str:
    """self_link without the Compute v1 API URL prefix.

    If self_link does not start with that prefix, the whole link is returned
    prefixed with '>> '.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    if not matched:
      return f'>> {link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>' for this resource."""
    return '/'.join((self.project_id, self.name))

  @property
  def self_link(self) -> str:
    """The resource's 'selfLink' field."""
    data = self._resource_data
    return data['selfLink']

A Network Endpoint Group resource.

NetworkEndpointGroup(project_id, resource_data)
1692  def __init__(self, project_id, resource_data):
1693    super().__init__(project_id=project_id)
1694    self._resource_data = resource_data
name: str
1696  @property
1697  def name(self) -> str:
1698    return self._resource_data['name']
id: str
1700  @property
1701  def id(self) -> str:
1702    return self._resource_data['id']
full_path: str
1704  @property
1705  def full_path(self) -> str:
1706    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
1707                      self.self_link)
1708    if result:
1709      return result.group(1)
1710    else:
1711      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
1713  @property
1714  def short_path(self) -> str:
1715    path = self.project_id + '/' + self.name
1716    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

@caching.cached_api_call(in_memory=True)
def get_zonal_network_endpoint_groups( context: gcpdiag.models.Context) -> Mapping[str, NetworkEndpointGroup]:
@caching.cached_api_call(in_memory=True)
def get_zonal_network_endpoint_groups(
    context: models.Context,) -> Mapping[str, NetworkEndpointGroup]:
  """Returns the zonal Network Endpoint Groups in the project.

  Lists networkEndpointGroups in every zone returned by get_gce_zones() and
  keeps the ones accepted by context.match_project_resource().

  Args:
    context: supplies the project id and the location/labels/resource filter.

  Returns:
    A dict mapping each group's full_path to its NetworkEndpointGroup. The
    dict is empty when the compute API is not enabled for the project.
  """
  groups: Dict[str, NetworkEndpointGroup] = {}
  if not apis.is_enabled(context.project_id, 'compute'):
    return groups
  gce_api = apis.get_api('compute', 'v1', context.project_id)
  requests = [
      gce_api.networkEndpointGroups().list(project=context.project_id,
                                           zone=zone)
      for zone in get_gce_zones(context.project_id)
  ]
  logging.debug('listing gce networkEndpointGroups of project %s',
                context.project_id)
  items = apis_utils.execute_concurrently_with_pagination(
      api=gce_api,
      requests=requests,
      next_function=gce_api.networkEndpointGroups().list_next,
      context=context,
      log_text=(
          f'listing gce networkEndpointGroups of project {context.project_id}'),
  )

  # The zone is not passed along with each item, so it is recovered from the
  # item's selfLink. The pattern is loop-invariant and compiled once.
  zone_pattern = re.compile(
      r'https://www.googleapis.com/compute/v1/projects/[^/]+/zones/([^/]+)')
  for i in items:
    result = zone_pattern.match(i['selfLink'])
    if not result:
      logging.error(
          "network endpoint group %s selfLink didn't match regexp: %s",
          i['id'], i['selfLink'])
      continue
    zone = result.group(1)
    labels = i.get('labels', {})
    resource = i.get('name', '')
    if not context.match_project_resource(
        location=zone, labels=labels, resource=resource):
      continue
    data = NetworkEndpointGroup(context.project_id, i)
    groups[data.full_path] = data
  return groups

Returns a mapping from full resource path to NetworkEndpointGroup for the zonal Network Endpoint Groups in the project.