gcpdiag.queries.gke

Queries related to Google Kubernetes Engine (GKE) clusters.
IPv4NetOrIPv6Net = typing.Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
DEFAULT_MAX_PODS_PER_NODE = 110
class NodeConfig:
41class NodeConfig:
42  """Represents a GKE node pool configuration."""
43
44  def __init__(self, resource_data):
45    self._resource_data = resource_data
46
47  def has_accelerators(self) -> bool:
48    if 'accelerators' in self._resource_data:
49      return True
50    return False
51
52  @property
53  def machine_type(self) -> str:
54    return self._resource_data['machineType']
55
56  @property
57  def image_type(self) -> str:
58    return self._resource_data['imageType']
59
60  @property
61  def oauth_scopes(self) -> list:
62    return self._resource_data['oauthScopes']

Represents a GKE node pool configuration.

NodeConfig(resource_data)
44  def __init__(self, resource_data):
45    self._resource_data = resource_data
def has_accelerators(self) -> bool:
47  def has_accelerators(self) -> bool:
48    if 'accelerators' in self._resource_data:
49      return True
50    return False
machine_type: str
52  @property
53  def machine_type(self) -> str:
54    return self._resource_data['machineType']
image_type: str
56  @property
57  def image_type(self) -> str:
58    return self._resource_data['imageType']
oauth_scopes: list
60  @property
61  def oauth_scopes(self) -> list:
62    return self._resource_data['oauthScopes']
class NodePool(gcpdiag.models.Resource):
 65class NodePool(models.Resource):
 66  """Represents a GKE node pool."""
 67
 68  version: Version
 69
 70  def __init__(self, cluster, resource_data):
 71    super().__init__(project_id=cluster.project_id)
 72    self._cluster = cluster
 73    self._resource_data = resource_data
 74    self.version = Version(self._resource_data['version'])
 75    self._migs = None
 76
 77  def _get_service_account(self) -> str:
 78    return self._resource_data.get('config', {}).get('serviceAccount', None)
 79
 80  @property
 81  def full_path(self) -> str:
 82    # https://container.googleapis.com/v1/projects/gcpdiag-gke1-aaaa/
 83    #   locations/europe-west1/clusters/gke2/nodePools/default-pool
 84    m = re.match(r'https://container.googleapis.com/v1/(.*)',
 85                 self._resource_data.get('selfLink', ''))
 86    if not m:
 87      raise RuntimeError('can\'t parse selfLink of nodepool resource')
 88    return m.group(1)
 89
 90  @property
 91  def short_path(self) -> str:
 92    path = self.full_path
 93    path = re.sub(r'^projects/', '', path)
 94    path = re.sub(r'/locations/', '/', path)
 95    path = re.sub(r'/zones/', '/', path)
 96    path = re.sub(r'/clusters/', '/', path)
 97    path = re.sub(r'/nodePools/', '/', path)
 98    return path
 99
100  @property
101  def name(self) -> str:
102    return self._resource_data['name']
103
104  @property
105  def config(self) -> NodeConfig:
106    return NodeConfig(self._resource_data['config'])
107
108  @property
109  def node_count(self) -> int:
110    return self._resource_data.get('initialNodeCount', 0)
111
112  def has_default_service_account(self) -> bool:
113    sa = self._get_service_account()
114    return sa == 'default'
115
116  def has_image_streaming_enabled(self) -> bool:
117    return get_path(self._resource_data, ('config', 'gcfsConfig', 'enabled'),
118                    default=False)
119
120  def has_md_concealment_enabled(self) -> bool:
121    # Empty ({}) workloadMetadataConfig means that 'Metadata concealment'
122    # (predecessor of Workload Identity) is enabled.
123    # https://cloud.google.com/kubernetes-engine/docs/how-to/protecting-cluster-metadata#concealment
124    return get_path(self._resource_data, ('config', 'workloadMetadataConfig'),
125                    default=None) == {}
126
127  def has_workload_identity_enabled(self) -> bool:
128    # 'Metadata concealment' (workloadMetadataConfig == {}) doesn't protect the
129    # default SA's token
130    return bool(
131        get_path(self._resource_data, ('config', 'workloadMetadataConfig'),
132                 default=None))
133
134  @property
135  def service_account(self) -> str:
136    sa = self._get_service_account()
137    if sa == 'default':
138      project_nr = crm.get_project(self.project_id).number
139      return f'{project_nr}-compute@developer.gserviceaccount.com'
140    else:
141      return sa
142
143  @property
144  def pod_ipv4_cidr_size(self) -> int:
145    return self._resource_data['podIpv4CidrSize']
146
147  @property
148  def pod_ipv4_cidr_block(self) -> Optional[IPv4NetOrIPv6Net]:
149    # Get the pod cidr range in use by the nodepool
150    pod_cidr = get_path(self._resource_data,
151                        ('networkConfig', 'podIpv4CidrBlock'),
152                        default=None)
153
154    if pod_cidr:
155      return ipaddress.ip_network(pod_cidr)
156    else:
157      return None
158
159  @property
160  def max_pod_per_node(self) -> int:
161    return int(
162        get_path(self._resource_data, ('maxPodsConstraint', 'maxPodsPerNode'),
163                 default=DEFAULT_MAX_PODS_PER_NODE))
164
165  @property
166  def cluster(self) -> 'Cluster':
167    return self._cluster
168
169  @property
170  def instance_groups(self) -> List[gce.ManagedInstanceGroup]:
171    if self._migs is None:
172      project_migs_by_selflink = {}
173      for m in gce.get_managed_instance_groups(
174          models.Context(project_id=self.project_id)).values():
175        project_migs_by_selflink[m.self_link] = m
176
177      self._migs = []
178      for url in self._resource_data.get('instanceGroupUrls', []):
179        try:
180          self._migs.append(project_migs_by_selflink[url])
181        except KeyError:
182          continue
183    return self._migs
184
185  @property
186  def node_tags(self) -> List[str]:
187    """Returns the firewall tags used for nodes in this cluster.
188
189    If the node tags can't be determined, [] is returned.
190    """
191    migs = self.instance_groups
192    if not migs:
193      return []
194    return migs[0].template.tags
195
196  def get_machine_type(self) -> str:
197    """Returns the machine type of the nodepool nodes"""
198    return self.config.machine_type

Represents a GKE node pool.

NodePool(cluster, resource_data)
70  def __init__(self, cluster, resource_data):
71    super().__init__(project_id=cluster.project_id)
72    self._cluster = cluster
73    self._resource_data = resource_data
74    self.version = Version(self._resource_data['version'])
75    self._migs = None
version: gcpdiag.utils.Version
full_path: str
80  @property
81  def full_path(self) -> str:
82    # https://container.googleapis.com/v1/projects/gcpdiag-gke1-aaaa/
83    #   locations/europe-west1/clusters/gke2/nodePools/default-pool
84    m = re.match(r'https://container.googleapis.com/v1/(.*)',
85                 self._resource_data.get('selfLink', ''))
86    if not m:
87      raise RuntimeError('can\'t parse selfLink of nodepool resource')
88    return m.group(1)

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke1-aaaa/locations/europe-west1/clusters/gke2/nodePools/default-pool'

short_path: str
90  @property
91  def short_path(self) -> str:
92    path = self.full_path
93    path = re.sub(r'^projects/', '', path)
94    path = re.sub(r'/locations/', '/', path)
95    path = re.sub(r'/zones/', '/', path)
96    path = re.sub(r'/clusters/', '/', path)
97    path = re.sub(r'/nodePools/', '/', path)
98    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gcpdiag-gke1-aaaa/europe-west1/gke2/default-pool'

name: str
100  @property
101  def name(self) -> str:
102    return self._resource_data['name']
config: NodeConfig
104  @property
105  def config(self) -> NodeConfig:
106    return NodeConfig(self._resource_data['config'])
node_count: int
108  @property
109  def node_count(self) -> int:
110    return self._resource_data.get('initialNodeCount', 0)
def has_default_service_account(self) -> bool:
112  def has_default_service_account(self) -> bool:
113    sa = self._get_service_account()
114    return sa == 'default'
def has_image_streaming_enabled(self) -> bool:
116  def has_image_streaming_enabled(self) -> bool:
117    return get_path(self._resource_data, ('config', 'gcfsConfig', 'enabled'),
118                    default=False)
def has_md_concealment_enabled(self) -> bool:
120  def has_md_concealment_enabled(self) -> bool:
121    # Empty ({}) workloadMetadataConfig means that 'Metadata concealment'
122    # (predecessor of Workload Identity) is enabled.
123    # https://cloud.google.com/kubernetes-engine/docs/how-to/protecting-cluster-metadata#concealment
124    return get_path(self._resource_data, ('config', 'workloadMetadataConfig'),
125                    default=None) == {}
def has_workload_identity_enabled(self) -> bool:
127  def has_workload_identity_enabled(self) -> bool:
128    # 'Metadata concealment' (workloadMetadataConfig == {}) doesn't protect the
129    # default SA's token
130    return bool(
131        get_path(self._resource_data, ('config', 'workloadMetadataConfig'),
132                 default=None))
service_account: str
134  @property
135  def service_account(self) -> str:
136    sa = self._get_service_account()
137    if sa == 'default':
138      project_nr = crm.get_project(self.project_id).number
139      return f'{project_nr}-compute@developer.gserviceaccount.com'
140    else:
141      return sa
pod_ipv4_cidr_size: int
143  @property
144  def pod_ipv4_cidr_size(self) -> int:
145    return self._resource_data['podIpv4CidrSize']
pod_ipv4_cidr_block: Union[ipaddress.IPv4Network, ipaddress.IPv6Network, NoneType]
147  @property
148  def pod_ipv4_cidr_block(self) -> Optional[IPv4NetOrIPv6Net]:
149    # Get the pod cidr range in use by the nodepool
150    pod_cidr = get_path(self._resource_data,
151                        ('networkConfig', 'podIpv4CidrBlock'),
152                        default=None)
153
154    if pod_cidr:
155      return ipaddress.ip_network(pod_cidr)
156    else:
157      return None
max_pod_per_node: int
159  @property
160  def max_pod_per_node(self) -> int:
161    return int(
162        get_path(self._resource_data, ('maxPodsConstraint', 'maxPodsPerNode'),
163                 default=DEFAULT_MAX_PODS_PER_NODE))
cluster: Cluster
165  @property
166  def cluster(self) -> 'Cluster':
167    return self._cluster
instance_groups: List[gcpdiag.queries.gce.ManagedInstanceGroup]
169  @property
170  def instance_groups(self) -> List[gce.ManagedInstanceGroup]:
171    if self._migs is None:
172      project_migs_by_selflink = {}
173      for m in gce.get_managed_instance_groups(
174          models.Context(project_id=self.project_id)).values():
175        project_migs_by_selflink[m.self_link] = m
176
177      self._migs = []
178      for url in self._resource_data.get('instanceGroupUrls', []):
179        try:
180          self._migs.append(project_migs_by_selflink[url])
181        except KeyError:
182          continue
183    return self._migs
node_tags: List[str]
185  @property
186  def node_tags(self) -> List[str]:
187    """Returns the firewall tags used for nodes in this cluster.
188
189    If the node tags can't be determined, [] is returned.
190    """
191    migs = self.instance_groups
192    if not migs:
193      return []
194    return migs[0].template.tags

Returns the firewall tags used for nodes in this node pool (taken from the instance template of the node pool's first managed instance group).

If the node tags can't be determined, [] is returned.

def get_machine_type(self) -> str:
196  def get_machine_type(self) -> str:
197    """Returns the machine type of the nodepool nodes"""
198    return self.config.machine_type

Returns the machine type of the nodepool nodes.

class UndefinedClusterPropertyError(builtins.Exception):
201class UndefinedClusterPropertyError(Exception):
202  """Thrown when a property of a cluster can't be determined for
203  some reason. For example, the cluster_hash can't be determined
204  because there are no nodepools defined."""
205  pass

Thrown when a property of a cluster can't be determined for some reason. For example, the cluster_hash can't be determined because there are no nodepools defined.

class Cluster(gcpdiag.models.Resource):
208class Cluster(models.Resource):
209  """Represents a GKE cluster.
210
211  https://cloud.google.com/kubernetes-engine/docs/reference/rest/v1/projects.locations.clusters#Cluster
212  """
213  _resource_data: dict
214  master_version: Version
215
216  def __init__(self, project_id, resource_data):
217    super().__init__(project_id=project_id)
218    self._resource_data = resource_data
219    self.master_version = Version(self._resource_data['currentMasterVersion'])
220    self._nodepools = None
221
222  @property
223  def full_path(self) -> str:
224    if utils.is_region(self._resource_data['location']):
225      return (f'projects/{self.project_id}/'
226              f'locations/{self.location}/clusters/{self.name}')
227    else:
228      return (f'projects/{self.project_id}/'
229              f'zones/{self.location}/clusters/{self.name}')
230
231  @property
232  def short_path(self) -> str:
233    path = self.full_path
234    path = re.sub(r'^projects/', '', path)
235    path = re.sub(r'/locations/', '/', path)
236    path = re.sub(r'/zones/', '/', path)
237    path = re.sub(r'/clusters/', '/', path)
238    return path
239
240  @property
241  def name(self) -> str:
242    return self._resource_data['name']
243
244  @property
245  def location(self) -> str:
246    return self._resource_data['location']
247
248  @property
249  def pod_ipv4_cidr(self) -> IPv4NetOrIPv6Net:
250    cidr = self._resource_data['clusterIpv4Cidr']
251    return ipaddress.ip_network(cidr)
252
253  @property
254  def current_node_count(self) -> int:
255    return self._resource_data.get('currentNodeCount', 0)
256
257  @property
258  def release_channel(self) -> Optional[str]:
259    try:
260      return self._resource_data['releaseChannel']['channel']
261    except KeyError:
262      return None
263
264  @property
265  def nap_node_image_type(self) -> Optional[str]:
266
267    return get_path(
268        self._resource_data,
269        ('autoscaling', 'autoprovisioningNodePoolDefaults', 'imageType'),
270        default=None)
271
272  @property
273  def app_layer_sec_key(self) -> str:
274    return self._resource_data['databaseEncryption'].get('keyName')
275
276  @property
277  def status(self) -> str:
278    return self._resource_data['status']
279
280  @property
281  def status_message(self) -> str:
282    return self._resource_data.get('statusMessage', None)
283
284  def has_app_layer_enc_enabled(self) -> bool:
285    # state := 'DECRYPTED' | 'ENCRYPTED', keyName := 'full_path_to_key_resource'
286    return get_path(self._resource_data, ('databaseEncryption', 'state'),
287                    default=None) == 'ENCRYPTED'
288
289  def has_logging_enabled(self) -> bool:
290    return self._resource_data['loggingService'] != 'none'
291
292  def enabled_logging_components(self) -> List[str]:
293    return self._resource_data['loggingConfig']['componentConfig'][
294        'enableComponents']
295
296  def has_monitoring_enabled(self) -> bool:
297    return self._resource_data['monitoringService'] != 'none'
298
299  def enabled_monitoring_components(self) -> List[str]:
300    return self._resource_data['monitoringConfig']['componentConfig'][
301        'enableComponents']
302
303  def has_authenticator_group_enabled(self) -> bool:
304    return len(self._resource_data.get('authenticatorGroupsConfig', {})) > 0
305
306  def has_workload_identity_enabled(self) -> bool:
307    return len(self._resource_data.get('workloadIdentityConfig', {})) > 0
308
309  def has_http_load_balancing_enabled(self) -> bool:
310    # HTTP load balancing needs to be enabled to use GKE ingress
311    return not (get_path(self._resource_data,
312                         ('addonsConfig', 'httpLoadBalancing', 'disabled'),
313                         default=None) is True)
314
315  def has_network_policy_enabled(self) -> bool:
316    # Network policy enforcement
317    return not (get_path(self._resource_data,
318                         ('addonsConfig', 'networkPolicyConfig', 'disabled'),
319                         default=False) is True)
320
321  def has_dpv2_enabled(self) -> bool:
322    # Checks whether dataplane V2 is enabled in clusters
323    return (get_path(self._resource_data, ('networkConfig', 'datapathProvider'),
324                     default=None) == 'ADVANCED_DATAPATH')
325
326  def has_intra_node_visibility_enabled(self) -> bool:
327    if ('networkConfig' in self._resource_data and
328        'enableIntraNodeVisibility' in self._resource_data['networkConfig']):
329      return self._resource_data['networkConfig']['enableIntraNodeVisibility']
330    return False
331
332  def has_maintenance_window(self) -> bool:
333    # 'e3b0c442' is the first 8 hex digits of the SHA-256 hash of the empty
334    # string (''). If maintenance windows are defined, the value of
335    # 'resourceVersion' differs from that empty-string hash ('e3b0c442').
336    return self._resource_data['maintenancePolicy'][
337        'resourceVersion'] != 'e3b0c442'
338
339  def has_image_streaming_enabled(self) -> bool:
340    """
341    Check if cluster has Image Streaming (aka  Google Container File System)
342    enabled
343    """
344    global_gcsfs = get_path(
345        self._resource_data,
346        ('nodePoolDefaults', 'nodeConfigDefaults', 'gcfsConfig', 'enabled'),
347        default=False)
348    # Check nodePoolDefaults settings
349    if global_gcsfs:
350      return True
351    for np in self.nodepools:
352      # Check if any nodepool has image streaming enabled
353      if np.has_image_streaming_enabled():
354        return True
355    return False
356
357  @property
358  def nodepools(self) -> Iterable[NodePool]:
359    if self._nodepools is None:
360      self._nodepools = []
361      for n in self._resource_data.get('nodePools', []):
362        self._nodepools.append(NodePool(self, n))
363    return self._nodepools
364
365  @property
366  def network(self) -> network.Network:
367    # projects/gcpdiag-gke1-aaaa/global/networks/default
368    network_string = self._resource_data['networkConfig']['network']
369    m = re.match(r'projects/([^/]+)/global/networks/([^/]+)$', network_string)
370    if not m:
371      raise RuntimeError("can't parse network string: %s" % network_string)
372    return network.get_network(m.group(1), m.group(2))
373
374  @property
375  def subnetwork(self) -> Optional[models.Resource]:
376    # 'projects/gcpdiag-gke1-aaaa/regions/europe-west4/subnetworks/default'
377    if 'subnetwork' not in self._resource_data['networkConfig']:
378      return None
379
380    subnetwork_string = self._resource_data['networkConfig']['subnetwork']
381    m = re.match(r'projects/([^/]+)/regions/([^/]+)/subnetworks/([^/]+)$',
382                 subnetwork_string)
383    if not m:
384      raise RuntimeError("can't parse network string: %s" % subnetwork_string)
385    return network.get_subnetwork(m.group(1), m.group(2), m.group(3))
386
387  @property
388  def is_private(self) -> bool:
389    if not 'privateClusterConfig' in self._resource_data:
390      return False
391
392    return self._resource_data['privateClusterConfig'].get(
393        'enablePrivateNodes', False)
394
395  @property
396  def is_vpc_native(self) -> bool:
397    return (get_path(self._resource_data,
398                     ('ipAllocationPolicy', 'useIpAliases'),
399                     default=False))
400
401  @property
402  def is_regional(self) -> bool:
403    return len(self._resource_data['locations']) > 1
404
405  @property
406  def cluster_ca_certificate(self) -> str:
407    return self._resource_data['masterAuth']['clusterCaCertificate']
408
409  @property
410  def endpoint(self) -> Optional[str]:
411    if 'endpoint' not in self._resource_data:
412      return None
413    return self._resource_data['endpoint']
414
415  @property
416  def is_autopilot(self) -> bool:
417    if not 'autopilot' in self._resource_data:
418      return False
419    return self._resource_data['autopilot'].get('enabled', False)
420
421  @property
422  def masters_cidr_list(self) -> Iterable[IPv4NetOrIPv6Net]:
423    if get_path(self._resource_data,
424                ('privateClusterConfig', 'masterIpv4CidrBlock'),
425                default=None):
426      return [
427          ipaddress.ip_network(self._resource_data['privateClusterConfig']
428                               ['masterIpv4CidrBlock'])
429      ]
430    else:
431      #only older clusters still have ssh firewall rules
432      if self.current_node_count and not self.cluster_hash:
433        logging.warning("couldn't retrieve cluster hash for cluster %s.",
434                        self.name)
435        return []
436      fw_rule_name = f'gke-{self.name}-{self.cluster_hash}-ssh'
437      rule = self.network.firewall.get_vpc_ingress_rules(name=fw_rule_name)
438      if rule and rule[0].is_enabled():
439        return rule[0].source_ranges
440      return []
441
442  @property
443  def cluster_hash(self) -> Optional[str]:
444    """Returns the "cluster hash" as used in automatic firewall rules for GKE clusters.
445    Cluster hash is the first 8 characters of cluster id.
446    See also: https://cloud.google.com/kubernetes-engine/docs/concepts/firewall-rules
447    """
448    if 'id' in self._resource_data:
449      return self._resource_data['id'][:8]
450    raise UndefinedClusterPropertyError('no id')
Cluster(project_id, resource_data)
216  def __init__(self, project_id, resource_data):
217    super().__init__(project_id=project_id)
218    self._resource_data = resource_data
219    self.master_version = Version(self._resource_data['currentMasterVersion'])
220    self._nodepools = None
master_version: gcpdiag.utils.Version
full_path: str
222  @property
223  def full_path(self) -> str:
224    if utils.is_region(self._resource_data['location']):
225      return (f'projects/{self.project_id}/'
226              f'locations/{self.location}/clusters/{self.name}')
227    else:
228      return (f'projects/{self.project_id}/'
229              f'zones/{self.location}/clusters/{self.name}')

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
231  @property
232  def short_path(self) -> str:
233    path = self.full_path
234    path = re.sub(r'^projects/', '', path)
235    path = re.sub(r'/locations/', '/', path)
236    path = re.sub(r'/zones/', '/', path)
237    path = re.sub(r'/clusters/', '/', path)
238    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gcpdiag-gke-1-9b90/europe-west4-a/gke1'

name: str
240  @property
241  def name(self) -> str:
242    return self._resource_data['name']
location: str
244  @property
245  def location(self) -> str:
246    return self._resource_data['location']
pod_ipv4_cidr: Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
248  @property
249  def pod_ipv4_cidr(self) -> IPv4NetOrIPv6Net:
250    cidr = self._resource_data['clusterIpv4Cidr']
251    return ipaddress.ip_network(cidr)
current_node_count: int
253  @property
254  def current_node_count(self) -> int:
255    return self._resource_data.get('currentNodeCount', 0)
release_channel: Optional[str]
257  @property
258  def release_channel(self) -> Optional[str]:
259    try:
260      return self._resource_data['releaseChannel']['channel']
261    except KeyError:
262      return None
nap_node_image_type: Optional[str]
264  @property
265  def nap_node_image_type(self) -> Optional[str]:
266
267    return get_path(
268        self._resource_data,
269        ('autoscaling', 'autoprovisioningNodePoolDefaults', 'imageType'),
270        default=None)
app_layer_sec_key: str
272  @property
273  def app_layer_sec_key(self) -> str:
274    return self._resource_data['databaseEncryption'].get('keyName')
status: str
276  @property
277  def status(self) -> str:
278    return self._resource_data['status']
status_message: str
280  @property
281  def status_message(self) -> str:
282    return self._resource_data.get('statusMessage', None)
def has_app_layer_enc_enabled(self) -> bool:
284  def has_app_layer_enc_enabled(self) -> bool:
285    # state := 'DECRYPTED' | 'ENCRYPTED', keyName := 'full_path_to_key_resource'
286    return get_path(self._resource_data, ('databaseEncryption', 'state'),
287                    default=None) == 'ENCRYPTED'
def has_logging_enabled(self) -> bool:
289  def has_logging_enabled(self) -> bool:
290    return self._resource_data['loggingService'] != 'none'
def enabled_logging_components(self) -> List[str]:
292  def enabled_logging_components(self) -> List[str]:
293    return self._resource_data['loggingConfig']['componentConfig'][
294        'enableComponents']
def has_monitoring_enabled(self) -> bool:
296  def has_monitoring_enabled(self) -> bool:
297    return self._resource_data['monitoringService'] != 'none'
def enabled_monitoring_components(self) -> List[str]:
299  def enabled_monitoring_components(self) -> List[str]:
300    return self._resource_data['monitoringConfig']['componentConfig'][
301        'enableComponents']
def has_authenticator_group_enabled(self) -> bool:
303  def has_authenticator_group_enabled(self) -> bool:
304    return len(self._resource_data.get('authenticatorGroupsConfig', {})) > 0
def has_workload_identity_enabled(self) -> bool:
306  def has_workload_identity_enabled(self) -> bool:
307    return len(self._resource_data.get('workloadIdentityConfig', {})) > 0
def has_http_load_balancing_enabled(self) -> bool:
309  def has_http_load_balancing_enabled(self) -> bool:
310    # HTTP load balancing needs to be enabled to use GKE ingress
311    return not (get_path(self._resource_data,
312                         ('addonsConfig', 'httpLoadBalancing', 'disabled'),
313                         default=None) is True)
def has_network_policy_enabled(self) -> bool:
315  def has_network_policy_enabled(self) -> bool:
316    # Network policy enforcement
317    return not (get_path(self._resource_data,
318                         ('addonsConfig', 'networkPolicyConfig', 'disabled'),
319                         default=False) is True)
def has_dpv2_enabled(self) -> bool:
321  def has_dpv2_enabled(self) -> bool:
322    # Checks whether dataplane V2 is enabled in clusters
323    return (get_path(self._resource_data, ('networkConfig', 'datapathProvider'),
324                     default=None) == 'ADVANCED_DATAPATH')
def has_intra_node_visibility_enabled(self) -> bool:
326  def has_intra_node_visibility_enabled(self) -> bool:
327    if ('networkConfig' in self._resource_data and
328        'enableIntraNodeVisibility' in self._resource_data['networkConfig']):
329      return self._resource_data['networkConfig']['enableIntraNodeVisibility']
330    return False
def has_maintenance_window(self) -> bool:
332  def has_maintenance_window(self) -> bool:
333    # 'e3b0c442' is the first 8 hex digits of the SHA-256 hash of the empty
334    # string (''). If maintenance windows are defined, the value of
335    # 'resourceVersion' differs from that empty-string hash ('e3b0c442').
336    return self._resource_data['maintenancePolicy'][
337        'resourceVersion'] != 'e3b0c442'
def has_image_streaming_enabled(self) -> bool:
339  def has_image_streaming_enabled(self) -> bool:
340    """
341    Check if cluster has Image Streaming (aka  Google Container File System)
342    enabled
343    """
344    global_gcsfs = get_path(
345        self._resource_data,
346        ('nodePoolDefaults', 'nodeConfigDefaults', 'gcfsConfig', 'enabled'),
347        default=False)
348    # Check nodePoolDefaults settings
349    if global_gcsfs:
350      return True
351    for np in self.nodepools:
352      # Check if any nodepool has image streaming enabled
353      if np.has_image_streaming_enabled():
354        return True
355    return False

Checks whether the cluster has Image Streaming (a.k.a. Google Container File System) enabled, either in the cluster-wide node pool defaults or on at least one of its node pools.

nodepools: Iterable[NodePool]
357  @property
358  def nodepools(self) -> Iterable[NodePool]:
359    if self._nodepools is None:
360      self._nodepools = []
361      for n in self._resource_data.get('nodePools', []):
362        self._nodepools.append(NodePool(self, n))
363    return self._nodepools
network: gcpdiag.queries.network.Network
365  @property
366  def network(self) -> network.Network:
367    # projects/gcpdiag-gke1-aaaa/global/networks/default
368    network_string = self._resource_data['networkConfig']['network']
369    m = re.match(r'projects/([^/]+)/global/networks/([^/]+)$', network_string)
370    if not m:
371      raise RuntimeError("can't parse network string: %s" % network_string)
372    return network.get_network(m.group(1), m.group(2))
subnetwork: Optional[gcpdiag.models.Resource]
374  @property
375  def subnetwork(self) -> Optional[models.Resource]:
376    # 'projects/gcpdiag-gke1-aaaa/regions/europe-west4/subnetworks/default'
377    if 'subnetwork' not in self._resource_data['networkConfig']:
378      return None
379
380    subnetwork_string = self._resource_data['networkConfig']['subnetwork']
381    m = re.match(r'projects/([^/]+)/regions/([^/]+)/subnetworks/([^/]+)$',
382                 subnetwork_string)
383    if not m:
384      raise RuntimeError("can't parse network string: %s" % subnetwork_string)
385    return network.get_subnetwork(m.group(1), m.group(2), m.group(3))
is_private: bool
387  @property
388  def is_private(self) -> bool:
389    if not 'privateClusterConfig' in self._resource_data:
390      return False
391
392    return self._resource_data['privateClusterConfig'].get(
393        'enablePrivateNodes', False)
is_vpc_native: bool
395  @property
396  def is_vpc_native(self) -> bool:
397    return (get_path(self._resource_data,
398                     ('ipAllocationPolicy', 'useIpAliases'),
399                     default=False))
is_regional: bool
401  @property
402  def is_regional(self) -> bool:
403    return len(self._resource_data['locations']) > 1
cluster_ca_certificate: str
405  @property
406  def cluster_ca_certificate(self) -> str:
407    return self._resource_data['masterAuth']['clusterCaCertificate']
endpoint: Optional[str]
409  @property
410  def endpoint(self) -> Optional[str]:
411    if 'endpoint' not in self._resource_data:
412      return None
413    return self._resource_data['endpoint']
is_autopilot: bool
415  @property
416  def is_autopilot(self) -> bool:
417    if not 'autopilot' in self._resource_data:
418      return False
419    return self._resource_data['autopilot'].get('enabled', False)
masters_cidr_list: Iterable[Union[ipaddress.IPv4Network, ipaddress.IPv6Network]]
421  @property
422  def masters_cidr_list(self) -> Iterable[IPv4NetOrIPv6Net]:
423    if get_path(self._resource_data,
424                ('privateClusterConfig', 'masterIpv4CidrBlock'),
425                default=None):
426      return [
427          ipaddress.ip_network(self._resource_data['privateClusterConfig']
428                               ['masterIpv4CidrBlock'])
429      ]
430    else:
431      #only older clusters still have ssh firewall rules
432      if self.current_node_count and not self.cluster_hash:
433        logging.warning("couldn't retrieve cluster hash for cluster %s.",
434                        self.name)
435        return []
436      fw_rule_name = f'gke-{self.name}-{self.cluster_hash}-ssh'
437      rule = self.network.firewall.get_vpc_ingress_rules(name=fw_rule_name)
438      if rule and rule[0].is_enabled():
439        return rule[0].source_ranges
440      return []
cluster_hash: Optional[str]
442  @property
443  def cluster_hash(self) -> Optional[str]:
444    """Returns the "cluster hash" as used in automatic firewall rules for GKE clusters.
445    Cluster hash is the first 8 characters of cluster id.
446    See also: https://cloud.google.com/kubernetes-engine/docs/concepts/firewall-rules
447    """
448    if 'id' in self._resource_data:
449      return self._resource_data['id'][:8]
450    raise UndefinedClusterPropertyError('no id')

Returns the "cluster hash" as used in automatic firewall rules for GKE clusters. The cluster hash is the first 8 characters of the cluster id. Raises UndefinedClusterPropertyError if the cluster resource has no id. See also: https://cloud.google.com/kubernetes-engine/docs/concepts/firewall-rules

@caching.cached_api_call
def get_clusters( context: gcpdiag.models.Context) -> Mapping[str, Cluster]:
@caching.cached_api_call
def get_clusters(context: models.Context) -> Mapping[str, Cluster]:
  """Get the GKE clusters matching the given context.

  Args:
    context: provides the project to query and decides, based on location,
      resource labels and name, which clusters are included.

  Returns:
    The matching clusters indexed by cluster full path. The mapping is empty
    if the 'container' API is not enabled or the response lists no clusters.

  Raises:
    utils.GcpApiError: the API request failed.
    RuntimeError: a cluster in the response lacks 'name' or 'location'.
  """
  found: Dict[str, Cluster] = {}
  project_id = context.project_id
  if not apis.is_enabled(project_id, 'container'):
    return found
  container_api = apis.get_api('container', 'v1', project_id)
  logging.info('fetching list of GKE clusters in project %s', project_id)
  request = container_api.projects().locations().clusters().list(
      parent=f'projects/{project_id}/locations/-')
  try:
    response = request.execute(num_retries=config.API_RETRIES)
    for cluster_data in response.get('clusters', []):
      # Verify that we got the minimal data that we expect.
      if not ('name' in cluster_data and 'location' in cluster_data):
        raise RuntimeError(
            'missing data in projects.locations.clusters.list response')
      is_match = context.match_project_resource(
          location=cluster_data.get('location', ''),
          labels=cluster_data.get('resourceLabels', {}),
          resource=cluster_data.get('name', ''))
      if is_match:
        cluster = Cluster(project_id=context.project_id,
                          resource_data=cluster_data)
        found[cluster.full_path] = cluster
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
  return found

Get a list of Cluster matching the given context, indexed by cluster full path.

@caching.cached_api_call
def get_cluster( project_id, cluster_id, location) -> Optional[Cluster]:
@caching.cached_api_call
def get_cluster(project_id, cluster_id, location) -> Union[Cluster, None]:
  """Get a single GKE cluster of a project.

  Args:
    project_id: project that the cluster belongs to.
    cluster_id: cluster name, as used in the cluster resource path.
    location: cluster location, as used in the cluster resource path.

  Returns:
    The Cluster, or None if the 'container' API is not enabled in the project.

  Raises:
    utils.GcpApiError: the API request failed.
    RuntimeError: the response doesn't mention the requested cluster.
  """
  if not apis.is_enabled(project_id, 'container'):
    return None
  container_api = apis.get_api('container', 'v1', project_id)
  logging.info('fetching the GKE cluster %s in project %s', cluster_id,
               project_id)
  query = container_api.projects().locations().clusters().get(
      name=f'projects/{project_id}/locations/{location}/clusters/{cluster_id}')
  try:
    resp = query.execute(num_retries=config.API_RETRIES)
    # Sanity check: the requested cluster id must appear somewhere in the
    # response, otherwise the response can't describe that cluster.
    if cluster_id not in str(resp):
      raise RuntimeError(
          'missing data in projects.locations.clusters.get response')
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
  return Cluster(project_id=project_id, resource_data=resp)

Get a Cluster from project_id of a context.

def get_valid_master_versions(project_id: str, location: str) -> List[str]:
def get_valid_master_versions(project_id: str, location: str) -> List[str]:
  """Get a list of valid GKE master versions.

  Args:
    project_id: project used to look up the server config.
    location: location used to look up the server config.

  Returns:
    The valid versions of every release channel followed by the entries of
    'validMasterVersions'. Duplicates are not removed.
  """
  server_config = _get_server_config(project_id, location)

  # Channel versions may extend the list of all available versions. This is
  # especially true for the Rapid channel: many new versions are only
  # available there and not as a static version, to make sure that nobody is
  # stuck on such a version for an extended period of time.
  versions: List[str] = [
      version for channel in server_config['channels']
      for version in channel['validVersions']
  ]
  versions.extend(server_config['validMasterVersions'])
  return versions

Get a list of valid GKE master versions.

def get_valid_node_versions(project_id: str, location: str) -> List[str]:
def get_valid_node_versions(project_id: str, location: str) -> List[str]:
  """Get a list of valid GKE node versions.

  Args:
    project_id: project used to look up the server config.
    location: location used to look up the server config.

  Returns:
    The valid versions of every release channel followed by the entries of
    'validNodeVersions'. Duplicates are not removed.
  """
  server_config = _get_server_config(project_id, location)
  versions: List[str] = []

  # Channel versions may extend the list of statically available versions, so
  # they are included as well.
  for channel in server_config['channels']:
    versions.extend(channel['validVersions'])

  versions.extend(server_config['validNodeVersions'])
  return versions

Get a list of valid GKE node versions.

class Node(gcpdiag.models.Resource):
class Node(models.Resource):
  """Represents a GKE node.

  This class is useful, for example, to determine the GKE cluster when you
  only have a GCE instance id (like from a metrics label).
  """

  # GCE instance backing this node.
  instance: gce.Instance
  # Node pool that the node is part of.
  nodepool: NodePool
  # Managed instance group that the instance is part of.
  mig: gce.ManagedInstanceGroup

  def __init__(self, instance, nodepool, mig):
    """Create a node; the project id is taken from the instance."""
    super().__init__(project_id=instance.project_id)
    self.instance = instance
    self.nodepool = nodepool
    self.mig = mig

  @property
  def full_path(self) -> str:
    """Full path of the cluster followed by '/nodes/<instance name>'."""
    cluster_path = self.nodepool.cluster.full_path
    return f'{cluster_path}/nodes/{self.instance.name}'

  @property
  def short_path(self) -> str:
    """Short path of the underlying GCE instance."""
    return self.instance.short_path

Represents a GKE node.

This class is useful, for example, to determine the GKE cluster when you only have a GCE instance id (like from a metrics label).

Node(instance, nodepool, mig)
558  def __init__(self, instance, nodepool, mig):
559    super().__init__(project_id=instance.project_id)
560    self.instance = instance
561    self.nodepool = nodepool
562    self.mig = mig
563    pass
nodepool: NodePool
full_path: str
565  @property
566  def full_path(self) -> str:
567    return self.nodepool.cluster.full_path + '/nodes/' + self.instance.name

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
569  @property
570  def short_path(self) -> str:
571    #return self.nodepool.cluster.short_path + '/' + self.instance.name
572    return self.instance.short_path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

@functools.lru_cache()
def get_node_by_instance_id( context: gcpdiag.models.Context, instance_id: str) -> Node:
@functools.lru_cache()
def get_node_by_instance_id(context: models.Context, instance_id: str) -> Node:
  """Get a gke.Node instance by instance id.

  Args:
    context: used to list the GCE instances and the GKE clusters.
    instance_id: id of the GCE instance to look up.

  Returns:
    The Node for the instance, referencing the node pool whose instance
    groups contain the managed instance group of the instance.

  Raises:
    KeyError: the instance is not found or isn't part of a GKE cluster.
  """
  # This will throw a KeyError if the instance is not found, which is also
  # the behavior that we want for this function.
  instance = gce.get_instances(context)[instance_id]
  clusters = get_clusters(context)
  try:
    # instance.mig throws AttributeError if it isn't part of a mig
    mig = instance.mig

    # find a NodePool that uses this MIG
    for cluster in clusters.values():
      for nodepool in cluster.nodepools:
        if any(mig == np_mig for np_mig in nodepool.instance_groups):
          return Node(instance=instance, nodepool=nodepool, mig=mig)
  except AttributeError as err:
    # Callers only expect KeyError, so AttributeError is translated.
    raise KeyError(
        f"can't determine GKE cluster for instance {instance_id}") from err

  # if we didn't find a nodepool that owns this instance, raise a KeyError
  raise KeyError(f"can't determine GKE cluster for instance {instance_id}")

Get a gke.Node instance by instance id.

Throws a KeyError in case this instance is not found or isn't part of a GKE cluster.

@caching.cached_api_call
def get_release_schedule() -> Dict:
@caching.cached_api_call
def get_release_schedule() -> Dict:
  """Extract the release schedule for GKE clusters.

  The schedule is read from the 'gke-release-schedule' table of the public
  GKE release schedule page. If fetching or parsing fails, the error is
  logged and the data extracted up to that point is returned.

  Returns:
    A dictionary keyed by minor version. Each value is a dictionary with the
    keys 'rapid_avail', 'regular_avail', 'stable_avail', 'extended_avail' and
    'eol', each holding a datetime.date or None.
  """
  page_url = 'https://cloud.google.com/kubernetes-engine/docs/release-schedule'
  release_data = {}
  # A quarter is approximated by the first month of that quarter.
  quarter_first_month = {'Q1': 1, 'Q2': 4, 'Q3': 7, 'Q4': 10}
  date_pattern = re.compile(
      r'(?P<year>\d{4})-(?:(?P<quarter>Q[1-4])|(?P<month>[0-9]{1,2}))'
      r'(?:-(?P<day>[0-9]{1,2}))?')

  def parse_date(date_str) -> Optional[datetime.date]:
    """Parse the date found in a table cell, or return None for 'N/A'."""
    # A cell without any text yields None instead of a string.
    if not isinstance(date_str, str):
      return None
    match = date_pattern.search(date_str)
    if not match:
      # anything less like N/A return None
      return None
    # The date is built from the matched groups rather than the raw text, so
    # that surrounding text and non-zero-padded numbers don't break parsing.
    year = int(match.group('year'))
    day = match.group('day')
    quarter = match.group('quarter')
    if quarter:
      # Handle quarter year (for example, 2025-Q3) approximations that are
      # updated when known.
      # https://cloud.google.com/kubernetes-engine/docs/release-schedule.md#fn6
      if day:
        return None
      return datetime.date(year, quarter_first_month[quarter], 1)
    # Incomplete dates in 'YYYY-MM' form are approximated by mid-month.
    return datetime.date(year, int(match.group('month')),
                         int(day) if day else 15)

  def find_date_str_in_td(e):
    """recursively find a date string in a td"""
    if isinstance(e, str):
      return e
    if isinstance(e, bs4.element.Tag):
      return find_date_str_in_td(e.next)
    return None

  try:
    table = web.fetch_and_extract_table(page_url,
                                        tag='table',
                                        class_name='gke-release-schedule')

    # Find all table rows within tbody
    rows = table.find('tbody').find_all('tr')

    # Iterate over each row and extract the data
    for row in rows:
      # Extract all the columns (td elements)
      cols = row.find_all('td')

      # Only every second column after the version is read.
      minor_version = cols[0].next.strip()
      rapid_avail = parse_date(find_date_str_in_td(cols[1].next))
      regular_avail = parse_date(find_date_str_in_td(cols[3].next))
      stable_avail = parse_date(find_date_str_in_td(cols[5].next))
      extended_avail = parse_date(find_date_str_in_td(cols[7].next))
      end_of_standard_support = parse_date(find_date_str_in_td(cols[9].next))

      # Add the extracted data into the dictionary in the desired format
      release_data[minor_version] = {
          'rapid_avail': rapid_avail,
          'regular_avail': regular_avail,
          'stable_avail': stable_avail,
          'extended_avail': extended_avail,
          'eol': end_of_standard_support,
      }
  except (
      requests.exceptions.RequestException,
      AttributeError,
      TypeError,
      ValueError,
      IndexError,
  ) as e:
    # Best effort: keep whatever was extracted before the failure.
    logging.error('Error in extracting gke release schedule: %s', e)
  return release_data

Extract the release schedule for GKE clusters.

Returns:

A dictionary with the release schedule.