gcpdiag.queries.osconfig

Queries related to GCP OS Config
class Inventory(gcpdiag.models.Resource):
29class Inventory(models.Resource):
30  """Represents OS Inventory data of a GCE VM instance"""
31
32  _resource_data: dict
33
34  def __init__(self, project_id, resource_data):
35    super().__init__(project_id=project_id)
36    self._resource_data = resource_data
37
38  # e.g: projects/{project_number}/locations/{location}/instances/{instance_id}/inventory
39  @property
40  def full_path(self) -> str:
41    return self._resource_data['name']
42
43  # e.g: {project_number}/{location}/{instance_id}/inventory
44  @property
45  def short_path(self) -> str:
46    path = self.full_path
47    path = re.sub(r'^projects/', '', path)
48    path = re.sub(r'/locations/', '/', path)
49    path = re.sub(r'/instances/', '/', path)
50    return path
51
52  # e.g: '5221437597918447050'
53  @property
54  def instance_id(self) -> str:
55    return self._resource_data['name'].split('/')[-2]
56
57  # e.g: debian, windows.
58  @property
59  def os_shortname(self) -> str:
60    if 'osInfo' in self._resource_data:
61      return self._resource_data['osInfo'].get('shortName', '')
62    return ''
63
64  @property
65  def os_version(self) -> str:
66    if 'osInfo' in self._resource_data:
67      return self._resource_data['osInfo'].get('version', '')
68    return ''
69
70  # <key: installed package name, value: installed version>
71  @property
72  def installed_packages(self) -> Mapping[str, str]:
73    installed_packages: Dict[str, str] = {}
74    if 'items' in self._resource_data:
75      installed_items = [
76          i for i in self._resource_data['items'].values()
77          if i.get('type', '') == 'INSTALLED_PACKAGE'
78      ]
79      for item in installed_items:
80        if 'installedPackage' not in item:
81          continue
82        pkg = item['installedPackage']
83        if 'yumPackage' in pkg:
84          p = pkg['yumPackage']
85          installed_packages[p.get('packageName', '')] = p.get('version', '')
86        elif 'aptPackage' in pkg:
87          p = pkg['aptPackage']
88          installed_packages[p.get('packageName', '')] = p.get('version', '')
89        elif 'googetPackage' in pkg:
90          p = pkg['googetPackage']
91          installed_packages[p.get('packageName', '')] = p.get('version', '')
92        elif 'windowsApplication' in pkg:
93          p = pkg['windowsApplication']
94          installed_packages[p.get('displayName',
95                                   '')] = p.get('displayVersion', '')
96    return installed_packages

Represents OS Inventory data of a GCE VM instance

Inventory(project_id, resource_data)
34  def __init__(self, project_id, resource_data):
35    super().__init__(project_id=project_id)
36    self._resource_data = resource_data
full_path: str
39  @property
40  def full_path(self) -> str:
41    return self._resource_data['name']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
44  @property
45  def short_path(self) -> str:
46    path = self.full_path
47    path = re.sub(r'^projects/', '', path)
48    path = re.sub(r'/locations/', '/', path)
49    path = re.sub(r'/instances/', '/', path)
50    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

instance_id: str
53  @property
54  def instance_id(self) -> str:
55    return self._resource_data['name'].split('/')[-2]
os_shortname: str
58  @property
59  def os_shortname(self) -> str:
60    if 'osInfo' in self._resource_data:
61      return self._resource_data['osInfo'].get('shortName', '')
62    return ''
os_version: str
64  @property
65  def os_version(self) -> str:
66    if 'osInfo' in self._resource_data:
67      return self._resource_data['osInfo'].get('version', '')
68    return ''
installed_packages: Mapping[str, str]
71  @property
72  def installed_packages(self) -> Mapping[str, str]:
73    installed_packages: Dict[str, str] = {}
74    if 'items' in self._resource_data:
75      installed_items = [
76          i for i in self._resource_data['items'].values()
77          if i.get('type', '') == 'INSTALLED_PACKAGE'
78      ]
79      for item in installed_items:
80        if 'installedPackage' not in item:
81          continue
82        pkg = item['installedPackage']
83        if 'yumPackage' in pkg:
84          p = pkg['yumPackage']
85          installed_packages[p.get('packageName', '')] = p.get('version', '')
86        elif 'aptPackage' in pkg:
87          p = pkg['aptPackage']
88          installed_packages[p.get('packageName', '')] = p.get('version', '')
89        elif 'googetPackage' in pkg:
90          p = pkg['googetPackage']
91          installed_packages[p.get('packageName', '')] = p.get('version', '')
92        elif 'windowsApplication' in pkg:
93          p = pkg['windowsApplication']
94          installed_packages[p.get('displayName',
95                                   '')] = p.get('displayVersion', '')
96    return installed_packages
@caching.cached_api_call(in_memory=True)
def list_inventories( context: gcpdiag.models.Context, location: str) -> Mapping[str, Inventory]:
 99@caching.cached_api_call(in_memory=True)
100def list_inventories(
101    context: models.Context,
102    location: str,
103) -> Mapping[str, Inventory]:
104  inventories: Dict[str, Inventory] = {}
105  if not apis.is_enabled(context.project_id, 'osconfig'):
106    return inventories
107  osconfig_api = apis.get_api('osconfig', 'v1', context.project_id)
108  logging.info(
109      'fetching inventory data for all VMs under zone %s in project %s',
110      location,
111      context.project_id,
112  )
113  query = osconfig_api.projects().locations().instances().inventories()
114
115  try:
116    resp = apis_utils.list_all(
117        query.list(
118            parent=(
119                f'projects/{context.project_id}/locations/{location}/instances/-'
120            ),
121            view='FULL',
122        ),
123        query.list_next,
124        'inventories',
125    )
126  except googleapiclient.errors.HttpError as err:
127    if err.resp.status in [404]:
128      return inventories
129    raise utils.GcpApiError(err) from err
130
131  for i in resp:
132    inventory = Inventory(context.project_id, resource_data=i)
133    inventories[inventory.instance_id] = inventory
134  return inventories
@caching.cached_api_call(in_memory=True)
def get_inventory( context: gcpdiag.models.Context, location: str, instance_name: str) -> Optional[Inventory]:
137@caching.cached_api_call(in_memory=True)
138def get_inventory(context: models.Context, location: str,
139                  instance_name: str) -> Optional[Inventory]:
140  if not apis.is_enabled(context.project_id, 'osconfig'):
141    return None
142  osconfig_api = apis.get_api('osconfig', 'v1', context.project_id)
143  logging.info(
144      'fetching inventory data for VM %s in zone %s in project %s',
145      instance_name,
146      location,
147      context.project_id,
148  )
149  query = (osconfig_api.projects().locations().instances().inventories().get(
150      name=
151      f'projects/{context.project_id}/locations/{location}/instances/{instance_name}/inventory',
152      view='FULL',
153  ))
154  try:
155    resp = query.execute(num_retries=config.API_RETRIES)
156  except googleapiclient.errors.HttpError as err:
157    if err.resp.status in [404]:
158      return None
159    raise utils.GcpApiError(err) from err
160  return Inventory(context.project_id, resource_data=resp)