gcpdiag.queries.logs
Queries related to Cloud Logging.

The main functionality is querying log entries, which is intended to be used as follows:

1. Call query() with the logs query parameters that you need. This returns a LogsQuery object which can be used to retrieve the logs later.
2. Call execute_queries() to execute all log query jobs. Similar queries will be grouped together to minimize the number of required API calls. Multiple queries will be done in parallel, while always respecting the Cloud Logging limit of 60 queries per 60 seconds.
3. Use the entries property on the LogsQuery object to iterate over the fetched logs. Note that the entries are not guaranteed to be filtered by what was given in the "filter_str" argument to query(); you will need to filter the entries in code as well when iterating over them (see the usage sketch below).

Side note: this module is not called 'logging' to avoid clashing with the standard Python logging library.
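A minimal usage sketch of this three-step flow. The project ID, resource type, log name and filter values are placeholders, and the executor setup shown is just one possibility (within gcpdiag itself the lint engine typically supplies the executor):

from concurrent import futures

from gcpdiag.queries import logs

# 1. Register the query; this only records the job, nothing is fetched yet.
q = logs.query(project_id='my-project',
               resource_type='gce_instance',
               log_name='projects/my-project/logs/my-log',
               filter_str='textPayload:"OOM"')

# 2. Execute all registered query jobs (grouped and rate-limited internally).
with futures.ThreadPoolExecutor() as executor:
  logs.execute_queries(executor)

# 3. Iterate over the fetched entries, re-applying the filter in code,
#    because grouped jobs may return entries matching other filters too.
for entry in q.entries:
  if 'OOM' in entry.get('textPayload', ''):
    print(logs.format_log_entry(entry))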
class LogsQuery:
class LogsQuery:
  """A log search job that was started with prefetch_logs()."""
  job: _LogsQueryJob

  def __init__(self, job):
    self.job = job

  @property
  def entries(self) -> Sequence:
    if not self.job.future:
      raise RuntimeError(
          'log query wasn\'t executed. did you forget to call execute_queries()?'
      )
    elif self.job.future.running():
      logging.info(
          'waiting for logs query results (project: %s, resource type: %s)',
          self.job.project_id, self.job.resource_type)
    return self.job.future.result()
A log search job that was started with prefetch_logs().
entries: Sequence
@property
def entries(self) -> Sequence:
  if not self.job.future:
    raise RuntimeError(
        'log query wasn\'t executed. did you forget to call execute_queries()?'
    )
  elif self.job.future.running():
    logging.info(
        'waiting for logs query results (project: %s, resource type: %s)',
        self.job.project_id, self.job.resource_type)
  return self.job.future.result()
jobs_todo: Dict[Tuple[str, str, str], gcpdiag.queries.logs._LogsQueryJob] = {}
class LogEntryShort:
class LogEntryShort:
  """A common log entry"""
  _text: str
  _timestamp: Optional[datetime.datetime]

  def __init__(self, raw_entry):
    if isinstance(raw_entry, dict):
      self._text = get_path(raw_entry, ('textPayload',), default='')
      self._timestamp = log_entry_timestamp(raw_entry)

    if isinstance(raw_entry, str):
      self._text = raw_entry
      # we could extract timestamp from serial entries
      # but they are not always present
      # and may be unreliable as we don't know the system clock setting
      self._timestamp = None

  @property
  def text(self):
    return self._text

  @property
  def timestamp(self):
    return self._timestamp

  @property
  def timestamp_iso(self):
    if self._timestamp:
      return self._timestamp.astimezone().isoformat(sep=' ', timespec='seconds')
    return None
A common log entry
LogEntryShort(raw_entry)
def __init__(self, raw_entry):
  if isinstance(raw_entry, dict):
    self._text = get_path(raw_entry, ('textPayload',), default='')
    self._timestamp = log_entry_timestamp(raw_entry)

  if isinstance(raw_entry, str):
    self._text = raw_entry
    # we could extract timestamp from serial entries
    # but they are not always present
    # and may be unreliable as we don't know the system clock setting
    self._timestamp = None
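An illustrative sketch of the two accepted input shapes (the payload values are made up): a raw API entry dict yields both text and a timestamp, while a plain serial-console string yields text only:

from gcpdiag.queries.logs import LogEntryShort

# A structured entry as returned by the Logging API.
api_entry = LogEntryShort({
    'textPayload': 'kernel: Out of memory: Kill process 1234',
    'receiveTimestamp': '2024-01-01T12:00:00Z',
})
print(api_entry.text)           # 'kernel: Out of memory: Kill process 1234'
print(api_entry.timestamp_iso)  # local-time ISO string, second precision

# A plain serial-console line: no reliable timestamp is available.
serial_entry = LogEntryShort('kernel: Out of memory: Kill process 1234')
print(serial_entry.timestamp)   # None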
class LogExclusion(gcpdiag.models.Resource):
class LogExclusion(models.Resource):
  """A log exclusion entry"""
  _resource_data: dict
  project_id: str

  def __init__(self, project_id: str, resource_data: dict):
    super().__init__(project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    return self._resource_data['name']

  @property
  def filter(self) -> str:
    return self._resource_data['filter']

  @property
  def disabled(self) -> bool:
    if 'disabled' in self._resource_data:
      return self._resource_data['disabled']
    return False
A log exclusion entry
project_id: str
@property
def project_id(self) -> str:
  """Project id (not project number)."""
  return self._project_id
Project id (not project number).
def query(project_id: str, resource_type: str, log_name: str, filter_str: str) -> LogsQuery:

def query(project_id: str, resource_type: str, log_name: str,
          filter_str: str) -> LogsQuery:
  # Aggregate by project_id, resource_type, log_name
  job_key = (project_id, resource_type, log_name)
  job = jobs_todo.setdefault(
      job_key,
      _LogsQueryJob(
          project_id=project_id,
          resource_type=resource_type,
          log_name=log_name,
          filters=set(),
      ))
  job.filters.add(filter_str)
  return LogsQuery(job=job)
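Because jobs are keyed on (project_id, resource_type, log_name), two query() calls that differ only in filter_str share the same underlying job; a small illustrative sketch (all values are placeholders):

from gcpdiag.queries import logs

q1 = logs.query(project_id='my-project',
                resource_type='gce_instance',
                log_name='projects/my-project/logs/my-log',
                filter_str='severity=ERROR')
q2 = logs.query(project_id='my-project',
                resource_type='gce_instance',
                log_name='projects/my-project/logs/my-log',
                filter_str='severity=WARNING')
assert q1.job is q2.job  # both filters were added to the same _LogsQueryJob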
@caching.cached_api_call
def realtime_query(project_id, filter_str, start_time, end_time, disable_paging=False):
@caching.cached_api_call
def realtime_query(project_id,
                   filter_str,
                   start_time,
                   end_time,
                   disable_paging=False):
  """Intended for use in only runbooks. use logs.query() for lint rules."""
  logging_api = apis.get_api('logging', 'v2', project_id)

  filter_lines = [filter_str]
  filter_lines.append('timestamp>"%s"' %
                      start_time.isoformat(timespec='seconds'))
  filter_lines.append('timestamp<"%s"' % end_time.isoformat(timespec='seconds'))
  filter_str = '\n'.join(filter_lines)
  logging.info('searching logs in project %s for logs between %s and %s',
               project_id, str(start_time), str(end_time))
  deque = Deque()
  req = logging_api.entries().list(
      body={
          'resourceNames': [f'projects/{project_id}'],
          'filter': filter_str,
          'orderBy': 'timestamp desc',
          'pageSize': config.get('logging_page_size')
      })
  fetched_entries_count = 0
  query_pages = 0
  query_start_time = datetime.datetime.now()
  while req is not None:
    query_pages += 1
    res = _ratelimited_execute(req)
    if 'entries' in res:
      for e in res['entries']:
        fetched_entries_count += 1
        deque.appendleft(e)

    # Verify that we aren't above limits, exit otherwise.
    if fetched_entries_count > config.get('logging_fetch_max_entries'):
      logging.warning(
          'maximum number of log entries (%d) reached (project: %s, query: %s).',
          config.get('logging_fetch_max_entries'), project_id,
          filter_str.replace('\n', ' AND '))
      return deque
    run_time = (datetime.datetime.now() - query_start_time).total_seconds()
    if run_time >= config.get('logging_fetch_max_time_seconds'):
      logging.warning(
          'maximum query runtime for log query reached (project: %s, query: %s).',
          project_id, filter_str.replace('\n', ' AND '))
      return deque
    if disable_paging:
      break
    req = logging_api.entries().list_next(req, res)
    if req is not None:
      logging.info('still fetching logs (project: %s, max wait: %ds)',
                   project_id,
                   config.get('logging_fetch_max_time_seconds') - run_time)

  query_end_time = datetime.datetime.now()
  logging.debug('logging query run time: %s, pages: %d, query: %s',
                query_end_time - query_start_time, query_pages,
                filter_str.replace('\n', ' AND '))

  return deque
Intended for use in runbooks only. Use logs.query() for lint rules.
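A hedged usage sketch for a runbook-style check; the filter string and time window are placeholders, and the call assumes the usual gcpdiag API and credentials setup:

import datetime

from gcpdiag.queries import logs

end = datetime.datetime.now(datetime.timezone.utc)
start = end - datetime.timedelta(hours=1)

# Synchronous, cached query over the last hour of logs.
entries = logs.realtime_query(project_id='my-project',
                              filter_str='severity>=ERROR',
                              start_time=start,
                              end_time=end)
for entry in entries:
  print(logs.format_log_entry(entry))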
def execute_queries(executor: concurrent.futures._base.Executor):
def log_entry_timestamp(log_entry: Mapping[str, Any]) -> datetime.datetime:
def log_entry_timestamp(log_entry: Mapping[str, Any]) -> datetime.datetime:
  # Use receiveTimestamp so that we don't have any time synchronization issues
  # (i.e. don't trust the timestamp field)
  timestamp = log_entry.get('receiveTimestamp', None)
  if timestamp:
    return dateutil.parser.parse(timestamp)
  return timestamp
def format_log_entry(log_entry: dict) -> str:
def format_log_entry(log_entry: dict) -> str:
  """Format a log_entry, as returned by LogsQuery.entries to a simple one-line
  string with the date and message."""
  log_message = None
  if 'jsonPayload' in log_entry:
    for key in ['message', 'MESSAGE']:
      if key in log_entry['jsonPayload']:
        log_message = log_entry['jsonPayload'][key]
        break
  if log_message is None:
    log_message = log_entry.get('textPayload')
  log_date = log_entry_timestamp(log_entry)
  log_date_str = log_date.astimezone().isoformat(sep=' ', timespec='seconds')
  return f'{log_date_str}: {log_message}'
Format a log_entry, as returned by LogsQuery.entries, to a simple one-line string with the date and message.
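A small illustrative example with a made-up entry: the 'message' key in jsonPayload takes precedence over textPayload, and the date comes from receiveTimestamp via log_entry_timestamp():

from gcpdiag.queries import logs

entry = {
    'jsonPayload': {'message': 'backend connection failed'},
    'receiveTimestamp': '2024-01-01T12:00:00Z',
}
print(logs.format_log_entry(entry))
# e.g. '2024-01-01 12:00:00+00:00: backend connection failed' (local timezone)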
def exclusions(project_id: str) -> Union[List[LogExclusion], None]:

def exclusions(project_id: str) -> Union[List[LogExclusion], None]:
  logging_api = apis.get_api('logging', 'v2', project_id)
  if not apis.is_enabled(project_id, 'logging'):
    return None

  log_exclusions: List[LogExclusion] = []

  fetched_entries_count = 0
  req = logging_api.exclusions().list(parent=f'projects/{project_id}')
  while req is not None:
    res = req.execute(num_retries=config.API_RETRIES)
    fetched_entries_count += 1
    if res:
      for log_exclusion_resp in res['exclusions']:
        log_exclusions.append(LogExclusion(project_id, log_exclusion_resp))
    req = logging_api.exclusions().list_next(req, res)
    if req is not None:
      # pylint: disable=logging-fstring-interpolation
      logging.info(f'still fetching log exclusions for project {project_id}')
      # pylint: enable=logging-fstring-interpolation
  return log_exclusions
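A short usage sketch (the project ID is a placeholder); note that None is returned when the Logging API is disabled, which is distinct from an empty list of exclusions:

from gcpdiag.queries import logs

log_exclusions = logs.exclusions('my-project')
if log_exclusions is not None:
  for exclusion in log_exclusions:
    if not exclusion.disabled:
      print(f'{exclusion.full_path}: {exclusion.filter}')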