gcpdiag.queries.gke

Queries related to GCP Kubernetes Engine clusters.
# Type alias covering both network types that ipaddress.ip_network() can
# return; used for the pod and master CIDR properties below.
IPv4NetOrIPv6Net = typing.Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
# Fallback for NodePool.max_pod_per_node when the node pool resource does not
# carry maxPodsConstraint.maxPodsPerNode.
DEFAULT_MAX_PODS_PER_NODE = 110
class NodeConfig:
class NodeConfig:
  """Read-only view over the `config` dict of a GKE node pool."""

  def __init__(self, resource_data):
    # The dict is kept by reference; nothing is copied or validated here.
    self._resource_data = resource_data

  def has_accelerators(self) -> bool:
    """Return True when the config contains an 'accelerators' key."""
    return 'accelerators' in self._resource_data

  @property
  def machine_type(self) -> str:
    """Value of 'machineType'; raises KeyError when the key is missing."""
    return self._resource_data['machineType']

  @property
  def image_type(self) -> str:
    """Value of 'imageType'; raises KeyError when the key is missing."""
    return self._resource_data['imageType']

  @property
  def oauth_scopes(self) -> list:
    """Value of 'oauthScopes'; raises KeyError when the key is missing."""
    return self._resource_data['oauthScopes']

  @property
  def has_serial_port_logging_enabled(self) -> bool:
    """Tell whether serial port logging is enabled for the nodes.

    Returns:
      bool: False only when the 'serial-port-logging-enable' metadata entry
            is present and is not 'true' (compared case-insensitively).
            True when it is 'true' or when it is not set at all.
    """
    node_metadata = self._resource_data.get('metadata', {})
    # A missing entry counts as enabled, hence the 'true' default.
    setting = node_metadata.get('serial-port-logging-enable', 'true')
    return setting.lower() == 'true'

Represents a GKE node pool configuration.

NodeConfig(resource_data)
45  def __init__(self, resource_data):
46    self._resource_data = resource_data
def has_accelerators(self) -> bool:
48  def has_accelerators(self) -> bool:
49    if 'accelerators' in self._resource_data:
50      return True
51    return False
machine_type: str
53  @property
54  def machine_type(self) -> str:
55    return self._resource_data['machineType']
image_type: str
57  @property
58  def image_type(self) -> str:
59    return self._resource_data['imageType']
oauth_scopes: list
61  @property
62  def oauth_scopes(self) -> list:
63    return self._resource_data['oauthScopes']
has_serial_port_logging_enabled: bool
65  @property
66  def has_serial_port_logging_enabled(self) -> bool:
67    """ Check if serial port logging is enabled in the node config.
68
69    Returns:
70      bool: True if serial port logging is enabled or not explicitly disabled.
71            False if explicitly disabled.
72    """
73    metadata = self._resource_data.get('metadata', {})
74    return metadata.get('serial-port-logging-enable', 'true').lower() == 'true'

Check if serial port logging is enabled in the node config.

Returns:

bool: True if serial port logging is enabled or not explicitly disabled. False if explicitly disabled.

class NodePool(gcpdiag.models.Resource):
 77class NodePool(models.Resource):
 78  """Represents a GKE node pool."""
 79
 80  version: Version
 81
 82  def __init__(self, cluster, resource_data):
 83    super().__init__(project_id=cluster.project_id)
 84    self._cluster = cluster
 85    self._resource_data = resource_data
 86    self.version = Version(self._resource_data['version'])
 87    self._migs = None
 88
 89  def _get_service_account(self) -> str:
 90    return self._resource_data.get('config', {}).get('serviceAccount', None)
 91
 92  @property
 93  def full_path(self) -> str:
 94    # https://container.googleapis.com/v1/projects/gcpdiag-gke1-aaaa/
 95    #   locations/europe-west1/clusters/gke2/nodePools/default-pool
 96    m = re.match(r'https://container.googleapis.com/v1/(.*)',
 97                 self._resource_data.get('selfLink', ''))
 98    if not m:
 99      raise RuntimeError('can\'t parse selfLink of nodepool resource')
100    return m.group(1)
101
102  @property
103  def short_path(self) -> str:
104    path = self.full_path
105    path = re.sub(r'^projects/', '', path)
106    path = re.sub(r'/locations/', '/', path)
107    path = re.sub(r'/zones/', '/', path)
108    path = re.sub(r'/clusters/', '/', path)
109    path = re.sub(r'/nodePools/', '/', path)
110    return path
111
112  @property
113  def name(self) -> str:
114    return self._resource_data['name']
115
116  @property
117  def config(self) -> NodeConfig:
118    return NodeConfig(self._resource_data['config'])
119
120  @property
121  def node_count(self) -> int:
122    return self._resource_data.get('initialNodeCount', 0)
123
124  def has_default_service_account(self) -> bool:
125    sa = self._get_service_account()
126    return sa == 'default'
127
128  def has_image_streaming_enabled(self) -> bool:
129    return get_path(self._resource_data, ('config', 'gcfsConfig', 'enabled'),
130                    default=False)
131
132  def has_md_concealment_enabled(self) -> bool:
133    # Empty ({}) workloadMetadataConfig means that 'Metadata concealment'
134    # (predecessor of Workload Identity) is enabled.
135    # https://cloud.google.com/kubernetes-engine/docs/how-to/protecting-cluster-metadata#concealment
136    return get_path(self._resource_data, ('config', 'workloadMetadataConfig'),
137                    default=None) == {}
138
139  def has_workload_identity_enabled(self) -> bool:
140    # 'Metadata concealment' (workloadMetadataConfig == {}) doesn't protect the
141    # default SA's token
142    return bool(
143        get_path(self._resource_data, ('config', 'workloadMetadataConfig'),
144                 default=None))
145
146  @property
147  def service_account(self) -> str:
148    sa = self._get_service_account()
149    if sa == 'default':
150      project_nr = crm.get_project(self.project_id).number
151      return f'{project_nr}-compute@developer.gserviceaccount.com'
152    else:
153      return sa
154
155  @property
156  def pod_ipv4_cidr_size(self) -> int:
157    return self._resource_data['podIpv4CidrSize']
158
159  @property
160  def pod_ipv4_cidr_block(self) -> Optional[IPv4NetOrIPv6Net]:
161    # Get the pod cidr range in use by the nodepool
162    pod_cidr = get_path(self._resource_data,
163                        ('networkConfig', 'podIpv4CidrBlock'),
164                        default=None)
165
166    if pod_cidr:
167      return ipaddress.ip_network(pod_cidr)
168    else:
169      return None
170
171  @property
172  def max_pod_per_node(self) -> int:
173    return int(
174        get_path(self._resource_data, ('maxPodsConstraint', 'maxPodsPerNode'),
175                 default=DEFAULT_MAX_PODS_PER_NODE))
176
177  @property
178  def cluster(self) -> 'Cluster':
179    return self._cluster
180
181  @property
182  def instance_groups(self) -> List[gce.ManagedInstanceGroup]:
183    if self._migs is None:
184      project_migs_by_selflink = {}
185      for m in gce.get_managed_instance_groups(
186          models.Context(project_id=self.project_id)).values():
187        project_migs_by_selflink[m.self_link] = m
188
189      self._migs = []
190      for url in self._resource_data.get('instanceGroupUrls', []):
191        try:
192          self._migs.append(project_migs_by_selflink[url])
193        except KeyError:
194          continue
195    return self._migs
196
197  @property
198  def node_tags(self) -> List[str]:
199    """Returns the firewall tags used for nodes in this cluster.
200
201    If the node tags can't be determined, [] is returned.
202    """
203    migs = self.instance_groups
204    if not migs:
205      return []
206    return migs[0].template.tags
207
208  def get_machine_type(self) -> str:
209    """Returns the machine type of the nodepool nodes"""
210    return self.config.machine_type

Represents a GKE node pool.

NodePool(cluster, resource_data)
82  def __init__(self, cluster, resource_data):
83    super().__init__(project_id=cluster.project_id)
84    self._cluster = cluster
85    self._resource_data = resource_data
86    self.version = Version(self._resource_data['version'])
87    self._migs = None
version: gcpdiag.utils.Version
full_path: str
 92  @property
 93  def full_path(self) -> str:
 94    # https://container.googleapis.com/v1/projects/gcpdiag-gke1-aaaa/
 95    #   locations/europe-west1/clusters/gke2/nodePools/default-pool
 96    m = re.match(r'https://container.googleapis.com/v1/(.*)',
 97                 self._resource_data.get('selfLink', ''))
 98    if not m:
 99      raise RuntimeError('can\'t parse selfLink of nodepool resource')
100    return m.group(1)

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke1-aaaa/locations/europe-west1/clusters/gke2/nodePools/default-pool'

short_path: str
102  @property
103  def short_path(self) -> str:
104    path = self.full_path
105    path = re.sub(r'^projects/', '', path)
106    path = re.sub(r'/locations/', '/', path)
107    path = re.sub(r'/zones/', '/', path)
108    path = re.sub(r'/clusters/', '/', path)
109    path = re.sub(r'/nodePools/', '/', path)
110    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gcpdiag-gke1-aaaa/europe-west1/gke2/default-pool'

name: str
112  @property
113  def name(self) -> str:
114    return self._resource_data['name']
config: NodeConfig
116  @property
117  def config(self) -> NodeConfig:
118    return NodeConfig(self._resource_data['config'])
node_count: int
120  @property
121  def node_count(self) -> int:
122    return self._resource_data.get('initialNodeCount', 0)
def has_default_service_account(self) -> bool:
124  def has_default_service_account(self) -> bool:
125    sa = self._get_service_account()
126    return sa == 'default'
def has_image_streaming_enabled(self) -> bool:
128  def has_image_streaming_enabled(self) -> bool:
129    return get_path(self._resource_data, ('config', 'gcfsConfig', 'enabled'),
130                    default=False)
def has_md_concealment_enabled(self) -> bool:
132  def has_md_concealment_enabled(self) -> bool:
133    # Empty ({}) workloadMetadataConfig means that 'Metadata concealment'
134    # (predecessor of Workload Identity) is enabled.
135    # https://cloud.google.com/kubernetes-engine/docs/how-to/protecting-cluster-metadata#concealment
136    return get_path(self._resource_data, ('config', 'workloadMetadataConfig'),
137                    default=None) == {}
def has_workload_identity_enabled(self) -> bool:
139  def has_workload_identity_enabled(self) -> bool:
140    # 'Metadata concealment' (workloadMetadataConfig == {}) doesn't protect the
141    # default SA's token
142    return bool(
143        get_path(self._resource_data, ('config', 'workloadMetadataConfig'),
144                 default=None))
service_account: str
146  @property
147  def service_account(self) -> str:
148    sa = self._get_service_account()
149    if sa == 'default':
150      project_nr = crm.get_project(self.project_id).number
151      return f'{project_nr}-compute@developer.gserviceaccount.com'
152    else:
153      return sa
pod_ipv4_cidr_size: int
155  @property
156  def pod_ipv4_cidr_size(self) -> int:
157    return self._resource_data['podIpv4CidrSize']
pod_ipv4_cidr_block: Union[ipaddress.IPv4Network, ipaddress.IPv6Network, NoneType]
159  @property
160  def pod_ipv4_cidr_block(self) -> Optional[IPv4NetOrIPv6Net]:
161    # Get the pod cidr range in use by the nodepool
162    pod_cidr = get_path(self._resource_data,
163                        ('networkConfig', 'podIpv4CidrBlock'),
164                        default=None)
165
166    if pod_cidr:
167      return ipaddress.ip_network(pod_cidr)
168    else:
169      return None
max_pod_per_node: int
171  @property
172  def max_pod_per_node(self) -> int:
173    return int(
174        get_path(self._resource_data, ('maxPodsConstraint', 'maxPodsPerNode'),
175                 default=DEFAULT_MAX_PODS_PER_NODE))
cluster: Cluster
177  @property
178  def cluster(self) -> 'Cluster':
179    return self._cluster
instance_groups: List[gcpdiag.queries.gce.ManagedInstanceGroup]
181  @property
182  def instance_groups(self) -> List[gce.ManagedInstanceGroup]:
183    if self._migs is None:
184      project_migs_by_selflink = {}
185      for m in gce.get_managed_instance_groups(
186          models.Context(project_id=self.project_id)).values():
187        project_migs_by_selflink[m.self_link] = m
188
189      self._migs = []
190      for url in self._resource_data.get('instanceGroupUrls', []):
191        try:
192          self._migs.append(project_migs_by_selflink[url])
193        except KeyError:
194          continue
195    return self._migs
node_tags: List[str]
197  @property
198  def node_tags(self) -> List[str]:
199    """Returns the firewall tags used for nodes in this cluster.
200
201    If the node tags can't be determined, [] is returned.
202    """
203    migs = self.instance_groups
204    if not migs:
205      return []
206    return migs[0].template.tags

Returns the firewall tags used for nodes in this cluster.

If the node tags can't be determined, [] is returned.

def get_machine_type(self) -> str:
208  def get_machine_type(self) -> str:
209    """Returns the machine type of the nodepool nodes"""
210    return self.config.machine_type

Returns the machine type of the nodepool nodes

class UndefinedClusterPropertyError(builtins.Exception):
class UndefinedClusterPropertyError(Exception):
  """Raised when a cluster property cannot be determined.

  For example, Cluster.cluster_hash raises it when the cluster resource
  has no 'id' field.
  """

Thrown when a property of a cluster can't be determined for some reason. For example, the cluster_hash can't be determined because there are no nodepools defined.

class Cluster(gcpdiag.models.Resource):
class Cluster(models.Resource):
  """Represents a GKE cluster.

  Wraps the cluster resource dict; the properties and methods below read
  from that dict (see the REST reference for the field definitions).

  https://cloud.google.com/kubernetes-engine/docs/reference/rest/v1/projects.locations.clusters#Cluster
  """
  _resource_data: dict
  master_version: Version

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self.master_version = Version(self._resource_data['currentMasterVersion'])
    # Built on first access of `nodepools`.
    self._nodepools = None

  @property
  def full_path(self) -> str:
    """Path of the cluster within its project.

    Uses a 'locations/' segment when utils.is_region() accepts the cluster
    location and a 'zones/' segment otherwise.
    """
    if utils.is_region(self._resource_data['location']):
      collection = 'locations'
    else:
      collection = 'zones'
    return (f'projects/{self.project_id}/'
            f'{collection}/{self.location}/clusters/{self.name}')

  @property
  def short_path(self) -> str:
    """`full_path` with the collection keywords stripped out."""
    path = re.sub(r'^projects/', '', self.full_path)
    for keyword in (r'/locations/', r'/zones/', r'/clusters/'):
      path = re.sub(keyword, '/', path)
    return path

  @property
  def name(self) -> str:
    """Value of the resource's 'name' field."""
    return self._resource_data['name']

  @property
  def location(self) -> str:
    """Value of the resource's 'location' field."""
    return self._resource_data['location']

  @property
  def pod_ipv4_cidr(self) -> IPv4NetOrIPv6Net:
    """'clusterIpv4Cidr' parsed with ipaddress.ip_network()."""
    return ipaddress.ip_network(self._resource_data['clusterIpv4Cidr'])

  @property
  def current_node_count(self) -> int:
    """Value of 'currentNodeCount', or 0 when the field is absent."""
    return self._resource_data.get('currentNodeCount', 0)

  @property
  def release_channel(self) -> Optional[str]:
    """releaseChannel.channel, or None when either key is missing."""
    try:
      return self._resource_data['releaseChannel']['channel']
    except KeyError:
      return None

  @property
  def nap_node_image_type(self) -> Optional[str]:
    """autoscaling.autoprovisioningNodePoolDefaults.imageType, or None."""
    return get_path(
        self._resource_data,
        ('autoscaling', 'autoprovisioningNodePoolDefaults', 'imageType'),
        default=None)

  @property
  def app_layer_sec_key(self) -> Optional[str]:
    """databaseEncryption.keyName, or None when it is not set.

    Looked up with get_path() so that a cluster without a
    'databaseEncryption' section yields None instead of a KeyError, in line
    with has_app_layer_enc_enabled().
    """
    return get_path(self._resource_data, ('databaseEncryption', 'keyName'),
                    default=None)

  @property
  def status(self) -> str:
    """Value of the resource's 'status' field."""
    return self._resource_data['status']

  @property
  def status_message(self) -> Optional[str]:
    """Value of 'statusMessage', or None when the field is absent."""
    return self._resource_data.get('statusMessage', None)

  def has_app_layer_enc_enabled(self) -> bool:
    """True when databaseEncryption.state is 'ENCRYPTED'."""
    # state := 'DECRYPTED' | 'ENCRYPTED', keyName := 'full_path_to_key_resource'
    state = get_path(self._resource_data, ('databaseEncryption', 'state'),
                     default=None)
    return state == 'ENCRYPTED'

  def has_logging_enabled(self) -> bool:
    """True unless 'loggingService' is the literal 'none'."""
    return self._resource_data['loggingService'] != 'none'

  def enabled_logging_components(self) -> List[str]:
    """Value of loggingConfig.componentConfig.enableComponents."""
    component_config = self._resource_data['loggingConfig']['componentConfig']
    return component_config['enableComponents']

  def has_monitoring_enabled(self) -> bool:
    """True unless 'monitoringService' is the literal 'none'."""
    return self._resource_data['monitoringService'] != 'none'

  def enabled_monitoring_components(self) -> List[str]:
    """Value of monitoringConfig.componentConfig.enableComponents."""
    component_config = self._resource_data['monitoringConfig'][
        'componentConfig']
    return component_config['enableComponents']

  def has_authenticator_group_enabled(self) -> bool:
    """True when 'authenticatorGroupsConfig' is present and non-empty."""
    return bool(self._resource_data.get('authenticatorGroupsConfig', {}))

  def has_workload_identity_enabled(self) -> bool:
    """True when 'workloadIdentityConfig' is present and non-empty."""
    return bool(self._resource_data.get('workloadIdentityConfig', {}))

  def has_http_load_balancing_enabled(self) -> bool:
    """False only when addonsConfig.httpLoadBalancing.disabled is True."""
    # HTTP load balancing needs to be enabled to use GKE ingress
    disabled = get_path(self._resource_data,
                        ('addonsConfig', 'httpLoadBalancing', 'disabled'),
                        default=None)
    return disabled is not True

  def has_network_policy_enabled(self) -> bool:
    """False only when addonsConfig.networkPolicyConfig.disabled is True."""
    # Network policy enforcement
    disabled = get_path(self._resource_data,
                        ('addonsConfig', 'networkPolicyConfig', 'disabled'),
                        default=False)
    return disabled is not True

  def has_dpv2_enabled(self) -> bool:
    """True when networkConfig.datapathProvider is 'ADVANCED_DATAPATH'."""
    # Checks whether dataplane V2 is enabled in clusters
    provider = get_path(self._resource_data,
                        ('networkConfig', 'datapathProvider'),
                        default=None)
    return provider == 'ADVANCED_DATAPATH'

  def has_intra_node_visibility_enabled(self) -> bool:
    """Value of networkConfig.enableIntraNodeVisibility, False if absent."""
    return get_path(self._resource_data,
                    ('networkConfig', 'enableIntraNodeVisibility'),
                    default=False)

  def has_maintenance_window(self) -> bool:
    """True when maintenancePolicy.resourceVersion isn't the empty marker.

    Raises:
      KeyError: the resource has no maintenancePolicy.resourceVersion.
    """
    # 'e3b0c442' is a hexadecimal string that represents the value of an empty
    # string ('') in cryptography. If the maintenance windows are defined, the
    # value of 'resourceVersion' is not empty ('e3b0c442').
    policy = self._resource_data['maintenancePolicy']
    return policy['resourceVersion'] != 'e3b0c442'

  def has_image_streaming_enabled(self) -> bool:
    """
    Check if cluster has Image Streaming (aka  Google Container File System)
    enabled, either in nodePoolDefaults or in at least one nodepool.
    """
    # Check nodePoolDefaults settings
    global_gcfs = get_path(
        self._resource_data,
        ('nodePoolDefaults', 'nodeConfigDefaults', 'gcfsConfig', 'enabled'),
        default=False)
    if global_gcfs:
      return True
    # Check if any nodepool has image streaming enabled
    return any(np.has_image_streaming_enabled() for np in self.nodepools)

  @property
  def nodepools(self) -> Iterable[NodePool]:
    """NodePool objects for every entry of 'nodePools' (built once)."""
    if self._nodepools is None:
      # Build the complete list before caching it: if a NodePool constructor
      # raises, nothing is cached and a later access doesn't silently return
      # a truncated list.
      self._nodepools = [
          NodePool(self, np_data)
          for np_data in self._resource_data.get('nodePools', [])
      ]
    return self._nodepools

  @property
  def network(self) -> network.Network:
    """Network object for networkConfig.network.

    Raises:
      RuntimeError: the value isn't of the form
        'projects/<project>/global/networks/<name>'.
    """
    # projects/gcpdiag-gke1-aaaa/global/networks/default
    network_string = self._resource_data['networkConfig']['network']
    m = re.match(r'projects/([^/]+)/global/networks/([^/]+)$', network_string)
    if not m:
      raise RuntimeError(f"can't parse network string: {network_string}")
    return network.get_network(m.group(1), m.group(2))

  @property
  def subnetwork(self) -> Optional[models.Resource]:
    """Subnetwork object for networkConfig.subnetwork, or None if unset.

    Raises:
      RuntimeError: the value isn't of the form
        'projects/<project>/regions/<region>/subnetworks/<name>'.
    """
    # 'projects/gcpdiag-gke1-aaaa/regions/europe-west4/subnetworks/default'
    network_config = self._resource_data['networkConfig']
    if 'subnetwork' not in network_config:
      return None

    subnetwork_string = network_config['subnetwork']
    m = re.match(r'projects/([^/]+)/regions/([^/]+)/subnetworks/([^/]+)$',
                 subnetwork_string)
    if not m:
      # The message names the subnetwork so it can be told apart from the
      # error raised by `network`.
      raise RuntimeError(
          f"can't parse subnetwork string: {subnetwork_string}")
    return network.get_subnetwork(m.group(1), m.group(2), m.group(3))

  @property
  def get_subnet_name(self) -> Optional[str]:
    """Top-level 'subnetwork' value of the resource, or None if absent."""
    if 'subnetwork' not in self._resource_data:
      return None
    return self._resource_data['subnetwork']

  @property
  def get_nodepool_config(self) -> Optional[List[dict]]:
    """Raw 'nodePools' value of the resource, or None if absent."""
    if 'nodePools' not in self._resource_data:
      return None
    return self._resource_data['nodePools']

  @property
  def get_network_string(self) -> str:
    """networkConfig.network as stored, or '' when either key is missing."""
    return get_path(self._resource_data, ('networkConfig', 'network'),
                    default='')

  @property
  def is_private(self) -> bool:
    """privateClusterConfig.enablePrivateNodes, False when not present."""
    if 'privateClusterConfig' not in self._resource_data:
      return False
    return self._resource_data['privateClusterConfig'].get(
        'enablePrivateNodes', False)

  @property
  def is_vpc_native(self) -> bool:
    """ipAllocationPolicy.useIpAliases, False when not present."""
    return get_path(self._resource_data,
                    ('ipAllocationPolicy', 'useIpAliases'),
                    default=False)

  @property
  def is_regional(self) -> bool:
    """True when the resource's 'locations' has more than one entry."""
    return len(self._resource_data['locations']) > 1

  @property
  def cluster_ca_certificate(self) -> str:
    """Value of masterAuth.clusterCaCertificate."""
    return self._resource_data['masterAuth']['clusterCaCertificate']

  @property
  def endpoint(self) -> Optional[str]:
    """Value of 'endpoint', or None when the field is absent."""
    if 'endpoint' not in self._resource_data:
      return None
    return self._resource_data['endpoint']

  @property
  def is_autopilot(self) -> bool:
    """autopilot.enabled, False when not present."""
    if 'autopilot' not in self._resource_data:
      return False
    return self._resource_data['autopilot'].get('enabled', False)

  @property
  def masters_cidr_list(self) -> Iterable[IPv4NetOrIPv6Net]:
    """CIDR ranges of the cluster masters.

    When privateClusterConfig.masterIpv4CidrBlock is set, it is returned as
    the single element. Otherwise the source ranges of the enabled
    'gke-<name>-<cluster hash>-ssh' ingress firewall rule are returned, or
    [] if there is no such enabled rule.

    Raises:
      UndefinedClusterPropertyError: propagated from `cluster_hash` when the
        resource has no 'id' and the firewall-rule fallback is taken.
    """
    master_cidr = get_path(self._resource_data,
                           ('privateClusterConfig', 'masterIpv4CidrBlock'),
                           default=None)
    if master_cidr:
      return [ipaddress.ip_network(master_cidr)]

    # only older clusters still have ssh firewall rules
    # NOTE(review): cluster_hash raises when 'id' is missing, so this branch
    # is only reached for a present-but-empty id.
    if self.current_node_count and not self.cluster_hash:
      logging.warning("couldn't retrieve cluster hash for cluster %s.",
                      self.name)
      return []
    fw_rule_name = f'gke-{self.name}-{self.cluster_hash}-ssh'
    rule = self.network.firewall.get_vpc_ingress_rules(name=fw_rule_name)
    if rule and rule[0].is_enabled():
      return rule[0].source_ranges
    return []

  @property
  def cluster_hash(self) -> Optional[str]:
    """Returns the "cluster hash" as used in automatic firewall rules for GKE clusters.
    Cluster hash is the first 8 characters of cluster id.
    See also: https://cloud.google.com/kubernetes-engine/docs/concepts/firewall-rules

    Raises:
      UndefinedClusterPropertyError: the resource has no 'id' field.
    """
    if 'id' in self._resource_data:
      return self._resource_data['id'][:8]
    raise UndefinedClusterPropertyError('no id')
Cluster(project_id, resource_data)
228  def __init__(self, project_id, resource_data):
229    super().__init__(project_id=project_id)
230    self._resource_data = resource_data
231    self.master_version = Version(self._resource_data['currentMasterVersion'])
232    self._nodepools = None
master_version: gcpdiag.utils.Version
full_path: str
234  @property
235  def full_path(self) -> str:
236    if utils.is_region(self._resource_data['location']):
237      return (f'projects/{self.project_id}/'
238              f'locations/{self.location}/clusters/{self.name}')
239    else:
240      return (f'projects/{self.project_id}/'
241              f'zones/{self.location}/clusters/{self.name}')

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
243  @property
244  def short_path(self) -> str:
245    path = self.full_path
246    path = re.sub(r'^projects/', '', path)
247    path = re.sub(r'/locations/', '/', path)
248    path = re.sub(r'/zones/', '/', path)
249    path = re.sub(r'/clusters/', '/', path)
250    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gcpdiag-gke-1-9b90/europe-west4-a/gke1'

name: str
252  @property
253  def name(self) -> str:
254    return self._resource_data['name']
location: str
256  @property
257  def location(self) -> str:
258    return self._resource_data['location']
pod_ipv4_cidr: Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
260  @property
261  def pod_ipv4_cidr(self) -> IPv4NetOrIPv6Net:
262    cidr = self._resource_data['clusterIpv4Cidr']
263    return ipaddress.ip_network(cidr)
current_node_count: int
265  @property
266  def current_node_count(self) -> int:
267    return self._resource_data.get('currentNodeCount', 0)
release_channel: Optional[str]
269  @property
270  def release_channel(self) -> Optional[str]:
271    try:
272      return self._resource_data['releaseChannel']['channel']
273    except KeyError:
274      return None
nap_node_image_type: Optional[str]
276  @property
277  def nap_node_image_type(self) -> Optional[str]:
278
279    return get_path(
280        self._resource_data,
281        ('autoscaling', 'autoprovisioningNodePoolDefaults', 'imageType'),
282        default=None)
app_layer_sec_key: str
284  @property
285  def app_layer_sec_key(self) -> str:
286    return self._resource_data['databaseEncryption'].get('keyName')
status: str
288  @property
289  def status(self) -> str:
290    return self._resource_data['status']
status_message: str
292  @property
293  def status_message(self) -> str:
294    return self._resource_data.get('statusMessage', None)
def has_app_layer_enc_enabled(self) -> bool:
296  def has_app_layer_enc_enabled(self) -> bool:
297    # state := 'DECRYPTED' | 'ENCRYPTED', keyName := 'full_path_to_key_resouce'
298    return get_path(self._resource_data, ('databaseEncryption', 'state'),
299                    default=None) == 'ENCRYPTED'
def has_logging_enabled(self) -> bool:
301  def has_logging_enabled(self) -> bool:
302    return self._resource_data['loggingService'] != 'none'
def enabled_logging_components(self) -> List[str]:
304  def enabled_logging_components(self) -> List[str]:
305    return self._resource_data['loggingConfig']['componentConfig'][
306        'enableComponents']
def has_monitoring_enabled(self) -> bool:
308  def has_monitoring_enabled(self) -> bool:
309    return self._resource_data['monitoringService'] != 'none'
def enabled_monitoring_components(self) -> List[str]:
311  def enabled_monitoring_components(self) -> List[str]:
312    return self._resource_data['monitoringConfig']['componentConfig'][
313        'enableComponents']
def has_authenticator_group_enabled(self) -> bool:
315  def has_authenticator_group_enabled(self) -> bool:
316    return len(self._resource_data.get('authenticatorGroupsConfig', {})) > 0
def has_workload_identity_enabled(self) -> bool:
318  def has_workload_identity_enabled(self) -> bool:
319    return len(self._resource_data.get('workloadIdentityConfig', {})) > 0
def has_http_load_balancing_enabled(self) -> bool:
321  def has_http_load_balancing_enabled(self) -> bool:
322    # HTTP load balancing needs to be enabled to use GKE ingress
323    return not (get_path(self._resource_data,
324                         ('addonsConfig', 'httpLoadBalancing', 'disabled'),
325                         default=None) is True)
def has_network_policy_enabled(self) -> bool:
327  def has_network_policy_enabled(self) -> bool:
328    # Network policy enforcement
329    return get_path(self._resource_data,
330                    ('addonsConfig', 'networkPolicyConfig', 'disabled'),
331                    default=False) is not True
def has_dpv2_enabled(self) -> bool:
333  def has_dpv2_enabled(self) -> bool:
334    # Checks whether dataplane V2 is enabled in clusters
335    return (get_path(self._resource_data, ('networkConfig', 'datapathProvider'),
336                     default=None) == 'ADVANCED_DATAPATH')
def has_intra_node_visibility_enabled(self) -> bool:
338  def has_intra_node_visibility_enabled(self) -> bool:
339    if ('networkConfig' in self._resource_data and
340        'enableIntraNodeVisibility' in self._resource_data['networkConfig']):
341      return self._resource_data['networkConfig']['enableIntraNodeVisibility']
342    return False
def has_maintenance_window(self) -> bool:
344  def has_maintenance_window(self) -> bool:
345    # 'e3b0c442' is a hexadecimal string that represents the value of an empty
346    # string ('') in cryptography. If the maintenance windows are defined, the
347    # value of 'resourceVersion' is not empty ('e3b0c442').
348    return self._resource_data['maintenancePolicy'][
349        'resourceVersion'] != 'e3b0c442'
def has_image_streaming_enabled(self) -> bool:
351  def has_image_streaming_enabled(self) -> bool:
352    """
353    Check if cluster has Image Streaming (aka  Google Container File System)
354    enabled
355    """
356    global_gcsfs = get_path(
357        self._resource_data,
358        ('nodePoolDefaults', 'nodeConfigDefaults', 'gcfsConfig', 'enabled'),
359        default=False)
360    # Check nodePoolDefaults settings
361    if global_gcsfs:
362      return True
363    for np in self.nodepools:
364      # Check if any nodepool has image streaming enabled
365      if np.has_image_streaming_enabled():
366        return True
367    return False

Check if cluster has Image Streaming (aka Google Container File System) enabled

nodepools: Iterable[NodePool]
369  @property
370  def nodepools(self) -> Iterable[NodePool]:
371    if self._nodepools is None:
372      self._nodepools = []
373      for n in self._resource_data.get('nodePools', []):
374        self._nodepools.append(NodePool(self, n))
375    return self._nodepools
network: gcpdiag.queries.network.Network
377  @property
378  def network(self) -> network.Network:
379    # projects/gcpdiag-gke1-aaaa/global/networks/default
380    network_string = self._resource_data['networkConfig']['network']
381    m = re.match(r'projects/([^/]+)/global/networks/([^/]+)$', network_string)
382    if not m:
383      raise RuntimeError("can't parse network string: %s" % network_string)
384    return network.get_network(m.group(1), m.group(2))
subnetwork: Optional[gcpdiag.models.Resource]
386  @property
387  def subnetwork(self) -> Optional[models.Resource]:
388    # 'projects/gcpdiag-gke1-aaaa/regions/europe-west4/subnetworks/default'
389    if 'subnetwork' not in self._resource_data['networkConfig']:
390      return None
391
392    subnetwork_string = self._resource_data['networkConfig']['subnetwork']
393    m = re.match(r'projects/([^/]+)/regions/([^/]+)/subnetworks/([^/]+)$',
394                 subnetwork_string)
395    if not m:
396      raise RuntimeError("can't parse network string: %s" % subnetwork_string)
397    return network.get_subnetwork(m.group(1), m.group(2), m.group(3))
get_subnet_name: Optional[gcpdiag.models.Resource]
399  @property
400  def get_subnet_name(self) -> Optional[models.Resource]:
401    if 'subnetwork' not in self._resource_data:
402      return None
403    return self._resource_data['subnetwork']
get_nodepool_config: Optional[gcpdiag.models.Resource]
405  @property
406  def get_nodepool_config(self) -> Optional[models.Resource]:
407    if 'nodePools' not in self._resource_data:
408      return None
409    return self._resource_data['nodePools']
get_network_string: str
411  @property
412  def get_network_string(self) -> str:
413    if 'networkConfig' not in self._resource_data:
414      return ''
415    if 'network' not in self._resource_data['networkConfig']:
416      return ''
417    return self._resource_data['networkConfig']['network']
is_private: bool
419  @property
420  def is_private(self) -> bool:
421    if not 'privateClusterConfig' in self._resource_data:
422      return False
423
424    return self._resource_data['privateClusterConfig'].get(
425        'enablePrivateNodes', False)
is_vpc_native: bool
427  @property
428  def is_vpc_native(self) -> bool:
429    return (get_path(self._resource_data,
430                     ('ipAllocationPolicy', 'useIpAliases'),
431                     default=False))
is_regional: bool
433  @property
434  def is_regional(self) -> bool:
435    return len(self._resource_data['locations']) > 1
cluster_ca_certificate: str
437  @property
438  def cluster_ca_certificate(self) -> str:
439    return self._resource_data['masterAuth']['clusterCaCertificate']
endpoint: Optional[str]
441  @property
442  def endpoint(self) -> Optional[str]:
443    if 'endpoint' not in self._resource_data:
444      return None
445    return self._resource_data['endpoint']
is_autopilot: bool
447  @property
448  def is_autopilot(self) -> bool:
449    if not 'autopilot' in self._resource_data:
450      return False
451    return self._resource_data['autopilot'].get('enabled', False)
masters_cidr_list: Iterable[Union[ipaddress.IPv4Network, ipaddress.IPv6Network]]
453  @property
454  def masters_cidr_list(self) -> Iterable[IPv4NetOrIPv6Net]:
455    if get_path(self._resource_data,
456                ('privateClusterConfig', 'masterIpv4CidrBlock'),
457                default=None):
458      return [
459          ipaddress.ip_network(self._resource_data['privateClusterConfig']
460                               ['masterIpv4CidrBlock'])
461      ]
462    else:
463      #only older clusters still have ssh firewall rules
464      if self.current_node_count and not self.cluster_hash:
465        logging.warning("couldn't retrieve cluster hash for cluster %s.",
466                        self.name)
467        return []
468      fw_rule_name = f'gke-{self.name}-{self.cluster_hash}-ssh'
469      rule = self.network.firewall.get_vpc_ingress_rules(name=fw_rule_name)
470      if rule and rule[0].is_enabled():
471        return rule[0].source_ranges
472      return []
cluster_hash: Optional[str]
474  @property
475  def cluster_hash(self) -> Optional[str]:
476    """Returns the "cluster hash" as used in automatic firewall rules for GKE clusters.
477    Cluster hash is the first 8 characters of cluster id.
478    See also: https://cloud.google.com/kubernetes-engine/docs/concepts/firewall-rules
479    """
480    if 'id' in self._resource_data:
481      return self._resource_data['id'][:8]
482    raise UndefinedClusterPropertyError('no id')

Returns the "cluster hash" as used in automatic firewall rules for GKE clusters. Cluster hash is the first 8 characters of cluster id. See also: https://cloud.google.com/kubernetes-engine/docs/concepts/firewall-rules

@caching.cached_api_call
def get_clusters( context: gcpdiag.models.Context) -> Mapping[str, Cluster]:
@caching.cached_api_call
def get_clusters(context: models.Context) -> Mapping[str, Cluster]:
  """Get the GKE clusters matching the given context, indexed by full path.

  Args:
    context: provides the project id and filters the clusters by location,
      resource labels and name.

  Returns:
    A mapping from cluster full path to Cluster. Empty when the 'container'
    API is not enabled for the project or the response lists no clusters.

  Raises:
    RuntimeError: a cluster in the response lacks 'name' or 'location'.
    utils.GcpApiError: the API call failed with an HTTP error.
  """
  project_id = context.project_id
  matching: Dict[str, Cluster] = {}
  if not apis.is_enabled(project_id, 'container'):
    return matching

  container_api = apis.get_api('container', 'v1', project_id)
  logging.info('fetching list of GKE clusters in project %s', project_id)
  # Location '-' asks for the clusters of all locations in one request.
  list_request = container_api.projects().locations().clusters().list(
      parent=f'projects/{project_id}/locations/-')
  try:
    response = list_request.execute(num_retries=config.API_RETRIES)
    for cluster_data in response.get('clusters', ()):
      # verify that we have the minimal data that we expect
      if any(key not in cluster_data for key in ('name', 'location')):
        raise RuntimeError(
            'missing data in projects.locations.clusters.list response')
      is_selected = context.match_project_resource(
          location=cluster_data['location'],
          labels=cluster_data.get('resourceLabels', {}),
          resource=cluster_data['name'])
      if is_selected:
        cluster = Cluster(project_id=project_id, resource_data=cluster_data)
        matching[cluster.full_path] = cluster
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
  return matching

Get a list of Cluster matching the given context, indexed by cluster full path.

@caching.cached_api_call
def get_cluster( project_id, cluster_id, location) -> Optional[Cluster]:
@caching.cached_api_call
def get_cluster(project_id, cluster_id, location) -> Union[Cluster, None]:
  """Get a single GKE cluster by project, location and cluster id.

  Args:
    project_id: project that owns the cluster.
    cluster_id: cluster name as used in the resource path.
    location: location of the cluster as used in the resource path.

  Returns:
    The Cluster, or None when the 'container' API is not enabled for the
    project.

  Raises:
    RuntimeError: cluster_id does not appear anywhere in the response.
    utils.GcpApiError: the API call failed with an HTTP error.
  """
  if not apis.is_enabled(project_id, 'container'):
    return None
  container_api = apis.get_api('container', 'v1', project_id)
  logging.info('fetching the GKE cluster %s in project %s', cluster_id,
               project_id)
  query = container_api.projects().locations().clusters().get(
      name=f'projects/{project_id}/locations/{location}/clusters/{cluster_id}')
  try:
    resp = query.execute(num_retries=config.API_RETRIES)
    # Coarse sanity check: the requested id must show up in the response.
    if cluster_id not in str(resp):
      # This is the clusters.get call, so name that method in the message.
      raise RuntimeError(
          'missing data in projects.locations.clusters.get response')
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
  return Cluster(project_id=project_id, resource_data=resp)

Get a Cluster from project_id of a context.

def get_valid_master_versions(project_id: str, location: str) -> List[str]:
def get_valid_master_versions(project_id: str, location: str) -> List[str]:
  """Get a list of valid GKE master versions.

  Returns:
    The 'validVersions' of every channel in the server config, followed by
    the server config's 'validMasterVersions'. Duplicates are not removed.
  """
  server_config = _get_server_config(project_id, location)

  # Channel versions may extend the list of all available versions.
  # Especially for the Rapid channel - many new versions are only available in
  # the Rapid channel and not as a static version, to make sure nobody is stuck
  # on that version for an extended period of time.
  channel_versions = [
      version for channel in server_config['channels']
      for version in channel['validVersions']
  ]
  return [*channel_versions, *server_config['validMasterVersions']]

Get a list of valid GKE master versions.

def get_valid_node_versions(project_id: str, location: str) -> List[str]:
def get_valid_node_versions(project_id: str, location: str) -> List[str]:
  """Get a list of valid GKE node versions.

  Returns:
    The 'validVersions' of every channel in the server config, followed by
    the server config's 'validNodeVersions'. Duplicates are not removed.
  """
  server_config = _get_server_config(project_id, location)

  # Channel versions are included for the same reason as in
  # get_valid_master_versions.
  channel_versions = [
      version for channel in server_config['channels']
      for version in channel['validVersions']
  ]
  return [*channel_versions, *server_config['validNodeVersions']]

Get a list of valid GKE node versions.

class Node(gcpdiag.models.Resource):
class Node(models.Resource):
  """Represents a GKE node.

  This class is useful, for example, to determine the GKE cluster when you
  only have a GCE instance id (like from a metrics label).
  """

  # GCE instance backing this node.
  instance: gce.Instance
  # Node pool this node was matched to.
  nodepool: NodePool
  # Managed instance group the instance is part of.
  mig: gce.ManagedInstanceGroup

  def __init__(self, instance, nodepool, mig):
    # The node is registered under the project of its GCE instance.
    super().__init__(project_id=instance.project_id)
    self.instance, self.nodepool, self.mig = instance, nodepool, mig

  @property
  def full_path(self) -> str:
    """Full path of the owning cluster followed by '/nodes/<instance name>'."""
    cluster_path = self.nodepool.cluster.full_path
    return f'{cluster_path}/nodes/{self.instance.name}'

  @property
  def short_path(self) -> str:
    """Short path of the underlying GCE instance."""
    return self.instance.short_path

Represents a GKE node.

This class is useful, for example, to determine the GKE cluster when you only have a GCE instance id (like from a metrics label).

Node(instance, nodepool, mig)
590  def __init__(self, instance, nodepool, mig):
591    super().__init__(project_id=instance.project_id)
592    self.instance = instance
593    self.nodepool = nodepool
594    self.mig = mig
595    pass
nodepool: NodePool
full_path: str
597  @property
598  def full_path(self) -> str:
599    return self.nodepool.cluster.full_path + '/nodes/' + self.instance.name

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
601  @property
602  def short_path(self) -> str:
603    #return self.nodepool.cluster.short_path + '/' + self.instance.name
604    return self.instance.short_path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

@functools.lru_cache()
def get_node_by_instance_id( context: gcpdiag.models.Context, instance_id: str) -> Node:
@functools.lru_cache()
def get_node_by_instance_id(context: models.Context, instance_id: str) -> Node:
  """Get a gke.Node instance by instance id.

  Args:
    context: context used to look up the GCE instances and GKE clusters.
    instance_id: id of the GCE instance backing the node.

  Returns:
    The Node pairing the instance with the node pool whose instance groups
    include the instance's managed instance group.

  Raises:
    KeyError: the instance is not found or isn't part of a GKE cluster.
  """
  # This will throw a KeyError if the instance is not found, which is also
  # the behavior that we want for this function.
  instance = gce.get_instances(context)[instance_id]
  clusters = get_clusters(context)
  try:
    # instance.mig throws AttributeError if it isn't part of a mig
    mig = instance.mig

    # find a NodePool that uses this MIG
    for cluster in clusters.values():
      for nodepool in cluster.nodepools:
        if any(mig == np_mig for np_mig in nodepool.instance_groups):
          return Node(instance=instance, nodepool=nodepool, mig=mig)
  except AttributeError as err:
    raise KeyError(
        f"can't determine GKE cluster for instance {instance_id}") from err

  # No node pool owns this instance. Every path above returns or raises, so
  # the function never falls through to an implicit None.
  raise KeyError(f"can't determine GKE cluster for instance {instance_id}")

Get a gke.Node instance by instance id.

Throws a KeyError in case this instance is not found or isn't part of a GKE cluster.

@caching.cached_api_call
def get_release_schedule() -> Dict:
# First month of each quarter, used to approximate quarter-only dates.
_QUARTER_START_MONTH = {'Q1': 1, 'Q2': 4, 'Q3': 7, 'Q4': 10}

# Matches 'YYYY-MM-DD', 'YYYY-MM' and 'YYYY-Qn' anywhere in a string.
_RELEASE_DATE_RE = re.compile(
    r'(?P<year>\d{4})-(?:(?P<quarter>Q[1-4])|(?P<month>[0-9]{1,2}))'
    r'(?:-(?P<day>[0-9]{1,2}))?')

# Result key and index of the table cell it is read from. Only these cells
# and cell 0 (the minor version) are read; the cells in between are skipped.
_RELEASE_SCHEDULE_COLUMNS = (
    ('rapid_avail', 1),
    ('regular_avail', 3),
    ('stable_avail', 5),
    ('extended_avail', 7),
    ('eol', 9),
)


def _parse_release_date(date_str) -> Optional[datetime.date]:
  """Parse a release schedule date, returning None for anything like 'N/A'.

  The date is built from the matched groups rather than from the raw text, so
  surrounding text, padding and non-zero-padded numbers are tolerated.

  Args:
    date_str: text of a table cell; anything that is not a string (for
      example None for a cell without text) yields None.

  Returns:
    The date for 'YYYY-MM-DD'; the 15th of the month for 'YYYY-MM'; the first
    day of the quarter for 'YYYY-Qn'; None otherwise.

  Raises:
    ValueError: the matched numbers don't form a valid calendar date.
  """
  if not isinstance(date_str, str):
    return None
  match = _RELEASE_DATE_RE.search(date_str)
  if match is None:
    return None
  year = int(match.group('year'))
  quarter, month, day = match.group('quarter', 'month', 'day')
  if quarter:
    # Quarter dates (for example, 2025-Q3) are approximations that are updated
    # when known:
    # https://cloud.google.com/kubernetes-engine/docs/release-schedule.md#fn6
    # A quarter followed by a day is not a recognized form.
    if day:
      return None
    return datetime.date(year, _QUARTER_START_MONTH[quarter], 1)
  # Incomplete dates in 'YYYY-MM' form are placed mid-month.
  return datetime.date(year, int(month), int(day) if day else 15)


def _find_date_str_in_td(e):
  """Recursively find a date string in a td; None when there is no string."""
  if isinstance(e, str):
    return e
  if isinstance(e, bs4.element.Tag):
    return _find_date_str_in_td(e.next)
  return None


@caching.cached_api_call
def get_release_schedule() -> Dict:
  """Extract the release schedule for gke clusters

  Fetches the table with class 'gke-release-schedule' from the GKE release
  schedule page and reads one entry per table row.

  Returns:
    A dictionary of release schedule: minor version -> dict with the keys
    'rapid_avail', 'regular_avail', 'stable_avail', 'extended_avail' and
    'eol', each a datetime.date or None. When fetching or parsing fails, the
    error is logged and the rows extracted so far are returned.
  """
  page_url = 'https://cloud.google.com/kubernetes-engine/docs/release-schedule'
  release_data: Dict[str, Dict[str, Optional[datetime.date]]] = {}
  try:
    table = web.fetch_and_extract_table(page_url,
                                        tag='table',
                                        class_name='gke-release-schedule')

    # Iterate over all table rows within tbody and extract the data
    for row in table.find('tbody').find_all('tr'):
      cols = row.find_all('td')
      minor_version = cols[0].next.strip()
      # The dict is fully built before it is stored, so a row that fails to
      # parse never leaves a partial entry behind.
      release_data[minor_version] = {
          key: _parse_release_date(_find_date_str_in_td(cols[index].next))
          for key, index in _RELEASE_SCHEDULE_COLUMNS
      }
  except (
      requests.exceptions.RequestException,
      AttributeError,
      TypeError,
      ValueError,
      IndexError,
  ) as e:
    logging.error('Error in extracting gke release schedule: %s', e)
  return release_data

Extract the release schedule for gke clusters

Returns:

A dictionary of release schedule.