gcpdiag.queries.logs

Queries related to Cloud Logging. The main functionality is querying log entries, which is supposed to be used as follows: 1. Call query() with the logs query parameters that you need. This returns a LogsQuery object which can be used to retrieve the logs later. 2. Call execute_queries() to execute all log query jobs. Similar queries will be grouped together to minimize the number of required API calls. Multiple queries will be done in parallel, while always respecting the Cloud Logging limit of 60 queries per 60 seconds. 3. Use the entries property on the LogsQuery object to iterate over the fetched logs. Note that the entries are not guaranteed to be filtered by what was given in the "filter_str" argument to query(), you will need to filter out the entries in code as well when iterating over the log entries. Side note: this module is not called 'logging' to avoid using the same name as the standard python library for logging.
class LogsQuery:
class LogsQuery:
  """A log search job that was started with prefetch_logs().

  Wraps a _LogsQueryJob; the actual API call happens when
  execute_queries() runs the job, after which `entries` can be read.
  """
  job: _LogsQueryJob

  def __init__(self, job):
    self.job = job

  @property
  def entries(self) -> Sequence:
    """Log entries fetched by this query job.

    Raises:
      RuntimeError: if execute_queries() was never called, so the job
        has no future to wait on.
    """
    if not self.job.future:
      raise RuntimeError(
          'log query wasn\'t executed. did you forget to call execute_queries()?'
      )
    elif self.job.future.running():
      logging.debug(
          'waiting for logs query results (project: %s, resource type: %s)',
          self.job.project_id, self.job.resource_type)
    # Blocks until the job completes and returns its result.
    return self.job.future.result()

A log search job that was started with prefetch_logs().

LogsQuery(job)
71  def __init__(self, job):
72    self.job = job
job: gcpdiag.queries.logs._LogsQueryJob
entries: Sequence
74  @property
75  def entries(self) -> Sequence:
76    if not self.job.future:
77      raise RuntimeError(
78          'log query wasn\'t executed. did you forget to call execute_queries()?'
79      )
80    elif self.job.future.running():
81      logging.debug(
82          'waiting for logs query results (project: %s, resource type: %s)',
83          self.job.project_id, self.job.resource_type)
84    return self.job.future.result()
jobs_todo: Dict[Tuple[str, str, str], gcpdiag.queries.logs._LogsQueryJob] = {}
class LogEntryShort:
 90class LogEntryShort:
 91  """A common log entry"""
 92  _text: str
 93  _timestamp: Optional[datetime.datetime]
 94
 95  def __init__(self, raw_entry):
 96    if isinstance(raw_entry, dict):
 97      self._text = get_path(raw_entry, ('textPayload',), default='')
 98      self._timestamp = log_entry_timestamp(raw_entry)
 99
100    if isinstance(raw_entry, str):
101      self._text = raw_entry
102      # we could extract timestamp from serial entries
103      # but they are not always present
104      # and may be unreliable as we don't know the system clock setting
105      self._timestamp = None
106
107  @property
108  def text(self):
109    return self._text
110
111  @property
112  def timestamp(self):
113    return self._timestamp
114
115  @property
116  def timestamp_iso(self):
117    if self._timestamp:
118      return self._timestamp.astimezone().isoformat(sep=' ', timespec='seconds')
119    return None

A common log entry

LogEntryShort(raw_entry)
 95  def __init__(self, raw_entry):
 96    if isinstance(raw_entry, dict):
 97      self._text = get_path(raw_entry, ('textPayload',), default='')
 98      self._timestamp = log_entry_timestamp(raw_entry)
 99
100    if isinstance(raw_entry, str):
101      self._text = raw_entry
102      # we could extract timestamp from serial entries
103      # but they are not always present
104      # and may be unreliable as we don't know the system clock setting
105      self._timestamp = None
text
107  @property
108  def text(self):
109    return self._text
timestamp
111  @property
112  def timestamp(self):
113    return self._timestamp
timestamp_iso
115  @property
116  def timestamp_iso(self):
117    if self._timestamp:
118      return self._timestamp.astimezone().isoformat(sep=' ', timespec='seconds')
119    return None
class LogExclusion(gcpdiag.models.Resource):
class LogExclusion(models.Resource):
  """A log exclusion entry.

  Wraps one exclusion resource dict as returned by the Cloud Logging
  projects.exclusions API.
  """
  _resource_data: dict
  project_id: str

  def __init__(self, project_id: str, resource_data: dict):
    super().__init__(project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    # Exclusion 'name' is already a fully qualified resource name.
    return self._resource_data['name']

  @property
  def filter(self) -> str:
    return self._resource_data['filter']

  @property
  def disabled(self) -> bool:
    # The API omits 'disabled' for enabled exclusions; default to False.
    return self._resource_data.get('disabled', False)

A log exclusion entry

LogExclusion(project_id: str, resource_data: dict)
127  def __init__(self, project_id: str, resource_data: dict):
128    super().__init__(project_id)
129    self._resource_data = resource_data
project_id: str
266  @property
267  def project_id(self) -> str:
268    """Project id (not project number)."""
269    return self._project_id

Project id (not project number).

full_path: str
131  @property
132  def full_path(self) -> str:
133    return self._resource_data['name']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

filter: str
135  @property
136  def filter(self) -> str:
137    return self._resource_data['filter']
disabled: bool
139  @property
140  def disabled(self) -> bool:
141    if 'disabled' in self._resource_data:
142      return self._resource_data['disabled']
143    return False
def query( project_id: str, resource_type: str, log_name: str, filter_str: str) -> LogsQuery:
def query(project_id: str, resource_type: str, log_name: str,
          filter_str: str) -> LogsQuery:
  """Register a logs query to be executed later by execute_queries().

  Queries that share (project_id, resource_type, log_name) are merged
  into a single pending job so they can share one API call.
  """
  key = (project_id, resource_type, log_name)
  if key not in jobs_todo:
    jobs_todo[key] = _LogsQueryJob(
        project_id=project_id,
        resource_type=resource_type,
        log_name=log_name,
        filters=set(),
    )
  job = jobs_todo[key]
  job.filters.add(filter_str)
  return LogsQuery(job=job)
@caching.cached_api_call
def realtime_query(project_id, filter_str, start_time, end_time, disable_paging=False):
@caching.cached_api_call
def realtime_query(project_id: str,
                   filter_str: str,
                   start_time: datetime.datetime,
                   end_time: datetime.datetime,
                   disable_paging: bool = False):
  """Intended for use in only runbooks. use logs.query() for lint rules.

  Fetches log entries matching filter_str between start_time and end_time
  directly from the Cloud Logging API (no batching with other queries).

  Args:
    project_id: project whose logs are searched.
    filter_str: Cloud Logging filter expression; timestamp bounds are
      appended to it.
    start_time: lower bound; only entries strictly after it are returned.
    end_time: upper bound; only entries strictly before it are returned.
    disable_paging: if True, fetch only the first result page.

  Returns:
    A deque of raw log entry dicts. Pages are requested in
    'timestamp desc' order and each entry is appendleft()-ed, so iterating
    the deque yields oldest entries first. The result may be truncated
    when the logging_fetch_max_entries or logging_fetch_max_time_seconds
    limits are reached (a warning is logged in either case).
  """
  logging_api = apis.get_api('logging', 'v2', project_id)

  # Append the time-range restrictions (strict bounds) to the caller's
  # filter; lines in a Cloud Logging filter are implicitly AND-ed.
  filter_lines = [filter_str]
  filter_lines.append('timestamp>"%s"' %
                      start_time.isoformat(timespec='seconds'))
  filter_lines.append('timestamp<"%s"' % end_time.isoformat(timespec='seconds'))
  filter_str = '\n'.join(filter_lines)
  logging.debug('searching logs in project %s for logs between %s and %s',
                project_id, str(start_time), str(end_time))
  # NOTE(review): 'deque' shadows the usual collections.deque name;
  # 'Deque' presumably comes from typing — confirm against file imports.
  deque = Deque()
  req = logging_api.entries().list(
      body={
          'resourceNames': [f'projects/{project_id}'],
          'filter': filter_str,
          'orderBy': 'timestamp desc',
          'pageSize': config.get('logging_page_size')
      })
  fetched_entries_count = 0
  query_pages = 0
  query_start_time = datetime.datetime.now()
  while req is not None:
    query_pages += 1
    # _ratelimited_execute enforces the Cloud Logging query rate limit
    # (see module docstring: 60 queries per 60 seconds).
    res = _ratelimited_execute(req)
    if 'entries' in res:
      for e in res['entries']:
        fetched_entries_count += 1
        # appendleft reverses the 'timestamp desc' order of the pages.
        deque.appendleft(e)

    # Verify that we aren't above limits, exit otherwise.
    if fetched_entries_count > config.get('logging_fetch_max_entries'):
      logging.warning(
          'maximum number of log entries (%d) reached (project: %s, query:'
          ' %s).',
          config.get('logging_fetch_max_entries'),
          project_id,
          filter_str.replace('\n', ' AND '),
      )
      return deque
    run_time = (datetime.datetime.now() - query_start_time).total_seconds()
    if run_time >= config.get('logging_fetch_max_time_seconds'):
      logging.warning(
          'maximum query runtime for log query reached (project: %s, query:'
          ' %s).',
          project_id,
          filter_str.replace('\n', ' AND '),
      )
      return deque
    if disable_paging:
      break
    req = logging_api.entries().list_next(req, res)
    if req is not None:
      logging.debug(
          'still fetching logs (project: %s, max wait: %ds)',
          project_id,
          config.get('logging_fetch_max_time_seconds') - run_time,
      )

  query_end_time = datetime.datetime.now()
  logging.debug(
      'logging query run time: %s, pages: %d, query: %s',
      query_end_time - query_start_time,
      query_pages,
      filter_str.replace('\n', ' AND '),
  )

  return deque

Intended for use in only runbooks. use logs.query() for lint rules.

def execute_queries( query_executor: gcpdiag.executor.ContextAwareExecutor, context: gcpdiag.models.Context):
def execute_queries(query_executor: executor.ContextAwareExecutor,
                    context: models.Context):
  """Submit all pending log query jobs to the executor.

  Takes ownership of the current jobs_todo batch and resets the
  module-level dict so that subsequent query() calls start a new batch.
  """
  global jobs_todo
  batch, jobs_todo = jobs_todo, {}
  for job in batch.values():
    job.future = query_executor.submit(_execute_query_job, job, context)
def log_entry_timestamp(log_entry: Mapping[str, Any]) -> datetime.datetime:
def log_entry_timestamp(
    log_entry: Mapping[str, Any]) -> Optional[datetime.datetime]:
  """Return the parsed receive timestamp of a log entry.

  Use receiveTimestamp so that we don't have any time synchronization
  issues (i.e. don't trust the timestamp field).

  Returns:
    The parsed timestamp, or the falsy raw value (normally None) when the
    entry has no receiveTimestamp. The previous -> datetime.datetime
    annotation was wrong: this function can return None.
  """
  timestamp = log_entry.get('receiveTimestamp', None)
  if timestamp:
    return dateutil.parser.parse(timestamp)
  return timestamp
def format_log_entry(log_entry: dict) -> str:
def format_log_entry(log_entry: dict) -> str:
  """Format a log_entry, as returned by LogsQuery.entries to a simple one-line
  string with the date and message."""
  json_payload = log_entry.get('jsonPayload', {})
  log_message = None
  # Prefer the structured payload message (either casing) over textPayload.
  for key in ('message', 'MESSAGE'):
    if key in json_payload:
      log_message = json_payload[key]
      break
  if log_message is None:
    log_message = log_entry.get('textPayload')
  entry_time = log_entry_timestamp(log_entry)
  entry_time_str = entry_time.astimezone().isoformat(sep=' ',
                                                     timespec='seconds')
  return f'{entry_time_str}: {log_message}'

Format a log_entry, as returned by LogsQuery.entries to a simple one-line string with the date and message.

def exclusions(project_id: str) -> Optional[List[LogExclusion]]:
def exclusions(project_id: str) -> Union[List[LogExclusion], None]:
  """Fetch all log exclusions configured in a project.

  Returns:
    The list of LogExclusion objects, or None when the Logging API is not
    enabled for the project.
  """
  logging_api = apis.get_api('logging', 'v2', project_id)
  if not apis.is_enabled(project_id, 'logging'):
    return None

  log_exclusions: List[LogExclusion] = []

  req = logging_api.exclusions().list(parent=f'projects/{project_id}')
  while req is not None:
    res = req.execute(num_retries=config.API_RETRIES)
    # A response page may omit the 'exclusions' key (e.g. carry only a
    # nextPageToken); use .get() so such pages don't raise KeyError.
    for log_exclusion_resp in (res or {}).get('exclusions', []):
      log_exclusions.append(LogExclusion(project_id, log_exclusion_resp))
    req = logging_api.exclusions().list_next(req, res)
    if req is not None:
      # pylint: disable=logging-fstring-interpolation
      logging.debug(f'still fetching log exclusions for project {project_id}')
      # pylint: enable=logging-fstring-interpolation
  return log_exclusions