gcpdiag.queries.gke

Queries related to GCP Kubernetes Engine clusters.
IPv4NetOrIPv6Net = typing.Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
DEFAULT_MAX_PODS_PER_NODE = 110
class NodeConfig:
42class NodeConfig:
43  """Represents a GKE node pool configuration."""
44
45  def __init__(self, resource_data):
46    self._resource_data = resource_data
47
48  def has_accelerators(self) -> bool:
49    if 'accelerators' in self._resource_data:
50      return True
51    return False
52
53  @property
54  def machine_type(self) -> str:
55    return self._resource_data['machineType']
56
57  @property
58  def image_type(self) -> str:
59    return self._resource_data['imageType']
60
61  @property
62  def oauth_scopes(self) -> list:
63    return self._resource_data['oauthScopes']
64
65  @property
66  def has_serial_port_logging_enabled(self) -> bool:
67    """ Check if serial port logging is enabled in the node config.
68
69    Returns:
70      bool: True if serial port logging is enabled or not explicitly disabled.
71            False if explicitly disabled.
72    """
73    metadata = self._resource_data.get('metadata', {})
74    return metadata.get('serial-port-logging-enable', 'true').lower() == 'true'

Represents a GKE node pool configuration.

NodeConfig(resource_data)
45  def __init__(self, resource_data):
46    self._resource_data = resource_data
def has_accelerators(self) -> bool:
48  def has_accelerators(self) -> bool:
49    if 'accelerators' in self._resource_data:
50      return True
51    return False
machine_type: str
53  @property
54  def machine_type(self) -> str:
55    return self._resource_data['machineType']
image_type: str
57  @property
58  def image_type(self) -> str:
59    return self._resource_data['imageType']
oauth_scopes: list
61  @property
62  def oauth_scopes(self) -> list:
63    return self._resource_data['oauthScopes']
has_serial_port_logging_enabled: bool
65  @property
66  def has_serial_port_logging_enabled(self) -> bool:
67    """ Check if serial port logging is enabled in the node config.
68
69    Returns:
70      bool: True if serial port logging is enabled or not explicitly disabled.
71            False if explicitly disabled.
72    """
73    metadata = self._resource_data.get('metadata', {})
74    return metadata.get('serial-port-logging-enable', 'true').lower() == 'true'

Check if serial port logging is enabled in the node config.

Returns:

bool: True if serial port logging is enabled or not explicitly disabled. False if explicitly disabled.

class NodePool(gcpdiag.models.Resource):
 77class NodePool(models.Resource):
 78  """Represents a GKE node pool."""
 79
 80  version: Version
 81
 82  def __init__(self, cluster, resource_data):
 83    super().__init__(project_id=cluster.project_id)
 84    self._cluster = cluster
 85    self._resource_data = resource_data
 86    self.version = Version(self._resource_data['version'])
 87    self._migs = None
 88
 89  def _get_service_account(self) -> str:
 90    return self._resource_data.get('config', {}).get('serviceAccount', None)
 91
 92  @property
 93  def full_path(self) -> str:
 94    # https://container.googleapis.com/v1/projects/gcpdiag-gke1-aaaa/
 95    #   locations/europe-west1/clusters/gke2/nodePools/default-pool
 96    m = re.match(r'https://container.googleapis.com/v1/(.*)',
 97                 self._resource_data.get('selfLink', ''))
 98    if not m:
 99      raise RuntimeError('can\'t parse selfLink of nodepool resource')
100    return m.group(1)
101
102  @property
103  def short_path(self) -> str:
104    path = self.full_path
105    path = re.sub(r'^projects/', '', path)
106    path = re.sub(r'/locations/', '/', path)
107    path = re.sub(r'/zones/', '/', path)
108    path = re.sub(r'/clusters/', '/', path)
109    path = re.sub(r'/nodePools/', '/', path)
110    return path
111
112  @property
113  def name(self) -> str:
114    return self._resource_data['name']
115
116  @property
117  def config(self) -> NodeConfig:
118    return NodeConfig(self._resource_data['config'])
119
120  @property
121  def node_count(self) -> int:
122    return self._resource_data.get('initialNodeCount', 0)
123
124  def has_default_service_account(self) -> bool:
125    sa = self._get_service_account()
126    return sa == 'default'
127
128  def has_image_streaming_enabled(self) -> bool:
129    return get_path(self._resource_data, ('config', 'gcfsConfig', 'enabled'),
130                    default=False)
131
132  def has_md_concealment_enabled(self) -> bool:
133    # Empty ({}) workloadMetadataConfig means that 'Metadata concealment'
134    # (predecessor of Workload Identity) is enabled.
135    # https://cloud.google.com/kubernetes-engine/docs/how-to/protecting-cluster-metadata#concealment
136    return get_path(self._resource_data, ('config', 'workloadMetadataConfig'),
137                    default=None) == {}
138
139  def has_workload_identity_enabled(self) -> bool:
140    # 'Metadata concealment' (workloadMetadataConfig == {}) doesn't protect the
141    # default SA's token
142    return bool(
143        get_path(self._resource_data, ('config', 'workloadMetadataConfig'),
144                 default=None))
145
146  @property
147  def service_account(self) -> str:
148    sa = self._get_service_account()
149    if sa == 'default':
150      project_nr = crm.get_project(self.project_id).number
151      return f'{project_nr}-compute@developer.gserviceaccount.com'
152    else:
153      return sa
154
155  @property
156  def pod_ipv4_cidr_size(self) -> int:
157    return self._resource_data['podIpv4CidrSize']
158
159  @property
160  def pod_ipv4_cidr_block(self) -> Optional[IPv4NetOrIPv6Net]:
161    # Get the pod cidr range in use by the nodepool
162    pod_cidr = get_path(self._resource_data,
163                        ('networkConfig', 'podIpv4CidrBlock'),
164                        default=None)
165
166    if pod_cidr:
167      return ipaddress.ip_network(pod_cidr)
168    else:
169      return None
170
171  @property
172  def max_pod_per_node(self) -> int:
173    return int(
174        get_path(self._resource_data, ('maxPodsConstraint', 'maxPodsPerNode'),
175                 default=DEFAULT_MAX_PODS_PER_NODE))
176
177  @property
178  def cluster(self) -> 'Cluster':
179    return self._cluster
180
181  @property
182  def instance_groups(self) -> List[gce.ManagedInstanceGroup]:
183    if self._migs is None:
184      project_migs_by_selflink = {}
185      for m in gce.get_managed_instance_groups(
186          models.Context(project_id=self.project_id)).values():
187        project_migs_by_selflink[m.self_link] = m
188
189      self._migs = []
190      for url in self._resource_data.get('instanceGroupUrls', []):
191        try:
192          self._migs.append(project_migs_by_selflink[url])
193        except KeyError:
194          continue
195    return self._migs
196
197  @property
198  def node_tags(self) -> List[str]:
199    """Returns the firewall tags used for nodes in this cluster.
200
201    If the node tags can't be determined, [] is returned.
202    """
203    migs = self.instance_groups
204    if not migs:
205      return []
206    return migs[0].template.tags
207
208  def get_machine_type(self) -> str:
209    """Returns the machine type of the nodepool nodes"""
210    return self.config.machine_type

Represents a GKE node pool.

NodePool(cluster, resource_data)
82  def __init__(self, cluster, resource_data):
83    super().__init__(project_id=cluster.project_id)
84    self._cluster = cluster
85    self._resource_data = resource_data
86    self.version = Version(self._resource_data['version'])
87    self._migs = None
version: gcpdiag.utils.Version
full_path: str
 92  @property
 93  def full_path(self) -> str:
 94    # https://container.googleapis.com/v1/projects/gcpdiag-gke1-aaaa/
 95    #   locations/europe-west1/clusters/gke2/nodePools/default-pool
 96    m = re.match(r'https://container.googleapis.com/v1/(.*)',
 97                 self._resource_data.get('selfLink', ''))
 98    if not m:
 99      raise RuntimeError('can\'t parse selfLink of nodepool resource')
100    return m.group(1)

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
102  @property
103  def short_path(self) -> str:
104    path = self.full_path
105    path = re.sub(r'^projects/', '', path)
106    path = re.sub(r'/locations/', '/', path)
107    path = re.sub(r'/zones/', '/', path)
108    path = re.sub(r'/clusters/', '/', path)
109    path = re.sub(r'/nodePools/', '/', path)
110    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
112  @property
113  def name(self) -> str:
114    return self._resource_data['name']
config: NodeConfig
116  @property
117  def config(self) -> NodeConfig:
118    return NodeConfig(self._resource_data['config'])
node_count: int
120  @property
121  def node_count(self) -> int:
122    return self._resource_data.get('initialNodeCount', 0)
def has_default_service_account(self) -> bool:
124  def has_default_service_account(self) -> bool:
125    sa = self._get_service_account()
126    return sa == 'default'
def has_image_streaming_enabled(self) -> bool:
128  def has_image_streaming_enabled(self) -> bool:
129    return get_path(self._resource_data, ('config', 'gcfsConfig', 'enabled'),
130                    default=False)
def has_md_concealment_enabled(self) -> bool:
132  def has_md_concealment_enabled(self) -> bool:
133    # Empty ({}) workloadMetadataConfig means that 'Metadata concealment'
134    # (predecessor of Workload Identity) is enabled.
135    # https://cloud.google.com/kubernetes-engine/docs/how-to/protecting-cluster-metadata#concealment
136    return get_path(self._resource_data, ('config', 'workloadMetadataConfig'),
137                    default=None) == {}
def has_workload_identity_enabled(self) -> bool:
139  def has_workload_identity_enabled(self) -> bool:
140    # 'Metadata concealment' (workloadMetadataConfig == {}) doesn't protect the
141    # default SA's token
142    return bool(
143        get_path(self._resource_data, ('config', 'workloadMetadataConfig'),
144                 default=None))
service_account: str
146  @property
147  def service_account(self) -> str:
148    sa = self._get_service_account()
149    if sa == 'default':
150      project_nr = crm.get_project(self.project_id).number
151      return f'{project_nr}-compute@developer.gserviceaccount.com'
152    else:
153      return sa
pod_ipv4_cidr_size: int
155  @property
156  def pod_ipv4_cidr_size(self) -> int:
157    return self._resource_data['podIpv4CidrSize']
pod_ipv4_cidr_block: Union[ipaddress.IPv4Network, ipaddress.IPv6Network, NoneType]
159  @property
160  def pod_ipv4_cidr_block(self) -> Optional[IPv4NetOrIPv6Net]:
161    # Get the pod cidr range in use by the nodepool
162    pod_cidr = get_path(self._resource_data,
163                        ('networkConfig', 'podIpv4CidrBlock'),
164                        default=None)
165
166    if pod_cidr:
167      return ipaddress.ip_network(pod_cidr)
168    else:
169      return None
max_pod_per_node: int
171  @property
172  def max_pod_per_node(self) -> int:
173    return int(
174        get_path(self._resource_data, ('maxPodsConstraint', 'maxPodsPerNode'),
175                 default=DEFAULT_MAX_PODS_PER_NODE))
cluster: Cluster
177  @property
178  def cluster(self) -> 'Cluster':
179    return self._cluster
instance_groups: List[gcpdiag.queries.gce.ManagedInstanceGroup]
181  @property
182  def instance_groups(self) -> List[gce.ManagedInstanceGroup]:
183    if self._migs is None:
184      project_migs_by_selflink = {}
185      for m in gce.get_managed_instance_groups(
186          models.Context(project_id=self.project_id)).values():
187        project_migs_by_selflink[m.self_link] = m
188
189      self._migs = []
190      for url in self._resource_data.get('instanceGroupUrls', []):
191        try:
192          self._migs.append(project_migs_by_selflink[url])
193        except KeyError:
194          continue
195    return self._migs
node_tags: List[str]
197  @property
198  def node_tags(self) -> List[str]:
199    """Returns the firewall tags used for nodes in this cluster.
200
201    If the node tags can't be determined, [] is returned.
202    """
203    migs = self.instance_groups
204    if not migs:
205      return []
206    return migs[0].template.tags

Returns the firewall tags used for nodes in this cluster.

If the node tags can't be determined, [] is returned.

def get_machine_type(self) -> str:
208  def get_machine_type(self) -> str:
209    """Returns the machine type of the nodepool nodes"""
210    return self.config.machine_type

Returns the machine type of the nodepool nodes

class UndefinedClusterPropertyError(builtins.Exception):
213class UndefinedClusterPropertyError(Exception):
214  """Thrown when a property of a cluster can't be determined for
215  some reason. For example, the cluster_hash can't be determined
216  because there are no nodepools defined."""
217  pass

Thrown when a property of a cluster can't be determined for some reason. For example, the cluster_hash can't be determined because there are no nodepools defined.

class Cluster(gcpdiag.models.Resource):
220class Cluster(models.Resource):
221  """Represents a GKE cluster.
222
223  https://cloud.google.com/kubernetes-engine/docs/reference/rest/v1/projects.locations.clusters#Cluster
224  """
225  _resource_data: dict
226  master_version: Version
227  _context: models.Context
228  _nodepools: Optional[List[NodePool]]
229
230  def __init__(self, project_id, resource_data, context: models.Context):
231    super().__init__(project_id=project_id)
232    self._resource_data = resource_data
233    self.master_version = Version(self._resource_data['currentMasterVersion'])
234    self._nodepools = None
235    self._context = context
236
237  @property
238  def full_path(self) -> str:
239    if utils.is_region(self._resource_data['location']):
240      return (f'projects/{self.project_id}/'
241              f'locations/{self.location}/clusters/{self.name}')
242    else:
243      return (f'projects/{self.project_id}/'
244              f'zones/{self.location}/clusters/{self.name}')
245
246  @property
247  def short_path(self) -> str:
248    path = self.full_path
249    path = re.sub(r'^projects/', '', path)
250    path = re.sub(r'/locations/', '/', path)
251    path = re.sub(r'/zones/', '/', path)
252    path = re.sub(r'/clusters/', '/', path)
253    return path
254
255  @property
256  def name(self) -> str:
257    return self._resource_data['name']
258
259  @property
260  def location(self) -> str:
261    return self._resource_data['location']
262
263  @property
264  def pod_ipv4_cidr(self) -> IPv4NetOrIPv6Net:
265    cidr = self._resource_data['clusterIpv4Cidr']
266    return ipaddress.ip_network(cidr)
267
268  @property
269  def current_node_count(self) -> int:
270    return self._resource_data.get('currentNodeCount', 0)
271
272  @property
273  def release_channel(self) -> Optional[str]:
274    try:
275      return self._resource_data['releaseChannel']['channel']
276    except KeyError:
277      return None
278
279  @property
280  def nap_node_image_type(self) -> Optional[str]:
281
282    return get_path(
283        self._resource_data,
284        ('autoscaling', 'autoprovisioningNodePoolDefaults', 'imageType'),
285        default=None)
286
287  @property
288  def app_layer_sec_key(self) -> str:
289    return self._resource_data['databaseEncryption'].get('keyName')
290
291  @property
292  def status(self) -> str:
293    return self._resource_data['status']
294
295  @property
296  def status_message(self) -> str:
297    return self._resource_data.get('statusMessage', None)
298
299  def has_app_layer_enc_enabled(self) -> bool:
300    # state := 'DECRYPTED' | 'ENCRYPTED', keyName := 'full_path_to_key_resouce'
301    return get_path(self._resource_data, ('databaseEncryption', 'state'),
302                    default=None) == 'ENCRYPTED'
303
304  def has_logging_enabled(self) -> bool:
305    return self._resource_data['loggingService'] != 'none'
306
307  def enabled_logging_components(self) -> List[str]:
308    return self._resource_data['loggingConfig']['componentConfig'][
309        'enableComponents']
310
311  def has_monitoring_enabled(self) -> bool:
312    return self._resource_data['monitoringService'] != 'none'
313
314  def enabled_monitoring_components(self) -> List[str]:
315    return self._resource_data['monitoringConfig']['componentConfig'][
316        'enableComponents']
317
318  def has_authenticator_group_enabled(self) -> bool:
319    return len(self._resource_data.get('authenticatorGroupsConfig', {})) > 0
320
321  def has_workload_identity_enabled(self) -> bool:
322    return len(self._resource_data.get('workloadIdentityConfig', {})) > 0
323
324  def has_http_load_balancing_enabled(self) -> bool:
325    # HTTP load balancing needs to be enabled to use GKE ingress
326    return not (get_path(self._resource_data,
327                         ('addonsConfig', 'httpLoadBalancing', 'disabled'),
328                         default=None) is True)
329
330  def has_network_policy_enabled(self) -> bool:
331    # Network policy enforcement
332    return get_path(self._resource_data,
333                    ('addonsConfig', 'networkPolicyConfig', 'disabled'),
334                    default=False) is not True
335
336  def has_dpv2_enabled(self) -> bool:
337    # Checks whether dataplane V2 is enabled in clusters
338    return (get_path(self._resource_data, ('networkConfig', 'datapathProvider'),
339                     default=None) == 'ADVANCED_DATAPATH')
340
341  def has_intra_node_visibility_enabled(self) -> bool:
342    if ('networkConfig' in self._resource_data and
343        'enableIntraNodeVisibility' in self._resource_data['networkConfig']):
344      return self._resource_data['networkConfig']['enableIntraNodeVisibility']
345    return False
346
347  def has_maintenance_window(self) -> bool:
348    # 'e3b0c442' is a hexadecimal string that represents the value of an empty
349    # string ('') in cryptography. If the maintenance windows are defined, the
350    # value of 'resourceVersion' is not empty ('e3b0c442').
351    return self._resource_data['maintenancePolicy'][
352        'resourceVersion'] != 'e3b0c442'
353
354  def has_image_streaming_enabled(self) -> bool:
355    """
356    Check if cluster has Image Streaming (aka  Google Container File System)
357    enabled
358    """
359    global_gcsfs = get_path(
360        self._resource_data,
361        ('nodePoolDefaults', 'nodeConfigDefaults', 'gcfsConfig', 'enabled'),
362        default=False)
363    # Check nodePoolDefaults settings
364    if global_gcsfs:
365      return True
366    for np in self.nodepools:
367      # Check if any nodepool has image streaming enabled
368      if np.has_image_streaming_enabled():
369        return True
370    return False
371
372  @property
373  def nodepools(self) -> List[NodePool]:
374    if self._nodepools is None:
375      self._nodepools = []
376      for n in self._resource_data.get('nodePools', []):
377        self._nodepools.append(NodePool(self, n))
378    return self._nodepools
379
380  @property
381  def network(self) -> network.Network:
382    # projects/gcpdiag-gke1-aaaa/global/networks/default
383    network_string = self._resource_data['networkConfig']['network']
384    m = re.match(r'projects/([^/]+)/global/networks/([^/]+)$', network_string)
385    if not m:
386      raise RuntimeError("can't parse network string: %s" % network_string)
387    return network.get_network(m.group(1), m.group(2), self._context)
388
389  @property
390  def subnetwork(self) -> Optional[models.Resource]:
391    # 'projects/gcpdiag-gke1-aaaa/regions/europe-west4/subnetworks/default'
392    if 'subnetwork' not in self._resource_data['networkConfig']:
393      return None
394
395    subnetwork_string = self._resource_data['networkConfig']['subnetwork']
396    m = re.match(r'projects/([^/]+)/regions/([^/]+)/subnetworks/([^/]+)$',
397                 subnetwork_string)
398    if not m:
399      raise RuntimeError("can't parse network string: %s" % subnetwork_string)
400    return network.get_subnetwork(m.group(1), m.group(2), m.group(3))
401
402  @property
403  def get_subnet_name(self) -> Optional[models.Resource]:
404    if 'subnetwork' not in self._resource_data:
405      return None
406    return self._resource_data['subnetwork']
407
408  @property
409  def get_nodepool_config(self) -> Optional[models.Resource]:
410    if 'nodePools' not in self._resource_data:
411      return None
412    return self._resource_data['nodePools']
413
414  @property
415  def get_network_string(self) -> str:
416    if 'networkConfig' not in self._resource_data:
417      return ''
418    if 'network' not in self._resource_data['networkConfig']:
419      return ''
420    return self._resource_data['networkConfig']['network']
421
422  @property
423  def is_private(self) -> bool:
424    if not 'privateClusterConfig' in self._resource_data:
425      return False
426
427    return self._resource_data['privateClusterConfig'].get(
428        'enablePrivateNodes', False)
429
430  @property
431  def is_vpc_native(self) -> bool:
432    return (get_path(self._resource_data,
433                     ('ipAllocationPolicy', 'useIpAliases'),
434                     default=False))
435
436  @property
437  def is_regional(self) -> bool:
438    return len(self._resource_data['locations']) > 1
439
440  @property
441  def cluster_ca_certificate(self) -> str:
442    return self._resource_data['masterAuth']['clusterCaCertificate']
443
444  @property
445  def endpoint(self) -> Optional[str]:
446    if 'endpoint' not in self._resource_data:
447      return None
448    return self._resource_data['endpoint']
449
450  @property
451  def is_autopilot(self) -> bool:
452    if not 'autopilot' in self._resource_data:
453      return False
454    return self._resource_data['autopilot'].get('enabled', False)
455
456  @property
457  def masters_cidr_list(self) -> Iterable[IPv4NetOrIPv6Net]:
458    if get_path(self._resource_data,
459                ('privateClusterConfig', 'masterIpv4CidrBlock'),
460                default=None):
461      return [
462          ipaddress.ip_network(self._resource_data['privateClusterConfig']
463                               ['masterIpv4CidrBlock'])
464      ]
465    else:
466      #only older clusters still have ssh firewall rules
467      if self.current_node_count and not self.cluster_hash:
468        logging.warning("couldn't retrieve cluster hash for cluster %s.",
469                        self.name)
470        return []
471      fw_rule_name = f'gke-{self.name}-{self.cluster_hash}-ssh'
472      rule = self.network.firewall.get_vpc_ingress_rules(name=fw_rule_name)
473      if rule and rule[0].is_enabled():
474        return rule[0].source_ranges
475      return []
476
477  @property
478  def cluster_hash(self) -> Optional[str]:
479    """Returns the "cluster hash" as used in automatic firewall rules for GKE clusters.
480    Cluster hash is the first 8 characters of cluster id.
481    See also: https://cloud.google.com/kubernetes-engine/docs/concepts/firewall-rules
482    """
483    if 'id' in self._resource_data:
484      return self._resource_data['id'][:8]
485    raise UndefinedClusterPropertyError('no id')
486
487  @property
488  def is_nodelocal_dnscache_enabled(self) -> bool:
489    """Returns True if NodeLocal DNSCache is enabled for the cluster."""
490    addons_config = self._resource_data.get('addonsConfig', {})
491    dns_cache_config = addons_config.get('dnsCacheConfig', {})
492    return dns_cache_config.get('enabled', False)
Cluster(project_id, resource_data, context: gcpdiag.models.Context)
230  def __init__(self, project_id, resource_data, context: models.Context):
231    super().__init__(project_id=project_id)
232    self._resource_data = resource_data
233    self.master_version = Version(self._resource_data['currentMasterVersion'])
234    self._nodepools = None
235    self._context = context
master_version: gcpdiag.utils.Version
full_path: str
237  @property
238  def full_path(self) -> str:
239    if utils.is_region(self._resource_data['location']):
240      return (f'projects/{self.project_id}/'
241              f'locations/{self.location}/clusters/{self.name}')
242    else:
243      return (f'projects/{self.project_id}/'
244              f'zones/{self.location}/clusters/{self.name}')

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
246  @property
247  def short_path(self) -> str:
248    path = self.full_path
249    path = re.sub(r'^projects/', '', path)
250    path = re.sub(r'/locations/', '/', path)
251    path = re.sub(r'/zones/', '/', path)
252    path = re.sub(r'/clusters/', '/', path)
253    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
255  @property
256  def name(self) -> str:
257    return self._resource_data['name']
location: str
259  @property
260  def location(self) -> str:
261    return self._resource_data['location']
pod_ipv4_cidr: Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
263  @property
264  def pod_ipv4_cidr(self) -> IPv4NetOrIPv6Net:
265    cidr = self._resource_data['clusterIpv4Cidr']
266    return ipaddress.ip_network(cidr)
current_node_count: int
268  @property
269  def current_node_count(self) -> int:
270    return self._resource_data.get('currentNodeCount', 0)
release_channel: Optional[str]
272  @property
273  def release_channel(self) -> Optional[str]:
274    try:
275      return self._resource_data['releaseChannel']['channel']
276    except KeyError:
277      return None
nap_node_image_type: Optional[str]
279  @property
280  def nap_node_image_type(self) -> Optional[str]:
281
282    return get_path(
283        self._resource_data,
284        ('autoscaling', 'autoprovisioningNodePoolDefaults', 'imageType'),
285        default=None)
app_layer_sec_key: str
287  @property
288  def app_layer_sec_key(self) -> str:
289    return self._resource_data['databaseEncryption'].get('keyName')
status: str
291  @property
292  def status(self) -> str:
293    return self._resource_data['status']
status_message: str
295  @property
296  def status_message(self) -> str:
297    return self._resource_data.get('statusMessage', None)
def has_app_layer_enc_enabled(self) -> bool:
299  def has_app_layer_enc_enabled(self) -> bool:
300    # state := 'DECRYPTED' | 'ENCRYPTED', keyName := 'full_path_to_key_resouce'
301    return get_path(self._resource_data, ('databaseEncryption', 'state'),
302                    default=None) == 'ENCRYPTED'
def has_logging_enabled(self) -> bool:
304  def has_logging_enabled(self) -> bool:
305    return self._resource_data['loggingService'] != 'none'
def enabled_logging_components(self) -> List[str]:
307  def enabled_logging_components(self) -> List[str]:
308    return self._resource_data['loggingConfig']['componentConfig'][
309        'enableComponents']
def has_monitoring_enabled(self) -> bool:
311  def has_monitoring_enabled(self) -> bool:
312    return self._resource_data['monitoringService'] != 'none'
def enabled_monitoring_components(self) -> List[str]:
314  def enabled_monitoring_components(self) -> List[str]:
315    return self._resource_data['monitoringConfig']['componentConfig'][
316        'enableComponents']
def has_authenticator_group_enabled(self) -> bool:
318  def has_authenticator_group_enabled(self) -> bool:
319    return len(self._resource_data.get('authenticatorGroupsConfig', {})) > 0
def has_workload_identity_enabled(self) -> bool:
321  def has_workload_identity_enabled(self) -> bool:
322    return len(self._resource_data.get('workloadIdentityConfig', {})) > 0
def has_http_load_balancing_enabled(self) -> bool:
324  def has_http_load_balancing_enabled(self) -> bool:
325    # HTTP load balancing needs to be enabled to use GKE ingress
326    return not (get_path(self._resource_data,
327                         ('addonsConfig', 'httpLoadBalancing', 'disabled'),
328                         default=None) is True)
def has_network_policy_enabled(self) -> bool:
330  def has_network_policy_enabled(self) -> bool:
331    # Network policy enforcement
332    return get_path(self._resource_data,
333                    ('addonsConfig', 'networkPolicyConfig', 'disabled'),
334                    default=False) is not True
def has_dpv2_enabled(self) -> bool:
336  def has_dpv2_enabled(self) -> bool:
337    # Checks whether dataplane V2 is enabled in clusters
338    return (get_path(self._resource_data, ('networkConfig', 'datapathProvider'),
339                     default=None) == 'ADVANCED_DATAPATH')
def has_intra_node_visibility_enabled(self) -> bool:
341  def has_intra_node_visibility_enabled(self) -> bool:
342    if ('networkConfig' in self._resource_data and
343        'enableIntraNodeVisibility' in self._resource_data['networkConfig']):
344      return self._resource_data['networkConfig']['enableIntraNodeVisibility']
345    return False
def has_maintenance_window(self) -> bool:
347  def has_maintenance_window(self) -> bool:
348    # 'e3b0c442' is a hexadecimal string that represents the value of an empty
349    # string ('') in cryptography. If the maintenance windows are defined, the
350    # value of 'resourceVersion' is not empty ('e3b0c442').
351    return self._resource_data['maintenancePolicy'][
352        'resourceVersion'] != 'e3b0c442'
def has_image_streaming_enabled(self) -> bool:
354  def has_image_streaming_enabled(self) -> bool:
355    """
356    Check if cluster has Image Streaming (aka  Google Container File System)
357    enabled
358    """
359    global_gcsfs = get_path(
360        self._resource_data,
361        ('nodePoolDefaults', 'nodeConfigDefaults', 'gcfsConfig', 'enabled'),
362        default=False)
363    # Check nodePoolDefaults settings
364    if global_gcsfs:
365      return True
366    for np in self.nodepools:
367      # Check if any nodepool has image streaming enabled
368      if np.has_image_streaming_enabled():
369        return True
370    return False

Check if cluster has Image Streaming (aka Google Container File System) enabled

nodepools: List[NodePool]
372  @property
373  def nodepools(self) -> List[NodePool]:
374    if self._nodepools is None:
375      self._nodepools = []
376      for n in self._resource_data.get('nodePools', []):
377        self._nodepools.append(NodePool(self, n))
378    return self._nodepools
network: gcpdiag.queries.network.Network
380  @property
381  def network(self) -> network.Network:
382    # projects/gcpdiag-gke1-aaaa/global/networks/default
383    network_string = self._resource_data['networkConfig']['network']
384    m = re.match(r'projects/([^/]+)/global/networks/([^/]+)$', network_string)
385    if not m:
386      raise RuntimeError("can't parse network string: %s" % network_string)
387    return network.get_network(m.group(1), m.group(2), self._context)
subnetwork: Optional[gcpdiag.models.Resource]
389  @property
390  def subnetwork(self) -> Optional[models.Resource]:
391    # 'projects/gcpdiag-gke1-aaaa/regions/europe-west4/subnetworks/default'
392    if 'subnetwork' not in self._resource_data['networkConfig']:
393      return None
394
395    subnetwork_string = self._resource_data['networkConfig']['subnetwork']
396    m = re.match(r'projects/([^/]+)/regions/([^/]+)/subnetworks/([^/]+)$',
397                 subnetwork_string)
398    if not m:
399      raise RuntimeError("can't parse network string: %s" % subnetwork_string)
400    return network.get_subnetwork(m.group(1), m.group(2), m.group(3))
get_subnet_name: Optional[gcpdiag.models.Resource]
402  @property
403  def get_subnet_name(self) -> Optional[models.Resource]:
404    if 'subnetwork' not in self._resource_data:
405      return None
406    return self._resource_data['subnetwork']
get_nodepool_config: Optional[gcpdiag.models.Resource]
408  @property
409  def get_nodepool_config(self) -> Optional[models.Resource]:
410    if 'nodePools' not in self._resource_data:
411      return None
412    return self._resource_data['nodePools']
get_network_string: str
414  @property
415  def get_network_string(self) -> str:
416    if 'networkConfig' not in self._resource_data:
417      return ''
418    if 'network' not in self._resource_data['networkConfig']:
419      return ''
420    return self._resource_data['networkConfig']['network']
is_private: bool
422  @property
423  def is_private(self) -> bool:
424    if not 'privateClusterConfig' in self._resource_data:
425      return False
426
427    return self._resource_data['privateClusterConfig'].get(
428        'enablePrivateNodes', False)
is_vpc_native: bool
430  @property
431  def is_vpc_native(self) -> bool:
432    return (get_path(self._resource_data,
433                     ('ipAllocationPolicy', 'useIpAliases'),
434                     default=False))
is_regional: bool
436  @property
437  def is_regional(self) -> bool:
438    return len(self._resource_data['locations']) > 1
cluster_ca_certificate: str
440  @property
441  def cluster_ca_certificate(self) -> str:
442    return self._resource_data['masterAuth']['clusterCaCertificate']
endpoint: Optional[str]
444  @property
445  def endpoint(self) -> Optional[str]:
446    if 'endpoint' not in self._resource_data:
447      return None
448    return self._resource_data['endpoint']
is_autopilot: bool
450  @property
451  def is_autopilot(self) -> bool:
452    if not 'autopilot' in self._resource_data:
453      return False
454    return self._resource_data['autopilot'].get('enabled', False)
masters_cidr_list: Iterable[Union[ipaddress.IPv4Network, ipaddress.IPv6Network]]
456  @property
457  def masters_cidr_list(self) -> Iterable[IPv4NetOrIPv6Net]:
458    if get_path(self._resource_data,
459                ('privateClusterConfig', 'masterIpv4CidrBlock'),
460                default=None):
461      return [
462          ipaddress.ip_network(self._resource_data['privateClusterConfig']
463                               ['masterIpv4CidrBlock'])
464      ]
465    else:
466      #only older clusters still have ssh firewall rules
467      if self.current_node_count and not self.cluster_hash:
468        logging.warning("couldn't retrieve cluster hash for cluster %s.",
469                        self.name)
470        return []
471      fw_rule_name = f'gke-{self.name}-{self.cluster_hash}-ssh'
472      rule = self.network.firewall.get_vpc_ingress_rules(name=fw_rule_name)
473      if rule and rule[0].is_enabled():
474        return rule[0].source_ranges
475      return []
cluster_hash: Optional[str]
477  @property
478  def cluster_hash(self) -> Optional[str]:
479    """Returns the "cluster hash" as used in automatic firewall rules for GKE clusters.
480    Cluster hash is the first 8 characters of cluster id.
481    See also: https://cloud.google.com/kubernetes-engine/docs/concepts/firewall-rules
482    """
483    if 'id' in self._resource_data:
484      return self._resource_data['id'][:8]
485    raise UndefinedClusterPropertyError('no id')

Returns the "cluster hash" as used in automatic firewall rules for GKE clusters. Cluster hash is the first 8 characters of cluster id. See also: https://cloud.google.com/kubernetes-engine/docs/concepts/firewall-rules

is_nodelocal_dnscache_enabled: bool
487  @property
488  def is_nodelocal_dnscache_enabled(self) -> bool:
489    """Returns True if NodeLocal DNSCache is enabled for the cluster."""
490    addons_config = self._resource_data.get('addonsConfig', {})
491    dns_cache_config = addons_config.get('dnsCacheConfig', {})
492    return dns_cache_config.get('enabled', False)

Returns True if NodeLocal DNSCache is enabled for the cluster.

@caching.cached_api_call
def get_clusters( context: gcpdiag.models.Context) -> Mapping[str, Cluster]:
495@caching.cached_api_call
496def get_clusters(context: models.Context) -> Mapping[str, Cluster]:
497  """Get a list of Cluster matching the given context, indexed by cluster full path."""
498  clusters: Dict[str, Cluster] = {}
499  if not apis.is_enabled(context.project_id, 'container'):
500    return clusters
501  container_api = apis.get_api('container', 'v1', context.project_id)
502  logging.debug('fetching list of GKE clusters in project %s',
503                context.project_id)
504  query = container_api.projects().locations().clusters().list(
505      parent=f'projects/{context.project_id}/locations/-')
506  try:
507    resp = query.execute(num_retries=config.API_RETRIES)
508    if 'clusters' not in resp:
509      return clusters
510    for resp_c in resp['clusters']:
511      # verify that we some minimal data that we expect
512      if 'name' not in resp_c or 'location' not in resp_c:
513        raise RuntimeError(
514            'missing data in projects.locations.clusters.list response')
515      if not context.match_project_resource(location=resp_c.get('location', ''),
516                                            labels=resp_c.get(
517                                                'resourceLabels', {}),
518                                            resource=resp_c.get('name', '')):
519        continue
520      c = Cluster(project_id=context.project_id,
521                  resource_data=resp_c,
522                  context=context)
523      clusters[c.full_path] = c
524  except googleapiclient.errors.HttpError as err:
525    raise utils.GcpApiError(err) from err
526  return clusters

Get a list of Cluster matching the given context, indexed by cluster full path.

@caching.cached_api_call
def get_cluster( project_id, cluster_id, location) -> Optional[Cluster]:
529@caching.cached_api_call
530def get_cluster(
531    project_id,
532    cluster_id,
533    location,
534) -> Union[Cluster, None]:
535  """Get a Cluster from project_id of a context."""
536  if not apis.is_enabled(project_id, 'container'):
537    return None
538  container_api = apis.get_api('container', 'v1', project_id)
539  logging.debug('fetching the GKE cluster %s in project %s', cluster_id,
540                project_id)
541  query = container_api.projects().locations().clusters().get(
542      name=f'projects/{project_id}/locations/{location}/clusters/{cluster_id}')
543  try:
544    resp = query.execute(num_retries=config.API_RETRIES)
545    if cluster_id not in str(resp):
546      raise RuntimeError(
547          'missing data in projects.locations.clusters.list response')
548  except googleapiclient.errors.HttpError as err:
549    raise utils.GcpApiError(err) from err
550  return Cluster(project_id=project_id,
551                 resource_data=resp,
552                 context=models.Context(project_id=project_id))

Get a Cluster from project_id of a context.

def get_valid_master_versions(project_id: str, location: str) -> List[str]:
567def get_valid_master_versions(project_id: str, location: str) -> List[str]:
568  """Get a list of valid GKE master versions."""
569  server_config = _get_server_config(project_id, location)
570  versions: List[str] = []
571
572  # channel versions may extend the list of all available versions.\
573  # Especially for the Rapid channel - many new versions only available in Rapid
574  # channel and not as a static version to make sure nobody stuck on that
575  # version for an extended period of time.
576  for c in server_config['channels']:
577    versions += c['validVersions']
578
579  versions += server_config['validMasterVersions']
580
581  return versions

Get a list of valid GKE master versions.

def get_valid_node_versions(project_id: str, location: str) -> List[str]:
584def get_valid_node_versions(project_id: str, location: str) -> List[str]:
585  """Get a list of valid GKE master versions."""
586  server_config = _get_server_config(project_id, location)
587  versions: List[str] = []
588
589  # See explanation in get_valid_master_versions
590  for c in server_config['channels']:
591    versions += c['validVersions']
592
593  versions += server_config['validNodeVersions']
594
595  return versions

Get a list of valid GKE master versions.

class Node(gcpdiag.models.Resource):
598class Node(models.Resource):
599  """Represents a GKE node.
600
601  This class useful for example to determine the GKE cluster when you only have
602  an GCE instance id (like from a metrics label). """
603
604  instance: gce.Instance
605  nodepool: NodePool
606  mig: gce.ManagedInstanceGroup
607
608  def __init__(self, instance, nodepool, mig):
609    super().__init__(project_id=instance.project_id)
610    self.instance = instance
611    self.nodepool = nodepool
612    self.mig = mig
613    pass
614
615  @property
616  def full_path(self) -> str:
617    return self.nodepool.cluster.full_path + '/nodes/' + self.instance.name
618
619  @property
620  def short_path(self) -> str:
621    #return self.nodepool.cluster.short_path + '/' + self.instance.name
622    return self.instance.short_path

Represents a GKE node.

This class useful for example to determine the GKE cluster when you only have an GCE instance id (like from a metrics label).

Node(instance, nodepool, mig)
608  def __init__(self, instance, nodepool, mig):
609    super().__init__(project_id=instance.project_id)
610    self.instance = instance
611    self.nodepool = nodepool
612    self.mig = mig
613    pass
nodepool: NodePool
full_path: str
615  @property
616  def full_path(self) -> str:
617    return self.nodepool.cluster.full_path + '/nodes/' + self.instance.name

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
619  @property
620  def short_path(self) -> str:
621    #return self.nodepool.cluster.short_path + '/' + self.instance.name
622    return self.instance.short_path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

@functools.lru_cache()
def get_node_by_instance_id( context: gcpdiag.models.Context, instance_id: str) -> Node:
628@functools.lru_cache()
629def get_node_by_instance_id(context: models.Context, instance_id: str) -> Node:
630  """Get a gke.Node instance by instance id.
631
632  Throws a KeyError in case this instance is not found or isn't part of a GKE cluster.
633  """
634  # This will throw a KeyError if the instance is not found, which is also
635  # the behavior that we want for this function.
636  instance = gce.get_instances(context)[instance_id]
637  clusters = get_clusters(context)
638  try:
639    # instance.mig throws AttributeError if it isn't part of a mig
640    mig = instance.mig
641
642    # find a NodePool that uses this MIG
643    for c in clusters.values():
644      for np in c.nodepools:
645        for np_mig in np.instance_groups:
646          if mig == np_mig:
647            return Node(instance=instance, nodepool=np, mig=mig)
648
649    # if we didn't find a nodepool that owns this instance, raise a KeyError
650    raise KeyError('can\'t determine GKE cluster for instance %s' %
651                   (instance_id))
652
653  except AttributeError as err:
654    raise KeyError from err
655  return None

Get a gke.Node instance by instance id.

Throws a KeyError in case this instance is not found or isn't part of a GKE cluster.

@caching.cached_api_call
def get_release_schedule() -> Dict:
658@caching.cached_api_call
659def get_release_schedule() -> Dict:
660  """Extract the release schedule for gke clusters
661
662  Returns:
663    A dictionary of release schedule.
664  """
665  page_url = 'https://cloud.google.com/kubernetes-engine/docs/release-schedule'
666  release_data = {}
667  # estimate first month of the quarter
668  quarter_dates = {'Q1': '1', 'Q2': '4', 'Q3': '7', 'Q4': '10'}
669  try:
670    table = web.fetch_and_extract_table(page_url,
671                                        tag='table',
672                                        class_name='gke-release-schedule')
673
674    # Function to parse a date string or return None for 'N/A'
675    def parse_date(date_str) -> Optional[datetime.date]:
676      p = r'(?P<year>\d{4})-(?:(?P<quarter>Q[1-4])|(?P<month>[0-9]{1,2}))(?:-(?P<day>[0-9]{1,2}))?'
677      match = re.search(p, date_str)
678      # Handle incomplete dates in 'YYYY-MM' form
679      if match and match.group('month') and not match.group('day'):
680        return datetime.date.fromisoformat(f'{date_str}-15')
681      # Handle quarter year (for example, 2025-Q3) approximations that are updated when known.
682      # https://cloud.google.com/kubernetes-engine/docs/release-schedule.md#fn6
683      if match and match.group('quarter') and not match.group('day'):
684        date_str = f"{match.group('year')}-{quarter_dates[match.group('quarter')]}-01"
685        return datetime.date.fromisoformat(date_str)
686      if match and match.group('year') and match.group('month') and match.group(
687          'day'):
688        return datetime.date.fromisoformat(date_str)
689      # anything less like N/A return None
690      return None
691
692    def find_date_str_in_td(e):
693      """recursively find a date string in a td"""
694      if isinstance(e, str):
695        return e
696      if isinstance(e, bs4.element.Tag):
697        return find_date_str_in_td(e.next)
698      return None
699
700    # Find all table rows within tbody
701    rows = table.find('tbody').find_all('tr')
702
703    # Iterate over each row and extract the data
704    for row in rows:
705      # Extract all the columns (td elements)
706      cols = row.find_all('td')
707
708      # Extract relevant data
709
710      minor_version = cols[0].next.strip()
711      rapid_avail = parse_date(find_date_str_in_td(cols[1].next))
712      regular_avail = parse_date(find_date_str_in_td(cols[3].next))
713      stable_avail = parse_date(find_date_str_in_td(cols[5].next))
714      extended_avail = parse_date(find_date_str_in_td(cols[7].next))
715      end_of_standard_support = parse_date(find_date_str_in_td(cols[9].next))
716
717      # Add the extracted data into the dictionary in the desired format
718      release_data[minor_version] = {
719          'rapid_avail': rapid_avail,
720          'regular_avail': regular_avail,
721          'stable_avail': stable_avail,
722          'extended_avail': extended_avail,
723          'eol': end_of_standard_support,
724      }
725    return release_data
726  except (
727      requests.exceptions.RequestException,
728      AttributeError,
729      TypeError,
730      ValueError,
731      IndexError,
732  ) as e:
733    logging.error('Error in extracting gke release schedule: %s', e)
734    return release_data

Extract the release schedule for gke clusters

Returns:

A dictionary of release schedule.