gcpdiag.queries.dataproc
class Cluster(models.Resource):
  """Represents Dataproc Cluster"""

  name: str
  _resource_data: Mapping

  def __init__(self, name: str, project_id: str, resource_data: Mapping):
    super().__init__(project_id)
    self.name = name
    self._resource_data = resource_data

  def is_running(self) -> bool:
    return self.status == 'RUNNING'

  def get_software_property(self, property_name) -> str:
    return self._resource_data['config']['softwareConfig']['properties'].get(
        property_name)

  def is_stackdriver_logging_enabled(self) -> bool:
    # Unless overridden during create,
    # properties with default values are not returned,
    # therefore get_software_property should only return when its false
    return (not self.get_software_property(
        'dataproc:dataproc.logging.stackdriver.enable') == 'false')

  def is_stackdriver_monitoring_enabled(self) -> bool:
    return (self.get_software_property(
        'dataproc:dataproc.monitoring.stackdriver.enable') == 'true')

  @property
  def region(self) -> str:
    """biggest regions have a trailing '-d' at most in its zoneUri

    https://www.googleapis.com/compute/v1/projects/dataproc1/zones/us-central1-d
    """
    return self._resource_data['config']['gceClusterConfig']['zoneUri'].split(
        '/')[-1][0:-2]

  @property
  def zone(self) -> Optional[str]:
    zone = (self._resource_data.get('config', {}).get('gceClusterConfig',
                                                       {}).get('zoneUri'))
    if zone:
      m = re.search(r'/zones/([^/]+)$', zone)
      if m:
        return m.group(1)
    raise RuntimeError(f"can't determine zone for cluster {self.name}")

  @property
  def full_path(self) -> str:
    return (
        f'projects/{self.project_id}/regions/{self.region}/clusters/{self.name}'
    )

  @property
  def short_path(self) -> str:
    return f'{self.project_id}/{self.region}/{self.name}'

  @property
  def status(self) -> str:
    return self._resource_data['status']['state']

  def __str__(self) -> str:
    return self.short_path

  @property
  def cluster_uuid(self) -> str:
    return self._resource_data['clusterUuid']

  @property
  def image_version(self):
    return self._resource_data['config']['softwareConfig']['imageVersion']

  @property
  def vm_service_account_email(self):
    sa = self._resource_data['config']['gceClusterConfig'].get('serviceAccount')
    if sa is None:
      sa = crm.get_project(self.project_id).default_compute_service_account
    return sa

  @property
  def is_custom_gcs_connector(self) -> bool:
    return bool(
        self._resource_data.get('config', {}).get('gceClusterConfig', {}).get(
            'metadata', {}).get('GCS_CONNECTOR_VERSION'))

  @property
  def cluster_provided_bq_connector(self):
    """Check user-supplied BigQuery connector on the cluster level"""
    bigquery_connector = (self._resource_data.get('config', {}).get(
        'gceClusterConfig', {}).get('metadata',
                                    {}).get('SPARK_BQ_CONNECTOR_VERSION'))
    if not bigquery_connector:
      bigquery_connector = (self._resource_data.get('config', {}).get(
          'gceClusterConfig', {}).get('metadata',
                                      {}).get('SPARK_BQ_CONNECTOR_URL'))
    if bigquery_connector:
      if bigquery_connector == 'spark-bigquery-latest.jar':
        return 'spark-bigquery-latest'
      else:
        match = re.search(
            r'spark-bigquery(?:-with-dependencies_\d+\.\d+)?-(\d+\.\d+\.\d+)\.jar',
            bigquery_connector)
        if match:
          return match.group(1)
    # If returns None, it means that the cluster is using the default,
    # pre-installed BQ connector for the image version
    return bigquery_connector

  @property
  def is_gce_cluster(self) -> bool:
    return bool(self._resource_data.get('config', {}).get('gceClusterConfig'))

  @property
  def gce_network_uri(self) -> Optional[str]:
    """Get network uri from cluster network or subnetwork"""
    if not self.is_gce_cluster:
      raise RuntimeError(
          'Can not return network URI for a Dataproc on GKE cluster')
    network_uri = (self._resource_data.get('config',
                                           {}).get('gceClusterConfig',
                                                   {}).get('networkUri'))
    if not network_uri:
      subnetwork_uri = (self._resource_data.get('config', {}).get(
          'gceClusterConfig', {}).get('subnetworkUri'))
      network_uri = network.get_subnetwork_from_url(subnetwork_uri).network
    return network_uri

  @property
  def gce_subnetwork_uri(self) -> Optional[str]:
    """Get subnetwork uri from cluster subnetwork."""
    if not self.is_gce_cluster:
      raise RuntimeError(
          'Can not return subnetwork URI for a Dataproc on GKE cluster')
    subnetwork_uri = (self._resource_data.get('config',
                                              {}).get('gceClusterConfig',
                                                      {}).get('subnetworkUri'))
    if not subnetwork_uri:
      subnetwork_uri = ('https://www.googleapis.com/compute/v1/projects/' +
                        self.project_id + '/regions/' + self.region +
                        '/subnetworks/default')
    return subnetwork_uri

  @property
  def is_single_node_cluster(self) -> bool:
    workers = (self._resource_data.get('config',
                                       {}).get('workerConfig',
                                               {}).get('numInstances', 0))
    return workers == 0

  @property
  def is_ha_cluster(self) -> bool:
    masters = (self._resource_data.get('config',
                                       {}).get('masterConfig',
                                               {}).get('numInstances', 1))
    return masters != 1

  @property
  def is_internal_ip_only(self) -> bool:
    # internalIpOnly is set to true by default when creating a
    # Dataproc 2.2 image version cluster.
    # The default should be false in older versions instead.
    internal_ip_only = self._resource_data['config']['gceClusterConfig'][
        'internalIpOnly']
    return internal_ip_only

  @property
  def has_autoscaling_policy(self) -> bool:
    """Checks if an autoscaling policy is configured for the cluster."""
    return bool(self._resource_data['config'].get('autoscalingConfig', {}))

  @property
  def autoscaling_policy_id(self) -> str:
    """Returns the autoscaling policy ID for the cluster."""
    if self.has_autoscaling_policy:
      return (self._resource_data['config'].get('autoscalingConfig',
                                                {}).get('policyUri',
                                                        '').split('/')[-1])
    else:
      return ''

  @property
  def number_of_primary_workers(self) -> float:
    """Gets the number of primary worker nodes in the cluster."""
    return (self._resource_data['config'].get('workerConfig',
                                              {}).get('numInstances', 0))

  @property
  def number_of_secondary_workers(self) -> float:
    """Gets the number of secondary worker nodes in the cluster."""
    return (self._resource_data['config'].get('secondaryWorkerConfig',
                                              {}).get('numInstances', 0))

  @property
  def is_preemptible_primary_workers(self) -> bool:
    """Checks if the primary worker nodes in the cluster are preemptible."""
    return (self._resource_data['config'].get('workerConfig',
                                              {}).get('isPreemptible', False))

  @property
  def is_preemptible_secondary_workers(self) -> bool:
    """Checks if the secondary worker nodes in the cluster are preemptible."""
    return (self._resource_data['config'].get('secondaryWorkerConfig',
                                              {}).get('isPreemptible', False))

  @property
  def initialization_actions(self) -> List[str]:
    return self._resource_data['config'].get('initializationActions', [])
Represents Dataproc Cluster
  def is_stackdriver_logging_enabled(self) -> bool:
    # Unless overridden during create,
    # properties with default values are not returned,
    # therefore get_software_property should only return when its false
    return (not self.get_software_property(
        'dataproc:dataproc.logging.stackdriver.enable') == 'false')
  @property
  def region(self) -> str:
    """biggest regions have a trailing '-d' at most in its zoneUri

    https://www.googleapis.com/compute/v1/projects/dataproc1/zones/us-central1-d
    """
    return self._resource_data['config']['gceClusterConfig']['zoneUri'].split(
        '/')[-1][0:-2]
The zone in the zoneUri carries at most a short trailing suffix such as '-d', so the region is obtained by stripping the last two characters, e.g.:
https://www.googleapis.com/compute/v1/projects/dataproc1/zones/us-central1-d
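For illustration, a minimal sketch of the string manipulation this property performs on a zoneUri like the one above (the values are hypothetical):

zone_uri = 'https://www.googleapis.com/compute/v1/projects/dataproc1/zones/us-central1-d'
zone = zone_uri.split('/')[-1]  # 'us-central1-d'
region = zone[0:-2]             # 'us-central1'
print(region)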
  @property
  def zone(self) -> Optional[str]:
    zone = (self._resource_data.get('config', {}).get('gceClusterConfig',
                                                       {}).get('zoneUri'))
    if zone:
      m = re.search(r'/zones/([^/]+)$', zone)
      if m:
        return m.group(1)
    raise RuntimeError(f"can't determine zone for cluster {self.name}")
  @property
  def full_path(self) -> str:
    return (
        f'projects/{self.project_id}/regions/{self.region}/clusters/{self.name}'
    )
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
  @property
  def short_path(self) -> str:
    return f'{self.project_id}/{self.region}/{self.name}'
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
  @property
  def cluster_provided_bq_connector(self):
    """Check user-supplied BigQuery connector on the cluster level"""
    bigquery_connector = (self._resource_data.get('config', {}).get(
        'gceClusterConfig', {}).get('metadata',
                                    {}).get('SPARK_BQ_CONNECTOR_VERSION'))
    if not bigquery_connector:
      bigquery_connector = (self._resource_data.get('config', {}).get(
          'gceClusterConfig', {}).get('metadata',
                                      {}).get('SPARK_BQ_CONNECTOR_URL'))
    if bigquery_connector:
      if bigquery_connector == 'spark-bigquery-latest.jar':
        return 'spark-bigquery-latest'
      else:
        match = re.search(
            r'spark-bigquery(?:-with-dependencies_\d+\.\d+)?-(\d+\.\d+\.\d+)\.jar',
            bigquery_connector)
        if match:
          return match.group(1)
    # If returns None, it means that the cluster is using the default,
    # pre-installed BQ connector for the image version
    return bigquery_connector
Checks for a user-supplied BigQuery connector at the cluster level.
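As a rough illustration of the version extraction performed above, a self-contained sketch using the same regular expression and a hypothetical jar name (not taken from any real cluster):

import re

# Hypothetical SPARK_BQ_CONNECTOR_URL metadata value.
jar = 'gs://my-bucket/spark-bigquery-with-dependencies_2.12-0.27.0.jar'
match = re.search(
    r'spark-bigquery(?:-with-dependencies_\d+\.\d+)?-(\d+\.\d+\.\d+)\.jar', jar)
print(match.group(1) if match else None)  # prints '0.27.0'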
  @property
  def gce_network_uri(self) -> Optional[str]:
    """Get network uri from cluster network or subnetwork"""
    if not self.is_gce_cluster:
      raise RuntimeError(
          'Can not return network URI for a Dataproc on GKE cluster')
    network_uri = (self._resource_data.get('config',
                                           {}).get('gceClusterConfig',
                                                   {}).get('networkUri'))
    if not network_uri:
      subnetwork_uri = (self._resource_data.get('config', {}).get(
          'gceClusterConfig', {}).get('subnetworkUri'))
      network_uri = network.get_subnetwork_from_url(subnetwork_uri).network
    return network_uri
Gets the network URI from the cluster's network or subnetwork.
  @property
  def gce_subnetwork_uri(self) -> Optional[str]:
    """Get subnetwork uri from cluster subnetwork."""
    if not self.is_gce_cluster:
      raise RuntimeError(
          'Can not return subnetwork URI for a Dataproc on GKE cluster')
    subnetwork_uri = (self._resource_data.get('config',
                                              {}).get('gceClusterConfig',
                                                      {}).get('subnetworkUri'))
    if not subnetwork_uri:
      subnetwork_uri = ('https://www.googleapis.com/compute/v1/projects/' +
                        self.project_id + '/regions/' + self.region +
                        '/subnetworks/default')
    return subnetwork_uri
Gets the subnetwork URI from the cluster's subnetwork.
  @property
  def is_internal_ip_only(self) -> bool:
    # internalIpOnly is set to true by default when creating a
    # Dataproc 2.2 image version cluster.
    # The default should be false in older versions instead.
    internal_ip_only = self._resource_data['config']['gceClusterConfig'][
        'internalIpOnly']
    return internal_ip_only
  @property
  def has_autoscaling_policy(self) -> bool:
    """Checks if an autoscaling policy is configured for the cluster."""
    return bool(self._resource_data['config'].get('autoscalingConfig', {}))
Checks if an autoscaling policy is configured for the cluster.
  @property
  def autoscaling_policy_id(self) -> str:
    """Returns the autoscaling policy ID for the cluster."""
    if self.has_autoscaling_policy:
      return (self._resource_data['config'].get('autoscalingConfig',
                                                {}).get('policyUri',
                                                        '').split('/')[-1])
    else:
      return ''
Returns the autoscaling policy ID for the cluster.
  @property
  def number_of_primary_workers(self) -> float:
    """Gets the number of primary worker nodes in the cluster."""
    return (self._resource_data['config'].get('workerConfig',
                                              {}).get('numInstances', 0))
Gets the number of primary worker nodes in the cluster.
  @property
  def number_of_secondary_workers(self) -> float:
    """Gets the number of secondary worker nodes in the cluster."""
    return (self._resource_data['config'].get('secondaryWorkerConfig',
                                              {}).get('numInstances', 0))
Gets the number of secondary worker nodes in the cluster.
  @property
  def is_preemptible_primary_workers(self) -> bool:
    """Checks if the primary worker nodes in the cluster are preemptible."""
    return (self._resource_data['config'].get('workerConfig',
                                              {}).get('isPreemptible', False))
Checks if the primary worker nodes in the cluster are preemptible.
  @property
  def is_preemptible_secondary_workers(self) -> bool:
    """Checks if the secondary worker nodes in the cluster are preemptible."""
    return (self._resource_data['config'].get('secondaryWorkerConfig',
                                              {}).get('isPreemptible', False))
Checks if the secondary worker nodes in the cluster are preemptible.
class Region:
  """Represents Dataproc region"""

  project_id: str
  region: str

  def __init__(self, project_id: str, region: str):
    self.project_id = project_id
    self.region = region

  def get_clusters(self, context: models.Context) -> Iterable[Cluster]:
    clusters = []
    for cluster in self.query_api():
      if not context.match_project_resource(resource=cluster.get('clusterName'),
                                            labels=cluster.get('labels', {})):
        continue
      c = Cluster(
          name=cluster['clusterName'],
          project_id=self.project_id,
          resource_data=cluster,
      )
      clusters.append(c)
    return clusters

  def query_api(self) -> Iterable[dict]:
    try:
      api = apis.get_api('dataproc', 'v1', self.project_id)
      query = (api.projects().regions().clusters().list(
          projectId=self.project_id, region=self.region))
      # be careful not to retry too many times because querying all regions
      # sometimes causes requests to fail permanently
      resp = query.execute(num_retries=1)
      return resp.get('clusters', [])
    except googleapiclient.errors.HttpError as err:
      # b/371526148 investigate permission denied error
      logging.error(err)
      return []
      # raise utils.GcpApiError(err) from err
Represents Dataproc region
  def get_clusters(self, context: models.Context) -> Iterable[Cluster]:
    clusters = []
    for cluster in self.query_api():
      if not context.match_project_resource(resource=cluster.get('clusterName'),
                                            labels=cluster.get('labels', {})):
        continue
      c = Cluster(
          name=cluster['clusterName'],
          project_id=self.project_id,
          resource_data=cluster,
      )
      clusters.append(c)
    return clusters
  def query_api(self) -> Iterable[dict]:
    try:
      api = apis.get_api('dataproc', 'v1', self.project_id)
      query = (api.projects().regions().clusters().list(
          projectId=self.project_id, region=self.region))
      # be careful not to retry too many times because querying all regions
      # sometimes causes requests to fail permanently
      resp = query.execute(num_retries=1)
      return resp.get('clusters', [])
    except googleapiclient.errors.HttpError as err:
      # b/371526148 investigate permission denied error
      logging.error(err)
      return []
      # raise utils.GcpApiError(err) from err
class Dataproc:
  """Represents Dataproc product"""

  project_id: str

  def __init__(self, project_id: str):
    self.project_id = project_id

  def get_regions(self) -> Iterable[Region]:
    return [
        Region(self.project_id, r.name)
        for r in gce.get_all_regions(self.project_id)
    ]

  def is_api_enabled(self) -> bool:
    return apis.is_enabled(self.project_id, 'dataproc')
Represents Dataproc product
@caching.cached_api_call
def get_clusters(context: models.Context) -> Iterable[Cluster]:
  r: List[Cluster] = []
  dataproc = Dataproc(context.project_id)
  if not dataproc.is_api_enabled():
    return r
  executor = get_executor()
  for clusters in executor.map(lambda r: r.get_clusters(context),
                               dataproc.get_regions()):
    r += clusters
  return r
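A minimal usage sketch, assuming gcpdiag is importable and that models.Context can be built from a project id; the project name is hypothetical:

from gcpdiag import models
from gcpdiag.queries import dataproc

# Hypothetical project id; assumes models.Context(project_id=...) is accepted.
context = models.Context(project_id='example-project')
for cluster in dataproc.get_clusters(context):
  print(cluster.short_path, cluster.status)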
@caching.cached_api_call
def get_cluster(cluster_name, region, project) -> Optional[Cluster]:
  api = apis.get_api('dataproc', 'v1', project)
  request = api.projects().regions().clusters().get(projectId=project,
                                                    clusterName=cluster_name,
                                                    region=region)
  try:
    r = request.execute(num_retries=config.API_RETRIES)
  except (googleapiclient.errors.HttpError,
          requests.exceptions.RequestException):
    #logging.error(err)
    return None
  return Cluster(r['clusterName'], project_id=r['projectId'], resource_data=r)
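A minimal sketch of fetching a single cluster and reading a few properties; the project, region and cluster names are hypothetical, and the function returns None when the lookup fails:

from gcpdiag.queries import dataproc

# Hypothetical identifiers.
cluster = dataproc.get_cluster('example-cluster', 'us-central1', 'example-project')
if cluster is None:
  print('cluster not found or the API call failed')
else:
  print(cluster.full_path, cluster.status)
  if cluster.is_running() and not cluster.is_stackdriver_logging_enabled():
    print('cluster is running but Cloud Logging is disabled')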
class AutoScalingPolicy(models.Resource):
  """AutoScalingPolicy."""

  _resource_data: dict

  def __init__(self, project_id, resource_data, region):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self.region = region

  @property
  def policy_id(self) -> str:
    return self._resource_data['id']

  @property
  def full_path(self) -> str:
    return self._resource_data['name']

  @property
  def short_path(self) -> str:
    return f'{self.project_id}/{self.region}/{self.policy_id}'

  @property
  def name(self) -> str:
    return self._resource_data['name']

  @property
  def scale_down_factor(self) -> float:
    return self._resource_data['basicAlgorithm']['yarnConfig'].get(
        'scaleDownFactor', 0.0)

  @property
  def has_graceful_decommission_timeout(self) -> bool:
    """Checks if a graceful decommission timeout is configured in the autoscaling policy."""
    return bool(
        self._resource_data.get('basicAlgorithm',
                                {}).get('yarnConfig',
                                        {}).get('gracefulDecommissionTimeout',
                                                {}))

  @property
  def graceful_decommission_timeout(self) -> float:
    """Gets the configured graceful decommission timeout in the autoscaling policy."""
    return (self._resource_data.get('basicAlgorithm',
                                    {}).get('yarnConfig', {}).get(
                                        'gracefulDecommissionTimeout', -1))
AutoScalingPolicy.
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
  @property
  def short_path(self) -> str:
    return f'{self.project_id}/{self.region}/{self.policy_id}'
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
  @property
  def has_graceful_decommission_timeout(self) -> bool:
    """Checks if a graceful decommission timeout is configured in the autoscaling policy."""
    return bool(
        self._resource_data.get('basicAlgorithm',
                                {}).get('yarnConfig',
                                        {}).get('gracefulDecommissionTimeout',
                                                {}))
Checks if a graceful decommission timeout is configured in the autoscaling policy.
  @property
  def graceful_decommission_timeout(self) -> float:
    """Gets the configured graceful decommission timeout in the autoscaling policy."""
    return (self._resource_data.get('basicAlgorithm',
                                    {}).get('yarnConfig', {}).get(
                                        'gracefulDecommissionTimeout', -1))
Gets the configured graceful decommission timeout in the autoscaling policy.
@caching.cached_api_call
def get_auto_scaling_policy(project_id: str, region: str,
                            policy_id: str) -> AutoScalingPolicy:
  logging.debug('fetching autoscalingpolicy: %s', project_id)
  dataproc = apis.get_api('dataproc', 'v1', project_id)
  name = (
      f'projects/{project_id}/regions/{region}/autoscalingPolicies/{policy_id}')
  try:
    request = dataproc.projects().regions().autoscalingPolicies().get(name=name)
    response = request.execute(num_retries=config.API_RETRIES)
    return AutoScalingPolicy(project_id, response, region)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
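A hedged sketch tying Cluster.autoscaling_policy_id to get_auto_scaling_policy; all identifiers are hypothetical:

from gcpdiag.queries import dataproc

cluster = dataproc.get_cluster('example-cluster', 'us-central1', 'example-project')
if cluster and cluster.has_autoscaling_policy:
  policy = dataproc.get_auto_scaling_policy('example-project', 'us-central1',
                                            cluster.autoscaling_policy_id)
  if policy.has_graceful_decommission_timeout:
    print(policy.short_path, policy.graceful_decommission_timeout)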
@caching.cached_api_call
def list_auto_scaling_policies(project_id: str,
                               region: str) -> List[AutoScalingPolicy]:
  """Lists all autoscaling policies in the given project and region."""
  dataproc = apis.get_api('dataproc', 'v1', project_id)
  parent = f'projects/{project_id}/regions/{region}'
  try:
    request = (dataproc.projects().regions().autoscalingPolicies().list(
        parent=parent))
    response = request.execute(num_retries=config.API_RETRIES)
    return [
        AutoScalingPolicy(project_id, policy_data, region)
        for policy_data in response.get('policies', [])
    ]
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
Lists all autoscaling policies in the given project and region.
class Job(models.Resource):
  """Job."""

  _resource_data: dict

  def __init__(self, project_id, job_id, region, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self.region = region
    self.job_id = job_id

  @property
  def full_path(self) -> str:
    return (
        f'projects/{self.project_id}/regions/{self.region}/jobs/{self.job_id}')

  @property
  def short_path(self) -> str:
    return f'{self.project_id}/{self.region}/{self.job_id}'

  @property
  def cluster_name(self) -> str:
    return self._resource_data['placement']['clusterName']

  @property
  def cluster_uuid(self) -> str:
    return self._resource_data['placement']['clusterUuid']

  @property
  def state(self):
    return self._resource_data['status']['state']

  @property
  def details(self):
    if self._resource_data['status']['state'] == 'ERROR':
      return self._resource_data['status']['details']
    return None

  @property
  def status_history(self):
    status_history_dict = {}
    for previous_status in self._resource_data['statusHistory']:
      if previous_status['state'] not in status_history_dict:
        status_history_dict[
            previous_status['state']] = previous_status['stateStartTime']

    return status_history_dict

  @property
  def yarn_applications(self):
    return self._resource_data['yarnApplications']

  @property
  def driver_output_resource_uri(self):
    return self._resource_data.get('driverOutputResourceUri')

  @property
  def job_uuid(self):
    return self._resource_data.get('jobUuid')

  @property
  def job_provided_bq_connector(self):
    """Check user-supplied BigQuery connector on the job level"""
    jar_file_uris = (self._resource_data.get('sparkJob', {}).get('jarFileUris'))
    if jar_file_uris is not None:
      for file in jar_file_uris:
        if 'spark-bigquery-latest.jar' in file:
          return 'spark-bigquery-latest'
        else:
          match = re.search(
              r'spark-bigquery(?:-with-dependencies_\d+\.\d+)?-(\d+\.\d+\.\d+)\.jar',
              file)
          if match:
            return match.group(1)
    return None
Job.
  @property
  def full_path(self) -> str:
    return (
        f'projects/{self.project_id}/regions/{self.region}/jobs/{self.job_id}')
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
  @property
  def short_path(self) -> str:
    return f'{self.project_id}/{self.region}/{self.job_id}'
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
  @property
  def status_history(self):
    status_history_dict = {}
    for previous_status in self._resource_data['statusHistory']:
      if previous_status['state'] not in status_history_dict:
        status_history_dict[
            previous_status['state']] = previous_status['stateStartTime']

    return status_history_dict
  @property
  def job_provided_bq_connector(self):
    """Check user-supplied BigQuery connector on the job level"""
    jar_file_uris = (self._resource_data.get('sparkJob', {}).get('jarFileUris'))
    if jar_file_uris is not None:
      for file in jar_file_uris:
        if 'spark-bigquery-latest.jar' in file:
          return 'spark-bigquery-latest'
        else:
          match = re.search(
              r'spark-bigquery(?:-with-dependencies_\d+\.\d+)?-(\d+\.\d+\.\d+)\.jar',
              file)
          if match:
            return match.group(1)
    return None
Checks for a user-supplied BigQuery connector at the job level.
@caching.cached_api_call
def get_job_by_jobid(project_id: str, region: str, job_id: str):
  dataproc = apis.get_api('dataproc', 'v1', project_id)
  try:
    request = (dataproc.projects().regions().jobs().get(projectId=project_id,
                                                        region=region,
                                                        jobId=job_id))
    response = request.execute(num_retries=config.API_RETRIES)
    return Job(project_id,
               job_id=job_id,
               region=region,
               resource_data=response)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
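A minimal sketch of looking up a job and inspecting its state history; the identifiers are hypothetical, and the call raises GcpApiError on HTTP failures:

from gcpdiag.queries import dataproc

job = dataproc.get_job_by_jobid('example-project', 'us-central1', 'example-job-id')
print(job.cluster_name, job.state)
if job.state == 'ERROR':
  print(job.details)
for state, started_at in job.status_history.items():
  print(state, started_at)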
@caching.cached_api_call
def extract_dataproc_supported_version() -> list[str]:
  """Extract the supported Dataproc versions(use Debian as representative).
  """

  page_url = 'https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-version-clusters'

  try:
    table = web.fetch_and_extract_table(page_url,
                                        tag='h3',
                                        tag_id='debian_images')
    if table:
      rows = table.find_all('tr')[1:]  #Skip the header row
      version_list = []

      for row in rows:
        dp_version = row.find_all('td')[0].get_text().strip().split('-')[0]
        version_list.append(dp_version)
      return version_list

    else:
      return []
  except (
      requests.exceptions.RequestException,
      AttributeError,
      TypeError,
      ValueError,
      IndexError,
  ) as e:
    logging.error(
        'Error in extracting dataproc versions: %s',
        e,
    )
    return []
Extract the supported Dataproc versions (using Debian images as representative).
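For illustration, a short sketch that prints the scraped list; the contents depend on the documentation page at the time of the call:

from gcpdiag.queries import dataproc

# Each entry is a major.minor image version such as '2.2' (values depend on the page).
for version in dataproc.extract_dataproc_supported_version():
  print(version)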
@caching.cached_api_call
def extract_dataproc_bigquery_version(image_version) -> list[str]:
  """Extract Dataproc BigQuery connector versions based on image version GCP documentation.
  """

  page_url = ('https://cloud.google.com/dataproc/docs/concepts/versioning/'
              'dataproc-release-' + image_version)

  try:
    table = web.fetch_and_extract_table(page_url, tag='div')
    bq_version = []
    if table:
      rows = table.find_all('tr')[1:]
      for row in rows:
        cells = row.find_all('td')
        if 'BigQuery Connector' in cells[0].get_text(strip=True):
          bq_version = cells[1].get_text(strip=True)
    return bq_version
  except (
      requests.exceptions.RequestException,
      AttributeError,
      TypeError,
      ValueError,
      IndexError,
  ) as e:
    logging.error(
        '%s Error in extracting BigQuery connector versions.'
        ' Please check BigQuery Connector version on %s',
        e,
        page_url,
    )
    return []
Extract the Dataproc BigQuery connector version for a given image version from the GCP documentation.
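A hedged sketch combining this helper with Cluster.cluster_provided_bq_connector to decide which connector version a cluster effectively uses; the names are hypothetical, and the major.minor derivation assumes an imageVersion string such as '2.1.27-debian11':

from gcpdiag.queries import dataproc

cluster = dataproc.get_cluster('example-cluster', 'us-central1', 'example-project')
if cluster:
  version = cluster.cluster_provided_bq_connector
  if version is None:
    # Fall back to the default connector documented for the image's major.minor version.
    major_minor = '.'.join(cluster.image_version.split('.')[:2])
    version = dataproc.extract_dataproc_bigquery_version(major_minor)
  print(version)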