gcpdiag.queries.logs
Queries related to Cloud Logging. The main functionality is querying log entries, which is intended to be used as follows:

1. Call query() with the logs query parameters that you need. This returns a LogsQuery object which can be used to retrieve the logs later.
2. Call execute_queries() to execute all log query jobs. Similar queries are grouped together to minimize the number of required API calls, and multiple queries are run in parallel, while always respecting the Cloud Logging limit of 60 queries per 60 seconds.
3. Use the entries property on the LogsQuery object to iterate over the fetched logs.

Note that the entries are not guaranteed to be filtered by what was given in the "filter_str" argument to query(); you will need to filter out non-matching entries in code as well when iterating over the log entries.

Side note: this module is not called 'logging' to avoid clashing with the standard Python logging library.
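For orientation, a minimal sketch of that workflow; the project id, resource type, log name and filter string below are illustrative placeholders, not values defined by this module, and a plain ThreadPoolExecutor is just one way to satisfy execute_queries():

import concurrent.futures

from gcpdiag.queries import logs

# 1. Register the query; nothing is fetched yet.
oom_query = logs.query(
    project_id='my-project',  # placeholder project id
    resource_type='gce_instance',
    log_name='log_id("serialconsole.googleapis.com/serial_port_1_output")',
    filter_str='textPayload:"Out of memory"')

# 2. Run all registered query jobs (grouped and rate-limited internally).
with concurrent.futures.ThreadPoolExecutor() as executor:
  logs.execute_queries(executor)

# 3. Iterate over the raw entries, re-applying the filter in code, since
#    grouped queries can also return entries matching other filter_str values.
for entry in oom_query.entries:
  if 'Out of memory' in entry.get('textPayload', ''):
    print(logs.format_log_entry(entry))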
class LogsQuery:
class LogsQuery:
  """A log search job that was started with prefetch_logs()."""
  job: _LogsQueryJob

  def __init__(self, job):
    self.job = job

  @property
  def entries(self) -> Sequence:
    if not self.job.future:
      raise RuntimeError(
          'log query wasn\'t executed. did you forget to call execute_queries()?'
      )
    elif self.job.future.running():
      logging.info(
          'waiting for logs query results (project: %s, resource type: %s)',
          self.job.project_id, self.job.resource_type)
    return self.job.future.result()
A log search job that was started with prefetch_logs().
entries: Sequence
  @property
  def entries(self) -> Sequence:
    if not self.job.future:
      raise RuntimeError(
          'log query wasn\'t executed. did you forget to call execute_queries()?'
      )
    elif self.job.future.running():
      logging.info(
          'waiting for logs query results (project: %s, resource type: %s)',
          self.job.project_id, self.job.resource_type)
    return self.job.future.result()
jobs_todo: Dict[Tuple[str, str, str], gcpdiag.queries.logs._LogsQueryJob] = {}
class LogEntryShort:
class LogEntryShort:
  """A common log entry"""
  _text: str
  _timestamp: Optional[datetime.datetime]

  def __init__(self, raw_entry):
    if isinstance(raw_entry, dict):
      self._text = get_path(raw_entry, ('textPayload',), default='')
      self._timestamp = log_entry_timestamp(raw_entry)

    if isinstance(raw_entry, str):
      self._text = raw_entry
      # we could extract timestamp from serial entries
      # but they are not always present
      # and may be unreliable as we don't know the system clock setting
      self._timestamp = None

  @property
  def text(self):
    return self._text

  @property
  def timestamp(self):
    return self._timestamp

  @property
  def timestamp_iso(self):
    if self._timestamp:
      return self._timestamp.astimezone().isoformat(sep=' ', timespec='seconds')
    return None
A common log entry
LogEntryShort(raw_entry)
  def __init__(self, raw_entry):
    if isinstance(raw_entry, dict):
      self._text = get_path(raw_entry, ('textPayload',), default='')
      self._timestamp = log_entry_timestamp(raw_entry)

    if isinstance(raw_entry, str):
      self._text = raw_entry
      # we could extract timestamp from serial entries
      # but they are not always present
      # and may be unreliable as we don't know the system clock setting
      self._timestamp = None
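A small sketch of the two accepted raw_entry shapes; the payload and timestamp values are made up:

from gcpdiag.queries.logs import LogEntryShort

# dict form: a raw Logging API entry; the timestamp comes from receiveTimestamp.
api_entry = LogEntryShort({
    'textPayload': 'Out of memory: Killed process 1234',
    'receiveTimestamp': '2024-01-01T12:00:00Z',
})
# str form: e.g. a serial console line; no timestamp is attached.
serial_entry = LogEntryShort('Out of memory: Killed process 1234')

print(api_entry.text, api_entry.timestamp_iso)
print(serial_entry.text, serial_entry.timestamp)  # timestamp is None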
class LogExclusion(gcpdiag.models.Resource):
class LogExclusion(models.Resource):
  """A log exclusion entry"""
  _resource_data: dict
  project_id: str

  def __init__(self, project_id: str, resource_data: dict):
    super().__init__(project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    return self._resource_data['name']

  @property
  def filter(self) -> str:
    return self._resource_data['filter']

  @property
  def disabled(self) -> bool:
    if 'disabled' in self._resource_data:
      return self._resource_data['disabled']
    return False
A log exclusion entry
project_id: str
  @property
  def project_id(self) -> str:
    """Project id (not project number)."""
    return self._project_id
Project id (not project number).
def query(project_id: str, resource_type: str, log_name: str, filter_str: str) -> LogsQuery:

def query(project_id: str, resource_type: str, log_name: str,
          filter_str: str) -> LogsQuery:
  # Aggregate by project_id, resource_type, log_name
  job_key = (project_id, resource_type, log_name)
  job = jobs_todo.setdefault(
      job_key,
      _LogsQueryJob(
          project_id=project_id,
          resource_type=resource_type,
          log_name=log_name,
          filters=set(),
      ))
  job.filters.add(filter_str)
  return LogsQuery(job=job)
@caching.cached_api_call
def realtime_query(project_id, filter_str, start_time_utc, end_time_utc):
@caching.cached_api_call
def realtime_query(project_id, filter_str, start_time_utc, end_time_utc):
  """Intended for use in only runbooks. use logs.query() for lint rules."""
  logging_api = apis.get_api('logging', 'v2', project_id)

  filter_lines = [filter_str]
  filter_lines.append('timestamp>"%s"' %
                      start_time_utc.isoformat(timespec='seconds'))
  filter_lines.append('timestamp<"%s"' %
                      end_time_utc.isoformat(timespec='seconds'))
  filter_str = '\n'.join(filter_lines)
  logging.info('searching logs in project %s for logs between %s and %s',
               project_id, str(start_time_utc), str(end_time_utc))
  deque = Deque()
  req = logging_api.entries().list(
      body={
          'resourceNames': [f'projects/{project_id}'],
          'filter': filter_str,
          'orderBy': 'timestamp desc',
          'pageSize': config.get('logging_page_size')
      })
  fetched_entries_count = 0
  query_pages = 0
  query_start_time = datetime.datetime.now()
  while req is not None:
    query_pages += 1
    res = _ratelimited_execute(req)
    if 'entries' in res:
      for e in res['entries']:
        fetched_entries_count += 1
        deque.appendleft(e)

    # Verify that we aren't above limits, exit otherwise.
    if fetched_entries_count > config.get('logging_fetch_max_entries'):
      logging.warning(
          'maximum number of log entries (%d) reached (project: %s, query: %s).',
          config.get('logging_fetch_max_entries'), project_id,
          filter_str.replace('\n', ' AND '))
      return deque
    run_time = (datetime.datetime.now() - query_start_time).total_seconds()
    if run_time >= config.get('logging_fetch_max_time_seconds'):
      logging.warning(
          'maximum query runtime for log query reached (project: %s, query: %s).',
          project_id, filter_str.replace('\n', ' AND '))
      return deque
    req = logging_api.entries().list_next(req, res)
    if req is not None:
      logging.info('still fetching logs (project: %s, max wait: %ds)',
                   project_id,
                   config.get('logging_fetch_max_time_seconds') - run_time)

  query_end_time = datetime.datetime.now()
  logging.debug('logging query run time: %s, pages: %d, query: %s',
                query_end_time - query_start_time, query_pages,
                filter_str.replace('\n', ' AND '))

  return deque
Intended for use in runbooks only. Use logs.query() for lint rules.
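A hedged sketch of a runbook-style call; the project id, filter and eight-hour window are only examples:

import datetime

from gcpdiag.queries import logs

end_time = datetime.datetime.now(datetime.timezone.utc)
start_time = end_time - datetime.timedelta(hours=8)

entries = logs.realtime_query(
    project_id='my-project',  # placeholder project id
    filter_str='severity>=ERROR AND resource.type="gce_instance"',
    start_time_utc=start_time,
    end_time_utc=end_time)

# entries is a deque of raw log entry dicts, newest last.
for entry in entries:
  print(logs.format_log_entry(entry))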
def execute_queries(executor: concurrent.futures._base.Executor):
def log_entry_timestamp(log_entry: Mapping[str, Any]) -> datetime.datetime:
def log_entry_timestamp(log_entry: Mapping[str, Any]) -> datetime.datetime:
  # Use receiveTimestamp so that we don't have any time synchronization issues
  # (i.e. don't trust the timestamp field)
  timestamp = log_entry.get('receiveTimestamp', None)
  if timestamp:
    return dateutil.parser.parse(timestamp)
  return timestamp
def format_log_entry(log_entry: dict) -> str:
def format_log_entry(log_entry: dict) -> str:
  """Format a log_entry, as returned by LogsQuery.entries to a simple one-line
  string with the date and message."""
  log_message = None
  if 'jsonPayload' in log_entry:
    for key in ['message', 'MESSAGE']:
      if key in log_entry['jsonPayload']:
        log_message = log_entry['jsonPayload'][key]
        break
  if log_message is None:
    log_message = log_entry.get('textPayload')
  log_date = log_entry_timestamp(log_entry)
  log_date_str = log_date.astimezone().isoformat(sep=' ', timespec='seconds')
  return f'{log_date_str}: {log_message}'
Format a log_entry, as returned by LogsQuery.entries, to a simple one-line string with the date and message.
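For example, given a raw entry shaped like the made-up one below, the jsonPayload message takes precedence over textPayload:

from gcpdiag.queries import logs

entry = {
    'receiveTimestamp': '2024-01-01T12:00:00Z',
    'jsonPayload': {'MESSAGE': 'node is ready'},
    'textPayload': 'ignored because jsonPayload carries a message',
}
# Prints something like '2024-01-01 12:00:00+00:00: node is ready',
# with the date rendered in the local timezone.
print(logs.format_log_entry(entry))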
def exclusions(project_id: str) -> Union[List[LogExclusion], None]:

def exclusions(project_id: str) -> Union[List[LogExclusion], None]:
  logging_api = apis.get_api('logging', 'v2', project_id)
  if not apis.is_enabled(project_id, 'logging'):
    return None

  log_exclusions: List[LogExclusion] = []

  fetched_entries_count = 0
  req = logging_api.exclusions().list(parent=f'projects/{project_id}')
  while req is not None:
    res = req.execute(num_retries=config.API_RETRIES)
    fetched_entries_count += 1
    if res:
      for log_exclusion_resp in res['exclusions']:
        log_exclusions.append(LogExclusion(project_id, log_exclusion_resp))
    req = logging_api.exclusions().list_next(req, res)
    if req is not None:
      # pylint: disable=logging-fstring-interpolation
      logging.info(f'still fetching log exclusions for project {project_id}')
      # pylint: enable=logging-fstring-interpolation
  return log_exclusions
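An illustrative use, e.g. listing a project's exclusions so that a rule can flag ones that would drop the logs it depends on; 'my-project' is a placeholder:

from gcpdiag.queries import logs

log_exclusions = logs.exclusions('my-project')
if log_exclusions is None:
  print('Cloud Logging API is not enabled in this project')
else:
  for exclusion in log_exclusions:
    state = 'disabled' if exclusion.disabled else 'enabled'
    print(f'{exclusion.full_path} ({state}): {exclusion.filter}')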