gcpdiag.queries.bigquery

Queries related to BigQuery.
BIGQUERY_REGIONS = ['me-central1', 'me-central2', 'me-west1', 'africa-south1', 'us', 'eu', 'us-east1', 'us-east4', 'us-east5', 'us-west1', 'us-west2', 'us-west3', 'us-west4', 'us-central1', 'us-south1', 'northamerica-northeast1', 'northamerica-northeast2', 'southamerica-east1', 'southamerica-west1', 'asia-east1', 'asia-east2', 'asia-south1', 'asia-south2', 'asia-northeast1', 'asia-northeast2', 'asia-northeast3', 'asia-southeast1', 'asia-southeast2', 'australia-southeast1', 'australia-southeast2', 'europe-north1', 'europe-southwest1', 'europe-central2', 'europe-west1', 'europe-west10', 'europe-west2', 'europe-west3', 'europe-west4', 'europe-west6', 'europe-west8', 'europe-west9', 'europe-west12']
C_NOT_AVAILABLE = 'N/A'
def get_project_policy(context: gcpdiag.models.Context):
66def get_project_policy(context: models.Context):
67  """Fetches the IAM policy object for a project."""
68  root_logger = logging.getLogger()
69  original_level = root_logger.level
70
71  try:
72    root_logger.setLevel(logging.ERROR)
73    policy = iam.get_project_policy(context, raise_error_if_fails=False)
74    return policy
75  except utils.GcpApiError:
76    return None
77  finally:
78    root_logger.setLevel(original_level)

Fetches the IAM policy object for a project.
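
A caller typically fetches the policy once and reuses it for several permission checks. A minimal usage sketch, assuming a hypothetical project ID:

from gcpdiag import models
from gcpdiag.queries import bigquery

# Hypothetical project ID; substitute the project under investigation.
context = models.Context(project_id='my-project')
policy = bigquery.get_project_policy(context)
if policy is None:
  print('Project IAM policy could not be fetched (likely missing permissions).')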

def get_organization_policy(context: gcpdiag.models.Context, organization_id: str):
 81def get_organization_policy(context: models.Context, organization_id: str):
 82  """Fetches the IAM policy object for an organization."""
 83  root_logger = logging.getLogger()
 84  original_level = root_logger.level
 85
 86  try:
 87    root_logger.setLevel(logging.ERROR)
 88    policy = iam.get_organization_policy(context,
 89                                         organization_id,
 90                                         raise_error_if_fails=False)
 91    return policy
 92  except utils.GcpApiError as err:
 93    if 'doesn\'t have access to' in err.message.lower(
 94    ) or 'denied on resource' in err.message.lower():
 95      op.info(
 96          'User does not have access to the organization policy. Investigation'
 97          ' completeness and accuracy might depend on the presence of'
 98          ' organization level permissions.')
 99    return None
100  finally:
101    root_logger.setLevel(original_level)

Fetches the IAM policy object for an organization.
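
The organization-level lookup follows the same pattern: when the caller lacks organization access it logs an informational message and returns None instead of raising. A sketch, reusing the context from the previous example and a hypothetical organization ID:

org_policy = bigquery.get_organization_policy(context, organization_id='123456789012')
if org_policy is None:
  print('Organization policy unavailable; continuing with project-level data only.')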

def check_permissions_for_principal( policy: Union[gcpdiag.queries.iam.ProjectPolicy, gcpdiag.queries.iam.OrganizationPolicy], principal: str, permissions_to_check: Set[str]) -> Dict[str, bool]:
104def check_permissions_for_principal(
105    policy: PolicyObject, principal: str,
106    permissions_to_check: Set[str]) -> Dict[str, bool]:
107  """Uses a policy object to check a set of permissions for a principal.
108
109  Returns a dictionary mapping each permission to a boolean indicating its
110  presence.
111  """
112  return {
113      permission: policy.has_permission(principal, permission)
114      for permission in permissions_to_check
115  }

Uses a policy object to check a set of permissions for a principal.

Returns a dictionary mapping each permission to a boolean indicating its presence.

def get_missing_permissions( required_permissions: Set[str], actual_permissions: Dict[str, bool]) -> Set[str]:
118def get_missing_permissions(required_permissions: Set[str],
119                            actual_permissions: Dict[str, bool]) -> Set[str]:
120  """Compares a set of required permissions against a dictionary of actual
121
122  permissions and returns the set of missing ones.
123  """
124  return {
125      perm for perm in required_permissions if not actual_permissions.get(perm)
126  }

Compares a set of required permissions against a dictionary of actual permissions and returns the set of missing ones.
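
Together with check_permissions_for_principal, this turns a policy object into a concrete list of missing permissions. A sketch, assuming the policy object from the earlier example; the principal is hypothetical:

required = {'bigquery.jobs.create', 'bigquery.jobs.listAll'}
principal = 'user:alice@example.com'  # hypothetical principal
actual = bigquery.check_permissions_for_principal(policy, principal, required)
missing = bigquery.get_missing_permissions(required, actual)
if missing:
  print(f'Missing permissions: {sorted(missing)}')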

class BigQueryTable:
129class BigQueryTable:
130  """Represents a BigQuery Table object."""
131
132  project_id: str
133  dataset_id: str
134  table_id: str
135
136  def __init__(self, project_id: str, dataset_id: str, table_id: str):
137    self.project_id = project_id
138    self.dataset_id = dataset_id
139    self.table_id = table_id
140
141  @property
142  def table_identifier(self) -> str:
143    return f'{self.project_id}:{self.dataset_id}.{self.table_id}'

Represents a BigQuery Table object.

BigQueryTable(project_id: str, dataset_id: str, table_id: str)
136  def __init__(self, project_id: str, dataset_id: str, table_id: str):
137    self.project_id = project_id
138    self.dataset_id = dataset_id
139    self.table_id = table_id
project_id: str
dataset_id: str
table_id: str
table_identifier: str
141  @property
142  def table_identifier(self) -> str:
143    return f'{self.project_id}:{self.dataset_id}.{self.table_id}'
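
The table_identifier property renders the legacy 'project:dataset.table' form. A quick sketch with hypothetical names:

table = bigquery.BigQueryTable('my-project', 'my_dataset', 'my_table')
print(table.table_identifier)  # prints: my-project:my_dataset.my_table
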
class BigQueryRoutine:
146class BigQueryRoutine:
147  """Represents a BigQuery Routine object."""
148
149  project_id: str
150  dataset_id: str
151  routine_id: str
152
153  def __init__(self, project_id: str, dataset_id: str, routine_id: str):
154    self.project_id = project_id
155    self.dataset_id = dataset_id
156    self.routine_id = routine_id
157
158  @property
159  def routine_identifier(self) -> str:
160    return f'{self.project_id}:{self.dataset_id}.{self.routine_id}'

Represents a BigQuery Routine object.

BigQueryRoutine(project_id: str, dataset_id: str, routine_id: str)
153  def __init__(self, project_id: str, dataset_id: str, routine_id: str):
154    self.project_id = project_id
155    self.dataset_id = dataset_id
156    self.routine_id = routine_id
project_id: str
dataset_id: str
routine_id: str
routine_identifier: str
158  @property
159  def routine_identifier(self) -> str:
160    return f'{self.project_id}:{self.dataset_id}.{self.routine_id}'
class BigQueryJob(gcpdiag.models.Resource):
163class BigQueryJob(models.Resource):
164  """Represents a BigQuery Job object."""
165
166  _job_api_resource_data: dict[str, Any]
167  _information_schema_job_metadata: dict[str, Any]
168  project_id: str
169
170  def __init__(
171      self,
172      project_id: str,
173      job_api_resource_data: dict[str, Any],
174      information_schema_job_metadata: dict[str, str],
175  ):
176    super().__init__(project_id)
177    self._job_api_resource_data = job_api_resource_data
178    self._information_schema_job_metadata = (information_schema_job_metadata or
179                                             {})
180
181  @property
182  def full_path(self) -> str:
183    # returns 'https://content-bigquery.googleapis.com/bigquery/v2/
184    # projects/<PROJECT_ID>/jobs/<JOBID>?location=<REGION>'
185    return self._job_api_resource_data.get('selfLink', '')
186
187  @property
188  def id(self) -> str:
189    # returns <PROJECT>:<REGION>.<JobID>
190    return self._job_api_resource_data.get('id', '')
191
192  @property
193  def short_path(self) -> str:
194    # returns <PROJECT>:<REGION>.<JobID>
195    return self.id
196
197  @property
198  def user_email(self) -> str:
199    return self._job_api_resource_data.get('user_email', '')
200
201  @property
202  def _job_configuration(self) -> dict[str, Any]:
203    return self._job_api_resource_data.get('configuration', {})
204
205  @property
206  def _query(self) -> dict[str, Any]:
207    return self._job_configuration.get('query', {})
208
209  @property
210  def _stats(self) -> dict[str, Any]:
211    """Safely access the 'statistics' dictionary."""
212    return self._job_api_resource_data.get('statistics', {})
213
214  @property
215  def _query_stats(self) -> dict[str, Any]:
216    """Safely access the 'statistics.query' dictionary."""
217    return self._stats.get('query', {})
218
219  @property
220  def _query_info(self) -> dict[str, Any]:
221    return self._query_stats.get('queryInfo', {})
222
223  @property
224  def _status(self) -> dict[str, Any]:
225    return self._job_api_resource_data.get('status', {})
226
227  @property
228  def job_type(self) -> str:
229    return self._job_configuration.get('jobType', '')
230
231  @property
232  def query_sql(self) -> str:
233    return self._query.get('query', '')
234
235  @property
236  def use_legacy_sql(self) -> bool:
237    return self._query.get('useLegacySql', False)
238
239  @property
240  def priority(self) -> str:
241    return self._query.get('priority', '')
242
243  @property
244  def edition(self) -> str:
245    edition_value = self._query.get('edition')
246    return str(edition_value) if edition_value else ''
247
248  @property
249  def creation_time(self) -> Optional[int]:
250    time_str = self._stats.get('creationTime')
251    return (int(time_str)
252            if isinstance(time_str, str) and time_str.isdigit() else None)
253
254  @property
255  def start_time(self) -> Optional[int]:
256    time_str = self._stats.get('startTime')
257    return (int(time_str)
258            if isinstance(time_str, str) and time_str.isdigit() else None)
259
260  @property
261  def end_time(self) -> Optional[int]:
262    time_str = self._stats.get('endTime')
263    return (int(time_str)
264            if isinstance(time_str, str) and time_str.isdigit() else None)
265
266  @property
267  def total_bytes_processed(self) -> int:
268    bytes_str = self._stats.get('totalBytesProcessed', '0')
269    return (int(bytes_str)
270            if isinstance(bytes_str, str) and bytes_str.isdigit() else 0)
271
272  @property
273  def total_bytes_billed(self) -> int:
274    bytes_str = self._query_stats.get('totalBytesBilled', '0')
275    return (int(bytes_str)
276            if isinstance(bytes_str, str) and bytes_str.isdigit() else 0)
277
278  @property
279  def total_slot_ms(self) -> int:
280    ms_str = self._stats.get('totalSlotMs', '0')
281    return int(ms_str) if isinstance(ms_str, str) and ms_str.isdigit() else 0
282
283  @property
284  def cache_hit(self) -> bool:
285    return self._query_stats.get('cacheHit') is True
286
287  @property
288  def quota_deferments(self) -> list[str]:
289    deferments_dict = self._stats.get('quotaDeferments', {})
290    if isinstance(deferments_dict, dict):
291      deferment_list = deferments_dict.get('', [])
292      if isinstance(deferment_list, list) and all(
293          isinstance(s, str) for s in deferment_list):
294        return deferment_list
295    return []
296
297  @property
298  def query_plan(self) -> list[dict[str, Any]]:
299    plan = self._query_stats.get('queryPlan', [])
300    return plan if isinstance(plan, list) else []
301
302  @property
303  def total_partitions_processed(self) -> int:
304    partitions_str = self._query_stats.get('totalPartitionsProcessed', '0')
305    return (int(partitions_str) if isinstance(partitions_str, str) and
306            partitions_str.isdigit() else 0)
307
308  @property
309  def referenced_tables(self) -> list[BigQueryTable]:
310    tables_list = self._query_stats.get('referencedTables', [])
311    referenced_tables = []
312    if isinstance(tables_list, list):
313      for item in tables_list:
314        if isinstance(item, dict):
315          project_id = item.get('projectId')
316          dataset_id = item.get('datasetId')
317          table_id = item.get('tableId')
318          if (isinstance(project_id, str) and project_id and
319              isinstance(dataset_id, str) and dataset_id and
320              isinstance(table_id, str) and table_id):
321            referenced_tables.append(
322                BigQueryTable(project_id, dataset_id, table_id))
323    return referenced_tables
324
325  @property
326  def referenced_routines(self) -> list[BigQueryRoutine]:
327    routines_list = self._query_stats.get('referencedRoutines', [])
328    referenced_routines = []
329    if isinstance(routines_list, list):
330      for item in routines_list:
331        if isinstance(item, dict):
332          project_id = item.get('projectId')
333          dataset_id = item.get('datasetId')
334          routine_id = item.get('routineId')
335          if (isinstance(project_id, str) and project_id and
336              isinstance(dataset_id, str) and dataset_id and
337              isinstance(routine_id, str) and routine_id):
338            referenced_routines.append(
339                BigQueryRoutine(project_id, dataset_id, routine_id))
340    return referenced_routines
341
342  @property
343  def num_affected_dml_rows(self) -> int:
344    rows_str = self._query_stats.get('numDmlAffectedRows', '0')
345    return (int(rows_str)
346            if isinstance(rows_str, str) and rows_str.isdigit() else 0)
347
348  @property
349  def dml_stats(self) -> dict[str, int]:
350    stats = self._query_stats.get('dmlStats')
351    if not isinstance(stats, dict):
352      return {}
353    inserted_str = stats.get('insertedRowCount', '0')
354    deleted_str = stats.get('deletedRowCount', '0')
355    updated_str = stats.get('updatedRowCount', '0')
356    return {
357        'insertedRowCount':
358            (int(inserted_str) if isinstance(inserted_str, str) and
359             inserted_str.isdigit() else 0),
360        'deletedRowCount': (int(deleted_str) if isinstance(deleted_str, str) and
361                            deleted_str.isdigit() else 0),
362        'updatedRowCount': (int(updated_str) if isinstance(updated_str, str) and
363                            updated_str.isdigit() else 0),
364    }
365
366  @property
367  def statement_type(self) -> str:
368    stype = self._query_stats.get('statementType', '')
369    return stype if isinstance(stype, str) else ''
370
371  @property
372  def bi_engine_statistics(self) -> dict[str, Any]:
373    stats = self._query_stats.get('biEngineStatistics')
374    if not isinstance(stats, dict):
375      return {}
376    reasons_list = stats.get('accelerationMode', {}).get('biEngineReasons', [])
377    bi_engine_reasons = []
378    if isinstance(reasons_list, list):
379      for item in reasons_list:
380        if isinstance(item, dict):
381          bi_engine_reasons.append({
382              'code': str(item.get('code', '')),
383              'message': item.get('message', ''),
384          })
385    return {
386        'biEngineMode': str(stats.get('biEngineMode', '')),
387        'accelerationMode': str(stats.get('accelerationMode', '')),
388        'biEngineReasons': bi_engine_reasons,
389    }
390
391  @property
392  def vector_search_statistics(self) -> dict[str, Any]:
393    stats = self._query_stats.get('vectorSearchStatistics')
394    if not isinstance(stats, dict):
395      return {}
396    reasons_list = stats.get('indexUnusedReasons', [])
397    index_unused_reasons = []
398    if isinstance(reasons_list, list):
399      for item in reasons_list:
400        if isinstance(item, dict):
401          base_table_data = item.get('baseTable')
402          base_table_obj = None
403          if isinstance(base_table_data, dict):
404            project_id = base_table_data.get('projectId')
405            dataset_id = base_table_data.get('datasetId')
406            table_id = base_table_data.get('tableId')
407            if (isinstance(project_id, str) and project_id and
408                isinstance(dataset_id, str) and dataset_id and
409                isinstance(table_id, str) and table_id):
410              base_table_obj = BigQueryTable(project_id, dataset_id, table_id)
411          index_unused_reasons.append({
412              'code': str(item.get('code', '')),
413              'message': item.get('message', ''),
414              'indexName': item.get('indexName', ''),
415              'baseTable': base_table_obj,
416          })
417    return {
418        'indexUsageMode': str(stats.get('indexUsageMode', '')),
419        'indexUnusedReasons': index_unused_reasons,
420    }
421
422  @property
423  def performance_insights(self) -> dict[str, Any]:
424    insights = self._query_stats.get('performanceInsights')
425    if not isinstance(insights, dict):
426      return {}
427    standalone_list = insights.get('stagePerformanceStandaloneInsights', [])
428    stage_performance_standalone_insights = []
429    if isinstance(standalone_list, list):
430      for item in standalone_list:
431        if isinstance(item, dict):
432          stage_performance_standalone_insights.append({
433              'stageId': item.get('stageId', ''),
434          })
435    change_list = insights.get('stagePerformanceChangeInsights', [])
436    stage_performance_change_insights = []
437    if isinstance(change_list, list):
438      for item in change_list:
439        if isinstance(item, dict):
440          stage_performance_change_insights.append({
441              'stageId': item.get('stageId', ''),
442          })
443    avg_ms_str = insights.get('avgPreviousExecutionMs', '0')
444    return {
445        'avgPreviousExecutionMs':
446            (int(avg_ms_str)
447             if isinstance(avg_ms_str, str) and avg_ms_str.isdigit() else 0),
448        'stagePerformanceStandaloneInsights':
449            (stage_performance_standalone_insights),
450        'stagePerformanceChangeInsights': stage_performance_change_insights,
451    }
452
453  @property
454  def optimization_details(self) -> Any:
455    return self._query_info.get('optimizationDetails')
456
457  @property
458  def export_data_statistics(self) -> dict[str, int]:
459    stats = self._query_stats.get('exportDataStatistics')
460    if not isinstance(stats, dict):
461      return {}
462    file_count_str = stats.get('fileCount', '0')
463    row_count_str = stats.get('rowCount', '0')
464    return {
465        'fileCount': (int(file_count_str) if isinstance(file_count_str, str) and
466                      file_count_str.isdigit() else 0),
467        'rowCount': (int(row_count_str) if isinstance(row_count_str, str) and
468                     row_count_str.isdigit() else 0),
469    }
470
471  @property
472  def load_query_statistics(self) -> dict[str, int]:
473    stats = self._query_stats.get('loadQueryStatistics')
474    if not isinstance(stats, dict):
475      return {}
476    input_files_str = stats.get('inputFiles', '0')
477    input_bytes_str = stats.get('inputFileBytes', '0')
478    output_rows_str = stats.get('outputRows', '0')
479    output_bytes_str = stats.get('outputBytes', '0')
480    bad_records_str = stats.get('badRecords', '0')
481    return {
482        'inputFiles':
483            (int(input_files_str) if isinstance(input_files_str, str) and
484             input_files_str.isdigit() else 0),
485        'inputFileBytes':
486            (int(input_bytes_str) if isinstance(input_bytes_str, str) and
487             input_bytes_str.isdigit() else 0),
488        'outputRows':
489            (int(output_rows_str) if isinstance(output_rows_str, str) and
490             output_rows_str.isdigit() else 0),
491        'outputBytes':
492            (int(output_bytes_str) if isinstance(output_bytes_str, str) and
493             output_bytes_str.isdigit() else 0),
494        'badRecords':
495            (int(bad_records_str) if isinstance(bad_records_str, str) and
496             bad_records_str.isdigit() else 0),
497    }
498
499  @property
500  def spark_statistics(self) -> dict[str, Any]:
501    stats = self._query_stats.get('sparkStatistics')
502    if not isinstance(stats, dict):
503      return {}
504    logging_info_dict = stats.get('loggingInfo', {})
505    logging_info = ({
506        'resourceType': logging_info_dict.get('resourceType', ''),
507        'projectId': logging_info_dict.get('projectId', ''),
508    } if isinstance(logging_info_dict, dict) else {})
509    return {
510        'endpoints': stats.get('endpoints', {}),
511        'sparkJobId': stats.get('sparkJobId', ''),
512        'sparkJobLocation': stats.get('sparkJobLocation', ''),
513        'kmsKeyName': stats.get('kmsKeyName', ''),
514        'gcsStagingBucket': stats.get('gcsStagingBucket', ''),
515        'loggingInfo': logging_info,
516    }
517
518  @property
519  def transferred_bytes(self) -> int:
520    bytes_str = self._query_stats.get('transferredBytes', '0')
521    return (int(bytes_str)
522            if isinstance(bytes_str, str) and bytes_str.isdigit() else 0)
523
524  @property
525  def reservation_id(self) -> str:
526    res_id = self._stats.get('reservation_id', '')
527    return res_id if isinstance(res_id, str) else ''
528
529  @property
530  def reservation_admin_project_id(self) -> Optional[str]:
531    if not self.reservation_id:
532      return None
533    try:
534      parts = self.reservation_id.split('/')
535      if parts[0] == 'projects' and len(parts) >= 2:
536        return parts[1]
537      else:
538        logging.warning(
539            'Could not parse project ID from reservation_id: %s',
540            self.reservation_id,
541        )
542        return None
543    except (IndexError, AttributeError):
544      logging.warning(
545          'Could not parse project ID from reservation_id: %s',
546          self.reservation_id,
547      )
548      return None
549
550  @property
551  def num_child_jobs(self) -> int:
552    num_str = self._stats.get('numChildJobs', '0')
553    return int(num_str) if isinstance(num_str, str) and num_str.isdigit() else 0
554
555  @property
556  def parent_job_id(self) -> str:
557    parent_id = self._stats.get('parentJobId', '')
558    return parent_id if isinstance(parent_id, str) else ''
559
560  @property
561  def row_level_security_applied(self) -> bool:
562    rls_stats = self._stats.get('RowLevelSecurityStatistics', {})
563    return (rls_stats.get('rowLevelSecurityApplied') is True if isinstance(
564        rls_stats, dict) else False)
565
566  @property
567  def data_masking_applied(self) -> bool:
568    masking_stats = self._stats.get('dataMaskingStatistics', {})
569    return (masking_stats.get('dataMaskingApplied') is True if isinstance(
570        masking_stats, dict) else False)
571
572  @property
573  def session_id(self) -> str:
574    session_info = self._stats.get('sessionInfo', {})
575    session_id_val = (session_info.get('sessionId', '') if isinstance(
576        session_info, dict) else '')
577    return session_id_val if isinstance(session_id_val, str) else ''
578
579  @property
580  def final_execution_duration_ms(self) -> int:
581    duration_str = self._stats.get('finalExecutionDurationMs', '0')
582    return (int(duration_str)
583            if isinstance(duration_str, str) and duration_str.isdigit() else 0)
584
585  @property
586  def job_state(self) -> str:
587    state = self._status.get('state', '')
588    return state if isinstance(state, str) else ''
589
590  @property
591  def job_error_result(self) -> dict[str, Optional[str]]:
592    error_result = self._status.get('errorResult')
593    if not isinstance(error_result, dict):
594      return {}
595    return {
596        'reason': error_result.get('reason'),
597        'location': error_result.get('location'),
598        'debugInfo': error_result.get('debugInfo'),
599        'message': error_result.get('message'),
600    }
601
602  @property
603  def job_errors(self) -> list[dict[str, Optional[str]]]:
604    errors_list = self._status.get('errors', [])
605    errors_iterable = []
606    if isinstance(errors_list, list):
607      for item in errors_list:
608        if isinstance(item, dict):
609          errors_iterable.append({
610              'reason': item.get('reason'),
611              'location': item.get('location'),
612              'debugInfo': item.get('debugInfo'),
613              'message': item.get('message'),
614          })
615    return errors_iterable
616
617  @property
618  def materialized_view_statistics(self) -> dict[str, Any]:
619    stats_list = self._query_stats.get('materializedViewStatistics')
620    materialized_view = []
621    if isinstance(stats_list, list):
622      for item in stats_list:
623        if isinstance(item, dict):
624          table_ref_data = item.get('tableReference')
625          table_ref_obj = None
626          if isinstance(table_ref_data, dict):
627            project_id = table_ref_data.get('projectId')
628            dataset_id = table_ref_data.get('datasetId')
629            table_id = table_ref_data.get('tableId')
630            if (isinstance(project_id, str) and project_id and
631                isinstance(dataset_id, str) and dataset_id and
632                isinstance(table_id, str) and table_id):
633              table_ref_obj = BigQueryTable(project_id, dataset_id, table_id)
634          chosen = item.get('chosen') is True
635          saved_str = item.get('estimatedBytesSaved', '0')
636          estimated_bytes_saved = (int(saved_str)
637                                   if isinstance(saved_str, str) and
638                                   saved_str.isdigit() else 0)
639          rejected_reason = str(item.get('rejectedReason', ''))
640          materialized_view.append({
641              'chosen': chosen,
642              'estimatedBytesSaved': estimated_bytes_saved,
643              'rejectedReason': rejected_reason,
644              'tableReference': table_ref_obj,
645          })
646    return {'materializedView': materialized_view}
647
648  @property
649  def metadata_cache_statistics(self) -> dict[str, Any]:
650    stats_list = self._query_stats.get('metadataCacheStatistics')
651    metadata_cache = []
652    if isinstance(stats_list, list):
653      for item in stats_list:
654        if isinstance(item, dict):
655          table_ref_data = item.get('tableReference')
656          table_ref_obj = None
657          if isinstance(table_ref_data, dict):
658            project_id = table_ref_data.get('projectId')
659            dataset_id = table_ref_data.get('datasetId')
660            table_id = table_ref_data.get('tableId')
661            if (isinstance(project_id, str) and project_id and
662                isinstance(dataset_id, str) and dataset_id and
663                isinstance(table_id, str) and table_id):
664              table_ref_obj = BigQueryTable(project_id, dataset_id, table_id)
665          metadata_cache.append({
666              'explanation': item.get('explanation', ''),
667              'unusedReason': str(item.get('unusedReason', '')),
668              'tableReference': table_ref_obj,
669          })
670    return {'tableMetadataCacheUsage': metadata_cache}
671
672  # Properties derived from _information_schema_job_metadata
673  @property
674  def information_schema_user_email(self) -> str | None:
675    if not self._information_schema_job_metadata:
676      return C_NOT_AVAILABLE
677    return self._information_schema_job_metadata.get('user_email')
678
679  @property
680  def information_schema_start_time_str(self) -> str | None:
681    if not self._information_schema_job_metadata:
682      return C_NOT_AVAILABLE
683    return self._information_schema_job_metadata.get('start_time_str')
684
685  @property
686  def information_schema_end_time_str(self) -> str | None:
687    if not self._information_schema_job_metadata:
688      return C_NOT_AVAILABLE
689    return self._information_schema_job_metadata.get('end_time_str')
690
691  @property
692  def information_schema_query(self) -> str | None:
693    if not self._information_schema_job_metadata:
694      return C_NOT_AVAILABLE
695    return self._information_schema_job_metadata.get('query')
696
697  @property
698  def information_schema_total_modified_partitions(self) -> Union[int, str]:
699    """The total number of partitions the job modified.
700
701    This field is populated for LOAD and QUERY jobs.
702    """
703    if not self._information_schema_job_metadata:
704      return C_NOT_AVAILABLE
705    try:
706      total_modified_partitions = self._information_schema_job_metadata[
707          'total_modified_partitions']
708      return total_modified_partitions
709    except KeyError:
710      return C_NOT_AVAILABLE
711
712  @property
713  def information_schema_resource_warning(self) -> str:
714    """The warning message that appears if the resource usage during query
715
716    processing is above the internal threshold of the system.
717    """
718    if not self._information_schema_job_metadata:
719      return C_NOT_AVAILABLE
720    try:
721      resource_warning = self._information_schema_job_metadata['query_info'][
722          'resource_warning']
723      return resource_warning
724    except KeyError:
725      return C_NOT_AVAILABLE
726
727  @property
728  def information_schema_normalized_literals(self) -> str:
729    """Contains the hashes of the query."""
730    try:
731      query_hashes = self._information_schema_job_metadata['query_info'][
732          'query_hashes']['normalized_literals']
733      return query_hashes
734    except KeyError:
735      return C_NOT_AVAILABLE

Represents a BigQuery Job object.

BigQueryJob( project_id: str, job_api_resource_data: dict[str, typing.Any], information_schema_job_metadata: dict[str, str])
170  def __init__(
171      self,
172      project_id: str,
173      job_api_resource_data: dict[str, Any],
174      information_schema_job_metadata: dict[str, str],
175  ):
176    super().__init__(project_id)
177    self._job_api_resource_data = job_api_resource_data
178    self._information_schema_job_metadata = (information_schema_job_metadata or
179                                             {})
project_id: str
266  @property
267  def project_id(self) -> str:
268    """Project id (not project number)."""
269    return self._project_id

Project id (not project number).

full_path: str
181  @property
182  def full_path(self) -> str:
183    # returns 'https://content-bigquery.googleapis.com/bigquery/v2/
184    # projects/<PROJECT_ID>/jobs/<JOBID>?location=<REGION>'
185    return self._job_api_resource_data.get('selfLink', '')

Returns the full path of this resource. For a BigQueryJob this is the job's selfLink URL, for example 'https://content-bigquery.googleapis.com/bigquery/v2/projects/<PROJECT_ID>/jobs/<JOBID>?location=<REGION>'.

id: str
187  @property
188  def id(self) -> str:
189    # returns <PROJECT>:<REGION>.<JobID>
190    return self._job_api_resource_data.get('id', '')
short_path: str
192  @property
193  def short_path(self) -> str:
194    # returns <PROJECT>:<REGION>.<JobID>
195    return self.id

Returns the short name for this resource. For a BigQueryJob this is the job identifier in the form '<PROJECT>:<REGION>.<JobID>'. Note that the name alone does not indicate what kind of resource it is.

user_email: str
197  @property
198  def user_email(self) -> str:
199    return self._job_api_resource_data.get('user_email', '')
job_type: str
227  @property
228  def job_type(self) -> str:
229    return self._job_configuration.get('jobType', '')
query_sql: str
231  @property
232  def query_sql(self) -> str:
233    return self._query.get('query', '')
use_legacy_sql: bool
235  @property
236  def use_legacy_sql(self) -> bool:
237    return self._query.get('useLegacySql', False)
priority: str
239  @property
240  def priority(self) -> str:
241    return self._query.get('priority', '')
edition: str
243  @property
244  def edition(self) -> str:
245    edition_value = self._query.get('edition')
246    return str(edition_value) if edition_value else ''
creation_time: Optional[int]
248  @property
249  def creation_time(self) -> Optional[int]:
250    time_str = self._stats.get('creationTime')
251    return (int(time_str)
252            if isinstance(time_str, str) and time_str.isdigit() else None)
start_time: Optional[int]
254  @property
255  def start_time(self) -> Optional[int]:
256    time_str = self._stats.get('startTime')
257    return (int(time_str)
258            if isinstance(time_str, str) and time_str.isdigit() else None)
end_time: Optional[int]
260  @property
261  def end_time(self) -> Optional[int]:
262    time_str = self._stats.get('endTime')
263    return (int(time_str)
264            if isinstance(time_str, str) and time_str.isdigit() else None)
total_bytes_processed: int
266  @property
267  def total_bytes_processed(self) -> int:
268    bytes_str = self._stats.get('totalBytesProcessed', '0')
269    return (int(bytes_str)
270            if isinstance(bytes_str, str) and bytes_str.isdigit() else 0)
total_bytes_billed: int
272  @property
273  def total_bytes_billed(self) -> int:
274    bytes_str = self._query_stats.get('totalBytesBilled', '0')
275    return (int(bytes_str)
276            if isinstance(bytes_str, str) and bytes_str.isdigit() else 0)
total_slot_ms: int
278  @property
279  def total_slot_ms(self) -> int:
280    ms_str = self._stats.get('totalSlotMs', '0')
281    return int(ms_str) if isinstance(ms_str, str) and ms_str.isdigit() else 0
cache_hit: bool
283  @property
284  def cache_hit(self) -> bool:
285    return self._query_stats.get('cacheHit') is True
quota_deferments: list[str]
287  @property
288  def quota_deferments(self) -> list[str]:
289    deferments_dict = self._stats.get('quotaDeferments', {})
290    if isinstance(deferments_dict, dict):
291      deferment_list = deferments_dict.get('', [])
292      if isinstance(deferment_list, list) and all(
293          isinstance(s, str) for s in deferment_list):
294        return deferment_list
295    return []
query_plan: list[dict[str, typing.Any]]
297  @property
298  def query_plan(self) -> list[dict[str, Any]]:
299    plan = self._query_stats.get('queryPlan', [])
300    return plan if isinstance(plan, list) else []
total_partitions_processed: int
302  @property
303  def total_partitions_processed(self) -> int:
304    partitions_str = self._query_stats.get('totalPartitionsProcessed', '0')
305    return (int(partitions_str) if isinstance(partitions_str, str) and
306            partitions_str.isdigit() else 0)
referenced_tables: list[BigQueryTable]
308  @property
309  def referenced_tables(self) -> list[BigQueryTable]:
310    tables_list = self._query_stats.get('referencedTables', [])
311    referenced_tables = []
312    if isinstance(tables_list, list):
313      for item in tables_list:
314        if isinstance(item, dict):
315          project_id = item.get('projectId')
316          dataset_id = item.get('datasetId')
317          table_id = item.get('tableId')
318          if (isinstance(project_id, str) and project_id and
319              isinstance(dataset_id, str) and dataset_id and
320              isinstance(table_id, str) and table_id):
321            referenced_tables.append(
322                BigQueryTable(project_id, dataset_id, table_id))
323    return referenced_tables
referenced_routines: list[BigQueryRoutine]
325  @property
326  def referenced_routines(self) -> list[BigQueryRoutine]:
327    routines_list = self._query_stats.get('referencedRoutines', [])
328    referenced_routines = []
329    if isinstance(routines_list, list):
330      for item in routines_list:
331        if isinstance(item, dict):
332          project_id = item.get('projectId')
333          dataset_id = item.get('datasetId')
334          routine_id = item.get('routineId')
335          if (isinstance(project_id, str) and project_id and
336              isinstance(dataset_id, str) and dataset_id and
337              isinstance(routine_id, str) and routine_id):
338            referenced_routines.append(
339                BigQueryRoutine(project_id, dataset_id, routine_id))
340    return referenced_routines
num_affected_dml_rows: int
342  @property
343  def num_affected_dml_rows(self) -> int:
344    rows_str = self._query_stats.get('numDmlAffectedRows', '0')
345    return (int(rows_str)
346            if isinstance(rows_str, str) and rows_str.isdigit() else 0)
dml_stats: dict[str, int]
348  @property
349  def dml_stats(self) -> dict[str, int]:
350    stats = self._query_stats.get('dmlStats')
351    if not isinstance(stats, dict):
352      return {}
353    inserted_str = stats.get('insertedRowCount', '0')
354    deleted_str = stats.get('deletedRowCount', '0')
355    updated_str = stats.get('updatedRowCount', '0')
356    return {
357        'insertedRowCount':
358            (int(inserted_str) if isinstance(inserted_str, str) and
359             inserted_str.isdigit() else 0),
360        'deletedRowCount': (int(deleted_str) if isinstance(deleted_str, str) and
361                            deleted_str.isdigit() else 0),
362        'updatedRowCount': (int(updated_str) if isinstance(updated_str, str) and
363                            updated_str.isdigit() else 0),
364    }
statement_type: str
366  @property
367  def statement_type(self) -> str:
368    stype = self._query_stats.get('statementType', '')
369    return stype if isinstance(stype, str) else ''
bi_engine_statistics: dict[str, typing.Any]
371  @property
372  def bi_engine_statistics(self) -> dict[str, Any]:
373    stats = self._query_stats.get('biEngineStatistics')
374    if not isinstance(stats, dict):
375      return {}
376    reasons_list = stats.get('accelerationMode', {}).get('biEngineReasons', [])
377    bi_engine_reasons = []
378    if isinstance(reasons_list, list):
379      for item in reasons_list:
380        if isinstance(item, dict):
381          bi_engine_reasons.append({
382              'code': str(item.get('code', '')),
383              'message': item.get('message', ''),
384          })
385    return {
386        'biEngineMode': str(stats.get('biEngineMode', '')),
387        'accelerationMode': str(stats.get('accelerationMode', '')),
388        'biEngineReasons': bi_engine_reasons,
389    }
vector_search_statistics: dict[str, typing.Any]
391  @property
392  def vector_search_statistics(self) -> dict[str, Any]:
393    stats = self._query_stats.get('vectorSearchStatistics')
394    if not isinstance(stats, dict):
395      return {}
396    reasons_list = stats.get('indexUnusedReasons', [])
397    index_unused_reasons = []
398    if isinstance(reasons_list, list):
399      for item in reasons_list:
400        if isinstance(item, dict):
401          base_table_data = item.get('baseTable')
402          base_table_obj = None
403          if isinstance(base_table_data, dict):
404            project_id = base_table_data.get('projectId')
405            dataset_id = base_table_data.get('datasetId')
406            table_id = base_table_data.get('tableId')
407            if (isinstance(project_id, str) and project_id and
408                isinstance(dataset_id, str) and dataset_id and
409                isinstance(table_id, str) and table_id):
410              base_table_obj = BigQueryTable(project_id, dataset_id, table_id)
411          index_unused_reasons.append({
412              'code': str(item.get('code', '')),
413              'message': item.get('message', ''),
414              'indexName': item.get('indexName', ''),
415              'baseTable': base_table_obj,
416          })
417    return {
418        'indexUsageMode': str(stats.get('indexUsageMode', '')),
419        'indexUnusedReasons': index_unused_reasons,
420    }
performance_insights: dict[str, typing.Any]
422  @property
423  def performance_insights(self) -> dict[str, Any]:
424    insights = self._query_stats.get('performanceInsights')
425    if not isinstance(insights, dict):
426      return {}
427    standalone_list = insights.get('stagePerformanceStandaloneInsights', [])
428    stage_performance_standalone_insights = []
429    if isinstance(standalone_list, list):
430      for item in standalone_list:
431        if isinstance(item, dict):
432          stage_performance_standalone_insights.append({
433              'stageId': item.get('stageId', ''),
434          })
435    change_list = insights.get('stagePerformanceChangeInsights', [])
436    stage_performance_change_insights = []
437    if isinstance(change_list, list):
438      for item in change_list:
439        if isinstance(item, dict):
440          stage_performance_change_insights.append({
441              'stageId': item.get('stageId', ''),
442          })
443    avg_ms_str = insights.get('avgPreviousExecutionMs', '0')
444    return {
445        'avgPreviousExecutionMs':
446            (int(avg_ms_str)
447             if isinstance(avg_ms_str, str) and avg_ms_str.isdigit() else 0),
448        'stagePerformanceStandaloneInsights':
449            (stage_performance_standalone_insights),
450        'stagePerformanceChangeInsights': stage_performance_change_insights,
451    }
optimization_details: Any
453  @property
454  def optimization_details(self) -> Any:
455    return self._query_info.get('optimizationDetails')
export_data_statistics: dict[str, int]
457  @property
458  def export_data_statistics(self) -> dict[str, int]:
459    stats = self._query_stats.get('exportDataStatistics')
460    if not isinstance(stats, dict):
461      return {}
462    file_count_str = stats.get('fileCount', '0')
463    row_count_str = stats.get('rowCount', '0')
464    return {
465        'fileCount': (int(file_count_str) if isinstance(file_count_str, str) and
466                      file_count_str.isdigit() else 0),
467        'rowCount': (int(row_count_str) if isinstance(row_count_str, str) and
468                     row_count_str.isdigit() else 0),
469    }
load_query_statistics: dict[str, int]
471  @property
472  def load_query_statistics(self) -> dict[str, int]:
473    stats = self._query_stats.get('loadQueryStatistics')
474    if not isinstance(stats, dict):
475      return {}
476    input_files_str = stats.get('inputFiles', '0')
477    input_bytes_str = stats.get('inputFileBytes', '0')
478    output_rows_str = stats.get('outputRows', '0')
479    output_bytes_str = stats.get('outputBytes', '0')
480    bad_records_str = stats.get('badRecords', '0')
481    return {
482        'inputFiles':
483            (int(input_files_str) if isinstance(input_files_str, str) and
484             input_files_str.isdigit() else 0),
485        'inputFileBytes':
486            (int(input_bytes_str) if isinstance(input_bytes_str, str) and
487             input_bytes_str.isdigit() else 0),
488        'outputRows':
489            (int(output_rows_str) if isinstance(output_rows_str, str) and
490             output_rows_str.isdigit() else 0),
491        'outputBytes':
492            (int(output_bytes_str) if isinstance(output_bytes_str, str) and
493             output_bytes_str.isdigit() else 0),
494        'badRecords':
495            (int(bad_records_str) if isinstance(bad_records_str, str) and
496             bad_records_str.isdigit() else 0),
497    }
spark_statistics: dict[str, typing.Any]
499  @property
500  def spark_statistics(self) -> dict[str, Any]:
501    stats = self._query_stats.get('sparkStatistics')
502    if not isinstance(stats, dict):
503      return {}
504    logging_info_dict = stats.get('loggingInfo', {})
505    logging_info = ({
506        'resourceType': logging_info_dict.get('resourceType', ''),
507        'projectId': logging_info_dict.get('projectId', ''),
508    } if isinstance(logging_info_dict, dict) else {})
509    return {
510        'endpoints': stats.get('endpoints', {}),
511        'sparkJobId': stats.get('sparkJobId', ''),
512        'sparkJobLocation': stats.get('sparkJobLocation', ''),
513        'kmsKeyName': stats.get('kmsKeyName', ''),
514        'gcsStagingBucket': stats.get('gcsStagingBucket', ''),
515        'loggingInfo': logging_info,
516    }
transferred_bytes: int
518  @property
519  def transferred_bytes(self) -> int:
520    bytes_str = self._query_stats.get('transferredBytes', '0')
521    return (int(bytes_str)
522            if isinstance(bytes_str, str) and bytes_str.isdigit() else 0)
reservation_id: str
524  @property
525  def reservation_id(self) -> str:
526    res_id = self._stats.get('reservation_id', '')
527    return res_id if isinstance(res_id, str) else ''
reservation_admin_project_id: Optional[str]
529  @property
530  def reservation_admin_project_id(self) -> Optional[str]:
531    if not self.reservation_id:
532      return None
533    try:
534      parts = self.reservation_id.split('/')
535      if parts[0] == 'projects' and len(parts) >= 2:
536        return parts[1]
537      else:
538        logging.warning(
539            'Could not parse project ID from reservation_id: %s',
540            self.reservation_id,
541        )
542        return None
543    except (IndexError, AttributeError):
544      logging.warning(
545          'Could not parse project ID from reservation_id: %s',
546          self.reservation_id,
547      )
548      return None
num_child_jobs: int
550  @property
551  def num_child_jobs(self) -> int:
552    num_str = self._stats.get('numChildJobs', '0')
553    return int(num_str) if isinstance(num_str, str) and num_str.isdigit() else 0
parent_job_id: str
555  @property
556  def parent_job_id(self) -> str:
557    parent_id = self._stats.get('parentJobId', '')
558    return parent_id if isinstance(parent_id, str) else ''
row_level_security_applied: bool
560  @property
561  def row_level_security_applied(self) -> bool:
562    rls_stats = self._stats.get('RowLevelSecurityStatistics', {})
563    return (rls_stats.get('rowLevelSecurityApplied') is True if isinstance(
564        rls_stats, dict) else False)
data_masking_applied: bool
566  @property
567  def data_masking_applied(self) -> bool:
568    masking_stats = self._stats.get('dataMaskingStatistics', {})
569    return (masking_stats.get('dataMaskingApplied') is True if isinstance(
570        masking_stats, dict) else False)
session_id: str
572  @property
573  def session_id(self) -> str:
574    session_info = self._stats.get('sessionInfo', {})
575    session_id_val = (session_info.get('sessionId', '') if isinstance(
576        session_info, dict) else '')
577    return session_id_val if isinstance(session_id_val, str) else ''
final_execution_duration_ms: int
579  @property
580  def final_execution_duration_ms(self) -> int:
581    duration_str = self._stats.get('finalExecutionDurationMs', '0')
582    return (int(duration_str)
583            if isinstance(duration_str, str) and duration_str.isdigit() else 0)
job_state: str
585  @property
586  def job_state(self) -> str:
587    state = self._status.get('state', '')
588    return state if isinstance(state, str) else ''
job_error_result: dict[str, typing.Optional[str]]
590  @property
591  def job_error_result(self) -> dict[str, Optional[str]]:
592    error_result = self._status.get('errorResult')
593    if not isinstance(error_result, dict):
594      return {}
595    return {
596        'reason': error_result.get('reason'),
597        'location': error_result.get('location'),
598        'debugInfo': error_result.get('debugInfo'),
599        'message': error_result.get('message'),
600    }
job_errors: list[dict[str, typing.Optional[str]]]
602  @property
603  def job_errors(self) -> list[dict[str, Optional[str]]]:
604    errors_list = self._status.get('errors', [])
605    errors_iterable = []
606    if isinstance(errors_list, list):
607      for item in errors_list:
608        if isinstance(item, dict):
609          errors_iterable.append({
610              'reason': item.get('reason'),
611              'location': item.get('location'),
612              'debugInfo': item.get('debugInfo'),
613              'message': item.get('message'),
614          })
615    return errors_iterable
materialized_view_statistics: dict[str, typing.Any]
617  @property
618  def materialized_view_statistics(self) -> dict[str, Any]:
619    stats_list = self._query_stats.get('materializedViewStatistics')
620    materialized_view = []
621    if isinstance(stats_list, list):
622      for item in stats_list:
623        if isinstance(item, dict):
624          table_ref_data = item.get('tableReference')
625          table_ref_obj = None
626          if isinstance(table_ref_data, dict):
627            project_id = table_ref_data.get('projectId')
628            dataset_id = table_ref_data.get('datasetId')
629            table_id = table_ref_data.get('tableId')
630            if (isinstance(project_id, str) and project_id and
631                isinstance(dataset_id, str) and dataset_id and
632                isinstance(table_id, str) and table_id):
633              table_ref_obj = BigQueryTable(project_id, dataset_id, table_id)
634          chosen = item.get('chosen') is True
635          saved_str = item.get('estimatedBytesSaved', '0')
636          estimated_bytes_saved = (int(saved_str)
637                                   if isinstance(saved_str, str) and
638                                   saved_str.isdigit() else 0)
639          rejected_reason = str(item.get('rejectedReason', ''))
640          materialized_view.append({
641              'chosen': chosen,
642              'estimatedBytesSaved': estimated_bytes_saved,
643              'rejectedReason': rejected_reason,
644              'tableReference': table_ref_obj,
645          })
646    return {'materializedView': materialized_view}
metadata_cache_statistics: dict[str, typing.Any]
648  @property
649  def metadata_cache_statistics(self) -> dict[str, Any]:
650    stats_list = self._query_stats.get('metadataCacheStatistics')
651    metadata_cache = []
652    if isinstance(stats_list, list):
653      for item in stats_list:
654        if isinstance(item, dict):
655          table_ref_data = item.get('tableReference')
656          table_ref_obj = None
657          if isinstance(table_ref_data, dict):
658            project_id = table_ref_data.get('projectId')
659            dataset_id = table_ref_data.get('datasetId')
660            table_id = table_ref_data.get('tableId')
661            if (isinstance(project_id, str) and project_id and
662                isinstance(dataset_id, str) and dataset_id and
663                isinstance(table_id, str) and table_id):
664              table_ref_obj = BigQueryTable(project_id, dataset_id, table_id)
665          metadata_cache.append({
666              'explanation': item.get('explanation', ''),
667              'unusedReason': str(item.get('unusedReason', '')),
668              'tableReference': table_ref_obj,
669          })
670    return {'tableMetadataCacheUsage': metadata_cache}
information_schema_user_email: str | None
673  @property
674  def information_schema_user_email(self) -> str | None:
675    if not self._information_schema_job_metadata:
676      return C_NOT_AVAILABLE
677    return self._information_schema_job_metadata.get('user_email')
information_schema_start_time_str: str | None
679  @property
680  def information_schema_start_time_str(self) -> str | None:
681    if not self._information_schema_job_metadata:
682      return C_NOT_AVAILABLE
683    return self._information_schema_job_metadata.get('start_time_str')
information_schema_end_time_str: str | None
685  @property
686  def information_schema_end_time_str(self) -> str | None:
687    if not self._information_schema_job_metadata:
688      return C_NOT_AVAILABLE
689    return self._information_schema_job_metadata.get('end_time_str')
information_schema_query: str | None
691  @property
692  def information_schema_query(self) -> str | None:
693    if not self._information_schema_job_metadata:
694      return C_NOT_AVAILABLE
695    return self._information_schema_job_metadata.get('query')
information_schema_total_modified_partitions: Union[int, str]
697  @property
698  def information_schema_total_modified_partitions(self) -> Union[int, str]:
699    """The total number of partitions the job modified.
700
701    This field is populated for LOAD and QUERY jobs.
702    """
703    if not self._information_schema_job_metadata:
704      return C_NOT_AVAILABLE
705    try:
706      total_modified_partitions = self._information_schema_job_metadata[
707          'total_modified_partitions']
708      return total_modified_partitions
709    except KeyError:
710      return C_NOT_AVAILABLE

The total number of partitions the job modified.

This field is populated for LOAD and QUERY jobs.

information_schema_resource_warning: str
712  @property
713  def information_schema_resource_warning(self) -> str:
714    """The warning message that appears if the resource usage during query
715
716    processing is above the internal threshold of the system.
717    """
718    if not self._information_schema_job_metadata:
719      return C_NOT_AVAILABLE
720    try:
721      resource_warning = self._information_schema_job_metadata['query_info'][
722          'resource_warning']
723      return resource_warning
724    except KeyError:
725      return C_NOT_AVAILABLE

The warning message that appears if the resource usage during query processing is above the internal threshold of the system.

information_schema_normalized_literals: str
727  @property
728  def information_schema_normalized_literals(self) -> str:
729    """Contains the hashes of the query."""
730    try:
731      query_hashes = self._information_schema_job_metadata['query_info'][
732          'query_hashes']['normalized_literals']
733      return query_hashes
734    except KeyError:
735      return C_NOT_AVAILABLE

Contains the hashes of the query.

@caching.cached_api_call
def get_bigquery_job_api_resource_data(project_id: str, region: str, job_id: str) -> Optional[dict[str, Any]]:
738@caching.cached_api_call
739def get_bigquery_job_api_resource_data(
740    project_id: str,
741    region: str,
742    job_id: str,
743) -> Union[dict[str, Any], None]:
744  """Fetch a specific BigQuery job's raw API resource data."""
745  api = apis.get_api('bigquery', 'v2', project_id)
746  query_job = api.jobs().get(projectId=project_id,
747                             location=region,
748                             jobId=job_id)
749
750  try:
751    resp = query_job.execute(num_retries=config.API_RETRIES)
752    return resp
753  except errors.HttpError as err:
754    raise utils.GcpApiError(err) from err

Fetch a specific BigQuery job's raw API resource data.
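
Because the function is wrapped in caching.cached_api_call, repeated lookups of the same job are served from cache, and API failures are re-raised as utils.GcpApiError. A usage sketch with hypothetical identifiers:

from gcpdiag import utils

try:
  data = bigquery.get_bigquery_job_api_resource_data(
      project_id='my-project', region='us', job_id='bquxjob_12345')
  if data:
    print(data.get('status', {}).get('state'))
except utils.GcpApiError as err:
  print(f'Could not fetch the job: {err}')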

@caching.cached_api_call
def get_information_schema_job_metadata( context: gcpdiag.models.Context, project_id: str, region: str, job_id: str, creation_time_milis: Optional[int] = None, skip_permission_check: bool = False) -> Optional[dict[str, Any]]:
757@caching.cached_api_call
758def get_information_schema_job_metadata(
759    context: models.Context,
760    project_id: str,
761    region: str,
762    job_id: str,
763    creation_time_milis: Optional[int] = None,
764    skip_permission_check: bool = False,
765) -> Optional[dict[str, Any]]:
766  """Fetch metadata about a BigQuery job from the INFORMATION_SCHEMA."""
767  if not apis.is_enabled(project_id, 'bigquery'):
768    return None
769  user_email = ''
770  try:
771    user_email = apis.get_user_email()
772  except (RuntimeError, exceptions.DefaultCredentialsError):
773    pass
774  except AttributeError as err:
775    if (('has no attribute' in str(err)) and
776        ('with_quota_project' in str(err))):
777      op.info('Running the investigation within the GCA context.')
778  user = 'user:' + user_email
779  if not skip_permission_check:
780    try:
781      policy = iam.get_project_policy(context)
782      if (not policy.has_permission(user, 'bigquery.jobs.create')) or (
783          not policy.has_permission(user, 'bigquery.jobs.listAll')):
784        op.info(
785            f'WARNING: Unable to run INFORMATION_SCHEMA view analysis due to missing permissions.\
786            \nMake sure to grant {user_email} "bigquery.jobs.create" and "bigquery.jobs.listAll".\
787            \nContinuing the investigation with the BigQuery job metadata obtained from the API.'
788        )
789        return None
790    except utils.GcpApiError:
791      op.info(
792          'Attempting to query INFORMATION_SCHEMA with no knowledge of project'
793          ' level permissions        \n(due to missing'
794          ' resourcemanager.projects.get permission).')
795  else:
796    op.info(
797        'Attempting to query INFORMATION_SCHEMA without checking project level permissions.'
798    )
799  try:
800    creation_time_milis_filter = ' '
801    if creation_time_milis:
802      creation_time_milis_filter = (
803          f'AND creation_time = TIMESTAMP_MILLIS({creation_time_milis})')
804    query = f"""
805    SELECT
806        user_email, start_time, end_time, query
807      FROM
808        `{project_id}`.`region-{region}`.INFORMATION_SCHEMA.JOBS
809      WHERE
810        job_id = '{job_id}'
811        {creation_time_milis_filter}
812      LIMIT 1
813    """
814    results = get_query_results(
815        project_id=project_id,
816        query=query,
817        location=region,
818        timeout_sec=30,
819        poll_interval_sec=2,  # Short poll interval
820    )
821    if not results or len(results) != 1:
822      # We cannot raise an exception otherwise tests that use get_bigquery_job would fail
823      # raise ValueError(f"Job {job_id} not found in INFORMATION_SCHEMA")
824      return None
825    return results[0]
826  except errors.HttpError as err:
827    logging.warning(
828        'Failed to retrieve INFORMATION_SCHEMA job metadata for job %s: %s',
829        job_id,
830        err,
831    )
832    return None
833  except KeyError as err:
834    logging.warning(
835        'Failed to parse INFORMATION_SCHEMA response for job %s: %s',
836        job_id,
837        err,
838    )
839    return None

Fetch metadata about a BigQuery job from the INFORMATION_SCHEMA.
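
The helper returns the single matching INFORMATION_SCHEMA.JOBS row as a dictionary, or None when the job cannot be found or permissions are missing. A sketch with hypothetical identifiers; the row keys are assumed to mirror the selected columns (user_email, start_time, end_time, query):

metadata = bigquery.get_information_schema_job_metadata(
    context,
    project_id='my-project',
    region='us',
    job_id='bquxjob_12345')
if metadata:
  print(metadata.get('user_email'), metadata.get('query'))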

def get_bigquery_job( context: gcpdiag.models.Context, region: str, job_id: str, skip_permission_check: bool = False) -> Optional[BigQueryJob]:
842def get_bigquery_job(
843    context: models.Context,
844    region: str,
845    job_id: str,
846    skip_permission_check: bool = False) -> Optional[BigQueryJob]:
847  """Fetch a BigQuery job, combining API and INFORMATION_SCHEMA data."""
848  project_id = context.project_id
849  if not project_id:
850    return None
851  try:
852    job_api_resource_data = get_bigquery_job_api_resource_data(
853        project_id, region, job_id)
854    if not job_api_resource_data:
855      return None
856  except utils.GcpApiError as err:
857    # This will be returned when permissions to fetch a job are missing.
858    if 'permission' in err.message.lower():
859      user_email = ''
860      try:
861        user_email = apis.get_user_email()
862      except (RuntimeError, AttributeError,
863              exceptions.DefaultCredentialsError) as error:
864        if (('has no attribute' in str(error)) and
865            ('with_quota_project' in str(error))):
866          op.info('Running the investigation within the GCA context.')
867      logging.debug(
868          'Could not retrieve BigQuery job %s. Make sure to give %s the bigquery.jobs.get and bigquery.jobs.create permissions.',
869          project_id + ':' + region + '.' + job_id, user_email)
870      raise utils.GcpApiError(err)
871    # This will be returned when a job is not found.
872    elif 'not found' in err.message.lower():
873      job_id_string = project_id + ':' + region + '.' + job_id
874      logging.debug('Could not find BigQuery job %s', job_id_string)
875      return None
876    else:
877      logging.debug(
878          'Could not retrieve BigQuery job %s due to an issue calling the API.'
879          ' Please restart the investigation.',
880          project_id + ':' + region + '.' + job_id)
881      return None
882  information_schema_job_metadata = {}
883  job_creation_millis = None
884  creation_time_str = job_api_resource_data.get('statistics',
885                                                {}).get('creationTime')
886  if creation_time_str:
887    try:
888      job_creation_millis = int(creation_time_str)
889    except (ValueError, TypeError):
890      pass
891  information_schema_job_metadata = get_information_schema_job_metadata(
892      context, project_id, region, job_id, job_creation_millis,
893      skip_permission_check)
894  return BigQueryJob(
895      project_id=project_id,
896      job_api_resource_data=job_api_resource_data,
897      information_schema_job_metadata=information_schema_job_metadata)

Fetch a BigQuery job, combining API and INFORMATION_SCHEMA data.
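A hedged sketch of calling get_bigquery_job from a diagnostic check. The project id and job id are placeholders, and since BigQueryJob's attributes are not shown in this excerpt, the example only checks whether a job object was returned.

from gcpdiag import models, utils
from gcpdiag.queries import bigquery

context = models.Context(project_id='example-project')  # hypothetical project
try:
  job = bigquery.get_bigquery_job(context, region='us',
                                  job_id='bquxjob_1234abcd_0123456789ab')
except utils.GcpApiError as err:
  # Raised when the caller lacks permission to fetch the job via the API.
  print(f'Could not fetch the job: {err}')
else:
  if job is None:
    print('Job not found, BigQuery API disabled, or no project id in context.')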

def get_query_results( project_id: str, query: str, location: Optional[str] = None, timeout_sec: int = 30, poll_interval_sec: int = 2) -> Optional[List[dict[str, Any]]]:
 937def get_query_results(
 938    project_id: str,
 939    query: str,
 940    location: Optional[str] = None,
 941    timeout_sec: int = 30,
 942    poll_interval_sec: int = 2,
 943) -> Optional[List[dict[str, Any]]]:
 944  """Executes a BigQuery query, waits for completion, and returns the results.
 945
 946  Args:
 947      project_id: The GCP project ID where the query should run.
 948      query: The SQL query string to execute.
 949      location: The location (e.g., 'US', 'EU', 'us-central1') where the job
 950        should run. If None, BigQuery defaults might apply, often based on
 951        dataset locations if referenced.
 952      timeout_sec: Maximum time in seconds to wait for the query job to
 953        complete.
 954      poll_interval_sec: Time in seconds between polling the job status.
 955
 956  Returns:
 957      A list of dictionaries representing the result rows, or None if the
 958      query fails, times out, or the API is disabled.
 959  Raises:
 960      utils.GcpApiError: If an unrecoverable API error occurs during job
 961                         insertion, status check, or result fetching.
 962  """
 963  if not apis.is_enabled(project_id, 'bigquery'):
 964    logging.warning('BigQuery API is not enabled in project %s.', project_id)
 965    return None
 966  api = apis.get_api('bigquery', 'v2', project_id)
 967  job_id = f'gcpdiag_query_{uuid.uuid4()}'
 968  job_body = {
 969      'jobReference': {
 970          'projectId': project_id,
 971          'jobId': job_id,
 972          'location': location,  # Location can be None
 973      },
 974      'configuration': {
 975          'query': {
 976              'query': query,
 977              'useLegacySql': False,
 978              # Consider adding priority, destinationTable, etc. if needed
 979          }
 980      },
 981  }
 982  try:
 983    logging.debug(
 984        'Starting BigQuery job %s in project %s, location %s',
 985        job_id,
 986        project_id,
 987        location or 'default',
 988    )
 989    insert_request = api.jobs().insert(projectId=project_id, body=job_body)
 990    insert_response = insert_request.execute(num_retries=config.API_RETRIES)
 991    job_ref = insert_response['jobReference']
 992    actual_job_id = job_ref['jobId']
 993    actual_location = job_ref.get('location')  # Get location assigned by BQ
 994    logging.debug('Job %s created. Polling for completion...', actual_job_id)
 995    start_time = time.time()
 996    while True:
 997      # Check for timeout
 998      if time.time() - start_time > timeout_sec:
 999        logging.error(
1000            'BigQuery job %s timed out after %d seconds.',
1001            actual_job_id,
1002            timeout_sec,
1003        )
1004        return None
1005      # Get job status
1006      logging.debug('>>> Getting job status for %s', actual_job_id)
1007      get_request = api.jobs().get(
1008          projectId=job_ref['projectId'],
1009          jobId=actual_job_id,
1010          location=actual_location,
1011      )
1012      job_status_response = get_request.execute(num_retries=config.API_RETRIES)
1013      status = job_status_response.get('status', {})
1014      logging.debug('>>> Job status: %s', status.get('state'))
1015      if status.get('state') == 'DONE':
1016        if status.get('errorResult'):
1017          error_info = status['errorResult']
1018          if 'User does not have permission to query table' in error_info.get(
1019              'message', ''):
1020            op.info(
1021                error_info.get('message')[15:] +
1022                '\nContinuing the investigation with the job metadata obtained from the API.'
1023            )
1024          else:
1025            error_info = status['errorResult']
1026            logging.error(
1027                'BigQuery job %s failed. Reason: %s, Message: %s',
1028                actual_job_id,
1029                error_info.get('reason'),
1030                error_info.get('message'),
1031            )
1032            # Log detailed errors if available
1033            for error in status.get('errors', []):
1034              logging.error(
1035                  '  - Detail: %s (Location: %s)',
1036                  error.get('message'),
1037                  error.get('location'),
1038              )
1039          return None
1040        else:
1041          logging.debug('BigQuery job %s completed successfully.',
1042                        actual_job_id)
1043          break  # Job finished successfully
1044      elif status.get('state') in ['PENDING', 'RUNNING']:
1045        logging.debug('>>> Job running, sleeping...')
1046        # Job still running, wait and poll again
1047        time.sleep(poll_interval_sec)
1048      else:
1049        # Unexpected state
1050        logging.error(
1051            'BigQuery job %s entered unexpected state: %s',
1052            actual_job_id,
1053            status.get('state', 'UNKNOWN'),
1054        )
1055        return None
1056    # Fetch results
1057    logging.debug('>>> Fetching results for job %s...',
1058                  actual_job_id)
1059    results_request = api.jobs().getQueryResults(
1060        projectId=job_ref['projectId'],
1061        jobId=actual_job_id,
1062        location=actual_location,
1063        # Add startIndex, maxResults for pagination if needed
1064    )
1065    results_response = results_request.execute(num_retries=config.API_RETRIES)
1066    # Check if job actually completed (getQueryResults might return before DONE sometimes)
1067    if not results_response.get('jobComplete', False):
1068      logging.warning(
1069          'getQueryResults returned jobComplete=False for job %s, results might'
1070          ' be incomplete.',
1071          actual_job_id,
1072      )
1073      # Decide if you want to wait longer or return potentially partial results
1074    rows = []
1075    if 'rows' in results_response and 'schema' in results_response:
1076      schema_fields = results_response['schema'].get('fields')
1077      if not schema_fields:
1078        return []
1079      for row_data in results_response['rows']:
1080        if 'f' in row_data:
1081          rows.append(_parse_row(schema_fields, row_data['f']))
1082    if results_response.get('pageToken'):
1083      logging.warning(
1084          'Query results for job %s are paginated, but pagination '
1085          'is not yet implemented.',
1086          actual_job_id,
1087      )
1088    return rows
1089  except errors.HttpError as err:
1090    logging.error('API error during BigQuery query execution for job %s: %s',
1091                  job_id, err)
1092    # Raise specific GcpApiError if needed for upstream handling
1093    raise utils.GcpApiError(err) from err
1094  except Exception as e:
1095    logging.exception(
1096        'Unexpected error during BigQuery query execution for job %s: %s',
1097        job_id,
1098        e,
1099    )
1100    # Re-raise or handle as appropriate
1101    raise

Executes a BigQuery query, waits for completion, and returns the results.

Arguments:
  • project_id: The GCP project ID where the query should run.
  • query: The SQL query string to execute.
  • location: The location (e.g., 'US', 'EU', 'us-central1') where the job should run. If None, BigQuery defaults might apply, often based on dataset locations if referenced.
  • timeout_sec: Maximum time in seconds to wait for the query job to complete.
  • poll_interval_sec: Time in seconds between polling the job status.
Returns:

A list of dictionaries representing the result rows, or None if the query fails, times out, or the API is disabled.

Raises:
  • utils.GcpApiError: If an unrecoverable API error occurs during job insertion, status check, or result fetching.
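A usage sketch for get_query_results, assuming the caller has bigquery.jobs.create in the project (and bigquery.jobs.listAll when querying the JOBS view as below). The project id and query are placeholders, and rows are assumed to be dictionaries keyed by column name, as produced by the row-parsing step in the listing above.

from gcpdiag import utils
from gcpdiag.queries import bigquery

try:
  rows = bigquery.get_query_results(
      project_id='example-project',
      query=('SELECT job_id, state '
             'FROM `example-project`.`region-us`.INFORMATION_SCHEMA.JOBS '
             'LIMIT 10'),
      location='us',
      timeout_sec=60,       # allow a bit longer than the default 30s
      poll_interval_sec=5,
  )
except utils.GcpApiError as err:
  print(f'Query failed with an API error: {err}')
else:
  for row in rows or []:  # None means failure, timeout, or disabled API
    print(row['job_id'], row['state'])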
@caching.cached_api_call
def get_bigquery_project(project_id: str) -> gcpdiag.queries.crm.Project:
1104@caching.cached_api_call
1105def get_bigquery_project(project_id: str) -> crm.Project:
1106  """Attempts to retrieve project details for the supplied BigQuery project id or number.
1107
1108    If the project is found and accessible, it returns a Project object with the resource data.
1109    If the project cannot be retrieved, the function raises one of the exceptions below.
1110    get_bigquery_project deliberately avoids printing the error message itself, keeping the
1111    tool's output focused on meaningful investigation results; the corresponding errors are
1112    handled gracefully downstream.
1113
1114    Args:
1115        project_id (str): The project id or number of the project
1116          (e.g., "123456789", "example-project").
1117
1118    Returns:
1119        Project: An object representing the BigQuery project's full details.
1120
1121    Raises:
1122        utils.GcpApiError: If there is an issue calling the GCP/HTTP Error API.
1123
1124    Usage:
1125        When using a project identifier from gcpdiag.models.Context:
1126
1127        project = get_bigquery_project(context.project_id)
1128
1129        With an unknown project identifier:
1130        try:
1131          project = get_bigquery_project("123456789")
1132        except utils.GcpApiError:
1133          ...  # Handle the exception
1134        else:
1135          ...  # Use the project data
1136  """
1137  try:
1138    logging.debug('retrieving project %s ', project_id)
1139    crm_api = apis.get_api('cloudresourcemanager', 'v3', project_id)
1140    request = crm_api.projects().get(name=f'projects/{project_id}')
1141    response = request.execute(num_retries=config.API_RETRIES)
1142  except errors.HttpError as e:
1143    error = utils.GcpApiError(response=e)
1144    raise error from e
1145  else:
1146    return crm.Project(resource_data=response)

Attempts to retrieve project details for the supplied BigQuery project id or number.

If the project is found and accessible, it returns a Project object with the resource data. If the project cannot be retrieved, the function raises one of the exceptions listed below. get_bigquery_project deliberately avoids printing the error message itself, keeping the tool's output focused on meaningful investigation results; the corresponding errors are handled gracefully downstream.

Arguments:
  • project_id (str): The project id or number of the project (e.g., "123456789", "example-project").
Returns:

Project: An object representing the BigQuery project's full details.

Raises:
  • utils.GcpApiError: If there is an issue calling the GCP/HTTP Error API.
Usage:

When using a project identifier from gcpdiag.models.Context:

project = get_bigquery_project(context.project_id)

With an unknown project identifier:

try:
  project = get_bigquery_project("123456789")
except utils.GcpApiError:
  ...  # Handle the exception
else:
  ...  # Use the project data
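A short, hedged example of the downstream handling described above: the function does not print API errors itself, so the caller decides how to surface them. The project number is a placeholder.

from gcpdiag import utils
from gcpdiag.queries import bigquery

try:
  project = bigquery.get_bigquery_project('123456789')  # project id or number
except utils.GcpApiError as err:
  # Not printed by get_bigquery_project itself; surface or swallow it here.
  print(f'Project lookup failed: {err}')
else:
  print(project)  # crm.Project wrapping the Resource Manager response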