gcpdiag.queries.dataflow
Queries related to Dataflow.
# Regional endpoints to query for Dataflow jobs. The jobs.list API is
# per-location, so each region in this list is queried separately
# (see get_region_dataflow_jobs / get_all_dataflow_jobs).
DATAFLOW_REGIONS = [
    'asia-northeast2', 'us-central1', 'northamerica-northeast1', 'us-west3',
    'southamerica-east1', 'us-east1', 'asia-northeast1', 'europe-west1',
    'europe-west2', 'asia-northeast3', 'us-west4', 'asia-east2',
    'europe-central2', 'europe-west6', 'us-west2', 'australia-southeast1',
    'europe-west3', 'asia-south1', 'us-west1', 'us-east4', 'asia-southeast1'
]
class Job(models.Resource):
  """Represents a Dataflow job.

  resource_data is of a form similar to:
  {'id': 'my_job_id',
   'projectId': 'my_project_id',
   'name': 'pubsubtogcs-20240328-122953',
   'environment': {},
   'currentState': 'JOB_STATE_FAILED',
   'currentStateTime': '2024-03-28T12:34:27.383249Z',
   'createTime': '2024-03-28T12:29:55.284524Z',
   'location': 'europe-west2',
   'startTime': '2024-03-28T12:29:55.284524Z'}
  """
  # Raw job resource as returned by the Dataflow API (see docstring above).
  _resource_data: dict
  project_id: str

  def __init__(self, project_id: str, resource_data: dict):
    super().__init__(project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    # The job name, e.g. 'pubsubtogcs-20240328-122953'.
    return self._resource_data['name']

  @property
  def id(self) -> str:
    return self._resource_data['id']

  @property
  def state(self) -> str:
    # e.g. 'JOB_STATE_FAILED'.
    return self._resource_data['currentState']

  @property
  def job_type(self) -> str:
    return self._resource_data['type']

  @property
  def location(self) -> str:
    return self._resource_data['location']

  @property
  def sdk_support_status(self) -> str:
    return self._resource_data['jobMetadata']['sdkVersion']['sdkSupportStatus']

  @property
  def sdk_language(self) -> str:
    return self._resource_data['jobMetadata']['sdkVersion'][
        'versionDisplayName']

  @property
  def minutes_in_current_state(self) -> int:
    """Whole minutes elapsed since currentStateTime.

    currentStateTime is an RFC 3339 UTC timestamp (trailing 'Z'), so it
    must be compared against the current UTC time. Comparing against
    naive local datetime.now() would skew the result by the host's UTC
    offset.
    """
    # Local import so only this method depends on timezone.
    from datetime import timezone
    timestamp = datetime.strptime(
        self._resource_data['currentStateTime'],
        '%Y-%m-%dT%H:%M:%S.%fZ').replace(tzinfo=timezone.utc)
    delta = datetime.now(timezone.utc) - timestamp
    return int(delta.total_seconds() // 60)
project_id: str
@property
def project_id(self) -> str:
  """Project id (not project number)."""
  return self._project_id
Project id (not project number).
full_path: str
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
def get_region_dataflow_jobs(api, context: models.Context,
                             region: str) -> List[Job]:
  """List the Dataflow jobs of one region and filter them by the context.

  Correctly matching the location in the list request would take too much
  effort, so all jobs of the region are fetched and filtered afterwards:
  https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/list#query-parameters
  """
  jobs_resource = api.projects().locations().jobs()
  raw_jobs = apis_utils.list_all(
      request=jobs_resource.list(projectId=context.project_id,
                                 location=region),
      next_function=jobs_resource.list_next,
      response_keyword='jobs')

  matched_jobs = []
  for raw_job in raw_jobs:
    job_labels = raw_job.get('labels', {})
    # add job id as one of labels for filtering
    job_labels['id'] = raw_job.get('id', '')
    if context.match_project_resource(location=raw_job.get('location', ''),
                                      labels=job_labels,
                                      resource=raw_job.get('name', '')):
      matched_jobs.append(Job(context.project_id, raw_job))
  return matched_jobs
@caching.cached_api_call
def get_all_dataflow_jobs(context: models.Context) -> List[Job]:
  """Fetch Dataflow jobs from all known regions, filtered by the context.

  Returns an empty list when the Dataflow API is not enabled. Regions are
  queried in parallel via the executor and results are concatenated.
  """
  api = apis.get_api('dataflow', 'v1b3', context.project_id)

  if not apis.is_enabled(context.project_id, 'dataflow'):
    return []

  result: List[Job] = []
  executor = get_executor(context)
  for jobs in executor.map(lambda r: get_region_dataflow_jobs(api, context, r),
                           DATAFLOW_REGIONS):
    result += jobs

  # Use logging instead of print(): a cached query helper must not write
  # directly to stdout (the rest of this module logs via logging.debug).
  logging.debug('found %d Dataflow jobs', len(result))

  # log one Dataflow job id when filtering by id
  if context.labels and result and 'id' in context.labels:
    logging.debug('%s - %s', result[0].full_path, result[0].id)

  return result
@caching.cached_api_call
def get_job(project_id: str, job: str, region: str) -> Union[Job, None]:
  """Fetch a specific Dataflow job.

  Returns None when the Dataflow API is not enabled; wraps HTTP failures
  in utils.GcpApiError.
  """
  api = apis.get_api('dataflow', 'v1b3', project_id)

  if not apis.is_enabled(project_id, 'dataflow'):
    return None

  request = api.projects().locations().jobs().get(projectId=project_id,
                                                  location=region,
                                                  jobId=job)
  try:
    response = request.execute(num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
  return Job(project_id, response)
@caching.cached_api_call
def get_all_dataflow_jobs_for_project(
    project_id: str,
    filter_str: Optional[str] = None,
) -> Union[List[Job], None]:
  """Fetch all Dataflow jobs for a project.

  Uses the aggregated endpoint, so jobs from every region are returned in
  one (paginated) listing. Returns None when the Dataflow API is not
  enabled.
  """
  api = apis.get_api('dataflow', 'v1b3', project_id)

  if not apis.is_enabled(project_id, 'dataflow'):
    return None

  logging.debug('listing dataflow jobs of project %s', project_id)

  collected: List[Job] = []
  request = api.projects().jobs().aggregated(projectId=project_id,
                                             filter=filter_str)

  # Walk every result page; aggregated_next returns None after the last one.
  while request is not None:
    response = request.execute(num_retries=config.API_RETRIES)
    collected.extend(
        Job(project_id, raw_job) for raw_job in response.get('jobs', []))
    request = api.projects().jobs().aggregated_next(previous_request=request,
                                                    previous_response=response)
  return collected
@caching.cached_api_call
def logs_excluded(project_id: str) -> Union[bool, None]:
  """Check if Dataflow Logs are excluded.

  Returns:
    True if an active log exclusion matches the 'dataflow_step' resource
    type, False if none does, and None when the Dataflow API is disabled
    or the exclusions could not be fetched.
  """

  if not apis.is_enabled(project_id, 'dataflow'):
    return None

  exclusions = logs.exclusions(project_id)
  if exclusions is None:
    return None
  for log_exclusion in exclusions:
    # Per the Cloud Logging API, an exclusion with disabled=True does not
    # exclude any log entries — only *enabled* exclusions count. The
    # previous condition was inverted and matched disabled exclusions.
    if ('resource.type="dataflow_step"' in log_exclusion.filter and
        not log_exclusion.disabled):
      return True
  return False