gcpdiag.queries.datafusion

Queries related to Data Fusion.
IPv4NetOrIPv6Net = typing.Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
class Instance(gcpdiag.models.Resource):
 38class Instance(models.Resource):
 39  """Represents a Data Fusion instance.
 40
 41  https://cloud.google.com/data-fusion/docs/reference/rest/v1/projects.locations.instances#Instance
 42  """
 43
 44  _resource_data: dict
 45
 46  def __init__(self, project_id, resource_data):
 47    super().__init__(project_id=project_id)
 48    self._resource_data = resource_data
 49
 50  @property
 51  def full_path(self) -> str:
 52    """
 53    The 'name' of the instance is already in the full path form
 54
 55    projects/{project}/locations/{location}/instances/{instance}.
 56    """
 57    return self._resource_data['name']
 58
 59  @property
 60  def short_path(self) -> str:
 61    path = self.full_path
 62    path = re.sub(r'^projects/', '', path)
 63    path = re.sub(r'/locations/', '/', path)
 64    path = re.sub(r'/instances/', '/', path)
 65    return path
 66
 67  @property
 68  def name(self) -> str:
 69    return utils.extract_value_from_res_name(self._resource_data['name'],
 70                                             'instances')
 71
 72  @property
 73  def location(self) -> str:
 74    return utils.extract_value_from_res_name(self._resource_data['name'],
 75                                             'locations')
 76
 77  @property
 78  def zone(self) -> str:
 79    return self._resource_data['zone']
 80
 81  @property
 82  def type(self) -> str:
 83    return self._resource_data['type']
 84
 85  @property
 86  def is_basic_type(self) -> bool:
 87    return self._resource_data['type'] == 'BASIC'
 88
 89  @property
 90  def is_enterprise_type(self) -> bool:
 91    return self._resource_data['type'] == 'ENTERPRISE'
 92
 93  @property
 94  def is_developer_type(self) -> bool:
 95    return self._resource_data['type'] == 'DEVELOPER'
 96
 97  @property
 98  def is_private(self) -> bool:
 99    if 'privateInstance' in self._resource_data:
100      return self._resource_data['privateInstance']
101    return False
102
103  @property
104  def status(self) -> str:
105    return self._resource_data['state']
106
107  @property
108  def status_details(self) -> Optional[str]:
109    if 'stateMessage' in self._resource_data:
110      return self._resource_data['stateMessage']
111    return None
112
113  @property
114  def is_running(self) -> bool:
115    return self.status == 'ACTIVE'
116
117  @property
118  def is_deleting(self) -> bool:
119    return self._resource_data['state'] == 'DELETING'
120
121  @property
122  def version(self) -> Version:
123    return Version(self._resource_data['version'])
124
125  @property
126  def api_service_agent(self) -> str:
127    return self._resource_data['p4ServiceAccount']
128
129  @property
130  def dataproc_service_account(self) -> str:
131    sa = self._resource_data.get('dataprocServiceAccount')
132    if sa is None:
133      sa = crm.get_project(self.project_id).default_compute_service_account
134    return sa
135
136  @property
137  def tenant_project_id(self) -> str:
138    return self._resource_data['tenantProjectId']
139
140  @property
141  def uses_shared_vpc(self) -> bool:
142    """
143    If shared VPC then 'network_string' = 'projects/{host-project-id}/global/networks/{network}'
144    else 'network_string' = {network}
145    """
146    if 'network' in self._resource_data['networkConfig']:
147      network_string = self._resource_data['networkConfig']['network']
148      match = re.match(r'projects/([^/]+)/global/networks/([^/]+)$',
149                       network_string)
150      if match and match.group(1) != self.project_id:
151        return True
152
153    return False
154
155  @property
156  def network(self) -> network.Network:
157    if 'network' in self._resource_data['networkConfig']:
158      network_string = self._resource_data['networkConfig']['network']
159      match = re.match(r'projects/([^/]+)/global/networks/([^/]+)$',
160                       network_string)
161      if match:
162        return network.get_network(
163            match.group(1),
164            match.group(2),
165            context=models.Context(project_id=match.group(1)),
166        )
167      else:
168        return network.get_network(
169            self.project_id,
170            network_string,
171            context=models.Context(project_id=self.project_id),
172        )
173
174    return network.get_network(
175        self.project_id,
176        'default',
177        context=models.Context(project_id=self.project_id),
178    )
179
180  @property
181  def tp_ipv4_cidr(self) -> Optional[IPv4NetOrIPv6Net]:
182    if 'network' in self._resource_data['networkConfig']:
183      cidr = self._resource_data['networkConfig']['ipAllocation']
184      return ipaddress.ip_network(cidr)
185    return None
186
187  @property
188  def api_endpoint(self) -> str:
189    return self._resource_data['apiEndpoint']
Instance(project_id, resource_data)
46  def __init__(self, project_id, resource_data):
47    super().__init__(project_id=project_id)
48    self._resource_data = resource_data
full_path: str
50  @property
51  def full_path(self) -> str:
52    """
53    The 'name' of the instance is already in the full path form
54
55    projects/{project}/locations/{location}/instances/{instance}.
56    """
57    return self._resource_data['name']

The instance's 'name' field is already in full-path form:

projects/{project}/locations/{location}/instances/{instance}.

short_path: str
59  @property
60  def short_path(self) -> str:
61    path = self.full_path
62    path = re.sub(r'^projects/', '', path)
63    path = re.sub(r'/locations/', '/', path)
64    path = re.sub(r'/instances/', '/', path)
65    return path

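Returns the short path for this instance: the full path with the 'projects/', 'locations/' and 'instances/' segments removed, i.e. {project}/{location}/{instance}.

Note that it isn't clear from this path alone what kind of resource it is.

Example: 'my-project/us-central1/my-instance'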

name: str
67  @property
68  def name(self) -> str:
69    return utils.extract_value_from_res_name(self._resource_data['name'],
70                                             'instances')
location: str
72  @property
73  def location(self) -> str:
74    return utils.extract_value_from_res_name(self._resource_data['name'],
75                                             'locations')
zone: str
77  @property
78  def zone(self) -> str:
79    return self._resource_data['zone']
type: str
81  @property
82  def type(self) -> str:
83    return self._resource_data['type']
is_basic_type: bool
85  @property
86  def is_basic_type(self) -> bool:
87    return self._resource_data['type'] == 'BASIC'
is_enterprise_type: bool
89  @property
90  def is_enterprise_type(self) -> bool:
91    return self._resource_data['type'] == 'ENTERPRISE'
is_developer_type: bool
93  @property
94  def is_developer_type(self) -> bool:
95    return self._resource_data['type'] == 'DEVELOPER'
is_private: bool
 97  @property
 98  def is_private(self) -> bool:
 99    if 'privateInstance' in self._resource_data:
100      return self._resource_data['privateInstance']
101    return False
status: str
103  @property
104  def status(self) -> str:
105    return self._resource_data['state']
status_details: Optional[str]
107  @property
108  def status_details(self) -> Optional[str]:
109    if 'stateMessage' in self._resource_data:
110      return self._resource_data['stateMessage']
111    return None
is_running: bool
113  @property
114  def is_running(self) -> bool:
115    return self.status == 'ACTIVE'
is_deleting: bool
117  @property
118  def is_deleting(self) -> bool:
119    return self._resource_data['state'] == 'DELETING'
version: gcpdiag.utils.Version
121  @property
122  def version(self) -> Version:
123    return Version(self._resource_data['version'])
api_service_agent: str
125  @property
126  def api_service_agent(self) -> str:
127    return self._resource_data['p4ServiceAccount']
dataproc_service_account: str
129  @property
130  def dataproc_service_account(self) -> str:
131    sa = self._resource_data.get('dataprocServiceAccount')
132    if sa is None:
133      sa = crm.get_project(self.project_id).default_compute_service_account
134    return sa
tenant_project_id: str
136  @property
137  def tenant_project_id(self) -> str:
138    return self._resource_data['tenantProjectId']
uses_shared_vpc: bool
140  @property
141  def uses_shared_vpc(self) -> bool:
142    """
143    If shared VPC then 'network_string' = 'projects/{host-project-id}/global/networks/{network}'
144    else 'network_string' = {network}
145    """
146    if 'network' in self._resource_data['networkConfig']:
147      network_string = self._resource_data['networkConfig']['network']
148      match = re.match(r'projects/([^/]+)/global/networks/([^/]+)$',
149                       network_string)
150      if match and match.group(1) != self.project_id:
151        return True
152
153    return False

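Returns True if the instance uses a Shared VPC network. That is the case when networkConfig.network has the form 'projects/{host-project-id}/global/networks/{network}' and the host project differs from the instance's own project. A network in the instance's own project is normally given as a bare name, {network}.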

network: gcpdiag.queries.network.Network
155  @property
156  def network(self) -> network.Network:
157    if 'network' in self._resource_data['networkConfig']:
158      network_string = self._resource_data['networkConfig']['network']
159      match = re.match(r'projects/([^/]+)/global/networks/([^/]+)$',
160                       network_string)
161      if match:
162        return network.get_network(
163            match.group(1),
164            match.group(2),
165            context=models.Context(project_id=match.group(1)),
166        )
167      else:
168        return network.get_network(
169            self.project_id,
170            network_string,
171            context=models.Context(project_id=self.project_id),
172        )
173
174    return network.get_network(
175        self.project_id,
176        'default',
177        context=models.Context(project_id=self.project_id),
178    )
tp_ipv4_cidr: Union[ipaddress.IPv4Network, ipaddress.IPv6Network, NoneType]
180  @property
181  def tp_ipv4_cidr(self) -> Optional[IPv4NetOrIPv6Net]:
182    if 'network' in self._resource_data['networkConfig']:
183      cidr = self._resource_data['networkConfig']['ipAllocation']
184      return ipaddress.ip_network(cidr)
185    return None
api_endpoint: str
187  @property
188  def api_endpoint(self) -> str:
189    return self._resource_data['apiEndpoint']
@caching.cached_api_call
def get_instances( context: gcpdiag.models.Context) -> Mapping[str, Instance]:
192@caching.cached_api_call
193def get_instances(context: models.Context) -> Mapping[str, Instance]:
194  """Get a dict of Instance matching the given context, indexed by instance full path."""
195  instances: Dict[str, Instance] = {}
196
197  if not apis.is_enabled(context.project_id, 'datafusion'):
198    return instances
199
200  logging.debug('fetching list of Data Fusion instances in project %s',
201                context.project_id)
202  datafusion_api = apis.get_api('datafusion', 'v1', context.project_id)
203  query = datafusion_api.projects().locations().instances().list(
204      parent=f'projects/{context.project_id}/locations/-'
205  )  #'-' (wildcard) all regions
206
207  try:
208    resp = query.execute(num_retries=config.API_RETRIES)
209    if 'instances' not in resp:
210      return instances
211
212    for i in resp['instances']:
213      # projects/{project}/locations/{location}/instances/{instance}.
214      result = re.match(r'projects/[^/]+/locations/([^/]+)/instances/([^/]+)',
215                        i['name'])
216      if not result:
217        logging.error('invalid datafusion name: %s', i['name'])
218        continue
219      location = result.group(1)
220      labels = i.get('labels', {})
221      name = result.group(2)
222      if not context.match_project_resource(
223          location=location, labels=labels, resource=name):
224        continue
225
226      instances[i['name']] = Instance(project_id=context.project_id,
227                                      resource_data=i)
228
229  except googleapiclient.errors.HttpError as err:
230    raise utils.GcpApiError(err) from err
231
232  return instances

Get a dict of Instances matching the given context, keyed by instance full path. Returns an empty dict if the Data Fusion API is not enabled in the project.

@caching.cached_api_call
def extract_support_datafusion_version() -> Dict[str, str]:
235@caching.cached_api_call
236def extract_support_datafusion_version() -> Dict[str, str]:
237  """Extract the version policy dictionary from the data fusion version support policy page.
238
239  Returns:
240    A dictionary of data fusion versions and their support end dates.
241  """
242  page_url = 'https://cloud.google.com/data-fusion/docs/support/version-support-policy'
243
244  try:
245    data_fusion_table = web.fetch_and_extract_table(page_url,
246                                                    tag='h2',
247                                                    tag_id='support_timelines')
248    if data_fusion_table:
249      versions = []
250      support_end_dates = []
251      version_policy_dict = {}
252
253      for row in data_fusion_table.find_all('tr')[1:]:
254        columns = row.find_all('td')
255        version = columns[0]
256        support_end_date = columns[2].text.strip()
257        if version.sup:
258          version.sup.decompose()
259
260        version = version.text.strip()
261        try:
262          support_end_date = datetime.datetime.strptime(support_end_date,
263                                                        '%B %d, %Y')
264          support_end_date = datetime.datetime.strftime(support_end_date,
265                                                        '%Y-%m-%d')
266        except ValueError:
267          continue
268
269        versions.append(version)
270        support_end_dates.append(support_end_date)
271
272        version_policy_dict = dict(zip(versions, support_end_dates))
273      return version_policy_dict
274
275    else:
276      return {}
277
278  except (
279      requests.exceptions.RequestException,
280      AttributeError,
281      TypeError,
282      ValueError,
283      IndexError,
284  ) as e:
285    logging.error('Error in extracting data fusion version support policy: %s',
286                  e)
287    return {}

Extract the version policy dictionary from the Data Fusion version support policy page.

Returns:

A dictionary mapping Data Fusion versions to their support end dates (formatted as YYYY-MM-DD), or an empty dict if the page cannot be fetched or parsed.

class Profile(gcpdiag.models.Resource):
290class Profile(models.Resource):
291  """Represents a Compute Profile."""
292
293  _resource_data: dict
294
295  def __init__(self, project_id, instance_name, resource_data):
296    super().__init__(project_id=project_id)
297    self.instance_name = instance_name
298    self._resource_data = resource_data
299
300  @property
301  def full_path(self) -> str:
302    """The full path form :
303
304    projects/{project}/instances/{instance}/computeProfiles/{profile}.
305    """
306    return (f'projects/{self.project_id}/instances/{self.instance_name}'
307            f"/computeProfiles/{self._resource_data['name']}")
308
309  @property
310  def short_path(self) -> str:
311    """The short path form :
312
313    {project}/{instance}/{profile}.
314    """
315    return (
316        f"{self.project_id}/{self.instance_name}/{self._resource_data['name']}")
317
318  @property
319  def name(self) -> str:
320    return self._resource_data['name']
321
322  @property
323  def region(self) -> str:
324    for value in self._resource_data['provisioner'].get('properties'):
325      if value.get('name') == 'region' and value.get('value') is not None:
326        return value.get('value')
327    return 'No region defined'
328
329  @property
330  def status(self) -> str:
331    return self._resource_data['status']
332
333  @property
334  def scope(self) -> str:
335    return self._resource_data['scope']
336
337  @property
338  def is_dataproc_provisioner(self) -> bool:
339    return self._resource_data['provisioner']['name'] == 'gcp-dataproc'
340
341  @property
342  def is_existing_dataproc_provisioner(self) -> bool:
343    return self._resource_data['provisioner']['name'] == 'gcp-existing-dataproc'
344
345  @property
346  def autoscaling_enabled(self) -> bool:
347    for value in self._resource_data['provisioner'].get('properties'):
348      if (value.get('name') == 'enablePredefinedAutoScaling' and
349          value.get('value') is not None):
350        return value.get('value') == 'true'
351    return False
352
353  @property
354  def image_version(self) -> str:
355    for value in self._resource_data['provisioner'].get('properties'):
356      if value.get('name') == 'imageVersion' and value.get('value') != '':
357        return value.get('value')
358    return 'No imageVersion defined'
359
360  @property
361  def auto_scaling_policy(self) -> str:
362    for value in self._resource_data['provisioner'].get('properties'):
363      if value.get('name') == 'autoScalingPolicy' and value.get('value') != '':
364        return value.get('value')
365    return 'No autoScalingPolicy defined'

Represents a Compute Profile.

Profile(project_id, instance_name, resource_data)
295  def __init__(self, project_id, instance_name, resource_data):
296    super().__init__(project_id=project_id)
297    self.instance_name = instance_name
298    self._resource_data = resource_data
instance_name
full_path: str
300  @property
301  def full_path(self) -> str:
302    """The full path form :
303
304    projects/{project}/instances/{instance}/computeProfiles/{profile}.
305    """
306    return (f'projects/{self.project_id}/instances/{self.instance_name}'
307            f"/computeProfiles/{self._resource_data['name']}")

The full path has the form:

projects/{project}/instances/{instance}/computeProfiles/{profile}.

short_path: str
309  @property
310  def short_path(self) -> str:
311    """The short path form :
312
313    {project}/{instance}/{profile}.
314    """
315    return (
316        f"{self.project_id}/{self.instance_name}/{self._resource_data['name']}")

The short path has the form:

{project}/{instance}/{profile}.

name: str
318  @property
319  def name(self) -> str:
320    return self._resource_data['name']
region: str
322  @property
323  def region(self) -> str:
324    for value in self._resource_data['provisioner'].get('properties'):
325      if value.get('name') == 'region' and value.get('value') is not None:
326        return value.get('value')
327    return 'No region defined'
status: str
329  @property
330  def status(self) -> str:
331    return self._resource_data['status']
scope: str
333  @property
334  def scope(self) -> str:
335    return self._resource_data['scope']
is_dataproc_provisioner: bool
337  @property
338  def is_dataproc_provisioner(self) -> bool:
339    return self._resource_data['provisioner']['name'] == 'gcp-dataproc'
is_existing_dataproc_provisioner: bool
341  @property
342  def is_existing_dataproc_provisioner(self) -> bool:
343    return self._resource_data['provisioner']['name'] == 'gcp-existing-dataproc'
autoscaling_enabled: bool
345  @property
346  def autoscaling_enabled(self) -> bool:
347    for value in self._resource_data['provisioner'].get('properties'):
348      if (value.get('name') == 'enablePredefinedAutoScaling' and
349          value.get('value') is not None):
350        return value.get('value') == 'true'
351    return False
image_version: str
353  @property
354  def image_version(self) -> str:
355    for value in self._resource_data['provisioner'].get('properties'):
356      if value.get('name') == 'imageVersion' and value.get('value') != '':
357        return value.get('value')
358    return 'No imageVersion defined'
auto_scaling_policy: str
360  @property
361  def auto_scaling_policy(self) -> str:
362    for value in self._resource_data['provisioner'].get('properties'):
363      if value.get('name') == 'autoScalingPolicy' and value.get('value') != '':
364        return value.get('value')
365    return 'No autoScalingPolicy defined'
@caching.cached_api_call
def get_instance_system_compute_profile( context: gcpdiag.models.Context, instance: Instance) -> Iterable[Profile]:
368@caching.cached_api_call
369def get_instance_system_compute_profile(
370    context: models.Context, instance: Instance) -> Iterable[Profile]:
371  """Get a list of datafusion Instance dataproc System compute profile."""
372  logging.debug('fetching dataproc System compute profile list: %s',
373                context.project_id)
374  system_profiles: List[Profile] = []
375  cdap_endpoint = instance.api_endpoint
376  datafusion = get_generic.get_generic_api('datafusion', cdap_endpoint)
377  response = datafusion.get_system_profiles()
378  if response is not None:
379    for res in response:
380      if (res['provisioner']['name'] == 'gcp-dataproc' or
381          res['provisioner']['name'] == 'gcp-existing-dataproc'):
382        system_profiles.append(Profile(context.project_id, instance.name, res))
383  return system_profiles

Get the system compute profiles of a Data Fusion instance that use a Dataproc provisioner ('gcp-dataproc' or 'gcp-existing-dataproc').

@caching.cached_api_call
def get_instance_user_compute_profile( context: gcpdiag.models.Context, instance: Instance) -> Iterable[Profile]:
386@caching.cached_api_call
387def get_instance_user_compute_profile(context: models.Context,
388                                      instance: Instance) -> Iterable[Profile]:
389  """Get a list of datafusion Instance dataproc User compute profile."""
390  logging.debug('fetching dataproc User compute profile list: %s',
391                context.project_id)
392  user_profiles: List[Profile] = []
393  cdap_endpoint = instance.api_endpoint
394  datafusion = get_generic.get_generic_api('datafusion', cdap_endpoint)
395  response_namespaces = datafusion.get_all_namespaces()
396  if response_namespaces is not None:
397    for res in response_namespaces:
398      response = datafusion.get_user_profiles(namespace=res['name'])
399      if response is not None:
400        for res in response:
401          if (res['provisioner']['name'] == 'gcp-dataproc' or
402              res['provisioner']['name'] == 'gcp-existing-dataproc'):
403            user_profiles.append(Profile(context.project_id, instance.name,
404                                         res))
405      user_profiles = list(filter(bool, user_profiles))
406  return user_profiles

Get the user compute profiles, across all namespaces of a Data Fusion instance, that use a Dataproc provisioner ('gcp-dataproc' or 'gcp-existing-dataproc').

@caching.cached_api_call
def extract_datafusion_dataproc_version() -> Dict[str, list[str]]:
409@caching.cached_api_call
410def extract_datafusion_dataproc_version() -> Dict[str, list[str]]:
411  """Extract the supported Data Fusion versions and their corresponding
412  Dataproc versions from the GCP documentation."""
413
414  page_url = 'https://cloud.google.com/data-fusion/docs/concepts/configure-clusters'
415
416  try:
417    table = web.fetch_and_extract_table(page_url,
418                                        tag='h2',
419                                        tag_id='version-compatibility')
420    if table:
421      rows = table.find_all('tr')[1:]  #Skip the header row
422      version_dict = {}
423
424      for row in rows:
425        cdf_versions = row.find_all('td')[0].get_text().strip()
426        dp_versions = row.find_all('td')[1].get_text().strip()
427
428        cdf_versions = cdf_versions.replace(' and later', '')
429        cdf_versions_list = []
430
431        if '-' in cdf_versions:
432          start, end = map(float, cdf_versions.split('-'))
433          while start <= end:
434            cdf_versions_list.append(f'{start:.1f}')
435            start += 0.1
436        else:
437          cdf_versions_list.append(cdf_versions)
438        dp_versions = [v.split('*')[0].strip() for v in dp_versions.split(',')]
439        for version in cdf_versions_list:
440          version_dict[version] = dp_versions
441      return version_dict
442
443    else:
444      return {}
445  except (
446      requests.exceptions.RequestException,
447      AttributeError,
448      TypeError,
449      ValueError,
450      IndexError,
451  ) as e:
452    logging.error(
453        'Error in extracting datafusion and dataproc versions: %s',
454        e,
455    )
456    return {}

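Extract the supported Data Fusion versions and their corresponding Dataproc versions from the GCP documentation. Returns a dict mapping each Data Fusion version to a list of compatible Dataproc versions, or an empty dict if the page cannot be fetched or parsed.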

class Preference(gcpdiag.models.Resource):
459class Preference(models.Resource):
460  """Represents a Preference."""
461
462  _resource_data: dict
463
464  def __init__(self, project_id, instance, resource_data):
465    super().__init__(project_id=project_id)
466    self.instance = instance
467    self._resource_data = resource_data
468
469  @property
470  def full_path(self) -> str:
471    """The full path form :
472
473    projects/{project}/locations/{location}/instances/{instance}.
474    """
475    return self.instance.full_path
476
477  @property
478  def image_version(self):
479    return self._resource_data.get('system.profile.properties.imageVersion',
480                                   None)

Represents a set of CDAP preferences (system, namespace, or application level) of a Data Fusion instance.

Preference(project_id, instance, resource_data)
464  def __init__(self, project_id, instance, resource_data):
465    super().__init__(project_id=project_id)
466    self.instance = instance
467    self._resource_data = resource_data
instance
full_path: str
469  @property
470  def full_path(self) -> str:
471    """The full path form :
472
473    projects/{project}/locations/{location}/instances/{instance}.
474    """
475    return self.instance.full_path

The full path has the form:

projects/{project}/locations/{location}/instances/{instance}.

image_version
477  @property
478  def image_version(self):
479    return self._resource_data.get('system.profile.properties.imageVersion',
480                                   None)
def get_system_preferences( context: gcpdiag.models.Context, instance: Instance) -> Preference:
483def get_system_preferences(context: models.Context,
484                           instance: Instance) -> Preference:
485  """Get datafusion Instance system preferences."""
486  logging.debug('fetching dataproc System preferences: %s', context.project_id)
487  cdap_endpoint = instance.api_endpoint
488  datafusion = get_generic.get_generic_api('datafusion', cdap_endpoint)
489  response = datafusion.get_system_preferences()
490  return Preference(context.project_id, instance, response)

Get the system preferences of a Data Fusion instance.

def get_namespace_preferences( context: gcpdiag.models.Context, instance: Instance) -> Mapping[str, Preference]:
493def get_namespace_preferences(context: models.Context,
494                              instance: Instance) -> Mapping[str, Preference]:
495  """Get datafusion cdap namespace preferences.
496  """
497  logging.debug('fetching dataproc namespace preferences: %s',
498                context.project_id)
499  cdap_endpoint = instance.api_endpoint
500  datafusion = get_generic.get_generic_api('datafusion', cdap_endpoint)
501  namespaces = datafusion.get_all_namespaces()
502  namespaces_preferences = {}
503  if namespaces is not None:
504    for namespace in namespaces:
505      response = datafusion.get_namespace_preferences(
506          namespace=namespace['name'])
507      if bool(response):
508        namespaces_preferences[namespace['name']] = Preference(
509            context.project_id, instance, response)
510  return namespaces_preferences

Get the CDAP namespace preferences of a Data Fusion instance, keyed by namespace name. Namespaces without any preferences are omitted.

def get_application_preferences( context: gcpdiag.models.Context, instance: Instance) -> Mapping[str, Preference]:
513def get_application_preferences(context: models.Context,
514                                instance: Instance) -> Mapping[str, Preference]:
515  """Get datafusion cdap application preferences."""
516  logging.debug('fetching dataproc application preferences: %s',
517                context.project_id)
518  cdap_endpoint = instance.api_endpoint
519  datafusion = get_generic.get_generic_api('datafusion', cdap_endpoint)
520  applications_preferences = {}
521  namespaces = datafusion.get_all_namespaces()
522  if namespaces is not None:
523    for namespace in namespaces:
524      applications = datafusion.get_all_applications(
525          namespace=namespace['name'])
526      if applications is not None:
527        for application in applications:
528          response = datafusion.get_application_preferences(
529              namespace=namespace['name'], application_name=application['name'])
530          if bool(response):
531            applications_preferences[application['name']] = Preference(
532                context.project_id, instance, response)
533  return applications_preferences

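Get the CDAP application preferences of a Data Fusion instance, keyed by application name. Applications without any preferences are omitted. Application names are not qualified by namespace, so an application with the same name in a later namespace overwrites an earlier entry.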