gcpdiag.queries.kubectl
Queries related to Kubectl plugins.
def
get_config_path():
class
KubectlExecutor:
35class KubectlExecutor: 36 """Represents a kubectl executor.""" 37 38 lock: threading.Lock 39 40 def __init__(self, cluster: gke.Cluster): 41 self.cluster = cluster 42 self.lock = threading.Lock() 43 44 def make_kube_config(self) -> bool: 45 """Add a new kubernetes context for kubectl plugin CLIs.""" 46 47 cfg: dict = {} 48 if not os.path.isfile(get_config_path()): 49 cfg['apiVersion'] = 'v1' 50 cfg['users'] = [{ 51 'name': 'gcpdiag', 52 'user': { 53 'exec': { 54 'apiVersion': 'client.authentication.k8s.io/v1beta1', 55 'command': 'gke-gcloud-auth-plugin', 56 'installHint': 'x', 57 'provideClusterInfo': True, 58 }, 59 }, 60 }] 61 cfg['clusters'] = [] 62 cfg['contexts'] = [] 63 else: 64 with open(get_config_path(), encoding='UTF-8') as f: 65 cfg = yaml.safe_load(f) 66 67 if self.cluster.endpoint is None: 68 logging.warning('No kubernetes API server endpoint found for cluster %s', 69 self.cluster.short_path) 70 return False 71 72 kubecontext = 'gcpdiag-ctx-' + self.cluster.name 73 74 cfg['clusters'].append({ 75 'cluster': { 76 'certificate-authority-data': self.cluster.cluster_ca_certificate, 77 'server': 'https://' + self.cluster.endpoint, 78 }, 79 'name': self.cluster.short_path, 80 }) 81 cfg['contexts'].append({ 82 'context': { 83 'cluster': self.cluster.short_path, 84 'user': 'gcpdiag', 85 }, 86 'name': kubecontext, 87 }) 88 89 self.kubecontext = kubecontext 90 91 config_text = yaml.dump(cfg, default_flow_style=False) 92 with open(get_config_path(), 'w', encoding='UTF-8') as config_file: 93 config_file.write(config_text) 94 config_file.close() 95 96 return True 97 98 def kubectl_execute(self, command_list: list[str]): 99 """ Execute a kubectl command. 100 101 Will take a list of strings which contains all the command and parameters to be executed 102 and return the stdout and stderr of the execution. 103 """ 104 res = subprocess.run(command_list, 105 check=False, 106 capture_output=True, 107 text=True) 108 return res.stdout, res.stderr
Represents a kubectl executor.
KubectlExecutor(cluster: gcpdiag.queries.gke.Cluster)
def
make_kube_config(self) -> bool:
44 def make_kube_config(self) -> bool: 45 """Add a new kubernetes context for kubectl plugin CLIs.""" 46 47 cfg: dict = {} 48 if not os.path.isfile(get_config_path()): 49 cfg['apiVersion'] = 'v1' 50 cfg['users'] = [{ 51 'name': 'gcpdiag', 52 'user': { 53 'exec': { 54 'apiVersion': 'client.authentication.k8s.io/v1beta1', 55 'command': 'gke-gcloud-auth-plugin', 56 'installHint': 'x', 57 'provideClusterInfo': True, 58 }, 59 }, 60 }] 61 cfg['clusters'] = [] 62 cfg['contexts'] = [] 63 else: 64 with open(get_config_path(), encoding='UTF-8') as f: 65 cfg = yaml.safe_load(f) 66 67 if self.cluster.endpoint is None: 68 logging.warning('No kubernetes API server endpoint found for cluster %s', 69 self.cluster.short_path) 70 return False 71 72 kubecontext = 'gcpdiag-ctx-' + self.cluster.name 73 74 cfg['clusters'].append({ 75 'cluster': { 76 'certificate-authority-data': self.cluster.cluster_ca_certificate, 77 'server': 'https://' + self.cluster.endpoint, 78 }, 79 'name': self.cluster.short_path, 80 }) 81 cfg['contexts'].append({ 82 'context': { 83 'cluster': self.cluster.short_path, 84 'user': 'gcpdiag', 85 }, 86 'name': kubecontext, 87 }) 88 89 self.kubecontext = kubecontext 90 91 config_text = yaml.dump(cfg, default_flow_style=False) 92 with open(get_config_path(), 'w', encoding='UTF-8') as config_file: 93 config_file.write(config_text) 94 config_file.close() 95 96 return True
Add a new kubernetes context for kubectl plugin CLIs.
def
kubectl_execute(self, command_list: list[str]):
98 def kubectl_execute(self, command_list: list[str]): 99 """ Execute a kubectl command. 100 101 Will take a list of strings which contains all the command and parameters to be executed 102 and return the stdout and stderr of the execution. 103 """ 104 res = subprocess.run(command_list, 105 check=False, 106 capture_output=True, 107 text=True) 108 return res.stdout, res.stderr
Execute a kubectl command.
Will take a list of strings which contains all the command and parameters to be executed and return the stdout and stderr of the execution.
111def verify_auth(executor: KubectlExecutor) -> bool: 112 """ Verify the authentication for kubernetes by running kubeclt cluster-info. 113 114 Will raise a warning and return False if authentication failed. 115 """ 116 _, stderr = executor.kubectl_execute([ 117 'kubectl', 'cluster-info', '--kubeconfig', 118 get_config_path(), '--context', executor.kubecontext 119 ]) 120 if stderr: 121 logging.warning('Failed to authenticate kubectl for cluster %s: %s', 122 executor.cluster.short_path, stderr.strip('\n')) 123 return False 124 return True
Verify the authentication for kubernetes by running kubeclt cluster-info.
Will raise a warning and return False if authentication failed.
134@functools.lru_cache() 135def get_kubectl_executor(c: gke.Cluster): 136 """ Create a kubectl_executor for a GKE cluster. """ 137 executor = KubectlExecutor(cluster=c) 138 with executor.lock: 139 if not executor.make_kube_config(): 140 return None 141 try: 142 if not verify_auth(executor): 143 logging.warning('Authentication failed for cluster %s', c.short_path) 144 return None 145 except FileNotFoundError as err: 146 logging.warning('Can not inspect Kubernetes resources: %s: %s', 147 type(err).__name__, err) 148 return None 149 return executor
Create a kubectl_executor for a GKE cluster.
def
clean_up():
152def clean_up(): 153 """ Delete the kubeconfig file generated for gcpdiag. """ 154 try: 155 os.remove(get_config_path()) 156 except OSError as err: 157 logging.debug('Error cleaning up kubeconfig file used by gcpdiag: %s: %s', 158 type(err).__name__, err)
Delete the kubeconfig file generated for gcpdiag.
def
error_message(rule_name, kind, namespace, name, message) -> str: