gcpdiag.queries.dataflow

Queries related to Dataflow.
# Dataflow regional endpoints that get_all_dataflow_jobs fans out over when
# listing jobs for a project (the list API is per-location, so every region
# must be queried individually).
DATAFLOW_REGIONS = ['asia-northeast2', 'us-central1', 'northamerica-northeast1', 'us-west3', 'southamerica-east1', 'us-east1', 'asia-northeast1', 'europe-west1', 'europe-west2', 'asia-northeast3', 'us-west4', 'asia-east2', 'europe-central2', 'europe-west6', 'us-west2', 'australia-southeast1', 'europe-west3', 'asia-south1', 'us-west1', 'us-east4', 'asia-southeast1']
class Job(models.Resource):
  """Represents a Dataflow job.

  resource_data is of the form similar to:
  {'id': 'my_job_id',
  'projectId': 'my_project_id',
  'name': 'pubsubtogcs-20240328-122953',
  'environment': {},
  'currentState': 'JOB_STATE_FAILED',
  'currentStateTime': '2024-03-28T12:34:27.383249Z',
  'createTime': '2024-03-28T12:29:55.284524Z',
  'location': 'europe-west2',
  'startTime': '2024-03-28T12:29:55.284524Z'}
  """
  # Raw job resource as returned by the Dataflow jobs API.
  _resource_data: dict
  project_id: str

  def __init__(self, project_id: str, resource_data: dict):
    super().__init__(project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    # The job name is used as the display path for this resource.
    return self._resource_data['name']

  @property
  def id(self) -> str:
    return self._resource_data['id']

  @property
  def state(self) -> str:
    # e.g. 'JOB_STATE_FAILED', 'JOB_STATE_RUNNING'.
    return self._resource_data['currentState']

  @property
  def job_type(self) -> str:
    return self._resource_data['type']

  @property
  def sdk_support_status(self) -> str:
    # NOTE(review): 'jobMetadata' is only populated on some API responses —
    # accessing this on a job listed without metadata raises KeyError.
    return self._resource_data['jobMetadata']['sdkVersion']['sdkSupportStatus']

  @property
  def sdk_language(self) -> str:
    return self._resource_data['jobMetadata']['sdkVersion'][
        'versionDisplayName']

  @property
  def minutes_in_current_state(self) -> int:
    """Whole minutes elapsed since the job entered its current state."""
    # 'currentStateTime' is a UTC timestamp (trailing 'Z'), parsed naive.
    timestamp = datetime.strptime(self._resource_data['currentStateTime'],
                                  '%Y-%m-%dT%H:%M:%S.%fZ')
    # Compare against UTC 'now'. Using local datetime.now() (as before) skewed
    # the result by the machine's UTC offset.
    delta = datetime.utcnow() - timestamp
    return int(delta.total_seconds() // 60)

Represents Dataflow job.

resource_data is of the form similar to: {'id': 'my_job_id', 'projectId': 'my_project_id', 'name': 'pubsubtogcs-20240328-122953', 'environment': {}, 'currentState': 'JOB_STATE_FAILED', 'currentStateTime': '2024-03-28T12:34:27.383249Z', 'createTime': '2024-03-28T12:29:55.284524Z', 'location': 'europe-west2', 'startTime': '2024-03-28T12:29:55.284524Z'}

Job(project_id: str, resource_data: dict)
39  def __init__(self, project_id: str, resource_data: dict):
40    super().__init__(project_id)
41    self._resource_data = resource_data
project_id: str
249  @property
250  def project_id(self) -> str:
251    """Project id (not project number)."""
252    return self._project_id

Project id (not project number).

full_path: str
43  @property
44  def full_path(self) -> str:
45    return self._resource_data['name']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

id: str
47  @property
48  def id(self) -> str:
49    return self._resource_data['id']
state: str
51  @property
52  def state(self) -> str:
53    return self._resource_data['currentState']
job_type: str
55  @property
56  def job_type(self) -> str:
57    return self._resource_data['type']
sdk_support_status: str
59  @property
60  def sdk_support_status(self) -> str:
61    return self._resource_data['jobMetadata']['sdkVersion']['sdkSupportStatus']
sdk_language: str
63  @property
64  def sdk_language(self) -> str:
65    return self._resource_data['jobMetadata']['sdkVersion'][
66        'versionDisplayName']
minutes_in_current_state: int
68  @property
69  def minutes_in_current_state(self) -> int:
70    timestamp = datetime.strptime(self._resource_data['currentStateTime'],
71                                  '%Y-%m-%dT%H:%M:%S.%fZ')
72    delta = datetime.now() - timestamp
73    return int(delta.total_seconds() // 60)
def get_region_dataflow_jobs(api, context: models.Context,
                             region: str) -> List[Job]:
  """List the Dataflow jobs in one region that match the context filters.

  Args:
    api: Dataflow API client (v1b3).
    context: gcpdiag context providing project id and resource filters.
    region: regional endpoint to query (e.g. 'us-central1').

  Returns:
    Job objects for every job in the region accepted by
    context.match_project_resource.
  """
  response = apis_utils.list_all(
      request=api.projects().locations().jobs().list(
          projectId=context.project_id, location=region),
      next_function=api.projects().locations().jobs().list_next,
      response_keyword='jobs')
  jobs = []
  for job in response:
    location = job.get('location', '')
    # Copy the labels: the 'id' entry injected below is only for filtering
    # and must not mutate the job's own resource_data, which is stored on
    # the returned Job objects.
    labels = dict(job.get('labels', {}))
    name = job.get('name', '')

    # add job id as one of labels for filtering
    labels['id'] = job.get('id', '')

    # we could get the specific job but correctly matching the location will
    # take too much effort. Hence get all the jobs and filter afterwards
    # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/list#query-parameters
    if not context.match_project_resource(
        location=location, labels=labels, resource=name):
      continue
    jobs.append(Job(context.project_id, job))
  return jobs
@caching.cached_api_call
def get_all_dataflow_jobs(context: models.Context) -> List[Job]:
  """Fetch Dataflow jobs from every known region for the context's project.

  Returns an empty list when the Dataflow API is not enabled. Regions are
  queried in parallel via the shared executor.
  """
  api = apis.get_api('dataflow', 'v1b3', context.project_id)

  if not apis.is_enabled(context.project_id, 'dataflow'):
    return []

  result: List[Job] = []
  executor = get_executor()
  per_region = executor.map(
      lambda region: get_region_dataflow_jobs(api, context, region),
      DATAFLOW_REGIONS)
  for region_jobs in per_region:
    result.extend(region_jobs)

  print(f'\n\nFound {len(result)} Dataflow jobs\n')

  # print one Dataflow job id when it is found
  if context.labels and result and 'id' in context.labels:
    print(f'{result[0].full_path} - {result[0].id}\n')

  return result
@caching.cached_api_call
def get_job(project_id: str, job: str, region: str) -> Union[Job, None]:
  """Fetch a specific Dataflow job.

  Returns None when the Dataflow API is not enabled for the project;
  raises utils.GcpApiError on API failures.
  """
  api = apis.get_api('dataflow', 'v1b3', project_id)

  if not apis.is_enabled(project_id, 'dataflow'):
    return None

  request = (api.projects().locations().jobs().get(projectId=project_id,
                                                   location=region,
                                                   jobId=job))
  try:
    return Job(project_id, request.execute(num_retries=config.API_RETRIES))
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err

Fetch a specific Dataflow job.

@caching.cached_api_call
def logs_excluded(project_id: str) -> Union[bool, None]:
  """Check if Dataflow Logs are excluded.

  Returns:
    True if an enabled log exclusion filters out dataflow_step entries,
    False otherwise, or None when the Dataflow API is disabled or the
    exclusions could not be fetched.
  """
  if not apis.is_enabled(project_id, 'dataflow'):
    return None

  exclusions = logs.exclusions(project_id)
  if exclusions is None:
    return None
  for log_exclusion in exclusions:
    # Only an enabled exclusion actually drops log entries; a disabled one
    # has no effect, so it must not make this return True.
    if ('resource.type="dataflow_step"' in log_exclusion.filter and
        not log_exclusion.disabled):
      return True
  return False

Check if Dataflow Logs are excluded.