gcpdiag.queries.bigquery
def get_project_policy(context: models.Context):
  """Fetches the IAM policy object for a project."""
  root_logger = logging.getLogger()
  original_level = root_logger.level

  try:
    root_logger.setLevel(logging.ERROR)
    policy = iam.get_project_policy(context, raise_error_if_fails=False)
    return policy
  except utils.GcpApiError:
    return None
  finally:
    root_logger.setLevel(original_level)
Fetches the IAM policy object for a project.
def get_organization_policy(context: models.Context, organization_id: str):
  """Fetches the IAM policy object for an organization."""
  root_logger = logging.getLogger()
  original_level = root_logger.level

  try:
    root_logger.setLevel(logging.ERROR)
    policy = iam.get_organization_policy(context,
                                         organization_id,
                                         raise_error_if_fails=False)
    return policy
  except utils.GcpApiError as err:
    if ('doesn\'t have access to' in err.message.lower() or
        'denied on resource' in err.message.lower()):
      op.info(
          'User does not have access to the organization policy. Investigation'
          ' completeness and accuracy might depend on the presence of'
          ' organization level permissions.')
    return None
  finally:
    root_logger.setLevel(original_level)
Fetches the IAM policy object for an organization.
def check_permissions_for_principal(
    policy: PolicyObject, principal: str,
    permissions_to_check: Set[str]) -> Dict[str, bool]:
  """Uses a policy object to check a set of permissions for a principal.

  Returns a dictionary mapping each permission to a boolean indicating its
  presence.
  """
  return {
      permission: policy.has_permission(principal, permission)
      for permission in permissions_to_check
  }
Uses a policy object to check a set of permissions for a principal.
Returns a dictionary mapping each permission to a boolean indicating its presence.
def get_missing_permissions(required_permissions: Set[str],
                            actual_permissions: Dict[str, bool]) -> Set[str]:
  """Compares a set of required permissions against a dictionary of actual
  permissions and returns the set of missing ones.
  """
  return {
      perm for perm in required_permissions if not actual_permissions.get(perm)
  }
Compares a set of required permissions against a dictionary of actual permissions and returns the set of missing ones.
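Taken together, these helpers form a small permission-gate pipeline: fetch the policy, check it for a principal, and compute what is missing. A minimal sketch, assuming a context built by the caller; the principal string and the use of print for reporting are illustrative:

  # Hypothetical usage: verify that a user can create and list BigQuery jobs.
  required = {'bigquery.jobs.create', 'bigquery.jobs.listAll'}

  policy = get_project_policy(context)
  if policy is None:
    print('Project policy unavailable; skipping permission checks.')
  else:
    actual = check_permissions_for_principal(policy, 'user:alice@example.com',
                                             required)
    missing = get_missing_permissions(required, actual)
    if missing:
      print('Missing permissions:', ', '.join(sorted(missing)))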
class BigQueryTable:
  """Represents a BigQuery Table object."""

  project_id: str
  dataset_id: str
  table_id: str

  def __init__(self, project_id: str, dataset_id: str, table_id: str):
    self.project_id = project_id
    self.dataset_id = dataset_id
    self.table_id = table_id

  @property
  def table_identifier(self) -> str:
    return f'{self.project_id}:{self.dataset_id}.{self.table_id}'
Represents a BigQuery Table object.
class BigQueryRoutine:
  """Represents a BigQuery Routine object."""

  project_id: str
  dataset_id: str
  routine_id: str

  def __init__(self, project_id: str, dataset_id: str, routine_id: str):
    self.project_id = project_id
    self.dataset_id = dataset_id
    self.routine_id = routine_id

  @property
  def routine_identifier(self) -> str:
    return f'{self.project_id}:{self.dataset_id}.{self.routine_id}'
Represents a BigQuery Routine object.
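Both classes are thin value holders whose identifier properties render the legacy <project>:<dataset>.<name> form. A quick illustration with made-up names:

  table = BigQueryTable('my-project', 'my_dataset', 'my_table')
  routine = BigQueryRoutine('my-project', 'my_dataset', 'my_routine')
  print(table.table_identifier)      # my-project:my_dataset.my_table
  print(routine.routine_identifier)  # my-project:my_dataset.my_routine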
class BigQueryJob(models.Resource):
  """Represents a BigQuery Job object."""

  _job_api_resource_data: dict[str, Any]
  _information_schema_job_metadata: dict[str, Any]
  project_id: str

  def __init__(
      self,
      project_id: str,
      job_api_resource_data: dict[str, Any],
      information_schema_job_metadata: dict[str, str],
  ):
    super().__init__(project_id)
    self._job_api_resource_data = job_api_resource_data
    self._information_schema_job_metadata = (information_schema_job_metadata or
                                             {})

  @property
  def full_path(self) -> str:
    # returns 'https://content-bigquery.googleapis.com/bigquery/v2/
    # projects/<PROJECT_ID>/jobs/<JOBID>?location=<REGION>'
    return self._job_api_resource_data.get('selfLink', '')

  @property
  def id(self) -> str:
    # returns <PROJECT>:<REGION>.<JobID>
    return self._job_api_resource_data.get('id', '')

  @property
  def short_path(self) -> str:
    # returns <PROJECT>:<REGION>.<JobID>
    return self.id

  @property
  def user_email(self) -> str:
    return self._job_api_resource_data.get('user_email', '')

  @property
  def _job_configuration(self) -> dict[str, Any]:
    return self._job_api_resource_data.get('configuration', {})

  @property
  def _query(self) -> dict[str, Any]:
    return self._job_configuration.get('query', {})

  @property
  def _stats(self) -> dict[str, Any]:
    """Safely access the 'statistics' dictionary."""
    return self._job_api_resource_data.get('statistics', {})

  @property
  def _query_stats(self) -> dict[str, Any]:
    """Safely access the 'statistics.query' dictionary."""
    return self._stats.get('query', {})

  @property
  def _query_info(self) -> dict[str, Any]:
    return self._query_stats.get('queryInfo', {})

  @property
  def _status(self) -> dict[str, Any]:
    return self._job_api_resource_data.get('status', {})

  @property
  def job_type(self) -> str:
    return self._job_configuration.get('jobType', '')

  @property
  def query_sql(self) -> str:
    return self._query.get('query', '')

  @property
  def use_legacy_sql(self) -> bool:
    return self._query.get('useLegacySql', False)

  @property
  def priority(self) -> str:
    return self._query.get('priority', '')

  @property
  def edition(self) -> str:
    edition_value = self._query.get('edition')
    return str(edition_value) if edition_value else ''

  @property
  def creation_time(self) -> Optional[int]:
    time_str = self._stats.get('creationTime')
    return (int(time_str)
            if isinstance(time_str, str) and time_str.isdigit() else None)

  @property
  def start_time(self) -> Optional[int]:
    time_str = self._stats.get('startTime')
    return (int(time_str)
            if isinstance(time_str, str) and time_str.isdigit() else None)

  @property
  def end_time(self) -> Optional[int]:
    time_str = self._stats.get('endTime')
    return (int(time_str)
            if isinstance(time_str, str) and time_str.isdigit() else None)

  @property
  def total_bytes_processed(self) -> int:
    bytes_str = self._stats.get('totalBytesProcessed', '0')
    return (int(bytes_str)
            if isinstance(bytes_str, str) and bytes_str.isdigit() else 0)

  @property
  def total_bytes_billed(self) -> int:
    bytes_str = self._query_stats.get('totalBytesBilled', '0')
    return (int(bytes_str)
            if isinstance(bytes_str, str) and bytes_str.isdigit() else 0)

  @property
  def total_slot_ms(self) -> int:
    ms_str = self._stats.get('totalSlotMs', '0')
    return int(ms_str) if isinstance(ms_str, str) and ms_str.isdigit() else 0

  @property
  def cache_hit(self) -> bool:
    return self._query_stats.get('cacheHit') is True

  @property
  def quota_deferments(self) -> list[str]:
    # 'quotaDeferments' is a list of strings in the jobs.get response.
    deferment_list = self._stats.get('quotaDeferments', [])
    if isinstance(deferment_list, list) and all(
        isinstance(s, str) for s in deferment_list):
      return deferment_list
    return []

  @property
  def query_plan(self) -> list[dict[str, Any]]:
    plan = self._query_stats.get('queryPlan', [])
    return plan if isinstance(plan, list) else []

  @property
  def total_partitions_processed(self) -> int:
    partitions_str = self._query_stats.get('totalPartitionsProcessed', '0')
    return (int(partitions_str) if isinstance(partitions_str, str) and
            partitions_str.isdigit() else 0)

  @property
  def referenced_tables(self) -> list[BigQueryTable]:
    tables_list = self._query_stats.get('referencedTables', [])
    referenced_tables = []
    if isinstance(tables_list, list):
      for item in tables_list:
        if isinstance(item, dict):
          project_id = item.get('projectId')
          dataset_id = item.get('datasetId')
          table_id = item.get('tableId')
          if (isinstance(project_id, str) and project_id and
              isinstance(dataset_id, str) and dataset_id and
              isinstance(table_id, str) and table_id):
            referenced_tables.append(
                BigQueryTable(project_id, dataset_id, table_id))
    return referenced_tables

  @property
  def referenced_routines(self) -> list[BigQueryRoutine]:
    routines_list = self._query_stats.get('referencedRoutines', [])
    referenced_routines = []
    if isinstance(routines_list, list):
      for item in routines_list:
        if isinstance(item, dict):
          project_id = item.get('projectId')
          dataset_id = item.get('datasetId')
          routine_id = item.get('routineId')
          if (isinstance(project_id, str) and project_id and
              isinstance(dataset_id, str) and dataset_id and
              isinstance(routine_id, str) and routine_id):
            referenced_routines.append(
                BigQueryRoutine(project_id, dataset_id, routine_id))
    return referenced_routines

  @property
  def num_affected_dml_rows(self) -> int:
    rows_str = self._query_stats.get('numDmlAffectedRows', '0')
    return (int(rows_str)
            if isinstance(rows_str, str) and rows_str.isdigit() else 0)

  @property
  def dml_stats(self) -> dict[str, int]:
    stats = self._query_stats.get('dmlStats')
    if not isinstance(stats, dict):
      return {}
    inserted_str = stats.get('insertedRowCount', '0')
    deleted_str = stats.get('deletedRowCount', '0')
    updated_str = stats.get('updatedRowCount', '0')
    return {
        'insertedRowCount': (int(inserted_str) if isinstance(
            inserted_str, str) and inserted_str.isdigit() else 0),
        'deletedRowCount': (int(deleted_str) if isinstance(
            deleted_str, str) and deleted_str.isdigit() else 0),
        'updatedRowCount': (int(updated_str) if isinstance(
            updated_str, str) and updated_str.isdigit() else 0),
    }

  @property
  def statement_type(self) -> str:
    stype = self._query_stats.get('statementType', '')
    return stype if isinstance(stype, str) else ''

  @property
  def bi_engine_statistics(self) -> dict[str, Any]:
    stats = self._query_stats.get('biEngineStatistics')
    if not isinstance(stats, dict):
      return {}
    # 'biEngineReasons' is a top-level field of BiEngineStatistics;
    # 'accelerationMode' is a plain enum string.
    reasons_list = stats.get('biEngineReasons', [])
    bi_engine_reasons = []
    if isinstance(reasons_list, list):
      for item in reasons_list:
        if isinstance(item, dict):
          bi_engine_reasons.append({
              'code': str(item.get('code', '')),
              'message': item.get('message', ''),
          })
    return {
        'biEngineMode': str(stats.get('biEngineMode', '')),
        'accelerationMode': str(stats.get('accelerationMode', '')),
        'biEngineReasons': bi_engine_reasons,
    }

  @property
  def vector_search_statistics(self) -> dict[str, Any]:
    stats = self._query_stats.get('vectorSearchStatistics')
    if not isinstance(stats, dict):
      return {}
    reasons_list = stats.get('indexUnusedReasons', [])
    index_unused_reasons = []
    if isinstance(reasons_list, list):
      for item in reasons_list:
        if isinstance(item, dict):
          base_table_data = item.get('baseTable')
          base_table_obj = None
          if isinstance(base_table_data, dict):
            project_id = base_table_data.get('projectId')
            dataset_id = base_table_data.get('datasetId')
            table_id = base_table_data.get('tableId')
            if (isinstance(project_id, str) and project_id and
                isinstance(dataset_id, str) and dataset_id and
                isinstance(table_id, str) and table_id):
              base_table_obj = BigQueryTable(project_id, dataset_id, table_id)
          index_unused_reasons.append({
              'code': str(item.get('code', '')),
              'message': item.get('message', ''),
              'indexName': item.get('indexName', ''),
              'baseTable': base_table_obj,
          })
    return {
        'indexUsageMode': str(stats.get('indexUsageMode', '')),
        'indexUnusedReasons': index_unused_reasons,
    }

  @property
  def performance_insights(self) -> dict[str, Any]:
    insights = self._query_stats.get('performanceInsights')
    if not isinstance(insights, dict):
      return {}
    standalone_list = insights.get('stagePerformanceStandaloneInsights', [])
    stage_performance_standalone_insights = []
    if isinstance(standalone_list, list):
      for item in standalone_list:
        if isinstance(item, dict):
          stage_performance_standalone_insights.append({
              'stageId': item.get('stageId', ''),
          })
    change_list = insights.get('stagePerformanceChangeInsights', [])
    stage_performance_change_insights = []
    if isinstance(change_list, list):
      for item in change_list:
        if isinstance(item, dict):
          stage_performance_change_insights.append({
              'stageId': item.get('stageId', ''),
          })
    avg_ms_str = insights.get('avgPreviousExecutionMs', '0')
    return {
        'avgPreviousExecutionMs':
            (int(avg_ms_str)
             if isinstance(avg_ms_str, str) and avg_ms_str.isdigit() else 0),
        'stagePerformanceStandaloneInsights':
            stage_performance_standalone_insights,
        'stagePerformanceChangeInsights': stage_performance_change_insights,
    }

  @property
  def optimization_details(self) -> Any:
    return self._query_info.get('optimizationDetails')

  @property
  def export_data_statistics(self) -> dict[str, int]:
    stats = self._query_stats.get('exportDataStatistics')
    if not isinstance(stats, dict):
      return {}
    file_count_str = stats.get('fileCount', '0')
    row_count_str = stats.get('rowCount', '0')
    return {
        'fileCount': (int(file_count_str) if isinstance(
            file_count_str, str) and file_count_str.isdigit() else 0),
        'rowCount': (int(row_count_str) if isinstance(
            row_count_str, str) and row_count_str.isdigit() else 0),
    }

  @property
  def load_query_statistics(self) -> dict[str, int]:
    stats = self._query_stats.get('loadQueryStatistics')
    if not isinstance(stats, dict):
      return {}
    input_files_str = stats.get('inputFiles', '0')
    input_bytes_str = stats.get('inputFileBytes', '0')
    output_rows_str = stats.get('outputRows', '0')
    output_bytes_str = stats.get('outputBytes', '0')
    bad_records_str = stats.get('badRecords', '0')
    return {
        'inputFiles': (int(input_files_str) if isinstance(
            input_files_str, str) and input_files_str.isdigit() else 0),
        'inputFileBytes': (int(input_bytes_str) if isinstance(
            input_bytes_str, str) and input_bytes_str.isdigit() else 0),
        'outputRows': (int(output_rows_str) if isinstance(
            output_rows_str, str) and output_rows_str.isdigit() else 0),
        'outputBytes': (int(output_bytes_str) if isinstance(
            output_bytes_str, str) and output_bytes_str.isdigit() else 0),
        'badRecords': (int(bad_records_str) if isinstance(
            bad_records_str, str) and bad_records_str.isdigit() else 0),
    }

  @property
  def spark_statistics(self) -> dict[str, Any]:
    stats = self._query_stats.get('sparkStatistics')
    if not isinstance(stats, dict):
      return {}
    logging_info_dict = stats.get('loggingInfo', {})
    logging_info = ({
        'resourceType': logging_info_dict.get('resourceType', ''),
        'projectId': logging_info_dict.get('projectId', ''),
    } if isinstance(logging_info_dict, dict) else {})
    return {
        'endpoints': stats.get('endpoints', {}),
        'sparkJobId': stats.get('sparkJobId', ''),
        'sparkJobLocation': stats.get('sparkJobLocation', ''),
        'kmsKeyName': stats.get('kmsKeyName', ''),
        'gcsStagingBucket': stats.get('gcsStagingBucket', ''),
        'loggingInfo': logging_info,
    }

  @property
  def transferred_bytes(self) -> int:
    bytes_str = self._query_stats.get('transferredBytes', '0')
    return (int(bytes_str)
            if isinstance(bytes_str, str) and bytes_str.isdigit() else 0)

  @property
  def reservation_id(self) -> str:
    res_id = self._stats.get('reservation_id', '')
    return res_id if isinstance(res_id, str) else ''

  @property
  def reservation_admin_project_id(self) -> Optional[str]:
    if not self.reservation_id:
      return None
    try:
      parts = self.reservation_id.split('/')
      if parts[0] == 'projects' and len(parts) >= 2:
        return parts[1]
      else:
        logging.warning(
            'Could not parse project ID from reservation_id: %s',
            self.reservation_id,
        )
        return None
    except (IndexError, AttributeError):
      logging.warning(
          'Could not parse project ID from reservation_id: %s',
          self.reservation_id,
      )
      return None

  @property
  def num_child_jobs(self) -> int:
    num_str = self._stats.get('numChildJobs', '0')
    return int(num_str) if isinstance(num_str, str) and num_str.isdigit() else 0

  @property
  def parent_job_id(self) -> str:
    parent_id = self._stats.get('parentJobId', '')
    return parent_id if isinstance(parent_id, str) else ''

  @property
  def row_level_security_applied(self) -> bool:
    # The REST field name is lowerCamelCase ('rowLevelSecurityStatistics').
    rls_stats = self._stats.get('rowLevelSecurityStatistics', {})
    return (rls_stats.get('rowLevelSecurityApplied') is True
            if isinstance(rls_stats, dict) else False)

  @property
  def data_masking_applied(self) -> bool:
    masking_stats = self._stats.get('dataMaskingStatistics', {})
    return (masking_stats.get('dataMaskingApplied') is True
            if isinstance(masking_stats, dict) else False)

  @property
  def session_id(self) -> str:
    session_info = self._stats.get('sessionInfo', {})
    session_id_val = (session_info.get('sessionId', '')
                      if isinstance(session_info, dict) else '')
    return session_id_val if isinstance(session_id_val, str) else ''

  @property
  def final_execution_duration_ms(self) -> int:
    duration_str = self._stats.get('finalExecutionDurationMs', '0')
    return (int(duration_str)
            if isinstance(duration_str, str) and duration_str.isdigit() else 0)

  @property
  def job_state(self) -> str:
    state = self._status.get('state', '')
    return state if isinstance(state, str) else ''

  @property
  def job_error_result(self) -> dict[str, Optional[str]]:
    error_result = self._status.get('errorResult')
    if not isinstance(error_result, dict):
      return {}
    return {
        'reason': error_result.get('reason'),
        'location': error_result.get('location'),
        'debugInfo': error_result.get('debugInfo'),
        'message': error_result.get('message'),
    }

  @property
  def job_errors(self) -> list[dict[str, Optional[str]]]:
    errors_list = self._status.get('errors', [])
    errors_iterable = []
    if isinstance(errors_list, list):
      for item in errors_list:
        if isinstance(item, dict):
          errors_iterable.append({
              'reason': item.get('reason'),
              'location': item.get('location'),
              'debugInfo': item.get('debugInfo'),
              'message': item.get('message'),
          })
    return errors_iterable

  @property
  def materialized_view_statistics(self) -> dict[str, Any]:
    stats_list = self._query_stats.get('materializedViewStatistics')
    materialized_view = []
    if isinstance(stats_list, list):
      for item in stats_list:
        if isinstance(item, dict):
          table_ref_data = item.get('tableReference')
          table_ref_obj = None
          if isinstance(table_ref_data, dict):
            project_id = table_ref_data.get('projectId')
            dataset_id = table_ref_data.get('datasetId')
            table_id = table_ref_data.get('tableId')
            if (isinstance(project_id, str) and project_id and
                isinstance(dataset_id, str) and dataset_id and
                isinstance(table_id, str) and table_id):
              table_ref_obj = BigQueryTable(project_id, dataset_id, table_id)
          chosen = item.get('chosen') is True
          saved_str = item.get('estimatedBytesSaved', '0')
          estimated_bytes_saved = (int(saved_str)
                                   if isinstance(saved_str, str) and
                                   saved_str.isdigit() else 0)
          rejected_reason = str(item.get('rejectedReason', ''))
          materialized_view.append({
              'chosen': chosen,
              'estimatedBytesSaved': estimated_bytes_saved,
              'rejectedReason': rejected_reason,
              'tableReference': table_ref_obj,
          })
    return {'materializedView': materialized_view}

  @property
  def metadata_cache_statistics(self) -> dict[str, Any]:
    stats_list = self._query_stats.get('metadataCacheStatistics')
    metadata_cache = []
    if isinstance(stats_list, list):
      for item in stats_list:
        if isinstance(item, dict):
          table_ref_data = item.get('tableReference')
          table_ref_obj = None
          if isinstance(table_ref_data, dict):
            project_id = table_ref_data.get('projectId')
            dataset_id = table_ref_data.get('datasetId')
            table_id = table_ref_data.get('tableId')
            if (isinstance(project_id, str) and project_id and
                isinstance(dataset_id, str) and dataset_id and
                isinstance(table_id, str) and table_id):
              table_ref_obj = BigQueryTable(project_id, dataset_id, table_id)
          metadata_cache.append({
              'explanation': item.get('explanation', ''),
              'unusedReason': str(item.get('unusedReason', '')),
              'tableReference': table_ref_obj,
          })
    return {'tableMetadataCacheUsage': metadata_cache}

  # Properties derived from _information_schema_job_metadata

  @property
  def information_schema_user_email(self) -> str | None:
    if not self._information_schema_job_metadata:
      return C_NOT_AVAILABLE
    return self._information_schema_job_metadata.get('user_email')

  @property
  def information_schema_start_time_str(self) -> str | None:
    if not self._information_schema_job_metadata:
      return C_NOT_AVAILABLE
    return self._information_schema_job_metadata.get('start_time_str')

  @property
  def information_schema_end_time_str(self) -> str | None:
    if not self._information_schema_job_metadata:
      return C_NOT_AVAILABLE
    return self._information_schema_job_metadata.get('end_time_str')

  @property
  def information_schema_query(self) -> str | None:
    if not self._information_schema_job_metadata:
      return C_NOT_AVAILABLE
    return self._information_schema_job_metadata.get('query')

  @property
  def information_schema_total_modified_partitions(self) -> Union[int, str]:
    """The total number of partitions the job modified.

    This field is populated for LOAD and QUERY jobs.
    """
    if not self._information_schema_job_metadata:
      return C_NOT_AVAILABLE
    try:
      return self._information_schema_job_metadata[
          'total_modified_partitions']
    except KeyError:
      return C_NOT_AVAILABLE

  @property
  def information_schema_resource_warning(self) -> str:
    """The warning message that appears if the resource usage during query
    processing is above the internal threshold of the system.
    """
    if not self._information_schema_job_metadata:
      return C_NOT_AVAILABLE
    try:
      return self._information_schema_job_metadata['query_info'][
          'resource_warning']
    except KeyError:
      return C_NOT_AVAILABLE

  @property
  def information_schema_normalized_literals(self) -> str:
    """Contains the hashes of the query."""
    try:
      return self._information_schema_job_metadata['query_info'][
          'query_hashes']['normalized_literals']
    except KeyError:
      return C_NOT_AVAILABLE
Represents a BigQuery Job object.
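The class is a read-only wrapper over a jobs.get response; every property tolerates missing fields. A minimal sketch, assuming a hand-written payload fragment (not real API output):

  # Hypothetical jobs.get payload fragment; field names follow the v2 REST API.
  job = BigQueryJob(
      project_id='my-project',
      job_api_resource_data={
          'id': 'my-project:US.job_123',
          'status': {'state': 'DONE'},
          'configuration': {'jobType': 'QUERY', 'query': {'query': 'SELECT 1'}},
          'statistics': {'totalBytesProcessed': '1048576', 'totalSlotMs': '250'},
      },
      information_schema_job_metadata={},
  )
  print(job.job_state)              # DONE
  print(job.total_bytes_processed)  # 1048576
  print(job.total_slot_ms)          # 250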
@property
def project_id(self) -> str:
  """Project id (not project number)."""
  return self._project_id
Project id (not project number).
@property
def full_path(self) -> str:
  # returns 'https://content-bigquery.googleapis.com/bigquery/v2/
  # projects/<PROJECT_ID>/jobs/<JOBID>?location=<REGION>'
  return self._job_api_resource_data.get('selfLink', '')
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  # returns <PROJECT>:<REGION>.<JobID>
  return self.id
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
@caching.cached_api_call
def get_bigquery_job_api_resource_data(
    project_id: str,
    region: str,
    job_id: str,
) -> Union[dict[str, Any], None]:
  """Fetch a specific BigQuery job's raw API resource data."""
  api = apis.get_api('bigquery', 'v2', project_id)
  query_job = api.jobs().get(projectId=project_id,
                             location=region,
                             jobId=job_id)

  try:
    resp = query_job.execute(num_retries=config.API_RETRIES)
    return resp
  except errors.HttpError as err:
    raise utils.GcpApiError(err) from err
Fetch a specific BigQuery job's raw API resource data.
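Usage is a plain call with the job's project, region, and id; the result is cached by caching.cached_api_call. A sketch with made-up identifiers:

  # Hypothetical identifiers; raises utils.GcpApiError on HTTP errors.
  data = get_bigquery_job_api_resource_data('my-project', 'us-central1',
                                            'bquxjob_1234abcd')
  if data:
    print(data.get('status', {}).get('state'))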
@caching.cached_api_call
def get_information_schema_job_metadata(
    context: models.Context,
    project_id: str,
    region: str,
    job_id: str,
    creation_time_milis: Optional[int] = None,
    skip_permission_check: bool = False,
) -> Optional[dict[str, Any]]:
  """Fetch metadata about a BigQuery job from the INFORMATION_SCHEMA."""
  if not apis.is_enabled(project_id, 'bigquery'):
    return None
  user_email = ''
  try:
    user_email = apis.get_user_email()
  except (RuntimeError, exceptions.DefaultCredentialsError):
    pass
  except AttributeError as err:
    if ('has no attribute' in str(err) and 'with_quota_project' in str(err)):
      op.info('Running the investigation within the GCA context.')
  user = 'user:' + user_email
  if not skip_permission_check:
    try:
      policy = iam.get_project_policy(context)
      if (not policy.has_permission(user, 'bigquery.jobs.create')) or (
          not policy.has_permission(user, 'bigquery.jobs.listAll')):
        op.info(
            'WARNING: Unable to run INFORMATION_SCHEMA view analysis due to'
            ' missing permissions.\nMake sure to grant'
            f' {user_email} "bigquery.jobs.create" and "bigquery.jobs.listAll".'
            '\nContinuing the investigation with the BigQuery job metadata'
            ' obtained from the API.')
        return None
    except utils.GcpApiError:
      op.info(
          'Attempting to query INFORMATION_SCHEMA with no knowledge of project'
          ' level permissions \n(due to missing'
          ' resourcemanager.projects.get permission).')
  else:
    op.info(
        'Attempting to query INFORMATION_SCHEMA without checking project'
        ' level permissions.')
  try:
    creation_time_milis_filter = ' '
    if creation_time_milis:
      creation_time_milis_filter = (
          f'AND creation_time = TIMESTAMP_MILLIS({creation_time_milis})')
    query = f"""
      SELECT
        user_email, start_time, end_time, query
      FROM
        `{project_id}`.`region-{region}`.INFORMATION_SCHEMA.JOBS
      WHERE
        job_id = '{job_id}'
        {creation_time_milis_filter}
      LIMIT 1
    """
    results = get_query_results(
        project_id=project_id,
        query=query,
        location=region,
        timeout_sec=30,
        poll_interval_sec=2,  # Short poll interval.
    )
    if not results or len(results) != 1:
      # We cannot raise an exception here, otherwise tests that use
      # get_bigquery_job would fail.
      return None
    return results[0]
  except errors.HttpError as err:
    logging.warning(
        'Failed to retrieve INFORMATION_SCHEMA job metadata for job %s: %s',
        job_id,
        err,
    )
    return None
  except KeyError as err:
    logging.warning(
        'Failed to parse INFORMATION_SCHEMA response for job %s: %s',
        job_id,
        err,
    )
    return None
Fetch metadata about a BigQuery job from the INFORMATION_SCHEMA.
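A sketch of a direct call (identifiers are made up; assuming the internal row parser keys each row by column name, the returned row carries the columns selected by the internal query):

  # Hypothetical identifiers.
  meta = get_information_schema_job_metadata(context, 'my-project', 'us',
                                             'bquxjob_1234abcd')
  if meta:
    print(meta['user_email'], meta['start_time'], meta['end_time'])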
def get_bigquery_job(
    context: models.Context,
    region: str,
    job_id: str,
    skip_permission_check: bool = False) -> Union[BigQueryJob, None]:
  """Fetch a BigQuery job, combining API and INFORMATION_SCHEMA data."""
  project_id = context.project_id
  if not project_id:
    return None
  try:
    job_api_resource_data = get_bigquery_job_api_resource_data(
        project_id, region, job_id)
    if not job_api_resource_data:
      return None
  except utils.GcpApiError as err:
    job_id_string = project_id + ':' + region + '.' + job_id
    # This will be returned when permissions to fetch a job are missing.
    if 'permission' in err.message.lower():
      user_email = ''
      try:
        user_email = apis.get_user_email()
      except (RuntimeError, AttributeError,
              exceptions.DefaultCredentialsError) as error:
        if ('has no attribute' in str(error) and
            'with_quota_project' in str(error)):
          op.info('Running the investigation within the GCA context.')
      logging.debug(
          'Could not retrieve BigQuery job %s. Make sure to grant the'
          ' bigquery.jobs.get and bigquery.jobs.create permissions to %s.',
          job_id_string, user_email)
      raise utils.GcpApiError(err)
    # This will be returned when a job is not found.
    elif 'not found' in err.message.lower():
      logging.debug('Could not find BigQuery job %s', job_id_string)
      return None
    else:
      logging.debug(
          'Could not retrieve BigQuery job %s due to an issue calling the'
          ' API. Please restart the investigation.', job_id_string)
      return None
  information_schema_job_metadata = {}
  job_creation_millis = None
  creation_time_str = job_api_resource_data.get('statistics',
                                                {}).get('creationTime')
  if creation_time_str:
    try:
      job_creation_millis = int(creation_time_str)
    except (ValueError, TypeError):
      pass
  information_schema_job_metadata = get_information_schema_job_metadata(
      context, project_id, region, job_id, job_creation_millis,
      skip_permission_check)
  return BigQueryJob(
      project_id=project_id,
      job_api_resource_data=job_api_resource_data,
      information_schema_job_metadata=information_schema_job_metadata)
Fetch a BigQuery job, combining API and INFORMATION_SCHEMA data.
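A sketch of the typical entry point, assuming a models.Context built elsewhere in gcpdiag (the identifiers are illustrative):

  # Hypothetical context and job id.
  context = models.Context(project_id='my-project')
  job = get_bigquery_job(context, region='US', job_id='bquxjob_1234abcd')
  if job:
    print(job.short_path)  # my-project:US.bquxjob_1234abcd
    print(job.job_state)
    if job.job_error_result:
      print(job.job_error_result['message'])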
def get_query_results(
    project_id: str,
    query: str,
    location: Optional[str] = None,
    timeout_sec: int = 30,
    poll_interval_sec: int = 2,
) -> Optional[List[dict[str, Any]]]:
  """Executes a BigQuery query, waits for completion, and returns the results.

  Args:
    project_id: The GCP project ID where the query should run.
    query: The SQL query string to execute.
    location: The location (e.g., 'US', 'EU', 'us-central1') where the job
      should run. If None, BigQuery defaults might apply, often based on
      dataset locations if referenced.
    timeout_sec: Maximum time in seconds to wait for the query job to
      complete.
    poll_interval_sec: Time in seconds between polling the job status.

  Returns:
    A list of dictionaries representing the result rows, or None if the
    query fails, times out, or the API is disabled.

  Raises:
    utils.GcpApiError: If an unrecoverable API error occurs during job
      insertion, status check, or result fetching.
  """
  if not apis.is_enabled(project_id, 'bigquery'):
    logging.warning('BigQuery API is not enabled in project %s.', project_id)
    return None
  api = apis.get_api('bigquery', 'v2', project_id)
  job_id = f'gcpdiag_query_{uuid.uuid4()}'
  job_body = {
      'jobReference': {
          'projectId': project_id,
          'jobId': job_id,
          'location': location,  # Location can be None.
      },
      'configuration': {
          'query': {
              'query': query,
              'useLegacySql': False,
          }
      },
  }
  try:
    logging.debug(
        'Starting BigQuery job %s in project %s, location %s',
        job_id,
        project_id,
        location or 'default',
    )
    insert_request = api.jobs().insert(projectId=project_id, body=job_body)
    insert_response = insert_request.execute(num_retries=config.API_RETRIES)
    job_ref = insert_response['jobReference']
    actual_job_id = job_ref['jobId']
    actual_location = job_ref.get('location')  # Location assigned by BigQuery.
    logging.debug('Job %s created. Polling for completion...', actual_job_id)
    start_time = time.time()
    while True:
      # Check for timeout.
      if time.time() - start_time > timeout_sec:
        logging.error(
            'BigQuery job %s timed out after %d seconds.',
            actual_job_id,
            timeout_sec,
        )
        return None
      # Get job status.
      logging.debug('>>> Getting job status for %s', actual_job_id)
      get_request = api.jobs().get(
          projectId=job_ref['projectId'],
          jobId=actual_job_id,
          location=actual_location,
      )
      job_status_response = get_request.execute(num_retries=config.API_RETRIES)
      status = job_status_response.get('status', {})
      logging.debug('>>> Job status: %s', status.get('state'))
      if status.get('state') == 'DONE':
        if status.get('errorResult'):
          error_info = status['errorResult']
          if 'User does not have permission to query table' in error_info.get(
              'message', ''):
            # Strip the 'Access Denied: ' prefix (15 characters).
            op.info(
                error_info.get('message', '')[15:] +
                '\nContinuing the investigation with the job metadata obtained'
                ' from the API.')
          else:
            logging.error(
                'BigQuery job %s failed. Reason: %s, Message: %s',
                actual_job_id,
                error_info.get('reason'),
                error_info.get('message'),
            )
            # Log detailed errors if available.
            for error in status.get('errors', []):
              logging.error(
                  ' - Detail: %s (Location: %s)',
                  error.get('message'),
                  error.get('location'),
              )
          return None
        else:
          logging.debug('BigQuery job %s completed successfully.',
                        actual_job_id)
          break  # Job finished successfully.
      elif status.get('state') in ['PENDING', 'RUNNING']:
        # Job still running, wait and poll again.
        logging.debug('>>> Job running, sleeping...')
        time.sleep(poll_interval_sec)
      else:
        # Unexpected state.
        logging.error(
            'BigQuery job %s entered unexpected state: %s',
            actual_job_id,
            status.get('state', 'UNKNOWN'),
        )
        return None
    # Fetch results.
    logging.debug('>>> Fetching results for job %s...', actual_job_id)
    results_request = api.jobs().getQueryResults(
        projectId=job_ref['projectId'],
        jobId=actual_job_id,
        location=actual_location,
    )
    results_response = results_request.execute(num_retries=config.API_RETRIES)
    # getQueryResults can return before the job is fully complete.
    if not results_response.get('jobComplete', False):
      logging.warning(
          'getQueryResults returned jobComplete=False for job %s, results'
          ' might be incomplete.',
          actual_job_id,
      )
    rows = []
    if 'rows' in results_response and 'schema' in results_response:
      schema_fields = results_response['schema'].get('fields')
      if not schema_fields:
        return []
      for row_data in results_response['rows']:
        if 'f' in row_data:
          rows.append(_parse_row(schema_fields, row_data['f']))
    if results_response.get('pageToken'):
      logging.warning(
          'Query results for job %s are paginated, but pagination '
          'is not yet implemented.',
          actual_job_id,
      )
    return rows
  except errors.HttpError as err:
    logging.error('API error during BigQuery query execution for job %s: %s',
                  job_id, err)
    raise utils.GcpApiError(err) from err
  except Exception as e:
    logging.exception(
        'Unexpected error during BigQuery query execution for job %s: %s',
        job_id,
        e,
    )
    raise
Executes a BigQuery query, waits for completion, and returns the results.
Arguments:
- project_id: The GCP project ID where the query should run.
- query: The SQL query string to execute.
- location: The location (e.g., 'US', 'EU', 'us-central1') where the job should run. If None, BigQuery defaults might apply, often based on dataset locations if referenced.
- timeout_sec: Maximum time in seconds to wait for the query job to complete.
- poll_interval_sec: Time in seconds between polling the job status.
Returns:
A list of dictionaries representing the result rows, or None if the query fails, times out, or the API is disabled.
Raises:
- utils.GcpApiError: If an unrecoverable API error occurs during job insertion, status check, or result fetching.
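A short sketch of calling the helper directly (the query and location are illustrative; unrecoverable errors surface as utils.GcpApiError):

  # Hypothetical ad-hoc query; returns a list of row dicts or None.
  rows = get_query_results(
      project_id='my-project',
      query='SELECT job_id, state FROM `region-us`.INFORMATION_SCHEMA.JOBS'
            ' LIMIT 5',
      location='US',
      timeout_sec=60,
  )
  for row in rows or []:
    print(row['job_id'], row['state'])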
@caching.cached_api_call
def get_bigquery_project(project_id: str) -> crm.Project:
  """Attempts to retrieve project details for the supplied BigQuery project id or number.

  If the project is found/accessible, it returns a Project object with the
  resource data. If the project cannot be retrieved, the application raises
  the exception listed below. The get_bigquery_project method avoids
  unnecessary printing of the error message to keep the user interface of the
  tool cleaner and focused on meaningful investigation results. Corresponding
  errors are handled gracefully downstream.

  Args:
    project_id (str): The project id or number of the project
      (e.g., "123456789", "example-project").

  Returns:
    Project: An object representing the BigQuery project's full details.

  Raises:
    utils.GcpApiError: If there is an HTTP error calling the GCP API.

  Usage:
    When using a project identifier from gcpdiag.models.Context:

      project = get_bigquery_project(context.project_id)

    With an unknown project identifier:

      try:
        project = get_bigquery_project('123456789')
      except utils.GcpApiError:
        ...  # handle the exception
      else:
        ...  # use the project data
  """
  try:
    logging.debug('retrieving project %s', project_id)
    crm_api = apis.get_api('cloudresourcemanager', 'v3', project_id)
    request = crm_api.projects().get(name=f'projects/{project_id}')
    response = request.execute(num_retries=config.API_RETRIES)
  except errors.HttpError as e:
    error = utils.GcpApiError(response=e)
    raise error from e
  else:
    return crm.Project(resource_data=response)
Attempts to retrieve project details for the supplied BigQuery project id or number.
If the project is found/accessible, it returns a Project object with the resource data. If the project cannot be retrieved, the application raises the exception listed below. The get_bigquery_project method avoids unnecessary printing of the error message to keep the user interface of the tool cleaner and focused on meaningful investigation results. Corresponding errors are handled gracefully downstream.
Arguments:
- project_id (str): The project id or number of the project (e.g., "123456789", "example-project").
Returns:
Project: An object representing the BigQuery project's full details.
Raises:
- utils.GcpApiError: If there is an HTTP error calling the GCP API.
Usage:
When using a project identifier from gcpdiag.models.Context:

  project = get_bigquery_project(context.project_id)

With an unknown project identifier:

  try:
    project = get_bigquery_project('123456789')
  except utils.GcpApiError:
    ...  # handle the exception
  else:
    ...  # use the project data