gcpdiag.queries.dataflow

Queries related to Dataflow.
# Dataflow regional endpoints queried when listing all jobs for a project.
# Order is preserved as-is because results are collected in this order.
DATAFLOW_REGIONS = [
    'asia-northeast2',
    'us-central1',
    'northamerica-northeast1',
    'us-west3',
    'southamerica-east1',
    'us-east1',
    'asia-northeast1',
    'europe-west1',
    'europe-west2',
    'asia-northeast3',
    'us-west4',
    'asia-east2',
    'europe-central2',
    'europe-west6',
    'us-west2',
    'australia-southeast1',
    'europe-west3',
    'asia-south1',
    'us-west1',
    'us-east4',
    'asia-southeast1',
]
class Job(models.Resource):
  """Represents a Dataflow job.

  Wraps the raw job dict returned by the Dataflow v1b3 API and exposes
  the fields gcpdiag rules need as properties.
  """
  _resource_data: dict  # raw job resource as returned by the API
  project_id: str

  def __init__(self, project_id: str, resource_data: dict):
    super().__init__(project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    # The job name as reported by the API.
    return self._resource_data['name']

  @property
  def id(self) -> str:
    return self._resource_data['id']

  @property
  def state(self) -> str:
    # Raw API state string, e.g. 'JOB_STATE_RUNNING' — TODO confirm values.
    return self._resource_data['currentState']

  @property
  def sdk_support_status(self) -> str:
    return self._resource_data['jobMetadata']['sdkVersion']['sdkSupportStatus']

  @property
  def minutes_in_current_state(self) -> int:
    """Whole minutes elapsed since the job entered its current state.

    'currentStateTime' is an RFC 3339 UTC timestamp (trailing 'Z'), so
    elapsed time must be measured against UTC as well.
    """
    timestamp = datetime.strptime(self._resource_data['currentStateTime'],
                                  '%Y-%m-%dT%H:%M:%S.%fZ')
    # Bug fix: the parsed timestamp is naive UTC, so compare with
    # datetime.utcnow(); datetime.now() is local time and would skew
    # the result by the local UTC offset.
    delta = datetime.utcnow() - timestamp
    return int(delta.total_seconds() // 60)

Represents a Dataflow job.

Job(project_id: str, resource_data: dict)
25  def __init__(self, project_id: str, resource_data: dict):
26    super().__init__(project_id)
27    self._resource_data = resource_data
project_id: str
246  @property
247  def project_id(self) -> str:
248    """Project id (not project number)."""
249    return self._project_id

Project id (not project number).

full_path: str
29  @property
30  def full_path(self) -> str:
31    return self._resource_data['name']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

id: str
33  @property
34  def id(self) -> str:
35    return self._resource_data['id']
state: str
37  @property
38  def state(self) -> str:
39    return self._resource_data['currentState']
sdk_support_status: str
41  @property
42  def sdk_support_status(self) -> str:
43    return self._resource_data['jobMetadata']['sdkVersion']['sdkSupportStatus']
minutes_in_current_state: int
45  @property
46  def minutes_in_current_state(self) -> int:
47    timestamp = datetime.strptime(self._resource_data['currentStateTime'],
48                                  '%Y-%m-%dT%H:%M:%S.%fZ')
49    delta = datetime.now() - timestamp
50    return int(delta.total_seconds() // 60)
Inherited Members
gcpdiag.models.Resource
short_path
def get_region_dataflow_jobs(api, context: models.Context,
                             region: str) -> List[Job]:
  """List the Dataflow jobs in one region that match the context filters.

  Args:
    api: Dataflow API client (googleapiclient resource object).
    context: gcpdiag context providing the project id and optional
      location/label/name filters.
    region: Dataflow regional endpoint, e.g. 'us-central1'.

  Returns:
    Job objects for every job accepted by context.match_project_resource.
  """
  response = apis_utils.list_all(
      request=api.projects().locations().jobs().list(
          projectId=context.project_id, location=region),
      next_function=api.projects().locations().jobs().list_next,
      response_keyword='jobs')
  jobs = []
  for job in response:
    location = job.get('location', '')
    name = job.get('name', '')

    # Add the job id as a label so jobs can be filtered by id. Copy the
    # dict first: the original code mutated the 'labels' dict inside the
    # API response, which is also stored in the Job object below.
    labels = dict(job.get('labels', {}))
    labels['id'] = job.get('id', '')

    # We could fetch each specific job, but correctly matching the
    # location would take too much effort. Hence get all the jobs and
    # filter afterwards:
    # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/list#query-parameters
    if not context.match_project_resource(
        location=location, labels=labels, resource=name):
      continue
    jobs.append(Job(context.project_id, job))
  return jobs
@caching.cached_api_call
def get_all_dataflow_jobs(context: models.Context) -> List[Job]:
  """Collect Dataflow jobs across all known regions for the context project.

  Returns an empty list when the Dataflow API is not enabled. Results
  are cached by @caching.cached_api_call.
  """
  # Check service enablement before building the API client: the
  # original code constructed the client first and then discarded it
  # when the API was disabled.
  if not apis.is_enabled(context.project_id, 'dataflow'):
    return []

  api = apis.get_api('dataflow', 'v1b3', context.project_id)

  result: List[Job] = []
  # Fan out one list call per regional endpoint; with a
  # concurrent.futures-style executor, map() yields results in
  # DATAFLOW_REGIONS order — NOTE(review): confirm get_executor()
  # returns such an executor.
  executor = get_executor()
  for jobs in executor.map(lambda r: get_region_dataflow_jobs(api, context, r),
                           DATAFLOW_REGIONS):
    result += jobs

  print(f'\n\nFound {len(result)} Dataflow jobs\n')

  # print one Dataflow job id when it is found
  if context.labels and result and 'id' in context.labels:
    print(f'{result[0].full_path} - {result[0].id}\n')

  return result