gcpdiag.queries.osconfig

Queries related to GCP OS Config
class Inventory(gcpdiag.models.Resource):
class Inventory(models.Resource):
  """Represents OS Inventory data of a GCE VM instance"""

  # Inventory resource dict handed to the constructor; every property below
  # reads from it.
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Wrap `resource_data` for the given project.

    Args:
      project_id: forwarded to the parent class initializer.
      resource_data: dict describing the inventory; stored as-is (not copied).
    """
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Value of the `name` key of the inventory resource data.

    e.g: projects/{project_number}/locations/{location}/instances/{instance_id}/inventory
    """
    resource_name = self._resource_data['name']
    return resource_name

  @property
  def short_path(self) -> str:
    """Full path without the 'projects', 'locations' and 'instances' labels.

    e.g: {project_number}/{location}/{instance_id}/inventory
    """
    shortened = re.sub(r'^projects/', '', self.full_path)
    for label in (r'/locations/', r'/instances/'):
      shortened = re.sub(label, '/', shortened)
    return shortened

  @property
  def os_shortname(self) -> str:
    """`shortName` of the `osInfo` entry (e.g: debian, windows.), '' if absent."""
    return self._resource_data.get('osInfo', {}).get('shortName', '')

  @property
  def os_version(self) -> str:
    """`version` of the `osInfo` entry, '' if absent."""
    return self._resource_data.get('osInfo', {}).get('version', '')

  @property
  def installed_packages(self) -> Mapping[str, str]:
    """Map each installed package name to its installed version.

    Only inventory items whose `type` is 'INSTALLED_PACKAGE' and that carry an
    `installedPackage` entry are considered. yum, apt, googet, zypper and COS
    packages are keyed by `packageName` with `version` as value; Windows
    applications are keyed by `displayName` with `displayVersion` as value.
    A missing name or version is reported as ''. Other package kinds are
    ignored.

    Returns:
      <key: installed package name, value: installed version>; empty when the
      resource data has no `items`.
    """
    # (key inside `installedPackage`, name field, version field). Probed in
    # this order and the first key present wins; the pre-existing kinds come
    # first so their precedence is unchanged.
    package_fields = (
        ('yumPackage', 'packageName', 'version'),
        ('aptPackage', 'packageName', 'version'),
        ('googetPackage', 'packageName', 'version'),
        ('windowsApplication', 'displayName', 'displayVersion'),
        ('zypperPackage', 'packageName', 'version'),
        ('cosPackage', 'packageName', 'version'),
    )
    packages: Dict[str, str] = {}
    for item in self._resource_data.get('items', {}).values():
      if item.get('type', '') != 'INSTALLED_PACKAGE':
        continue
      if 'installedPackage' not in item:
        continue
      pkg = item['installedPackage']
      for key, name_field, version_field in package_fields:
        if key in pkg:
          details = pkg[key]
          packages[details.get(name_field, '')] = details.get(version_field, '')
          break
    return packages

Represents OS Inventory data of a GCE VM instance

Inventory(project_id, resource_data)
34  def __init__(self, project_id, resource_data):
35    super().__init__(project_id=project_id)
36    self._resource_data = resource_data
full_path: str
39  @property
40  def full_path(self) -> str:
41    return self._resource_data['name']

Returns the full path of this resource.

Example: 'projects/{project_number}/locations/{location}/instances/{instance_id}/inventory'

short_path: str
44  @property
45  def short_path(self) -> str:
46    path = self.full_path
47    path = re.sub(r'^projects/', '', path)
48    path = re.sub(r'/locations/', '/', path)
49    path = re.sub(r'/instances/', '/', path)
50    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: '{project_number}/{location}/{instance_id}/inventory'

os_shortname: str
53  @property
54  def os_shortname(self) -> str:
55    if 'osInfo' in self._resource_data:
56      return self._resource_data['osInfo'].get('shortName', '')
57    return ''
os_version: str
59  @property
60  def os_version(self) -> str:
61    if 'osInfo' in self._resource_data:
62      return self._resource_data['osInfo'].get('version', '')
63    return ''
installed_packages: Mapping[str, str]
66  @property
67  def installed_packages(self) -> Mapping[str, str]:
68    installed_packages: Dict[str, str] = {}
69    if 'items' in self._resource_data:
70      installed_items = [
71          i for i in self._resource_data['items'].values()
72          if i.get('type', '') == 'INSTALLED_PACKAGE'
73      ]
74      for item in installed_items:
75        if 'installedPackage' not in item:
76          continue
77        pkg = item['installedPackage']
78        if 'yumPackage' in pkg:
79          p = pkg['yumPackage']
80          installed_packages[p.get('packageName', '')] = p.get('version', '')
81        elif 'aptPackage' in pkg:
82          p = pkg['aptPackage']
83          installed_packages[p.get('packageName', '')] = p.get('version', '')
84        elif 'googetPackage' in pkg:
85          p = pkg['googetPackage']
86          installed_packages[p.get('packageName', '')] = p.get('version', '')
87        elif 'windowsApplication' in pkg:
88          p = pkg['windowsApplication']
89          installed_packages[p.get('displayName',
90                                   '')] = p.get('displayVersion', '')
91    return installed_packages
Inherited Members
gcpdiag.models.Resource
project_id
@caching.cached_api_call(in_memory=True)
def get_inventory( context: gcpdiag.models.Context, location: str, instance_name: str) -> Optional[Inventory]:
 94@caching.cached_api_call(in_memory=True)
 95def get_inventory(context: models.Context, location: str,
 96                  instance_name: str) -> Optional[Inventory]:
 97  if not apis.is_enabled(context.project_id, 'osconfig'):
 98    return None
 99  osconfig_api = apis.get_api('osconfig', 'v1', context.project_id)
100  logging.info(
101      'fetching inventory data for VM %s in zone %s in project %s',
102      instance_name,
103      location,
104      context.project_id,
105  )
106  query = (osconfig_api.projects().locations().instances().inventories().get(
107      name=
108      f'projects/{context.project_id}/locations/{location}/instances/{instance_name}/inventory',
109      view='FULL',
110  ))
111  try:
112    resp = query.execute(num_retries=config.API_RETRIES)
113  except googleapiclient.errors.HttpError as err:
114    if err.resp.status in [404]:
115      return None
116    raise utils.GcpApiError(err) from err
117  return Inventory(context.project_id, resource_data=resp)