gcpdiag.queries.logs
Queries related to Cloud Logging.

The main functionality is querying log entries, which is intended to be used as follows:

1. Call query() with the logs query parameters that you need. This returns a LogsQuery object which can be used to retrieve the logs later.
2. Call execute_queries() to execute all log query jobs. Similar queries are grouped together to minimize the number of required API calls. Multiple queries are executed in parallel, while always respecting the Cloud Logging limit of 60 queries per 60 seconds.
3. Use the entries property on the LogsQuery object to iterate over the fetched logs. Note that the entries are not guaranteed to match the "filter_str" argument that was given to query(); because similar queries are aggregated, you must also filter the entries in code when iterating over them. A minimal usage sketch follows.

Side note: this module is not called 'logging' to avoid clashing with the standard Python logging library.
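An end-to-end sketch of that three-step flow. The project ID, resource type, log name and filter are illustrative placeholders, not values defined by this module:

import concurrent.futures

from gcpdiag.queries import logs

q = logs.query(project_id='my-project',
               resource_type='gce_instance',
               log_name='log_id("syslog")',
               filter_str='severity>=ERROR')

# Execute all pending query jobs in parallel.
with concurrent.futures.ThreadPoolExecutor() as executor:
  logs.execute_queries(executor)

# Queries are aggregated, so re-apply the filter condition when iterating.
for entry in q.entries:
  if entry.get('severity') in ('ERROR', 'CRITICAL', 'ALERT', 'EMERGENCY'):
    print(logs.format_log_entry(entry))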
class LogsQuery:
class LogsQuery:
  """A log search job that was started with prefetch_logs()."""
  job: _LogsQueryJob

  def __init__(self, job):
    self.job = job

  @property
  def entries(self) -> Sequence:
    if not self.job.future:
      raise RuntimeError(
          'log query wasn\'t executed. did you forget to call execute_queries()?'
      )
    elif self.job.future.running():
      logging.info(
          'waiting for logs query results (project: %s, resource type: %s)',
          self.job.project_id, self.job.resource_type)
    return self.job.future.result()
A log search job that was started with prefetch_logs().
entries: Sequence
  @property
  def entries(self) -> Sequence:
    if not self.job.future:
      raise RuntimeError(
          'log query wasn\'t executed. did you forget to call execute_queries()?'
      )
    elif self.job.future.running():
      logging.info(
          'waiting for logs query results (project: %s, resource type: %s)',
          self.job.project_id, self.job.resource_type)
    return self.job.future.result()
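Accessing entries before execute_queries() has run raises a RuntimeError. A small sketch of that failure mode, assuming the job's future stays unset until execution (argument values are again placeholders):

from gcpdiag.queries import logs

q = logs.query(project_id='my-project', resource_type='gce_instance',
               log_name='log_id("syslog")', filter_str='severity>=ERROR')
try:
  _ = q.entries  # execute_queries() was never called
except RuntimeError:
  pass  # expected: "log query wasn't executed. ..."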
jobs_todo: Dict[Tuple[str, str, str], gcpdiag.queries.logs._LogsQueryJob] = {}
class LogEntryShort:
class LogEntryShort:
  """A common log entry"""
  _text: str
  _timestamp: Optional[datetime.datetime]

  def __init__(self, raw_entry):
    if isinstance(raw_entry, dict):
      self._text = get_path(raw_entry, ('textPayload',), default='')
      self._timestamp = log_entry_timestamp(raw_entry)

    if isinstance(raw_entry, str):
      self._text = raw_entry
      # we could extract timestamp from serial entries
      # but they are not always present
      # and may be unreliable as we don't know the system clock setting
      self._timestamp = None

  @property
  def text(self):
    return self._text

  @property
  def timestamp(self):
    return self._timestamp

  @property
  def timestamp_iso(self):
    if self._timestamp:
      return self._timestamp.astimezone().isoformat(sep=' ', timespec='seconds')
    return None
A common log entry
LogEntryShort(raw_entry)
  def __init__(self, raw_entry):
    if isinstance(raw_entry, dict):
      self._text = get_path(raw_entry, ('textPayload',), default='')
      self._timestamp = log_entry_timestamp(raw_entry)

    if isinstance(raw_entry, str):
      self._text = raw_entry
      # we could extract timestamp from serial entries
      # but they are not always present
      # and may be unreliable as we don't know the system clock setting
      self._timestamp = None
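The constructor accepts either a raw API entry (a dict) or a plain serial-console line (a str). A brief sketch with made-up entries:

from gcpdiag.queries import logs

api_entry = {'textPayload': 'kernel: Out of memory',
             'receiveTimestamp': '2024-01-01T12:00:00Z'}
e1 = logs.LogEntryShort(api_entry)      # dict: text plus parsed timestamp
e2 = logs.LogEntryShort('serial line')  # str: text only, timestamp stays None
print(e1.timestamp_iso, e1.text)
print(e2.timestamp_iso, e2.text)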
def query(project_id: str, resource_type: str, log_name: str, filter_str: str) -> LogsQuery:

def query(project_id: str, resource_type: str, log_name: str,
          filter_str: str) -> LogsQuery:
  # Aggregate by project_id, resource_type, log_name
  job_key = (project_id, resource_type, log_name)
  job = jobs_todo.setdefault(
      job_key,
      _LogsQueryJob(
          project_id=project_id,
          resource_type=resource_type,
          log_name=log_name,
          filters=set(),
      ))
  job.filters.add(filter_str)
  return LogsQuery(job=job)
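Queries that share the same (project_id, resource_type, log_name) key reuse one job, as this hypothetical pair of calls illustrates:

from gcpdiag.queries import logs

q1 = logs.query('my-project', 'gce_instance', 'log_id("syslog")',
                'textPayload:"oom-kill"')
q2 = logs.query('my-project', 'gce_instance', 'log_id("syslog")',
                'severity=ERROR')
assert q1.job is q2.job  # same key -> one shared job carrying both filters

Because both filters end up on the same job, fetched entries can match either one, which is why callers must re-filter entries in code when iterating.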
def realtime_query(project_id, filter_str, start_time_utc, end_time_utc):
def realtime_query(project_id, filter_str, start_time_utc, end_time_utc):
  """Intended for use in runbooks only. Use logs.query() for lint rules."""
  thread = threading.current_thread()
  thread.name = f'log_query:{project_id}'
  logging_api = apis.get_api('logging', 'v2', project_id)

  # Convert "within" relative time to an absolute timestamp.
  filter_lines = [filter_str]
  filter_lines.append('timestamp>"%s"' %
                      start_time_utc.isoformat(timespec='seconds'))
  filter_lines.append('timestamp<"%s"' %
                      end_time_utc.isoformat(timespec='seconds'))
  filter_str = '\n'.join(filter_lines)
  logging.info('searching logs in project %s for logs between %s and %s',
               project_id, str(start_time_utc), str(end_time_utc))
  # Fetch all logs and put the results in temporary storage (diskcache.Deque)
  deque = caching.get_tmp_deque('tmp-logs-')
  req = logging_api.entries().list(
      body={
          'resourceNames': [f'projects/{project_id}'],
          'filter': filter_str,
          'orderBy': 'timestamp desc',
          'pageSize': config.get('logging_page_size')
      })
  fetched_entries_count = 0
  query_pages = 0
  query_start_time = datetime.datetime.now()
  while req is not None:
    query_pages += 1
    res = _ratelimited_execute(req)
    if 'entries' in res:
      for e in res['entries']:
        fetched_entries_count += 1
        deque.appendleft(e)

    # Verify that we aren't above limits, exit otherwise.
    if fetched_entries_count > config.get('logging_fetch_max_entries'):
      logging.warning(
          'maximum number of log entries (%d) reached (project: %s, query: %s).',
          config.get('logging_fetch_max_entries'), project_id,
          filter_str.replace('\n', ' AND '))
      return deque
    run_time = (datetime.datetime.now() - query_start_time).total_seconds()
    if run_time >= config.get('logging_fetch_max_time_seconds'):
      logging.warning(
          'maximum query runtime for log query reached (project: %s, query: %s).',
          project_id, filter_str.replace('\n', ' AND '))
      return deque
    req = logging_api.entries().list_next(req, res)
    if req is not None:
      logging.info('still fetching logs (project: %s, max wait: %ds)',
                   project_id,
                   config.get('logging_fetch_max_time_seconds') - run_time)

  query_end_time = datetime.datetime.now()
  logging.debug('logging query run time: %s, pages: %d, query: %s',
                query_end_time - query_start_time, query_pages,
                filter_str.replace('\n', ' AND '))

  return deque
Intended for use in runbooks only. Use logs.query() for lint rules.
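A hypothetical runbook-style call that fetches the last hour of matching entries (project ID and filter are placeholders):

import datetime

from gcpdiag.queries import logs

end = datetime.datetime.now(datetime.timezone.utc)
start = end - datetime.timedelta(hours=1)
raw_entries = logs.realtime_query('my-project', 'severity>=ERROR',
                                  start_time_utc=start, end_time_utc=end)
for raw in raw_entries:  # entries come back oldest-first (appendleft above)
  print(logs.format_log_entry(raw))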
def execute_queries(executor: concurrent.futures._base.Executor):
def log_entry_timestamp(log_entry: Mapping[str, Any]) -> datetime.datetime:
def log_entry_timestamp(log_entry: Mapping[str, Any]) -> datetime.datetime:
  # Use receiveTimestamp so that we don't have any time synchronization issues
  # (i.e. don't trust the timestamp field)
  timestamp = log_entry.get('receiveTimestamp', None)
  if timestamp:
    return dateutil.parser.parse(timestamp)
  return timestamp
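A quick illustration with a made-up entry; receiveTimestamp wins even though the entry carries its own timestamp field:

from gcpdiag.queries import logs

entry = {'timestamp': '2024-01-01T11:59:58Z',
         'receiveTimestamp': '2024-01-01T12:00:03Z'}
ts = logs.log_entry_timestamp(entry)
print(ts.isoformat())  # 2024-01-01T12:00:03+00:00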
def format_log_entry(log_entry: dict) -> str:
def format_log_entry(log_entry: dict) -> str:
  """Format a log_entry, as returned by LogsQuery.entries to a simple one-line
  string with the date and message."""
  log_message = None
  if 'jsonPayload' in log_entry:
    for key in ['message', 'MESSAGE']:
      if key in log_entry['jsonPayload']:
        log_message = log_entry['jsonPayload'][key]
        break
  if log_message is None:
    log_message = log_entry.get('textPayload')
  log_date = log_entry_timestamp(log_entry)
  log_date_str = log_date.astimezone().isoformat(sep=' ', timespec='seconds')
  return f'{log_date_str}: {log_message}'
Format a log_entry, as returned by LogsQuery.entries, into a simple one-line string with the date and message.
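For example, with a hypothetical entry carrying a jsonPayload message (the rendered timestamp depends on the local timezone):

from gcpdiag.queries import logs

entry = {'jsonPayload': {'MESSAGE': 'node is ready'},
         'receiveTimestamp': '2024-01-01T12:00:00Z'}
print(logs.format_log_entry(entry))
# e.g. 2024-01-01 12:00:00+00:00: node is ready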