gcpdiag.queries.datafusion
class Instance(models.Resource):
  """Represents a Data Fusion instance.

  https://cloud.google.com/data-fusion/docs/reference/rest/v1/projects.locations.instances#Instance
  """

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """The 'name' of the instance is already in the full path form:

    projects/{project}/locations/{location}/instances/{instance}.
    """
    return self._resource_data['name']

  @property
  def short_path(self) -> str:
    """The full path with its collection segments removed.

    projects/{project}/locations/{location}/instances/{instance} becomes
    {project}/{location}/{instance}.
    """
    path = self.full_path
    path = re.sub(r'^projects/', '', path)
    path = re.sub(r'/locations/', '/', path)
    path = re.sub(r'/instances/', '/', path)
    return path

  @property
  def name(self) -> str:
    return utils.extract_value_from_res_name(self._resource_data['name'],
                                             'instances')

  @property
  def location(self) -> str:
    return utils.extract_value_from_res_name(self._resource_data['name'],
                                             'locations')

  @property
  def zone(self) -> str:
    return self._resource_data['zone']

  @property
  def type(self) -> str:
    return self._resource_data['type']

  @property
  def is_basic_type(self) -> bool:
    return self._resource_data['type'] == 'BASIC'

  @property
  def is_enterprise_type(self) -> bool:
    return self._resource_data['type'] == 'ENTERPRISE'

  @property
  def is_developer_type(self) -> bool:
    return self._resource_data['type'] == 'DEVELOPER'

  @property
  def is_private(self) -> bool:
    # A missing 'privateInstance' field is reported as not private.
    return self._resource_data.get('privateInstance', False)

  @property
  def status(self) -> str:
    return self._resource_data['state']

  @property
  def status_details(self) -> Optional[str]:
    """The 'stateMessage' field, or None when the API did not return one."""
    return self._resource_data.get('stateMessage')

  @property
  def is_running(self) -> bool:
    return self.status == 'ACTIVE'

  @property
  def is_deleting(self) -> bool:
    return self.status == 'DELETING'

  @property
  def version(self) -> Version:
    return Version(self._resource_data['version'])

  @property
  def api_service_agent(self) -> str:
    return self._resource_data['p4ServiceAccount']

  @property
  def dataproc_service_account(self) -> str:
    """The instance's Dataproc service account.

    Falls back to the project's default Compute Engine service account when
    'dataprocServiceAccount' is not set on the instance.
    """
    sa = self._resource_data.get('dataprocServiceAccount')
    if sa is None:
      sa = crm.get_project(self.project_id).default_compute_service_account
    return sa

  @property
  def tenant_project_id(self) -> str:
    return self._resource_data['tenantProjectId']

  @property
  def _network_config(self) -> dict:
    """The instance's 'networkConfig' block, or {} when it is absent."""
    # The REST reference documents networkConfig as required only for private
    # instances; indexing it directly raised KeyError for other instances.
    return self._resource_data.get('networkConfig') or {}

  @property
  def uses_shared_vpc(self) -> bool:
    """Whether the instance's network belongs to another (host) project.

    If shared VPC then 'network_string' = 'projects/{host-project-id}/global/networks/{network}'
    else 'network_string' = {network}
    """
    network_string = self._network_config.get('network')
    if not network_string:
      return False
    match = re.match(r'projects/([^/]+)/global/networks/([^/]+)$',
                     network_string)
    return bool(match) and match.group(1) != self.project_id

  @property
  def network(self) -> network.Network:
    """The VPC network the instance is attached to.

    'networkConfig.network' is either a bare network name in this project or
    'projects/{project}/global/networks/{network}'. When no network is
    configured, this project's 'default' network is returned.
    """
    network_string = self._network_config.get('network')
    if network_string is None:
      project_id, network_name = self.project_id, 'default'
    else:
      match = re.match(r'projects/([^/]+)/global/networks/([^/]+)$',
                       network_string)
      if match:
        project_id, network_name = match.group(1), match.group(2)
      else:
        project_id, network_name = self.project_id, network_string
    return network.get_network(
        project_id,
        network_name,
        context=models.Context(project_id=project_id),
    )

  @property
  def tp_ipv4_cidr(self) -> Optional[IPv4NetOrIPv6Net]:
    """The 'networkConfig.ipAllocation' range parsed as an IP network.

    Returns None when no network is configured or when the instance has no
    'ipAllocation' value (previously the latter raised KeyError).
    """
    network_config = self._network_config
    if 'network' not in network_config:
      return None
    cidr = network_config.get('ipAllocation')
    if not cidr:
      return None
    return ipaddress.ip_network(cidr)

  @property
  def api_endpoint(self) -> str:
    return self._resource_data['apiEndpoint']
Represents a Data Fusion instance.
https://cloud.google.com/data-fusion/docs/reference/rest/v1/projects.locations.instances#Instance
@property
def full_path(self) -> str:
  """Full resource name of the instance, taken verbatim from the API.

  Form: projects/{project}/locations/{location}/instances/{instance}.
  """
  resource_name = self._resource_data['name']
  return resource_name
The 'name' of the instance is already in the full path form:
projects/{project}/locations/{location}/instances/{instance}.
@property
def short_path(self) -> str:
  """Compact form of full_path: '{project}/{location}/{instance}'.

  Drops the leading 'projects/' and collapses the '/locations/' and
  '/instances/' separators to a single '/', applied in that order.
  """
  rewrites = (
      (r'^projects/', ''),
      (r'/locations/', '/'),
      (r'/instances/', '/'),
  )
  result = self.full_path
  for pattern, replacement in rewrites:
    result = re.sub(pattern, replacement, result)
  return result
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'my-project/us-central1/my-instance' (project/location/instance for a Data Fusion instance)
@property
def network(self) -> network.Network:
  """The VPC network the instance is attached to.

  'networkConfig.network' is either a bare network name in this project or
  'projects/{project}/global/networks/{network}'. When no network is
  configured (or networkConfig is absent), this project's 'default' network
  is returned.
  """
  # networkConfig is documented as required only for private instances, so
  # indexing it directly raised KeyError for other instances.
  network_config = self._resource_data.get('networkConfig') or {}
  network_string = network_config.get('network')
  if network_string is None:
    project_id, network_name = self.project_id, 'default'
  else:
    match = re.match(r'projects/([^/]+)/global/networks/([^/]+)$',
                     network_string)
    if match:
      project_id, network_name = match.group(1), match.group(2)
    else:
      project_id, network_name = self.project_id, network_string
  return network.get_network(
      project_id,
      network_name,
      context=models.Context(project_id=project_id),
  )
@caching.cached_api_call
def get_instances(context: models.Context) -> Mapping[str, Instance]:
  """Get a dict of Instance matching the given context, indexed by instance full path.

  Instances are listed across all locations (the '-' wildcard) and every
  result page is processed by following 'nextPageToken'.

  Args:
    context: selects the project and filters instances by location, labels
      and name.

  Returns:
    A dict mapping each instance's full resource name to its Instance. Empty
    when the Data Fusion API is not enabled for the project.

  Raises:
    utils.GcpApiError: if a Data Fusion API call fails.
  """
  instances: Dict[str, Instance] = {}

  if not apis.is_enabled(context.project_id, 'datafusion'):
    return instances

  logging.debug('fetching list of Data Fusion instances in project %s',
                context.project_id)
  datafusion_api = apis.get_api('datafusion', 'v1', context.project_id)
  instances_api = datafusion_api.projects().locations().instances()
  parent = f'projects/{context.project_id}/locations/-'  # '-': all regions
  query = instances_api.list(parent=parent)

  try:
    while True:
      resp = query.execute(num_retries=config.API_RETRIES)

      for inst in resp.get('instances', []):
        # projects/{project}/locations/{location}/instances/{instance}.
        result = re.match(
            r'projects/[^/]+/locations/([^/]+)/instances/([^/]+)',
            inst['name'])
        if not result:
          logging.error('invalid datafusion name: %s', inst['name'])
          continue
        location = result.group(1)
        labels = inst.get('labels', {})
        name = result.group(2)
        if not context.match_project_resource(
            location=location, labels=labels, resource=name):
          continue

        instances[inst['name']] = Instance(project_id=context.project_id,
                                           resource_data=inst)

      # Previously only the first page was read; keep requesting pages until
      # the API stops returning a continuation token.
      page_token = resp.get('nextPageToken')
      if not page_token:
        break
      query = instances_api.list(parent=parent, pageToken=page_token)

  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err

  return instances
Get a dict of the Data Fusion instances matching the given context, keyed by instance full path.
@caching.cached_api_call
def extract_support_datafusion_version() -> Dict[str, str]:
  """Extract the version policy dictionary from the data fusion version support policy page.

  Rows without at least three cells, or whose support end date does not parse
  as e.g. 'January 31, 2025', are skipped individually instead of discarding
  the whole table.

  Returns:
    A dictionary mapping Data Fusion versions to their support end dates
    formatted as 'YYYY-MM-DD'. Empty if the page or table cannot be fetched
    or parsed.
  """
  page_url = 'https://cloud.google.com/data-fusion/docs/support/version-support-policy'

  try:
    data_fusion_table = web.fetch_and_extract_table(page_url,
                                                    tag='h2',
                                                    tag_id='support_timelines')
    if not data_fusion_table:
      return {}

    version_policy_dict: Dict[str, str] = {}
    # Skip the header row.
    for row in data_fusion_table.find_all('tr')[1:]:
      columns = row.find_all('td')
      if len(columns) < 3:
        # Malformed row: previously an IndexError here emptied the result.
        continue

      version_cell = columns[0]
      support_end_text = columns[2].text.strip()
      # Remove any <sup> element (e.g. a footnote marker) from the version.
      if version_cell.sup:
        version_cell.sup.decompose()
      version = version_cell.text.strip()

      try:
        support_end_date = datetime.datetime.strptime(
            support_end_text, '%B %d, %Y').strftime('%Y-%m-%d')
      except ValueError:
        continue

      version_policy_dict[version] = support_end_date
    return version_policy_dict

  except (
      requests.exceptions.RequestException,
      AttributeError,
      TypeError,
      ValueError,
      IndexError,
  ) as e:
    logging.error('Error in extracting data fusion version support policy: %s',
                  e)
    return {}
Extract the version support dictionary from the Data Fusion version support policy page.
Returns:
A dictionary mapping Data Fusion versions to their support end dates (formatted as YYYY-MM-DD).
class Profile(models.Resource):
  """Represents a Compute Profile.

  Wraps one compute profile returned by a Data Fusion instance's CDAP API.
  The accessors below read 'name', 'status', 'scope' and 'provisioner' (its
  'name' and its 'properties' list of {'name': ..., 'value': ...} entries).
  """

  _resource_data: dict

  def __init__(self, project_id, instance_name, resource_data):
    super().__init__(project_id=project_id)
    self.instance_name = instance_name
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """The full path form:

    projects/{project}/instances/{instance}/computeProfiles/{profile}.
    """
    return (f'projects/{self.project_id}/instances/{self.instance_name}'
            f"/computeProfiles/{self._resource_data['name']}")

  @property
  def short_path(self) -> str:
    """The short path form:

    {project}/{instance}/{profile}.
    """
    return (
        f"{self.project_id}/{self.instance_name}/{self._resource_data['name']}")

  @property
  def name(self) -> str:
    return self._resource_data['name']

  def _provisioner_properties(self) -> list:
    """Return the provisioner's 'properties' list, or [] when it is absent."""
    # .get('properties') alone yields None for a provisioner without
    # properties, and iterating None raised TypeError in every accessor below.
    return self._resource_data['provisioner'].get('properties') or []

  @property
  def region(self) -> str:
    for prop in self._provisioner_properties():
      if prop.get('name') == 'region' and prop.get('value') is not None:
        return prop.get('value')
    return 'No region defined'

  @property
  def status(self) -> str:
    return self._resource_data['status']

  @property
  def scope(self) -> str:
    return self._resource_data['scope']

  @property
  def is_dataproc_provisioner(self) -> bool:
    return self._resource_data['provisioner']['name'] == 'gcp-dataproc'

  @property
  def is_existing_dataproc_provisioner(self) -> bool:
    return self._resource_data['provisioner']['name'] == 'gcp-existing-dataproc'

  @property
  def autoscaling_enabled(self) -> bool:
    """True only when 'enablePredefinedAutoScaling' is the string 'true'."""
    for prop in self._provisioner_properties():
      if (prop.get('name') == 'enablePredefinedAutoScaling' and
          prop.get('value') is not None):
        return prop.get('value') == 'true'
    return False

  @property
  def image_version(self) -> str:
    # A missing 'value' key used to return None here; it is now treated like
    # an empty value so the documented fallback string is returned instead.
    for prop in self._provisioner_properties():
      if prop.get('name') == 'imageVersion' and prop.get('value') not in (None,
                                                                          ''):
        return prop.get('value')
    return 'No imageVersion defined'

  @property
  def auto_scaling_policy(self) -> str:
    # Same None/'' handling as image_version.
    for prop in self._provisioner_properties():
      if (prop.get('name') == 'autoScalingPolicy' and
          prop.get('value') not in (None, '')):
        return prop.get('value')
    return 'No autoScalingPolicy defined'
Represents a Compute Profile.
@property
def full_path(self) -> str:
  """Full path of the compute profile:

  projects/{project}/instances/{instance}/computeProfiles/{profile}.
  """
  segments = (
      f'projects/{self.project_id}',
      f'instances/{self.instance_name}',
      f"computeProfiles/{self._resource_data['name']}",
  )
  return '/'.join(segments)
The full path form:
projects/{project}/instances/{instance}/computeProfiles/{profile}.
@property
def short_path(self) -> str:
  """Short path of the compute profile:

  {project}/{instance}/{profile}.
  """
  profile = self._resource_data['name']
  return f'{self.project_id}/{self.instance_name}/{profile}'
The short path form:
{project}/{instance}/{profile}.
@caching.cached_api_call
def get_instance_system_compute_profile(
    context: models.Context, instance: Instance) -> Iterable[Profile]:
  """List the Dataproc-backed system compute profiles of an instance.

  Queries the instance's CDAP API for system profiles and keeps only those
  whose provisioner is 'gcp-dataproc' or 'gcp-existing-dataproc'.

  Returns:
    A list of Profile objects; empty when the API returned None.
  """
  logging.debug('fetching dataproc System compute profile list: %s',
                context.project_id)
  datafusion = get_generic.get_generic_api('datafusion', instance.api_endpoint)
  response = datafusion.get_system_profiles()
  if response is None:
    return []
  dataproc_provisioners = ('gcp-dataproc', 'gcp-existing-dataproc')
  return [
      Profile(context.project_id, instance.name, res)
      for res in response
      if res['provisioner']['name'] in dataproc_provisioners
  ]
Get the list of Dataproc system compute profiles of a Data Fusion instance.
@caching.cached_api_call
def get_instance_user_compute_profile(context: models.Context,
                                      instance: Instance) -> Iterable[Profile]:
  """List the Dataproc-backed user compute profiles of an instance.

  Iterates every CDAP namespace of the instance, fetches its user compute
  profiles and keeps those whose provisioner is 'gcp-dataproc' or
  'gcp-existing-dataproc'.

  Returns:
    A list of Profile objects; empty when no namespaces were returned.
  """
  logging.debug('fetching dataproc User compute profile list: %s',
                context.project_id)
  user_profiles: List[Profile] = []
  datafusion = get_generic.get_generic_api('datafusion', instance.api_endpoint)
  namespaces = datafusion.get_all_namespaces()
  if namespaces is None:
    return user_profiles

  # Distinct loop variable names: the original reused `res` for both the
  # namespace loop and the nested profile loop.
  for namespace in namespaces:
    profiles = datafusion.get_user_profiles(namespace=namespace['name'])
    if profiles is None:
      continue
    for profile in profiles:
      if profile['provisioner']['name'] in ('gcp-dataproc',
                                            'gcp-existing-dataproc'):
        user_profiles.append(Profile(context.project_id, instance.name,
                                     profile))
  # NOTE(review): Profile objects are presumably always truthy, which would
  # make this filter a no-op; kept to preserve behavior — confirm against
  # models.Resource.
  return [profile for profile in user_profiles if profile]
Get the list of Dataproc user compute profiles of a Data Fusion instance.
def _expand_cdf_version_range(cdf_versions: str) -> list[str]:
  """Expand 'X.Y-A.B' into every one-decimal version from X.Y to A.B.

  Text without '-' is returned unchanged as a single-element list. Versions
  are stepped in integer tenths: repeatedly adding the float 0.1 accumulates
  rounding error (1.1 + 0.1 == 1.2000000000000002) and could drop the upper
  bound of the range.

  Raises:
    ValueError: if the range does not have exactly two numeric endpoints.
  """
  if '-' not in cdf_versions:
    return [cdf_versions]
  start, end = (round(float(bound) * 10) for bound in cdf_versions.split('-'))
  return [f'{tenths // 10}.{tenths % 10}' for tenths in range(start, end + 1)]


@caching.cached_api_call
def extract_datafusion_dataproc_version() -> Dict[str, list[str]]:
  """Extract the supported Data Fusion versions and their corresponding
  Dataproc versions from the GCP documentation.

  Rows with fewer than two cells are skipped individually instead of
  discarding the whole table.

  Returns:
    A dict mapping each Data Fusion version (e.g. '6.9') to the list of
    Dataproc versions listed for it. Empty if the page or table cannot be
    fetched or parsed.
  """

  page_url = 'https://cloud.google.com/data-fusion/docs/concepts/configure-clusters'

  try:
    table = web.fetch_and_extract_table(page_url,
                                        tag='h2',
                                        tag_id='version-compatibility')
    if not table:
      return {}

    version_dict: Dict[str, list[str]] = {}
    for row in table.find_all('tr')[1:]:  # Skip the header row.
      cells = row.find_all('td')
      if len(cells) < 2:
        continue
      cdf_versions = cells[0].get_text().strip().replace(' and later', '')
      dp_text = cells[1].get_text().strip()
      # Keep only the text before any '*' (presumably a footnote marker).
      dp_versions = [v.split('*')[0].strip() for v in dp_text.split(',')]
      for version in _expand_cdf_version_range(cdf_versions):
        version_dict[version] = dp_versions
    return version_dict

  except (
      requests.exceptions.RequestException,
      AttributeError,
      TypeError,
      ValueError,
      IndexError,
  ) as e:
    logging.error(
        'Error in extracting datafusion and dataproc versions: %s',
        e,
    )
    return {}
Extract the supported Data Fusion versions and their corresponding Dataproc versions from the GCP documentation.
class Preference(models.Resource):
  """Represents a Preference.

  Wraps a preferences mapping read from a Data Fusion instance, together with
  the Instance it belongs to.
  """

  _resource_data: dict

  def __init__(self, project_id, instance, resource_data):
    super().__init__(project_id=project_id)
    self.instance = instance
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Full path of the owning instance:

    projects/{project}/locations/{location}/instances/{instance}.
    """
    owner = self.instance
    return owner.full_path

  @property
  def image_version(self):
    """Value of 'system.profile.properties.imageVersion', or None if unset."""
    key = 'system.profile.properties.imageVersion'
    return self._resource_data.get(key)
Represents a Preference.
def get_system_preferences(context: models.Context,
                           instance: Instance) -> Preference:
  """Fetch the system-level CDAP preferences of a Data Fusion instance.

  Returns:
    A Preference wrapping whatever the instance's get_system_preferences()
    call returned.
  """
  logging.debug('fetching dataproc System preferences: %s', context.project_id)
  datafusion = get_generic.get_generic_api('datafusion', instance.api_endpoint)
  return Preference(context.project_id, instance,
                    datafusion.get_system_preferences())
Get the system preferences of a Data Fusion instance.
def get_namespace_preferences(context: models.Context,
                              instance: Instance) -> Mapping[str, Preference]:
  """Fetch the CDAP preferences of every namespace of a Data Fusion instance.

  Returns:
    A dict keyed by namespace name. Namespaces whose preferences response is
    empty are omitted; the dict is empty when no namespaces were returned.
  """
  logging.debug('fetching dataproc namespace preferences: %s',
                context.project_id)
  datafusion = get_generic.get_generic_api('datafusion', instance.api_endpoint)
  namespaces = datafusion.get_all_namespaces()
  if namespaces is None:
    return {}

  preferences_by_namespace = {}
  for namespace in namespaces:
    namespace_name = namespace['name']
    response = datafusion.get_namespace_preferences(namespace=namespace_name)
    if response:
      preferences_by_namespace[namespace_name] = Preference(
          context.project_id, instance, response)
  return preferences_by_namespace
Get the CDAP namespace preferences of a Data Fusion instance, keyed by namespace name.
def get_application_preferences(context: models.Context,
                                instance: Instance) -> Mapping[str, Preference]:
  """Fetch the CDAP preferences of every application of a Data Fusion instance.

  Walks each namespace, then each application in it, and wraps every
  non-empty preferences response in a Preference.

  Returns:
    A dict keyed by application name. NOTE(review): applications sharing a
    name across namespaces map to the same key, so a later namespace
    overwrites an earlier one — confirm whether callers rely on this.
  """
  logging.debug('fetching dataproc application preferences: %s',
                context.project_id)
  datafusion = get_generic.get_generic_api('datafusion', instance.api_endpoint)
  preferences_by_app = {}
  namespaces = datafusion.get_all_namespaces()
  if namespaces is None:
    return preferences_by_app

  for namespace in namespaces:
    namespace_name = namespace['name']
    applications = datafusion.get_all_applications(namespace=namespace_name)
    if applications is None:
      continue
    for application in applications:
      app_name = application['name']
      response = datafusion.get_application_preferences(
          namespace=namespace_name, application_name=app_name)
      if response:
        preferences_by_app[app_name] = Preference(context.project_id,
                                                  instance, response)
  return preferences_by_app
Get the CDAP application preferences of a Data Fusion instance, keyed by application name.