gcpdiag.queries.composer

Queries related to Composer.
class Environment(models.Resource):
  """Represents a Cloud Composer environment.

  Thin wrapper around the environment resource dict returned by the Composer
  API; most properties are direct lookups into that dict.
  """
  _resource_data: dict

  def __init__(self, project_id: str, resource_data: dict):
    """Wraps ``resource_data`` for the environment in ``project_id``.

    Args:
      project_id: ID of the project that owns the environment.
      resource_data: environment resource dict; its ``name`` must be a full
        environment path.

    Raises:
      RuntimeError: if ``resource_data['name']`` is not of the form
        ``projects/*/locations/*/environments/*``.
    """
    super().__init__(project_id)
    self._resource_data = resource_data
    self.region, self.name = self.parse_full_path()
    # Splits an image version such as 'composer-2.1.0-airflow-2.4.3' into
    # its Composer ('2.1.0') and Airflow ('2.4.3') parts.
    self.version_pattern = re.compile(r'composer-(.*)-airflow-(.*)')

  @property
  def num_schedulers(self) -> int:
    """Scheduler count from config.workloadsConfig; 1 when not set."""
    return get_path(self._resource_data,
                    ('config', 'workloadsConfig', 'scheduler', 'count'),
                    default=1)

  @property
  def worker_cpu(self) -> float:
    """Per-worker CPU from config.workloadsConfig.worker.cpu."""
    return get_path(self._resource_data,
                    ('config', 'workloadsConfig', 'worker', 'cpu'))

  @property
  def worker_memory_gb(self) -> float:
    """Per-worker memory in GB from config.workloadsConfig.worker.memoryGb."""
    return get_path(self._resource_data,
                    ('config', 'workloadsConfig', 'worker', 'memoryGb'))

  @property
  def worker_max_count(self) -> int:
    """Maximum worker count from config.workloadsConfig.worker.maxCount."""
    return get_path(self._resource_data,
                    ('config', 'workloadsConfig', 'worker', 'maxCount'))

  @property
  def worker_concurrency(self) -> float:
    """Effective Celery worker concurrency.

    Uses the ``celery-worker_concurrency`` Airflow config override when it is
    set. Otherwise derives the default from the Airflow version and worker
    resources: ``12 * worker_cpu`` before Airflow 2.3.3, and
    ``min(32, 12 * worker_cpu, 8 * worker_memory_gb)`` from 2.3.3 on.
    """
    overrides = self.airflow_config_overrides
    if 'celery-worker_concurrency' in overrides:
      return float(overrides['celery-worker_concurrency'])
    # Only compute the default when there is no override: it reads the
    # image version and the workloadsConfig worker resources, which may be
    # missing from the resource data.
    if version.parse(self.airflow_version) < version.parse('2.3.3'):
      return float(12 * self.worker_cpu)
    return float(min(32, 12 * self.worker_cpu, 8 * self.worker_memory_gb))

  @property
  def parallelism(self) -> float:
    """``core-parallelism`` override as a float; ``inf`` when not set."""
    return float(self.airflow_config_overrides.get('core-parallelism', 'inf'))

  def _split_image_version(self) -> Tuple[str, str]:
    """Returns the (composer, airflow) version parts of ``image_version``.

    Raises:
      RuntimeError: if ``image_version`` does not match ``version_pattern``.
    """
    match = self.version_pattern.search(self.image_version)
    if match is None:
      # Explicit raise rather than assert: asserts are stripped under -O.
      raise RuntimeError(f'Can\'t parse image version {self.image_version}')
    return match.group(1), match.group(2)

  @property
  def composer_version(self) -> str:
    """Composer part of the image version, e.g. '2.1.0'."""
    return self._split_image_version()[0]

  @property
  def airflow_version(self) -> str:
    """Airflow part of the image version, e.g. '2.4.3'."""
    return self._split_image_version()[1]

  @property
  def is_composer2(self) -> bool:
    """True when ``composer_version`` starts with '2'."""
    return self.composer_version.startswith('2')

  @property
  def full_path(self) -> str:
    """Full resource name: 'projects/*/locations/*/environments/*'."""
    return self._resource_data['name']

  @property
  def state(self) -> str:
    """Environment state string from the resource data."""
    return self._resource_data['state']

  @property
  def image_version(self) -> str:
    """Image version, e.g. 'composer-2.1.0-airflow-2.4.3'."""
    return self._resource_data['config']['softwareConfig']['imageVersion']

  @property
  def short_path(self) -> str:
    """Identifier of the form '<project_id>/<region>/<name>'."""
    return f'{self.project_id}/{self.region}/{self.name}'

  @property
  def airflow_config_overrides(self) -> dict:
    """Airflow config overrides keyed 'section-key'; {} when none."""
    return self._resource_data['config']['softwareConfig'].get(
        'airflowConfigOverrides', {})

  @property
  def service_account(self) -> str:
    """Service account used by the environment's nodes.

    Falls back to the project's default Compute Engine service account when
    nodeConfig has no serviceAccount.
    """
    sa = self._resource_data['config']['nodeConfig'].get('serviceAccount')
    if sa is None:
      # serviceAccount is marked as optional in REST API docs
      # using a default GCE SA as a fallback
      project_nr = crm.get_project(self.project_id).number
      sa = f'{project_nr}-compute@developer.gserviceaccount.com'
    return sa

  def parse_full_path(self) -> Tuple[str, str]:
    """Splits ``full_path`` into (region, environment name).

    Raises:
      RuntimeError: if ``full_path`` does not match the environment format.
    """
    match = re.match(r'projects/[^/]*/locations/([^/]*)/environments/([^/]*)',
                     self.full_path)
    if not match:
      raise RuntimeError(f'Can\'t parse full_path {self.full_path}')
    return match.group(1), match.group(2)

  def __str__(self) -> str:
    return self.short_path

  def is_private_ip(self) -> bool:
    """Whether enablePrivateEnvironment is set in privateEnvironmentConfig.

    Returns False when privateEnvironmentConfig is absent.
    """
    # privateEnvironmentConfig may be omitted from the resource data, so do
    # not index it directly.
    private_config = self._resource_data['config'].get(
        'privateEnvironmentConfig', {})
    return private_config.get('enablePrivateEnvironment', False)

  @property
  def gke_cluster(self) -> str:
    """GKE cluster backing the environment (config.gkeCluster)."""
    return self._resource_data['config']['gkeCluster']

Represents a Cloud Composer environment.

Environment(project_id: str, resource_data: dict)
def __init__(self, project_id: str, resource_data: dict):
  """Stores the raw resource dict and derives region and name from its path."""
  super().__init__(project_id)
  self._resource_data = resource_data
  region, name = self.parse_full_path()
  self.region = region
  self.name = name
  # Matches image versions such as 'composer-2.1.0-airflow-2.4.3'.
  self.version_pattern = re.compile(r'composer-(.*)-airflow-(.*)')
version_pattern
num_schedulers: int
@property
def num_schedulers(self) -> int:
  """Number of Airflow schedulers; 1 when workloadsConfig does not set it."""
  scheduler_count_path = ('config', 'workloadsConfig', 'scheduler', 'count')
  return get_path(self._resource_data, scheduler_count_path, default=1)
worker_cpu: float
@property
def worker_cpu(self) -> float:
  """CPU allotted to each worker (config.workloadsConfig.worker.cpu)."""
  cpu_path = ('config', 'workloadsConfig', 'worker', 'cpu')
  return get_path(self._resource_data, cpu_path)
worker_memory_gb: float
@property
def worker_memory_gb(self) -> float:
  """Memory in GB per worker (config.workloadsConfig.worker.memoryGb)."""
  memory_path = ('config', 'workloadsConfig', 'worker', 'memoryGb')
  return get_path(self._resource_data, memory_path)
worker_max_count: int
@property
def worker_max_count(self) -> int:
  """Upper bound on worker count (config.workloadsConfig.worker.maxCount)."""
  max_count_path = ('config', 'workloadsConfig', 'worker', 'maxCount')
  return get_path(self._resource_data, max_count_path)
worker_concurrency: float
@property
def worker_concurrency(self) -> float:
  """Effective Celery worker concurrency.

  Uses the ``celery-worker_concurrency`` Airflow config override when it is
  set. Otherwise derives the default from the Airflow version and worker
  resources: ``12 * worker_cpu`` before Airflow 2.3.3, and
  ``min(32, 12 * worker_cpu, 8 * worker_memory_gb)`` from 2.3.3 on.
  """
  overrides = self.airflow_config_overrides
  if 'celery-worker_concurrency' in overrides:
    return float(overrides['celery-worker_concurrency'])
  # Only compute the default when there is no override: it reads the image
  # version and the workloadsConfig worker resources, which may be missing
  # from the resource data.
  if version.parse(self.airflow_version) < version.parse('2.3.3'):
    return float(12 * self.worker_cpu)
  return float(min(32, 12 * self.worker_cpu, 8 * self.worker_memory_gb))
parallelism: float
@property
def parallelism(self) -> float:
  """Airflow ``core-parallelism`` override; infinity when not overridden."""
  raw_value = self.airflow_config_overrides.get('core-parallelism', 'inf')
  return float(raw_value)
composer_version: str
@property
def composer_version(self) -> str:
  """Composer part of ``image_version``.

  For example '2.1.0' for 'composer-2.1.0-airflow-2.4.3'.

  Raises:
    RuntimeError: if ``image_version`` does not match ``version_pattern``.
  """
  match = self.version_pattern.search(self.image_version)
  if match is None:
    # Explicit raise rather than assert: asserts are stripped under -O.
    raise RuntimeError(f'Can\'t parse image version {self.image_version}')
  return match.group(1)
airflow_version: str
@property
def airflow_version(self) -> str:
  """Airflow part of ``image_version``.

  For example '2.4.3' for 'composer-2.1.0-airflow-2.4.3'.

  Raises:
    RuntimeError: if ``image_version`` does not match ``version_pattern``.
  """
  match = self.version_pattern.search(self.image_version)
  if match is None:
    # Explicit raise rather than assert: asserts are stripped under -O.
    raise RuntimeError(f'Can\'t parse image version {self.image_version}')
  return match.group(2)
is_composer2: bool
@property
def is_composer2(self) -> bool:
  """True when ``composer_version`` begins with '2'."""
  major_prefix = '2'
  return self.composer_version.startswith(major_prefix)
full_path: str
@property
def full_path(self) -> str:
  """Full resource name: 'projects/*/locations/*/environments/*'."""
  resource_name = self._resource_data['name']
  return resource_name

Returns the full path of this resource.

Example: 'projects/my-project/locations/us-central1/environments/my-env'

# state: str
@property
def state(self) -> str:
  """Environment state string taken from the resource data."""
  data = self._resource_data
  return data['state']
image_version: str
@property
def image_version(self) -> str:
  """Image version string, e.g. 'composer-2.1.0-airflow-2.4.3'."""
  software_config = self._resource_data['config']['softwareConfig']
  return software_config['imageVersion']
short_path: str
@property
def short_path(self) -> str:
  """Human-readable identifier of the form '<project_id>/<region>/<name>'."""
  return '{}/{}/{}'.format(self.project_id, self.region, self.name)

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'my-project/us-central1/my-env'

airflow_config_overrides: dict
@property
def airflow_config_overrides(self) -> dict:
  """Airflow config overrides keyed 'section-key'; {} when none are set."""
  software_config = self._resource_data['config']['softwareConfig']
  return software_config.get('airflowConfigOverrides', {})
service_account: str
@property
def service_account(self) -> str:
  """Service account used by the environment's nodes.

  nodeConfig.serviceAccount is optional in the REST API, so when it is absent
  the project's default Compute Engine service account
  ('<project number>-compute@developer.gserviceaccount.com') is returned.
  """
  node_config = self._resource_data['config']['nodeConfig']
  explicit_sa = node_config.get('serviceAccount')
  if explicit_sa is not None:
    return explicit_sa
  project_number = crm.get_project(self.project_id).number
  return f'{project_number}-compute@developer.gserviceaccount.com'
def parse_full_path(self) -> Tuple[str, str]:
  """Splits ``full_path`` into (region, environment name).

  Raises:
    RuntimeError: if ``full_path`` does not look like
      'projects/*/locations/*/environments/*'.
  """
  parts = re.match(r'projects/[^/]*/locations/([^/]*)/environments/([^/]*)',
                   self.full_path)
  if parts is None:
    raise RuntimeError(f'Can\'t parse full_path {self.full_path}')
  region, env_name = parts.groups()
  return region, env_name
def is_private_ip(self) -> bool:
  """Whether enablePrivateEnvironment is set in privateEnvironmentConfig.

  Returns False when privateEnvironmentConfig is absent.
  """
  # privateEnvironmentConfig may be omitted from the resource data, so do not
  # index it directly (that raised KeyError).
  private_config = self._resource_data['config'].get(
      'privateEnvironmentConfig', {})
  return private_config.get('enablePrivateEnvironment', False)
gke_cluster: str
@property
def gke_cluster(self) -> str:
  """GKE cluster backing the environment (config.gkeCluster)."""
  env_config = self._resource_data['config']
  return env_config['gkeCluster']
# Regions passed to _query_regions_envs() by get_environments().
# NOTE(review): hard-coded; environments in regions missing from this list
# are not discovered. Keep it in sync with Composer's supported regions.
COMPOSER_REGIONS = [
    'asia-northeast2',
    'us-central1',
    'northamerica-northeast1',
    'us-west3',
    'southamerica-east1',
    'us-east1',
    'asia-northeast1',
    'europe-west1',
    'europe-west2',
    'asia-northeast3',
    'us-west4',
    'asia-east2',
    'europe-central2',
    'europe-west6',
    'us-west2',
    'australia-southeast1',
    'europe-west3',
    'asia-south1',
    'us-west1',
    'us-east4',
    'asia-southeast1',
]
@caching.cached_api_call
def get_environments(context: models.Context) -> Iterable[Environment]:
  """Lists the Composer environments of the context's project.

  Environments are fetched through _query_regions_envs() over
  COMPOSER_REGIONS and kept only if context.match_project_resource()
  accepts their location, labels and name.

  Args:
    context: gcpdiag context selecting the project and resource filters.

  Returns:
    A list of Environment objects; empty when the Composer API is disabled.
  """
  found: List[Environment] = []
  if not apis.is_enabled(context.project_id, 'composer'):
    return found
  composer_api = apis.get_api('composer', 'v1', context.project_id)
  env_records = _query_regions_envs(COMPOSER_REGIONS, composer_api,
                                    context.project_id)
  for env_data in env_records:
    # projects/{projectId}/locations/{locationId}/environments/{environmentId}.
    name_match = re.match(
        r'projects/[^/]+/locations/([^/]+)/environments/([^/]+)',
        env_data['name'])
    if name_match is None:
      logging.error('invalid composer name: %s', env_data['name'])
      continue
    location, env_name = name_match.groups()
    if context.match_project_resource(location=location,
                                      labels=env_data.get('labels', {}),
                                      resource=env_name):
      found.append(Environment(context.project_id, env_data))
  return found