gcpdiag.queries.datafusion

Queries related to Data Fusion.
IPv4NetOrIPv6Net = typing.Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
class Instance(gcpdiag.models.Resource):
 37class Instance(models.Resource):
 38  """Represents a Data Fusion instance.
 39
 40  https://cloud.google.com/data-fusion/docs/reference/rest/v1/projects.locations.instances#Instance
 41  """
 42
 43  _resource_data: dict
 44
 45  def __init__(self, project_id, resource_data):
 46    super().__init__(project_id=project_id)
 47    self._resource_data = resource_data
 48
 49  @property
 50  def full_path(self) -> str:
 51    """
 52    The 'name' of the instance is already in the full path form
 53
 54    projects/{project}/locations/{location}/instances/{instance}.
 55    """
 56    return self._resource_data['name']
 57
 58  @property
 59  def short_path(self) -> str:
 60    path = self.full_path
 61    path = re.sub(r'^projects/', '', path)
 62    path = re.sub(r'/locations/', '/', path)
 63    path = re.sub(r'/instances/', '/', path)
 64    return path
 65
 66  @property
 67  def name(self) -> str:
 68    return utils.extract_value_from_res_name(self._resource_data['name'],
 69                                             'instances')
 70
 71  @property
 72  def location(self) -> str:
 73    return utils.extract_value_from_res_name(self._resource_data['name'],
 74                                             'locations')
 75
 76  @property
 77  def zone(self) -> str:
 78    return self._resource_data['zone']
 79
 80  @property
 81  def type(self) -> str:
 82    return self._resource_data['type']
 83
 84  @property
 85  def is_basic_type(self) -> bool:
 86    return self._resource_data['type'] == 'BASIC'
 87
 88  @property
 89  def is_enterprise_type(self) -> bool:
 90    return self._resource_data['type'] == 'ENTERPRISE'
 91
 92  @property
 93  def is_developer_type(self) -> bool:
 94    return self._resource_data['type'] == 'DEVELOPER'
 95
 96  @property
 97  def is_private(self) -> bool:
 98    if 'privateInstance' in self._resource_data:
 99      return self._resource_data['privateInstance']
100    return False
101
102  @property
103  def status(self) -> str:
104    return self._resource_data['state']
105
106  @property
107  def status_details(self) -> Optional[str]:
108    if 'stateMessage' in self._resource_data:
109      return self._resource_data['stateMessage']
110    return None
111
112  @property
113  def is_running(self) -> bool:
114    return self.status == 'ACTIVE'
115
116  @property
117  def is_deleting(self) -> bool:
118    return self._resource_data['state'] == 'DELETING'
119
120  @property
121  def version(self) -> Version:
122    return Version(self._resource_data['version'])
123
124  @property
125  def api_service_agent(self) -> str:
126    return self._resource_data['p4ServiceAccount']
127
128  @property
129  def dataproc_service_account(self) -> str:
130    sa = self._resource_data.get('dataprocServiceAccount')
131    if sa is None:
132      sa = crm.get_project(self.project_id).default_compute_service_account
133    return sa
134
135  @property
136  def tenant_project_id(self) -> str:
137    return self._resource_data['tenantProjectId']
138
139  @property
140  def uses_shared_vpc(self) -> bool:
141    """
142    If shared VPC then 'network_string' = 'projects/{host-project-id}/global/networks/{network}'
143    else 'network_string' = {network}
144    """
145    if 'network' in self._resource_data['networkConfig']:
146      network_string = self._resource_data['networkConfig']['network']
147      match = re.match(r'projects/([^/]+)/global/networks/([^/]+)$',
148                       network_string)
149      if match and match.group(1) != self.project_id:
150        return True
151
152    return False
153
154  @property
155  def network(self) -> network.Network:
156    if 'network' in self._resource_data['networkConfig']:
157      network_string = self._resource_data['networkConfig']['network']
158      match = re.match(r'projects/([^/]+)/global/networks/([^/]+)$',
159                       network_string)
160      if match:
161        return network.get_network(match.group(1), match.group(2))
162      else:
163        return network.get_network(self.project_id, network_string)
164
165    return network.get_network(self.project_id, 'default')
166
167  @property
168  def tp_ipv4_cidr(self) -> Optional[IPv4NetOrIPv6Net]:
169    if 'network' in self._resource_data['networkConfig']:
170      cidr = self._resource_data['networkConfig']['ipAllocation']
171      return ipaddress.ip_network(cidr)
172    return None
173
174  @property
175  def api_endpoint(self) -> str:
176    return self._resource_data['apiEndpoint']
Instance(project_id, resource_data)
45  def __init__(self, project_id, resource_data):
46    super().__init__(project_id=project_id)
47    self._resource_data = resource_data
full_path: str
49  @property
50  def full_path(self) -> str:
51    """
52    The 'name' of the instance is already in the full path form
53
54    projects/{project}/locations/{location}/instances/{instance}.
55    """
56    return self._resource_data['name']

The 'name' of the instance is already in the full path form

projects/{project}/locations/{location}/instances/{instance}.

short_path: str
58  @property
59  def short_path(self) -> str:
60    path = self.full_path
61    path = re.sub(r'^projects/', '', path)
62    path = re.sub(r'/locations/', '/', path)
63    path = re.sub(r'/instances/', '/', path)
64    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
66  @property
67  def name(self) -> str:
68    return utils.extract_value_from_res_name(self._resource_data['name'],
69                                             'instances')
location: str
71  @property
72  def location(self) -> str:
73    return utils.extract_value_from_res_name(self._resource_data['name'],
74                                             'locations')
zone: str
76  @property
77  def zone(self) -> str:
78    return self._resource_data['zone']
type: str
80  @property
81  def type(self) -> str:
82    return self._resource_data['type']
is_basic_type: bool
84  @property
85  def is_basic_type(self) -> bool:
86    return self._resource_data['type'] == 'BASIC'
is_enterprise_type: bool
88  @property
89  def is_enterprise_type(self) -> bool:
90    return self._resource_data['type'] == 'ENTERPRISE'
is_developer_type: bool
92  @property
93  def is_developer_type(self) -> bool:
94    return self._resource_data['type'] == 'DEVELOPER'
is_private: bool
 96  @property
 97  def is_private(self) -> bool:
 98    if 'privateInstance' in self._resource_data:
 99      return self._resource_data['privateInstance']
100    return False
status: str
102  @property
103  def status(self) -> str:
104    return self._resource_data['state']
status_details: Optional[str]
106  @property
107  def status_details(self) -> Optional[str]:
108    if 'stateMessage' in self._resource_data:
109      return self._resource_data['stateMessage']
110    return None
is_running: bool
112  @property
113  def is_running(self) -> bool:
114    return self.status == 'ACTIVE'
is_deleting: bool
116  @property
117  def is_deleting(self) -> bool:
118    return self._resource_data['state'] == 'DELETING'
version: gcpdiag.utils.Version
120  @property
121  def version(self) -> Version:
122    return Version(self._resource_data['version'])
api_service_agent: str
124  @property
125  def api_service_agent(self) -> str:
126    return self._resource_data['p4ServiceAccount']
dataproc_service_account: str
128  @property
129  def dataproc_service_account(self) -> str:
130    sa = self._resource_data.get('dataprocServiceAccount')
131    if sa is None:
132      sa = crm.get_project(self.project_id).default_compute_service_account
133    return sa
tenant_project_id: str
135  @property
136  def tenant_project_id(self) -> str:
137    return self._resource_data['tenantProjectId']
uses_shared_vpc: bool
139  @property
140  def uses_shared_vpc(self) -> bool:
141    """
142    If shared VPC then 'network_string' = 'projects/{host-project-id}/global/networks/{network}'
143    else 'network_string' = {network}
144    """
145    if 'network' in self._resource_data['networkConfig']:
146      network_string = self._resource_data['networkConfig']['network']
147      match = re.match(r'projects/([^/]+)/global/networks/([^/]+)$',
148                       network_string)
149      if match and match.group(1) != self.project_id:
150        return True
151
152    return False

If shared VPC then 'network_string' = 'projects/{host-project-id}/global/networks/{network}' else 'network_string' = {network}

network: gcpdiag.queries.network.Network
154  @property
155  def network(self) -> network.Network:
156    if 'network' in self._resource_data['networkConfig']:
157      network_string = self._resource_data['networkConfig']['network']
158      match = re.match(r'projects/([^/]+)/global/networks/([^/]+)$',
159                       network_string)
160      if match:
161        return network.get_network(match.group(1), match.group(2))
162      else:
163        return network.get_network(self.project_id, network_string)
164
165    return network.get_network(self.project_id, 'default')
tp_ipv4_cidr: Union[ipaddress.IPv4Network, ipaddress.IPv6Network, NoneType]
167  @property
168  def tp_ipv4_cidr(self) -> Optional[IPv4NetOrIPv6Net]:
169    if 'network' in self._resource_data['networkConfig']:
170      cidr = self._resource_data['networkConfig']['ipAllocation']
171      return ipaddress.ip_network(cidr)
172    return None
api_endpoint: str
174  @property
175  def api_endpoint(self) -> str:
176    return self._resource_data['apiEndpoint']
@caching.cached_api_call
def get_instances( context: gcpdiag.models.Context) -> Mapping[str, Instance]:
179@caching.cached_api_call
180def get_instances(context: models.Context) -> Mapping[str, Instance]:
181  """Get a dict of Instance matching the given context, indexed by instance full path."""
182  instances: Dict[str, Instance] = {}
183
184  if not apis.is_enabled(context.project_id, 'datafusion'):
185    return instances
186
187  logging.info('fetching list of Data Fusion instances in project %s',
188               context.project_id)
189  datafusion_api = apis.get_api('datafusion', 'v1', context.project_id)
190  query = datafusion_api.projects().locations().instances().list(
191      parent=f'projects/{context.project_id}/locations/-'
192  )  #'-' (wildcard) all regions
193
194  try:
195    resp = query.execute(num_retries=config.API_RETRIES)
196    if 'instances' not in resp:
197      return instances
198
199    for i in resp['instances']:
200      # projects/{project}/locations/{location}/instances/{instance}.
201      result = re.match(r'projects/[^/]+/locations/([^/]+)/instances/([^/]+)',
202                        i['name'])
203      if not result:
204        logging.error('invalid datafusion name: %s', i['name'])
205        continue
206      location = result.group(1)
207      labels = i.get('labels', {})
208      name = result.group(2)
209      if not context.match_project_resource(
210          location=location, labels=labels, resource=name):
211        continue
212
213      instances[i['name']] = Instance(project_id=context.project_id,
214                                      resource_data=i)
215
216  except googleapiclient.errors.HttpError as err:
217    raise utils.GcpApiError(err) from err
218
219  return instances

Get a dict of Instance matching the given context, indexed by instance full path.

@caching.cached_api_call
def extract_support_datafusion_version() -> Dict[str, str]:
222@caching.cached_api_call
223def extract_support_datafusion_version() -> Dict[str, str]:
224  """Extract the version policy dictionary from the data fusion version support policy page.
225
226  Returns:
227    A dictionary of data fusion versions and their support end dates.
228  """
229  page_url = 'https://cloud.google.com/data-fusion/docs/support/version-support-policy'
230
231  try:
232    data_fusion_table = web.fetch_and_extract_table(page_url,
233                                                    tag='h2',
234                                                    tag_id='support_timelines')
235    if data_fusion_table:
236      versions = []
237      support_end_dates = []
238      version_policy_dict = {}
239
240      for row in data_fusion_table.find_all('tr')[1:]:
241        columns = row.find_all('td')
242        version = columns[0]
243        support_end_date = columns[2].text.strip()
244        if version.sup:
245          version.sup.decompose()
246
247        version = version.text.strip()
248        try:
249          support_end_date = datetime.datetime.strptime(support_end_date,
250                                                        '%B %d, %Y')
251          support_end_date = datetime.datetime.strftime(support_end_date,
252                                                        '%Y-%m-%d')
253        except ValueError:
254          continue
255
256        versions.append(version)
257        support_end_dates.append(support_end_date)
258
259        version_policy_dict = dict(zip(versions, support_end_dates))
260      return version_policy_dict
261
262    else:
263      return {}
264
265  except (
266      requests.exceptions.RequestException,
267      AttributeError,
268      TypeError,
269      ValueError,
270      IndexError,
271  ) as e:
272    logging.error('Error in extracting data fusion version support policy: %s',
273                  e)
274    return {}

Extract the version policy dictionary from the data fusion version support policy page.

Returns:

A dictionary of data fusion versions and their support end dates.

class Profile(gcpdiag.models.Resource):
277class Profile(models.Resource):
278  """Represents a Compute Profile."""
279
280  _resource_data: dict
281
282  def __init__(self, project_id, instance_name, resource_data):
283    super().__init__(project_id=project_id)
284    self.instance_name = instance_name
285    self._resource_data = resource_data
286
287  @property
288  def full_path(self) -> str:
289    """The full path form :
290
291    projects/{project}/instances/{instance}/computeProfiles/{profile}.
292    """
293    return (f'projects/{self.project_id}/instances/{self.instance_name}'
294            f'/computeProfiles/{self._resource_data["name"]}')
295
296  @property
297  def short_path(self) -> str:
298    """The short path form :
299
300    {project}/{instance}/{profile}.
301    """
302    return (
303        f'{self.project_id}/{self.instance_name}/{self._resource_data["name"]}')
304
305  @property
306  def name(self) -> str:
307    return self._resource_data['name']
308
309  @property
310  def region(self) -> str:
311    for value in self._resource_data['provisioner'].get('properties'):
312      if value.get('name') == 'region' and value.get('value') is not None:
313        return value.get('value')
314    return 'No region defined'
315
316  @property
317  def status(self) -> str:
318    return self._resource_data['status']
319
320  @property
321  def scope(self) -> str:
322    return self._resource_data['scope']
323
324  @property
325  def is_dataproc_provisioner(self) -> bool:
326    return self._resource_data['provisioner']['name'] == 'gcp-dataproc'
327
328  @property
329  def is_existing_dataproc_provisioner(self) -> bool:
330    return self._resource_data['provisioner']['name'] == 'gcp-existing-dataproc'
331
332  @property
333  def autoscaling_enabled(self) -> bool:
334    for value in self._resource_data['provisioner'].get('properties'):
335      if (value.get('name') == 'enablePredefinedAutoScaling' and
336          value.get('value') is not None):
337        return value.get('value') == 'true'
338    return False
339
340  @property
341  def image_version(self) -> str:
342    for value in self._resource_data['provisioner'].get('properties'):
343      if value.get('name') == 'imageVersion' and value.get('value') != '':
344        return value.get('value')
345    return 'No imageVersion defined'
346
347  @property
348  def auto_scaling_policy(self) -> str:
349    for value in self._resource_data['provisioner'].get('properties'):
350      if value.get('name') == 'autoScalingPolicy' and value.get('value') != '':
351        return value.get('value')
352    return 'No autoScalingPolicy defined'

Represents a Compute Profile.

Profile(project_id, instance_name, resource_data)
282  def __init__(self, project_id, instance_name, resource_data):
283    super().__init__(project_id=project_id)
284    self.instance_name = instance_name
285    self._resource_data = resource_data
instance_name
full_path: str
287  @property
288  def full_path(self) -> str:
289    """The full path form :
290
291    projects/{project}/instances/{instance}/computeProfiles/{profile}.
292    """
293    return (f'projects/{self.project_id}/instances/{self.instance_name}'
294            f'/computeProfiles/{self._resource_data["name"]}')

The full path form :

projects/{project}/instances/{instance}/computeProfiles/{profile}.

short_path: str
296  @property
297  def short_path(self) -> str:
298    """The short path form :
299
300    {project}/{instance}/{profile}.
301    """
302    return (
303        f'{self.project_id}/{self.instance_name}/{self._resource_data["name"]}')

The short path form :

{project}/{instance}/{profile}.

name: str
305  @property
306  def name(self) -> str:
307    return self._resource_data['name']
region: str
309  @property
310  def region(self) -> str:
311    for value in self._resource_data['provisioner'].get('properties'):
312      if value.get('name') == 'region' and value.get('value') is not None:
313        return value.get('value')
314    return 'No region defined'
status: str
316  @property
317  def status(self) -> str:
318    return self._resource_data['status']
scope: str
320  @property
321  def scope(self) -> str:
322    return self._resource_data['scope']
is_dataproc_provisioner: bool
324  @property
325  def is_dataproc_provisioner(self) -> bool:
326    return self._resource_data['provisioner']['name'] == 'gcp-dataproc'
is_existing_dataproc_provisioner: bool
328  @property
329  def is_existing_dataproc_provisioner(self) -> bool:
330    return self._resource_data['provisioner']['name'] == 'gcp-existing-dataproc'
autoscaling_enabled: bool
332  @property
333  def autoscaling_enabled(self) -> bool:
334    for value in self._resource_data['provisioner'].get('properties'):
335      if (value.get('name') == 'enablePredefinedAutoScaling' and
336          value.get('value') is not None):
337        return value.get('value') == 'true'
338    return False
image_version: str
340  @property
341  def image_version(self) -> str:
342    for value in self._resource_data['provisioner'].get('properties'):
343      if value.get('name') == 'imageVersion' and value.get('value') != '':
344        return value.get('value')
345    return 'No imageVersion defined'
auto_scaling_policy: str
347  @property
348  def auto_scaling_policy(self) -> str:
349    for value in self._resource_data['provisioner'].get('properties'):
350      if value.get('name') == 'autoScalingPolicy' and value.get('value') != '':
351        return value.get('value')
352    return 'No autoScalingPolicy defined'
@caching.cached_api_call
def get_instance_system_compute_profile( context: gcpdiag.models.Context, instance: Instance) -> Iterable[Profile]:
355@caching.cached_api_call
356def get_instance_system_compute_profile(
357    context: models.Context, instance: Instance) -> Iterable[Profile]:
358  """Get a list of datafusion Instance dataproc System compute profile."""
359  logging.info('fetching dataproc System compute profile list: %s',
360               context.project_id)
361  system_profiles: List[Profile] = []
362  cdap_endpoint = instance.api_endpoint
363  datafusion = get_generic.get_generic_api('datafusion', cdap_endpoint)
364  response = datafusion.get_system_profiles()
365  if response is not None:
366    for res in response:
367      if (res['provisioner']['name'] == 'gcp-dataproc' or
368          res['provisioner']['name'] == 'gcp-existing-dataproc'):
369        system_profiles.append(Profile(context.project_id, instance.name, res))
370  return system_profiles

Get a list of datafusion Instance dataproc System compute profile.

@caching.cached_api_call
def get_instance_user_compute_profile( context: gcpdiag.models.Context, instance: Instance) -> Iterable[Profile]:
373@caching.cached_api_call
374def get_instance_user_compute_profile(context: models.Context,
375                                      instance: Instance) -> Iterable[Profile]:
376  """Get a list of datafusion Instance dataproc User compute profile."""
377  logging.info('fetching dataproc User compute profile list: %s',
378               context.project_id)
379  user_profiles: List[Profile] = []
380  cdap_endpoint = instance.api_endpoint
381  datafusion = get_generic.get_generic_api('datafusion', cdap_endpoint)
382  response_namespaces = datafusion.get_all_namespaces()
383  if response_namespaces is not None:
384    for res in response_namespaces:
385      response = datafusion.get_user_profiles(namespace=res['name'])
386      if response is not None:
387        for res in response:
388          if (res['provisioner']['name'] == 'gcp-dataproc' or
389              res['provisioner']['name'] == 'gcp-existing-dataproc'):
390            user_profiles.append(Profile(context.project_id, instance.name,
391                                         res))
392      user_profiles = list(filter(bool, user_profiles))
393  return user_profiles

Get a list of datafusion Instance dataproc User compute profile.

@caching.cached_api_call
def extract_datafusion_dataproc_version() -> Dict[str, list[str]]:
396@caching.cached_api_call
397def extract_datafusion_dataproc_version() -> Dict[str, list[str]]:
398  """Extract the supported Data Fusion versions and their corresponding
399  Dataproc versions from the GCP documentation."""
400
401  page_url = 'https://cloud.google.com/data-fusion/docs/concepts/configure-clusters'
402
403  try:
404    table = web.fetch_and_extract_table(page_url,
405                                        tag='h2',
406                                        tag_id='version-compatibility')
407    if table:
408      rows = table.find_all('tr')[1:]  #Skip the header row
409      version_dict = {}
410
411      for row in rows:
412        cdf_versions = row.find_all('td')[0].get_text().strip()
413        dp_versions = row.find_all('td')[1].get_text().strip()
414
415        cdf_versions = cdf_versions.replace(' and later', '')
416        cdf_versions_list = []
417
418        if '-' in cdf_versions:
419          start, end = map(float, cdf_versions.split('-'))
420          while start <= end:
421            cdf_versions_list.append(f'{start:.1f}')
422            start += 0.1
423        else:
424          cdf_versions_list.append(cdf_versions)
425        dp_versions = [v.split('*')[0].strip() for v in dp_versions.split(',')]
426        for version in cdf_versions_list:
427          version_dict[version] = dp_versions
428      return version_dict
429
430    else:
431      return {}
432  except (
433      requests.exceptions.RequestException,
434      AttributeError,
435      TypeError,
436      ValueError,
437      IndexError,
438  ) as e:
439    logging.error(
440        'Error in extracting datafusion and dataproc versions: %s',
441        e,
442    )
443    return {}

Extract the supported Data Fusion versions and their corresponding Dataproc versions from the GCP documentation.

class Preference(gcpdiag.models.Resource):
446class Preference(models.Resource):
447  """Represents a Preference."""
448
449  _resource_data: dict
450
451  def __init__(self, project_id, instance, resource_data):
452    super().__init__(project_id=project_id)
453    self.instance = instance
454    self._resource_data = resource_data
455
456  @property
457  def full_path(self) -> str:
458    """The full path form :
459
460    projects/{project}/locations/{location}/instances/{instance}.
461    """
462    return self.instance.full_path
463
464  @property
465  def image_version(self):
466    return self._resource_data.get('system.profile.properties.imageVersion',
467                                   None)

Represents a Preference.

Preference(project_id, instance, resource_data)
451  def __init__(self, project_id, instance, resource_data):
452    super().__init__(project_id=project_id)
453    self.instance = instance
454    self._resource_data = resource_data
instance
full_path: str
456  @property
457  def full_path(self) -> str:
458    """The full path form :
459
460    projects/{project}/locations/{location}/instances/{instance}.
461    """
462    return self.instance.full_path

The full path form :

projects/{project}/locations/{location}/instances/{instance}.

image_version
464  @property
465  def image_version(self):
466    return self._resource_data.get('system.profile.properties.imageVersion',
467                                   None)
def get_system_preferences( context: gcpdiag.models.Context, instance: Instance) -> Preference:
470def get_system_preferences(context: models.Context,
471                           instance: Instance) -> Preference:
472  """Get datafusion Instance system preferences."""
473  logging.info('fetching dataproc System preferences: %s', context.project_id)
474  cdap_endpoint = instance.api_endpoint
475  datafusion = get_generic.get_generic_api('datafusion', cdap_endpoint)
476  response = datafusion.get_system_preferences()
477  return Preference(context.project_id, instance, response)

Get datafusion Instance system preferences.

def get_namespace_preferences( context: gcpdiag.models.Context, instance: Instance) -> Mapping[str, Preference]:
480def get_namespace_preferences(context: models.Context,
481                              instance: Instance) -> Mapping[str, Preference]:
482  """Get datafusion cdap namespace preferences.
483  """
484  logging.info('fetching dataproc namespace preferences: %s',
485               context.project_id)
486  cdap_endpoint = instance.api_endpoint
487  datafusion = get_generic.get_generic_api('datafusion', cdap_endpoint)
488  namespaces = datafusion.get_all_namespaces()
489  namespaces_preferences = {}
490  if namespaces is not None:
491    for namespace in namespaces:
492      response = datafusion.get_namespace_preferences(
493          namespace=namespace['name'])
494      if bool(response):
495        namespaces_preferences[namespace['name']] = Preference(
496            context.project_id, instance, response)
497  return namespaces_preferences

Get datafusion cdap namespace preferences.

def get_application_preferences( context: gcpdiag.models.Context, instance: Instance) -> Mapping[str, Preference]:
500def get_application_preferences(context: models.Context,
501                                instance: Instance) -> Mapping[str, Preference]:
502  """Get datafusion cdap application preferences."""
503  logging.info('fetching dataproc application preferences: %s',
504               context.project_id)
505  cdap_endpoint = instance.api_endpoint
506  datafusion = get_generic.get_generic_api('datafusion', cdap_endpoint)
507  applications_preferences = {}
508  namespaces = datafusion.get_all_namespaces()
509  if namespaces is not None:
510    for namespace in namespaces:
511      applications = datafusion.get_all_applications(
512          namespace=namespace['name'])
513      if applications is not None:
514        for application in applications:
515          response = datafusion.get_application_preferences(
516              namespace=namespace['name'], application_name=application['name'])
517          if bool(response):
518            applications_preferences[application['name']] = Preference(
519                context.project_id, instance, response)
520  return applications_preferences

Get datafusion cdap application preferences.