gcpdiag.queries.composer
Queries related to Composer.
class
Environment(gcpdiag.models.Resource):
class Environment(models.Resource):
  """Represents a Cloud Composer environment.

  Wraps the environment resource dictionary returned by the Composer API,
  whose 'name' has the form
  'projects/<project>/locations/<region>/environments/<name>', and exposes
  commonly used fields as properties.
  """
  _resource_data: dict

  def __init__(self, project_id: str, resource_data: dict):
    super().__init__(project_id)
    self._resource_data = resource_data
    self.region, self.name = self.parse_full_path()
    # Image versions look like 'composer-<composer ver>-airflow-<airflow ver>':
    # group 1 is the Composer version, group 2 the Airflow version.
    self.version_pattern = re.compile(r'composer-(.*)-airflow-(.*)')

  @property
  def num_schedulers(self) -> int:
    """Number of Airflow schedulers; 1 when not set in workloadsConfig."""
    return get_path(self._resource_data,
                    ('config', 'workloadsConfig', 'scheduler', 'count'),
                    default=1)

  @property
  def worker_cpu(self) -> float:
    """CPU configured per Airflow worker (config.workloadsConfig.worker)."""
    return get_path(self._resource_data,
                    ('config', 'workloadsConfig', 'worker', 'cpu'))

  @property
  def worker_memory_gb(self) -> float:
    """Memory (GB) configured per Airflow worker (workloadsConfig)."""
    return get_path(self._resource_data,
                    ('config', 'workloadsConfig', 'worker', 'memoryGb'))

  @property
  def worker_max_count(self) -> int:
    """Maximum number of Airflow workers (workloadsConfig)."""
    return get_path(self._resource_data,
                    ('config', 'workloadsConfig', 'worker', 'maxCount'))

  @property
  def worker_concurrency(self) -> float:
    """Effective Celery worker concurrency.

    Returns the 'celery-worker_concurrency' Airflow config override when it
    is set. Otherwise derives a default from the worker resources:
    12 * worker_cpu for Airflow < 2.3.3, else
    min(32, 12 * worker_cpu, 8 * worker_memory_gb).
    """
    overrides = self.airflow_config_overrides
    if 'celery-worker_concurrency' in overrides:
      # Check the override first: the default reads workloadsConfig worker
      # fields that may be absent, so it must only be computed when needed.
      return float(overrides['celery-worker_concurrency'])
    if version.parse(self.airflow_version) < version.parse('2.3.3'):
      return float(12 * self.worker_cpu)
    return float(min(32, 12 * self.worker_cpu, 8 * self.worker_memory_gb))

  @property
  def parallelism(self) -> float:
    """Airflow 'core-parallelism' override, or infinity when not set."""
    return float(self.airflow_config_overrides.get('core-parallelism', 'inf'))

  @property
  def composer_version(self) -> str:
    """Composer version part of image_version."""
    return self._split_image_version()[0]

  @property
  def airflow_version(self) -> str:
    """Airflow version part of image_version."""
    return self._split_image_version()[1]

  @property
  def is_composer2(self) -> bool:
    """True when the Composer version starts with '2'."""
    return self.composer_version.startswith('2')

  @property
  def full_path(self) -> str:
    """Full resource name, e.g. 'projects/p/locations/r/environments/n'."""
    return self._resource_data['name']

  @property
  def state(self) -> str:
    """Environment state as reported by the API."""
    return self._resource_data['state']

  @property
  def image_version(self) -> str:
    """Image version string from config.softwareConfig.imageVersion."""
    return self._resource_data['config']['softwareConfig']['imageVersion']

  @property
  def short_path(self) -> str:
    """'<project_id>/<region>/<name>' identifying this environment."""
    return f'{self.project_id}/{self.region}/{self.name}'

  @property
  def airflow_config_overrides(self) -> dict:
    """Airflow config overrides, or an empty dict when none are set."""
    return self._resource_data['config']['softwareConfig'].get(
        'airflowConfigOverrides', {})

  @property
  def service_account(self) -> str:
    """Service account of the environment's nodes.

    Falls back to the project's default Compute Engine service account when
    nodeConfig.serviceAccount is not set.
    """
    sa = self._resource_data['config']['nodeConfig'].get('serviceAccount')
    if sa is None:
      # serviceAccount is marked as optional in REST API docs
      # using a default GCE SA as a fallback
      project_nr = crm.get_project(self.project_id).number
      sa = f'{project_nr}-compute@developer.gserviceaccount.com'
    return sa

  def parse_full_path(self) -> Tuple[str, str]:
    """Return (region, name) parsed from full_path.

    Raises:
      RuntimeError: if full_path does not match the expected format.
    """
    match = re.match(r'projects/[^/]*/locations/([^/]*)/environments/([^/]*)',
                     self.full_path)
    if not match:
      raise RuntimeError(f'Can\'t parse full_path {self.full_path}')
    return match.group(1), match.group(2)

  def _split_image_version(self) -> Tuple[str, str]:
    """Return (composer_version, airflow_version) parsed from image_version.

    Raises:
      RuntimeError: if image_version does not match version_pattern.
    """
    match = self.version_pattern.search(self.image_version)
    if match is None:
      # Raise explicitly instead of asserting so the check survives `python -O`.
      raise RuntimeError(f'Can\'t parse image version {self.image_version}')
    return match.group(1), match.group(2)

  def __str__(self) -> str:
    return self.short_path

  def is_private_ip(self) -> bool:
    """Whether the environment is configured as a private IP environment.

    Returns False when privateEnvironmentConfig (or its flag) is absent
    instead of raising KeyError.
    """
    return get_path(self._resource_data,
                    ('config', 'privateEnvironmentConfig',
                     'enablePrivateEnvironment'),
                    default=False)

  @property
  def gke_cluster(self) -> str:
    """GKE cluster backing the environment (config.gkeCluster)."""
    return self._resource_data['config']['gkeCluster']
Represents Composer environment
worker_concurrency: float
@property
def worker_concurrency(self) -> float:
  """Effective Celery worker concurrency.

  Returns the 'celery-worker_concurrency' Airflow config override when it is
  set. Otherwise derives a default from the worker resources:
  12 * worker_cpu for Airflow < 2.3.3, else
  min(32, 12 * worker_cpu, 8 * worker_memory_gb).
  """
  overrides = self.airflow_config_overrides
  if 'celery-worker_concurrency' in overrides:
    # Check the override first: the default reads workloadsConfig worker
    # fields that may be absent, so it must only be computed when needed.
    return float(overrides['celery-worker_concurrency'])
  if version.parse(self.airflow_version) < version.parse('2.3.3'):
    return float(12 * self.worker_cpu)
  return float(min(32, 12 * self.worker_cpu, 8 * self.worker_memory_gb))
full_path: str
Returns the full path of this resource.
Example: 'projects/my-project/locations/us-central1/environments/my-env'
short_path: str
@property
def short_path(self) -> str:
  """Short identifier of the environment: '<project_id>/<region>/<name>'."""
  return '{}/{}/{}'.format(self.project_id, self.region, self.name)
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'my-project/us-central1/my-env'
service_account: str
@property
def service_account(self) -> str:
  """Service account used by the environment's nodes.

  serviceAccount is optional in the Composer REST API; when it is not set,
  the project's default Compute Engine service account is returned instead.
  """
  node_config = self._resource_data['config']['nodeConfig']
  configured = node_config.get('serviceAccount')
  if configured is not None:
    return configured
  project_number = crm.get_project(self.project_id).number
  return f'{project_number}-compute@developer.gserviceaccount.com'
# Regions passed to _query_regions_envs() by get_environments() when listing
# Composer environments.
# NOTE(review): this list is hard-coded, so environments in regions not listed
# here are never found — TODO confirm it is still current.
COMPOSER_REGIONS = [
    'asia-northeast2',
    'us-central1',
    'northamerica-northeast1',
    'us-west3',
    'southamerica-east1',
    'us-east1',
    'asia-northeast1',
    'europe-west1',
    'europe-west2',
    'asia-northeast3',
    'us-west4',
    'asia-east2',
    'europe-central2',
    'europe-west6',
    'us-west2',
    'australia-southeast1',
    'europe-west3',
    'asia-south1',
    'us-west1',
    'us-east4',
    'asia-southeast1',
]
@caching.cached_api_call
def
get_environments( context: gcpdiag.models.Context) -> Iterable[Environment]:
@caching.cached_api_call
def get_environments(context: models.Context) -> Iterable[Environment]:
  """Return the Composer environments of the context's project.

  Returns an empty list when the Composer API is not enabled. Otherwise the
  environments returned by _query_regions_envs for COMPOSER_REGIONS are
  filtered with context.match_project_resource (region, labels, name).
  Entries whose name cannot be parsed are logged and skipped.
  """
  found: List[Environment] = []
  if apis.is_enabled(context.project_id, 'composer'):
    composer_api = apis.get_api('composer', 'v1', context.project_id)
    for env_data in _query_regions_envs(COMPOSER_REGIONS, composer_api,
                                        context.project_id):
      # Expected form:
      # projects/{projectId}/locations/{locationId}/environments/{environmentId}
      parsed = re.match(
          r'projects/[^/]+/locations/([^/]+)/environments/([^/]+)',
          env_data['name'])
      if parsed is None:
        logging.error('invalid composer name: %s', env_data['name'])
        continue
      region, env_name = parsed.groups()
      wanted = context.match_project_resource(location=region,
                                              labels=env_data.get('labels', {}),
                                              resource=env_name)
      if wanted:
        found.append(Environment(context.project_id, env_data))
  return found