gcpdiag.queries.pubsub

Queries related to GCP PubSub
class Topic(gcpdiag.models.Resource):
32class Topic(models.Resource):
33  """Represent a Topic"""
34  _resource_data: dict
35
36  def __init__(self, project_id, resource_data):
37    super().__init__(project_id=project_id)
38    self._resource_data = resource_data
39    self._metadata_dict = None
40
41  @property
42  def name(self) -> str:
43    m = re.search(r'/topics/([^/]+)$', self._resource_data['name'])
44    if not m:
45      raise RuntimeError('can\'t determine name of topic %s' %
46                         (self._resource_data['name']))
47    return m.group(1)
48
49  @property
50  def full_path(self) -> str:
51    return self._resource_data['name']
52
53  @property
54  def short_path(self) -> str:
55    path = self.project_id + '/' + self.name
56    return path
57
58  @property
59  def kms_key_name(self) -> str:
60    return self._resource_data['kmsKeyName']

Represent a Topic

Topic(project_id, resource_data)
36  def __init__(self, project_id, resource_data):
37    super().__init__(project_id=project_id)
38    self._resource_data = resource_data
39    self._metadata_dict = None
name: str
41  @property
42  def name(self) -> str:
43    m = re.search(r'/topics/([^/]+)$', self._resource_data['name'])
44    if not m:
45      raise RuntimeError('can\'t determine name of topic %s' %
46                         (self._resource_data['name']))
47    return m.group(1)
full_path: str
49  @property
50  def full_path(self) -> str:
51    return self._resource_data['name']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
53  @property
54  def short_path(self) -> str:
55    path = self.project_id + '/' + self.name
56    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

kms_key_name: str
58  @property
59  def kms_key_name(self) -> str:
60    return self._resource_data['kmsKeyName']
@caching.cached_api_call
def get_topics( context: gcpdiag.models.Context) -> Mapping[str, Topic]:
63@caching.cached_api_call
64def get_topics(context: models.Context) -> Mapping[str, Topic]:
65  """Get all topics(Does not include deleted topics)."""
66  topics: Dict[str, Topic] = {}
67  if not apis.is_enabled(context.project_id, 'pubsub'):
68    return topics
69  pubsub_api = apis.get_api('pubsub', 'v1', context.project_id)
70  logging.debug('fetching list of PubSub topics in project %s',
71                context.project_id)
72  query = pubsub_api.projects().topics().list(
73      project=f'projects/{context.project_id}')
74  try:
75    resp = query.execute(num_retries=config.API_RETRIES)
76    if 'topics' not in resp:
77      return topics
78    for t in resp['topics']:
79      # verify that we have some minimal data that we expect
80      if 'name' not in t:
81        raise RuntimeError('missing data in topics response')
82        # projects/{project}/topics/{topic}
83      result = re.match(r'projects/[^/]+/topics/([^/]+)', t['name'])
84      if not result:
85        logging.error('invalid topic data: %s', t['name'])
86        continue
87
88      if not context.match_project_resource(resource=result.group(1),
89                                            labels=t.get('labels', {})):
90        continue
91
92      topics[t['name']] = Topic(project_id=context.project_id, resource_data=t)
93  except googleapiclient.errors.HttpError as err:
94    raise utils.GcpApiError(err) from err
95  return topics

Get all topics(Does not include deleted topics).

class TopicIAMPolicy(gcpdiag.queries.iam.BaseIAMPolicy):
 98class TopicIAMPolicy(iam.BaseIAMPolicy):
 99
100  def _is_resource_permission(self, permission):
101    return True

Common class for IAM policies

@caching.cached_api_call(in_memory=True)
def get_topic_iam_policy( context: gcpdiag.models.Context, name: str) -> TopicIAMPolicy:
104@caching.cached_api_call(in_memory=True)
105def get_topic_iam_policy(context: models.Context, name: str) -> TopicIAMPolicy:
106  project_id = utils.get_project_by_res_name(name)
107
108  pubsub_api = apis.get_api('pubsub', 'v1', project_id)
109  request = pubsub_api.projects().topics().getIamPolicy(resource=name)
110
111  return iam.fetch_iam_policy(request, TopicIAMPolicy, project_id, name,
112                              context)
class Subscription(gcpdiag.models.Resource):
115class Subscription(models.Resource):
116  """Represent a Subscription."""
117  _resource_data: dict
118
119  def __init__(self, project_id, resource_data):
120    super().__init__(project_id=project_id)
121    self._resource_data = resource_data
122    self._metadata_dict = None
123
124  @property
125  def name(self) -> str:
126    m = re.search(r'/subscriptions/([^/]+)$', self._resource_data['name'])
127    if not m:
128      raise RuntimeError('can\'t determine name of subscription %s' %
129                         (self._resource_data['name']))
130    return m.group(1)
131
132  @property
133  def full_path(self) -> str:
134    return self._resource_data['name']
135
136  @property
137  def short_path(self) -> str:
138    path = self.project_id + '/' + self.name
139    return path
140
141  @property
142  def topic(self) -> Union[Topic, str]:
143    """
144    Return subscription's topic as a Topic object,
145    or String '_deleted-topic_' if topic is deleted.
146    """
147    if 'topic' not in self._resource_data:
148      raise RuntimeError('topic not set for subscription {self.name}')
149    elif self._resource_data['topic'] == '_deleted-topic_':
150      return '_deleted_topic_'
151
152    m = re.match(r'projects/([^/]+)/topics/([^/]+)',
153                 self._resource_data['topic'])
154    if not m:
155      raise RuntimeError("can't parse topic: %s" % self._resource_data['topic'])
156    (project_id, topic_name) = (m.group(1), self._resource_data['topic'])
157    topics = get_topics(models.Context(project_id))
158    if topic_name not in topics:
159      raise RuntimeError(
160          f'Topic {topic_name} for Subscription {self.name} not found')
161    return topics[topic_name]
162
163  @property
164  def push_config(self) -> dict:
165    return self._resource_data.get('pushConfig', {})
166
167  @property
168  def push_oidc_service_account_email(self) -> str:
169    """Return the OIDC service account email for a push subscription."""
170    return self.push_config.get('oidcToken', {}).get('serviceAccountEmail', '')
171
172  def is_detached(self) -> bool:
173    """Return if subscription is detached."""
174    if 'detached' in self._resource_data:
175      return bool(self._resource_data['detached'])
176    return False
177
178  def is_big_query_subscription(self) -> bool:
179    """Return Boolean value if subscription is a big query subscription."""
180    if 'bigqueryConfig' in self._resource_data:
181      return True
182    return False
183
184  def is_gcs_subscription(self) -> bool:
185    """Return Boolean value if subscription is a gcs subscription."""
186    if 'cloudStorageConfig' in self._resource_data:
187      return True
188    return False
189
190  def is_push_subscription(self) -> bool:
191    """Return Boolean value if subscription is a push subscription."""
192    if (self._resource_data['pushConfig'] or self.is_big_query_subscription() or
193        self.is_gcs_subscription()):
194      return True
195    return False
196
197  def is_active(self) -> bool:
198    """Return Boolean value if subscription is active."""
199    return self._resource_data['state'] == 'ACTIVE'
200
201  def has_dead_letter_topic(self) -> bool:
202    """Return Truthy value if subscription has a dead-letter topic."""
203    if 'deadLetterPolicy' in self._resource_data:
204      return bool(self._resource_data['deadLetterPolicy']['deadLetterTopic'])
205    return False
206
207  def dead_letter_topic(self) -> str:
208    """Return the dead-letter topic."""
209    if self.has_dead_letter_topic():
210      return self._resource_data.get('deadLetterPolicy',
211                                     {}).get('deadLetterTopic', '')
212    return ''
213
214  def gcs_subscription_bucket(self) -> str:
215    """Return the name of the bucket attached to GCS subscription."""
216    if self.is_gcs_subscription():
217      return get_path(self._resource_data, ('cloudStorageConfig', 'bucket'))
218    return ''  # acts as a null return that can be evaluated as a false value

Represent a Subscription.

Subscription(project_id, resource_data)
119  def __init__(self, project_id, resource_data):
120    super().__init__(project_id=project_id)
121    self._resource_data = resource_data
122    self._metadata_dict = None
name: str
124  @property
125  def name(self) -> str:
126    m = re.search(r'/subscriptions/([^/]+)$', self._resource_data['name'])
127    if not m:
128      raise RuntimeError('can\'t determine name of subscription %s' %
129                         (self._resource_data['name']))
130    return m.group(1)
full_path: str
132  @property
133  def full_path(self) -> str:
134    return self._resource_data['name']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
136  @property
137  def short_path(self) -> str:
138    path = self.project_id + '/' + self.name
139    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

topic: Union[Topic, str]
141  @property
142  def topic(self) -> Union[Topic, str]:
143    """
144    Return subscription's topic as a Topic object,
145    or String '_deleted-topic_' if topic is deleted.
146    """
147    if 'topic' not in self._resource_data:
148      raise RuntimeError('topic not set for subscription {self.name}')
149    elif self._resource_data['topic'] == '_deleted-topic_':
150      return '_deleted_topic_'
151
152    m = re.match(r'projects/([^/]+)/topics/([^/]+)',
153                 self._resource_data['topic'])
154    if not m:
155      raise RuntimeError("can't parse topic: %s" % self._resource_data['topic'])
156    (project_id, topic_name) = (m.group(1), self._resource_data['topic'])
157    topics = get_topics(models.Context(project_id))
158    if topic_name not in topics:
159      raise RuntimeError(
160          f'Topic {topic_name} for Subscription {self.name} not found')
161    return topics[topic_name]

Return subscription's topic as a Topic object, or String '_deleted-topic_' if topic is deleted.

push_config: dict
163  @property
164  def push_config(self) -> dict:
165    return self._resource_data.get('pushConfig', {})
push_oidc_service_account_email: str
167  @property
168  def push_oidc_service_account_email(self) -> str:
169    """Return the OIDC service account email for a push subscription."""
170    return self.push_config.get('oidcToken', {}).get('serviceAccountEmail', '')

Return the OIDC service account email for a push subscription.

def is_detached(self) -> bool:
172  def is_detached(self) -> bool:
173    """Return if subscription is detached."""
174    if 'detached' in self._resource_data:
175      return bool(self._resource_data['detached'])
176    return False

Return if subscription is detached.

def is_big_query_subscription(self) -> bool:
178  def is_big_query_subscription(self) -> bool:
179    """Return Boolean value if subscription is a big query subscription."""
180    if 'bigqueryConfig' in self._resource_data:
181      return True
182    return False

Return Boolean value if subscription is a big query subscription.

def is_gcs_subscription(self) -> bool:
184  def is_gcs_subscription(self) -> bool:
185    """Return Boolean value if subscription is a gcs subscription."""
186    if 'cloudStorageConfig' in self._resource_data:
187      return True
188    return False

Return Boolean value if subscription is a gcs subscription.

def is_push_subscription(self) -> bool:
190  def is_push_subscription(self) -> bool:
191    """Return Boolean value if subscription is a push subscription."""
192    if (self._resource_data['pushConfig'] or self.is_big_query_subscription() or
193        self.is_gcs_subscription()):
194      return True
195    return False

Return Boolean value if subscription is a push subscription.

def is_active(self) -> bool:
197  def is_active(self) -> bool:
198    """Return Boolean value if subscription is active."""
199    return self._resource_data['state'] == 'ACTIVE'

Return Boolean value if subscription is active.

def has_dead_letter_topic(self) -> bool:
201  def has_dead_letter_topic(self) -> bool:
202    """Return Truthy value if subscription has a dead-letter topic."""
203    if 'deadLetterPolicy' in self._resource_data:
204      return bool(self._resource_data['deadLetterPolicy']['deadLetterTopic'])
205    return False

Return Truthy value if subscription has a dead-letter topic.

def dead_letter_topic(self) -> str:
207  def dead_letter_topic(self) -> str:
208    """Return the dead-letter topic."""
209    if self.has_dead_letter_topic():
210      return self._resource_data.get('deadLetterPolicy',
211                                     {}).get('deadLetterTopic', '')
212    return ''

Return the dead-letter topic.

def gcs_subscription_bucket(self) -> str:
214  def gcs_subscription_bucket(self) -> str:
215    """Return the name of the bucket attached to GCS subscription."""
216    if self.is_gcs_subscription():
217      return get_path(self._resource_data, ('cloudStorageConfig', 'bucket'))
218    return ''  # acts as a null return that can be evaluated as a false value

Return the name of the bucket attached to GCS subscription.

@caching.cached_api_call
def get_subscriptions( context: gcpdiag.models.Context) -> Mapping[str, Subscription]:
221@caching.cached_api_call
222def get_subscriptions(context: models.Context) -> Mapping[str, Subscription]:
223  subscriptions: Dict[str, Subscription] = {}
224  if not apis.is_enabled(context.project_id, 'pubsub'):
225    return subscriptions
226  pubsub_api = apis.get_api('pubsub', 'v1', context.project_id)
227  logging.debug('fetching list of PubSub subscriptions in project %s',
228                context.project_id)
229  query = pubsub_api.projects().subscriptions().list(
230      project=f'projects/{context.project_id}')
231  try:
232    resp = query.execute(num_retries=config.API_RETRIES)
233    if 'subscriptions' not in resp:
234      return subscriptions
235    for s in resp['subscriptions']:
236      # verify that we have some minimal data that we expect
237      if 'name' not in s:
238        raise RuntimeError('missing data in topics response')
239
240      # projects/{project}/subscriptions/{sub}
241      result = re.match(r'projects/[^/]+/subscriptions/([^/]+)', s['name'])
242      if not result:
243        logging.error('invalid subscription data: %s', s['name'])
244        continue
245
246      if not context.match_project_resource(resource=result.group(1),
247                                            labels=s.get('labels', {})):
248        continue
249
250      subscriptions[s['name']] = Subscription(project_id=context.project_id,
251                                              resource_data=s)
252  except googleapiclient.errors.HttpError as err:
253    raise utils.GcpApiError(err) from err
254  return subscriptions
@caching.cached_api_call
def get_subscription( project_id: str, subscription_name: str) -> Optional[Subscription]:
257@caching.cached_api_call
258def get_subscription(project_id: str,
259                     subscription_name: str) -> Union[None, Subscription]:
260  if not apis.is_enabled(project_id, 'pubsub'):
261    return None
262  pubsub_api = apis.get_api('pubsub', 'v1', project_id)
263  logging.debug('fetching PubSub subscription in project %s', project_id)
264  query = pubsub_api.projects().subscriptions().get(
265      subscription=f'projects/{project_id}/subscriptions/{subscription_name}')
266  try:
267    resp = query.execute(num_retries=config.API_RETRIES)
268    return Subscription(project_id=project_id, resource_data=resp)
269  except googleapiclient.errors.HttpError as err:
270    raise utils.GcpApiError(err) from err
class SubscriptionIAMPolicy(gcpdiag.queries.iam.BaseIAMPolicy):
273class SubscriptionIAMPolicy(iam.BaseIAMPolicy):
274
275  def _is_resource_permission(self, permission):
276    return True

Common class for IAM policies

@caching.cached_api_call(in_memory=True)
def get_subscription_iam_policy( context: gcpdiag.models.Context, name: str) -> SubscriptionIAMPolicy:
279@caching.cached_api_call(in_memory=True)
280def get_subscription_iam_policy(context: models.Context,
281                                name: str) -> SubscriptionIAMPolicy:
282  project_id = utils.get_project_by_res_name(name)
283
284  pubsub_api = apis.get_api('pubsub', 'v1', project_id)
285  request = pubsub_api.projects().subscriptions().getIamPolicy(resource=name)
286
287  return iam.fetch_iam_policy(request, SubscriptionIAMPolicy, project_id, name,
288                              context)