gcpdiag.queries.osconfig
Queries related to GCP OS Config
class
Inventory(gcpdiag.models.Resource):
29class Inventory(models.Resource): 30 """Represents OS Inventory data of a GCE VM instance""" 31 32 _resource_data: dict 33 34 def __init__(self, project_id, resource_data): 35 super().__init__(project_id=project_id) 36 self._resource_data = resource_data 37 38 # e.g: projects/{project_number}/locations/{location}/instances/{instance_id}/inventory 39 @property 40 def full_path(self) -> str: 41 return self._resource_data['name'] 42 43 # e.g: {project_number}/{location}/{instance_id}/inventory 44 @property 45 def short_path(self) -> str: 46 path = self.full_path 47 path = re.sub(r'^projects/', '', path) 48 path = re.sub(r'/locations/', '/', path) 49 path = re.sub(r'/instances/', '/', path) 50 return path 51 52 # e.g: debian, windows. 53 @property 54 def os_shortname(self) -> str: 55 if 'osInfo' in self._resource_data: 56 return self._resource_data['osInfo'].get('shortName', '') 57 return '' 58 59 @property 60 def os_version(self) -> str: 61 if 'osInfo' in self._resource_data: 62 return self._resource_data['osInfo'].get('version', '') 63 return '' 64 65 # <key: installed package name, value: installed version> 66 @property 67 def installed_packages(self) -> Mapping[str, str]: 68 installed_packages: Dict[str, str] = {} 69 if 'items' in self._resource_data: 70 installed_items = [ 71 i for i in self._resource_data['items'].values() 72 if i.get('type', '') == 'INSTALLED_PACKAGE' 73 ] 74 for item in installed_items: 75 if 'installedPackage' not in item: 76 continue 77 pkg = item['installedPackage'] 78 if 'yumPackage' in pkg: 79 p = pkg['yumPackage'] 80 installed_packages[p.get('packageName', '')] = p.get('version', '') 81 elif 'aptPackage' in pkg: 82 p = pkg['aptPackage'] 83 installed_packages[p.get('packageName', '')] = p.get('version', '') 84 elif 'googetPackage' in pkg: 85 p = pkg['googetPackage'] 86 installed_packages[p.get('packageName', '')] = p.get('version', '') 87 elif 'windowsApplication' in pkg: 88 p = pkg['windowsApplication'] 89 installed_packages[p.get('displayName', 90 '')] = p.get('displayVersion', '') 91 return installed_packages
Represents OS Inventory data of a GCE VM instance
full_path: str
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
short_path: str
44 @property 45 def short_path(self) -> str: 46 path = self.full_path 47 path = re.sub(r'^projects/', '', path) 48 path = re.sub(r'/locations/', '/', path) 49 path = re.sub(r'/instances/', '/', path) 50 return path
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
installed_packages: Mapping[str, str]
66 @property 67 def installed_packages(self) -> Mapping[str, str]: 68 installed_packages: Dict[str, str] = {} 69 if 'items' in self._resource_data: 70 installed_items = [ 71 i for i in self._resource_data['items'].values() 72 if i.get('type', '') == 'INSTALLED_PACKAGE' 73 ] 74 for item in installed_items: 75 if 'installedPackage' not in item: 76 continue 77 pkg = item['installedPackage'] 78 if 'yumPackage' in pkg: 79 p = pkg['yumPackage'] 80 installed_packages[p.get('packageName', '')] = p.get('version', '') 81 elif 'aptPackage' in pkg: 82 p = pkg['aptPackage'] 83 installed_packages[p.get('packageName', '')] = p.get('version', '') 84 elif 'googetPackage' in pkg: 85 p = pkg['googetPackage'] 86 installed_packages[p.get('packageName', '')] = p.get('version', '') 87 elif 'windowsApplication' in pkg: 88 p = pkg['windowsApplication'] 89 installed_packages[p.get('displayName', 90 '')] = p.get('displayVersion', '') 91 return installed_packages
Inherited Members
- gcpdiag.models.Resource
- project_id
@caching.cached_api_call(in_memory=True)
def
get_inventory( context: gcpdiag.models.Context, location: str, instance_name: str) -> Optional[Inventory]:
94@caching.cached_api_call(in_memory=True) 95def get_inventory(context: models.Context, location: str, 96 instance_name: str) -> Optional[Inventory]: 97 if not apis.is_enabled(context.project_id, 'osconfig'): 98 return None 99 osconfig_api = apis.get_api('osconfig', 'v1', context.project_id) 100 logging.info( 101 'fetching inventory data for VM %s in zone %s in project %s', 102 instance_name, 103 location, 104 context.project_id, 105 ) 106 query = (osconfig_api.projects().locations().instances().inventories().get( 107 name= 108 f'projects/{context.project_id}/locations/{location}/instances/{instance_name}/inventory', 109 view='FULL', 110 )) 111 try: 112 resp = query.execute(num_retries=config.API_RETRIES) 113 except googleapiclient.errors.HttpError as err: 114 if err.resp.status in [404]: 115 return None 116 raise utils.GcpApiError(err) from err 117 return Inventory(context.project_id, resource_data=resp)