gcpdiag.queries.gke

Queries related to GCP Kubernetes Engine clusters.
IPv4NetOrIPv6Net = typing.Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
DEFAULT_MAX_PODS_PER_NODE = 110
class NodeConfig:
class NodeConfig:
  """Read-only view over the 'config' section of a GKE node pool resource."""

  def __init__(self, resource_data):
    # The raw dictionary is stored as-is; accessors read from it on demand.
    self._resource_data = resource_data

  def has_accelerators(self) -> bool:
    """Return True when the configuration contains an 'accelerators' key."""
    return 'accelerators' in self._resource_data

  @property
  def machine_type(self) -> str:
    """Value of the 'machineType' field (KeyError if it is missing)."""
    data = self._resource_data
    return data['machineType']

  @property
  def image_type(self) -> str:
    """Value of the 'imageType' field (KeyError if it is missing)."""
    data = self._resource_data
    return data['imageType']

  @property
  def oauth_scopes(self) -> list:
    """Value of the 'oauthScopes' field (KeyError if it is missing)."""
    data = self._resource_data
    return data['oauthScopes']

Represents a GKE node pool configuration.

NodeConfig(resource_data)
41  def __init__(self, resource_data):
42    self._resource_data = resource_data
def has_accelerators(self) -> bool:
44  def has_accelerators(self) -> bool:
45    if 'accelerators' in self._resource_data:
46      return True
47    return False
machine_type: str
49  @property
50  def machine_type(self) -> str:
51    return self._resource_data['machineType']
image_type: str
53  @property
54  def image_type(self) -> str:
55    return self._resource_data['imageType']
oauth_scopes: list
57  @property
58  def oauth_scopes(self) -> list:
59    return self._resource_data['oauthScopes']
class NodePool(gcpdiag.models.Resource):
 62class NodePool(models.Resource):
 63  """Represents a GKE node pool."""
 64
 65  version: Version
 66
 67  def __init__(self, cluster, resource_data):
 68    super().__init__(project_id=cluster.project_id)
 69    self._cluster = cluster
 70    self._resource_data = resource_data
 71    self.version = Version(self._resource_data['version'])
 72    self._migs = None
 73
 74  def _get_service_account(self) -> str:
 75    return self._resource_data.get('config', {}).get('serviceAccount', None)
 76
 77  @property
 78  def full_path(self) -> str:
 79    # https://container.googleapis.com/v1/projects/gcpdiag-gke1-aaaa/
 80    #   locations/europe-west1/clusters/gke2/nodePools/default-pool
 81    m = re.match(r'https://container.googleapis.com/v1/(.*)',
 82                 self._resource_data.get('selfLink', ''))
 83    if not m:
 84      raise RuntimeError('can\'t parse selfLink of nodepool resource')
 85    return m.group(1)
 86
 87  @property
 88  def short_path(self) -> str:
 89    path = self.full_path
 90    path = re.sub(r'^projects/', '', path)
 91    path = re.sub(r'/locations/', '/', path)
 92    path = re.sub(r'/zones/', '/', path)
 93    path = re.sub(r'/clusters/', '/', path)
 94    path = re.sub(r'/nodePools/', '/', path)
 95    return path
 96
 97  @property
 98  def name(self) -> str:
 99    return self._resource_data['name']
100
101  @property
102  def config(self) -> NodeConfig:
103    return NodeConfig(self._resource_data['config'])
104
105  @property
106  def node_count(self) -> int:
107    return self._resource_data.get('initialNodeCount', 0)
108
109  def has_default_service_account(self) -> bool:
110    sa = self._get_service_account()
111    return sa == 'default'
112
113  def has_image_streaming_enabled(self) -> bool:
114    return get_path(self._resource_data, ('config', 'gcfsConfig', 'enabled'),
115                    default=False)
116
117  def has_md_concealment_enabled(self) -> bool:
118    # Empty ({}) workloadMetadataConfig means that 'Metadata concealment'
119    # (predecessor of Workload Identity) is enabled.
120    # https://cloud.google.com/kubernetes-engine/docs/how-to/protecting-cluster-metadata#concealment
121    return get_path(self._resource_data, ('config', 'workloadMetadataConfig'),
122                    default=None) == {}
123
124  def has_workload_identity_enabled(self) -> bool:
125    # 'Metadata concealment' (workloadMetadataConfig == {}) doesn't protect the
126    # default SA's token
127    return bool(
128        get_path(self._resource_data, ('config', 'workloadMetadataConfig'),
129                 default=None))
130
131  @property
132  def service_account(self) -> str:
133    sa = self._get_service_account()
134    if sa == 'default':
135      project_nr = crm.get_project(self.project_id).number
136      return f'{project_nr}-compute@developer.gserviceaccount.com'
137    else:
138      return sa
139
140  @property
141  def pod_ipv4_cidr_size(self) -> int:
142    return self._resource_data['podIpv4CidrSize']
143
144  @property
145  def pod_ipv4_cidr_block(self) -> Optional[IPv4NetOrIPv6Net]:
146    # Get the pod cidr range in use by the nodepool
147    pod_cidr = get_path(self._resource_data,
148                        ('networkConfig', 'podIpv4CidrBlock'),
149                        default=None)
150
151    if pod_cidr:
152      return ipaddress.ip_network(pod_cidr)
153    else:
154      return None
155
156  @property
157  def max_pod_per_node(self) -> int:
158    return int(
159        get_path(self._resource_data, ('maxPodsConstraint', 'maxPodsPerNode'),
160                 default=DEFAULT_MAX_PODS_PER_NODE))
161
162  @property
163  def cluster(self) -> 'Cluster':
164    return self._cluster
165
166  @property
167  def instance_groups(self) -> List[gce.ManagedInstanceGroup]:
168    if self._migs is None:
169      project_migs_by_selflink = {}
170      for m in gce.get_managed_instance_groups(
171          models.Context(project_id=self.project_id)).values():
172        project_migs_by_selflink[m.self_link] = m
173
174      self._migs = []
175      for url in self._resource_data.get('instanceGroupUrls', []):
176        try:
177          self._migs.append(project_migs_by_selflink[url])
178        except KeyError:
179          continue
180    return self._migs
181
182  @property
183  def node_tags(self) -> List[str]:
184    """Returns the firewall tags used for nodes in this cluster.
185
186    If the node tags can't be determined, [] is returned.
187    """
188    migs = self.instance_groups
189    if not migs:
190      return []
191    return migs[0].template.tags
192
193  def get_machine_type(self) -> str:
194    """Returns the machine type of the nodepool nodes"""
195    return self.config.machine_type

Represents a GKE node pool.

NodePool(cluster, resource_data)
67  def __init__(self, cluster, resource_data):
68    super().__init__(project_id=cluster.project_id)
69    self._cluster = cluster
70    self._resource_data = resource_data
71    self.version = Version(self._resource_data['version'])
72    self._migs = None
version: gcpdiag.utils.Version
full_path: str
77  @property
78  def full_path(self) -> str:
79    # https://container.googleapis.com/v1/projects/gcpdiag-gke1-aaaa/
80    #   locations/europe-west1/clusters/gke2/nodePools/default-pool
81    m = re.match(r'https://container.googleapis.com/v1/(.*)',
82                 self._resource_data.get('selfLink', ''))
83    if not m:
84      raise RuntimeError('can\'t parse selfLink of nodepool resource')
85    return m.group(1)

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
87  @property
88  def short_path(self) -> str:
89    path = self.full_path
90    path = re.sub(r'^projects/', '', path)
91    path = re.sub(r'/locations/', '/', path)
92    path = re.sub(r'/zones/', '/', path)
93    path = re.sub(r'/clusters/', '/', path)
94    path = re.sub(r'/nodePools/', '/', path)
95    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
97  @property
98  def name(self) -> str:
99    return self._resource_data['name']
config: NodeConfig
101  @property
102  def config(self) -> NodeConfig:
103    return NodeConfig(self._resource_data['config'])
node_count: int
105  @property
106  def node_count(self) -> int:
107    return self._resource_data.get('initialNodeCount', 0)
def has_default_service_account(self) -> bool:
109  def has_default_service_account(self) -> bool:
110    sa = self._get_service_account()
111    return sa == 'default'
def has_image_streaming_enabled(self) -> bool:
113  def has_image_streaming_enabled(self) -> bool:
114    return get_path(self._resource_data, ('config', 'gcfsConfig', 'enabled'),
115                    default=False)
def has_md_concealment_enabled(self) -> bool:
117  def has_md_concealment_enabled(self) -> bool:
118    # Empty ({}) workloadMetadataConfig means that 'Metadata concealment'
119    # (predecessor of Workload Identity) is enabled.
120    # https://cloud.google.com/kubernetes-engine/docs/how-to/protecting-cluster-metadata#concealment
121    return get_path(self._resource_data, ('config', 'workloadMetadataConfig'),
122                    default=None) == {}
def has_workload_identity_enabled(self) -> bool:
124  def has_workload_identity_enabled(self) -> bool:
125    # 'Metadata concealment' (workloadMetadataConfig == {}) doesn't protect the
126    # default SA's token
127    return bool(
128        get_path(self._resource_data, ('config', 'workloadMetadataConfig'),
129                 default=None))
service_account: str
131  @property
132  def service_account(self) -> str:
133    sa = self._get_service_account()
134    if sa == 'default':
135      project_nr = crm.get_project(self.project_id).number
136      return f'{project_nr}-compute@developer.gserviceaccount.com'
137    else:
138      return sa
pod_ipv4_cidr_size: int
140  @property
141  def pod_ipv4_cidr_size(self) -> int:
142    return self._resource_data['podIpv4CidrSize']
pod_ipv4_cidr_block: Union[ipaddress.IPv4Network, ipaddress.IPv6Network, NoneType]
144  @property
145  def pod_ipv4_cidr_block(self) -> Optional[IPv4NetOrIPv6Net]:
146    # Get the pod cidr range in use by the nodepool
147    pod_cidr = get_path(self._resource_data,
148                        ('networkConfig', 'podIpv4CidrBlock'),
149                        default=None)
150
151    if pod_cidr:
152      return ipaddress.ip_network(pod_cidr)
153    else:
154      return None
max_pod_per_node: int
156  @property
157  def max_pod_per_node(self) -> int:
158    return int(
159        get_path(self._resource_data, ('maxPodsConstraint', 'maxPodsPerNode'),
160                 default=DEFAULT_MAX_PODS_PER_NODE))
cluster: Cluster
162  @property
163  def cluster(self) -> 'Cluster':
164    return self._cluster
instance_groups: List[gcpdiag.queries.gce.ManagedInstanceGroup]
166  @property
167  def instance_groups(self) -> List[gce.ManagedInstanceGroup]:
168    if self._migs is None:
169      project_migs_by_selflink = {}
170      for m in gce.get_managed_instance_groups(
171          models.Context(project_id=self.project_id)).values():
172        project_migs_by_selflink[m.self_link] = m
173
174      self._migs = []
175      for url in self._resource_data.get('instanceGroupUrls', []):
176        try:
177          self._migs.append(project_migs_by_selflink[url])
178        except KeyError:
179          continue
180    return self._migs
node_tags: List[str]
182  @property
183  def node_tags(self) -> List[str]:
184    """Returns the firewall tags used for nodes in this cluster.
185
186    If the node tags can't be determined, [] is returned.
187    """
188    migs = self.instance_groups
189    if not migs:
190      return []
191    return migs[0].template.tags

Returns the firewall tags used for nodes in this node pool.

If the node tags can't be determined, [] is returned.

def get_machine_type(self) -> str:
193  def get_machine_type(self) -> str:
194    """Returns the machine type of the nodepool nodes"""
195    return self.config.machine_type

Returns the machine type of the nodepool nodes

Inherited Members
gcpdiag.models.Resource
project_id
class UndefinedClusterPropertyError(builtins.Exception):
class UndefinedClusterPropertyError(Exception):
  """Raised when a property of a cluster can't be determined.

  For example, the cluster_hash can't be determined because the cluster
  resource carries no 'id'.
  """

Thrown when a property of a cluster can't be determined for some reason. For example, the cluster_hash can't be determined because there are no nodepools defined.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class Cluster(gcpdiag.models.Resource):
class Cluster(models.Resource):
  """Represents a GKE cluster.

  Wraps the cluster resource dictionary; the accessors below read their
  values from that dictionary on demand.

  https://cloud.google.com/kubernetes-engine/docs/reference/rest/v1/projects.locations.clusters#Cluster
  """
  _resource_data: dict
  master_version: Version

  def __init__(self, project_id, resource_data):
    """Store the raw resource and parse 'currentMasterVersion'."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self.master_version = Version(self._resource_data['currentMasterVersion'])
    # NodePool objects are created on first access of nodepools.
    self._nodepools = None

  @property
  def full_path(self) -> str:
    """Path of the cluster below 'projects/<project id>/'.

    Uses 'locations/' when utils.is_region() accepts the cluster location
    and 'zones/' otherwise.
    """
    if utils.is_region(self._resource_data['location']):
      return (f'projects/{self.project_id}/'
              f'locations/{self.location}/clusters/{self.name}')
    else:
      return (f'projects/{self.project_id}/'
              f'zones/{self.location}/clusters/{self.name}')

  @property
  def short_path(self) -> str:
    """The full path with the collection names stripped out."""
    path = self.full_path
    path = re.sub(r'^projects/', '', path)
    path = re.sub(r'/locations/', '/', path)
    path = re.sub(r'/zones/', '/', path)
    path = re.sub(r'/clusters/', '/', path)
    return path

  @property
  def name(self) -> str:
    """Value of the 'name' field."""
    return self._resource_data['name']

  @property
  def location(self) -> str:
    """Value of the 'location' field."""
    return self._resource_data['location']

  @property
  def pod_ipv4_cidr(self) -> IPv4NetOrIPv6Net:
    """The 'clusterIpv4Cidr' field parsed into an ipaddress network."""
    cidr = self._resource_data['clusterIpv4Cidr']
    return ipaddress.ip_network(cidr)

  @property
  def current_node_count(self) -> int:
    """Value of 'currentNodeCount', or 0 when the field is absent."""
    return self._resource_data.get('currentNodeCount', 0)

  @property
  def release_channel(self) -> Optional[str]:
    """Value of 'releaseChannel.channel', or None when it is not set."""
    try:
      return self._resource_data['releaseChannel']['channel']
    except KeyError:
      return None

  @property
  def nap_node_image_type(self) -> Optional[str]:
    """Image type from 'autoscaling.autoprovisioningNodePoolDefaults'."""
    return get_path(
        self._resource_data,
        ('autoscaling', 'autoprovisioningNodePoolDefaults', 'imageType'),
        default=None)

  @property
  def app_layer_sec_key(self) -> str:
    """Value of 'databaseEncryption.keyName'.

    Raises KeyError when 'databaseEncryption' is absent; the result is None
    when only 'keyName' is missing.
    """
    return self._resource_data['databaseEncryption'].get('keyName')

  def has_app_layer_enc_enabled(self) -> bool:
    """True when 'databaseEncryption.state' equals 'ENCRYPTED'."""
    # state := 'DECRYPTED' | 'ENCRYPTED', keyName := 'full_path_to_key_resouce'
    return get_path(self._resource_data, ('databaseEncryption', 'state'),
                    default=None) == 'ENCRYPTED'

  def has_logging_enabled(self) -> bool:
    """True unless 'loggingService' is the string 'none'."""
    return self._resource_data['loggingService'] != 'none'

  def enabled_logging_components(self) -> List[str]:
    """Value of 'loggingConfig.componentConfig.enableComponents'.

    Raises KeyError when any level of that path is missing.
    """
    return self._resource_data['loggingConfig']['componentConfig'][
        'enableComponents']

  def has_monitoring_enabled(self) -> bool:
    """True unless 'monitoringService' is the string 'none'."""
    return self._resource_data['monitoringService'] != 'none'

  def has_authenticator_group_enabled(self) -> bool:
    """True when 'authenticatorGroupsConfig' is present and non-empty."""
    return len(self._resource_data.get('authenticatorGroupsConfig', {})) > 0

  def has_workload_identity_enabled(self) -> bool:
    """True when 'workloadIdentityConfig' is present and non-empty."""
    return len(self._resource_data.get('workloadIdentityConfig', {})) > 0

  def has_http_load_balancing_enabled(self) -> bool:
    """True unless 'addonsConfig.httpLoadBalancing.disabled' is True."""
    # HTTP load balancing needs to be enabled to use GKE ingress
    return get_path(self._resource_data,
                    ('addonsConfig', 'httpLoadBalancing', 'disabled'),
                    default=None) is not True

  def has_network_policy_enabled(self) -> bool:
    """True unless 'addonsConfig.networkPolicyConfig.disabled' is True."""
    # Network policy enforcement
    return get_path(self._resource_data,
                    ('addonsConfig', 'networkPolicyConfig', 'disabled'),
                    default=False) is not True

  def has_dpv2_enabled(self) -> bool:
    """True when 'networkConfig.datapathProvider' is 'ADVANCED_DATAPATH'."""
    # Checks whether dataplane V2 is enabled in clusters
    return (get_path(self._resource_data, ('networkConfig', 'datapathProvider'),
                     default=None) == 'ADVANCED_DATAPATH')

  def has_intra_node_visibility_enabled(self) -> bool:
    """Value of 'networkConfig.enableIntraNodeVisibility', default False."""
    if ('networkConfig' in self._resource_data and
        'enableIntraNodeVisibility' in self._resource_data['networkConfig']):
      return self._resource_data['networkConfig']['enableIntraNodeVisibility']
    return False

  def has_maintenance_window(self) -> bool:
    """True when 'maintenancePolicy.resourceVersion' is not 'e3b0c442'.

    Raises KeyError when 'maintenancePolicy' or 'resourceVersion' is absent.
    """
    # 'e3b0c442' is a hexadecimal string that represents the value of an empty
    # string ('') in cryptography. If the maintenance windows are defined, the
    # value of 'resourceVersion' is not empty ('e3b0c442').
    return self._resource_data['maintenancePolicy'][
        'resourceVersion'] != 'e3b0c442'

  def has_image_streaming_enabled(self) -> bool:
    """
    Check if cluster has Image Streaming (aka  Google Container File System)
    enabled, either in the node pool defaults or on any single node pool.
    """
    global_gcsfs = get_path(
        self._resource_data,
        ('nodePoolDefaults', 'nodeConfigDefaults', 'gcfsConfig', 'enabled'),
        default=False)
    # Check nodePoolDefaults settings
    if global_gcsfs:
      return True
    for np in self.nodepools:
      # Check if any nodepool has image streaming enabled
      if np.has_image_streaming_enabled():
        return True
    return False

  @property
  def nodepools(self) -> Iterable[NodePool]:
    """NodePool objects for the entries of 'nodePools' (built once, cached)."""
    if self._nodepools is None:
      self._nodepools = []
      for n in self._resource_data.get('nodePools', []):
        self._nodepools.append(NodePool(self, n))
    return self._nodepools

  @property
  def network(self) -> network.Network:
    """Network referenced by 'networkConfig.network'.

    Raises RuntimeError when the reference does not have the form
    'projects/<project>/global/networks/<name>'.
    """
    # projects/gcpdiag-gke1-aaaa/global/networks/default
    network_string = self._resource_data['networkConfig']['network']
    m = re.match(r'projects/([^/]+)/global/networks/([^/]+)$', network_string)
    if not m:
      raise RuntimeError("can't parse network string: %s" % network_string)
    return network.get_network(m.group(1), m.group(2))

  @property
  def subnetwork(self) -> Optional[models.Resource]:
    """Subnetwork referenced by 'networkConfig.subnetwork', or None if unset.

    Raises RuntimeError when the reference does not have the form
    'projects/<project>/regions/<region>/subnetworks/<name>'.
    """
    # 'projects/gcpdiag-gke1-aaaa/regions/europe-west4/subnetworks/default'
    if 'subnetwork' not in self._resource_data['networkConfig']:
      return None

    subnetwork_string = self._resource_data['networkConfig']['subnetwork']
    m = re.match(r'projects/([^/]+)/regions/([^/]+)/subnetworks/([^/]+)$',
                 subnetwork_string)
    if not m:
      # The message names the subnetwork, which is what failed to parse.
      raise RuntimeError("can't parse subnetwork string: %s" %
                         subnetwork_string)
    return network.get_subnetwork(m.group(1), m.group(2), m.group(3))

  @property
  def is_private(self) -> bool:
    """Value of 'privateClusterConfig.enablePrivateNodes', default False."""
    if 'privateClusterConfig' not in self._resource_data:
      return False

    return self._resource_data['privateClusterConfig'].get(
        'enablePrivateNodes', False)

  @property
  def is_vpc_native(self) -> bool:
    """Value of 'ipAllocationPolicy.useIpAliases', default False."""
    return (get_path(self._resource_data,
                     ('ipAllocationPolicy', 'useIpAliases'),
                     default=False))

  @property
  def is_regional(self) -> bool:
    """True when the 'locations' field lists more than one entry."""
    # NOTE(review): full_path decides regional vs. zonal with utils.is_region;
    # confirm both agree for a zonal cluster with several node locations.
    return len(self._resource_data['locations']) > 1

  @property
  def cluster_ca_certificate(self) -> str:
    """Value of 'masterAuth.clusterCaCertificate'."""
    return self._resource_data['masterAuth']['clusterCaCertificate']

  @property
  def endpoint(self) -> Optional[str]:
    """Value of the 'endpoint' field, or None when it is absent."""
    if 'endpoint' not in self._resource_data:
      return None
    return self._resource_data['endpoint']

  @property
  def is_autopilot(self) -> bool:
    """Value of 'autopilot.enabled', default False."""
    if 'autopilot' not in self._resource_data:
      return False
    return self._resource_data['autopilot'].get('enabled', False)

  @property
  def masters_cidr_list(self) -> Iterable[IPv4NetOrIPv6Net]:
    """Networks from which the masters reach the nodes.

    Uses 'privateClusterConfig.masterIpv4CidrBlock' when it is set. Otherwise
    returns the source ranges of the first enabled ingress rule named
    'gke-<name>-<cluster hash>-ssh', or [] when there is no such rule.
    """
    if get_path(self._resource_data,
                ('privateClusterConfig', 'masterIpv4CidrBlock'),
                default=None):
      return [
          ipaddress.ip_network(self._resource_data['privateClusterConfig']
                               ['masterIpv4CidrBlock'])
      ]
    else:
      #only older clusters still have ssh firewall rules
      # cluster_hash raises UndefinedClusterPropertyError when the resource
      # has no 'id', so that exception propagates from the accesses below.
      if self.current_node_count and not self.cluster_hash:
        logging.warning("couldn't retrieve cluster hash for cluster %s.",
                        self.name)
        return []
      fw_rule_name = f'gke-{self.name}-{self.cluster_hash}-ssh'
      rule = self.network.firewall.get_vpc_ingress_rules(name=fw_rule_name)
      if rule and rule[0].is_enabled():
        return rule[0].source_ranges
      return []

  @property
  def cluster_hash(self) -> Optional[str]:
    """Returns the "cluster hash" as used in automatic firewall rules for GKE clusters.
    Cluster hash is the first 8 characters of cluster id.
    See also: https://cloud.google.com/kubernetes-engine/docs/concepts/firewall-rules

    Raises UndefinedClusterPropertyError when the resource has no 'id'.
    """
    if 'id' in self._resource_data:
      return self._resource_data['id'][:8]
    raise UndefinedClusterPropertyError('no id')
Cluster(project_id, resource_data)
213  def __init__(self, project_id, resource_data):
214    super().__init__(project_id=project_id)
215    self._resource_data = resource_data
216    self.master_version = Version(self._resource_data['currentMasterVersion'])
217    self._nodepools = None
master_version: gcpdiag.utils.Version
full_path: str
219  @property
220  def full_path(self) -> str:
221    if utils.is_region(self._resource_data['location']):
222      return (f'projects/{self.project_id}/'
223              f'locations/{self.location}/clusters/{self.name}')
224    else:
225      return (f'projects/{self.project_id}/'
226              f'zones/{self.location}/clusters/{self.name}')

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
228  @property
229  def short_path(self) -> str:
230    path = self.full_path
231    path = re.sub(r'^projects/', '', path)
232    path = re.sub(r'/locations/', '/', path)
233    path = re.sub(r'/zones/', '/', path)
234    path = re.sub(r'/clusters/', '/', path)
235    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
237  @property
238  def name(self) -> str:
239    return self._resource_data['name']
location: str
241  @property
242  def location(self) -> str:
243    return self._resource_data['location']
pod_ipv4_cidr: Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
245  @property
246  def pod_ipv4_cidr(self) -> IPv4NetOrIPv6Net:
247    cidr = self._resource_data['clusterIpv4Cidr']
248    return ipaddress.ip_network(cidr)
current_node_count: int
250  @property
251  def current_node_count(self) -> int:
252    return self._resource_data.get('currentNodeCount', 0)
release_channel: Optional[str]
254  @property
255  def release_channel(self) -> Optional[str]:
256    try:
257      return self._resource_data['releaseChannel']['channel']
258    except KeyError:
259      return None
nap_node_image_type: Optional[str]
261  @property
262  def nap_node_image_type(self) -> Optional[str]:
263
264    return get_path(
265        self._resource_data,
266        ('autoscaling', 'autoprovisioningNodePoolDefaults', 'imageType'),
267        default=None)
app_layer_sec_key: str
269  @property
270  def app_layer_sec_key(self) -> str:
271    return self._resource_data['databaseEncryption'].get('keyName')
def has_app_layer_enc_enabled(self) -> bool:
273  def has_app_layer_enc_enabled(self) -> bool:
274    # state := 'DECRYPTED' | 'ENCRYPTED', keyName := 'full_path_to_key_resouce'
275    return get_path(self._resource_data, ('databaseEncryption', 'state'),
276                    default=None) == 'ENCRYPTED'
def has_logging_enabled(self) -> bool:
278  def has_logging_enabled(self) -> bool:
279    return self._resource_data['loggingService'] != 'none'
def enabled_logging_components(self) -> List[str]:
281  def enabled_logging_components(self) -> List[str]:
282    return self._resource_data['loggingConfig']['componentConfig'][
283        'enableComponents']
def has_monitoring_enabled(self) -> bool:
285  def has_monitoring_enabled(self) -> bool:
286    return self._resource_data['monitoringService'] != 'none'
def has_authenticator_group_enabled(self) -> bool:
288  def has_authenticator_group_enabled(self) -> bool:
289    return len(self._resource_data.get('authenticatorGroupsConfig', {})) > 0
def has_workload_identity_enabled(self) -> bool:
291  def has_workload_identity_enabled(self) -> bool:
292    return len(self._resource_data.get('workloadIdentityConfig', {})) > 0
def has_http_load_balancing_enabled(self) -> bool:
294  def has_http_load_balancing_enabled(self) -> bool:
295    # HTTP load balancing needs to be enabled to use GKE ingress
296    return not (get_path(self._resource_data,
297                         ('addonsConfig', 'httpLoadBalancing', 'disabled'),
298                         default=None) is True)
def has_network_policy_enabled(self) -> bool:
300  def has_network_policy_enabled(self) -> bool:
301    # Network policy enforcement
302    return not (get_path(self._resource_data,
303                         ('addonsConfig', 'networkPolicyConfig', 'disabled'),
304                         default=False) is True)
def has_dpv2_enabled(self) -> bool:
306  def has_dpv2_enabled(self) -> bool:
307    # Checks whether dataplane V2 is enabled in clusters
308    return (get_path(self._resource_data, ('networkConfig', 'datapathProvider'),
309                     default=None) == 'ADVANCED_DATAPATH')
def has_intra_node_visibility_enabled(self) -> bool:
311  def has_intra_node_visibility_enabled(self) -> bool:
312    if ('networkConfig' in self._resource_data and
313        'enableIntraNodeVisibility' in self._resource_data['networkConfig']):
314      return self._resource_data['networkConfig']['enableIntraNodeVisibility']
315    return False
def has_maintenance_window(self) -> bool:
317  def has_maintenance_window(self) -> bool:
318    # 'e3b0c442' is a hexadecimal string that represents the value of an empty
319    # string ('') in cryptography. If the maintenance windows are defined, the
320    # value of 'resourceVersion' is not empty ('e3b0c442').
321    return self._resource_data['maintenancePolicy'][
322        'resourceVersion'] != 'e3b0c442'
def has_image_streaming_enabled(self) -> bool:
324  def has_image_streaming_enabled(self) -> bool:
325    """
326    Check if cluster has Image Streaming (aka  Google Container File System)
327    enabled
328    """
329    global_gcsfs = get_path(
330        self._resource_data,
331        ('nodePoolDefaults', 'nodeConfigDefaults', 'gcfsConfig', 'enabled'),
332        default=False)
333    # Check nodePoolDefaults settings
334    if global_gcsfs:
335      return True
336    for np in self.nodepools:
337      # Check if any nodepool has image streaming enabled
338      if np.has_image_streaming_enabled():
339        return True
340    return False

Check if cluster has Image Streaming (aka Google Container File System) enabled

nodepools: Iterable[NodePool]
342  @property
343  def nodepools(self) -> Iterable[NodePool]:
344    if self._nodepools is None:
345      self._nodepools = []
346      for n in self._resource_data.get('nodePools', []):
347        self._nodepools.append(NodePool(self, n))
348    return self._nodepools
network: gcpdiag.queries.network.Network
350  @property
351  def network(self) -> network.Network:
352    # projects/gcpdiag-gke1-aaaa/global/networks/default
353    network_string = self._resource_data['networkConfig']['network']
354    m = re.match(r'projects/([^/]+)/global/networks/([^/]+)$', network_string)
355    if not m:
356      raise RuntimeError("can't parse network string: %s" % network_string)
357    return network.get_network(m.group(1), m.group(2))
subnetwork: Optional[gcpdiag.models.Resource]
359  @property
360  def subnetwork(self) -> Optional[models.Resource]:
361    # 'projects/gcpdiag-gke1-aaaa/regions/europe-west4/subnetworks/default'
362    if 'subnetwork' not in self._resource_data['networkConfig']:
363      return None
364
365    subnetwork_string = self._resource_data['networkConfig']['subnetwork']
366    m = re.match(r'projects/([^/]+)/regions/([^/]+)/subnetworks/([^/]+)$',
367                 subnetwork_string)
368    if not m:
369      raise RuntimeError("can't parse network string: %s" % subnetwork_string)
370    return network.get_subnetwork(m.group(1), m.group(2), m.group(3))
is_private: bool
372  @property
373  def is_private(self) -> bool:
374    if not 'privateClusterConfig' in self._resource_data:
375      return False
376
377    return self._resource_data['privateClusterConfig'].get(
378        'enablePrivateNodes', False)
is_vpc_native: bool
380  @property
381  def is_vpc_native(self) -> bool:
382    return (get_path(self._resource_data,
383                     ('ipAllocationPolicy', 'useIpAliases'),
384                     default=False))
is_regional: bool
386  @property
387  def is_regional(self) -> bool:
388    return len(self._resource_data['locations']) > 1
cluster_ca_certificate: str
390  @property
391  def cluster_ca_certificate(self) -> str:
392    return self._resource_data['masterAuth']['clusterCaCertificate']
endpoint: Optional[str]
394  @property
395  def endpoint(self) -> Optional[str]:
396    if 'endpoint' not in self._resource_data:
397      return None
398    return self._resource_data['endpoint']
is_autopilot: bool
400  @property
401  def is_autopilot(self) -> bool:
402    if not 'autopilot' in self._resource_data:
403      return False
404    return self._resource_data['autopilot'].get('enabled', False)
masters_cidr_list: Iterable[Union[ipaddress.IPv4Network, ipaddress.IPv6Network]]
406  @property
407  def masters_cidr_list(self) -> Iterable[IPv4NetOrIPv6Net]:
408    if get_path(self._resource_data,
409                ('privateClusterConfig', 'masterIpv4CidrBlock'),
410                default=None):
411      return [
412          ipaddress.ip_network(self._resource_data['privateClusterConfig']
413                               ['masterIpv4CidrBlock'])
414      ]
415    else:
416      #only older clusters still have ssh firewall rules
417      if self.current_node_count and not self.cluster_hash:
418        logging.warning("couldn't retrieve cluster hash for cluster %s.",
419                        self.name)
420        return []
421      fw_rule_name = f'gke-{self.name}-{self.cluster_hash}-ssh'
422      rule = self.network.firewall.get_vpc_ingress_rules(name=fw_rule_name)
423      if rule and rule[0].is_enabled():
424        return rule[0].source_ranges
425      return []
cluster_hash: Optional[str]
427  @property
428  def cluster_hash(self) -> Optional[str]:
429    """Returns the "cluster hash" as used in automatic firewall rules for GKE clusters.
430    Cluster hash is the first 8 characters of cluster id.
431    See also: https://cloud.google.com/kubernetes-engine/docs/concepts/firewall-rules
432    """
433    if 'id' in self._resource_data:
434      return self._resource_data['id'][:8]
435    raise UndefinedClusterPropertyError('no id')

Returns the "cluster hash" as used in automatic firewall rules for GKE clusters. Cluster hash is the first 8 characters of cluster id. See also: https://cloud.google.com/kubernetes-engine/docs/concepts/firewall-rules

Inherited Members
gcpdiag.models.Resource
project_id
@caching.cached_api_call
def get_clusters( context: gcpdiag.models.Context) -> Mapping[str, Cluster]:
@caching.cached_api_call
def get_clusters(context: models.Context) -> Mapping[str, Cluster]:
  """Get the clusters matching the given context, indexed by cluster full path.

  Returns an empty mapping when the 'container' API is not enabled in the
  project or when the list response contains no clusters.

  Raises:
    RuntimeError: if a cluster in the response lacks 'name' or 'location'.
    utils.GcpApiError: if the list request fails with an HTTP error.
  """
  found: Dict[str, Cluster] = {}
  if not apis.is_enabled(context.project_id, 'container'):
    return found

  container_api = apis.get_api('container', 'v1', context.project_id)
  logging.info('fetching list of GKE clusters in project %s',
               context.project_id)
  # The '-' location wildcard asks for clusters in all locations.
  request = container_api.projects().locations().clusters().list(
      parent=f'projects/{context.project_id}/locations/-')
  try:
    response = request.execute(num_retries=config.API_RETRIES)
    for cluster_data in response.get('clusters', []):
      # Verify that we have the minimal data that we expect.
      if not ('name' in cluster_data and 'location' in cluster_data):
        raise RuntimeError(
            'missing data in projects.locations.clusters.list response')
      # Keep only the clusters selected by the context filters.
      if context.match_project_resource(
          location=cluster_data.get('location', ''),
          labels=cluster_data.get('resourceLabels', {}),
          resource=cluster_data.get('name', '')):
        cluster = Cluster(project_id=context.project_id,
                          resource_data=cluster_data)
        found[cluster.full_path] = cluster
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
  return found

Get a list of Cluster matching the given context, indexed by cluster full path.

def get_valid_master_versions(project_id: str, location: str) -> List[str]:
def get_valid_master_versions(project_id: str, location: str) -> List[str]:
  """Get a list of valid GKE master versions.

  The result concatenates the versions valid in every release channel with
  the static list of valid master versions from the server config, in that
  order. Duplicates are not removed.
  """
  server_config = _get_server_config(project_id, location)

  # Channel versions may extend the list of all available versions. This is
  # especially true for the Rapid channel: many new versions are only
  # available there and not as a static version, to make sure nobody is stuck
  # on such a version for an extended period of time.
  versions: List[str] = [
      version for channel in server_config['channels']
      for version in channel['validVersions']
  ]
  versions.extend(server_config['validMasterVersions'])
  return versions

Get a list of valid GKE master versions.

def get_valid_node_versions(project_id: str, location: str) -> List[str]:
def get_valid_node_versions(project_id: str, location: str) -> List[str]:
  """Get a list of valid GKE node versions.

  The result concatenates the versions valid in every release channel with
  the static list of valid node versions from the server config, in that
  order. Duplicates are not removed.
  """
  server_config = _get_server_config(project_id, location)

  # Channel versions may extend the static list: some versions are only
  # offered through a release channel (see get_valid_master_versions).
  versions: List[str] = [
      version for channel in server_config['channels']
      for version in channel['validVersions']
  ]
  versions.extend(server_config['validNodeVersions'])
  return versions

Get a list of valid GKE node versions.

class Node(gcpdiag.models.Resource):
class Node(models.Resource):
  """Represents a GKE node.

  Holds a GCE instance together with a node pool and a managed instance
  group. This class is useful, for example, to determine the GKE cluster when
  you only have a GCE instance id (like from a metrics label).
  """

  instance: gce.Instance
  nodepool: NodePool
  mig: gce.ManagedInstanceGroup

  def __init__(self, instance, nodepool, mig):
    # The node is registered under the project of its GCE instance.
    super().__init__(project_id=instance.project_id)
    self.instance, self.nodepool, self.mig = instance, nodepool, mig

  @property
  def full_path(self) -> str:
    """Full path of the node pool's cluster plus '/nodes/<instance name>'."""
    cluster_path = self.nodepool.cluster.full_path
    return f'{cluster_path}/nodes/{self.instance.name}'

  @property
  def short_path(self) -> str:
    """Short path of the underlying GCE instance."""
    return self.instance.short_path

Represents a GKE node.

This class is useful, for example, to determine the GKE cluster when you only have a GCE instance id (like from a metrics label).

Node(instance, nodepool, mig)
523  def __init__(self, instance, nodepool, mig):
524    super().__init__(project_id=instance.project_id)
525    self.instance = instance
526    self.nodepool = nodepool
527    self.mig = mig
528    pass
nodepool: NodePool
full_path: str
530  @property
531  def full_path(self) -> str:
532    return self.nodepool.cluster.full_path + '/nodes/' + self.instance.name

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
534  @property
535  def short_path(self) -> str:
536    #return self.nodepool.cluster.short_path + '/' + self.instance.name
537    return self.instance.short_path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

Inherited Members
gcpdiag.models.Resource
project_id
@functools.lru_cache()
def get_node_by_instance_id( context: gcpdiag.models.Context, instance_id: str) -> Node:
@functools.lru_cache()
def get_node_by_instance_id(context: models.Context, instance_id: str) -> Node:
  """Get a gke.Node instance by instance id.

  Args:
    context: context used to list the GCE instances and the GKE clusters.
    instance_id: key of the instance in the mapping returned by
      gce.get_instances().

  Returns:
    A Node made of the instance, the first node pool found whose instance
    groups include the instance's MIG, and that MIG.

  Raises:
    KeyError: if the instance is not found or isn't part of a GKE cluster.
  """
  # This will throw a KeyError if the instance is not found, which is also
  # the behavior that we want for this function.
  instance = gce.get_instances(context)[instance_id]
  clusters = get_clusters(context)
  try:
    # instance.mig throws AttributeError if it isn't part of a mig
    mig = instance.mig

    # find a NodePool that uses this MIG
    for c in clusters.values():
      for np in c.nodepools:
        if any(mig == np_mig for np_mig in np.instance_groups):
          return Node(instance=instance, nodepool=np, mig=mig)
  except AttributeError as err:
    # Keep the documented contract: callers only need to handle KeyError.
    raise KeyError(
        f"can't determine GKE cluster for instance {instance_id}") from err

  # No node pool owns this instance's MIG.
  raise KeyError(f"can't determine GKE cluster for instance {instance_id}")

Get a gke.Node instance by instance id.

Throws a KeyError in case this instance is not found or isn't part of a GKE cluster.