gcpdiag.queries.dataflow

Queries related to Dataflow.
# Regions that are fanned out in parallel when listing Dataflow jobs
# (see get_all_dataflow_jobs, which maps get_region_dataflow_jobs over this list).
# NOTE(review): hard-coded snapshot of regions — jobs in regions added to GCP
# after this list was written will be missed until the list is updated.
DATAFLOW_REGIONS = ['asia-northeast2', 'us-central1', 'northamerica-northeast1', 'us-west3', 'southamerica-east1', 'us-east1', 'asia-northeast1', 'europe-west1', 'europe-west2', 'asia-northeast3', 'us-west4', 'asia-east2', 'europe-central2', 'europe-west6', 'us-west2', 'australia-southeast1', 'europe-west3', 'asia-south1', 'us-west1', 'us-east4', 'asia-southeast1']
class Job(models.Resource):
  """Represents a Dataflow job.

  resource_data is of the form similar to:
  {'id': 'my_job_id',
  'projectId': 'my_project_id',
  'name': 'pubsubtogcs-20240328-122953',
  'environment': {},
  'currentState': 'JOB_STATE_FAILED',
  'currentStateTime': '2024-03-28T12:34:27.383249Z',
  'createTime': '2024-03-28T12:29:55.284524Z',
  'location': 'europe-west2',
  'startTime': '2024-03-28T12:29:55.284524Z'}
  """
  # Raw job resource as returned by the Dataflow API (shape shown above).
  _resource_data: dict
  project_id: str

  def __init__(self, project_id: str, resource_data: dict):
    super().__init__(project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    return self._resource_data['name']

  @property
  def id(self) -> str:
    return self._resource_data['id']

  @property
  def state(self) -> str:
    return self._resource_data['currentState']

  @property
  def job_type(self) -> str:
    return self._resource_data['type']

  @property
  def location(self) -> str:
    return self._resource_data['location']

  @property
  def sdk_support_status(self) -> str:
    return self._resource_data['jobMetadata']['sdkVersion']['sdkSupportStatus']

  @property
  def sdk_language(self) -> str:
    return self._resource_data['jobMetadata']['sdkVersion'][
        'versionDisplayName']

  @property
  def minutes_in_current_state(self) -> int:
    """Whole minutes elapsed since the job entered its current state."""
    # currentStateTime carries a trailing 'Z', i.e. it is a UTC timestamp
    # parsed into a naive datetime. Subtracting it from the local-time
    # datetime.now() (as the previous code did) skews the result by the
    # local UTC offset — compare against a UTC "now" instead.
    timestamp = datetime.strptime(self._resource_data['currentStateTime'],
                                  '%Y-%m-%dT%H:%M:%S.%fZ')
    delta = datetime.utcnow() - timestamp
    return int(delta.total_seconds() // 60)

Represents Dataflow job.

resource_data is of the form similar to: {'id': 'my_job_id', 'projectId': 'my_project_id', 'name': 'pubsubtogcs-20240328-122953', 'environment': {}, 'currentState': 'JOB_STATE_FAILED', 'currentStateTime': '2024-03-28T12:34:27.383249Z', 'createTime': '2024-03-28T12:29:55.284524Z', 'location': 'europe-west2', 'startTime': '2024-03-28T12:29:55.284524Z'}

Job(project_id: str, resource_data: dict)
40  def __init__(self, project_id: str, resource_data: dict):
41    super().__init__(project_id)
42    self._resource_data = resource_data
project_id: str
266  @property
267  def project_id(self) -> str:
268    """Project id (not project number)."""
269    return self._project_id

Project id (not project number).

full_path: str
44  @property
45  def full_path(self) -> str:
46    return self._resource_data['name']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

id: str
48  @property
49  def id(self) -> str:
50    return self._resource_data['id']
state: str
52  @property
53  def state(self) -> str:
54    return self._resource_data['currentState']
job_type: str
56  @property
57  def job_type(self) -> str:
58    return self._resource_data['type']
location: str
60  @property
61  def location(self) -> str:
62    return self._resource_data['location']
sdk_support_status: str
64  @property
65  def sdk_support_status(self) -> str:
66    return self._resource_data['jobMetadata']['sdkVersion']['sdkSupportStatus']
sdk_language: str
68  @property
69  def sdk_language(self) -> str:
70    return self._resource_data['jobMetadata']['sdkVersion'][
71        'versionDisplayName']
minutes_in_current_state: int
73  @property
74  def minutes_in_current_state(self) -> int:
75    timestamp = datetime.strptime(self._resource_data['currentStateTime'],
76                                  '%Y-%m-%dT%H:%M:%S.%fZ')
77    delta = datetime.now() - timestamp
78    return int(delta.total_seconds() // 60)
def get_region_dataflow_jobs(api, context: models.Context,
                             region: str) -> List[Job]:
  """List the Dataflow jobs of one region that match the given context."""
  jobs_api = api.projects().locations().jobs()
  listed = apis_utils.list_all(
      request=jobs_api.list(projectId=context.project_id, location=region),
      next_function=jobs_api.list_next,
      response_keyword='jobs')

  matched = []
  for entry in listed:
    entry_labels = entry.get('labels', {})
    # Expose the job id as a label so jobs can be selected by id filters.
    entry_labels['id'] = entry.get('id', '')

    # We could get the specific job but correctly matching the location will
    # take too much effort. Hence get all the jobs and filter afterwards.
    # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/list#query-parameters
    if context.match_project_resource(location=entry.get('location', ''),
                                      labels=entry_labels,
                                      resource=entry.get('name', '')):
      matched.append(Job(context.project_id, entry))
  return matched
@caching.cached_api_call
def get_all_dataflow_jobs(context: models.Context) -> List[Job]:
  """Collect matching Dataflow jobs from every known region.

  Returns an empty list when the Dataflow API is not enabled.
  """
  api = apis.get_api('dataflow', 'v1b3', context.project_id)

  if not apis.is_enabled(context.project_id, 'dataflow'):
    return []

  # Query all regions in parallel and flatten the per-region results.
  executor = get_executor(context)
  per_region = executor.map(
      lambda region: get_region_dataflow_jobs(api, context, region),
      DATAFLOW_REGIONS)
  result: List[Job] = [job for region_jobs in per_region for job in region_jobs]

  print(f'\n\nFound {len(result)} Dataflow jobs\n')

  # print one Dataflow job id when it is found
  if context.labels and result and 'id' in context.labels:
    print(f'{result[0].full_path} - {result[0].id}\n')

  return result
@caching.cached_api_call
def get_job(project_id: str, job: str, region: str) -> Union[Job, None]:
  """Fetch a specific Dataflow job.

  Returns None when the Dataflow API is not enabled; raises
  utils.GcpApiError when the API call fails.
  """
  api = apis.get_api('dataflow', 'v1b3', project_id)

  if not apis.is_enabled(project_id, 'dataflow'):
    return None

  request = api.projects().locations().jobs().get(projectId=project_id,
                                                  location=region,
                                                  jobId=job)
  try:
    response = request.execute(num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
  return Job(project_id, response)

Fetch a specific Dataflow job.

@caching.cached_api_call
def get_all_dataflow_jobs_for_project(
    project_id: str,
    filter_str: Optional[str] = None,
) -> Union[List[Job], None]:
  """Fetch all Dataflow jobs for a project via the aggregated listing.

  Returns None when the Dataflow API is not enabled.
  """
  api = apis.get_api('dataflow', 'v1b3', project_id)

  if not apis.is_enabled(project_id, 'dataflow'):
    return None

  logging.debug('listing dataflow jobs of project %s', project_id)

  collected: List[Job] = []
  jobs_api = api.projects().jobs()
  page_request = jobs_api.aggregated(projectId=project_id, filter=filter_str)

  # Walk every page of the aggregated listing until pagination is exhausted.
  while page_request:
    page = page_request.execute(num_retries=config.API_RETRIES)
    for raw_job in page.get('jobs', []):
      collected.append(Job(project_id, raw_job))
    page_request = jobs_api.aggregated_next(previous_request=page_request,
                                            previous_response=page)
  return collected

Fetch all Dataflow jobs for a project.

@caching.cached_api_call
def logs_excluded(project_id: str) -> Union[bool, None]:
  """Check if Dataflow Logs are excluded.

  Returns None when the Dataflow API is disabled or exclusions cannot be
  listed; True if a matching log exclusion is found, False otherwise.
  """

  if not apis.is_enabled(project_id, 'dataflow'):
    return None

  exclusions = logs.exclusions(project_id)
  if exclusions is None:
    return None
  else:
    for log_exclusion in exclusions:
      # NOTE(review): this returns True only when the matching exclusion has
      # disabled == True. If `disabled` means "exclusion not in effect", the
      # intent was probably `not log_exclusion.disabled` — verify the
      # semantics of the exclusion's `disabled` flag before relying on this.
      if 'resource.type="dataflow_step"' in log_exclusion.filter and log_exclusion.disabled:
        return True
  return False

Check if Dataflow Logs are excluded.