gcpdiag.queries.datafusion

Queries related to Data Fusion.
# Alias for "either kind of network object"; used as the return type of
# Instance.tp_ipv4_cidr, which builds its value with ipaddress.ip_network().
IPv4NetOrIPv6Net = typing.Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
class Instance(gcpdiag.models.Resource):
 38class Instance(models.Resource):
 39  """Represents a Data Fusion instance.
 40
 41  https://cloud.google.com/data-fusion/docs/reference/rest/v1/projects.locations.instances#Instance
 42  """
 43
 44  _resource_data: dict
 45
 46  def __init__(self, project_id, resource_data):
 47    super().__init__(project_id=project_id)
 48    self._resource_data = resource_data
 49
 50  @property
 51  def full_path(self) -> str:
 52    """
 53    The 'name' of the instance is already in the full path form
 54
 55    projects/{project}/locations/{location}/instances/{instance}.
 56    """
 57    return self._resource_data['name']
 58
 59  @property
 60  def short_path(self) -> str:
 61    path = self.full_path
 62    path = re.sub(r'^projects/', '', path)
 63    path = re.sub(r'/locations/', '/', path)
 64    path = re.sub(r'/instances/', '/', path)
 65    return path
 66
 67  @property
 68  def name(self) -> str:
 69    return utils.extract_value_from_res_name(self._resource_data['name'],
 70                                             'instances')
 71
 72  @property
 73  def location(self) -> str:
 74    return utils.extract_value_from_res_name(self._resource_data['name'],
 75                                             'locations')
 76
 77  @property
 78  def zone(self) -> str:
 79    return self._resource_data['zone']
 80
 81  @property
 82  def type(self) -> str:
 83    return self._resource_data['type']
 84
 85  @property
 86  def is_basic_type(self) -> bool:
 87    return self._resource_data['type'] == 'BASIC'
 88
 89  @property
 90  def is_enterprise_type(self) -> bool:
 91    return self._resource_data['type'] == 'ENTERPRISE'
 92
 93  @property
 94  def is_developer_type(self) -> bool:
 95    return self._resource_data['type'] == 'DEVELOPER'
 96
 97  @property
 98  def is_private(self) -> bool:
 99    if 'privateInstance' in self._resource_data:
100      return self._resource_data['privateInstance']
101    return False
102
103  @property
104  def status(self) -> str:
105    return self._resource_data['state']
106
107  @property
108  def status_details(self) -> Optional[str]:
109    if 'stateMessage' in self._resource_data:
110      return self._resource_data['stateMessage']
111    return None
112
113  @property
114  def is_running(self) -> bool:
115    return self.status == 'ACTIVE'
116
117  @property
118  def is_deleting(self) -> bool:
119    return self._resource_data['state'] == 'DELETING'
120
121  @property
122  def version(self) -> Version:
123    return Version(self._resource_data['version'])
124
125  @property
126  def api_service_agent(self) -> str:
127    return self._resource_data['p4ServiceAccount']
128
129  @property
130  def dataproc_service_account(self) -> str:
131    sa = self._resource_data.get('dataprocServiceAccount')
132    if sa is None:
133      sa = crm.get_project(self.project_id).default_compute_service_account
134    return sa
135
136  @property
137  def tenant_project_id(self) -> str:
138    return self._resource_data['tenantProjectId']
139
140  @property
141  def uses_shared_vpc(self) -> bool:
142    """
143    If shared VPC then 'network_string' = 'projects/{host-project-id}/global/networks/{network}'
144    else 'network_string' = {network}
145    """
146    if 'network' in self._resource_data['networkConfig']:
147      network_string = self._resource_data['networkConfig']['network']
148      match = re.match(r'projects/([^/]+)/global/networks/([^/]+)$',
149                       network_string)
150      if match and match.group(1) != self.project_id:
151        return True
152
153    return False
154
155  @property
156  def network(self) -> network.Network:
157    if 'network' in self._resource_data['networkConfig']:
158      network_string = self._resource_data['networkConfig']['network']
159      match = re.match(r'projects/([^/]+)/global/networks/([^/]+)$',
160                       network_string)
161      if match:
162        return network.get_network(match.group(1), match.group(2))
163      else:
164        return network.get_network(self.project_id, network_string)
165
166    return network.get_network(self.project_id, 'default')
167
168  @property
169  def tp_ipv4_cidr(self) -> Optional[IPv4NetOrIPv6Net]:
170    if 'network' in self._resource_data['networkConfig']:
171      cidr = self._resource_data['networkConfig']['ipAllocation']
172      return ipaddress.ip_network(cidr)
173    return None
174
175  @property
176  def api_endpoint(self) -> str:
177    return self._resource_data['apiEndpoint']
Instance(project_id, resource_data)
46  def __init__(self, project_id, resource_data):
47    super().__init__(project_id=project_id)
48    self._resource_data = resource_data
full_path: str
50  @property
51  def full_path(self) -> str:
52    """
53    The 'name' of the instance is already in the full path form
54
55    projects/{project}/locations/{location}/instances/{instance}.
56    """
57    return self._resource_data['name']

The 'name' of the instance is already in the full path form

projects/{project}/locations/{location}/instances/{instance}.

short_path: str
59  @property
60  def short_path(self) -> str:
61    path = self.full_path
62    path = re.sub(r'^projects/', '', path)
63    path = re.sub(r'/locations/', '/', path)
64    path = re.sub(r'/instances/', '/', path)
65    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
67  @property
68  def name(self) -> str:
69    return utils.extract_value_from_res_name(self._resource_data['name'],
70                                             'instances')
location: str
72  @property
73  def location(self) -> str:
74    return utils.extract_value_from_res_name(self._resource_data['name'],
75                                             'locations')
zone: str
77  @property
78  def zone(self) -> str:
79    return self._resource_data['zone']
type: str
81  @property
82  def type(self) -> str:
83    return self._resource_data['type']
is_basic_type: bool
85  @property
86  def is_basic_type(self) -> bool:
87    return self._resource_data['type'] == 'BASIC'
is_enterprise_type: bool
89  @property
90  def is_enterprise_type(self) -> bool:
91    return self._resource_data['type'] == 'ENTERPRISE'
is_developer_type: bool
93  @property
94  def is_developer_type(self) -> bool:
95    return self._resource_data['type'] == 'DEVELOPER'
is_private: bool
 97  @property
 98  def is_private(self) -> bool:
 99    if 'privateInstance' in self._resource_data:
100      return self._resource_data['privateInstance']
101    return False
status: str
103  @property
104  def status(self) -> str:
105    return self._resource_data['state']
status_details: Optional[str]
107  @property
108  def status_details(self) -> Optional[str]:
109    if 'stateMessage' in self._resource_data:
110      return self._resource_data['stateMessage']
111    return None
is_running: bool
113  @property
114  def is_running(self) -> bool:
115    return self.status == 'ACTIVE'
is_deleting: bool
117  @property
118  def is_deleting(self) -> bool:
119    return self._resource_data['state'] == 'DELETING'
version: gcpdiag.utils.Version
121  @property
122  def version(self) -> Version:
123    return Version(self._resource_data['version'])
api_service_agent: str
125  @property
126  def api_service_agent(self) -> str:
127    return self._resource_data['p4ServiceAccount']
dataproc_service_account: str
129  @property
130  def dataproc_service_account(self) -> str:
131    sa = self._resource_data.get('dataprocServiceAccount')
132    if sa is None:
133      sa = crm.get_project(self.project_id).default_compute_service_account
134    return sa
tenant_project_id: str
136  @property
137  def tenant_project_id(self) -> str:
138    return self._resource_data['tenantProjectId']
uses_shared_vpc: bool
140  @property
141  def uses_shared_vpc(self) -> bool:
142    """
143    If shared VPC then 'network_string' = 'projects/{host-project-id}/global/networks/{network}'
144    else 'network_string' = {network}
145    """
146    if 'network' in self._resource_data['networkConfig']:
147      network_string = self._resource_data['networkConfig']['network']
148      match = re.match(r'projects/([^/]+)/global/networks/([^/]+)$',
149                       network_string)
150      if match and match.group(1) != self.project_id:
151        return True
152
153    return False

For a shared VPC, 'network_string' has the form 'projects/{host-project-id}/global/networks/{network}'; otherwise it is just '{network}'.

network: gcpdiag.queries.network.Network
155  @property
156  def network(self) -> network.Network:
157    if 'network' in self._resource_data['networkConfig']:
158      network_string = self._resource_data['networkConfig']['network']
159      match = re.match(r'projects/([^/]+)/global/networks/([^/]+)$',
160                       network_string)
161      if match:
162        return network.get_network(match.group(1), match.group(2))
163      else:
164        return network.get_network(self.project_id, network_string)
165
166    return network.get_network(self.project_id, 'default')
tp_ipv4_cidr: Union[ipaddress.IPv4Network, ipaddress.IPv6Network, NoneType]
168  @property
169  def tp_ipv4_cidr(self) -> Optional[IPv4NetOrIPv6Net]:
170    if 'network' in self._resource_data['networkConfig']:
171      cidr = self._resource_data['networkConfig']['ipAllocation']
172      return ipaddress.ip_network(cidr)
173    return None
api_endpoint: str
175  @property
176  def api_endpoint(self) -> str:
177    return self._resource_data['apiEndpoint']
@caching.cached_api_call
def get_instances( context: gcpdiag.models.Context) -> Mapping[str, Instance]:
@caching.cached_api_call
def get_instances(context: models.Context) -> Mapping[str, Instance]:
  """Get a dict of Instance matching the given context, indexed by instance full path.

  All result pages of the list call are read, not only the first one.

  Args:
    context: supplies the project id and the location/label/resource filter
      (context.match_project_resource) applied to every listed instance.

  Returns:
    Mapping from the instance's full resource name to its Instance. Empty when
    the 'datafusion' API is not enabled or nothing matches.

  Raises:
    utils.GcpApiError: if the API call fails with an HttpError.
  """
  instances: Dict[str, Instance] = {}

  if not apis.is_enabled(context.project_id, 'datafusion'):
    return instances

  logging.info('fetching list of Data Fusion instances in project %s',
               context.project_id)
  datafusion_api = apis.get_api('datafusion', 'v1', context.project_id)
  instances_api = datafusion_api.projects().locations().instances()
  # '-' is the location wildcard: list the instances of all regions.
  query = instances_api.list(
      parent=f'projects/{context.project_id}/locations/-')

  try:
    # list_next() builds the request for the following page and returns None
    # once the response carries no nextPageToken.
    while query is not None:
      resp = query.execute(num_retries=config.API_RETRIES)

      for i in resp.get('instances', []):
        # projects/{project}/locations/{location}/instances/{instance}.
        result = re.match(r'projects/[^/]+/locations/([^/]+)/instances/([^/]+)',
                          i['name'])
        if not result:
          logging.error('invalid datafusion name: %s', i['name'])
          continue
        location = result.group(1)
        labels = i.get('labels', {})
        name = result.group(2)
        if not context.match_project_resource(
            location=location, labels=labels, resource=name):
          continue

        instances[i['name']] = Instance(project_id=context.project_id,
                                        resource_data=i)

      query = instances_api.list_next(previous_request=query,
                                      previous_response=resp)

  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err

  return instances

Get a dict of Instance matching the given context, indexed by instance full path.

@caching.cached_api_call
def extract_support_datafusion_version() -> Dict[str, str]:
@caching.cached_api_call
def extract_support_datafusion_version() -> Dict[str, str]:
  """Extract the version policy dictionary from the data fusion version support policy page.

  Rows whose support end date cannot be parsed as '%B %d, %Y', and rows with
  fewer than three cells, are skipped; the remaining rows are still returned.

  Returns:
    A dictionary of data fusion versions and their support end dates
    (formatted '%Y-%m-%d'). Empty when the table is not found or an error
    occurs while fetching/parsing the page (the error is logged).
  """
  page_url = 'https://cloud.google.com/data-fusion/docs/support/version-support-policy'

  try:
    data_fusion_table = web.fetch_and_extract_table(page_url,
                                                    tag='h2',
                                                    tag_id='support_timelines')
    if not data_fusion_table:
      return {}

    version_policy_dict: Dict[str, str] = {}

    # First row is the table header.
    for row in data_fusion_table.find_all('tr')[1:]:
      columns = row.find_all('td')
      if len(columns) < 3:
        # Not a data row; do not let it abort the whole extraction.
        continue

      version_cell = columns[0]
      # Drop the <sup> child (if any) so its text is not part of the version.
      if version_cell.sup:
        version_cell.sup.decompose()
      version = version_cell.text.strip()

      try:
        support_end = datetime.datetime.strptime(columns[2].text.strip(),
                                                 '%B %d, %Y')
      except ValueError:
        continue

      version_policy_dict[version] = support_end.strftime('%Y-%m-%d')

    return version_policy_dict

  except (
      requests.exceptions.RequestException,
      AttributeError,
      TypeError,
      ValueError,
      IndexError,
  ) as e:
    logging.error('Error in extracting data fusion version support policy: %s',
                  e)
    return {}

Extract the version policy dictionary from the data fusion version support policy page.

Returns:

A dictionary of data fusion versions and their support end dates.

class Profile(gcpdiag.models.Resource):
class Profile(models.Resource):
  """Represents a Compute Profile.

  Read-only view over a compute profile dict. Provisioner settings are looked
  up in the list of {'name': ..., 'value': ...} entries stored under
  provisioner.properties; a profile without that list is treated as having no
  properties.
  """

  _resource_data: dict

  def __init__(self, project_id, instance_name, resource_data):
    """Stores the profile dict together with its project and instance name."""
    super().__init__(project_id=project_id)
    self.instance_name = instance_name
    self._resource_data = resource_data

  def _property_values(self, name: str) -> list:
    """Values of all provisioner properties called `name`, in listed order."""
    properties = self._resource_data['provisioner'].get('properties') or []
    return [prop.get('value') for prop in properties if prop.get('name') == name]

  @property
  def full_path(self) -> str:
    """The full path form :

    projects/{project}/instances/{instance}/computeProfiles/{profile}.
    """
    return (f'projects/{self.project_id}/instances/{self.instance_name}'
            f"/computeProfiles/{self._resource_data['name']}")

  @property
  def short_path(self) -> str:
    """The short path form :

    {project}/{instance}/{profile}.
    """
    return (
        f"{self.project_id}/{self.instance_name}/{self._resource_data['name']}")

  @property
  def name(self) -> str:
    """The 'name' field of the profile."""
    return self._resource_data['name']

  @property
  def region(self) -> str:
    """First non-None 'region' property, else 'No region defined'."""
    for value in self._property_values('region'):
      if value is not None:
        return value
    return 'No region defined'

  @property
  def status(self) -> str:
    """The 'status' field of the profile."""
    return self._resource_data['status']

  @property
  def scope(self) -> str:
    """The 'scope' field of the profile."""
    return self._resource_data['scope']

  @property
  def is_dataproc_provisioner(self) -> bool:
    """True when the provisioner name is 'gcp-dataproc'."""
    return self._resource_data['provisioner']['name'] == 'gcp-dataproc'

  @property
  def is_existing_dataproc_provisioner(self) -> bool:
    """True when the provisioner name is 'gcp-existing-dataproc'."""
    return self._resource_data['provisioner']['name'] == 'gcp-existing-dataproc'

  @property
  def autoscaling_enabled(self) -> bool:
    """True when the first non-None 'enablePredefinedAutoScaling' property
    equals the string 'true'; False when there is no such property."""
    for value in self._property_values('enablePredefinedAutoScaling'):
      if value is not None:
        return value == 'true'
    return False

  @property
  def image_version(self) -> str:
    """First non-empty 'imageVersion' property, else
    'No imageVersion defined' (a missing value no longer leaks out as None)."""
    for value in self._property_values('imageVersion'):
      if value not in (None, ''):
        return value
    return 'No imageVersion defined'

  @property
  def auto_scaling_policy(self) -> str:
    """First non-empty 'autoScalingPolicy' property, else
    'No autoScalingPolicy defined' (a missing value no longer leaks out as
    None)."""
    for value in self._property_values('autoScalingPolicy'):
      if value not in (None, ''):
        return value
    return 'No autoScalingPolicy defined'

Represents a Compute Profile.

Profile(project_id, instance_name, resource_data)
283  def __init__(self, project_id, instance_name, resource_data):
284    super().__init__(project_id=project_id)
285    self.instance_name = instance_name
286    self._resource_data = resource_data
instance_name
full_path: str
288  @property
289  def full_path(self) -> str:
290    """The full path form :
291
292    projects/{project}/instances/{instance}/computeProfiles/{profile}.
293    """
294    return (f'projects/{self.project_id}/instances/{self.instance_name}'
295            f"/computeProfiles/{self._resource_data['name']}")

The full path form :

projects/{project}/instances/{instance}/computeProfiles/{profile}.

short_path: str
297  @property
298  def short_path(self) -> str:
299    """The short path form :
300
301    {project}/{instance}/{profile}.
302    """
303    return (
304        f"{self.project_id}/{self.instance_name}/{self._resource_data['name']}")

The short path form :

{project}/{instance}/{profile}.

name: str
306  @property
307  def name(self) -> str:
308    return self._resource_data['name']
region: str
310  @property
311  def region(self) -> str:
312    for value in self._resource_data['provisioner'].get('properties'):
313      if value.get('name') == 'region' and value.get('value') is not None:
314        return value.get('value')
315    return 'No region defined'
status: str
317  @property
318  def status(self) -> str:
319    return self._resource_data['status']
scope: str
321  @property
322  def scope(self) -> str:
323    return self._resource_data['scope']
is_dataproc_provisioner: bool
325  @property
326  def is_dataproc_provisioner(self) -> bool:
327    return self._resource_data['provisioner']['name'] == 'gcp-dataproc'
is_existing_dataproc_provisioner: bool
329  @property
330  def is_existing_dataproc_provisioner(self) -> bool:
331    return self._resource_data['provisioner']['name'] == 'gcp-existing-dataproc'
autoscaling_enabled: bool
333  @property
334  def autoscaling_enabled(self) -> bool:
335    for value in self._resource_data['provisioner'].get('properties'):
336      if (value.get('name') == 'enablePredefinedAutoScaling' and
337          value.get('value') is not None):
338        return value.get('value') == 'true'
339    return False
image_version: str
341  @property
342  def image_version(self) -> str:
343    for value in self._resource_data['provisioner'].get('properties'):
344      if value.get('name') == 'imageVersion' and value.get('value') != '':
345        return value.get('value')
346    return 'No imageVersion defined'
auto_scaling_policy: str
348  @property
349  def auto_scaling_policy(self) -> str:
350    for value in self._resource_data['provisioner'].get('properties'):
351      if value.get('name') == 'autoScalingPolicy' and value.get('value') != '':
352        return value.get('value')
353    return 'No autoScalingPolicy defined'
@caching.cached_api_call
def get_instance_system_compute_profile( context: gcpdiag.models.Context, instance: Instance) -> Iterable[Profile]:
@caching.cached_api_call
def get_instance_system_compute_profile(
    context: models.Context, instance: Instance) -> Iterable[Profile]:
  """Lists the system compute profiles of an instance that use a Dataproc
  provisioner.

  Profiles are fetched from the instance's API endpoint; only those whose
  provisioner name is 'gcp-dataproc' or 'gcp-existing-dataproc' are kept.
  Returns an empty list when the endpoint returns None.
  """
  logging.info('fetching dataproc System compute profile list: %s',
               context.project_id)
  dataproc_provisioners = ('gcp-dataproc', 'gcp-existing-dataproc')
  system_profiles: List[Profile] = []
  datafusion = get_generic.get_generic_api('datafusion', instance.api_endpoint)
  raw_profiles = datafusion.get_system_profiles()
  if raw_profiles is None:
    return system_profiles
  for raw_profile in raw_profiles:
    if raw_profile['provisioner']['name'] in dataproc_provisioners:
      system_profiles.append(
          Profile(context.project_id, instance.name, raw_profile))
  return system_profiles

Get the list of Dataproc system compute profiles of a Data Fusion instance.

@caching.cached_api_call
def get_instance_user_compute_profile( context: gcpdiag.models.Context, instance: Instance) -> Iterable[Profile]:
@caching.cached_api_call
def get_instance_user_compute_profile(context: models.Context,
                                      instance: Instance) -> Iterable[Profile]:
  """Lists the user compute profiles of an instance that use a Dataproc
  provisioner.

  Every namespace reported by the instance's API endpoint is queried for its
  user profiles; only those whose provisioner name is 'gcp-dataproc' or
  'gcp-existing-dataproc' are kept. Returns an empty list when the namespace
  listing is None.
  """
  logging.info('fetching dataproc User compute profile list: %s',
               context.project_id)
  dataproc_provisioners = ('gcp-dataproc', 'gcp-existing-dataproc')
  user_profiles: List[Profile] = []
  datafusion = get_generic.get_generic_api('datafusion', instance.api_endpoint)
  namespaces = datafusion.get_all_namespaces()
  if namespaces is None:
    return user_profiles
  for namespace in namespaces:
    raw_profiles = datafusion.get_user_profiles(namespace=namespace['name'])
    if raw_profiles is not None:
      for raw_profile in raw_profiles:
        if raw_profile['provisioner']['name'] in dataproc_provisioners:
          user_profiles.append(
              Profile(context.project_id, instance.name, raw_profile))
    # Kept from the original: drops falsy entries after each namespace.
    user_profiles = list(filter(bool, user_profiles))
  return user_profiles

Get the list of Dataproc user compute profiles of a Data Fusion instance.

@caching.cached_api_call
def extract_datafusion_dataproc_version() -> Dict[str, list[str]]:
def _expand_cdf_versions(cdf_versions: str) -> list[str]:
  """Expands a 'X.Y-X.Z' range into every one-decimal version it covers.

  A string without '-' is returned unchanged as a single-element list. The
  range is walked in integer tenths so that floating point drift (repeatedly
  adding 0.1) cannot drop the upper bound.
  """
  if '-' not in cdf_versions:
    return [cdf_versions]
  start, end = map(float, cdf_versions.split('-'))
  first_tenth, last_tenth = round(start * 10), round(end * 10)
  return [f'{tenth / 10:.1f}' for tenth in range(first_tenth, last_tenth + 1)]


@caching.cached_api_call
def extract_datafusion_dataproc_version() -> Dict[str, list[str]]:
  """Extract the supported Data Fusion versions and their corresponding
  Dataproc versions from the GCP documentation.

  Returns:
    A dictionary mapping each Data Fusion version to the list of Dataproc
    versions given in the same table row. Empty when the table is not found
    or an error occurs while fetching/parsing the page (the error is logged).
  """

  page_url = 'https://cloud.google.com/data-fusion/docs/concepts/configure-clusters'

  try:
    table = web.fetch_and_extract_table(page_url,
                                        tag='h2',
                                        tag_id='version-compatibility')
    if not table:
      return {}

    version_dict = {}
    for row in table.find_all('tr')[1:]:  # Skip the header row
      cells = row.find_all('td')
      cdf_versions = cells[0].get_text().strip()
      dp_versions = cells[1].get_text().strip()

      cdf_versions = cdf_versions.replace(' and later', '')
      # Keep only the part before any '*' in each Dataproc version.
      dp_versions = [v.split('*')[0].strip() for v in dp_versions.split(',')]
      for version in _expand_cdf_versions(cdf_versions):
        version_dict[version] = dp_versions
    return version_dict

  except (
      requests.exceptions.RequestException,
      AttributeError,
      TypeError,
      ValueError,
      IndexError,
  ) as e:
    logging.error(
        'Error in extracting datafusion and dataproc versions: %s',
        e,
    )
    return {}

Extract the supported Data Fusion versions and their corresponding Dataproc versions from the GCP documentation.

class Preference(gcpdiag.models.Resource):
class Preference(models.Resource):
  """Represents a Preference.

  Holds a preferences dict together with the Instance it was read from.
  """

  _resource_data: dict

  def __init__(self, project_id, instance, resource_data):
    """Stores the owning instance and the raw preferences dict."""
    super().__init__(project_id=project_id)
    self.instance = instance
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Full path of the owning instance:

    projects/{project}/locations/{location}/instances/{instance}.
    """
    owner = self.instance
    return owner.full_path

  @property
  def image_version(self):
    """Value stored under 'system.profile.properties.imageVersion', or None
    when that key is absent."""
    key = 'system.profile.properties.imageVersion'
    return self._resource_data.get(key, None)

Represents a Preference.

Preference(project_id, instance, resource_data)
452  def __init__(self, project_id, instance, resource_data):
453    super().__init__(project_id=project_id)
454    self.instance = instance
455    self._resource_data = resource_data
instance
full_path: str
457  @property
458  def full_path(self) -> str:
459    """The full path form :
460
461    projects/{project}/locations/{location}/instances/{instance}.
462    """
463    return self.instance.full_path

The full path form :

projects/{project}/locations/{location}/instances/{instance}.

image_version
465  @property
466  def image_version(self):
467    return self._resource_data.get('system.profile.properties.imageVersion',
468                                   None)
def get_system_preferences( context: gcpdiag.models.Context, instance: Instance) -> Preference:
def get_system_preferences(context: models.Context,
                           instance: Instance) -> Preference:
  """Fetches the system preferences of a Data Fusion instance.

  The preferences are requested from the instance's API endpoint and wrapped,
  unmodified, in a Preference bound to the context's project.
  """
  logging.info('fetching dataproc System preferences: %s', context.project_id)
  datafusion = get_generic.get_generic_api('datafusion', instance.api_endpoint)
  system_preferences = datafusion.get_system_preferences()
  return Preference(context.project_id, instance, system_preferences)

Get datafusion Instance system preferences.

def get_namespace_preferences( context: gcpdiag.models.Context, instance: Instance) -> Mapping[str, Preference]:
def get_namespace_preferences(context: models.Context,
                              instance: Instance) -> Mapping[str, Preference]:
  """Fetches the preferences of every namespace of a Data Fusion instance.

  Returns a dict keyed by namespace name. Namespaces whose preferences
  response is falsy (e.g. empty) are left out; the dict is empty when the
  namespace listing is None.
  """
  logging.info('fetching dataproc namespace preferences: %s',
               context.project_id)
  datafusion = get_generic.get_generic_api('datafusion', instance.api_endpoint)
  all_namespaces = datafusion.get_all_namespaces()
  namespaces_preferences = {}
  if all_namespaces is None:
    return namespaces_preferences
  for entry in all_namespaces:
    preferences = datafusion.get_namespace_preferences(namespace=entry['name'])
    if not preferences:
      continue
    namespaces_preferences[entry['name']] = Preference(context.project_id,
                                                       instance, preferences)
  return namespaces_preferences

Get datafusion cdap namespace preferences.

def get_application_preferences( context: gcpdiag.models.Context, instance: Instance) -> Mapping[str, Preference]:
def get_application_preferences(context: models.Context,
                                instance: Instance) -> Mapping[str, Preference]:
  """Fetches the preferences of every application of a Data Fusion instance.

  Applications are enumerated namespace by namespace. The result is keyed by
  application name only, so an application name that occurs in several
  namespaces keeps the entry written last. Applications whose preferences
  response is falsy (e.g. empty) are left out.
  """
  logging.info('fetching dataproc application preferences: %s',
               context.project_id)
  datafusion = get_generic.get_generic_api('datafusion', instance.api_endpoint)
  applications_preferences = {}
  all_namespaces = datafusion.get_all_namespaces()
  if all_namespaces is None:
    return applications_preferences
  for entry in all_namespaces:
    apps = datafusion.get_all_applications(namespace=entry['name'])
    if apps is None:
      continue
    for app in apps:
      preferences = datafusion.get_application_preferences(
          namespace=entry['name'], application_name=app['name'])
      if not preferences:
        continue
      applications_preferences[app['name']] = Preference(
          context.project_id, instance, preferences)
  return applications_preferences

Get datafusion cdap application preferences.