gcpdiag.queries.pubsub
32class Topic(models.Resource): 33 """Represent a Topic""" 34 _resource_data: dict 35 36 def __init__(self, project_id, resource_data): 37 super().__init__(project_id=project_id) 38 self._resource_data = resource_data 39 self._metadata_dict = None 40 41 @property 42 def name(self) -> str: 43 m = re.search(r'/topics/([^/]+)$', self._resource_data['name']) 44 if not m: 45 raise RuntimeError('can\'t determine name of topic %s' % 46 (self._resource_data['name'])) 47 return m.group(1) 48 49 @property 50 def full_path(self) -> str: 51 return self._resource_data['name'] 52 53 @property 54 def short_path(self) -> str: 55 path = self.project_id + '/' + self.name 56 return path 57 58 @property 59 def kms_key_name(self) -> str: 60 return self._resource_data['kmsKeyName']
Represent a Topic
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
63@caching.cached_api_call 64def get_topics(context: models.Context) -> Mapping[str, Topic]: 65 """Get all topics(Does not include deleted topics).""" 66 topics: Dict[str, Topic] = {} 67 if not apis.is_enabled(context.project_id, 'pubsub'): 68 return topics 69 pubsub_api = apis.get_api('pubsub', 'v1', context.project_id) 70 logging.info('fetching list of PubSub topics in project %s', 71 context.project_id) 72 query = pubsub_api.projects().topics().list( 73 project=f'projects/{context.project_id}') 74 try: 75 resp = query.execute(num_retries=config.API_RETRIES) 76 if 'topics' not in resp: 77 return topics 78 for t in resp['topics']: 79 # verify that we have some minimal data that we expect 80 if 'name' not in t: 81 raise RuntimeError('missing data in topics response') 82 # projects/{project}/topics/{topic} 83 result = re.match(r'projects/[^/]+/topics/([^/]+)', t['name']) 84 if not result: 85 logging.error('invalid topic data: %s', t['name']) 86 continue 87 88 if not context.match_project_resource(resource=result.group(1), 89 labels=t.get('labels', {})): 90 continue 91 92 topics[t['name']] = Topic(project_id=context.project_id, resource_data=t) 93 except googleapiclient.errors.HttpError as err: 94 raise utils.GcpApiError(err) from err 95 return topics
Get all topics(Does not include deleted topics).
98class TopicIAMPolicy(iam.BaseIAMPolicy): 99 100 def _is_resource_permission(self, permission): 101 return True
Common class for IAM policies
104@caching.cached_api_call(in_memory=True) 105def get_topic_iam_policy(name: str) -> TopicIAMPolicy: 106 project_id = utils.get_project_by_res_name(name) 107 108 pubsub_api = apis.get_api('pubsub', 'v1', project_id) 109 request = pubsub_api.projects().topics().getIamPolicy(resource=name) 110 111 return iam.fetch_iam_policy(request, TopicIAMPolicy, project_id, name)
114class Subscription(models.Resource): 115 """Represent a Subscription.""" 116 _resource_data: dict 117 118 def __init__(self, project_id, resource_data): 119 super().__init__(project_id=project_id) 120 self._resource_data = resource_data 121 self._metadata_dict = None 122 123 @property 124 def name(self) -> str: 125 m = re.search(r'/subscriptions/([^/]+)$', self._resource_data['name']) 126 if not m: 127 raise RuntimeError('can\'t determine name of subscription %s' % 128 (self._resource_data['name'])) 129 return m.group(1) 130 131 @property 132 def full_path(self) -> str: 133 return self._resource_data['name'] 134 135 @property 136 def short_path(self) -> str: 137 path = self.project_id + '/' + self.name 138 return path 139 140 @property 141 def topic(self) -> Union[Topic, str]: 142 """ 143 Return subscription's topic as a Topic object, 144 or String '_deleted-topic_' if topic is deleted. 145 """ 146 if 'topic' not in self._resource_data: 147 raise RuntimeError('topic not set for subscription {self.name}') 148 elif self._resource_data['topic'] == '_deleted-topic_': 149 return '_deleted_topic_' 150 151 m = re.match(r'projects/([^/]+)/topics/([^/]+)', 152 self._resource_data['topic']) 153 if not m: 154 raise RuntimeError("can't parse topic: %s" % self._resource_data['topic']) 155 (project_id, topic_name) = (m.group(1), self._resource_data['topic']) 156 topics = get_topics(models.Context(project_id)) 157 if topic_name not in topics: 158 raise RuntimeError( 159 f'Topic {topic_name} for Subscription {self.name} not found') 160 return topics[topic_name] 161 162 def is_detached(self) -> bool: 163 """Return if subscription is detached.""" 164 if 'detached' in self._resource_data: 165 return bool(self._resource_data['detached']) 166 return False 167 168 def is_big_query_subscription(self) -> bool: 169 """Return Boolean value if subscription is a big query subscription.""" 170 if 'bigqueryConfig' in self._resource_data: 171 return True 172 return False 173 174 def is_gcs_subscription(self) -> bool: 175 """Return Boolean value if subscription is a gcs subscription.""" 176 if 'cloudStorageConfig' in self._resource_data: 177 return True 178 return False 179 180 def is_push_subscription(self) -> bool: 181 """Return Boolean value if subscription is a push subscription.""" 182 if (self._resource_data['pushConfig'] or self.is_big_query_subscription() or 183 self.is_gcs_subscription()): 184 return True 185 return False 186 187 def has_dead_letter_topic(self) -> bool: 188 """Return Truthy value if subscription has a dead-letter topic.""" 189 if 'deadLetterPolicy' in self._resource_data: 190 return bool(self._resource_data['deadLetterPolicy']['deadLetterTopic']) 191 return False 192 193 def gcs_subscription_bucket(self) -> str: 194 """Return the name of the bucket attached to GCS subscription.""" 195 if self.is_gcs_subscription(): 196 return get_path(self._resource_data, ('cloudStorageConfig', 'bucket')) 197 return '' # acts as a null return that can be evaluated as a false value
Represent a Subscription.
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
135 @property 136 def short_path(self) -> str: 137 path = self.project_id + '/' + self.name 138 return path
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
140 @property 141 def topic(self) -> Union[Topic, str]: 142 """ 143 Return subscription's topic as a Topic object, 144 or String '_deleted-topic_' if topic is deleted. 145 """ 146 if 'topic' not in self._resource_data: 147 raise RuntimeError('topic not set for subscription {self.name}') 148 elif self._resource_data['topic'] == '_deleted-topic_': 149 return '_deleted_topic_' 150 151 m = re.match(r'projects/([^/]+)/topics/([^/]+)', 152 self._resource_data['topic']) 153 if not m: 154 raise RuntimeError("can't parse topic: %s" % self._resource_data['topic']) 155 (project_id, topic_name) = (m.group(1), self._resource_data['topic']) 156 topics = get_topics(models.Context(project_id)) 157 if topic_name not in topics: 158 raise RuntimeError( 159 f'Topic {topic_name} for Subscription {self.name} not found') 160 return topics[topic_name]
Return subscription's topic as a Topic object, or String '_deleted-topic_' if topic is deleted.
162 def is_detached(self) -> bool: 163 """Return if subscription is detached.""" 164 if 'detached' in self._resource_data: 165 return bool(self._resource_data['detached']) 166 return False
Return if subscription is detached.
168 def is_big_query_subscription(self) -> bool: 169 """Return Boolean value if subscription is a big query subscription.""" 170 if 'bigqueryConfig' in self._resource_data: 171 return True 172 return False
Return Boolean value if subscription is a big query subscription.
174 def is_gcs_subscription(self) -> bool: 175 """Return Boolean value if subscription is a gcs subscription.""" 176 if 'cloudStorageConfig' in self._resource_data: 177 return True 178 return False
Return Boolean value if subscription is a gcs subscription.
180 def is_push_subscription(self) -> bool: 181 """Return Boolean value if subscription is a push subscription.""" 182 if (self._resource_data['pushConfig'] or self.is_big_query_subscription() or 183 self.is_gcs_subscription()): 184 return True 185 return False
Return Boolean value if subscription is a push subscription.
187 def has_dead_letter_topic(self) -> bool: 188 """Return Truthy value if subscription has a dead-letter topic.""" 189 if 'deadLetterPolicy' in self._resource_data: 190 return bool(self._resource_data['deadLetterPolicy']['deadLetterTopic']) 191 return False
Return Truthy value if subscription has a dead-letter topic.
193 def gcs_subscription_bucket(self) -> str: 194 """Return the name of the bucket attached to GCS subscription.""" 195 if self.is_gcs_subscription(): 196 return get_path(self._resource_data, ('cloudStorageConfig', 'bucket')) 197 return '' # acts as a null return that can be evaluated as a false value
Return the name of the bucket attached to GCS subscription.
200@caching.cached_api_call 201def get_subscriptions(context: models.Context) -> Mapping[str, Subscription]: 202 subscriptions: Dict[str, Subscription] = {} 203 if not apis.is_enabled(context.project_id, 'pubsub'): 204 return subscriptions 205 pubsub_api = apis.get_api('pubsub', 'v1', context.project_id) 206 logging.info('fetching list of PubSub subscriptions in project %s', 207 context.project_id) 208 query = pubsub_api.projects().subscriptions().list( 209 project=f'projects/{context.project_id}') 210 try: 211 resp = query.execute(num_retries=config.API_RETRIES) 212 if 'subscriptions' not in resp: 213 return subscriptions 214 for s in resp['subscriptions']: 215 # verify that we have some minimal data that we expect 216 if 'name' not in s: 217 raise RuntimeError('missing data in topics response') 218 219 # projects/{project}/subscriptions/{sub} 220 result = re.match(r'projects/[^/]+/subscriptions/([^/]+)', s['name']) 221 if not result: 222 logging.error('invalid subscription data: %s', s['name']) 223 continue 224 225 if not context.match_project_resource(resource=result.group(1), 226 labels=s.get('labels', {})): 227 continue 228 229 subscriptions[s['name']] = Subscription(project_id=context.project_id, 230 resource_data=s) 231 except googleapiclient.errors.HttpError as err: 232 raise utils.GcpApiError(err) from err 233 return subscriptions
236@caching.cached_api_call 237def get_subscription(project_id: str, 238 subscription_name: str) -> Union[None, Subscription]: 239 if not apis.is_enabled(project_id, 'pubsub'): 240 return None 241 pubsub_api = apis.get_api('pubsub', 'v1', project_id) 242 logging.info('fetching PubSub subscription in project %s', project_id) 243 query = pubsub_api.projects().subscriptions().get( 244 subscription=f'projects/{project_id}/subscriptions/{subscription_name}') 245 try: 246 resp = query.execute(num_retries=config.API_RETRIES) 247 return Subscription(project_id=project_id, resource_data=resp) 248 except googleapiclient.errors.HttpError as err: 249 raise utils.GcpApiError(err) from err
252class SubscriptionIAMPolicy(iam.BaseIAMPolicy): 253 254 def _is_resource_permission(self, permission): 255 return True
Common class for IAM policies
258@caching.cached_api_call(in_memory=True) 259def get_subscription_iam_policy(name: str) -> SubscriptionIAMPolicy: 260 project_id = utils.get_project_by_res_name(name) 261 262 pubsub_api = apis.get_api('pubsub', 'v1', project_id) 263 request = pubsub_api.projects().subscriptions().getIamPolicy(resource=name) 264 265 return iam.fetch_iam_policy(request, SubscriptionIAMPolicy, project_id, name)