gcpdiag.queries.logs

Queries related to Cloud Logging. The main functionality is querying log entries, which is supposed to be used as follows:

1. Call query() with the logs query parameters that you need. This returns a LogsQuery object which can be used to retrieve the logs later.
2. Call execute_queries() to execute all log query jobs. Similar queries will be grouped together to minimize the number of required API calls. Multiple queries will be done in parallel, while always respecting the Cloud Logging limit of 60 queries per 60 seconds.
3. Use the entries property on the LogsQuery object to iterate over the fetched logs. Note that the entries are not guaranteed to be filtered by what was given in the "filter_str" argument to query(); you will need to filter the entries in code as well when iterating over them.

Side note: this module is not called 'logging' to avoid clashing with the standard Python logging library.
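A minimal usage sketch of that flow (the project ID, resource type, log name and filter below are illustrative placeholders, not values required by the module):

import concurrent.futures

from gcpdiag.queries import logs

# 1. Register the query; nothing is fetched yet.
q = logs.query(
    project_id='my-project',        # placeholder project
    resource_type='gce_instance',   # illustrative resource type
    log_name='log_id("cloudaudit.googleapis.com/activity")',  # illustrative
    filter_str='severity=ERROR',    # illustrative filter
)

# 2. Execute all registered query jobs (grouped and rate-limited internally).
with concurrent.futures.ThreadPoolExecutor() as executor:
    logs.execute_queries(executor)

# 3. Iterate over the fetched entries, re-checking the filter condition in
#    code, because grouped queries share one combined fetch.
for entry in q.entries:
    if entry.get('severity') != 'ERROR':
        continue
    print(logs.format_log_entry(entry))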
class LogsQuery:
class LogsQuery:
  """A log search job that was started with prefetch_logs()."""
  job: _LogsQueryJob

  def __init__(self, job):
    self.job = job

  @property
  def entries(self) -> Sequence:
    if not self.job.future:
      raise RuntimeError(
          'log query wasn\'t executed. did you forget to call execute_queries()?'
      )
    elif self.job.future.running():
      logging.info(
          'waiting for logs query results (project: %s, resource type: %s)',
          self.job.project_id, self.job.resource_type)
    return self.job.future.result()

A log search job that was started with prefetch_logs().

LogsQuery(job)
job: gcpdiag.queries.logs._LogsQueryJob
entries: Sequence
jobs_todo: Dict[Tuple[str, str, str], gcpdiag.queries.logs._LogsQueryJob] = {}
class LogEntryShort:
class LogEntryShort:
  """A common log entry"""
  _text: str
  _timestamp: Optional[datetime.datetime]

  def __init__(self, raw_entry):
    if isinstance(raw_entry, dict):
      self._text = get_path(raw_entry, ('textPayload',), default='')
      self._timestamp = log_entry_timestamp(raw_entry)

    if isinstance(raw_entry, str):
      self._text = raw_entry
      # we could extract timestamp from serial entries
      # but they are not always present
      # and may be unreliable as we don't know the system clock setting
      self._timestamp = None

  @property
  def text(self):
    return self._text

  @property
  def timestamp(self):
    return self._timestamp

  @property
  def timestamp_iso(self):
    if self._timestamp:
      return self._timestamp.astimezone().isoformat(sep=' ', timespec='seconds')
    return None

A common log entry

LogEntryShort(raw_entry)
text
timestamp
timestamp_iso
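A short illustration of the two raw_entry forms LogEntryShort accepts (the payload values are placeholders):

# Structured API entry (dict): text and timestamp are extracted.
e1 = LogEntryShort({
    'textPayload': 'disk is full',
    'receiveTimestamp': '2024-05-01T10:00:03Z',
})
print(e1.text, e1.timestamp_iso)

# Plain serial-console line (str): no timestamp is derived.
e2 = LogEntryShort('Out of memory: killed process 123')
print(e2.text, e2.timestamp)  # -> Out of memory: killed process 123 None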
def query(project_id: str, resource_type: str, log_name: str, filter_str: str) -> LogsQuery:
def query(project_id: str, resource_type: str, log_name: str,
          filter_str: str) -> LogsQuery:
  # Aggregate by project_id, resource_type, log_name
  job_key = (project_id, resource_type, log_name)
  job = jobs_todo.setdefault(
      job_key,
      _LogsQueryJob(
          project_id=project_id,
          resource_type=resource_type,
          log_name=log_name,
          filters=set(),
      ))
  job.filters.add(filter_str)
  return LogsQuery(job=job)
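Two calls that share the same (project_id, resource_type, log_name) key are merged into a single job whose filter set grows, so one API fetch serves both queries; this is also why results must be re-filtered in code. A behavioural sketch (identifiers are placeholders):

q1 = query('my-project', 'gce_instance', 'log_id("my-log")', 'textPayload:"OOM"')
q2 = query('my-project', 'gce_instance', 'log_id("my-log")', 'textPayload:"panic"')
# Both LogsQuery objects reference the same underlying job...
assert q1.job is q2.job
# ...and the job accumulates both filter strings.
assert q1.job.filters == {'textPayload:"OOM"', 'textPayload:"panic"'}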
def realtime_query(project_id, filter_str, start_time_utc, end_time_utc):
def realtime_query(project_id, filter_str, start_time_utc, end_time_utc):
  """Intended for use in runbooks only. Use logs.query() for lint rules."""
  thread = threading.current_thread()
  thread.name = f'log_query:{project_id}'
  logging_api = apis.get_api('logging', 'v2', project_id)

  # Convert "within" relative time to an absolute timestamp.
  filter_lines = [filter_str]
  filter_lines.append('timestamp>"%s"' %
                      start_time_utc.isoformat(timespec='seconds'))
  filter_lines.append('timestamp<"%s"' %
                      end_time_utc.isoformat(timespec='seconds'))
  filter_str = '\n'.join(filter_lines)
  logging.info('searching logs in project %s for logs between %s and %s',
               project_id, str(start_time_utc), str(end_time_utc))
  # Fetch all logs and put the results in temporary storage (diskcache.Deque)
  deque = caching.get_tmp_deque('tmp-logs-')
  req = logging_api.entries().list(
      body={
          'resourceNames': [f'projects/{project_id}'],
          'filter': filter_str,
          'orderBy': 'timestamp desc',
          'pageSize': config.get('logging_page_size')
      })
  fetched_entries_count = 0
  query_pages = 0
  query_start_time = datetime.datetime.now()
  while req is not None:
    query_pages += 1
    res = _ratelimited_execute(req)
    if 'entries' in res:
      for e in res['entries']:
        fetched_entries_count += 1
        deque.appendleft(e)

    # Verify that we aren't above limits, exit otherwise.
    if fetched_entries_count > config.get('logging_fetch_max_entries'):
      logging.warning(
          'maximum number of log entries (%d) reached (project: %s, query: %s).',
          config.get('logging_fetch_max_entries'), project_id,
          filter_str.replace('\n', ' AND '))
      return deque
    run_time = (datetime.datetime.now() - query_start_time).total_seconds()
    if run_time >= config.get('logging_fetch_max_time_seconds'):
      logging.warning(
          'maximum query runtime for log query reached (project: %s, query: %s).',
          project_id, filter_str.replace('\n', ' AND '))
      return deque
    req = logging_api.entries().list_next(req, res)
    if req is not None:
      logging.info('still fetching logs (project: %s, max wait: %ds)',
                   project_id,
                   config.get('logging_fetch_max_time_seconds') - run_time)

  query_end_time = datetime.datetime.now()
  logging.debug('logging query run time: %s, pages: %d, query: %s',
                query_end_time - query_start_time, query_pages,
                filter_str.replace('\n', ' AND '))

  return deque

Intended for use in runbooks only. Use logs.query() for lint rules.
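
A sketch of a direct runbook-style call (project ID, filter, and the one-hour window are placeholders; the function returns an iterable of raw log entry dicts):

import datetime

from gcpdiag.queries import logs

end = datetime.datetime.now(datetime.timezone.utc)
start = end - datetime.timedelta(hours=1)   # arbitrary look-back window
entries = logs.realtime_query(
    project_id='my-project',      # placeholder project
    filter_str='severity=ERROR',  # illustrative filter
    start_time_utc=start,
    end_time_utc=end,
)
for entry in entries:
    print(logs.format_log_entry(entry))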

def execute_queries(executor: concurrent.futures.Executor):
def execute_queries(executor: concurrent.futures.Executor):
  global jobs_todo
  jobs_executing = jobs_todo
  jobs_todo = {}
  for job in jobs_executing.values():
    job.future = executor.submit(_execute_query_job, job)
def log_entry_timestamp(log_entry: Mapping[str, Any]) -> datetime.datetime:
def log_entry_timestamp(log_entry: Mapping[str, Any]) -> datetime.datetime:
  # Use receiveTimestamp so that we don't have any time synchronization issues
  # (i.e. don't trust the timestamp field)
  timestamp = log_entry.get('receiveTimestamp', None)
  if timestamp:
    return dateutil.parser.parse(timestamp)
  return timestamp
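For example, given a raw entry dict (values are illustrative), receiveTimestamp wins over the instance-reported timestamp:

entry = {
    'timestamp': '2024-05-01T10:00:00Z',         # reported by the workload
    'receiveTimestamp': '2024-05-01T10:00:03Z',  # stamped by Cloud Logging
    'textPayload': 'example message',
}
print(log_entry_timestamp(entry))  # -> 2024-05-01 10:00:03+00:00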
def format_log_entry(log_entry: dict) -> str:
def format_log_entry(log_entry: dict) -> str:
  """Format a log_entry, as returned by LogsQuery.entries to a simple one-line
  string with the date and message."""
  log_message = None
  if 'jsonPayload' in log_entry:
    for key in ['message', 'MESSAGE']:
      if key in log_entry['jsonPayload']:
        log_message = log_entry['jsonPayload'][key]
        break
  if log_message is None:
    log_message = log_entry.get('textPayload')
  log_date = log_entry_timestamp(log_entry)
  log_date_str = log_date.astimezone().isoformat(sep=' ', timespec='seconds')
  return f'{log_date_str}: {log_message}'

Format a log_entry, as returned by LogsQuery.entries, to a simple one-line string with the date and message.
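
A small illustration with a fabricated jsonPayload entry (values are placeholders; the exact offset in the output depends on the local timezone):

entry = {
    'receiveTimestamp': '2024-05-01T10:00:03Z',
    'jsonPayload': {'message': 'instance restarted'},
}
print(format_log_entry(entry))
# e.g. '2024-05-01 10:00:03+00:00: instance restarted' when the local tz is UTC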