gcpdiag.queries.gae
Queries related to GCP App Engine Standard app.
class
Service(gcpdiag.models.Resource):
class Service(models.Resource):
  """An App Engine Standard service, backed by its raw API resource dict.

  The dict is stored as-is; every property below is a direct lookup into it
  (or, for `name`, a parse of its 'name' field).
  """
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """Service name: the trailing path component after '/services/'.

    Raises:
      RuntimeError: if the resource 'name' does not end in
        '/services/<name>'.
    """
    found = re.search(r'/services/([^/]+)$', self._resource_data['name'])
    if found:
      return found.group(1)
    raise RuntimeError('can\'t determine name of service %s' %
                       (self._resource_data['name']))

  @property
  def id(self) -> str:
    """The 'id' field of the underlying resource."""
    return self._resource_data['id']

  @property
  def full_path(self) -> str:
    """The 'name' field of the underlying resource, unmodified."""
    return self._resource_data['name']

  @property
  def short_path(self) -> str:
    """'<project_id>/<service id>'."""
    return '/'.join((self.project_id, self.id))
Represents an App Engine Standard app service.
class
Version(gcpdiag.models.Resource):
class Version(models.Resource):
  """An App Engine Standard version, backed by its raw API resource dict."""
  _resource_data: dict
  # Owning service. Not set by __init__; get_versions() assigns it after
  # constructing the object.
  service: Service

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def id(self) -> str:
    """The 'id' field of the underlying resource."""
    return self._resource_data['id']

  @property
  def full_path(self) -> str:
    """The 'name' field of the underlying resource, unmodified."""
    return self._resource_data['name']

  @property
  def short_path(self) -> str:
    """'<project_id>/<version id>'."""
    return '/'.join((self.project_id, self.id))

  @property
  def runtime(self) -> str:
    """The 'runtime' field of the underlying resource."""
    return self._resource_data['runtime']

  @property
  def env(self) -> str:
    """The 'env' field of the underlying resource."""
    return self._resource_data['env']
Represents an App Engine Standard app version.
service: Service
full_path: str
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@caching.cached_api_call
def
get_services( context: gcpdiag.models.Context) -> Mapping[str, Service]:
@caching.cached_api_call
def get_services(context: models.Context) -> Mapping[str, Service]:
  """Return the App Engine Standard services that match the context.

  The result is keyed by the service 'id' field. An empty mapping is returned
  when the 'appengine' API is not enabled for the project or when the
  response carries no 'services' key.

  Raises:
    utils.GcpApiError: if the list request fails with an HttpError.
  """
  project_id = context.project_id
  found: Dict[str, Service] = {}
  if not apis.is_enabled(project_id, 'appengine'):
    return found

  api = apis.get_api('appengine', 'v1', project_id)
  logging.info('fetching list of app engine services in the project %s',
               project_id)
  request = api.apps().services().list(appsId=project_id)
  try:
    # NOTE(review): a single request is executed and no page token is
    # followed -- confirm whether large projects need pagination here.
    response = request.execute(num_retries=config.API_RETRIES)
    entries = response['services'] if 'services' in response else []
    for entry in entries:
      # Expected shape: apps/myapp/services/default
      parsed = re.match(r'apps/[^/]+/services/([^/]+)', entry['name'])
      if parsed is None:
        logging.error('invalid appengine data: %s', entry['name'])
        continue

      entry_labels = entry.get('labels', {})
      if context.match_project_resource(resource=parsed.group(1),
                                        labels=entry_labels):
        found[entry['id']] = Service(project_id=project_id,
                                     resource_data=entry)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
  return found
Get a list of App Engine Standard services matching the given context, indexed by service id.
@caching.cached_api_call
def
get_versions( context: gcpdiag.models.Context) -> Mapping[str, Version]:
@caching.cached_api_call
def get_versions(context: models.Context) -> Mapping[str, Version]:
  """Get the App Engine Standard service versions matching the given context,
  indexed by version id.

  Versions are collected for every service returned by get_services(). An
  empty mapping is returned when the 'appengine' API is not enabled.

  Raises:
    utils.GcpApiError: if a versions list request fails with an HttpError.
    RuntimeError: if a returned version entry has no 'id' field.
  """
  versions: Dict[str, Version] = {}
  if not apis.is_enabled(context.project_id, 'appengine'):
    return versions

  appengine_api = apis.get_api('appengine', 'v1', context.project_id)

  services = get_services(context)

  logging.info('fetching list of app engine versions in the project %s',
               context.project_id)

  for service in services.values():
    query = appengine_api.apps().services().versions().list(
        appsId=context.project_id, servicesId=service.id)
    # Only the API call can raise HttpError, so keep the try body minimal.
    try:
      resp = query.execute(num_retries=config.API_RETRIES)
    except googleapiclient.errors.HttpError as err:
      raise utils.GcpApiError(err) from err

    # A service without versions must not end the scan: skip it and keep
    # going, otherwise versions of all remaining services would be lost.
    if 'versions' not in resp:
      continue

    for resp_v in resp['versions']:
      # verify that we have some minimal data that we expect
      if 'id' not in resp_v:
        raise RuntimeError(
            'missing data in apps.services.versions.list response')
      v = Version(project_id=context.project_id, resource_data=resp_v)
      v.service = service
      # NOTE(review): the key is the bare version id, so two services that
      # share a version id overwrite each other here -- kept as-is because
      # callers depend on this key format.
      versions[v.id] = v

  return versions
Get a list of App Engine Standard service versions matching the given context, indexed by version id.