gcpdiag.queries.dataproc

Queries related to Dataproc.
class Cluster(gcpdiag.models.Resource):
 29class Cluster(models.Resource):
 30  """Represents Dataproc Cluster"""
 31
 32  name: str
 33  _resource_data: Mapping
 34
 35  def __init__(self, name: str, project_id: str, resource_data: Mapping):
 36    super().__init__(project_id)
 37    self.name = name
 38    self._resource_data = resource_data
 39
 40  def is_running(self) -> bool:
 41    return self.status == 'RUNNING'
 42
 43  def get_software_property(self, property_name) -> str:
 44    return self._resource_data['config']['softwareConfig']['properties'].get(
 45        property_name)
 46
 47  def is_stackdriver_logging_enabled(self) -> bool:
 48    # Unless overridden at cluster creation, properties with default
 49    # values are not returned by the API, so get_software_property only
 50    # returns a value when the property was explicitly set to 'false'.
 51    return (not self.get_software_property(
 52        'dataproc:dataproc.logging.stackdriver.enable') == 'false')
 53
 54  def is_stackdriver_monitoring_enabled(self) -> bool:
 55    return (self.get_software_property(
 56        'dataproc:dataproc.monitoring.stackdriver.enable') == 'true')
 57
 58  @property
 59  def region(self) -> str:
 60    """Region, derived by stripping the trailing two-character zone suffix (e.g. '-d') from the zoneUri.
 61
 62    https://www.googleapis.com/compute/v1/projects/dataproc1/zones/us-central1-d
 63    """
 64    return self._resource_data['config']['gceClusterConfig']['zoneUri'].split(
 65        '/')[-1][0:-2]
 66
 67  @property
 68  def zone(self) -> Optional[str]:
 69    zone = (self._resource_data.get('config', {}).get('gceClusterConfig',
 70                                                      {}).get('zoneUri'))
 71    if zone:
 72      m = re.search(r'/zones/([^/]+)$', zone)
 73      if m:
 74        return m.group(1)
 75    raise RuntimeError(f"can't determine zone for cluster {self.name}")
 76
 77  @property
 78  def full_path(self) -> str:
 79    return (
 80        f'projects/{self.project_id}/regions/{self.region}/clusters/{self.name}'
 81    )
 82
 83  @property
 84  def short_path(self) -> str:
 85    return f'{self.project_id}/{self.region}/{self.name}'
 86
 87  @property
 88  def status(self) -> str:
 89    return self._resource_data['status']['state']
 90
 91  def __str__(self) -> str:
 92    return self.short_path
 93
 94  @property
 95  def cluster_uuid(self) -> str:
 96    return self._resource_data['clusterUuid']
 97
 98  @property
 99  def image_version(self):
100    return self._resource_data['config']['softwareConfig']['imageVersion']
101
102  @property
103  def vm_service_account_email(self):
104    sa = self._resource_data['config']['gceClusterConfig'].get('serviceAccount')
105    if sa is None:
106      sa = crm.get_project(self.project_id).default_compute_service_account
107    return sa
108
109  @property
110  def is_custom_gcs_connector(self) -> bool:
111    return bool(
112        self._resource_data.get('config', {}).get('gceClusterConfig', {}).get(
113            'metadata', {}).get('GCS_CONNECTOR_VERSION'))
114
115  @property
116  def cluster_provided_bq_connector(self):
117    """Check user-supplied BigQuery connector on the cluster level"""
118    bigquery_connector = (self._resource_data.get('config', {}).get(
119        'gceClusterConfig', {}).get('metadata',
120                                    {}).get('SPARK_BQ_CONNECTOR_VERSION'))
121    if not bigquery_connector:
122      bigquery_connector = (self._resource_data.get('config', {}).get(
123          'gceClusterConfig', {}).get('metadata',
124                                      {}).get('SPARK_BQ_CONNECTOR_URL'))
125      if bigquery_connector:
126        if bigquery_connector == 'spark-bigquery-latest.jar':
127          return 'spark-bigquery-latest'
128        else:
129          match = re.search(
130              r'spark-bigquery(?:-with-dependencies_\d+\.\d+)?-(\d+\.\d+\.\d+)\.jar',
131              bigquery_connector)
132          if match:
133            return match.group(1)
134      # If this returns None, the cluster is using the default,
135      # pre-installed BQ connector for the image version.
136      return bigquery_connector
137
138  @property
139  def is_gce_cluster(self) -> bool:
140    return bool(self._resource_data.get('config', {}).get('gceClusterConfig'))
141
142  @property
143  def gce_network_uri(self) -> Optional[str]:
144    """Get network uri from cluster network or subnetwork"""
145    if not self.is_gce_cluster:
146      raise RuntimeError(
147          'Can not return network URI for a Dataproc on GKE cluster')
148    network_uri = (self._resource_data.get('config',
149                                           {}).get('gceClusterConfig',
150                                                   {}).get('networkUri'))
151    if not network_uri:
152      subnetwork_uri = (self._resource_data.get('config', {}).get(
153          'gceClusterConfig', {}).get('subnetworkUri'))
154      network_uri = network.get_subnetwork_from_url(subnetwork_uri).network
155    return network_uri
156
157  @property
158  def gce_subnetwork_uri(self) -> Optional[str]:
159    """Get subnetwork uri from cluster subnetwork."""
160    if not self.is_gce_cluster:
161      raise RuntimeError(
162          'Can not return subnetwork URI for a Dataproc on GKE cluster')
163    subnetwork_uri = (self._resource_data.get('config',
164                                              {}).get('gceClusterConfig',
165                                                      {}).get('subnetworkUri'))
166    if not subnetwork_uri:
167      subnetwork_uri = ('https://www.googleapis.com/compute/v1/projects/' +
168                        self.project_id + '/regions/' + self.region +
169                        '/subnetworks/default')
170    return subnetwork_uri
171
172  @property
173  def is_single_node_cluster(self) -> bool:
174    workers = (self._resource_data.get('config',
175                                       {}).get('workerConfig',
176                                               {}).get('numInstances', 0))
177    return workers == 0
178
179  @property
180  def is_ha_cluster(self) -> bool:
181    masters = (self._resource_data.get('config',
182                                       {}).get('masterConfig',
183                                               {}).get('numInstances', 1))
184    return masters != 1
185
186  @property
187  def is_internal_ip_only(self) -> bool:
188    # internalIpOnly is set to true by default when creating a
189    # cluster with Dataproc image version 2.2.
190    # In older image versions the default is false.
191    internal_ip_only = self._resource_data['config']['gceClusterConfig'][
192        'internalIpOnly']
193    return internal_ip_only
194
195  @property
196  def has_autoscaling_policy(self) -> bool:
197    """Checks if an autoscaling policy is configured for the cluster."""
198    return bool(self._resource_data['config'].get('autoscalingConfig', {}))
199
200  @property
201  def autoscaling_policy_id(self) -> str:
202    """Returns the autoscaling policy ID for the cluster."""
203    if self.has_autoscaling_policy:
204      return (self._resource_data['config'].get('autoscalingConfig',
205                                                {}).get('policyUri',
206                                                        '').split('/')[-1])
207    else:
208      return ''
209
210  @property
211  def number_of_primary_workers(self) -> float:
212    """Gets the number of primary worker nodes in the cluster."""
213    return (self._resource_data['config'].get('workerConfig',
214                                              {}).get('numInstances', 0))
215
216  @property
217  def number_of_secondary_workers(self) -> float:
218    """Gets the number of secondary worker nodes in the cluster."""
219    return (self._resource_data['config'].get('secondaryWorkerConfig',
220                                              {}).get('numInstances', 0))
221
222  @property
223  def is_preemptible_primary_workers(self) -> bool:
224    """Checks if the primary worker nodes in the cluster are preemptible."""
225    return (self._resource_data['config'].get('workerConfig',
226                                              {}).get('isPreemptible', False))
227
228  @property
229  def is_preemptible_secondary_workers(self) -> bool:
230    """Checks if the secondary worker nodes in the cluster are preemptible."""
231    return (self._resource_data['config'].get('secondaryWorkerConfig',
232                                              {}).get('isPreemptible', False))
233
234  @property
235  def initialization_actions(self) -> List[str]:
236    return self._resource_data['config'].get('initializationActions', [])

Represents Dataproc Cluster

Cluster(name: str, project_id: str, resource_data: Mapping)
35  def __init__(self, name: str, project_id: str, resource_data: Mapping):
36    super().__init__(project_id)
37    self.name = name
38    self._resource_data = resource_data
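
For illustration, a Cluster can be built directly from a clusters.get/list
response dictionary. A minimal sketch, assuming gcpdiag is importable; the
project, cluster name and field values below are hypothetical:

    from gcpdiag.queries.dataproc import Cluster

    # Hypothetical resource_data shaped like a Dataproc v1 clusters.get response.
    resource_data = {
        'clusterName': 'example-cluster',
        'clusterUuid': '1234-abcd',
        'status': {'state': 'RUNNING'},
        'config': {
            'softwareConfig': {'imageVersion': '2.1.44-debian11', 'properties': {}},
            'gceClusterConfig': {
                'zoneUri': ('https://www.googleapis.com/compute/v1/projects/'
                            'example-project/zones/us-central1-b'),
            },
        },
    }

    cluster = Cluster(name='example-cluster', project_id='example-project',
                      resource_data=resource_data)
    assert cluster.is_running()
    assert cluster.region == 'us-central1'
    assert cluster.zone == 'us-central1-b'
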
name: str
def is_running(self) -> bool:
40  def is_running(self) -> bool:
41    return self.status == 'RUNNING'
def get_software_property(self, property_name) -> str:
43  def get_software_property(self, property_name) -> str:
44    return self._resource_data['config']['softwareConfig']['properties'].get(
45        property_name)
def is_stackdriver_logging_enabled(self) -> bool:
47  def is_stackdriver_logging_enabled(self) -> bool:
48    # Unless overridden at cluster creation, properties with default
49    # values are not returned by the API, so get_software_property only
50    # returns a value when the property was explicitly set to 'false'.
51    return (not self.get_software_property(
52        'dataproc:dataproc.logging.stackdriver.enable') == 'false')
def is_stackdriver_monitoring_enabled(self) -> bool:
54  def is_stackdriver_monitoring_enabled(self) -> bool:
55    return (self.get_software_property(
56        'dataproc:dataproc.monitoring.stackdriver.enable') == 'true')
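
A minimal usage sketch of these two flags; `cluster` is assumed to be a
Cluster obtained elsewhere (for example from get_cluster() below):

    # Defaults are not returned by the API, so logging counts as enabled
    # unless the property was explicitly set to 'false'.
    if not cluster.is_stackdriver_logging_enabled():
        print('dataproc:dataproc.logging.stackdriver.enable is set to false')

    # Monitoring is opt-in: only an explicit 'true' enables it.
    if cluster.is_stackdriver_monitoring_enabled():
        print('dataproc:dataproc.monitoring.stackdriver.enable is set to true')
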
region: str
58  @property
59  def region(self) -> str:
60    """Region, derived by stripping the trailing two-character zone suffix (e.g. '-d') from the zoneUri.
61
62    https://www.googleapis.com/compute/v1/projects/dataproc1/zones/us-central1-d
63    """
64    return self._resource_data['config']['gceClusterConfig']['zoneUri'].split(
65        '/')[-1][0:-2]

Region, derived by stripping the trailing two-character zone suffix (e.g. '-d') from the zoneUri.

https://www.googleapis.com/compute/v1/projects/dataproc1/zones/us-central1-d

zone: Optional[str]
67  @property
68  def zone(self) -> Optional[str]:
69    zone = (self._resource_data.get('config', {}).get('gceClusterConfig',
70                                                      {}).get('zoneUri'))
71    if zone:
72      m = re.search(r'/zones/([^/]+)$', zone)
73      if m:
74        return m.group(1)
75    raise RuntimeError(f"can't determine zone for cluster {self.name}")
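
The zone and region parsing can be reproduced standalone; the zoneUri below is
a made-up example:

    import re

    zone_uri = ('https://www.googleapis.com/compute/v1/projects/example-project/'
                'zones/us-central1-d')

    # Zone: the last path component after '/zones/'.
    m = re.search(r'/zones/([^/]+)$', zone_uri)
    zone = m.group(1) if m else None   # 'us-central1-d'

    # Region: drop the two-character zone suffix, as the region property does.
    region = zone[:-2] if zone else None   # 'us-central1'
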
full_path: str
77  @property
78  def full_path(self) -> str:
79    return (
80        f'projects/{self.project_id}/regions/{self.region}/clusters/{self.name}'
81    )

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
83  @property
84  def short_path(self) -> str:
85    return f'{self.project_id}/{self.region}/{self.name}'

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

status: str
87  @property
88  def status(self) -> str:
89    return self._resource_data['status']['state']
cluster_uuid: str
94  @property
95  def cluster_uuid(self) -> str:
96    return self._resource_data['clusterUuid']
image_version
 98  @property
 99  def image_version(self):
100    return self._resource_data['config']['softwareConfig']['imageVersion']
vm_service_account_email
102  @property
103  def vm_service_account_email(self):
104    sa = self._resource_data['config']['gceClusterConfig'].get('serviceAccount')
105    if sa is None:
106      sa = crm.get_project(self.project_id).default_compute_service_account
107    return sa
is_custom_gcs_connector: bool
109  @property
110  def is_custom_gcs_connector(self) -> bool:
111    return bool(
112        self._resource_data.get('config', {}).get('gceClusterConfig', {}).get(
113            'metadata', {}).get('GCS_CONNECTOR_VERSION'))
cluster_provided_bq_connector
115  @property
116  def cluster_provided_bq_connector(self):
117    """Check user-supplied BigQuery connector on the cluster level"""
118    bigquery_connector = (self._resource_data.get('config', {}).get(
119        'gceClusterConfig', {}).get('metadata',
120                                    {}).get('SPARK_BQ_CONNECTOR_VERSION'))
121    if not bigquery_connector:
122      bigquery_connector = (self._resource_data.get('config', {}).get(
123          'gceClusterConfig', {}).get('metadata',
124                                      {}).get('SPARK_BQ_CONNECTOR_URL'))
125      if bigquery_connector:
126        if bigquery_connector == 'spark-bigquery-latest.jar':
127          return 'spark-bigquery-latest'
128        else:
129          match = re.search(
130              r'spark-bigquery(?:-with-dependencies_\d+\.\d+)?-(\d+\.\d+\.\d+)\.jar',
131              bigquery_connector)
132          if match:
133            return match.group(1)
134      # If this returns None, the cluster is using the default,
135      # pre-installed BQ connector for the image version.
136      return bigquery_connector

Check user-supplied BigQuery connector on the cluster level
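
The jar-name parsing can be exercised on its own; the jar names below are
illustrative, not taken from a real cluster:

    import re

    JAR_RE = r'spark-bigquery(?:-with-dependencies_\d+\.\d+)?-(\d+\.\d+\.\d+)\.jar'

    for jar in ('spark-bigquery-with-dependencies_2.12-0.27.0.jar',
                'spark-bigquery-latest.jar'):
        if jar == 'spark-bigquery-latest.jar':
            print(jar, '->', 'spark-bigquery-latest')
        else:
            match = re.search(JAR_RE, jar)
            print(jar, '->', match.group(1) if match else None)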

is_gce_cluster: bool
138  @property
139  def is_gce_cluster(self) -> bool:
140    return bool(self._resource_data.get('config', {}).get('gceClusterConfig'))
gce_network_uri: Optional[str]
142  @property
143  def gce_network_uri(self) -> Optional[str]:
144    """Get network uri from cluster network or subnetwork"""
145    if not self.is_gce_cluster:
146      raise RuntimeError(
147          'Can not return network URI for a Dataproc on GKE cluster')
148    network_uri = (self._resource_data.get('config',
149                                           {}).get('gceClusterConfig',
150                                                   {}).get('networkUri'))
151    if not network_uri:
152      subnetwork_uri = (self._resource_data.get('config', {}).get(
153          'gceClusterConfig', {}).get('subnetworkUri'))
154      network_uri = network.get_subnetwork_from_url(subnetwork_uri).network
155    return network_uri

Get network uri from cluster network or subnetwork

gce_subnetwork_uri: Optional[str]
157  @property
158  def gce_subnetwork_uri(self) -> Optional[str]:
159    """Get subnetwork uri from cluster subnetwork."""
160    if not self.is_gce_cluster:
161      raise RuntimeError(
162          'Can not return subnetwork URI for a Dataproc on GKE cluster')
163    subnetwork_uri = (self._resource_data.get('config',
164                                              {}).get('gceClusterConfig',
165                                                      {}).get('subnetworkUri'))
166    if not subnetwork_uri:
167      subnetwork_uri = ('https://www.googleapis.com/compute/v1/projects/' +
168                        self.project_id + '/regions/' + self.region +
169                        '/subnetworks/default')
170    return subnetwork_uri

Get subnetwork uri from cluster subnetwork.

is_single_node_cluster: bool
172  @property
173  def is_single_node_cluster(self) -> bool:
174    workers = (self._resource_data.get('config',
175                                       {}).get('workerConfig',
176                                               {}).get('numInstances', 0))
177    return workers == 0
is_ha_cluster: bool
179  @property
180  def is_ha_cluster(self) -> bool:
181    masters = (self._resource_data.get('config',
182                                       {}).get('masterConfig',
183                                               {}).get('numInstances', 1))
184    return masters != 1
is_internal_ip_only: bool
186  @property
187  def is_internal_ip_only(self) -> bool:
188    # internalIpOnly is set to true by default when creating a
189    # cluster with Dataproc image version 2.2.
190    # In older image versions the default is false.
191    internal_ip_only = self._resource_data['config']['gceClusterConfig'][
192        'internalIpOnly']
193    return internal_ip_only
has_autoscaling_policy: bool
195  @property
196  def has_autoscaling_policy(self) -> bool:
197    """Checks if an autoscaling policy is configured for the cluster."""
198    return bool(self._resource_data['config'].get('autoscalingConfig', {}))

Checks if an autoscaling policy is configured for the cluster.

autoscaling_policy_id: str
200  @property
201  def autoscaling_policy_id(self) -> str:
202    """Returns the autoscaling policy ID for the cluster."""
203    if self.has_autoscaling_policy:
204      return (self._resource_data['config'].get('autoscalingConfig',
205                                                {}).get('policyUri',
206                                                        '').split('/')[-1])
207    else:
208      return ''

Returns the autoscaling policy ID for the cluster.
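
The ID is simply the last path segment of the policy URI; for example, with a
hypothetical policyUri:

    policy_uri = ('projects/example-project/regions/us-central1/'
                  'autoscalingPolicies/example-policy')
    policy_id = policy_uri.split('/')[-1]   # 'example-policy'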

number_of_primary_workers: float
210  @property
211  def number_of_primary_workers(self) -> float:
212    """Gets the number of primary worker nodes in the cluster."""
213    return (self._resource_data['config'].get('workerConfig',
214                                              {}).get('numInstances', 0))

Gets the number of primary worker nodes in the cluster.

number_of_secondary_workers: float
216  @property
217  def number_of_secondary_workers(self) -> float:
218    """Gets the number of secondary worker nodes in the cluster."""
219    return (self._resource_data['config'].get('secondaryWorkerConfig',
220                                              {}).get('numInstances', 0))

Gets the number of secondary worker nodes in the cluster.

is_preemptible_primary_workers: bool
222  @property
223  def is_preemptible_primary_workers(self) -> bool:
224    """Checks if the primary worker nodes in the cluster are preemptible."""
225    return (self._resource_data['config'].get('workerConfig',
226                                              {}).get('isPreemptible', False))

Checks if the primary worker nodes in the cluster are preemptible.

is_preemptible_secondary_workers: bool
228  @property
229  def is_preemptible_secondary_workers(self) -> bool:
230    """Checks if the secondary worker nodes in the cluster are preemptible."""
231    return (self._resource_data['config'].get('secondaryWorkerConfig',
232                                              {}).get('isPreemptible', False))

Checks if the secondary worker nodes in the cluster are preemptible.

initialization_actions: List[str]
234  @property
235  def initialization_actions(self) -> List[str]:
236    return self._resource_data['config'].get('initializationActions', [])
class Region:
239class Region:
240  """Represents Dataproc region"""
241
242  project_id: str
243  region: str
244
245  def __init__(self, project_id: str, region: str):
246    self.project_id = project_id
247    self.region = region
248
249  def get_clusters(self, context: models.Context) -> Iterable[Cluster]:
250    clusters = []
251    for cluster in self.query_api():
252      if not context.match_project_resource(resource=cluster.get('clusterName'),
253                                            labels=cluster.get('labels', {})):
254        continue
255      c = Cluster(
256          name=cluster['clusterName'],
257          project_id=self.project_id,
258          resource_data=cluster,
259      )
260      clusters.append(c)
261    return clusters
262
263  def query_api(self) -> Iterable[dict]:
264    try:
265      api = apis.get_api('dataproc', 'v1', self.project_id)
266      query = (api.projects().regions().clusters().list(
267          projectId=self.project_id, region=self.region))
268      # be careful not to retry too many times because querying all regions
269      # sometimes causes requests to fail permanently
270      resp = query.execute(num_retries=1)
271      return resp.get('clusters', [])
272    except googleapiclient.errors.HttpError as err:
273      # b/371526148 investigate permission denied error
274      logging.error(err)
275      return []
276      # raise utils.GcpApiError(err) from err

Represents Dataproc region

Region(project_id: str, region: str)
245  def __init__(self, project_id: str, region: str):
246    self.project_id = project_id
247    self.region = region
project_id: str
region: str
def get_clusters( self, context: gcpdiag.models.Context) -> Iterable[Cluster]:
249  def get_clusters(self, context: models.Context) -> Iterable[Cluster]:
250    clusters = []
251    for cluster in self.query_api():
252      if not context.match_project_resource(resource=cluster.get('clusterName'),
253                                            labels=cluster.get('labels', {})):
254        continue
255      c = Cluster(
256          name=cluster['clusterName'],
257          project_id=self.project_id,
258          resource_data=cluster,
259      )
260      clusters.append(c)
261    return clusters
def query_api(self) -> Iterable[dict]:
263  def query_api(self) -> Iterable[dict]:
264    try:
265      api = apis.get_api('dataproc', 'v1', self.project_id)
266      query = (api.projects().regions().clusters().list(
267          projectId=self.project_id, region=self.region))
268      # be careful not to retry too many times because querying all regions
269      # sometimes causes requests to fail permanently
270      resp = query.execute(num_retries=1)
271      return resp.get('clusters', [])
272    except googleapiclient.errors.HttpError as err:
273      # b/371526148 investigate permission denied error
274      logging.error(err)
275      return []
276      # raise utils.GcpApiError(err) from err
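
A sketch of listing the clusters of a single region, assuming application
default credentials and that models.Context accepts the project id as shown
(the project and region are hypothetical):

    from gcpdiag import models
    from gcpdiag.queries import dataproc

    context = models.Context(project_id='example-project')
    region = dataproc.Region('example-project', 'us-central1')
    for cluster in region.get_clusters(context):
        print(cluster.short_path, cluster.status)
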
class Dataproc:
279class Dataproc:
280  """Represents Dataproc product"""
281
282  project_id: str
283
284  def __init__(self, project_id: str):
285    self.project_id = project_id
286
287  def get_regions(self) -> Iterable[Region]:
288    return [
289        Region(self.project_id, r.name)
290        for r in gce.get_all_regions(self.project_id)
291    ]
292
293  def is_api_enabled(self) -> bool:
294    return apis.is_enabled(self.project_id, 'dataproc')

Represents Dataproc product

Dataproc(project_id: str)
284  def __init__(self, project_id: str):
285    self.project_id = project_id
project_id: str
def get_regions(self) -> Iterable[Region]:
287  def get_regions(self) -> Iterable[Region]:
288    return [
289        Region(self.project_id, r.name)
290        for r in gce.get_all_regions(self.project_id)
291    ]
def is_api_enabled(self) -> bool:
293  def is_api_enabled(self) -> bool:
294    return apis.is_enabled(self.project_id, 'dataproc')
@caching.cached_api_call
def get_clusters( context: gcpdiag.models.Context) -> Iterable[Cluster]:
297@caching.cached_api_call
298def get_clusters(context: models.Context) -> Iterable[Cluster]:
299  r: List[Cluster] = []
300  dataproc = Dataproc(context.project_id)
301  if not dataproc.is_api_enabled():
302    return r
303  executor = get_executor()
304  for clusters in executor.map(lambda r: r.get_clusters(context),
305                               dataproc.get_regions()):
306    r += clusters
307  return r
@caching.cached_api_call
def get_cluster( cluster_name, region, project) -> Optional[Cluster]:
310@caching.cached_api_call
311def get_cluster(cluster_name, region, project) -> Optional[Cluster]:
312  api = apis.get_api('dataproc', 'v1', project)
313  request = api.projects().regions().clusters().get(projectId=project,
314                                                    clusterName=cluster_name,
315                                                    region=region)
316  try:
317    r = request.execute(num_retries=config.API_RETRIES)
318  except googleapiclient.errors.HttpError as err:
319    logging.error(err)
320    return None
321  return Cluster(r['clusterName'], project_id=r['projectId'], resource_data=r)
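
Typical use of get_cluster, with hypothetical identifiers; the function
returns None when the cluster cannot be fetched:

    from gcpdiag.queries import dataproc

    cluster = dataproc.get_cluster(cluster_name='example-cluster',
                                   region='us-central1',
                                   project='example-project')
    if cluster is None:
        print('cluster not found or request failed')
    elif cluster.is_running():
        print(cluster.full_path, 'is RUNNING on image', cluster.image_version)
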
class AutoScalingPolicy(gcpdiag.models.Resource):
324class AutoScalingPolicy(models.Resource):
325  """AutoScalingPolicy."""
326
327  _resource_data: dict
328
329  def __init__(self, project_id, resource_data, region):
330    super().__init__(project_id=project_id)
331    self._resource_data = resource_data
332    self.region = region
333
334  @property
335  def policy_id(self) -> str:
336    return self._resource_data['id']
337
338  @property
339  def full_path(self) -> str:
340    return self._resource_data['name']
341
342  @property
343  def short_path(self) -> str:
344    return f'{self.project_id}/{self.region}/{self.policy_id}'
345
346  @property
347  def name(self) -> str:
348    return self._resource_data['name']
349
350  @property
351  def scale_down_factor(self) -> float:
352    return self._resource_data['basicAlgorithm']['yarnConfig'].get(
353        'scaleDownFactor', 0.0)
354
355  @property
356  def has_graceful_decommission_timeout(self) -> bool:
357    """Checks if a graceful decommission timeout is configured in the autoscaling policy."""
358    return bool(
359        self._resource_data.get('basicAlgorithm',
360                                {}).get('yarnConfig',
361                                        {}).get('gracefulDecommissionTimeout',
362                                                {}))
363
364  @property
365  def graceful_decommission_timeout(self) -> float:
366    """Gets the configured graceful decommission timeout in the autoscaling policy."""
367    return (self._resource_data.get('basicAlgorithm',
368                                    {}).get('yarnConfig', {}).get(
369                                        'gracefulDecommissionTimeout', -1))

AutoScalingPolicy.

AutoScalingPolicy(project_id, resource_data, region)
329  def __init__(self, project_id, resource_data, region):
330    super().__init__(project_id=project_id)
331    self._resource_data = resource_data
332    self.region = region
region
policy_id: str
334  @property
335  def policy_id(self) -> str:
336    return self._resource_data['id']
full_path: str
338  @property
339  def full_path(self) -> str:
340    return self._resource_data['name']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
342  @property
343  def short_path(self) -> str:
344    return f'{self.project_id}/{self.region}/{self.policy_id}'

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
346  @property
347  def name(self) -> str:
348    return self._resource_data['name']
scale_down_factor: float
350  @property
351  def scale_down_factor(self) -> float:
352    return self._resource_data['basicAlgorithm']['yarnConfig'].get(
353        'scaleDownFactor', 0.0)
has_graceful_decommission_timeout: bool
355  @property
356  def has_graceful_decommission_timeout(self) -> bool:
357    """Checks if a graceful decommission timeout is configured in the autoscaling policy."""
358    return bool(
359        self._resource_data.get('basicAlgorithm',
360                                {}).get('yarnConfig',
361                                        {}).get('gracefulDecommissionTimeout',
362                                                {}))

Checks if a graceful decommission timeout is configured in the autoscaling policy.

graceful_decommission_timeout: float
364  @property
365  def graceful_decommission_timeout(self) -> float:
366    """Gets the configured graceful decommission timeout in the autoscaling policy."""
367    return (self._resource_data.get('basicAlgorithm',
368                                    {}).get('yarnConfig', {}).get(
369                                        'gracefulDecommissionTimeout', -1))

Gets the configured graceful decommission timeout in the autoscaling policy.

@caching.cached_api_call
def get_auto_scaling_policy( project_id: str, region: str, policy_id: str) -> AutoScalingPolicy:
372@caching.cached_api_call
373def get_auto_scaling_policy(project_id: str, region: str,
374                            policy_id: str) -> AutoScalingPolicy:
375  # logging.info('fetching autoscalingpolicy: %s', project_id)
376  dataproc = apis.get_api('dataproc', 'v1', project_id)
377  name = (
378      f'projects/{project_id}/regions/{region}/autoscalingPolicies/{policy_id}')
379  try:
380    request = dataproc.projects().regions().autoscalingPolicies().get(name=name)
381    response = request.execute(num_retries=config.API_RETRIES)
382    return AutoScalingPolicy(project_id, response, region)
383  except googleapiclient.errors.HttpError as err:
384    raise utils.GcpApiError(err) from err
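
A sketch combining a cluster's policy reference with the policy lookup;
identifiers are hypothetical and API errors surface as utils.GcpApiError:

    from gcpdiag.queries import dataproc

    cluster = dataproc.get_cluster('example-cluster', 'us-central1',
                                   'example-project')
    if cluster and cluster.has_autoscaling_policy:
        policy = dataproc.get_auto_scaling_policy('example-project', 'us-central1',
                                                  cluster.autoscaling_policy_id)
        if not policy.has_graceful_decommission_timeout:
            print(policy.short_path, 'has no graceful decommission timeout')
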
@caching.cached_api_call
def list_auto_scaling_policies( project_id: str, region: str) -> List[AutoScalingPolicy]:
387@caching.cached_api_call
388def list_auto_scaling_policies(project_id: str,
389                               region: str) -> List[AutoScalingPolicy]:
390  """Lists all autoscaling policies in the given project and region."""
391  dataproc = apis.get_api('dataproc', 'v1', project_id)
392  parent = f'projects/{project_id}/regions/{region}'
393  try:
394    request = (dataproc.projects().regions().autoscalingPolicies().list(
395        parent=parent))
396    response = request.execute(num_retries=config.API_RETRIES)
397    return [
398        AutoScalingPolicy(project_id, policy_data, region)
399        for policy_data in response.get('policies', [])
400    ]
401  except googleapiclient.errors.HttpError as err:
402    raise utils.GcpApiError(err) from err

Lists all autoscaling policies in the given project and region.

class Job(gcpdiag.models.Resource):
405class Job(models.Resource):
406  """Job."""
407
408  _resource_data: dict
409
410  def __init__(self, project_id, job_id, region, resource_data):
411    super().__init__(project_id=project_id)
412    self._resource_data = resource_data
413    self.region = region
414    self.job_id = job_id
415
416  @property
417  def full_path(self) -> str:
418    return (
419        f'projects/{self.project_id}/regions/{self.region}/jobs/{self.job_id}')
420
421  @property
422  def short_path(self) -> str:
423    return f'{self.project_id}/{self.region}/{self.job_id}'
424
425  @property
426  def cluster_name(self) -> str:
427    return self._resource_data['placement']['clusterName']
428
429  @property
430  def cluster_uuid(self) -> str:
431    return self._resource_data['placement']['clusterUuid']
432
433  @property
434  def state(self):
435    return self._resource_data['status']['state']
436
437  @property
438  def details(self):
439    if self._resource_data['status']['state'] == 'ERROR':
440      return self._resource_data['status']['details']
441    return None
442
443  @property
444  def status_history(self):
445    status_history_dict = {}
446    for previous_status in self._resource_data['statusHistory']:
447      if previous_status['state'] not in status_history_dict:
448        status_history_dict[
449            previous_status['state']] = previous_status['stateStartTime']
450
451    return status_history_dict
452
453  @property
454  def yarn_applications(self):
455    return self._resource_data['yarnApplications']
456
457  @property
458  def driver_output_resource_uri(self):
459    return self._resource_data.get('driverOutputResourceUri')
460
461  @property
462  def job_uuid(self):
463    return self._resource_data.get('jobUuid')
464
465  @property
466  def job_provided_bq_connector(self):
467    """Check user-supplied BigQuery connector on the job level"""
468    jar_file_uris = (self._resource_data.get('sparkJob', {}).get('jarFileUris'))
469    if jar_file_uris is not None:
470      for file in jar_file_uris:
471        if 'spark-bigquery-latest.jar' in file:
472          return 'spark-bigquery-latest'
473        else:
474          match = re.search(
475              r'spark-bigquery(?:-with-dependencies_\d+\.\d+)?-(\d+\.\d+\.\d+)\.jar',
476              file)
477          if match:
478            return match.group(1)
479    return None

Job.

Job(project_id, job_id, region, resource_data)
410  def __init__(self, project_id, job_id, region, resource_data):
411    super().__init__(project_id=project_id)
412    self._resource_data = resource_data
413    self.region = region
414    self.job_id = job_id
region
job_id
full_path: str
416  @property
417  def full_path(self) -> str:
418    return (
419        f'projects/{self.project_id}/regions/{self.region}/jobs/{self.job_id}')

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
421  @property
422  def short_path(self) -> str:
423    return f'{self.project_id}/{self.region}/{self.job_id}'

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

cluster_name: str
425  @property
426  def cluster_name(self) -> str:
427    return self._resource_data['placement']['clusterName']
cluster_uuid: str
429  @property
430  def cluster_uuid(self) -> str:
431    return self._resource_data['placement']['clusterUuid']
state
433  @property
434  def state(self):
435    return self._resource_data['status']['state']
details
437  @property
438  def details(self):
439    if self._resource_data['status']['state'] == 'ERROR':
440      return self._resource_data['status']['details']
441    return None
status_history
443  @property
444  def status_history(self):
445    status_history_dict = {}
446    for previous_status in self._resource_data['statusHistory']:
447      if previous_status['state'] not in status_history_dict:
448        status_history_dict[
449            previous_status['state']] = previous_status['stateStartTime']
450
451    return status_history_dict
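
The resulting dict maps each state to the first time it was entered, which can
be used, for instance, to measure how long a job waited before running. A
sketch assuming `job` is a Job and that stateStartTime values are RFC 3339
timestamps ending in 'Z':

    from datetime import datetime

    def parse_ts(ts: str) -> datetime:
        # e.g. '2024-01-01T10:00:00.123456Z'
        return datetime.fromisoformat(ts.replace('Z', '+00:00'))

    history = job.status_history
    if 'PENDING' in history and 'RUNNING' in history:
        waited = parse_ts(history['RUNNING']) - parse_ts(history['PENDING'])
        print('time spent pending:', waited)
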
yarn_applications
453  @property
454  def yarn_applications(self):
455    return self._resource_data['yarnApplications']
driver_output_resource_uri
457  @property
458  def driver_output_resource_uri(self):
459    return self._resource_data.get('driverOutputResourceUri')
job_uuid
461  @property
462  def job_uuid(self):
463    return self._resource_data.get('jobUuid')
job_provided_bq_connector
465  @property
466  def job_provided_bq_connector(self):
467    """Check user-supplied BigQuery connector on the job level"""
468    jar_file_uris = (self._resource_data.get('sparkJob', {}).get('jarFileUris'))
469    if jar_file_uris is not None:
470      for file in jar_file_uris:
471        if 'spark-bigquery-latest.jar' in file:
472          return 'spark-bigquery-latest'
473        else:
474          match = re.search(
475              r'spark-bigquery(?:-with-dependencies_\d+\.\d+)?-(\d+\.\d+\.\d+)\.jar',
476              file)
477          if match:
478            return match.group(1)
479    return None

Check user-supplied BigQuery connector on the job level

@caching.cached_api_call
def get_job_by_jobid(project_id: str, region: str, job_id: str):
482@caching.cached_api_call
483def get_job_by_jobid(project_id: str, region: str, job_id: str):
484  dataproc = apis.get_api('dataproc', 'v1', project_id)
485  try:
486    request = (dataproc.projects().regions().jobs().get(projectId=project_id,
487                                                        region=region,
488                                                        jobId=job_id))
489    response = request.execute(num_retries=config.API_RETRIES)
490    return Job(project_id, job_id=job_id, region=region, resource_data=response)
491  except googleapiclient.errors.HttpError as err:
492    raise utils.GcpApiError(err) from err
@caching.cached_api_call
def extract_dataproc_supported_version() -> list[str]:
495@caching.cached_api_call
496def extract_dataproc_supported_version() -> list[str]:
497  """Extract the supported Dataproc versions (using Debian images as representative).
498  """
499
500  page_url = 'https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-version-clusters'
501
502  try:
503    table = web.fetch_and_extract_table(page_url,
504                                        tag='h3',
505                                        tag_id='debian_images')
506    if table:
507      rows = table.find_all('tr')[1:]  # Skip the header row
508      version_list = []
509
510      for row in rows:
511        dp_version = row.find_all('td')[0].get_text().strip().split('-')[0]
512        version_list.append(dp_version)
513      return version_list
514
515    else:
516      return []
517  except (
518      requests.exceptions.RequestException,
519      AttributeError,
520      TypeError,
521      ValueError,
522      IndexError,
523  ) as e:
524    logging.error(
525        'Error in extracting dataproc versions: %s',
526        e,
527    )
528    return []

Extract the supported Dataproc versions (using Debian images as representative).
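
One possible use is to check whether a cluster's major.minor image version is
still a supported release; the cluster object and the shape of the returned
version strings (e.g. '2.2') are assumptions:

    from gcpdiag.queries import dataproc

    supported = dataproc.extract_dataproc_supported_version()
    # Assumes `cluster` is a dataproc.Cluster; imageVersion looks like '2.1.44-debian11'.
    major_minor = '.'.join(cluster.image_version.split('.')[:2])
    if supported and major_minor not in supported:
        print(f'image version {cluster.image_version} is not a supported release')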

@caching.cached_api_call
def extract_dataproc_bigquery_version(image_version) -> list[str]:
531@caching.cached_api_call
532def extract_dataproc_bigquery_version(image_version) -> list[str]:
533  """Extract the Dataproc BigQuery connector version for an image version from the GCP documentation.
534  """
535
536  page_url = ('https://cloud.google.com/dataproc/docs/concepts/versioning/'
537              'dataproc-release-' + image_version)
538
539  try:
540    table = web.fetch_and_extract_table(page_url, tag='div')
541    bq_version = []
542    if table:
543      rows = table.find_all('tr')[1:]
544      for row in rows:
545        cells = row.find_all('td')
546        if 'BigQuery Connector' in cells[0].get_text(strip=True):
547          bq_version = cells[1].get_text(strip=True)
548    return bq_version
549  except (
550      requests.exceptions.RequestException,
551      AttributeError,
552      TypeError,
553      ValueError,
554      IndexError,
555  ) as e:
556    logging.error(
557        '%s Error in extracting BigQuery connector versions.'
558        '  Please check BigQuery Connector version on %s',
559        e,
560        page_url,
561    )
562    return []

Extract the Dataproc BigQuery connector version for an image version from the GCP documentation.
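
A sketch comparing the connector pinned on the job or cluster with the default
connector documented for the image version; the objects, the argument format
(major.minor) and the returned version string are assumptions:

    from gcpdiag.queries import dataproc

    # Assumes `cluster` is a dataproc.Cluster and `job` is a dataproc.Job.
    major_minor = '.'.join(cluster.image_version.split('.')[:2])
    default_bq = dataproc.extract_dataproc_bigquery_version(major_minor)
    pinned_bq = job.job_provided_bq_connector or cluster.cluster_provided_bq_connector
    if pinned_bq and default_bq and pinned_bq not in (default_bq, 'spark-bigquery-latest'):
        print(f'pinned connector {pinned_bq} differs from image default {default_bq}')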