gcpdiag.queries.kubectl

Queries related to Kubectl plugins.
def get_config_path() -> str:
  """Return the path of the kubeconfig file used by gcpdiag.

  The file is named 'gcpdiag-config' and is located directly inside the
  directory returned by config.get_cache_dir().
  """
  return config.get_cache_dir() + '/gcpdiag-config'
class KubectlExecutor:
  """Represents a kubectl executor bound to a single GKE cluster.

  Attributes:
    cluster: the GKE cluster the executor was created for.
    kubecontext: name of the kubeconfig context written by
      make_kube_config(); None until that call has succeeded.
    lock: lock object created together with the executor.
  """

  lock: threading.Lock

  def __init__(self, cluster: 'gke.Cluster'):
    self.cluster = cluster
    # NOTE(review): a new lock is created for every executor, so it only
    # serializes callers that share this instance. It does not protect the
    # kubeconfig file against updates made through other executors --
    # confirm whether a lock shared by all executors is intended.
    self.lock = threading.Lock()
    # Filled in by make_kube_config() once the context has been written.
    self.kubecontext = None

  def make_kube_config(self) -> bool:
    """Add a kubernetes context for this cluster to gcpdiag's kubeconfig file.

    Starts from the existing kubeconfig file if there is one, otherwise from
    a fresh skeleton with a single 'gcpdiag' user that authenticates through
    gke-gcloud-auth-plugin. Cluster and context entries that already carry
    the same name are replaced instead of being appended a second time. On
    success the context name is stored in self.kubecontext.

    Returns:
      True once the kubeconfig file has been written; False (after logging a
      warning, without touching the file) when the cluster has no API server
      endpoint.
    """
    endpoint = self.cluster.endpoint
    if endpoint is None:
      logging.warning('No kubernetes API server endpoint found for cluster %s',
                      self.cluster.short_path)
      return False

    config_path = get_config_path()
    cfg: dict = {}
    if os.path.isfile(config_path):
      with open(config_path, encoding='UTF-8') as f:
        # safe_load() yields None for an empty file; fall back to an empty
        # mapping so the defaults below are filled in.
        cfg = yaml.safe_load(f) or {}

    cfg.setdefault('apiVersion', 'v1')
    cfg.setdefault('users', [{
        'name': 'gcpdiag',
        'user': {
            'exec': {
                'apiVersion': 'client.authentication.k8s.io/v1beta1',
                'command': 'gke-gcloud-auth-plugin',
                'installHint': 'x',
                'provideClusterInfo': True,
            },
        },
    }])

    cluster_name = self.cluster.short_path
    kubecontext = 'gcpdiag-ctx-' + self.cluster.name

    # Drop entries that already use the same name so that a leftover
    # kubeconfig file does not accumulate duplicates.
    clusters = [
        c for c in cfg.get('clusters') or [] if c.get('name') != cluster_name
    ]
    clusters.append({
        'cluster': {
            'certificate-authority-data': self.cluster.cluster_ca_certificate,
            'server': 'https://' + endpoint,
        },
        'name': cluster_name,
    })
    cfg['clusters'] = clusters

    contexts = [
        c for c in cfg.get('contexts') or [] if c.get('name') != kubecontext
    ]
    contexts.append({
        'context': {
            'cluster': cluster_name,
            'user': 'gcpdiag',
        },
        'name': kubecontext,
    })
    cfg['contexts'] = contexts

    # Serialize before opening the file so that a dump error cannot leave a
    # truncated kubeconfig behind.
    config_text = yaml.dump(cfg, default_flow_style=False)
    with open(config_path, 'w', encoding='UTF-8') as config_file:
      config_file.write(config_text)

    self.kubecontext = kubecontext
    return True

  def kubectl_execute(self, command_list: list[str]) -> tuple[str, str]:
    """Execute a kubectl command.

    Args:
      command_list: the executable followed by all of its parameters, one
        list item per argument.

    Returns:
      A (stdout, stderr) tuple with the captured text output. The exit status
      of the command is neither checked nor returned.

    Raises:
      FileNotFoundError: if the executable cannot be found.
    """
    res = subprocess.run(command_list,
                         check=False,
                         capture_output=True,
                         text=True)
    return res.stdout, res.stderr

Represents a kubectl executor.

KubectlExecutor(cluster: gcpdiag.queries.gke.Cluster)
def __init__(self, cluster: 'gke.Cluster'):
  self.cluster = cluster
  # NOTE(review): a new lock is created for every executor, so it only
  # serializes callers that share this instance. It does not protect the
  # kubeconfig file against updates made through other executors --
  # confirm whether a lock shared by all executors is intended.
  self.lock = threading.Lock()
  # Filled in by make_kube_config() once the context has been written.
  self.kubecontext = None
lock: threading.Lock
cluster
def make_kube_config(self) -> bool:
  """Add a kubernetes context for this cluster to gcpdiag's kubeconfig file.

  Starts from the existing kubeconfig file if there is one, otherwise from
  a fresh skeleton with a single 'gcpdiag' user that authenticates through
  gke-gcloud-auth-plugin. Cluster and context entries that already carry
  the same name are replaced instead of being appended a second time. On
  success the context name is stored in self.kubecontext.

  Returns:
    True once the kubeconfig file has been written; False (after logging a
    warning, without touching the file) when the cluster has no API server
    endpoint.
  """
  endpoint = self.cluster.endpoint
  if endpoint is None:
    logging.warning('No kubernetes API server endpoint found for cluster %s',
                    self.cluster.short_path)
    return False

  config_path = get_config_path()
  cfg: dict = {}
  if os.path.isfile(config_path):
    with open(config_path, encoding='UTF-8') as f:
      # safe_load() yields None for an empty file; fall back to an empty
      # mapping so the defaults below are filled in.
      cfg = yaml.safe_load(f) or {}

  cfg.setdefault('apiVersion', 'v1')
  cfg.setdefault('users', [{
      'name': 'gcpdiag',
      'user': {
          'exec': {
              'apiVersion': 'client.authentication.k8s.io/v1beta1',
              'command': 'gke-gcloud-auth-plugin',
              'installHint': 'x',
              'provideClusterInfo': True,
          },
      },
  }])

  cluster_name = self.cluster.short_path
  kubecontext = 'gcpdiag-ctx-' + self.cluster.name

  # Drop entries that already use the same name so that a leftover
  # kubeconfig file does not accumulate duplicates.
  clusters = [
      c for c in cfg.get('clusters') or [] if c.get('name') != cluster_name
  ]
  clusters.append({
      'cluster': {
          'certificate-authority-data': self.cluster.cluster_ca_certificate,
          'server': 'https://' + endpoint,
      },
      'name': cluster_name,
  })
  cfg['clusters'] = clusters

  contexts = [
      c for c in cfg.get('contexts') or [] if c.get('name') != kubecontext
  ]
  contexts.append({
      'context': {
          'cluster': cluster_name,
          'user': 'gcpdiag',
      },
      'name': kubecontext,
  })
  cfg['contexts'] = contexts

  # Serialize before opening the file so that a dump error cannot leave a
  # truncated kubeconfig behind.
  config_text = yaml.dump(cfg, default_flow_style=False)
  with open(config_path, 'w', encoding='UTF-8') as config_file:
    config_file.write(config_text)

  self.kubecontext = kubecontext
  return True

Add a new kubernetes context for kubectl plugin CLIs.

def kubectl_execute(self, command_list: list[str]) -> tuple[str, str]:
  """Execute a kubectl command.

  Args:
    command_list: the executable followed by all of its parameters, one
      list item per argument.

  Returns:
    A (stdout, stderr) tuple with the captured text output. The exit status
    of the command is neither checked nor returned.

  Raises:
    FileNotFoundError: if the executable cannot be found.
  """
  res = subprocess.run(command_list,
                       check=False,
                       capture_output=True,
                       text=True)
  return res.stdout, res.stderr

Execute a kubectl command.

Takes a list of strings containing the command and all of its parameters, and returns the stdout and stderr of the execution.

def verify_auth(executor: KubectlExecutor) -> bool:
  """Verify the authentication for kubernetes by running kubectl cluster-info.

  The command is run against gcpdiag's kubeconfig file and the context
  stored on the executor.

  Args:
    executor: executor whose kubecontext has already been set.

  Returns:
    True if kubectl wrote nothing to stderr; otherwise a warning is logged
    and False is returned.
  """
  _, stderr = executor.kubectl_execute([
      'kubectl', 'cluster-info', '--kubeconfig',
      get_config_path(), '--context', executor.kubecontext
  ])
  # Any output on stderr is treated as a failure: kubectl_execute() does not
  # expose the exit status of the command.
  if stderr:
    logging.warning('Failed to authenticate kubectl for cluster %s: %s',
                    executor.cluster.short_path, stderr.strip('\n'))
    return False
  return True

Verify the authentication for kubernetes by running kubectl cluster-info.

Logs a warning and returns False if authentication failed.

def check_gke_ingress(executor: KubectlExecutor):
  """Run the kubectl check-gke-ingress plugin for the executor's cluster.

  The plugin is run against gcpdiag's kubeconfig file and the context stored
  on the executor.

  Returns:
    The (stdout, stderr) tuple returned by executor.kubectl_execute().
  """
  return executor.kubectl_execute([
      'kubectl', 'check-gke-ingress', '--kubeconfig',
      get_config_path(), '--context', executor.kubecontext
  ])
@functools.lru_cache()
def get_kubectl_executor(c: gke.Cluster):
  """Create a kubectl_executor for a GKE cluster.

  Writes a kubeconfig context for the cluster and then checks that kubectl
  can authenticate with it. The result is memoized by lru_cache, so a None
  result for a cluster is also returned again on later calls while it stays
  in the cache.

  Args:
    c: the GKE cluster to create the executor for.

  Returns:
    The executor, or None if the kubeconfig could not be created, if
    authentication failed, or if running kubectl raised FileNotFoundError.
  """
  executor = KubectlExecutor(cluster=c)
  # NOTE(review): the lock belongs to the executor created just above, so it
  # does not serialize kubeconfig updates between different executors --
  # confirm whether that is intended.
  with executor.lock:
    if not executor.make_kube_config():
      return None
  try:
    if not verify_auth(executor):
      logging.warning('Authentication failed for cluster %s', c.short_path)
      return None
  except FileNotFoundError as err:
    logging.warning('Can not inspect Kubernetes resources: %s: %s',
                    type(err).__name__, err)
    return None
  return executor

Create a kubectl_executor for a GKE cluster.

def clean_up():
  """Delete the kubeconfig file generated for gcpdiag.

  Best effort: an OSError raised while removing the file (for example
  because it does not exist) is logged at debug level and not propagated.
  """
  try:
    os.remove(get_config_path())
  except OSError as err:
    logging.debug('Error cleaning up kubeconfig file used by gcpdiag: %s: %s',
                  type(err).__name__, err)

Delete the kubeconfig file generated for gcpdiag.

def error_message(rule_name, kind, namespace, name, message) -> str:
  """Format the failure message of a check rule for one resource.

  Args:
    rule_name: name of the rule that failed.
    kind: kind of the resource that was checked.
    namespace: namespace of the resource.
    name: name of the resource.
    message: description of the failure.

  Returns:
    A single line of text terminated by a newline.
  """
  return f'Check rule {rule_name} on {kind} {namespace}/{name} failed: {message}\n'