gcpdiag.queries.dataflow
Queries related to Dataflow.
# Regions in which Dataflow jobs are looked up; jobs must be listed
# per-region, so each entry results in one list call.
DATAFLOW_REGIONS = [
    'asia-northeast2',
    'us-central1',
    'northamerica-northeast1',
    'us-west3',
    'southamerica-east1',
    'us-east1',
    'asia-northeast1',
    'europe-west1',
    'europe-west2',
    'asia-northeast3',
    'us-west4',
    'asia-east2',
    'europe-central2',
    'europe-west6',
    'us-west2',
    'australia-southeast1',
    'europe-west3',
    'asia-south1',
    'us-west1',
    'us-east4',
    'asia-southeast1',
]
class Job(gcpdiag.models.Resource):
class Job(models.Resource):
  """Represents a Dataflow job.

  Thin wrapper over the raw job dict returned by the Dataflow API
  (projects.locations.jobs.list); properties read fields straight out of
  the resource data.
  """
  _resource_data: dict
  project_id: str

  def __init__(self, project_id: str, resource_data: dict):
    super().__init__(project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Job name as reported by the API."""
    return self._resource_data['name']

  @property
  def id(self) -> str:
    """Unique Dataflow job id."""
    return self._resource_data['id']

  @property
  def state(self) -> str:
    """Current job state (e.g. JOB_STATE_RUNNING)."""
    return self._resource_data['currentState']

  @property
  def sdk_support_status(self) -> str:
    """Support status of the SDK version used by this job."""
    return self._resource_data['jobMetadata']['sdkVersion']['sdkSupportStatus']

  @property
  def minutes_in_current_state(self) -> int:
    """Whole minutes elapsed since the job entered its current state.

    BUGFIX: 'currentStateTime' carries a 'Z' suffix, i.e. it is a UTC
    timestamp, but the previous code subtracted it from the naive *local*
    datetime.now(), skewing the result by the host's UTC offset. Compare
    UTC against UTC instead.
    """
    from datetime import timezone  # local import: file import block not in view
    timestamp = datetime.strptime(
        self._resource_data['currentStateTime'],
        '%Y-%m-%dT%H:%M:%S.%fZ').replace(tzinfo=timezone.utc)
    delta = datetime.now(timezone.utc) - timestamp
    return int(delta.total_seconds() // 60)
Represents Dataflow job
project_id: str
@property
def project_id(self) -> str:
  """Project id (not project number)."""
  # Plain accessor over the value stored at construction time
  # (presumably by the base Resource __init__ — not visible here).
  return self._project_id
Project id (not project number).
full_path: str
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
Inherited Members
- gcpdiag.models.Resource
- short_path
def get_region_dataflow_jobs(api, context: models.Context,
                             region: str) -> List[Job]:
  """List all Dataflow jobs in one region and keep those matching *context*.

  All jobs of the region are fetched and filtered client-side, since
  matching the location in the request itself would take too much effort:
  https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/list#query-parameters
  """
  jobs_resource = api.projects().locations().jobs()
  listed = apis_utils.list_all(
      request=jobs_resource.list(projectId=context.project_id,
                                 location=region),
      next_function=jobs_resource.list_next,
      response_keyword='jobs')

  matched: List[Job] = []
  for raw_job in listed:
    job_labels = raw_job.get('labels', {})
    # Expose the job id as a pseudo-label so it can be used for filtering.
    job_labels['id'] = raw_job.get('id', '')

    if context.match_project_resource(location=raw_job.get('location', ''),
                                      labels=job_labels,
                                      resource=raw_job.get('name', '')):
      matched.append(Job(context.project_id, raw_job))
  return matched
@caching.cached_api_call
def get_all_dataflow_jobs(context: models.Context) -> List[Job]:
  """Return the Dataflow jobs in all known regions that match *context*.

  Returns an empty list when the Dataflow API is not enabled for the
  project. Results are cached by the decorator.
  """
  api = apis.get_api('dataflow', 'v1b3', context.project_id)

  if not apis.is_enabled(context.project_id, 'dataflow'):
    return []

  executor = get_executor()
  # Query every region concurrently, then flatten the per-region lists.
  per_region = executor.map(
      lambda region: get_region_dataflow_jobs(api, context, region),
      DATAFLOW_REGIONS)
  result: List[Job] = [job for region_jobs in per_region
                       for job in region_jobs]

  print(f'\n\nFound {len(result)} Dataflow jobs\n')

  # print one Dataflow job id when it is found
  if context.labels and result and 'id' in context.labels:
    print(f'{result[0].full_path} - {result[0].id}\n')

  return result