gcpdiag.queries.logs

Queries related to Cloud Logging.

The main functionality is querying log entries, which is supposed to be used as follows:

1. Call query() with the logs query parameters that you need. This returns a LogsQuery object which can be used to retrieve the logs later.

2. Call execute_queries() to execute all log query jobs. Similar queries will be grouped together to minimize the number of required API calls. Multiple queries will be done in parallel, while always respecting the Cloud Logging limit of 60 queries per 60 seconds.

3. Use the entries property on the LogsQuery object to iterate over the fetched logs. Note that the entries are not guaranteed to be filtered by what was given in the "filter_str" argument to query(); you will need to filter the entries in code as well when iterating over them.

Side note: this module is not called 'logging' to avoid using the same name as the standard Python library for logging.
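
A minimal usage sketch of that flow (the project id, log name and filter are placeholders; the executor is created explicitly here, whereas gcpdiag's rule runner normally provides one):

import concurrent.futures

from gcpdiag.queries import logs

logs_query = logs.query(
    project_id='my-project',      # placeholder
    resource_type='gce_instance',
    log_name='my-log-name',       # placeholder
    filter_str='severity=ERROR')

# Execute all pending log query jobs in one batch.
with concurrent.futures.ThreadPoolExecutor() as executor:
  logs.execute_queries(executor)

# entries may contain more than what filter_str matched, so filter again in code.
for entry in logs_query.entries:
  if entry.get('severity') == 'ERROR':
    print(logs.format_log_entry(entry))
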
class LogsQuery:
class LogsQuery:
  """A log search job that was started with query()."""
  job: _LogsQueryJob

  def __init__(self, job):
    self.job = job

  @property
  def entries(self) -> Sequence:
    if not self.job.future:
      raise RuntimeError(
          'log query wasn\'t executed. did you forget to call execute_queries()?'
      )
    elif self.job.future.running():
      logging.info(
          'waiting for logs query results (project: %s, resource type: %s)',
          self.job.project_id, self.job.resource_type)
    return self.job.future.result()

A log search job that was started with query().

LogsQuery(job)
  def __init__(self, job):
    self.job = job
job: gcpdiag.queries.logs._LogsQueryJob
entries: Sequence
  @property
  def entries(self) -> Sequence:
    if not self.job.future:
      raise RuntimeError(
          'log query wasn\'t executed. did you forget to call execute_queries()?'
      )
    elif self.job.future.running():
      logging.info(
          'waiting for logs query results (project: %s, resource type: %s)',
          self.job.project_id, self.job.resource_type)
    return self.job.future.result()
jobs_todo: Dict[Tuple[str, str, str], gcpdiag.queries.logs._LogsQueryJob] = {}
class LogEntryShort:
class LogEntryShort:
  """A common log entry"""
  _text: str
  _timestamp: Optional[datetime.datetime]

  def __init__(self, raw_entry):
    if isinstance(raw_entry, dict):
      self._text = get_path(raw_entry, ('textPayload',), default='')
      self._timestamp = log_entry_timestamp(raw_entry)

    if isinstance(raw_entry, str):
      self._text = raw_entry
      # we could extract timestamp from serial entries
      # but they are not always present
      # and may be unreliable as we don't know the system clock setting
      self._timestamp = None

  @property
  def text(self):
    return self._text

  @property
  def timestamp(self):
    return self._timestamp

  @property
  def timestamp_iso(self):
    if self._timestamp:
      return self._timestamp.astimezone().isoformat(sep=' ', timespec='seconds')
    return None

A common log entry

LogEntryShort(raw_entry)
  def __init__(self, raw_entry):
    if isinstance(raw_entry, dict):
      self._text = get_path(raw_entry, ('textPayload',), default='')
      self._timestamp = log_entry_timestamp(raw_entry)

    if isinstance(raw_entry, str):
      self._text = raw_entry
      # we could extract timestamp from serial entries
      # but they are not always present
      # and may be unreliable as we don't know the system clock setting
      self._timestamp = None
text
  @property
  def text(self):
    return self._text
timestamp
  @property
  def timestamp(self):
    return self._timestamp
timestamp_iso
  @property
  def timestamp_iso(self):
    if self._timestamp:
      return self._timestamp.astimezone().isoformat(sep=' ', timespec='seconds')
    return None
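
A small illustration of the two accepted constructor inputs (the entry values are made up):

from gcpdiag.queries import logs

# From a raw log entry dict: text and timestamp are extracted.
e1 = logs.LogEntryShort({
    'textPayload': 'instance started',
    'receiveTimestamp': '2023-05-01T12:00:00Z',
})
print(e1.text)           # 'instance started'
print(e1.timestamp_iso)  # '2023-05-01 12:00:00+00:00' (in the local timezone)

# From a plain string (e.g. a serial console line): no timestamp is kept.
e2 = logs.LogEntryShort('serial console output line')
print(e2.text)       # 'serial console output line'
print(e2.timestamp)  # None
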
class LogExclusion(gcpdiag.models.Resource):
class LogExclusion(models.Resource):
  """A log exclusion entry"""
  _resource_data: dict
  project_id: str

  def __init__(self, project_id: str, resource_data: dict):
    super().__init__(project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    return self._resource_data['name']

  @property
  def filter(self) -> str:
    return self._resource_data['filter']

  @property
  def disabled(self) -> bool:
    if 'disabled' in self._resource_data:
      return self._resource_data['disabled']
    return False

A log exclusion entry

LogExclusion(project_id: str, resource_data: dict)
  def __init__(self, project_id: str, resource_data: dict):
    super().__init__(project_id)
    self._resource_data = resource_data
project_id: str
  @property
  def project_id(self) -> str:
    """Project id (not project number)."""
    return self._project_id

Project id (not project number).

full_path: str
  @property
  def full_path(self) -> str:
    return self._resource_data['name']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

filter: str
  @property
  def filter(self) -> str:
    return self._resource_data['filter']
disabled: bool
  @property
  def disabled(self) -> bool:
    if 'disabled' in self._resource_data:
      return self._resource_data['disabled']
    return False
def query(project_id: str, resource_type: str, log_name: str, filter_str: str) -> LogsQuery:
def query(project_id: str, resource_type: str, log_name: str,
          filter_str: str) -> LogsQuery:
  # Aggregate by project_id, resource_type, log_name
  job_key = (project_id, resource_type, log_name)
  job = jobs_todo.setdefault(
      job_key,
      _LogsQueryJob(
          project_id=project_id,
          resource_type=resource_type,
          log_name=log_name,
          filters=set(),
      ))
  job.filters.add(filter_str)
  return LogsQuery(job=job)
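
For example, two calls sharing the same (project_id, resource_type, log_name) tuple end up in a single job, so only one API call is needed later (placeholder values):

from gcpdiag.queries import logs

q1 = logs.query(project_id='my-project',
                resource_type='gce_instance',
                log_name='my-log-name',
                filter_str='severity=ERROR')
q2 = logs.query(project_id='my-project',
                resource_type='gce_instance',
                log_name='my-log-name',
                filter_str='severity=WARNING')

# Both filter strings were added to the same pending job, which is only
# executed once execute_queries() is called.
assert q1.job is q2.job
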
@caching.cached_api_call
def realtime_query(project_id, filter_str, start_time, end_time, disable_paging=False):
@caching.cached_api_call
def realtime_query(project_id,
                   filter_str,
                   start_time,
                   end_time,
                   disable_paging=False):
  """Intended for use in runbooks only. Use logs.query() for lint rules."""
  logging_api = apis.get_api('logging', 'v2', project_id)

  filter_lines = [filter_str]
  filter_lines.append('timestamp>"%s"' %
                      start_time.isoformat(timespec='seconds'))
  filter_lines.append('timestamp<"%s"' % end_time.isoformat(timespec='seconds'))
  filter_str = '\n'.join(filter_lines)
  logging.info('searching logs in project %s for logs between %s and %s',
               project_id, str(start_time), str(end_time))
  deque = Deque()
  req = logging_api.entries().list(
      body={
          'resourceNames': [f'projects/{project_id}'],
          'filter': filter_str,
          'orderBy': 'timestamp desc',
          'pageSize': config.get('logging_page_size')
      })
  fetched_entries_count = 0
  query_pages = 0
  query_start_time = datetime.datetime.now()
  while req is not None:
    query_pages += 1
    res = _ratelimited_execute(req)
    if 'entries' in res:
      for e in res['entries']:
        fetched_entries_count += 1
        deque.appendleft(e)

    # Verify that we aren't above limits, exit otherwise.
    if fetched_entries_count > config.get('logging_fetch_max_entries'):
      logging.warning(
          'maximum number of log entries (%d) reached (project: %s, query: %s).',
          config.get('logging_fetch_max_entries'), project_id,
          filter_str.replace('\n', ' AND '))
      return deque
    run_time = (datetime.datetime.now() - query_start_time).total_seconds()
    if run_time >= config.get('logging_fetch_max_time_seconds'):
      logging.warning(
          'maximum query runtime for log query reached (project: %s, query: %s).',
          project_id, filter_str.replace('\n', ' AND '))
      return deque
    if disable_paging:
      break
    req = logging_api.entries().list_next(req, res)
    if req is not None:
      logging.info('still fetching logs (project: %s, max wait: %ds)',
                   project_id,
                   config.get('logging_fetch_max_time_seconds') - run_time)

  query_end_time = datetime.datetime.now()
  logging.debug('logging query run time: %s, pages: %d, query: %s',
                query_end_time - query_start_time, query_pages,
                filter_str.replace('\n', ' AND '))

  return deque

Intended for use in runbooks only. Use logs.query() for lint rules.
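
A sketch of a runbook-style call (the project id and filter are placeholders):

import datetime

from gcpdiag.queries import logs

end_time = datetime.datetime.now(datetime.timezone.utc)
start_time = end_time - datetime.timedelta(hours=8)

entries = logs.realtime_query(project_id='my-project',
                              filter_str='severity=ERROR',
                              start_time=start_time,
                              end_time=end_time)
# The returned deque iterates oldest entries first.
for raw_entry in entries:
  print(logs.format_log_entry(raw_entry))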

def execute_queries(executor: concurrent.futures._base.Executor):
def execute_queries(executor: concurrent.futures.Executor):
  global jobs_todo
  jobs_executing = jobs_todo
  jobs_todo = {}
  for job in jobs_executing.values():
    job.future = executor.submit(_execute_query_job, job)
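
A brief sketch of the batching behaviour: every job registered by query() since the last batch is submitted to the executor, and the pending-job map is reset so later query() calls start a new batch (placeholder values; the executor is created explicitly here):

import concurrent.futures

from gcpdiag.queries import logs

q = logs.query(project_id='my-project',
               resource_type='gce_instance',
               log_name='my-log-name',
               filter_str='severity=ERROR')

executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
logs.execute_queries(executor)  # q.job.future is now set
entries = q.entries             # blocks until the job has finished
executor.shutdown()
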
def log_entry_timestamp(log_entry: Mapping[str, Any]) -> datetime.datetime:
def log_entry_timestamp(log_entry: Mapping[str, Any]) -> datetime.datetime:
  # Use receiveTimestamp so that we don't have any time synchronization issues
  # (i.e. don't trust the timestamp field)
  timestamp = log_entry.get('receiveTimestamp', None)
  if timestamp:
    return dateutil.parser.parse(timestamp)
  return timestamp
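
For example (made-up entry), the receive timestamp wins over the timestamp written by the log producer:

from gcpdiag.queries import logs

entry = {
    'timestamp': '2023-05-01T11:59:30Z',         # written by the producer
    'receiveTimestamp': '2023-05-01T12:00:00Z',  # written by Cloud Logging
}
print(logs.log_entry_timestamp(entry).isoformat())
# -> 2023-05-01T12:00:00+00:00
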
def format_log_entry(log_entry: dict) -> str:
def format_log_entry(log_entry: dict) -> str:
  """Format a log_entry, as returned by LogsQuery.entries, to a simple one-line
  string with the date and message."""
  log_message = None
  if 'jsonPayload' in log_entry:
    for key in ['message', 'MESSAGE']:
      if key in log_entry['jsonPayload']:
        log_message = log_entry['jsonPayload'][key]
        break
  if log_message is None:
    log_message = log_entry.get('textPayload')
  log_date = log_entry_timestamp(log_entry)
  log_date_str = log_date.astimezone().isoformat(sep=' ', timespec='seconds')
  return f'{log_date_str}: {log_message}'

Format a log_entry, as returned by LogsQuery.entries, to a simple one-line string with the date and message.
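
For example (made-up entry; the printed date is converted to the local timezone):

from gcpdiag.queries import logs

entry = {
    'receiveTimestamp': '2023-05-01T12:00:00Z',
    'jsonPayload': {'message': 'connection refused'},
}
print(logs.format_log_entry(entry))
# e.g. '2023-05-01 12:00:00+00:00: connection refused'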

def exclusions(project_id: str) -> Optional[List[LogExclusion]]:
def exclusions(project_id: str) -> Union[List[LogExclusion], None]:
  logging_api = apis.get_api('logging', 'v2', project_id)
  if not apis.is_enabled(project_id, 'logging'):
    return None

  log_exclusions: List[LogExclusion] = []

  fetched_entries_count = 0
  req = logging_api.exclusions().list(parent=f'projects/{project_id}')
  while req is not None:
    res = req.execute(num_retries=config.API_RETRIES)
    fetched_entries_count += 1
    if res:
      for log_exclusion_resp in res['exclusions']:
        log_exclusions.append(LogExclusion(project_id, log_exclusion_resp))
    req = logging_api.exclusions().list_next(req, res)
    if req is not None:
      # pylint: disable=logging-fstring-interpolation
      logging.info(f'still fetching log exclusions for project {project_id}')
      # pylint: enable=logging-fstring-interpolation
  return log_exclusions
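
A short sketch listing the active log exclusions of a project (placeholder project id; None means the Logging API is disabled):

from gcpdiag.queries import logs

log_exclusions = logs.exclusions('my-project')
if log_exclusions is None:
  print('Cloud Logging API is not enabled')
else:
  for exclusion in log_exclusions:
    if not exclusion.disabled:
      print(f'{exclusion.full_path}: {exclusion.filter}')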