gcpdiag.queries.datafusion

Queries related to Data Fusion.
IPv4NetOrIPv6Net = typing.Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
class Instance(gcpdiag.models.Resource):
 37class Instance(models.Resource):
 38  """Represents a Data Fusion instance.
 39
 40  https://cloud.google.com/data-fusion/docs/reference/rest/v1/projects.locations.instances#Instance
 41  """
 42
 43  _resource_data: dict
 44
 45  def __init__(self, project_id, resource_data):
 46    super().__init__(project_id=project_id)
 47    self._resource_data = resource_data
 48
 49  @property
 50  def full_path(self) -> str:
 51    """
 52    The 'name' of the instance is already in the full path form
 53
 54    projects/{project}/locations/{location}/instances/{instance}.
 55    """
 56    return self._resource_data['name']
 57
 58  @property
 59  def short_path(self) -> str:
 60    path = self.full_path
 61    path = re.sub(r'^projects/', '', path)
 62    path = re.sub(r'/locations/', '/', path)
 63    path = re.sub(r'/instances/', '/', path)
 64    return path
 65
 66  @property
 67  def name(self) -> str:
 68    return utils.extract_value_from_res_name(self._resource_data['name'],
 69                                             'instances')
 70
 71  @property
 72  def location(self) -> str:
 73    return utils.extract_value_from_res_name(self._resource_data['name'],
 74                                             'locations')
 75
 76  @property
 77  def zone(self) -> str:
 78    return self._resource_data['zone']
 79
 80  @property
 81  def type(self) -> str:
 82    return self._resource_data['type']
 83
 84  @property
 85  def is_basic_type(self) -> bool:
 86    return self._resource_data['type'] == 'BASIC'
 87
 88  @property
 89  def is_enterprise_type(self) -> bool:
 90    return self._resource_data['type'] == 'ENTERPRISE'
 91
 92  @property
 93  def is_developer_type(self) -> bool:
 94    return self._resource_data['type'] == 'DEVELOPER'
 95
 96  @property
 97  def is_private(self) -> bool:
 98    if 'privateInstance' in self._resource_data:
 99      return self._resource_data['privateInstance']
100    return False
101
102  @property
103  def status(self) -> str:
104    return self._resource_data['state']
105
106  @property
107  def is_running(self) -> bool:
108    return self.status == 'ACTIVE'
109
110  @property
111  def is_deleting(self) -> bool:
112    return self._resource_data['state'] == 'DELETING'
113
114  @property
115  def version(self) -> Version:
116    return Version(self._resource_data['version'])
117
118  @property
119  def api_service_agent(self) -> str:
120    return self._resource_data['p4ServiceAccount']
121
122  @property
123  def dataproc_service_account(self) -> str:
124    sa = self._resource_data.get('dataprocServiceAccount')
125    if sa is None:
126      sa = crm.get_project(self.project_id).default_compute_service_account
127    return sa
128
129  @property
130  def tenant_project_id(self) -> str:
131    return self._resource_data['tenantProjectId']
132
133  @property
134  def uses_shared_vpc(self) -> bool:
135    """
136    If shared VPC then 'network_string' = 'projects/{host-project-id}/global/networks/{network}'
137    else 'network_string' = {network}
138    """
139    if 'network' in self._resource_data['networkConfig']:
140      network_string = self._resource_data['networkConfig']['network']
141      match = re.match(r'projects/([^/]+)/global/networks/([^/]+)$',
142                       network_string)
143      if match and match.group(1) != self.project_id:
144        return True
145
146    return False
147
148  @property
149  def network(self) -> network.Network:
150    if 'network' in self._resource_data['networkConfig']:
151      network_string = self._resource_data['networkConfig']['network']
152      match = re.match(r'projects/([^/]+)/global/networks/([^/]+)$',
153                       network_string)
154      if match:
155        return network.get_network(match.group(1), match.group(2))
156      else:
157        return network.get_network(self.project_id, network_string)
158
159    return network.get_network(self.project_id, 'default')
160
161  @property
162  def tp_ipv4_cidr(self) -> Optional[IPv4NetOrIPv6Net]:
163    if 'network' in self._resource_data['networkConfig']:
164      cidr = self._resource_data['networkConfig']['ipAllocation']
165      return ipaddress.ip_network(cidr)
166    return None
167
168  @property
169  def api_endpoint(self) -> str:
170    return self._resource_data['apiEndpoint']
Instance(project_id, resource_data)
45  def __init__(self, project_id, resource_data):
46    super().__init__(project_id=project_id)
47    self._resource_data = resource_data
full_path: str
49  @property
50  def full_path(self) -> str:
51    """
52    The 'name' of the instance is already in the full path form
53
54    projects/{project}/locations/{location}/instances/{instance}.
55    """
56    return self._resource_data['name']

The 'name' of the instance is already in the full path form

projects/{project}/locations/{location}/instances/{instance}.

short_path: str
58  @property
59  def short_path(self) -> str:
60    path = self.full_path
61    path = re.sub(r'^projects/', '', path)
62    path = re.sub(r'/locations/', '/', path)
63    path = re.sub(r'/instances/', '/', path)
64    return path

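Returns the short path for this resource: the full path with the 'projects/', '/locations/' and '/instances/' segments removed.

Note that it isn't clear from this name what kind of resource it is.

Example: 'my-project/us-central1/my-instance'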

name: str
66  @property
67  def name(self) -> str:
68    return utils.extract_value_from_res_name(self._resource_data['name'],
69                                             'instances')
location: str
71  @property
72  def location(self) -> str:
73    return utils.extract_value_from_res_name(self._resource_data['name'],
74                                             'locations')
zone: str
76  @property
77  def zone(self) -> str:
78    return self._resource_data['zone']
type: str
80  @property
81  def type(self) -> str:
82    return self._resource_data['type']
is_basic_type: bool
84  @property
85  def is_basic_type(self) -> bool:
86    return self._resource_data['type'] == 'BASIC'
is_enterprise_type: bool
88  @property
89  def is_enterprise_type(self) -> bool:
90    return self._resource_data['type'] == 'ENTERPRISE'
is_developer_type: bool
92  @property
93  def is_developer_type(self) -> bool:
94    return self._resource_data['type'] == 'DEVELOPER'
is_private: bool
 96  @property
 97  def is_private(self) -> bool:
 98    if 'privateInstance' in self._resource_data:
 99      return self._resource_data['privateInstance']
100    return False
status: str
102  @property
103  def status(self) -> str:
104    return self._resource_data['state']
is_running: bool
106  @property
107  def is_running(self) -> bool:
108    return self.status == 'ACTIVE'
is_deleting: bool
110  @property
111  def is_deleting(self) -> bool:
112    return self._resource_data['state'] == 'DELETING'
version: gcpdiag.utils.Version
114  @property
115  def version(self) -> Version:
116    return Version(self._resource_data['version'])
api_service_agent: str
118  @property
119  def api_service_agent(self) -> str:
120    return self._resource_data['p4ServiceAccount']
dataproc_service_account: str
122  @property
123  def dataproc_service_account(self) -> str:
124    sa = self._resource_data.get('dataprocServiceAccount')
125    if sa is None:
126      sa = crm.get_project(self.project_id).default_compute_service_account
127    return sa
tenant_project_id: str
129  @property
130  def tenant_project_id(self) -> str:
131    return self._resource_data['tenantProjectId']
uses_shared_vpc: bool
133  @property
134  def uses_shared_vpc(self) -> bool:
135    """
136    If shared VPC then 'network_string' = 'projects/{host-project-id}/global/networks/{network}'
137    else 'network_string' = {network}
138    """
139    if 'network' in self._resource_data['networkConfig']:
140      network_string = self._resource_data['networkConfig']['network']
141      match = re.match(r'projects/([^/]+)/global/networks/([^/]+)$',
142                       network_string)
143      if match and match.group(1) != self.project_id:
144        return True
145
146    return False

If shared VPC then 'network_string' = 'projects/{host-project-id}/global/networks/{network}' else 'network_string' = {network}

network: gcpdiag.queries.network.Network
148  @property
149  def network(self) -> network.Network:
150    if 'network' in self._resource_data['networkConfig']:
151      network_string = self._resource_data['networkConfig']['network']
152      match = re.match(r'projects/([^/]+)/global/networks/([^/]+)$',
153                       network_string)
154      if match:
155        return network.get_network(match.group(1), match.group(2))
156      else:
157        return network.get_network(self.project_id, network_string)
158
159    return network.get_network(self.project_id, 'default')
tp_ipv4_cidr: Union[ipaddress.IPv4Network, ipaddress.IPv6Network, NoneType]
161  @property
162  def tp_ipv4_cidr(self) -> Optional[IPv4NetOrIPv6Net]:
163    if 'network' in self._resource_data['networkConfig']:
164      cidr = self._resource_data['networkConfig']['ipAllocation']
165      return ipaddress.ip_network(cidr)
166    return None
api_endpoint: str
168  @property
169  def api_endpoint(self) -> str:
170    return self._resource_data['apiEndpoint']
Inherited Members
gcpdiag.models.Resource
project_id
@caching.cached_api_call
def get_instances( context: gcpdiag.models.Context) -> Mapping[str, Instance]:
@caching.cached_api_call
def get_instances(context: models.Context) -> Mapping[str, Instance]:
  """Fetch the Data Fusion instances of the context's project.

  Args:
    context: selects the project and filters instances by location, labels
      and resource name.

  Returns:
    The matching instances keyed by their full resource name. Empty when the
    'datafusion' API is not enabled or the response lists no instances.

  Raises:
    utils.GcpApiError: when the API call fails with an HttpError.
  """
  project_id = context.project_id
  found: Dict[str, Instance] = {}

  if not apis.is_enabled(project_id, 'datafusion'):
    return found

  logging.info('fetching list of Data Fusion instances in project %s',
               project_id)
  datafusion_api = apis.get_api('datafusion', 'v1', project_id)
  # '-' is the location wildcard: list the instances of all regions.
  request = datafusion_api.projects().locations().instances().list(
      parent=f'projects/{project_id}/locations/-')

  try:
    # NOTE(review): a single request is executed and no page token is
    # followed, so only the first page of results is read - confirm whether
    # this listing can paginate.
    response = request.execute(num_retries=config.API_RETRIES)

    for resource in response.get('instances', []):
      full_name = resource['name']
      # projects/{project}/locations/{location}/instances/{instance}.
      parsed = re.match(r'projects/[^/]+/locations/([^/]+)/instances/([^/]+)',
                        full_name)
      if parsed is None:
        logging.error('invalid datafusion name: %s', full_name)
        continue

      location, name = parsed.groups()
      if context.match_project_resource(location=location,
                                        labels=resource.get('labels', {}),
                                        resource=name):
        found[full_name] = Instance(project_id=project_id,
                                    resource_data=resource)

  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err

  return found

Get a dict of Instance matching the given context, indexed by instance full path.

@caching.cached_api_call
def extract_support_datafusion_version() -> Dict[str, str]:
@caching.cached_api_call
def extract_support_datafusion_version() -> Dict[str, str]:
  """Extract the version policy dictionary from the data fusion version support policy page.

  The first table of the page is parsed: the first cell of a row is read as the
  version and the third cell as the support end date. Rows with fewer than
  three cells, or whose date is not in the 'Month day, year' form, are skipped.

  Returns:
    A dictionary of data fusion versions and their support end dates
    (formatted as YYYY-MM-DD). Empty when the page has no table or when
    fetching/parsing fails; the failure is logged, not raised.
  """
  page_url = 'https://cloud.google.com/data-fusion/docs/support/version-support-policy'
  version_policy_dict: Dict[str, str] = {}

  try:
    # Without a timeout requests may wait forever on a stalled connection.
    # A timeout raises a RequestException subclass, handled below.
    response = requests.get(page_url, timeout=30)
    response.raise_for_status()
    soup = BeautifulSoup(response.content, 'html.parser')
    data_fusion_table = soup.find('table')
    if not data_fusion_table:
      return {}

    # [1:] skips the first row of the table (presumably the header row).
    for row in data_fusion_table.find_all('tr')[1:]:
      columns = row.find_all('td')
      if len(columns) < 3:
        # A short row used to raise IndexError and discard the whole table.
        continue

      version_cell = columns[0]
      raw_end_date = columns[2].text.strip()
      if version_cell.sup:
        # Drop the <sup> element so its text is not part of the version.
        version_cell.sup.decompose()
      version = version_cell.text.strip()

      try:
        parsed_end_date = datetime.datetime.strptime(raw_end_date, '%B %d, %Y')
        support_end_date = parsed_end_date.strftime('%Y-%m-%d')
      except ValueError:
        # Not a date in the expected format: ignore this row.
        continue

      version_policy_dict[version] = support_end_date

    return version_policy_dict

  except (
      requests.exceptions.RequestException,
      AttributeError,
      TypeError,
      ValueError,
      IndexError,
  ) as e:
    logging.error('Error in extracting data fusion version support policy: %s',
                  e)
    return {}

Extract the version policy dictionary from the data fusion version support policy page.

Returns:

A dictionary of data fusion versions and their support end dates.

class Profile(gcpdiag.models.Resource):
class Profile(models.Resource):
  """Represents a Compute Profile."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Wraps one compute profile dict.

    Args:
      project_id: ID of the project the profile is reported under.
      resource_data: the profile as a dict; every property of this class
        reads from it.
    """
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  def _provisioner_properties(self) -> list:
    """Returns the provisioner's property entries, or [] when there are none.

    `.get('properties')` yields None when the key is missing, and iterating
    None raises TypeError, so normalise it here.
    """
    return self._resource_data['provisioner'].get('properties') or []

  @property
  def full_path(self) -> str:
    """The 'name' field of the profile."""
    return self._resource_data['name']

  @property
  def short_path(self) -> str:
    """Same value as full_path; no shortening is applied."""
    return self.full_path

  @property
  def name(self) -> str:
    """The 'name' field of the profile."""
    return self._resource_data['name']

  @property
  def region(self) -> str:
    """Value of the first 'region' provisioner property that is not None.

    Returns 'No region defined' when there is no such property.
    """
    for prop in self._provisioner_properties():
      value = prop.get('value')
      if prop.get('name') == 'region' and value is not None:
        return value
    return 'No region defined'

  @property
  def status(self) -> str:
    """The 'status' field of the profile."""
    return self._resource_data['status']

  @property
  def scope(self) -> str:
    """The 'scope' field of the profile."""
    return self._resource_data['scope']

  @property
  def is_dataproc_provisioner(self) -> bool:
    """True when the provisioner name is 'gcp-dataproc'."""
    return self._resource_data['provisioner']['name'] == 'gcp-dataproc'

  @property
  def is_existing_dataproc_provisioner(self) -> bool:
    """True when the provisioner name is 'gcp-existing-dataproc'."""
    return self._resource_data['provisioner']['name'] == 'gcp-existing-dataproc'

  @property
  def autoscaling_enabled(self) -> bool:
    """True when 'enablePredefinedAutoScaling' is set to the string 'true'.

    Only the first such property with a non-None value is considered; False
    when there is none.
    """
    for prop in self._provisioner_properties():
      value = prop.get('value')
      if prop.get('name') == 'enablePredefinedAutoScaling' and value is not None:
        return value == 'true'
    return False

  @property
  def image_version(self) -> str:
    """Value of the first 'imageVersion' provisioner property that is set.

    An entry whose value is '' or missing is skipped; the original returned
    None for a missing value, contradicting the declared str return type.
    Returns 'No imageVersion defined' when nothing is set.
    """
    for prop in self._provisioner_properties():
      value = prop.get('value')
      if (prop.get('name') == 'imageVersion' and value is not None and
          value != ''):
        return value
    return 'No imageVersion defined'

Represents a Compute Profile.

Profile(project_id, resource_data)
277  def __init__(self, project_id, resource_data):
278    super().__init__(project_id=project_id)
279    self._resource_data = resource_data
full_path: str
281  @property
282  def full_path(self) -> str:
283    return self._resource_data['name']

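Returns the full path of this resource. For a compute profile this is the value of the profile's 'name' field.

Generic example of a resource full path: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'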

short_path: str
285  @property
286  def short_path(self) -> str:
287    path = self.full_path
288    return path

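Returns the short name for this resource. For a compute profile this is the same value as full_path; no shortening is applied.

Note that it isn't clear from this name what kind of resource it is.

Generic example of a resource short name: 'gke1'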

name: str
290  @property
291  def name(self) -> str:
292    return self._resource_data['name']
region: str
294  @property
295  def region(self) -> str:
296    for value in self._resource_data['provisioner'].get('properties'):
297      if value.get('name') == 'region' and value.get('value') is not None:
298        return value.get('value')
299    return 'No region defined'
status: str
301  @property
302  def status(self) -> str:
303    return self._resource_data['status']
scope: str
305  @property
306  def scope(self) -> str:
307    return self._resource_data['scope']
is_dataproc_provisioner: bool
309  @property
310  def is_dataproc_provisioner(self) -> bool:
311    return self._resource_data['provisioner']['name'] == 'gcp-dataproc'
is_existing_dataproc_provisioner: bool
313  @property
314  def is_existing_dataproc_provisioner(self) -> bool:
315    return self._resource_data['provisioner']['name'] == 'gcp-existing-dataproc'
autoscaling_enabled: bool
317  @property
318  def autoscaling_enabled(self) -> bool:
319    for value in self._resource_data['provisioner'].get('properties'):
320      if (value.get('name') == 'enablePredefinedAutoScaling' and
321          value.get('value') is not None):
322        return value.get('value') == 'true'
323    return False
image_version: str
325  @property
326  def image_version(self) -> str:
327    for value in self._resource_data['provisioner'].get('properties'):
328      if value.get('name') == 'imageVersion' and value.get('value') != '':
329        return value.get('value')
330    return 'No imageVersion defined'
Inherited Members
gcpdiag.models.Resource
project_id
@caching.cached_api_call
def get_instance_system_compute_profile( context: gcpdiag.models.Context, instance: Instance) -> Iterable[Profile]:
@caching.cached_api_call
def get_instance_system_compute_profile(
    context: models.Context, instance: Instance) -> Iterable[Profile]:
  """Get the Dataproc system compute profiles of a Data Fusion instance.

  Requests '/v3/profiles' from the instance's API endpoint and keeps the
  entries whose provisioner is 'gcp-dataproc' or 'gcp-existing-dataproc'.

  Args:
    context: supplies the project id recorded on each Profile.
    instance: the instance whose API endpoint is queried.

  Returns:
    A list of Profile objects; empty when the request yields None.
  """
  logging.info('fetching dataproc System compute profile list: %s',
               context.project_id)
  profile_entries = apis.make_request('GET',
                                      f'{instance.api_endpoint}/v3/profiles')
  if profile_entries is None:
    return []

  dataproc_provisioners = ('gcp-dataproc', 'gcp-existing-dataproc')
  return [
      Profile(context.project_id, entry)
      for entry in profile_entries
      if entry['provisioner']['name'] in dataproc_provisioners
  ]

Get the list of Dataproc system compute profiles of a Data Fusion instance.

@caching.cached_api_call
def get_instance_user_compute_profile( context: gcpdiag.models.Context, instance: Instance) -> Iterable[Profile]:
@caching.cached_api_call
def get_instance_user_compute_profile(context: models.Context,
                                      instance: Instance) -> Iterable[Profile]:
  """Get the Dataproc user compute profiles of a Data Fusion instance.

  Lists the namespaces of the instance's API endpoint, then requests the
  profiles of each namespace and keeps the entries whose provisioner is
  'gcp-dataproc' or 'gcp-existing-dataproc'.

  Args:
    context: supplies the project id recorded on each Profile.
    instance: the instance whose API endpoint is queried.

  Returns:
    A list of Profile objects across all namespaces; empty when the namespace
    request yields None.
  """
  logging.info('fetching dataproc User compute profile list: %s',
               context.project_id)
  user_profiles: List[Profile] = []
  cdap_endpoint = instance.api_endpoint
  namespaces = apis.make_request('GET', f'{cdap_endpoint}/v3/namespaces')
  if namespaces is None:
    return user_profiles

  dataproc_provisioners = ('gcp-dataproc', 'gcp-existing-dataproc')
  # Distinct names for the two loops: the original reused `res` for both, so
  # the inner loop overwrote the namespace entry being processed.
  for namespace in namespaces:
    namespace_profiles = apis.make_request(
        'GET', f"{cdap_endpoint}/v3/namespaces/{namespace['name']}/profiles")
    if namespace_profiles is None:
      continue
    for entry in namespace_profiles:
      if entry['provisioner']['name'] in dataproc_provisioners:
        user_profiles.append(Profile(context.project_id, entry))

  # Kept from the original, applied once instead of once per namespace (same
  # result). Presumably a no-op for Profile objects - TODO confirm and drop.
  return list(filter(bool, user_profiles))

Get the list of Dataproc user compute profiles of a Data Fusion instance.