gcpdiag.queries.datafusion
class Instance(models.Resource):
  """Represents a Data Fusion instance.

  https://cloud.google.com/data-fusion/docs/reference/rest/v1/projects.locations.instances#Instance
  """

  _resource_data: dict

  # Fully qualified network reference:
  # projects/{host-project-id}/global/networks/{network}
  _NETWORK_PATH_RE = re.compile(r'projects/([^/]+)/global/networks/([^/]+)$')

  def __init__(self, project_id, resource_data):
    """Store the raw API resource dict for the instance in `project_id`."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def _network_config(self) -> dict:
    """The 'networkConfig' section, or an empty dict when it is absent.

    'networkConfig' is an optional field of the resource, so it must not be
    indexed unconditionally.
    """
    return self._resource_data.get('networkConfig') or {}

  @property
  def full_path(self) -> str:
    """
    The 'name' of the instance is already in the full path form

    projects/{project}/locations/{location}/instances/{instance}.
    """
    return self._resource_data['name']

  @property
  def short_path(self) -> str:
    """The full path with the 'projects/', '/locations/' and '/instances/'
    segments collapsed: {project}/{location}/{instance}."""
    path = self.full_path
    path = re.sub(r'^projects/', '', path)
    path = re.sub(r'/locations/', '/', path)
    path = re.sub(r'/instances/', '/', path)
    return path

  @property
  def name(self) -> str:
    """The value following 'instances' in the resource name."""
    return utils.extract_value_from_res_name(self._resource_data['name'],
                                             'instances')

  @property
  def location(self) -> str:
    """The value following 'locations' in the resource name."""
    return utils.extract_value_from_res_name(self._resource_data['name'],
                                             'locations')

  @property
  def zone(self) -> str:
    """The 'zone' field of the resource."""
    return self._resource_data['zone']

  @property
  def type(self) -> str:
    """The 'type' field of the resource (e.g. BASIC, ENTERPRISE, DEVELOPER)."""
    return self._resource_data['type']

  @property
  def is_basic_type(self) -> bool:
    return self.type == 'BASIC'

  @property
  def is_enterprise_type(self) -> bool:
    return self.type == 'ENTERPRISE'

  @property
  def is_developer_type(self) -> bool:
    return self.type == 'DEVELOPER'

  @property
  def is_private(self) -> bool:
    """The 'privateInstance' field, False when the field is absent."""
    return self._resource_data.get('privateInstance', False)

  @property
  def status(self) -> str:
    """The 'state' field of the resource."""
    return self._resource_data['state']

  @property
  def status_details(self) -> Optional[str]:
    """The 'stateMessage' field, or None when the field is absent."""
    return self._resource_data.get('stateMessage')

  @property
  def is_running(self) -> bool:
    return self.status == 'ACTIVE'

  @property
  def is_deleting(self) -> bool:
    return self.status == 'DELETING'

  @property
  def version(self) -> Version:
    """The 'version' field wrapped in a Version object."""
    return Version(self._resource_data['version'])

  @property
  def api_service_agent(self) -> str:
    """The 'p4ServiceAccount' field of the resource."""
    return self._resource_data['p4ServiceAccount']

  @property
  def dataproc_service_account(self) -> str:
    """The 'dataprocServiceAccount' field, falling back to the project's
    default compute service account when the field is not set."""
    sa = self._resource_data.get('dataprocServiceAccount')
    if sa is None:
      sa = crm.get_project(self.project_id).default_compute_service_account
    return sa

  @property
  def tenant_project_id(self) -> str:
    """The 'tenantProjectId' field of the resource."""
    return self._resource_data['tenantProjectId']

  @property
  def uses_shared_vpc(self) -> bool:
    """
    If shared VPC then 'network_string' = 'projects/{host-project-id}/global/networks/{network}'
    else 'network_string' = {network}

    Returns False when no network is configured at all.
    """
    network_string = self._network_config.get('network')
    if not network_string:
      return False
    match = self._NETWORK_PATH_RE.match(network_string)
    return bool(match) and match.group(1) != self.project_id

  @property
  def network(self) -> network.Network:
    """The network the instance is attached to.

    A fully qualified network string is resolved in the project it names, a
    bare network name is resolved in the instance's own project, and a missing
    network configuration resolves to the 'default' network of the instance's
    project.
    """
    network_string = self._network_config.get('network')
    if not network_string:
      return network.get_network(self.project_id, 'default')

    match = self._NETWORK_PATH_RE.match(network_string)
    if match:
      return network.get_network(match.group(1), match.group(2))
    return network.get_network(self.project_id, network_string)

  @property
  def tp_ipv4_cidr(self) -> Optional[IPv4NetOrIPv6Net]:
    """The 'ipAllocation' range of the network configuration.

    Returns None when no network is configured or when the configuration
    carries no 'ipAllocation' value.
    """
    network_config = self._network_config
    cidr = network_config.get('ipAllocation')
    if 'network' not in network_config or cidr is None:
      return None
    return ipaddress.ip_network(cidr)

  @property
  def api_endpoint(self) -> str:
    """The 'apiEndpoint' field of the resource."""
    return self._resource_data['apiEndpoint']
Represents a Data Fusion instance.
https://cloud.google.com/data-fusion/docs/reference/rest/v1/projects.locations.instances#Instance
@property
def full_path(self) -> str:
  """Return the instance's 'name' field unchanged.

  The 'name' is already in the full path form
  projects/{project}/locations/{location}/instances/{instance}.
  """
  resource_name = self._resource_data['name']
  return resource_name
The 'name' of the instance is already in the full path form
projects/{project}/locations/{location}/instances/{instance}.
@property
def short_path(self) -> str:
  """Return the full path with its fixed segments collapsed.

  For a name in the form
  projects/{project}/locations/{location}/instances/{instance} the result is
  {project}/{location}/{instance}.
  """
  substitutions = (
      (r'^projects/', ''),
      (r'/locations/', '/'),
      (r'/instances/', '/'),
  )
  shortened = self.full_path
  # Applied in order, each on the result of the previous substitution.
  for pattern, replacement in substitutions:
    shortened = re.sub(pattern, replacement, shortened)
  return shortened
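Returns the short path for this resource.
For a Data Fusion instance this is the full path with the 'projects/', '/locations/' and '/instances/' segments collapsed, i.e. {project}/{location}/{instance}. Note that it isn't clear from this name what kind of resource it is.
Example: 'my-project/us-central1/my-instance'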
@property
def network(self) -> network.Network:
  """Return the network the instance is attached to.

  A network string of the form
  projects/{host-project-id}/global/networks/{network} is resolved in the
  project it names; a bare network name is resolved in the instance's own
  project. When the resource has no 'networkConfig' section, or the section
  names no network, the 'default' network of the instance's project is
  returned.
  """
  # 'networkConfig' is optional on the resource, so it cannot be indexed
  # unconditionally.
  network_config = self._resource_data.get('networkConfig') or {}
  network_string = network_config.get('network')
  if not network_string:
    return network.get_network(self.project_id, 'default')

  match = re.match(r'projects/([^/]+)/global/networks/([^/]+)$',
                   network_string)
  if match:
    return network.get_network(match.group(1), match.group(2))
  return network.get_network(self.project_id, network_string)
@caching.cached_api_call
def get_instances(context: models.Context) -> Mapping[str, Instance]:
  """Get a dict of Instance matching the given context, indexed by instance full path.

  Args:
    context: selects the project and filters instances by location, labels
      and resource name.

  Returns:
    A mapping from instance full path to Instance. Empty when the datafusion
    API is not enabled in the project or no instance matches.

  Raises:
    utils.GcpApiError: when a list request fails with an HttpError.
  """
  instances: Dict[str, Instance] = {}

  if not apis.is_enabled(context.project_id, 'datafusion'):
    return instances

  logging.info('fetching list of Data Fusion instances in project %s',
               context.project_id)
  datafusion_api = apis.get_api('datafusion', 'v1', context.project_id)
  instances_api = datafusion_api.projects().locations().instances()
  # '-' (wildcard) selects all regions.
  query = instances_api.list(
      parent=f'projects/{context.project_id}/locations/-')

  try:
    # The list call is paginated: keep requesting until list_next() reports
    # that there is no further page (it returns None).
    while query is not None:
      resp = query.execute(num_retries=config.API_RETRIES)

      for i in resp.get('instances', []):
        # projects/{project}/locations/{location}/instances/{instance}.
        result = re.match(r'projects/[^/]+/locations/([^/]+)/instances/([^/]+)',
                          i['name'])
        if not result:
          logging.error('invalid datafusion name: %s', i['name'])
          continue
        location = result.group(1)
        labels = i.get('labels', {})
        name = result.group(2)
        if not context.match_project_resource(
            location=location, labels=labels, resource=name):
          continue

        instances[i['name']] = Instance(project_id=context.project_id,
                                        resource_data=i)

      query = instances_api.list_next(previous_request=query,
                                      previous_response=resp)

  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err

  return instances
Get a dict of Instance matching the given context, indexed by instance full path.
@caching.cached_api_call
def extract_support_datafusion_version() -> Dict[str, str]:
  """Scrape the Data Fusion version support policy table.

  Returns:
    A dictionary mapping each Data Fusion version to its support end date,
    formatted as YYYY-MM-DD. Rows whose end date does not parse as
    'Month day, Year' are skipped. An empty dictionary is returned when the
    table is not found or when fetching/parsing fails.
  """
  page_url = 'https://cloud.google.com/data-fusion/docs/support/version-support-policy'

  try:
    data_fusion_table = web.fetch_and_extract_table(page_url,
                                                    tag='h2',
                                                    tag_id='support_timelines')
    if not data_fusion_table:
      return {}

    version_policy_dict: Dict[str, str] = {}
    # The first row is skipped (header).
    for row in data_fusion_table.find_all('tr')[1:]:
      cells = row.find_all('td')
      version_cell = cells[0]
      raw_end_date = cells[2].text.strip()

      # Drop any <sup> element so it does not end up in the version text.
      if version_cell.sup:
        version_cell.sup.decompose()
      version_text = version_cell.text.strip()

      try:
        parsed_date = datetime.datetime.strptime(raw_end_date, '%B %d, %Y')
        iso_date = datetime.datetime.strftime(parsed_date, '%Y-%m-%d')
      except ValueError:
        continue

      version_policy_dict[version_text] = iso_date

    return version_policy_dict

  except (
      requests.exceptions.RequestException,
      AttributeError,
      TypeError,
      ValueError,
      IndexError,
  ) as e:
    logging.error('Error in extracting data fusion version support policy: %s',
                  e)
    return {}
Extract the version policy dictionary from the data fusion version support policy page.
Returns:
A dictionary of data fusion versions and their support end dates.
class Profile(models.Resource):
  """Represents a Compute Profile."""

  _resource_data: dict

  def __init__(self, project_id, instance_name, resource_data):
    """Store the raw profile dict together with its project and instance name."""
    super().__init__(project_id=project_id)
    self.instance_name = instance_name
    self._resource_data = resource_data

  def _find_property(self, name: str, unset_values: tuple = (None,)):
    """Return the value of the first provisioner property called `name`.

    Entries whose value is one of `unset_values` are skipped. Returns None
    when the provisioner has no 'properties' list or no usable entry.
    """
    # 'properties' may be missing (or None); treat that as "no properties"
    # instead of failing on iteration.
    properties = self._resource_data['provisioner'].get('properties') or []
    for prop in properties:
      if prop.get('name') != name:
        continue
      value = prop.get('value')
      if value not in unset_values:
        return value
    return None

  @property
  def full_path(self) -> str:
    """The full path form :

    projects/{project}/instances/{instance}/computeProfiles/{profile}.
    """
    return (f'projects/{self.project_id}/instances/{self.instance_name}'
            f"/computeProfiles/{self._resource_data['name']}")

  @property
  def short_path(self) -> str:
    """The short path form :

    {project}/{instance}/{profile}.
    """
    return (
        f"{self.project_id}/{self.instance_name}/{self._resource_data['name']}")

  @property
  def name(self) -> str:
    """The 'name' field of the profile."""
    return self._resource_data['name']

  @property
  def region(self) -> str:
    """The 'region' provisioner property, or 'No region defined'."""
    value = self._find_property('region')
    return value if value is not None else 'No region defined'

  @property
  def status(self) -> str:
    """The 'status' field of the profile."""
    return self._resource_data['status']

  @property
  def scope(self) -> str:
    """The 'scope' field of the profile."""
    return self._resource_data['scope']

  @property
  def is_dataproc_provisioner(self) -> bool:
    return self._resource_data['provisioner']['name'] == 'gcp-dataproc'

  @property
  def is_existing_dataproc_provisioner(self) -> bool:
    return self._resource_data['provisioner']['name'] == 'gcp-existing-dataproc'

  @property
  def autoscaling_enabled(self) -> bool:
    """True when the 'enablePredefinedAutoScaling' property equals 'true'."""
    return self._find_property('enablePredefinedAutoScaling') == 'true'

  @property
  def image_version(self) -> str:
    """The 'imageVersion' provisioner property, or 'No imageVersion defined'.

    Empty and missing values count as undefined.
    """
    value = self._find_property('imageVersion', unset_values=(None, ''))
    return value if value is not None else 'No imageVersion defined'

  @property
  def auto_scaling_policy(self) -> str:
    """The 'autoScalingPolicy' provisioner property, or
    'No autoScalingPolicy defined'.

    Empty and missing values count as undefined.
    """
    value = self._find_property('autoScalingPolicy', unset_values=(None, ''))
    return value if value is not None else 'No autoScalingPolicy defined'
Represents a Compute Profile.
@property
def full_path(self) -> str:
  """Build the profile's full path.

  Form: projects/{project}/instances/{instance}/computeProfiles/{profile}.
  """
  project = self.project_id
  instance = self.instance_name
  profile = self._resource_data['name']
  return f'projects/{project}/instances/{instance}/computeProfiles/{profile}'
The full path form :
projects/{project}/instances/{instance}/computeProfiles/{profile}.
@property
def short_path(self) -> str:
  """Build the profile's short path.

  Form: {project}/{instance}/{profile}.
  """
  project = self.project_id
  instance = self.instance_name
  profile = self._resource_data['name']
  return f'{project}/{instance}/{profile}'
The short path form :
{project}/{instance}/{profile}.
@caching.cached_api_call
def get_instance_system_compute_profile(
    context: models.Context, instance: Instance) -> Iterable[Profile]:
  """Get the Dataproc system compute profiles of a Data Fusion instance.

  Only profiles whose provisioner is 'gcp-dataproc' or
  'gcp-existing-dataproc' are returned. The list is empty when the endpoint
  returns no profiles.
  """
  logging.info('fetching dataproc System compute profile list: %s',
               context.project_id)
  datafusion = get_generic.get_generic_api('datafusion', instance.api_endpoint)
  raw_profiles = datafusion.get_system_profiles()
  if raw_profiles is None:
    return []

  dataproc_provisioners = ('gcp-dataproc', 'gcp-existing-dataproc')
  return [
      Profile(context.project_id, instance.name, raw_profile)
      for raw_profile in raw_profiles
      if raw_profile['provisioner']['name'] in dataproc_provisioners
  ]
Get the list of Dataproc system compute profiles of a Data Fusion instance.
@caching.cached_api_call
def get_instance_user_compute_profile(context: models.Context,
                                      instance: Instance) -> Iterable[Profile]:
  """Get the Dataproc user compute profiles of a Data Fusion instance.

  User profiles are collected namespace by namespace. Only profiles whose
  provisioner is 'gcp-dataproc' or 'gcp-existing-dataproc' are kept.
  """
  logging.info('fetching dataproc User compute profile list: %s',
               context.project_id)
  user_profiles: List[Profile] = []
  datafusion = get_generic.get_generic_api('datafusion', instance.api_endpoint)
  namespaces = datafusion.get_all_namespaces()
  if namespaces is None:
    return user_profiles

  dataproc_provisioners = ('gcp-dataproc', 'gcp-existing-dataproc')
  for namespace in namespaces:
    raw_profiles = datafusion.get_user_profiles(namespace=namespace['name'])
    if raw_profiles is None:
      continue
    for raw_profile in raw_profiles:
      if raw_profile['provisioner']['name'] in dataproc_provisioners:
        user_profiles.append(
            Profile(context.project_id, instance.name, raw_profile))

  # Keep only truthy entries.
  return [profile for profile in user_profiles if profile]
Get the list of Dataproc user compute profiles of a Data Fusion instance.
@caching.cached_api_call
def extract_datafusion_dataproc_version() -> Dict[str, list[str]]:
  """Extract the supported Data Fusion versions and their corresponding
  Dataproc versions from the GCP documentation.

  Returns:
    A dictionary mapping each Data Fusion version to the list of Dataproc
    versions from the same table row. A row given as a range
    ('{start}-{end}') is expanded into one key per 0.1 step, both ends
    included. An empty dictionary is returned when the table is not found or
    when fetching/parsing fails.
  """

  page_url = 'https://cloud.google.com/data-fusion/docs/concepts/configure-clusters'

  try:
    table = web.fetch_and_extract_table(page_url,
                                        tag='h2',
                                        tag_id='version-compatibility')
    if not table:
      return {}

    version_dict: Dict[str, list[str]] = {}
    for row in table.find_all('tr')[1:]:  # Skip the header row
      cells = row.find_all('td')
      cdf_versions = cells[0].get_text().strip().replace(' and later', '')
      dp_versions_text = cells[1].get_text().strip()

      if '-' in cdf_versions:
        # Step through the range in integer tenths. Repeatedly adding 0.1 to
        # a float drifts (1.1 + 0.1 + 0.1 > 1.3) and can silently drop the
        # last version of the range.
        first, last = (
            round(float(bound) * 10) for bound in cdf_versions.split('-'))
        cdf_versions_list = [
            f'{tenths / 10:.1f}' for tenths in range(first, last + 1)
        ]
      else:
        cdf_versions_list = [cdf_versions]

      # Anything after a '*' in a Dataproc version entry is dropped.
      dp_versions = [
          entry.split('*')[0].strip() for entry in dp_versions_text.split(',')
      ]
      for version in cdf_versions_list:
        version_dict[version] = dp_versions

    return version_dict

  except (
      requests.exceptions.RequestException,
      AttributeError,
      TypeError,
      ValueError,
      IndexError,
      OverflowError,
  ) as e:
    logging.error(
        'Error in extracting datafusion and dataproc versions: %s',
        e,
    )
    return {}
Extract the supported Data Fusion versions and their corresponding Dataproc versions from the GCP documentation.
class Preference(models.Resource):
  """Represents a set of preferences tied to a Data Fusion instance."""

  _resource_data: dict

  def __init__(self, project_id, instance, resource_data):
    """Keep the owning instance and the raw preferences dict."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self.instance = instance

  @property
  def full_path(self) -> str:
    """Full path of the instance these preferences belong to:

    projects/{project}/locations/{location}/instances/{instance}.
    """
    owner = self.instance
    return owner.full_path

  @property
  def image_version(self):
    """Value of 'system.profile.properties.imageVersion', None when unset."""
    return self._resource_data.get('system.profile.properties.imageVersion')
Represents a Preference.
def get_system_preferences(context: models.Context,
                           instance: Instance) -> Preference:
  """Fetch the system preferences of a Data Fusion instance.

  The preferences are read from the instance's API endpoint and wrapped in a
  Preference bound to the context's project and to `instance`.
  """
  logging.info('fetching dataproc System preferences: %s', context.project_id)
  datafusion = get_generic.get_generic_api('datafusion', instance.api_endpoint)
  system_preferences = datafusion.get_system_preferences()
  return Preference(context.project_id, instance, system_preferences)
Get datafusion Instance system preferences.
def get_namespace_preferences(context: models.Context,
                              instance: Instance) -> Mapping[str, Preference]:
  """Fetch the preferences of every CDAP namespace of a Data Fusion instance.

  Returns a mapping from namespace name to Preference. Namespaces whose
  preferences response is empty are left out.
  """
  logging.info('fetching dataproc namespace preferences: %s',
               context.project_id)
  datafusion = get_generic.get_generic_api('datafusion', instance.api_endpoint)
  namespaces = datafusion.get_all_namespaces()
  preferences_by_namespace = {}
  if namespaces is None:
    return preferences_by_namespace

  for namespace in namespaces:
    namespace_name = namespace['name']
    raw_preferences = datafusion.get_namespace_preferences(
        namespace=namespace_name)
    if not raw_preferences:
      continue
    preferences_by_namespace[namespace_name] = Preference(
        context.project_id, instance, raw_preferences)
  return preferences_by_namespace
Get datafusion cdap namespace preferences.
def get_application_preferences(context: models.Context,
                                instance: Instance) -> Mapping[str, Preference]:
  """Fetch the preferences of every CDAP application of a Data Fusion instance.

  Applications are enumerated namespace by namespace. Returns a mapping from
  application name to Preference; applications whose preferences response is
  empty are left out.
  """
  logging.info('fetching dataproc application preferences: %s',
               context.project_id)
  datafusion = get_generic.get_generic_api('datafusion', instance.api_endpoint)
  preferences_by_application = {}
  namespaces = datafusion.get_all_namespaces()
  if namespaces is None:
    return preferences_by_application

  for namespace in namespaces:
    namespace_name = namespace['name']
    applications = datafusion.get_all_applications(namespace=namespace_name)
    if applications is None:
      continue
    for application in applications:
      application_name = application['name']
      raw_preferences = datafusion.get_application_preferences(
          namespace=namespace_name, application_name=application_name)
      if not raw_preferences:
        continue
      # NOTE(review): the key is the application name only, so applications
      # with the same name in different namespaces overwrite each other.
      preferences_by_application[application_name] = Preference(
          context.project_id, instance, raw_preferences)
  return preferences_by_application
Get datafusion cdap application preferences.