gcpdiag.queries.dataproc
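This reference excerpt starts partway into the module, so its import block is not shown. A plausible reconstruction, inferred from the names used below (the exact module paths, in particular the origin of get_executor, are assumptions), would be:

# Assumed imports for this excerpt; module paths are inferred from the
# identifiers used below and may differ from the real source.
import logging
import re
from typing import Iterable, List, Mapping, Optional

import googleapiclient.errors
import requests

from gcpdiag import caching, config, models, utils
from gcpdiag.queries import apis, crm, gce, network, web
# get_executor is assumed to come from gcpdiag's shared executor helper.
from gcpdiag.lint import get_executor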
class Cluster(models.Resource):
  """Represents a Dataproc cluster."""

  name: str
  _resource_data: Mapping

  def __init__(self, name: str, project_id: str, resource_data: Mapping):
    super().__init__(project_id)
    self.name = name
    self._resource_data = resource_data

  def is_running(self) -> bool:
    return self.status == 'RUNNING'

  def get_software_property(self, property_name) -> str:
    return self._resource_data['config']['softwareConfig']['properties'].get(
        property_name)

  def is_stackdriver_logging_enabled(self) -> bool:
    # Unless overridden during cluster creation, properties with default
    # values are not returned, so get_software_property only returns a value
    # when the property was explicitly set to 'false'.
    return (not self.get_software_property(
        'dataproc:dataproc.logging.stackdriver.enable') == 'false')

  def is_stackdriver_monitoring_enabled(self) -> bool:
    return (self.get_software_property(
        'dataproc:dataproc.monitoring.stackdriver.enable') == 'true')

  @property
  def region(self) -> str:
    """Extracts the region by stripping the two-character zone suffix
    (e.g. '-d') from the cluster's zoneUri.

    Example zoneUri:
    https://www.googleapis.com/compute/v1/projects/dataproc1/zones/us-central1-d
    """
    return self._resource_data['config']['gceClusterConfig']['zoneUri'].split(
        '/')[-1][0:-2]

  @property
  def zone(self) -> Optional[str]:
    zone = (self._resource_data.get('config',
                                    {}).get('gceClusterConfig',
                                            {}).get('zoneUri'))
    if zone:
      m = re.search(r'/zones/([^/]+)$', zone)
      if m:
        return m.group(1)
    raise RuntimeError(f"can't determine zone for cluster {self.name}")

  @property
  def full_path(self) -> str:
    return (
        f'projects/{self.project_id}/regions/{self.region}/clusters/{self.name}'
    )

  @property
  def short_path(self) -> str:
    return f'{self.project_id}/{self.region}/{self.name}'

  @property
  def status(self) -> str:
    return self._resource_data['status']['state']

  def __str__(self) -> str:
    return self.short_path

  @property
  def cluster_uuid(self) -> str:
    return self._resource_data['clusterUuid']

  @property
  def image_version(self):
    return self._resource_data['config']['softwareConfig']['imageVersion']

  @property
  def vm_service_account_email(self):
    sa = self._resource_data['config']['gceClusterConfig'].get('serviceAccount')
    if sa is None:
      sa = crm.get_project(self.project_id).default_compute_service_account
    return sa

  @property
  def is_custom_gcs_connector(self) -> bool:
    return bool(
        self._resource_data.get('config', {}).get('gceClusterConfig', {}).get(
            'metadata', {}).get('GCS_CONNECTOR_VERSION'))

  @property
  def cluster_provided_bq_connector(self):
    """Checks for a user-supplied BigQuery connector at the cluster level."""
    bigquery_connector = (self._resource_data.get('config', {}).get(
        'gceClusterConfig', {}).get('metadata',
                                    {}).get('SPARK_BQ_CONNECTOR_VERSION'))
    if not bigquery_connector:
      bigquery_connector = (self._resource_data.get('config', {}).get(
          'gceClusterConfig', {}).get('metadata',
                                      {}).get('SPARK_BQ_CONNECTOR_URL'))
    if bigquery_connector:
      if bigquery_connector == 'spark-bigquery-latest.jar':
        return 'spark-bigquery-latest'
      else:
        match = re.search(
            r'spark-bigquery(?:-with-dependencies_\d+\.\d+)?-(\d+\.\d+\.\d+)\.jar',
            bigquery_connector)
        if match:
          return match.group(1)
    # Returning None means the cluster uses the default, pre-installed
    # BigQuery connector for its image version.
    return bigquery_connector

  @property
  def is_gce_cluster(self) -> bool:
    return bool(self._resource_data.get('config', {}).get('gceClusterConfig'))

  @property
  def gce_network_uri(self) -> Optional[str]:
    """Gets the network URI from the cluster network or subnetwork."""
    if not self.is_gce_cluster:
      raise RuntimeError(
          'Can not return network URI for a Dataproc on GKE cluster')
    network_uri = (self._resource_data.get('config',
                                           {}).get('gceClusterConfig',
                                                   {}).get('networkUri'))
    if not network_uri:
      subnetwork_uri = (self._resource_data.get('config', {}).get(
          'gceClusterConfig', {}).get('subnetworkUri'))
      network_uri = network.get_subnetwork_from_url(subnetwork_uri).network
    return network_uri

  @property
  def gce_subnetwork_uri(self) -> Optional[str]:
    """Gets the subnetwork URI from the cluster subnetwork."""
    if not self.is_gce_cluster:
      raise RuntimeError(
          'Can not return subnetwork URI for a Dataproc on GKE cluster')
    subnetwork_uri = (self._resource_data.get('config',
                                              {}).get('gceClusterConfig',
                                                      {}).get('subnetworkUri'))
    if not subnetwork_uri:
      subnetwork_uri = ('https://www.googleapis.com/compute/v1/projects/' +
                        self.project_id + '/regions/' + self.region +
                        '/subnetworks/default')
    return subnetwork_uri

  @property
  def is_single_node_cluster(self) -> bool:
    workers = (self._resource_data.get('config',
                                       {}).get('workerConfig',
                                               {}).get('numInstances', 0))
    return workers == 0

  @property
  def is_ha_cluster(self) -> bool:
    masters = (self._resource_data.get('config',
                                       {}).get('masterConfig',
                                               {}).get('numInstances', 1))
    return masters != 1

  @property
  def is_internal_ip_only(self) -> bool:
    # internalIpOnly defaults to true for clusters created with a Dataproc 2.2
    # image version; on older image versions the default is false.
    internal_ip_only = self._resource_data['config']['gceClusterConfig'][
        'internalIpOnly']
    return internal_ip_only

  @property
  def has_autoscaling_policy(self) -> bool:
    """Checks if an autoscaling policy is configured for the cluster."""
    return bool(self._resource_data['config'].get('autoscalingConfig', {}))

  @property
  def autoscaling_policy_id(self) -> str:
    """Returns the autoscaling policy ID for the cluster."""
    if self.has_autoscaling_policy:
      return (self._resource_data['config'].get('autoscalingConfig',
                                                {}).get('policyUri',
                                                        '').split('/')[-1])
    else:
      return ''

  @property
  def number_of_primary_workers(self) -> float:
    """Gets the number of primary worker nodes in the cluster."""
    return (self._resource_data['config'].get('workerConfig',
                                              {}).get('numInstances', 0))

  @property
  def number_of_secondary_workers(self) -> float:
    """Gets the number of secondary worker nodes in the cluster."""
    return (self._resource_data['config'].get('secondaryWorkerConfig',
                                              {}).get('numInstances', 0))

  @property
  def is_preemptible_primary_workers(self) -> bool:
    """Checks if the primary worker nodes in the cluster are preemptible."""
    return (self._resource_data['config'].get('workerConfig',
                                              {}).get('isPreemptible', False))

  @property
  def is_preemptible_secondary_workers(self) -> bool:
    """Checks if the secondary worker nodes in the cluster are preemptible."""
    return (self._resource_data['config'].get('secondaryWorkerConfig',
                                              {}).get('isPreemptible', False))

  @property
  def initialization_actions(self) -> List[str]:
    return self._resource_data['config'].get('initializationActions', [])
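A minimal sketch of how these properties might be read, using a hypothetical resource payload whose fields follow the Dataproc v1 clusters API shape used above:

# Hypothetical resource data, shaped like the Dataproc v1 API response fields
# referenced by the Cluster properties.
resource_data = {
    'clusterUuid': 'abc-123',
    'status': {'state': 'RUNNING'},
    'config': {
        'softwareConfig': {'imageVersion': '2.2.5-debian12', 'properties': {}},
        'gceClusterConfig': {
            'zoneUri': ('https://www.googleapis.com/compute/v1/projects/'
                        'my-project/zones/us-central1-a'),
        },
        'workerConfig': {'numInstances': 2},
        'masterConfig': {'numInstances': 1},
    },
}

cluster = Cluster(name='my-cluster', project_id='my-project',
                  resource_data=resource_data)
print(cluster.is_running())               # True
print(cluster.region, cluster.zone)       # us-central1 us-central1-a
print(cluster.is_ha_cluster)              # False
print(cluster.number_of_primary_workers)  # 2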
class Region:
  """Represents Dataproc region"""

  project_id: str
  region: str

  def __init__(self, project_id: str, region: str):
    self.project_id = project_id
    self.region = region

  def get_clusters(self, context: models.Context) -> Iterable[Cluster]:
    clusters = []
    for cluster in self.query_api():
      if not context.match_project_resource(resource=cluster.get('clusterName'),
                                            labels=cluster.get('labels', {})):
        continue
      c = Cluster(
          name=cluster['clusterName'],
          project_id=self.project_id,
          resource_data=cluster,
      )
      clusters.append(c)
    return clusters

  def query_api(self) -> Iterable[dict]:
    try:
      api = apis.get_api('dataproc', 'v1', self.project_id)
      query = (api.projects().regions().clusters().list(
          projectId=self.project_id, region=self.region))
      # be careful not to retry too many times because querying all regions
      # sometimes causes requests to fail permanently
      resp = query.execute(num_retries=1)
      return resp.get('clusters', [])
    except googleapiclient.errors.HttpError as err:
      # b/371526148 investigate permission denied error
      logging.error(err)
      return []
      # raise utils.GcpApiError(err) from err
class Dataproc:
  """Represents Dataproc product"""

  project_id: str

  def __init__(self, project_id: str):
    self.project_id = project_id

  def get_regions(self) -> Iterable[Region]:
    return [
        Region(self.project_id, r.name)
        for r in gce.get_all_regions(self.project_id)
    ]

  def is_api_enabled(self) -> bool:
    return apis.is_enabled(self.project_id, 'dataproc')
@caching.cached_api_call
def get_clusters(context: models.Context) -> Iterable[Cluster]:
  r: List[Cluster] = []
  dataproc = Dataproc(context.project_id)
  if not dataproc.is_api_enabled():
    return r
  executor = get_executor()
  for clusters in executor.map(lambda r: r.get_clusters(context),
                               dataproc.get_regions()):
    r += clusters
  return r
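A short usage sketch, assuming a hypothetical project ID and that the models.Context constructor accepts a project_id keyword as it does elsewhere in gcpdiag:

# Requires the Dataproc API to be enabled in the project, otherwise
# get_clusters() returns an empty list.
context = models.Context(project_id='my-project')
for cluster in get_clusters(context):
  print(cluster.short_path, cluster.status, cluster.image_version)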
@caching.cached_api_call
def get_cluster(cluster_name, region, project) -> Optional[Cluster]:
  api = apis.get_api('dataproc', 'v1', project)
  request = api.projects().regions().clusters().get(projectId=project,
                                                    clusterName=cluster_name,
                                                    region=region)
  try:
    r = request.execute(num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError as err:
    logging.error(err)
    return None
  return Cluster(r['clusterName'], project_id=r['projectId'], resource_data=r)
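Fetching a single cluster directly, with hypothetical names; get_cluster returns None when the API call fails:

cluster = get_cluster(cluster_name='my-cluster', region='us-central1',
                      project='my-project')
if cluster is None:
  print('cluster not found or API error')
elif not cluster.is_running():
  print(f'{cluster.short_path} is in state {cluster.status}')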
class AutoScalingPolicy(models.Resource):
  """AutoScalingPolicy."""

  _resource_data: dict

  def __init__(self, project_id, resource_data, region):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self.region = region

  @property
  def policy_id(self) -> str:
    return self._resource_data['id']

  @property
  def full_path(self) -> str:
    return self._resource_data['name']

  @property
  def short_path(self) -> str:
    return f'{self.project_id}/{self.region}/{self.policy_id}'

  @property
  def name(self) -> str:
    return self._resource_data['name']

  @property
  def scale_down_factor(self) -> float:
    return self._resource_data['basicAlgorithm']['yarnConfig'].get(
        'scaleDownFactor', 0.0)

  @property
  def has_graceful_decommission_timeout(self) -> bool:
    """Checks if a graceful decommission timeout is configured in the autoscaling policy."""
    return bool(
        self._resource_data.get('basicAlgorithm',
                                {}).get('yarnConfig',
                                        {}).get('gracefulDecommissionTimeout',
                                                {}))

  @property
  def graceful_decommission_timeout(self) -> float:
    """Gets the configured graceful decommission timeout in the autoscaling policy."""
    return (self._resource_data.get('basicAlgorithm',
                                    {}).get('yarnConfig', {}).get(
                                        'gracefulDecommissionTimeout', -1))
@caching.cached_api_call
def get_auto_scaling_policy(project_id: str, region: str,
                            policy_id: str) -> AutoScalingPolicy:
  # logging.info('fetching autoscalingpolicy: %s', project_id)
  dataproc = apis.get_api('dataproc', 'v1', project_id)
  name = (
      f'projects/{project_id}/regions/{region}/autoscalingPolicies/{policy_id}')
  try:
    request = dataproc.projects().regions().autoscalingPolicies().get(name=name)
    response = request.execute(num_retries=config.API_RETRIES)
    return AutoScalingPolicy(project_id, response, region)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
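A sketch combining the cluster and policy helpers, assuming 'cluster' is a Cluster obtained earlier and the region is hypothetical:

# 'cluster' is assumed to come from get_cluster() or get_clusters().
if cluster.has_autoscaling_policy:
  policy = get_auto_scaling_policy('my-project', 'us-central1',
                                   cluster.autoscaling_policy_id)
  if policy.has_graceful_decommission_timeout:
    print(policy.short_path, policy.graceful_decommission_timeout)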
@caching.cached_api_call
def list_auto_scaling_policies(project_id: str,
                               region: str) -> List[AutoScalingPolicy]:
  """Lists all autoscaling policies in the given project and region."""
  dataproc = apis.get_api('dataproc', 'v1', project_id)
  parent = f'projects/{project_id}/regions/{region}'
  try:
    request = (dataproc.projects().regions().autoscalingPolicies().list(
        parent=parent))
    response = request.execute(num_retries=config.API_RETRIES)
    return [
        AutoScalingPolicy(project_id, policy_data, region)
        for policy_data in response.get('policies', [])
    ]
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
class Job(models.Resource):
  """Represents a Dataproc job."""

  _resource_data: dict

  def __init__(self, project_id, job_id, region, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self.region = region
    self.job_id = job_id

  @property
  def full_path(self) -> str:
    return (
        f'projects/{self.project_id}/regions/{self.region}/jobs/{self.job_id}')

  @property
  def short_path(self) -> str:
    return f'{self.project_id}/{self.region}/{self.job_id}'

  @property
  def cluster_name(self) -> str:
    return self._resource_data['placement']['clusterName']

  @property
  def cluster_uuid(self) -> str:
    return self._resource_data['placement']['clusterUuid']

  @property
  def state(self):
    return self._resource_data['status']['state']

  @property
  def details(self):
    if self._resource_data['status']['state'] == 'ERROR':
      return self._resource_data['status']['details']
    return None

  @property
  def status_history(self):
    status_history_dict = {}
    for previous_status in self._resource_data['statusHistory']:
      if previous_status['state'] not in status_history_dict:
        status_history_dict[
            previous_status['state']] = previous_status['stateStartTime']
    return status_history_dict

  @property
  def yarn_applications(self):
    return self._resource_data['yarnApplications']

  @property
  def driver_output_resource_uri(self):
    return self._resource_data.get('driverOutputResourceUri')

  @property
  def job_uuid(self):
    return self._resource_data.get('jobUuid')

  @property
  def job_provided_bq_connector(self):
    """Checks for a user-supplied BigQuery connector at the job level."""
    jar_file_uris = (self._resource_data.get('sparkJob', {}).get('jarFileUris'))
    if jar_file_uris is not None:
      for file in jar_file_uris:
        if 'spark-bigquery-latest.jar' in file:
          return 'spark-bigquery-latest'
        else:
          match = re.search(
              r'spark-bigquery(?:-with-dependencies_\d+\.\d+)?-(\d+\.\d+\.\d+)\.jar',
              file)
          if match:
            return match.group(1)
    return None
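A small sketch of the status-related properties, using a hypothetical statusHistory payload; status_history keeps the first start time recorded for each state:

job_data = {
    'placement': {'clusterName': 'my-cluster', 'clusterUuid': 'abc-123'},
    'status': {'state': 'ERROR', 'details': 'Task failed'},
    'statusHistory': [
        {'state': 'PENDING', 'stateStartTime': '2024-01-01T10:00:00Z'},
        {'state': 'RUNNING', 'stateStartTime': '2024-01-01T10:01:00Z'},
    ],
}
job = Job('my-project', 'job-1234', 'us-central1', job_data)
print(job.state)           # ERROR
print(job.details)         # Task failed
print(job.status_history)  # {'PENDING': '...T10:00:00Z', 'RUNNING': '...T10:01:00Z'}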
@caching.cached_api_call
def get_job_by_jobid(project_id: str, region: str, job_id: str):
  dataproc = apis.get_api('dataproc', 'v1', project_id)
  try:
    request = (dataproc.projects().regions().jobs().get(projectId=project_id,
                                                        region=region,
                                                        jobId=job_id))
    response = request.execute(num_retries=config.API_RETRIES)
    return Job(project_id=project_id,
               job_id=job_id,
               region=region,
               resource_data=response)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
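Fetching a job and inspecting its driver output location, with hypothetical identifiers:

job = get_job_by_jobid(project_id='my-project', region='us-central1',
                       job_id='job-1234')
print(job.full_path)
if job.state == 'ERROR':
  print(job.details, job.driver_output_resource_uri)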
@caching.cached_api_call
def extract_dataproc_supported_version() -> list[str]:
  """Extracts the supported Dataproc versions (using Debian images as representative)."""

  page_url = 'https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-version-clusters'

  try:
    table = web.fetch_and_extract_table(page_url,
                                        tag='h3',
                                        tag_id='debian_images')
    if table:
      rows = table.find_all('tr')[1:]  # Skip the header row
      version_list = []

      for row in rows:
        dp_version = row.find_all('td')[0].get_text().strip().split('-')[0]
        version_list.append(dp_version)
      return version_list

    else:
      return []
  except (
      requests.exceptions.RequestException,
      AttributeError,
      TypeError,
      ValueError,
      IndexError,
  ) as e:
    logging.error(
        'Error in extracting dataproc versions: %s',
        e,
    )
    return []
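A sketch that checks whether a cluster's major image version is still listed as supported; reducing image_version to a 'major.minor' string is an assumption about its format (e.g. '2.1.50-debian11'):

supported = extract_dataproc_supported_version()  # e.g. ['2.2', '2.1', '2.0']
# Assumes image_version looks like '2.1.50-debian11'; keep only '2.1'.
major_version = '.'.join(cluster.image_version.split('-')[0].split('.')[:2])
if supported and major_version not in supported:
  print(f'{cluster.short_path} uses unsupported image version '
        f'{cluster.image_version}')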
@caching.cached_api_call
def extract_dataproc_bigquery_version(image_version) -> list[str]:
  """Extracts the Dataproc BigQuery connector version for an image version from the GCP documentation."""

  page_url = ('https://cloud.google.com/dataproc/docs/concepts/versioning/'
              'dataproc-release-' + image_version)

  try:
    table = web.fetch_and_extract_table(page_url, tag='div')
    bq_version = []
    if table:
      rows = table.find_all('tr')[1:]
      for row in rows:
        cells = row.find_all('td')
        if 'BigQuery Connector' in cells[0].get_text(strip=True):
          bq_version = cells[1].get_text(strip=True)
    return bq_version
  except (
      requests.exceptions.RequestException,
      AttributeError,
      TypeError,
      ValueError,
      IndexError,
  ) as e:
    logging.error(
        '%s Error in extracting BigQuery connector versions.'
        ' Please check BigQuery Connector version on %s',
        e,
        page_url,
    )
    return []
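A sketch comparing the connector a cluster provides against the default one shipped with its image, assuming the release page uses the 'major.minor' image version (e.g. 'dataproc-release-2.1') and that 'cluster' was fetched earlier:

# Assumption: the documentation URL expects a 'major.minor' image version.
image = '.'.join(cluster.image_version.split('-')[0].split('.')[:2])
default_bq = extract_dataproc_bigquery_version(image)
user_bq = cluster.cluster_provided_bq_connector
if user_bq and default_bq and user_bq != default_bq:
  print(f'cluster overrides the BigQuery connector: {user_bq} '
        f'(image default: {default_bq})')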