gcpdiag.queries.pubsub

Queries related to GCP PubSub
class Topic(gcpdiag.models.Resource):
32class Topic(models.Resource):
33  """Represent a Topic"""
34  _resource_data: dict
35
36  def __init__(self, project_id, resource_data):
37    super().__init__(project_id=project_id)
38    self._resource_data = resource_data
39    self._metadata_dict = None
40
41  @property
42  def name(self) -> str:
43    m = re.search(r'/topics/([^/]+)$', self._resource_data['name'])
44    if not m:
45      raise RuntimeError('can\'t determine name of topic %s' %
46                         (self._resource_data['name']))
47    return m.group(1)
48
49  @property
50  def full_path(self) -> str:
51    return self._resource_data['name']
52
53  @property
54  def short_path(self) -> str:
55    path = self.project_id + '/' + self.name
56    return path
57
58  @property
59  def kms_key_name(self) -> str:
60    return self._resource_data['kmsKeyName']

Represent a Topic

Topic(project_id, resource_data)
36  def __init__(self, project_id, resource_data):
37    super().__init__(project_id=project_id)
38    self._resource_data = resource_data
39    self._metadata_dict = None
name: str
41  @property
42  def name(self) -> str:
43    m = re.search(r'/topics/([^/]+)$', self._resource_data['name'])
44    if not m:
45      raise RuntimeError('can\'t determine name of topic %s' %
46                         (self._resource_data['name']))
47    return m.group(1)
full_path: str
49  @property
50  def full_path(self) -> str:
51    return self._resource_data['name']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
53  @property
54  def short_path(self) -> str:
55    path = self.project_id + '/' + self.name
56    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

kms_key_name: str
58  @property
59  def kms_key_name(self) -> str:
60    return self._resource_data['kmsKeyName']
@caching.cached_api_call
def get_topics( context: gcpdiag.models.Context) -> Mapping[str, Topic]:
63@caching.cached_api_call
64def get_topics(context: models.Context) -> Mapping[str, Topic]:
65  """Get all topics(Does not include deleted topics)."""
66  topics: Dict[str, Topic] = {}
67  if not apis.is_enabled(context.project_id, 'pubsub'):
68    return topics
69  pubsub_api = apis.get_api('pubsub', 'v1', context.project_id)
70  logging.info('fetching list of PubSub topics in project %s',
71               context.project_id)
72  query = pubsub_api.projects().topics().list(
73      project=f'projects/{context.project_id}')
74  try:
75    resp = query.execute(num_retries=config.API_RETRIES)
76    if 'topics' not in resp:
77      return topics
78    for t in resp['topics']:
79      # verify that we have some minimal data that we expect
80      if 'name' not in t:
81        raise RuntimeError('missing data in topics response')
82        # projects/{project}/topics/{topic}
83      result = re.match(r'projects/[^/]+/topics/([^/]+)', t['name'])
84      if not result:
85        logging.error('invalid topic data: %s', t['name'])
86        continue
87
88      if not context.match_project_resource(resource=result.group(1),
89                                            labels=t.get('labels', {})):
90        continue
91
92      topics[t['name']] = Topic(project_id=context.project_id, resource_data=t)
93  except googleapiclient.errors.HttpError as err:
94    raise utils.GcpApiError(err) from err
95  return topics

Get all topics(Does not include deleted topics).

class TopicIAMPolicy(gcpdiag.queries.iam.BaseIAMPolicy):
 98class TopicIAMPolicy(iam.BaseIAMPolicy):
 99
100  def _is_resource_permission(self, permission):
101    return True

Common class for IAM policies

@caching.cached_api_call(in_memory=True)
def get_topic_iam_policy(name: str) -> TopicIAMPolicy:
104@caching.cached_api_call(in_memory=True)
105def get_topic_iam_policy(name: str) -> TopicIAMPolicy:
106  project_id = utils.get_project_by_res_name(name)
107
108  pubsub_api = apis.get_api('pubsub', 'v1', project_id)
109  request = pubsub_api.projects().topics().getIamPolicy(resource=name)
110
111  return iam.fetch_iam_policy(request, TopicIAMPolicy, project_id, name)
class Subscription(gcpdiag.models.Resource):
114class Subscription(models.Resource):
115  """Represent a Subscription."""
116  _resource_data: dict
117
118  def __init__(self, project_id, resource_data):
119    super().__init__(project_id=project_id)
120    self._resource_data = resource_data
121    self._metadata_dict = None
122
123  @property
124  def name(self) -> str:
125    m = re.search(r'/subscriptions/([^/]+)$', self._resource_data['name'])
126    if not m:
127      raise RuntimeError('can\'t determine name of subscription %s' %
128                         (self._resource_data['name']))
129    return m.group(1)
130
131  @property
132  def full_path(self) -> str:
133    return self._resource_data['name']
134
135  @property
136  def short_path(self) -> str:
137    path = self.project_id + '/' + self.name
138    return path
139
140  @property
141  def topic(self) -> Union[Topic, str]:
142    """
143    Return subscription's topic as a Topic object,
144    or String '_deleted-topic_' if topic is deleted.
145    """
146    if 'topic' not in self._resource_data:
147      raise RuntimeError('topic not set for subscription {self.name}')
148    elif self._resource_data['topic'] == '_deleted-topic_':
149      return '_deleted_topic_'
150
151    m = re.match(r'projects/([^/]+)/topics/([^/]+)',
152                 self._resource_data['topic'])
153    if not m:
154      raise RuntimeError("can't parse topic: %s" % self._resource_data['topic'])
155    (project_id, topic_name) = (m.group(1), self._resource_data['topic'])
156    topics = get_topics(models.Context(project_id))
157    if topic_name not in topics:
158      raise RuntimeError(
159          f'Topic {topic_name} for Subscription {self.name} not found')
160    return topics[topic_name]
161
162  def is_detached(self) -> bool:
163    """Return if subscription is detached."""
164    if 'detached' in self._resource_data:
165      return bool(self._resource_data['detached'])
166    return False
167
168  def is_big_query_subscription(self) -> bool:
169    """Return Boolean value if subscription is a big query subscription."""
170    if 'bigqueryConfig' in self._resource_data:
171      return True
172    return False
173
174  def is_gcs_subscription(self) -> bool:
175    """Return Boolean value if subscription is a gcs subscription."""
176    if 'cloudStorageConfig' in self._resource_data:
177      return True
178    return False
179
180  def is_push_subscription(self) -> bool:
181    """Return Boolean value if subscription is a push subscription."""
182    if (self._resource_data['pushConfig'] or self.is_big_query_subscription() or
183        self.is_gcs_subscription()):
184      return True
185    return False
186
187  def has_dead_letter_topic(self) -> bool:
188    """Return Truthy value if subscription has a dead-letter topic."""
189    if 'deadLetterPolicy' in self._resource_data:
190      return bool(self._resource_data['deadLetterPolicy']['deadLetterTopic'])
191    return False
192
193  def gcs_subscription_bucket(self) -> str:
194    """Return the name of the bucket attached to GCS subscription."""
195    if self.is_gcs_subscription():
196      return get_path(self._resource_data, ('cloudStorageConfig', 'bucket'))
197    return ''  # acts as a null return that can be evaluated as a false value

Represent a Subscription.

Subscription(project_id, resource_data)
118  def __init__(self, project_id, resource_data):
119    super().__init__(project_id=project_id)
120    self._resource_data = resource_data
121    self._metadata_dict = None
name: str
123  @property
124  def name(self) -> str:
125    m = re.search(r'/subscriptions/([^/]+)$', self._resource_data['name'])
126    if not m:
127      raise RuntimeError('can\'t determine name of subscription %s' %
128                         (self._resource_data['name']))
129    return m.group(1)
full_path: str
131  @property
132  def full_path(self) -> str:
133    return self._resource_data['name']

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
135  @property
136  def short_path(self) -> str:
137    path = self.project_id + '/' + self.name
138    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

topic: Union[Topic, str]
140  @property
141  def topic(self) -> Union[Topic, str]:
142    """
143    Return subscription's topic as a Topic object,
144    or String '_deleted-topic_' if topic is deleted.
145    """
146    if 'topic' not in self._resource_data:
147      raise RuntimeError('topic not set for subscription {self.name}')
148    elif self._resource_data['topic'] == '_deleted-topic_':
149      return '_deleted_topic_'
150
151    m = re.match(r'projects/([^/]+)/topics/([^/]+)',
152                 self._resource_data['topic'])
153    if not m:
154      raise RuntimeError("can't parse topic: %s" % self._resource_data['topic'])
155    (project_id, topic_name) = (m.group(1), self._resource_data['topic'])
156    topics = get_topics(models.Context(project_id))
157    if topic_name not in topics:
158      raise RuntimeError(
159          f'Topic {topic_name} for Subscription {self.name} not found')
160    return topics[topic_name]

Return subscription's topic as a Topic object, or String '_deleted-topic_' if topic is deleted.

def is_detached(self) -> bool:
162  def is_detached(self) -> bool:
163    """Return if subscription is detached."""
164    if 'detached' in self._resource_data:
165      return bool(self._resource_data['detached'])
166    return False

Return if subscription is detached.

def is_big_query_subscription(self) -> bool:
168  def is_big_query_subscription(self) -> bool:
169    """Return Boolean value if subscription is a big query subscription."""
170    if 'bigqueryConfig' in self._resource_data:
171      return True
172    return False

Return Boolean value if subscription is a big query subscription.

def is_gcs_subscription(self) -> bool:
174  def is_gcs_subscription(self) -> bool:
175    """Return Boolean value if subscription is a gcs subscription."""
176    if 'cloudStorageConfig' in self._resource_data:
177      return True
178    return False

Return Boolean value if subscription is a gcs subscription.

def is_push_subscription(self) -> bool:
180  def is_push_subscription(self) -> bool:
181    """Return Boolean value if subscription is a push subscription."""
182    if (self._resource_data['pushConfig'] or self.is_big_query_subscription() or
183        self.is_gcs_subscription()):
184      return True
185    return False

Return Boolean value if subscription is a push subscription.

def has_dead_letter_topic(self) -> bool:
187  def has_dead_letter_topic(self) -> bool:
188    """Return Truthy value if subscription has a dead-letter topic."""
189    if 'deadLetterPolicy' in self._resource_data:
190      return bool(self._resource_data['deadLetterPolicy']['deadLetterTopic'])
191    return False

Return Truthy value if subscription has a dead-letter topic.

def gcs_subscription_bucket(self) -> str:
193  def gcs_subscription_bucket(self) -> str:
194    """Return the name of the bucket attached to GCS subscription."""
195    if self.is_gcs_subscription():
196      return get_path(self._resource_data, ('cloudStorageConfig', 'bucket'))
197    return ''  # acts as a null return that can be evaluated as a false value

Return the name of the bucket attached to GCS subscription.

@caching.cached_api_call
def get_subscriptions( context: gcpdiag.models.Context) -> Mapping[str, Subscription]:
200@caching.cached_api_call
201def get_subscriptions(context: models.Context) -> Mapping[str, Subscription]:
202  subscriptions: Dict[str, Subscription] = {}
203  if not apis.is_enabled(context.project_id, 'pubsub'):
204    return subscriptions
205  pubsub_api = apis.get_api('pubsub', 'v1', context.project_id)
206  logging.info('fetching list of PubSub subscriptions in project %s',
207               context.project_id)
208  query = pubsub_api.projects().subscriptions().list(
209      project=f'projects/{context.project_id}')
210  try:
211    resp = query.execute(num_retries=config.API_RETRIES)
212    if 'subscriptions' not in resp:
213      return subscriptions
214    for s in resp['subscriptions']:
215      # verify that we have some minimal data that we expect
216      if 'name' not in s:
217        raise RuntimeError('missing data in topics response')
218
219      # projects/{project}/subscriptions/{sub}
220      result = re.match(r'projects/[^/]+/subscriptions/([^/]+)', s['name'])
221      if not result:
222        logging.error('invalid subscription data: %s', s['name'])
223        continue
224
225      if not context.match_project_resource(resource=result.group(1),
226                                            labels=s.get('labels', {})):
227        continue
228
229      subscriptions[s['name']] = Subscription(project_id=context.project_id,
230                                              resource_data=s)
231  except googleapiclient.errors.HttpError as err:
232    raise utils.GcpApiError(err) from err
233  return subscriptions
@caching.cached_api_call
def get_subscription( project_id: str, subscription_name: str) -> Optional[Subscription]:
236@caching.cached_api_call
237def get_subscription(project_id: str,
238                     subscription_name: str) -> Union[None, Subscription]:
239  if not apis.is_enabled(project_id, 'pubsub'):
240    return None
241  pubsub_api = apis.get_api('pubsub', 'v1', project_id)
242  logging.info('fetching PubSub subscription in project %s', project_id)
243  query = pubsub_api.projects().subscriptions().get(
244      subscription=f'projects/{project_id}/subscriptions/{subscription_name}')
245  try:
246    resp = query.execute(num_retries=config.API_RETRIES)
247    return Subscription(project_id=project_id, resource_data=resp)
248  except googleapiclient.errors.HttpError as err:
249    raise utils.GcpApiError(err) from err
class SubscriptionIAMPolicy(gcpdiag.queries.iam.BaseIAMPolicy):
252class SubscriptionIAMPolicy(iam.BaseIAMPolicy):
253
254  def _is_resource_permission(self, permission):
255    return True

Common class for IAM policies

@caching.cached_api_call(in_memory=True)
def get_subscription_iam_policy(name: str) -> SubscriptionIAMPolicy:
258@caching.cached_api_call(in_memory=True)
259def get_subscription_iam_policy(name: str) -> SubscriptionIAMPolicy:
260  project_id = utils.get_project_by_res_name(name)
261
262  pubsub_api = apis.get_api('pubsub', 'v1', project_id)
263  request = pubsub_api.projects().subscriptions().getIamPolicy(resource=name)
264
265  return iam.fetch_iam_policy(request, SubscriptionIAMPolicy, project_id, name)