gcpdiag.queries.logs
Queries related to Cloud Logging.

The main functionality is querying log entries, which is intended to be used as follows:

1. Call query() with the logs query parameters that you need. This returns a LogsQuery object which can be used to retrieve the logs later.

2. Call execute_queries() to execute all log query jobs. Similar queries will be grouped together to minimize the number of required API calls. Multiple queries will be done in parallel, while always respecting the Cloud Logging limit of 60 queries per 60 seconds.

3. Use the entries property on the LogsQuery object to iterate over the fetched logs. Note that the entries are not guaranteed to be filtered by what was given in the "filter_str" argument to query(); you will need to filter out the entries in code as well when iterating over the log entries.

Side note: this module is not called 'logging' to avoid a name clash with the standard Python logging library.
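For illustration, a minimal sketch of this three-step flow (the project ID, resource type, log name, and filter string below are hypothetical placeholders; query_executor and context are assumed to be supplied by the gcpdiag runtime):

  from gcpdiag.queries import logs

  # Step 1: register the query. Nothing is fetched yet; similar queries are
  # grouped by (project_id, resource_type, log_name).
  q = logs.query(project_id='my-project',
                 resource_type='gce_instance',
                 log_name='projects/my-project/logs/my-log',
                 filter_str='severity=ERROR')

  # Step 2: execute all pending query jobs in parallel (query_executor and
  # context come from the gcpdiag runtime).
  logs.execute_queries(query_executor, context)

  # Step 3: iterate and re-filter in code, since grouped queries can return
  # entries that match other queries' filters.
  for entry in q.entries:
    if entry.get('severity') == 'ERROR':
      pass  # handle the matching entry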
class LogsQuery:
class LogsQuery:
  """A log search job that was started with prefetch_logs()."""
  job: _LogsQueryJob

  def __init__(self, job):
    self.job = job

  @property
  def entries(self) -> Sequence:
    if not self.job.future:
      raise RuntimeError(
          'log query wasn\'t executed. did you forget to call execute_queries()?'
      )
    elif self.job.future.running():
      logging.debug(
          'waiting for logs query results (project: %s, resource type: %s)',
          self.job.project_id, self.job.resource_type)
    return self.job.future.result()
A log search job that was started with prefetch_logs().
entries: Sequence
  @property
  def entries(self) -> Sequence:
    if not self.job.future:
      raise RuntimeError(
          'log query wasn\'t executed. did you forget to call execute_queries()?'
      )
    elif self.job.future.running():
      logging.debug(
          'waiting for logs query results (project: %s, resource type: %s)',
          self.job.project_id, self.job.resource_type)
    return self.job.future.result()
jobs_todo: Dict[Tuple[str, str, str], gcpdiag.queries.logs._LogsQueryJob] = {}
class LogEntryShort:
class LogEntryShort:
  """A common log entry"""
  _text: str
  _timestamp: Optional[datetime.datetime]

  def __init__(self, raw_entry):
    if isinstance(raw_entry, dict):
      self._text = get_path(raw_entry, ('textPayload',), default='')
      self._timestamp = log_entry_timestamp(raw_entry)

    if isinstance(raw_entry, str):
      self._text = raw_entry
      # we could extract timestamp from serial entries
      # but they are not always present
      # and may be unreliable as we don't know the system clock setting
      self._timestamp = None

  @property
  def text(self):
    return self._text

  @property
  def timestamp(self):
    return self._timestamp

  @property
  def timestamp_iso(self):
    if self._timestamp:
      return self._timestamp.astimezone().isoformat(sep=' ', timespec='seconds')
    return None
A common log entry
LogEntryShort(raw_entry)
  def __init__(self, raw_entry):
    if isinstance(raw_entry, dict):
      self._text = get_path(raw_entry, ('textPayload',), default='')
      self._timestamp = log_entry_timestamp(raw_entry)

    if isinstance(raw_entry, str):
      self._text = raw_entry
      # we could extract timestamp from serial entries
      # but they are not always present
      # and may be unreliable as we don't know the system clock setting
      self._timestamp = None
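A short illustrative sketch of the two accepted input types (the payloads are hypothetical):

  # From a structured log entry (dict): text and timestamp are extracted.
  e1 = LogEntryShort({
      'textPayload': 'kernel: Out of memory',
      'receiveTimestamp': '2024-01-01T00:00:00Z',
  })
  print(e1.text, e1.timestamp_iso)

  # From a raw serial-console line (str): the text is kept as-is and no
  # timestamp is recorded.
  e2 = LogEntryShort('kernel: Out of memory')
  assert e2.timestamp is None and e2.timestamp_iso is None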
class LogExclusion(gcpdiag.models.Resource):
class LogExclusion(models.Resource):
  """A log exclusion entry"""
  _resource_data: dict
  project_id: str

  def __init__(self, project_id: str, resource_data: dict):
    super().__init__(project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    return self._resource_data['name']

  @property
  def filter(self) -> str:
    return self._resource_data['filter']

  @property
  def disabled(self) -> bool:
    if 'disabled' in self._resource_data:
      return self._resource_data['disabled']
    return False
A log exclusion entry
project_id: str
  @property
  def project_id(self) -> str:
    """Project id (not project number)."""
    return self._project_id
Project id (not project number).
def query(project_id: str, resource_type: str, log_name: str, filter_str: str) -> LogsQuery:

def query(project_id: str, resource_type: str, log_name: str,
          filter_str: str) -> LogsQuery:
  # Aggregate by project_id, resource_type, log_name
  job_key = (project_id, resource_type, log_name)
  job = jobs_todo.setdefault(
      job_key,
      _LogsQueryJob(
          project_id=project_id,
          resource_type=resource_type,
          log_name=log_name,
          filters=set(),
      ))
  job.filters.add(filter_str)
  return LogsQuery(job=job)
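To illustrate the aggregation, a hypothetical sketch: two calls sharing the same (project_id, resource_type, log_name) key reuse one underlying job, and their filter strings are collected into a single set that is combined at execution time:

  q1 = query('my-project', 'gce_instance', 'log-a', 'severity=ERROR')
  q2 = query('my-project', 'gce_instance', 'log-a', 'textPayload:"OOM"')
  # Both LogsQuery objects point at the same pending job, so only one
  # API call is needed for both filters.
  assert q1.job is q2.job
  assert q1.job.filters == {'severity=ERROR', 'textPayload:"OOM"'}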
@caching.cached_api_call
def realtime_query(project_id, filter_str, start_time, end_time, disable_paging=False):
@caching.cached_api_call
def realtime_query(project_id,
                   filter_str,
                   start_time,
                   end_time,
                   disable_paging=False):
  """Intended for use in runbooks only. Use logs.query() for lint rules."""
  logging_api = apis.get_api('logging', 'v2', project_id)

  filter_lines = [filter_str]
  filter_lines.append('timestamp>"%s"' %
                      start_time.isoformat(timespec='seconds'))
  filter_lines.append('timestamp<"%s"' % end_time.isoformat(timespec='seconds'))
  filter_str = '\n'.join(filter_lines)
  logging.debug('searching logs in project %s for logs between %s and %s',
                project_id, str(start_time), str(end_time))
  deque = Deque()
  req = logging_api.entries().list(
      body={
          'resourceNames': [f'projects/{project_id}'],
          'filter': filter_str,
          'orderBy': 'timestamp desc',
          'pageSize': config.get('logging_page_size')
      })
  fetched_entries_count = 0
  query_pages = 0
  query_start_time = datetime.datetime.now()
  while req is not None:
    query_pages += 1
    res = _ratelimited_execute(req)
    if 'entries' in res:
      for e in res['entries']:
        fetched_entries_count += 1
        deque.appendleft(e)

    # Verify that we aren't above limits, exit otherwise.
    if fetched_entries_count > config.get('logging_fetch_max_entries'):
      logging.warning(
          'maximum number of log entries (%d) reached (project: %s, query:'
          ' %s).',
          config.get('logging_fetch_max_entries'),
          project_id,
          filter_str.replace('\n', ' AND '),
      )
      return deque
    run_time = (datetime.datetime.now() - query_start_time).total_seconds()
    if run_time >= config.get('logging_fetch_max_time_seconds'):
      logging.warning(
          'maximum query runtime for log query reached (project: %s, query:'
          ' %s).',
          project_id,
          filter_str.replace('\n', ' AND '),
      )
      return deque
    if disable_paging:
      break
    req = logging_api.entries().list_next(req, res)
    if req is not None:
      logging.debug(
          'still fetching logs (project: %s, max wait: %ds)',
          project_id,
          config.get('logging_fetch_max_time_seconds') - run_time,
      )

  query_end_time = datetime.datetime.now()
  logging.debug(
      'logging query run time: %s, pages: %d, query: %s',
      query_end_time - query_start_time,
      query_pages,
      filter_str.replace('\n', ' AND '),
  )

  return deque
Intended for use in runbooks only. Use logs.query() for lint rules.
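A usage sketch under the assumption that the Logging API is enabled for the project (the project ID and filter are placeholders):

  import datetime
  from gcpdiag.queries import logs

  end = datetime.datetime.now(datetime.timezone.utc)
  start = end - datetime.timedelta(hours=1)
  entries = logs.realtime_query(project_id='my-project',
                                filter_str='severity=ERROR',
                                start_time=start,
                                end_time=end)
  # Pages are fetched newest-first and appended left, so the returned
  # deque iterates oldest-first.
  for e in entries:
    print(logs.format_log_entry(e))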
def execute_queries(query_executor: gcpdiag.executor.ContextAwareExecutor, context: gcpdiag.models.Context):
def log_entry_timestamp(log_entry: Mapping[str, Any]) -> datetime.datetime:
def log_entry_timestamp(log_entry: Mapping[str, Any]) -> datetime.datetime:
  # Use receiveTimestamp so that we don't have any time synchronization issues
  # (i.e. don't trust the timestamp field)
  timestamp = log_entry.get('receiveTimestamp', None)
  if timestamp:
    return dateutil.parser.parse(timestamp)
  return timestamp
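For example, with a hypothetical entry (note that when receiveTimestamp is absent the function returns None, despite the declared return type):

  ts = log_entry_timestamp({'receiveTimestamp': '2024-01-01T12:34:56Z'})
  # ts is a timezone-aware datetime.datetime for 2024-01-01 12:34:56 UTC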
def format_log_entry(log_entry: dict) -> str:
def format_log_entry(log_entry: dict) -> str:
  """Format a log_entry, as returned by LogsQuery.entries, into a simple
  one-line string with the date and message."""
  log_message = None
  if 'jsonPayload' in log_entry:
    for key in ['message', 'MESSAGE']:
      if key in log_entry['jsonPayload']:
        log_message = log_entry['jsonPayload'][key]
        break
  if log_message is None:
    log_message = log_entry.get('textPayload')
  log_date = log_entry_timestamp(log_entry)
  log_date_str = log_date.astimezone().isoformat(sep=' ', timespec='seconds')
  return f'{log_date_str}: {log_message}'
Format a log_entry, as returned by LogsQuery.entries, into a simple one-line string with the date and message.
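For instance, with a hypothetical entry carrying a jsonPayload message (the timestamp is rendered in the local timezone):

  entry = {
      'jsonPayload': {'message': 'instance terminated'},
      'receiveTimestamp': '2024-01-01T12:34:56Z',
  }
  print(format_log_entry(entry))
  # e.g. '2024-01-01 12:34:56+00:00: instance terminated'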
def exclusions(project_id: str) -> Union[List[LogExclusion], None]:

def exclusions(project_id: str) -> Union[List[LogExclusion], None]:
  logging_api = apis.get_api('logging', 'v2', project_id)
  if not apis.is_enabled(project_id, 'logging'):
    return None

  log_exclusions: List[LogExclusion] = []

  fetched_entries_count = 0
  req = logging_api.exclusions().list(parent=f'projects/{project_id}')
  while req is not None:
    res = req.execute(num_retries=config.API_RETRIES)
    fetched_entries_count += 1
    if res:
      for log_exclusion_resp in res['exclusions']:
        log_exclusions.append(LogExclusion(project_id, log_exclusion_resp))
    req = logging_api.exclusions().list_next(req, res)
    if req is not None:
      # pylint: disable=logging-fstring-interpolation
      logging.debug(f'still fetching log exclusions for project {project_id}')
      # pylint: enable=logging-fstring-interpolation
  return log_exclusions
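A short consumption sketch (the project ID is a placeholder; the None return value, which means the Logging API is disabled, must be handled by the caller):

  excl = exclusions('my-project')
  if excl is None:
    print('Cloud Logging API is disabled')
  else:
    for x in excl:
      if not x.disabled:
        print(f'{x.full_path}: {x.filter}')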