gcpdiag.queries.logs

Queries related to Cloud Logging.

The main functionality is querying log entries, which is supposed to be used as follows:

1. Call query() with the logs query parameters that you need. This returns a LogsQuery object which can be used to retrieve the logs later.

2. Call execute_queries() to execute all log query jobs. Similar queries will be grouped together to minimize the number of required API calls. Multiple queries will be done in parallel, while always respecting the Cloud Logging limit of 60 queries per 60 seconds.

3. Use the entries property on the LogsQuery object to iterate over the fetched logs. Note that the entries are not guaranteed to be filtered by what was given in the "filter_str" argument to query(); you will need to filter the entries in code as well when iterating over them.

Side note: this module is not called 'logging' to avoid using the same name as the standard Python library for logging.
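A minimal sketch of those three steps (the project id, resource type, log name and filter below are illustrative placeholders, not values defined by this module):

import concurrent.futures

from gcpdiag.queries import logs

# Step 1: register the query; nothing is fetched yet.
oom_query = logs.query(
    project_id='my-project',       # placeholder project id
    resource_type='gce_instance',  # placeholder resource type
    log_name='log_id("serialconsole.googleapis.com/serial_port_1_output")',
    filter_str='textPayload:"Out of memory"')

# Step 2: run all registered query jobs in parallel.
with concurrent.futures.ThreadPoolExecutor() as executor:
  logs.execute_queries(executor)

# Step 3: iterate over the results, re-applying the filter, because grouped
# queries only guarantee that entries match the shared part of the query.
for raw_entry in oom_query.entries:
  if 'Out of memory' in raw_entry.get('textPayload', ''):
    print(logs.format_log_entry(raw_entry))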
class LogsQuery:
class LogsQuery:
  """A log search job that was started with prefetch_logs()."""
  job: _LogsQueryJob

  def __init__(self, job):
    self.job = job

  @property
  def entries(self) -> Sequence:
    if not self.job.future:
      raise RuntimeError(
          'log query wasn\'t executed. did you forget to call execute_queries()?'
      )
    elif self.job.future.running():
      logging.info(
          'waiting for logs query results (project: %s, resource type: %s)',
          self.job.project_id, self.job.resource_type)
    return self.job.future.result()

A log search job that was started with prefetch_logs().

LogsQuery(job)
  def __init__(self, job):
    self.job = job
job: gcpdiag.queries.logs._LogsQueryJob
entries: Sequence
  @property
  def entries(self) -> Sequence:
    if not self.job.future:
      raise RuntimeError(
          'log query wasn\'t executed. did you forget to call execute_queries()?'
      )
    elif self.job.future.running():
      logging.info(
          'waiting for logs query results (project: %s, resource type: %s)',
          self.job.project_id, self.job.resource_type)
    return self.job.future.result()
jobs_todo: Dict[Tuple[str, str, str], gcpdiag.queries.logs._LogsQueryJob] = {}
class LogEntryShort:
class LogEntryShort:
  """A common log entry"""
  _text: str
  _timestamp: Optional[datetime.datetime]

  def __init__(self, raw_entry):
    if isinstance(raw_entry, dict):
      self._text = get_path(raw_entry, ('textPayload',), default='')
      self._timestamp = log_entry_timestamp(raw_entry)

    if isinstance(raw_entry, str):
      self._text = raw_entry
      # we could extract timestamp from serial entries
      # but they are not always present
      # and may be unreliable as we don't know the system clock setting
      self._timestamp = None

  @property
  def text(self):
    return self._text

  @property
  def timestamp(self):
    return self._timestamp

  @property
  def timestamp_iso(self):
    if self._timestamp:
      return self._timestamp.astimezone().isoformat(sep=' ', timespec='seconds')
    return None

A common log entry

LogEntryShort(raw_entry)
  def __init__(self, raw_entry):
    if isinstance(raw_entry, dict):
      self._text = get_path(raw_entry, ('textPayload',), default='')
      self._timestamp = log_entry_timestamp(raw_entry)

    if isinstance(raw_entry, str):
      self._text = raw_entry
      # we could extract timestamp from serial entries
      # but they are not always present
      # and may be unreliable as we don't know the system clock setting
      self._timestamp = None
text
  @property
  def text(self):
    return self._text
timestamp
  @property
  def timestamp(self):
    return self._timestamp
timestamp_iso
  @property
  def timestamp_iso(self):
    if self._timestamp:
      return self._timestamp.astimezone().isoformat(sep=' ', timespec='seconds')
    return None
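For illustration, the two input shapes that LogEntryShort accepts (the field values below are made up):

from gcpdiag.queries.logs import LogEntryShort

# A structured API entry: text and timestamp are extracted from the dict.
api_entry = LogEntryShort({
    'textPayload': 'instance started',
    'receiveTimestamp': '2024-01-01T12:00:00Z',
})
print(api_entry.text, api_entry.timestamp_iso)

# A plain string (e.g. one serial console line): no timestamp is available,
# so timestamp and timestamp_iso are None.
serial_entry = LogEntryShort('[   0.000000] Linux version 5.10.0 ...')
print(serial_entry.text, serial_entry.timestamp_iso)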
class LogExclusion(gcpdiag.models.Resource):
class LogExclusion(models.Resource):
  """A log exclusion entry"""
  _resource_data: dict
  project_id: str

  def __init__(self, project_id: str, resource_data: dict):
    super().__init__(project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    return self._resource_data['name']

  @property
  def filter(self) -> str:
    return self._resource_data['filter']

  @property
  def disabled(self) -> bool:
    if 'disabled' in self._resource_data:
      return self._resource_data['disabled']
    return False

A log exclusion entry

LogExclusion(project_id: str, resource_data: dict)
  def __init__(self, project_id: str, resource_data: dict):
    super().__init__(project_id)
    self._resource_data = resource_data
project_id: str
  @property
  def project_id(self) -> str:
    """Project id (not project number)."""
    return self._project_id

Project id (not project number).

full_path: str
  @property
  def full_path(self) -> str:
    return self._resource_data['name']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

filter: str
  @property
  def filter(self) -> str:
    return self._resource_data['filter']
disabled: bool
  @property
  def disabled(self) -> bool:
    if 'disabled' in self._resource_data:
      return self._resource_data['disabled']
    return False
def query(project_id: str, resource_type: str, log_name: str, filter_str: str) -> LogsQuery:
def query(project_id: str, resource_type: str, log_name: str,
          filter_str: str) -> LogsQuery:
  # Aggregate by project_id, resource_type, log_name
  job_key = (project_id, resource_type, log_name)
  job = jobs_todo.setdefault(
      job_key,
      _LogsQueryJob(
          project_id=project_id,
          resource_type=resource_type,
          log_name=log_name,
          filters=set(),
      ))
  job.filters.add(filter_str)
  return LogsQuery(job=job)
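For example, two calls that share the same project, resource type and log name are folded into a single job; only the filter set grows (the identifiers below are placeholders):

from gcpdiag.queries import logs

q1 = logs.query('my-project', 'k8s_node', 'log_id("kubelet")',
                'jsonPayload.MESSAGE:"OOM"')
q2 = logs.query('my-project', 'k8s_node', 'log_id("kubelet")',
                'jsonPayload.MESSAGE:"eviction"')
assert q1.job is q2.job          # one shared _LogsQueryJob
assert len(q1.job.filters) == 2  # both filter strings were added to it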
@caching.cached_api_call
def realtime_query(project_id, filter_str, start_time_utc, end_time_utc):
@caching.cached_api_call
def realtime_query(project_id, filter_str, start_time_utc, end_time_utc):
  """Intended for use in only runbooks. use logs.query() for lint rules."""
  logging_api = apis.get_api('logging', 'v2', project_id)

  filter_lines = [filter_str]
  filter_lines.append('timestamp>"%s"' %
                      start_time_utc.isoformat(timespec='seconds'))
  filter_lines.append('timestamp<"%s"' %
                      end_time_utc.isoformat(timespec='seconds'))
  filter_str = '\n'.join(filter_lines)
  logging.info('searching logs in project %s for logs between %s and %s',
               project_id, str(start_time_utc), str(end_time_utc))
  deque = Deque()
  req = logging_api.entries().list(
      body={
          'resourceNames': [f'projects/{project_id}'],
          'filter': filter_str,
          'orderBy': 'timestamp desc',
          'pageSize': config.get('logging_page_size')
      })
  fetched_entries_count = 0
  query_pages = 0
  query_start_time = datetime.datetime.now()
  while req is not None:
    query_pages += 1
    res = _ratelimited_execute(req)
    if 'entries' in res:
      for e in res['entries']:
        fetched_entries_count += 1
        deque.appendleft(e)

    # Verify that we aren't above limits, exit otherwise.
    if fetched_entries_count > config.get('logging_fetch_max_entries'):
      logging.warning(
          'maximum number of log entries (%d) reached (project: %s, query: %s).',
          config.get('logging_fetch_max_entries'), project_id,
          filter_str.replace('\n', ' AND '))
      return deque
    run_time = (datetime.datetime.now() - query_start_time).total_seconds()
    if run_time >= config.get('logging_fetch_max_time_seconds'):
      logging.warning(
          'maximum query runtime for log query reached (project: %s, query: %s).',
          project_id, filter_str.replace('\n', ' AND '))
      return deque
    req = logging_api.entries().list_next(req, res)
    if req is not None:
      logging.info('still fetching logs (project: %s, max wait: %ds)',
                   project_id,
                   config.get('logging_fetch_max_time_seconds') - run_time)

  query_end_time = datetime.datetime.now()
  logging.debug('logging query run time: %s, pages: %d, query: %s',
                query_end_time - query_start_time, query_pages,
                filter_str.replace('\n', ' AND '))

  return deque

Intended for use only in runbooks; use logs.query() for lint rules.
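A sketch of a runbook-style call (the project id and filter are placeholders); the function appends the time-window conditions to the filter itself:

import datetime

from gcpdiag.queries import logs

end = datetime.datetime.now(datetime.timezone.utc)
start = end - datetime.timedelta(hours=1)
entries = logs.realtime_query(
    project_id='my-project',
    filter_str='severity>=ERROR AND resource.type="gce_instance"',
    start_time_utc=start,
    end_time_utc=end)
for raw_entry in entries:  # oldest matching entry first
  print(logs.format_log_entry(raw_entry))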

def execute_queries(executor: concurrent.futures.Executor):
def execute_queries(executor: concurrent.futures.Executor):
  global jobs_todo
  jobs_executing = jobs_todo
  jobs_todo = {}
  for job in jobs_executing.values():
    job.future = executor.submit(_execute_query_job, job)
def log_entry_timestamp(log_entry: Mapping[str, Any]) -> datetime.datetime:
def log_entry_timestamp(log_entry: Mapping[str, Any]) -> datetime.datetime:
  # Use receiveTimestamp so that we don't have any time synchronization issues
  # (i.e. don't trust the timestamp field)
  timestamp = log_entry.get('receiveTimestamp', None)
  if timestamp:
    return dateutil.parser.parse(timestamp)
  return timestamp
def format_log_entry(log_entry: dict) -> str:
def format_log_entry(log_entry: dict) -> str:
  """Format a log_entry, as returned by LogsQuery.entries to a simple one-line
  string with the date and message."""
  log_message = None
  if 'jsonPayload' in log_entry:
    for key in ['message', 'MESSAGE']:
      if key in log_entry['jsonPayload']:
        log_message = log_entry['jsonPayload'][key]
        break
  if log_message is None:
    log_message = log_entry.get('textPayload')
  log_date = log_entry_timestamp(log_entry)
  log_date_str = log_date.astimezone().isoformat(sep=' ', timespec='seconds')
  return f'{log_date_str}: {log_message}'

Format a log_entry, as returned by LogsQuery.entries, to a simple one-line string with the date and message.
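For illustration, a jsonPayload message takes precedence over textPayload (the values below are made up):

from gcpdiag.queries import logs

entry = {
    'receiveTimestamp': '2024-01-01T12:00:00Z',
    'jsonPayload': {'MESSAGE': 'kubelet restarted'},
    'textPayload': 'only used when jsonPayload has no message',
}
print(logs.format_log_entry(entry))
# e.g. '2024-01-01 13:00:00+01:00: kubelet restarted' (rendered in the local timezone)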

def exclusions(project_id: str) -> Optional[List[LogExclusion]]:
def exclusions(project_id: str) -> Union[List[LogExclusion], None]:
  logging_api = apis.get_api('logging', 'v2', project_id)
  if not apis.is_enabled(project_id, 'logging'):
    return None

  log_exclusions: List[LogExclusion] = []

  fetched_entries_count = 0
  req = logging_api.exclusions().list(parent=f'projects/{project_id}')
  while req is not None:
    res = req.execute(num_retries=config.API_RETRIES)
    fetched_entries_count += 1
    if res:
      for log_exclusion_resp in res['exclusions']:
        log_exclusions.append(LogExclusion(project_id, log_exclusion_resp))
    req = logging_api.exclusions().list_next(req, res)
    if req is not None:
      # pylint: disable=logging-fstring-interpolation
      logging.info(f'still fetching log exclusions for project {project_id}')
      # pylint: enable=logging-fstring-interpolation
  return log_exclusions
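A short sketch that lists the active exclusions of a project (the project id is a placeholder):

from gcpdiag.queries import logs

log_exclusions = logs.exclusions('my-project')
if log_exclusions is None:
  print('the Cloud Logging API is not enabled in this project')
else:
  for exclusion in log_exclusions:
    if not exclusion.disabled:
      print(f'active exclusion {exclusion.full_path}: {exclusion.filter}')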