gcpdiag.queries.pubsub
class Topic(models.Resource):
  """Represent a Pub/Sub topic.

  Wraps the topic resource dict handed to the constructor; every accessor
  below reads from that dict.
  """
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Store the raw topic resource.

    Args:
      project_id: project id passed through to the base class.
      resource_data: topic resource dict; 'name' is expected to hold the
        full topic path.
    """
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self._metadata_dict = None

  @property
  def name(self) -> str:
    """Return the last path component of the topic's full path.

    Raises:
      RuntimeError: if the resource name does not end in '/topics/<id>'.
    """
    m = re.search(r'/topics/([^/]+)$', self._resource_data['name'])
    if not m:
      raise RuntimeError('can\'t determine name of topic %s' %
                         (self._resource_data['name']))
    return m.group(1)

  @property
  def full_path(self) -> str:
    """Return the 'name' field of the resource unchanged."""
    return self._resource_data['name']

  @property
  def short_path(self) -> str:
    """Return '<project_id>/<topic name>'."""
    return self.project_id + '/' + self.name

  @property
  def kms_key_name(self) -> str:
    """Return the topic's 'kmsKeyName', or '' when the field is absent.

    The field is read with .get() so that a resource dict without a
    'kmsKeyName' entry yields an empty string instead of a KeyError.
    """
    return self._resource_data.get('kmsKeyName', '')
Represent a Topic
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@caching.cached_api_call
def get_topics(context: models.Context) -> Mapping[str, Topic]:
  """Get all topics (does not include deleted topics).

  Follows 'nextPageToken' so that topics beyond the first response page are
  included as well.

  Args:
    context: supplies the project id and the resource/label filter
      (context.match_project_resource).

  Returns:
    Mapping from the topic's 'name' field (projects/{project}/topics/{topic})
    to its Topic object. Empty when the 'pubsub' API is not enabled for the
    project or when no topic is returned.

  Raises:
    RuntimeError: if a returned topic entry has no 'name'.
    utils.GcpApiError: if the API call fails with an HttpError.
  """
  topics: Dict[str, Topic] = {}
  if not apis.is_enabled(context.project_id, 'pubsub'):
    return topics
  pubsub_api = apis.get_api('pubsub', 'v1', context.project_id)
  logging.debug('fetching list of PubSub topics in project %s',
                context.project_id)
  topics_api = pubsub_api.projects().topics()
  query = topics_api.list(project=f'projects/{context.project_id}')
  try:
    while query is not None:
      resp = query.execute(num_retries=config.API_RETRIES)
      for t in resp.get('topics', []):
        # verify that we have some minimal data that we expect
        if 'name' not in t:
          raise RuntimeError('missing data in topics response')
        # projects/{project}/topics/{topic}
        result = re.match(r'projects/[^/]+/topics/([^/]+)', t['name'])
        if not result:
          logging.error('invalid topic data: %s', t['name'])
          continue

        if not context.match_project_resource(resource=result.group(1),
                                              labels=t.get('labels', {})):
          continue

        topics[t['name']] = Topic(project_id=context.project_id,
                                  resource_data=t)
      # Only ask for another page when the response announces one; this also
      # keeps single-page responses on exactly the original call sequence.
      if not resp.get('nextPageToken'):
        break
      query = topics_api.list_next(previous_request=query,
                                   previous_response=resp)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
  return topics
Get all topics (does not include deleted topics).
class TopicIAMPolicy(iam.BaseIAMPolicy):
  """IAM policy of a Pub/Sub topic."""

  def _is_resource_permission(self, permission):
    """Return True for every permission, regardless of its value."""
    return True
Common class for IAM policies
@caching.cached_api_call(in_memory=True)
def get_topic_iam_policy(context: models.Context, name: str) -> TopicIAMPolicy:
  """Fetch the IAM policy of the topic identified by `name`.

  Args:
    context: passed through to iam.fetch_iam_policy.
    name: resource name of the topic; the project is derived from it via
      utils.get_project_by_res_name.

  Returns:
    The result of iam.fetch_iam_policy for a TopicIAMPolicy.
  """
  project_id = utils.get_project_by_res_name(name)

  topics_api = apis.get_api('pubsub', 'v1', project_id).projects().topics()
  policy_request = topics_api.getIamPolicy(resource=name)

  return iam.fetch_iam_policy(policy_request, TopicIAMPolicy, project_id,
                              name, context)
class Subscription(models.Resource):
  """Represent a Pub/Sub subscription.

  Wraps the subscription resource dict handed to the constructor; every
  accessor below reads from that dict.
  """
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Store the raw subscription resource.

    Args:
      project_id: project id passed through to the base class.
      resource_data: subscription resource dict; 'name' is expected to hold
        the full subscription path.
    """
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self._metadata_dict = None

  @property
  def name(self) -> str:
    """Return the last path component of the subscription's full path.

    Raises:
      RuntimeError: if the resource name does not end in
        '/subscriptions/<id>'.
    """
    m = re.search(r'/subscriptions/([^/]+)$', self._resource_data['name'])
    if not m:
      raise RuntimeError('can\'t determine name of subscription %s' %
                         (self._resource_data['name']))
    return m.group(1)

  @property
  def full_path(self) -> str:
    """Return the 'name' field of the resource unchanged."""
    return self._resource_data['name']

  @property
  def short_path(self) -> str:
    """Return '<project_id>/<subscription name>'."""
    return self.project_id + '/' + self.name

  @property
  def topic(self) -> Union[Topic, str]:
    """Return the subscription's topic.

    Returns:
      The Topic object found through get_topics() for the topic's project,
      or the string '_deleted_topic_' when the resource's 'topic' field is
      the '_deleted-topic_' marker.

    Raises:
      RuntimeError: if the resource has no 'topic' field, the field cannot
        be parsed, or the topic is not among the project's topics.
    """
    if 'topic' not in self._resource_data:
      raise RuntimeError(f'topic not set for subscription {self.name}')
    topic_path = self._resource_data['topic']
    if topic_path == '_deleted-topic_':
      # NOTE(review): the returned marker is spelled with underscores while
      # the value compared above uses hyphens. The returned value is kept
      # unchanged because callers may compare against it.
      return '_deleted_topic_'

    m = re.match(r'projects/([^/]+)/topics/([^/]+)', topic_path)
    if not m:
      raise RuntimeError("can't parse topic: %s" % topic_path)
    # get_topics() keys its result by the full topic path, so the lookup
    # uses the path rather than the short topic id.
    topics = get_topics(models.Context(m.group(1)))
    if topic_path not in topics:
      raise RuntimeError(
          f'Topic {topic_path} for Subscription {self.name} not found')
    return topics[topic_path]

  @property
  def push_config(self) -> dict:
    """Return the 'pushConfig' dict, or {} when the field is absent."""
    return self._resource_data.get('pushConfig', {})

  @property
  def push_oidc_service_account_email(self) -> str:
    """Return the OIDC service account email for a push subscription.

    Returns '' when pushConfig has no oidcToken.serviceAccountEmail.
    """
    return self.push_config.get('oidcToken', {}).get('serviceAccountEmail', '')

  def is_detached(self) -> bool:
    """Return whether the 'detached' field is present and truthy."""
    return bool(self._resource_data.get('detached', False))

  def is_big_query_subscription(self) -> bool:
    """Return whether the resource carries a 'bigqueryConfig' field."""
    return 'bigqueryConfig' in self._resource_data

  def is_gcs_subscription(self) -> bool:
    """Return whether the resource carries a 'cloudStorageConfig' field."""
    return 'cloudStorageConfig' in self._resource_data

  def is_push_subscription(self) -> bool:
    """Return whether this is a push, BigQuery or Cloud Storage subscription.

    Reads pushConfig through the push_config property so that a resource
    without a 'pushConfig' field is reported as False instead of raising
    KeyError.
    """
    return (bool(self.push_config) or self.is_big_query_subscription() or
            self.is_gcs_subscription())

  def is_active(self) -> bool:
    """Return whether the 'state' field equals 'ACTIVE'."""
    return self._resource_data['state'] == 'ACTIVE'

  def has_dead_letter_topic(self) -> bool:
    """Return whether deadLetterPolicy.deadLetterTopic is set and non-empty.

    A policy without a 'deadLetterTopic' entry counts as "no dead-letter
    topic" rather than raising KeyError.
    """
    policy = self._resource_data.get('deadLetterPolicy') or {}
    return bool(policy.get('deadLetterTopic'))

  def dead_letter_topic(self) -> str:
    """Return the dead-letter topic, or '' when none is configured."""
    if self.has_dead_letter_topic():
      return self._resource_data.get('deadLetterPolicy',
                                     {}).get('deadLetterTopic', '')
    return ''

  def gcs_subscription_bucket(self) -> str:
    """Return the name of the bucket attached to GCS subscription."""
    if self.is_gcs_subscription():
      return get_path(self._resource_data, ('cloudStorageConfig', 'bucket'))
    return ''  # acts as a null return that can be evaluated as a false value
Represent a Subscription.
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Return '<project_id>/<name>' for this resource."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
@property
def topic(self) -> Union[Topic, str]:
  """Return the subscription's topic.

  Returns:
    The Topic object found through get_topics() for the topic's project,
    or the string '_deleted_topic_' when the resource's 'topic' field is
    the '_deleted-topic_' marker.

  Raises:
    RuntimeError: if the resource has no 'topic' field, the field cannot
      be parsed, or the topic is not among the project's topics.
  """
  if 'topic' not in self._resource_data:
    # f-string so that the subscription name is actually interpolated
    raise RuntimeError(f'topic not set for subscription {self.name}')
  topic_path = self._resource_data['topic']
  if topic_path == '_deleted-topic_':
    # NOTE(review): the returned marker is spelled with underscores while
    # the value compared above uses hyphens. The returned value is kept
    # unchanged because callers may compare against it.
    return '_deleted_topic_'

  m = re.match(r'projects/([^/]+)/topics/([^/]+)', topic_path)
  if not m:
    raise RuntimeError("can't parse topic: %s" % topic_path)
  # get_topics() keys its result by the full topic path, so the lookup uses
  # the path rather than the short topic id.
  topics = get_topics(models.Context(m.group(1)))
  if topic_path not in topics:
    raise RuntimeError(
        f'Topic {topic_path} for Subscription {self.name} not found')
  return topics[topic_path]
Return the subscription's topic as a Topic object, or the string '_deleted_topic_' if the topic has been deleted (the resource's topic field is then '_deleted-topic_').
@property
def push_oidc_service_account_email(self) -> str:
  """Return the OIDC service account email for a push subscription.

  Returns '' when push_config has no oidcToken.serviceAccountEmail.
  """
  oidc_token = self.push_config.get('oidcToken', {})
  return oidc_token.get('serviceAccountEmail', '')
Return the OIDC service account email for a push subscription.
def is_detached(self) -> bool:
  """Return whether the 'detached' field is present and truthy."""
  return bool(self._resource_data.get('detached', False))
Return whether the subscription is detached.
def is_big_query_subscription(self) -> bool:
  """Return whether the resource carries a 'bigqueryConfig' field."""
  return 'bigqueryConfig' in self._resource_data
Return Boolean value if subscription is a big query subscription.
def is_gcs_subscription(self) -> bool:
  """Return whether the resource carries a 'cloudStorageConfig' field."""
  return 'cloudStorageConfig' in self._resource_data
Return Boolean value if subscription is a gcs subscription.
def is_push_subscription(self) -> bool:
  """Return whether this is a push, BigQuery or Cloud Storage subscription.

  'pushConfig' is read with .get() so that a resource without that field is
  reported as False instead of raising KeyError.
  """
  return (bool(self._resource_data.get('pushConfig')) or
          self.is_big_query_subscription() or self.is_gcs_subscription())
Return Boolean value if subscription is a push subscription.
def is_active(self) -> bool:
  """Return whether the resource's 'state' field equals 'ACTIVE'."""
  state = self._resource_data['state']
  return state == 'ACTIVE'
Return Boolean value if subscription is active.
def has_dead_letter_topic(self) -> bool:
  """Return whether deadLetterPolicy.deadLetterTopic is set and non-empty.

  A policy without a 'deadLetterTopic' entry counts as "no dead-letter
  topic" rather than raising KeyError.
  """
  policy = self._resource_data.get('deadLetterPolicy') or {}
  return bool(policy.get('deadLetterTopic'))
Return Truthy value if subscription has a dead-letter topic.
def dead_letter_topic(self) -> str:
  """Return the dead-letter topic, or '' when none is configured."""
  if not self.has_dead_letter_topic():
    return ''
  policy = self._resource_data.get('deadLetterPolicy', {})
  return policy.get('deadLetterTopic', '')
Return the dead-letter topic.
def gcs_subscription_bucket(self) -> str:
  """Return the name of the bucket attached to a GCS subscription.

  Returns '' (a falsy placeholder) when this is not a GCS subscription.
  """
  if not self.is_gcs_subscription():
    return ''  # acts as a null return that can be evaluated as a false value
  bucket_path = ('cloudStorageConfig', 'bucket')
  return get_path(self._resource_data, bucket_path)
Return the name of the bucket attached to GCS subscription.
@caching.cached_api_call
def get_subscriptions(context: models.Context) -> Mapping[str, Subscription]:
  """Get all subscriptions of the context's project.

  Follows 'nextPageToken' so that subscriptions beyond the first response
  page are included as well.

  Args:
    context: supplies the project id and the resource/label filter
      (context.match_project_resource).

  Returns:
    Mapping from the subscription's 'name' field
    (projects/{project}/subscriptions/{sub}) to its Subscription object.
    Empty when the 'pubsub' API is not enabled for the project or when no
    subscription is returned.

  Raises:
    RuntimeError: if a returned subscription entry has no 'name'.
    utils.GcpApiError: if the API call fails with an HttpError.
  """
  subscriptions: Dict[str, Subscription] = {}
  if not apis.is_enabled(context.project_id, 'pubsub'):
    return subscriptions
  pubsub_api = apis.get_api('pubsub', 'v1', context.project_id)
  logging.debug('fetching list of PubSub subscriptions in project %s',
                context.project_id)
  subscriptions_api = pubsub_api.projects().subscriptions()
  query = subscriptions_api.list(project=f'projects/{context.project_id}')
  try:
    while query is not None:
      resp = query.execute(num_retries=config.API_RETRIES)
      for s in resp.get('subscriptions', []):
        # verify that we have some minimal data that we expect
        if 'name' not in s:
          raise RuntimeError('missing data in subscriptions response')

        # projects/{project}/subscriptions/{sub}
        result = re.match(r'projects/[^/]+/subscriptions/([^/]+)', s['name'])
        if not result:
          logging.error('invalid subscription data: %s', s['name'])
          continue

        if not context.match_project_resource(resource=result.group(1),
                                              labels=s.get('labels', {})):
          continue

        subscriptions[s['name']] = Subscription(project_id=context.project_id,
                                                resource_data=s)
      # Only ask for another page when the response announces one; this also
      # keeps single-page responses on exactly the original call sequence.
      if not resp.get('nextPageToken'):
        break
      query = subscriptions_api.list_next(previous_request=query,
                                          previous_response=resp)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
  return subscriptions
@caching.cached_api_call
def get_subscription(project_id: str,
                     subscription_name: str) -> Union[None, Subscription]:
  """Fetch a single subscription by project id and subscription id.

  Args:
    project_id: project that owns the subscription.
    subscription_name: subscription id (last component of the full path).

  Returns:
    The Subscription built from the API response, or None when the 'pubsub'
    API is not enabled for the project.

  Raises:
    utils.GcpApiError: if the API call fails with an HttpError.
  """
  if not apis.is_enabled(project_id, 'pubsub'):
    return None
  pubsub_api = apis.get_api('pubsub', 'v1', project_id)
  logging.debug('fetching PubSub subscription in project %s', project_id)
  full_name = f'projects/{project_id}/subscriptions/{subscription_name}'
  request = pubsub_api.projects().subscriptions().get(subscription=full_name)
  try:
    payload = request.execute(num_retries=config.API_RETRIES)
  except googleapiclient.errors.HttpError as err:
    raise utils.GcpApiError(err) from err
  return Subscription(project_id=project_id, resource_data=payload)
class SubscriptionIAMPolicy(iam.BaseIAMPolicy):
  """IAM policy of a Pub/Sub subscription."""

  def _is_resource_permission(self, permission):
    """Return True for every permission, regardless of its value."""
    return True
Common class for IAM policies
@caching.cached_api_call(in_memory=True)
def get_subscription_iam_policy(context: models.Context,
                                name: str) -> SubscriptionIAMPolicy:
  """Fetch the IAM policy of the subscription identified by `name`.

  Args:
    context: passed through to iam.fetch_iam_policy.
    name: resource name of the subscription; the project is derived from it
      via utils.get_project_by_res_name.

  Returns:
    The result of iam.fetch_iam_policy for a SubscriptionIAMPolicy.
  """
  project_id = utils.get_project_by_res_name(name)

  subscriptions_api = apis.get_api('pubsub', 'v1',
                                   project_id).projects().subscriptions()
  policy_request = subscriptions_api.getIamPolicy(resource=name)

  return iam.fetch_iam_policy(policy_request, SubscriptionIAMPolicy,
                              project_id, name, context)