gcpdiag.queries.dataflow
Queries related to Dataflow.
# Regions in which Dataflow jobs are looked up. The jobs.list API must be
# queried per location, so get_all_dataflow_jobs fans out over this list.
# Element order is preserved from the original definition because the
# aggregated result list (and e.g. result[0]) depends on it.
DATAFLOW_REGIONS = [
    'asia-northeast2',
    'us-central1',
    'northamerica-northeast1',
    'us-west3',
    'southamerica-east1',
    'us-east1',
    'asia-northeast1',
    'europe-west1',
    'europe-west2',
    'asia-northeast3',
    'us-west4',
    'asia-east2',
    'europe-central2',
    'europe-west6',
    'us-west2',
    'australia-southeast1',
    'europe-west3',
    'asia-south1',
    'us-west1',
    'us-east4',
    'asia-southeast1',
]
class Job(models.Resource):
  """Represents a Dataflow job.

  resource_data is a dict similar to:
    {'id': 'my_job_id',
     'projectId': 'my_project_id',
     'name': 'pubsubtogcs-20240328-122953',
     'environment': {},
     'currentState': 'JOB_STATE_FAILED',
     'currentStateTime': '2024-03-28T12:34:27.383249Z',
     'createTime': '2024-03-28T12:29:55.284524Z',
     'location': 'europe-west2',
     'startTime': '2024-03-28T12:29:55.284524Z'}
  """
  # Raw job resource as returned by the Dataflow jobs API.
  _resource_data: dict
  project_id: str

  def __init__(self, project_id: str, resource_data: dict):
    super().__init__(project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    # The job name serves as the display path for Dataflow jobs.
    return self._resource_data['name']

  @property
  def id(self) -> str:
    return self._resource_data['id']

  @property
  def state(self) -> str:
    # e.g. 'JOB_STATE_FAILED' (see the sample dict in the class docstring).
    return self._resource_data['currentState']

  @property
  def job_type(self) -> str:
    # Presumably 'JOB_TYPE_BATCH' / 'JOB_TYPE_STREAMING' — TODO confirm
    # against the Dataflow API reference.
    return self._resource_data['type']

  @property
  def sdk_support_status(self) -> str:
    return self._resource_data['jobMetadata']['sdkVersion']['sdkSupportStatus']

  @property
  def sdk_language(self) -> str:
    # NOTE(review): returns 'versionDisplayName' of the SDK; verify that this
    # field actually carries the language name as the property name implies.
    return self._resource_data['jobMetadata']['sdkVersion'][
        'versionDisplayName']

  @property
  def minutes_in_current_state(self) -> int:
    """Whole minutes elapsed since the job entered its current state.

    Bug fix: 'currentStateTime' is a UTC timestamp (trailing 'Z' matched
    literally by the format string), but the original code subtracted it
    from the naive local-time datetime.now(), yielding a delta that was
    off by the local UTC offset. Compare timezone-aware values in UTC.
    """
    from datetime import timezone  # stdlib; local import keeps file deps as-is
    timestamp = datetime.strptime(
        self._resource_data['currentStateTime'],
        '%Y-%m-%dT%H:%M:%S.%fZ').replace(tzinfo=timezone.utc)
    delta = datetime.now(timezone.utc) - timestamp
    return int(delta.total_seconds() // 60)
Represents a Dataflow job.
resource_data is a dict similar to: {'id': 'my_job_id', 'projectId': 'my_project_id', 'name': 'pubsubtogcs-20240328-122953', 'environment': {}, 'currentState': 'JOB_STATE_FAILED', 'currentStateTime': '2024-03-28T12:34:27.383249Z', 'createTime': '2024-03-28T12:29:55.284524Z', 'location': 'europe-west2', 'startTime': '2024-03-28T12:29:55.284524Z'}
project_id: str
@property
def project_id(self) -> str:
  """The id (not the numeric project number) of the owning project."""
  return self._project_id
Project id (not project number).
full_path: str
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
def get_region_dataflow_jobs(api, context: models.Context,
                             region: str) -> List[Job]:
  """List the Dataflow jobs of one region, filtered by the context.

  Args:
    api: Dataflow API client (v1b3).
    context: inspection context providing project id and resource filters.
    region: Dataflow regional endpoint to query, e.g. 'us-central1'.

  Returns:
    Jobs of the region that match the context's location/labels/name filters.
  """
  response = apis_utils.list_all(
      request=api.projects().locations().jobs().list(
          projectId=context.project_id, location=region),
      next_function=api.projects().locations().jobs().list_next,
      response_keyword='jobs')
  jobs = []
  for job in response:
    location = job.get('location', '')
    # Bug fix: copy the labels dict. The original aliased job['labels'] and
    # then injected the synthetic 'id' key below, mutating the very dict that
    # is stored inside the Job's resource_data.
    labels = dict(job.get('labels', {}))
    name = job.get('name', '')

    # add job id as one of labels for filtering
    labels['id'] = job.get('id', '')

    # we could get the specific job but correctly matching the location will take too
    # much effort. Hence get all the jobs and filter afterwards
    # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/list#query-parameters
    if not context.match_project_resource(
        location=location, labels=labels, resource=name):
      continue
    jobs.append(Job(context.project_id, job))
  return jobs
@caching.cached_api_call
def get_all_dataflow_jobs(context: models.Context) -> List[Job]:
  """Return all Dataflow jobs of the project across all known regions.

  Returns an empty list when the Dataflow API is not enabled. Results are
  cached per context by @caching.cached_api_call.
  """
  # Check the API guard first: building the client is pointless (and the
  # original did it unconditionally) when Dataflow is disabled.
  if not apis.is_enabled(context.project_id, 'dataflow'):
    return []
  api = apis.get_api('dataflow', 'v1b3', context.project_id)

  result: List[Job] = []
  executor = get_executor()
  # jobs.list must be queried per location, so fan out one call per region.
  for jobs in executor.map(lambda r: get_region_dataflow_jobs(api, context, r),
                           DATAFLOW_REGIONS):
    result += jobs

  print(f'\n\nFound {len(result)} Dataflow jobs\n')

  # print one Dataflow job id when it is found
  if context.labels and result and 'id' in context.labels:
    print(f'{result[0].full_path} - {result[0].id}\n')

  return result
@caching.cached_api_call
def get_job(project_id: str, job: str, region: str) -> Union[Job, None]:
  """Fetch a specific Dataflow job.

  Args:
    project_id: project owning the job.
    job: Dataflow job id.
    region: regional endpoint the job runs in.

  Returns:
    The Job, or None when the Dataflow API is not enabled.

  Raises:
    utils.GcpApiError: on any HTTP error from the Dataflow API.
  """
  # Guard first — no point creating the API client when Dataflow is disabled
  # (also keeps this consistent with the early-return style of this module).
  if not apis.is_enabled(project_id, 'dataflow'):
    return None
  api = apis.get_api('dataflow', 'v1b3', project_id)

  query = (api.projects().locations().jobs().get(projectId=project_id,
                                                 location=region,
                                                 jobId=job))
  try:
    resp = query.execute(num_retries=config.API_RETRIES)
    return Job(project_id, resp)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
Fetch a specific Dataflow job.
@caching.cached_api_call
def
logs_excluded(project_id: str) -> Optional[bool]:
@caching.cached_api_call
def logs_excluded(project_id: str) -> Union[bool, None]:
  """Check if Dataflow Logs are excluded.

  Returns None when the Dataflow API is disabled or the exclusions could not
  be listed; otherwise a boolean.
  """
  if not apis.is_enabled(project_id, 'dataflow'):
    return None

  exclusions = logs.exclusions(project_id)
  if exclusions is None:
    return None
  # NOTE(review): this reports True only when a matching exclusion is
  # *disabled*. An enabled exclusion is what actually drops logs — confirm
  # the intended semantics before changing anything.
  dataflow_filter = 'resource.type="dataflow_step"'
  return any(dataflow_filter in exclusion.filter and exclusion.disabled
             for exclusion in exclusions)
Check if Dataflow Logs are excluded.