gcpdiag.queries.osconfig
Queries related to GCP OS Config
class
Inventory(gcpdiag.models.Resource):
class Inventory(models.Resource):
  """Represents OS Inventory data of a GCE VM instance.

  Wraps the inventory resource dictionary passed to the constructor and
  exposes selected fields of it as read-only properties.
  """

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Keep the raw inventory resource for the property lookups below.

    Args:
      project_id: forwarded to the models.Resource constructor.
      resource_data: inventory resource as a dict. The path-derived
        properties read its 'name' entry.
    """
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Return the resource name stored under 'name'.

    e.g: projects/{project_number}/locations/{location}/instances/{instance_id}/inventory
    """
    return self._resource_data['name']

  @property
  def short_path(self) -> str:
    """Return full_path without the 'projects', 'locations', 'instances' labels.

    e.g: {project_number}/{location}/{instance_id}/inventory
    """
    path = self.full_path
    path = re.sub(r'^projects/', '', path)
    path = re.sub(r'/locations/', '/', path)
    path = re.sub(r'/instances/', '/', path)
    return path

  @property
  def instance_id(self) -> str:
    """Return the second-to-last segment of the resource name.

    e.g: '5221437597918447050'
    """
    return self._resource_data['name'].split('/')[-2]

  @property
  def os_shortname(self) -> str:
    """Return osInfo.shortName (e.g: debian, windows), or '' when absent."""
    if 'osInfo' in self._resource_data:
      return self._resource_data['osInfo'].get('shortName', '')
    return ''

  @property
  def os_version(self) -> str:
    """Return osInfo.version, or '' when absent."""
    if 'osInfo' in self._resource_data:
      return self._resource_data['osInfo'].get('version', '')
    return ''

  @property
  def installed_packages(self) -> Mapping[str, str]:
    """Return <key: installed package name, value: installed version>.

    Only inventory items of type 'INSTALLED_PACKAGE' that carry an
    'installedPackage' entry are considered. A missing name or version is
    recorded as ''. When two items share a name, the later one wins.
    """
    # Package formats that describe themselves with 'packageName'/'version'.
    # zypper (SUSE) and cos (Container-Optimized OS) use the same shape as
    # yum/apt/googet and used to be dropped silently.
    versioned_keys = ('yumPackage', 'aptPackage', 'googetPackage',
                      'zypperPackage', 'cosPackage')
    installed_packages: Dict[str, str] = {}
    for item in self._resource_data.get('items', {}).values():
      if item.get('type', '') != 'INSTALLED_PACKAGE':
        continue
      if 'installedPackage' not in item:
        continue
      pkg = item['installedPackage']
      key = next((k for k in versioned_keys if k in pkg), None)
      if key is not None:
        p = pkg[key]
        installed_packages[p.get('packageName', '')] = p.get('version', '')
      elif 'windowsApplication' in pkg:
        # Windows applications use display* field names instead.
        p = pkg['windowsApplication']
        installed_packages[p.get('displayName',
                                 '')] = p.get('displayVersion', '')
    return installed_packages
Represents OS Inventory data of a GCE VM instance
full_path: str
Returns the full path of this resource.
Example: 'projects/{project_number}/locations/{location}/instances/{instance_id}/inventory'
short_path: str
@property
def short_path(self) -> str:
  """Return full_path without the 'projects', 'locations', 'instances' labels.

  e.g: {project_number}/{location}/{instance_id}/inventory
  """
  # (pattern, replacement) pairs, applied in this order.
  substitutions = (
      (r'^projects/', ''),
      (r'/locations/', '/'),
      (r'/instances/', '/'),
  )
  shortened = self.full_path
  for pattern, replacement in substitutions:
    shortened = re.sub(pattern, replacement, shortened)
  return shortened
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: '{project_number}/{location}/{instance_id}/inventory'
installed_packages: Mapping[str, str]
@property
def installed_packages(self) -> Mapping[str, str]:
  """Return <key: installed package name, value: installed version>.

  Only inventory items of type 'INSTALLED_PACKAGE' that carry an
  'installedPackage' entry are considered. A missing name or version is
  recorded as ''. When two items share a name, the later one wins.
  """
  # Package formats that describe themselves with 'packageName'/'version'.
  # zypper (SUSE) and cos (Container-Optimized OS) use the same shape as
  # yum/apt/googet and used to be dropped silently.
  versioned_keys = ('yumPackage', 'aptPackage', 'googetPackage',
                    'zypperPackage', 'cosPackage')
  installed_packages: Dict[str, str] = {}
  for item in self._resource_data.get('items', {}).values():
    if item.get('type', '') != 'INSTALLED_PACKAGE':
      continue
    if 'installedPackage' not in item:
      continue
    pkg = item['installedPackage']
    key = next((k for k in versioned_keys if k in pkg), None)
    if key is not None:
      p = pkg[key]
      installed_packages[p.get('packageName', '')] = p.get('version', '')
    elif 'windowsApplication' in pkg:
      # Windows applications use display* field names instead.
      p = pkg['windowsApplication']
      installed_packages[p.get('displayName',
                               '')] = p.get('displayVersion', '')
  return installed_packages
@caching.cached_api_call(in_memory=True)
def
list_inventories( context: gcpdiag.models.Context, location: str) -> Mapping[str, Inventory]:
@caching.cached_api_call(in_memory=True)
def list_inventories(
    context: models.Context,
    location: str,
) -> Mapping[str, Inventory]:
  """List the OS inventories of all VM instances in one location.

  Args:
    context: supplies the project id to query.
    location: location segment of the parent resource name (the log
      message refers to it as a zone).

  Returns:
    Inventory objects keyed by their instance_id. The mapping is empty when
    the 'osconfig' API is not enabled or the list request answers 404.

  Raises:
    utils.GcpApiError: the list request failed with any other HTTP status.
  """
  if not apis.is_enabled(context.project_id, 'osconfig'):
    return {}

  osconfig_api = apis.get_api('osconfig', 'v1', context.project_id)
  logging.info(
      'fetching inventory data for all VMs under zone %s in project %s',
      location,
      context.project_id,
  )
  inventories_api = osconfig_api.projects().locations().instances().inventories()
  # '-' in place of the instance id addresses every instance in the location.
  parent = f'projects/{context.project_id}/locations/{location}/instances/-'

  try:
    listing = apis_utils.list_all(
        inventories_api.list(parent=parent, view='FULL'),
        inventories_api.list_next,
        'inventories',
    )
  except googleapiclient.errors.HttpError as err:
    if err.resp.status == 404:
      return {}
    raise utils.GcpApiError(err) from err

  wrapped = (
      Inventory(context.project_id, resource_data=entry) for entry in listing)
  return {inv.instance_id: inv for inv in wrapped}
@caching.cached_api_call(in_memory=True)
def
get_inventory( context: gcpdiag.models.Context, location: str, instance_name: str) -> Optional[Inventory]:
@caching.cached_api_call(in_memory=True)
def get_inventory(context: models.Context, location: str,
                  instance_name: str) -> Optional[Inventory]:
  """Fetch the OS inventory of a single VM instance.

  Args:
    context: supplies the project id to query.
    location: location segment of the inventory resource name (the log
      message refers to it as a zone).
    instance_name: instance segment of the inventory resource name.

  Returns:
    The Inventory built from the API response, or None when the 'osconfig'
    API is not enabled or the request answers 404.

  Raises:
    utils.GcpApiError: the request failed with any other HTTP status.
  """
  if not apis.is_enabled(context.project_id, 'osconfig'):
    return None

  osconfig_api = apis.get_api('osconfig', 'v1', context.project_id)
  logging.info(
      'fetching inventory data for VM %s in zone %s in project %s',
      instance_name,
      location,
      context.project_id,
  )
  inventory_name = f'projects/{context.project_id}/locations/{location}/instances/{instance_name}/inventory'
  inventories_api = osconfig_api.projects().locations().instances().inventories()
  request = inventories_api.get(name=inventory_name, view='FULL')

  try:
    resource = request.execute(num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError as err:
    if err.resp.status == 404:
      return None
    raise utils.GcpApiError(err) from err
  return Inventory(context.project_id, resource_data=resource)