gcpdiag.queries.datafusion
class Instance(models.Resource):
  """Represents a Data Fusion instance.

  https://cloud.google.com/data-fusion/docs/reference/rest/v1/projects.locations.instances#Instance
  """

  # Raw instance resource as handed to the constructor.
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def _network_config(self) -> dict:
    """The 'networkConfig' section of the resource, or {} when it is absent."""
    return self._resource_data.get('networkConfig') or {}

  def _configured_network(self) -> Optional[tuple[str, str]]:
    """Return (project id, network name) of the configured network.

    The 'network' value is either a full path of the form
    'projects/{host-project-id}/global/networks/{network}' or a bare network
    name, in which case the instance's own project is used.

    Returns:
      None when no network is configured for the instance.
    """
    config = self._network_config
    if 'network' not in config:
      return None
    network_string = config['network']
    match = re.match(r'projects/([^/]+)/global/networks/([^/]+)$',
                     network_string)
    if match:
      return match.group(1), match.group(2)
    return self.project_id, network_string

  @property
  def full_path(self) -> str:
    """
    The 'name' of the instance is already in the full path form

    projects/{project}/locations/{location}/instances/{instance}.
    """
    return self._resource_data['name']

  @property
  def short_path(self) -> str:
    """The full path with the 'projects/', 'locations/' and 'instances/'
    keywords removed, i.e. {project}/{location}/{instance}."""
    path = self.full_path
    path = re.sub(r'^projects/', '', path)
    path = re.sub(r'/locations/', '/', path)
    path = re.sub(r'/instances/', '/', path)
    return path

  @property
  def name(self) -> str:
    """The value following 'instances' in the resource name."""
    return utils.extract_value_from_res_name(self._resource_data['name'],
                                             'instances')

  @property
  def location(self) -> str:
    """The value following 'locations' in the resource name."""
    return utils.extract_value_from_res_name(self._resource_data['name'],
                                             'locations')

  @property
  def zone(self) -> str:
    return self._resource_data['zone']

  @property
  def type(self) -> str:
    return self._resource_data['type']

  @property
  def is_basic_type(self) -> bool:
    return self._resource_data['type'] == 'BASIC'

  @property
  def is_enterprise_type(self) -> bool:
    return self._resource_data['type'] == 'ENTERPRISE'

  @property
  def is_developer_type(self) -> bool:
    return self._resource_data['type'] == 'DEVELOPER'

  @property
  def is_private(self) -> bool:
    """The 'privateInstance' flag; False when the field is absent."""
    return self._resource_data.get('privateInstance', False)

  @property
  def status(self) -> str:
    return self._resource_data['state']

  @property
  def status_details(self) -> Optional[str]:
    """The 'stateMessage' field, or None when it is absent."""
    return self._resource_data.get('stateMessage')

  @property
  def is_running(self) -> bool:
    return self.status == 'ACTIVE'

  @property
  def is_deleting(self) -> bool:
    return self._resource_data['state'] == 'DELETING'

  @property
  def version(self) -> Version:
    return Version(self._resource_data['version'])

  @property
  def api_service_agent(self) -> str:
    """The 'p4ServiceAccount' field of the resource."""
    return self._resource_data['p4ServiceAccount']

  @property
  def dataproc_service_account(self) -> str:
    """The 'dataprocServiceAccount' field, falling back to the project's
    default compute service account when the field is not set."""
    sa = self._resource_data.get('dataprocServiceAccount')
    if sa is None:
      sa = crm.get_project(self.project_id).default_compute_service_account
    return sa

  @property
  def tenant_project_id(self) -> str:
    return self._resource_data['tenantProjectId']

  @property
  def uses_shared_vpc(self) -> bool:
    """
    If shared VPC then 'network_string' = 'projects/{host-project-id}/global/networks/{network}'
    else 'network_string' = {network}

    Returns False when no network (or no 'networkConfig' at all) is
    configured.
    """
    configured = self._configured_network()
    return configured is not None and configured[0] != self.project_id

  @property
  def network(self) -> network.Network:
    """The configured network, or the instance project's 'default' network
    when none is configured."""
    # Inside the method body 'network' resolves to the module, not to this
    # property.
    configured = self._configured_network()
    if configured is None:
      return network.get_network(self.project_id, 'default')
    return network.get_network(*configured)

  @property
  def tp_ipv4_cidr(self) -> Optional[IPv4NetOrIPv6Net]:
    """The 'ipAllocation' range of the network config as an ip_network.

    Returns:
      None when no network is configured or no 'ipAllocation' is set.
    """
    config = self._network_config
    cidr = config.get('ipAllocation')
    if 'network' not in config or not cidr:
      return None
    return ipaddress.ip_network(cidr)

  @property
  def api_endpoint(self) -> str:
    return self._resource_data['apiEndpoint']
Represents a Data Fusion instance.
https://cloud.google.com/data-fusion/docs/reference/rest/v1/projects.locations.instances#Instance
@property
def full_path(self) -> str:
  """Return the instance's resource name unchanged.

  The 'name' field is already in the full path form
  projects/{project}/locations/{location}/instances/{instance}.
  """
  resource_name = self._resource_data['name']
  return resource_name
The 'name' of the instance is already in the full path form
projects/{project}/locations/{location}/instances/{instance}.
@property
def short_path(self) -> str:
  """Return the full path with its collection keywords stripped.

  'projects/{project}/locations/{location}/instances/{instance}' becomes
  '{project}/{location}/{instance}'.
  """
  substitutions = (
      (r'^projects/', ''),
      (r'/locations/', '/'),
      (r'/instances/', '/'),
  )
  shortened = self.full_path
  for pattern, replacement in substitutions:
    shortened = re.sub(pattern, replacement, shortened)
  return shortened
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
@property
def network(self) -> network.Network:
  """Return the network this instance is attached to.

  The 'network' value of 'networkConfig' is either a full path of the form
  'projects/{host-project-id}/global/networks/{network}' or a bare network
  name that lives in the instance's own project.

  Returns:
    The configured network, or the instance project's 'default' network when
    the resource has no 'networkConfig' or no 'network' entry in it.
  """
  # 'networkConfig' may be missing from the resource; treat that like an
  # empty config so the 'default' fallback below is reached instead of a
  # KeyError.
  network_config = self._resource_data.get('networkConfig') or {}
  if 'network' not in network_config:
    return network.get_network(self.project_id, 'default')

  network_string = network_config['network']
  match = re.match(r'projects/([^/]+)/global/networks/([^/]+)$',
                   network_string)
  if match:
    return network.get_network(match.group(1), match.group(2))
  return network.get_network(self.project_id, network_string)
@caching.cached_api_call
def get_instances(context: models.Context) -> Mapping[str, Instance]:
  """Get a dict of Instance matching the given context, indexed by instance full path.

  All result pages of the list call are fetched, not only the first one.

  Args:
    context: supplies the project id and the location/label/resource filter.

  Returns:
    Mapping of full instance name to Instance; empty when the datafusion API
    is not enabled in the project or no instance matches.

  Raises:
    utils.GcpApiError: when the API call fails with an HttpError.
  """
  instances: Dict[str, Instance] = {}

  if not apis.is_enabled(context.project_id, 'datafusion'):
    return instances

  logging.info('fetching list of Data Fusion instances in project %s',
               context.project_id)
  datafusion_api = apis.get_api('datafusion', 'v1', context.project_id)
  instances_api = datafusion_api.projects().locations().instances()
  # '-' (wildcard) all regions
  request = instances_api.list(
      parent=f'projects/{context.project_id}/locations/-')

  try:
    # list_next() returns None once there is no further page.
    while request is not None:
      resp = request.execute(num_retries=config.API_RETRIES)

      for i in resp.get('instances', []):
        # projects/{project}/locations/{location}/instances/{instance}.
        result = re.match(r'projects/[^/]+/locations/([^/]+)/instances/([^/]+)',
                          i['name'])
        if not result:
          logging.error('invalid datafusion name: %s', i['name'])
          continue
        location = result.group(1)
        labels = i.get('labels', {})
        name = result.group(2)
        if not context.match_project_resource(
            location=location, labels=labels, resource=name):
          continue

        instances[i['name']] = Instance(project_id=context.project_id,
                                        resource_data=i)

      request = instances_api.list_next(previous_request=request,
                                        previous_response=resp)

  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err

  return instances
Get a dict of Instance matching the given context, indexed by instance full path.
@caching.cached_api_call
def extract_support_datafusion_version() -> Dict[str, str]:
  """Extract the version policy dictionary from the data fusion version support policy page.

  Rows that have fewer than three cells, or whose end date is not written as
  '<Month> <day>, <year>', are skipped instead of aborting the whole parse.

  Returns:
    A dictionary of data fusion versions and their support end dates
    (formatted as YYYY-MM-DD). Empty when the table cannot be fetched or
    parsed.
  """
  page_url = 'https://cloud.google.com/data-fusion/docs/support/version-support-policy'

  try:
    data_fusion_table = web.fetch_and_extract_table(page_url,
                                                    tag='h2',
                                                    tag_id='support_timelines')
    if not data_fusion_table:
      return {}

    version_policy_dict: Dict[str, str] = {}

    # The first row is skipped (presumably the table header).
    for row in data_fusion_table.find_all('tr')[1:]:
      columns = row.find_all('td')
      if len(columns) < 3:
        # Not a data row; one short row must not discard the whole table.
        continue

      version_cell = columns[0]
      # Drop any <sup> element so only the version text remains.
      if version_cell.sup:
        version_cell.sup.decompose()
      version = version_cell.text.strip()

      try:
        support_end_date = datetime.datetime.strptime(columns[2].text.strip(),
                                                      '%B %d, %Y')
      except ValueError:
        continue

      version_policy_dict[version] = support_end_date.strftime('%Y-%m-%d')

    return version_policy_dict

  except (
      requests.exceptions.RequestException,
      AttributeError,
      TypeError,
      ValueError,
      IndexError,
  ) as e:
    logging.error('Error in extracting data fusion version support policy: %s',
                  e)
    return {}
Extract the version policy dictionary from the data fusion version support policy page.
Returns:
A dictionary of data fusion versions and their support end dates.
class Profile(models.Resource):
  """Represents a Compute Profile."""

  # Raw compute profile as handed to the constructor.
  _resource_data: dict

  def __init__(self, project_id, instance_name, resource_data):
    super().__init__(project_id=project_id)
    self.instance_name = instance_name
    self._resource_data = resource_data

  def _property_values(self, name: str) -> list:
    """Return the 'value' of every provisioner property called `name`.

    A provisioner without a 'properties' list yields an empty result instead
    of raising.
    """
    properties = self._resource_data['provisioner'].get('properties') or []
    return [prop.get('value') for prop in properties if prop.get('name') == name]

  @property
  def full_path(self) -> str:
    """The full path form:

    projects/{project}/instances/{instance}/computeProfiles/{profile}.
    """
    return (f'projects/{self.project_id}/instances/{self.instance_name}'
            f'/computeProfiles/{self._resource_data["name"]}')

  @property
  def short_path(self) -> str:
    """The short path form:

    {project}/{instance}/{profile}.
    """
    return (
        f'{self.project_id}/{self.instance_name}/{self._resource_data["name"]}')

  @property
  def name(self) -> str:
    return self._resource_data['name']

  @property
  def region(self) -> str:
    """The 'region' provisioner property, or 'No region defined'."""
    values = self._property_values('region')
    return next((v for v in values if v is not None), 'No region defined')

  @property
  def status(self) -> str:
    return self._resource_data['status']

  @property
  def scope(self) -> str:
    return self._resource_data['scope']

  @property
  def is_dataproc_provisioner(self) -> bool:
    return self._resource_data['provisioner']['name'] == 'gcp-dataproc'

  @property
  def is_existing_dataproc_provisioner(self) -> bool:
    return self._resource_data['provisioner']['name'] == 'gcp-existing-dataproc'

  @property
  def autoscaling_enabled(self) -> bool:
    """True when 'enablePredefinedAutoScaling' is set to the string 'true'."""
    values = self._property_values('enablePredefinedAutoScaling')
    return next((v == 'true' for v in values if v is not None), False)

  @property
  def image_version(self) -> str:
    """The 'imageVersion' provisioner property, or 'No imageVersion defined'
    when it is missing or empty."""
    values = self._property_values('imageVersion')
    # A property without a 'value' counts as undefined so that the declared
    # str return type always holds.
    return next((v for v in values if v not in (None, '')),
                'No imageVersion defined')

  @property
  def auto_scaling_policy(self) -> str:
    """The 'autoScalingPolicy' provisioner property, or
    'No autoScalingPolicy defined' when it is missing or empty."""
    values = self._property_values('autoScalingPolicy')
    return next((v for v in values if v not in (None, '')),
                'No autoScalingPolicy defined')
Represents a Compute Profile.
@property
def full_path(self) -> str:
  """Build the profile's full path:

  projects/{project}/instances/{instance}/computeProfiles/{profile}.
  """
  profile_name = self._resource_data['name']
  instance_path = f'projects/{self.project_id}/instances/{self.instance_name}'
  return f'{instance_path}/computeProfiles/{profile_name}'
The full path form:
projects/{project}/instances/{instance}/computeProfiles/{profile}.
@property
def short_path(self) -> str:
  """Build the profile's short path:

  {project}/{instance}/{profile}.
  """
  profile_name = self._resource_data['name']
  return f'{self.project_id}/{self.instance_name}/{profile_name}'
The short path form:
{project}/{instance}/{profile}.
@caching.cached_api_call
def get_instance_system_compute_profile(
    context: models.Context, instance: Instance) -> Iterable[Profile]:
  """Return the system compute profiles of an instance that use a Dataproc provisioner.

  Only profiles whose provisioner is 'gcp-dataproc' or
  'gcp-existing-dataproc' are kept. An empty list is returned when the
  endpoint gives no response.
  """
  logging.info('fetching dataproc System compute profile list: %s',
               context.project_id)
  datafusion = get_generic.get_generic_api('datafusion', instance.api_endpoint)
  all_profiles = datafusion.get_system_profiles()
  if all_profiles is None:
    return []

  dataproc_provisioners = ('gcp-dataproc', 'gcp-existing-dataproc')
  return [
      Profile(context.project_id, instance.name, profile)
      for profile in all_profiles
      if profile['provisioner']['name'] in dataproc_provisioners
  ]
Get the list of Dataproc system compute profiles of a Data Fusion instance.
@caching.cached_api_call
def get_instance_user_compute_profile(context: models.Context,
                                      instance: Instance) -> Iterable[Profile]:
  """Return the user compute profiles of an instance that use a Dataproc provisioner.

  Every namespace of the instance is queried. Only profiles whose provisioner
  is 'gcp-dataproc' or 'gcp-existing-dataproc' are kept.
  """
  logging.info('fetching dataproc User compute profile list: %s',
               context.project_id)
  user_profiles: List[Profile] = []
  datafusion = get_generic.get_generic_api('datafusion', instance.api_endpoint)
  namespaces = datafusion.get_all_namespaces()
  if namespaces is None:
    return user_profiles

  dataproc_provisioners = ('gcp-dataproc', 'gcp-existing-dataproc')
  for namespace in namespaces:
    namespace_profiles = datafusion.get_user_profiles(
        namespace=namespace['name'])
    if namespace_profiles is None:
      continue
    for profile in namespace_profiles:
      if profile['provisioner']['name'] in dataproc_provisioners:
        user_profiles.append(
            Profile(context.project_id, instance.name, profile))

  # Keep only truthy entries, as the original filter(bool, ...) did.
  return [profile for profile in user_profiles if profile]
Get the list of Dataproc user compute profiles of a Data Fusion instance.
@caching.cached_api_call
def extract_datafusion_dataproc_version() -> Dict[str, list[str]]:
  """Extract the supported Data Fusion versions and their corresponding
  Dataproc versions from the GCP documentation.

  A Data Fusion cell holding a range such as '6.1-6.3' is expanded into one
  entry per tenth ('6.1', '6.2', '6.3'). A trailing ' and later' is dropped.
  Rows with fewer than two cells are skipped.

  Returns:
    Mapping of Data Fusion version to the list of Dataproc versions from the
    same row. Empty when the table cannot be fetched or parsed.
  """

  page_url = 'https://cloud.google.com/data-fusion/docs/concepts/configure-clusters'

  try:
    table = web.fetch_and_extract_table(page_url,
                                        tag='h2',
                                        tag_id='version-compatibility')
    if not table:
      return {}

    rows = table.find_all('tr')[1:]  # Skip the header row
    version_dict = {}

    for row in rows:
      cells = row.find_all('td')
      if len(cells) < 2:
        # Not a data row; one short row must not discard the whole table.
        continue

      cdf_versions = cells[0].get_text().strip().replace(' and later', '')
      # Anything after a '*' in a Dataproc version is dropped.
      dp_versions = [
          v.split('*')[0].strip()
          for v in cells[1].get_text().strip().split(',')
      ]

      if '-' in cdf_versions:
        start, end = map(float, cdf_versions.split('-'))
        # Step in whole tenths: repeatedly adding 0.1 to a float accumulates
        # rounding error and can overshoot `end`, dropping the last version.
        # NOTE(review): versions are still read as decimals, so a two-digit
        # minor version such as '6.10' is not representable here.
        cdf_versions_list = [
            f'{tenths / 10:.1f}'
            for tenths in range(round(start * 10),
                                round(end * 10) + 1)
        ]
      else:
        cdf_versions_list = [cdf_versions]

      for version in cdf_versions_list:
        version_dict[version] = dp_versions

    return version_dict

  except (
      requests.exceptions.RequestException,
      AttributeError,
      TypeError,
      ValueError,
      IndexError,
  ) as e:
    logging.error(
        'Error in extracting datafusion and dataproc versions: %s',
        e,
    )
    return {}
Extract the supported Data Fusion versions and their corresponding Dataproc versions from the GCP documentation.
class Preference(models.Resource):
  """Represents a Preference."""

  # Preference key/value pairs as handed to the constructor ({} if none).
  _resource_data: dict

  def __init__(self, project_id, instance, resource_data):
    super().__init__(project_id=project_id)
    self.instance = instance
    # Other code in this module checks CDAP responses for None, so a missing
    # response can reach this constructor. Normalize it to an empty dict so
    # the accessors below return None instead of raising AttributeError.
    self._resource_data = resource_data or {}

  @property
  def full_path(self) -> str:
    """The full path of the owning instance:

    projects/{project}/locations/{location}/instances/{instance}.
    """
    return self.instance.full_path

  @property
  def image_version(self):
    """The 'system.profile.properties.imageVersion' preference, or None when
    it is not set."""
    return self._resource_data.get('system.profile.properties.imageVersion')
Represents a Preference.
def get_system_preferences(context: models.Context,
                           instance: Instance) -> Preference:
  """Fetch the system preferences of a Data Fusion instance.

  The instance's API endpoint is queried and the response is wrapped in a
  Preference.
  """
  logging.info('fetching dataproc System preferences: %s', context.project_id)
  datafusion = get_generic.get_generic_api('datafusion', instance.api_endpoint)
  system_preferences = datafusion.get_system_preferences()
  return Preference(context.project_id, instance, system_preferences)
Get datafusion Instance system preferences.
def get_namespace_preferences(context: models.Context,
                              instance: Instance) -> Mapping[str, Preference]:
  """Fetch the preferences of every CDAP namespace of an instance.

  Returns:
    Mapping of namespace name to Preference. Namespaces with an empty
    preference response are left out.
  """
  logging.info('fetching dataproc namespace preferences: %s',
               context.project_id)
  datafusion = get_generic.get_generic_api('datafusion', instance.api_endpoint)
  namespaces = datafusion.get_all_namespaces()
  namespaces_preferences = {}
  if namespaces is None:
    return namespaces_preferences

  for namespace in namespaces:
    namespace_name = namespace['name']
    preferences = datafusion.get_namespace_preferences(namespace=namespace_name)
    if preferences:
      namespaces_preferences[namespace_name] = Preference(
          context.project_id, instance, preferences)
  return namespaces_preferences
Get datafusion cdap namespace preferences.
def get_application_preferences(context: models.Context,
                                instance: Instance) -> Mapping[str, Preference]:
  """Fetch the preferences of every CDAP application of an instance.

  All applications of all namespaces are visited.

  Returns:
    Mapping of application name to Preference. Applications with an empty
    preference response are left out.
  """
  logging.info('fetching dataproc application preferences: %s',
               context.project_id)
  datafusion = get_generic.get_generic_api('datafusion', instance.api_endpoint)
  applications_preferences = {}
  namespaces = datafusion.get_all_namespaces()
  if namespaces is None:
    return applications_preferences

  for namespace in namespaces:
    applications = datafusion.get_all_applications(namespace=namespace['name'])
    if applications is None:
      continue
    for application in applications:
      preferences = datafusion.get_application_preferences(
          namespace=namespace['name'], application_name=application['name'])
      if not preferences:
        continue
      # NOTE(review): the key is the bare application name, so applications
      # with the same name in different namespaces overwrite each other.
      applications_preferences[application['name']] = Preference(
          context.project_id, instance, preferences)
  return applications_preferences
Get datafusion cdap application preferences.