gcpdiag.queries.network

Queries related to VPC Networks.
# Either an IPv4 or an IPv6 host address.
IPv4AddrOrIPv6Addr = typing.Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
# Either an IPv4 or an IPv6 network.
IPv4NetOrIPv6Net = typing.Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
# Any host address or network, of either IP version.
IPAddrOrNet = typing.Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network]
# Value reported by Network.mtu when the network resource data has no 'mtu' key.
DEFAULT_MTU = 1460
class Subnetwork(gcpdiag.models.Resource):
class Subnetwork(models.Resource):
  """Read-only view over the resource data of one VPC subnetwork.

  Every property reads straight from the `resource_data` dict handed to the
  constructor; nothing is cached or fetched here.
  """

  _resource_data: dict
  _context: models.Context

  def __init__(self, project_id, resource_data, context: models.Context):
    super().__init__(project_id=project_id)
    self._context = context
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link with the Compute API v1 URL prefix stripped.

    When the self link does not start with that prefix, the self link is
    returned prefixed with '>> ' instead.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    if not matched:
      return f'>> {link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """Project id and subnetwork name joined by a slash."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """Value of the 'name' key (KeyError if it is missing)."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' key, or '' when it is missing."""
    return self._resource_data.get('selfLink', '')

  @property
  def ip_network(self) -> IPv4NetOrIPv6Net:
    """The 'ipCidrRange' value parsed with ipaddress.ip_network()."""
    cidr = self._resource_data['ipCidrRange']
    return ipaddress.ip_network(cidr)

  @property
  def region(self) -> str:
    """Region name taken from the last component of the 'region' URL.

    Raises:
      RuntimeError: the 'region' value is not a Compute API v1 region URL.
    """
    # https://www.googleapis.com/compute/v1/projects/gcpdiag-gke1-aaaa/regions/europe-west4
    region_url = self._resource_data['region']
    parsed = re.match(
        r'https://www.googleapis.com/compute/v1/projects/([^/]+)/regions/([^/]+)',
        region_url)
    if parsed:
      return parsed.group(2)
    raise RuntimeError(f"can't parse region URL: {region_url}")

  def is_private_ip_google_access(self) -> bool:
    """Value of 'privateIpGoogleAccess', defaulting to False when absent."""
    return self._resource_data.get('privateIpGoogleAccess', False)

  @property
  def network(self):
    """Value of the 'network' key, returned exactly as stored."""
    return self._resource_data['network']

A VPC subnetwork.

Subnetwork(project_id, resource_data, context: gcpdiag.models.Context)
41  def __init__(self, project_id, resource_data, context: models.Context):
42    super().__init__(project_id=project_id)
43    self._resource_data = resource_data
44    self._context = context
full_path: str
46  @property
47  def full_path(self) -> str:
48    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
49                      self.self_link)
50    if result:
51      return result.group(1)
52    else:
53      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
55  @property
56  def short_path(self) -> str:
57    path = self.project_id + '/' + self.name
58    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
60  @property
61  def name(self) -> str:
62    return self._resource_data['name']
ip_network: Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
68  @property
69  def ip_network(self) -> IPv4NetOrIPv6Net:
70    return ipaddress.ip_network(self._resource_data['ipCidrRange'])
region: str
72  @property
73  def region(self) -> str:
74    # https://www.googleapis.com/compute/v1/projects/gcpdiag-gke1-aaaa/regions/europe-west4
75    m = re.match(
76        r'https://www.googleapis.com/compute/v1/projects/([^/]+)/regions/([^/]+)',
77        self._resource_data['region'])
78    if not m:
79      raise RuntimeError(
80          f"can't parse region URL: {self._resource_data['region']}")
81    return m.group(2)
def is_private_ip_google_access(self) -> bool:
83  def is_private_ip_google_access(self) -> bool:
84    return self._resource_data.get('privateIpGoogleAccess', False)
network
86  @property
87  def network(self):
88    return self._resource_data['network']
class Route(gcpdiag.models.Resource):
 91class Route(models.Resource):
 92  """A VPC Route."""
 93
 94  _resource_data: dict
 95
 96  def __init__(self, project_id, resource_data):
 97    super().__init__(project_id=project_id)
 98    self._resource_data = resource_data
 99
100  @property
101  def full_path(self) -> str:
102    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
103                      self.self_link)
104    if result:
105      return result.group(1)
106    else:
107      return f'>> {self.self_link}'
108
109  @property
110  def short_path(self) -> str:
111    path = self.project_id + '/' + self.name
112    return path
113
114  @property
115  def name(self) -> str:
116    return self._resource_data['name']
117
118  @property
119  def kind(self) -> str:
120    return self._resource_data['kind']
121
122  @property
123  def self_link(self) -> str:
124    return self._resource_data['selfLink']
125
126  @property
127  def network(self) -> str:
128    return self._resource_data['network']
129
130  @property
131  def tags(self) -> List[str]:
132    if 'tags' in self._resource_data:
133      return self._resource_data['tags']
134    return []
135
136  @property
137  def dest_range(self) -> str:
138    return self._resource_data['destRange']
139
140  @property
141  def next_hop_gateway(self) -> Optional[str]:
142    if 'nextHopGateway' in self._resource_data:
143      return self._resource_data['nextHopGateway']
144    return None
145
146  @property
147  def next_hop_vpn_tunnel(self) -> Optional[str]:
148    return self._resource_data.get('nextHopVpnTunnel')
149
150  @property
151  def next_hop_hub(self) -> Optional[str]:
152    return self._resource_data.get('nextHopHub')
153
154  @property
155  def priority(self) -> int:
156    return self._resource_data['priority']
157
158  def get_next_hop(self) -> Union[Dict[str, Any], Optional[str]]:
159    hop_types = {
160        'nextHopGateway': 'nextHopGateway',
161        'nextHopVpnTunnel': 'nextHopVpnTunnel',
162        'nextHopHub': 'nextHopHub',
163        'nextHopInstance': 'nextHopInstance',
164        'nextHopAddress': 'nextHopAddress',
165        'nextHopPeering': 'nextHopPeering',
166        'nextHopIlb': 'nextHopIlb',
167        'nextHopNetwork': 'nextHopNetwork',
168        'nextHopIp': 'nextHopIp'
169    }
170
171    for hop_type, value in hop_types.items():
172      if self._resource_data.get(hop_type):
173        return {'type': value, 'link': self._resource_data[hop_type]}
174    return None
175
176  def check_route_match(self, ip1: IPAddrOrNet, ip2: str) -> bool:
177    ip2_list = [ipaddress.ip_network(ip2)]
178    if _ip_match(ip1, ip2_list, 'allow'):
179      return True
180    return False

A VPC Route.

Route(project_id, resource_data)
96  def __init__(self, project_id, resource_data):
97    super().__init__(project_id=project_id)
98    self._resource_data = resource_data
full_path: str
100  @property
101  def full_path(self) -> str:
102    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
103                      self.self_link)
104    if result:
105      return result.group(1)
106    else:
107      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
109  @property
110  def short_path(self) -> str:
111    path = self.project_id + '/' + self.name
112    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
114  @property
115  def name(self) -> str:
116    return self._resource_data['name']
kind: str
118  @property
119  def kind(self) -> str:
120    return self._resource_data['kind']
network: str
126  @property
127  def network(self) -> str:
128    return self._resource_data['network']
tags: List[str]
130  @property
131  def tags(self) -> List[str]:
132    if 'tags' in self._resource_data:
133      return self._resource_data['tags']
134    return []
dest_range: str
136  @property
137  def dest_range(self) -> str:
138    return self._resource_data['destRange']
next_hop_gateway: Optional[str]
140  @property
141  def next_hop_gateway(self) -> Optional[str]:
142    if 'nextHopGateway' in self._resource_data:
143      return self._resource_data['nextHopGateway']
144    return None
next_hop_vpn_tunnel: Optional[str]
146  @property
147  def next_hop_vpn_tunnel(self) -> Optional[str]:
148    return self._resource_data.get('nextHopVpnTunnel')
next_hop_hub: Optional[str]
150  @property
151  def next_hop_hub(self) -> Optional[str]:
152    return self._resource_data.get('nextHopHub')
priority: int
154  @property
155  def priority(self) -> int:
156    return self._resource_data['priority']
def get_next_hop(self) -> Union[Dict[str, Any], str, NoneType]:
158  def get_next_hop(self) -> Union[Dict[str, Any], Optional[str]]:
159    hop_types = {
160        'nextHopGateway': 'nextHopGateway',
161        'nextHopVpnTunnel': 'nextHopVpnTunnel',
162        'nextHopHub': 'nextHopHub',
163        'nextHopInstance': 'nextHopInstance',
164        'nextHopAddress': 'nextHopAddress',
165        'nextHopPeering': 'nextHopPeering',
166        'nextHopIlb': 'nextHopIlb',
167        'nextHopNetwork': 'nextHopNetwork',
168        'nextHopIp': 'nextHopIp'
169    }
170
171    for hop_type, value in hop_types.items():
172      if self._resource_data.get(hop_type):
173        return {'type': value, 'link': self._resource_data[hop_type]}
174    return None
def check_route_match( self, ip1: Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network], ip2: str) -> bool:
176  def check_route_match(self, ip1: IPAddrOrNet, ip2: str) -> bool:
177    ip2_list = [ipaddress.ip_network(ip2)]
178    if _ip_match(ip1, ip2_list, 'allow'):
179      return True
180    return False
class ManagedZone(gcpdiag.models.Resource):
class ManagedZone(models.Resource):
  """
  Represent a DNS zone (public or private).

  https://cloud.google.com/dns/docs/reference/v1beta2/managedZones

  The properties only read from the resource data passed to the constructor;
  optional sections that are absent are treated as empty.
  """
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def cloud_logging_config(self) -> bool:
    """Value of cloudLoggingConfig.enableLogging, or False when unset.

    A missing 'cloudLoggingConfig' section is treated like an empty one, in
    line with how the other optional sections of this class are handled.
    """
    logging_config = self._resource_data.get('cloudLoggingConfig', {})
    return logging_config.get('enableLogging', False)

  @property
  def is_public(self) -> bool:
    """True when the 'visibility' value equals 'public'."""
    return self._resource_data['visibility'] == 'public'

  @property
  def vpc_attached(self) -> bool:
    """Truthy when privateVisibilityConfig lists networks or GKE clusters.

    The stored 'networks' value is returned when it is truthy, otherwise the
    stored 'gkeClusters' value, otherwise False.
    """
    # Read-only lookup: a getter must not insert keys into the resource data.
    visibility = self._resource_data.get('privateVisibilityConfig', {})
    return (visibility.get('networks', False) or
            visibility.get('gkeClusters', False))

  @property
  def dnssec_config_state(self) -> bool:
    """Stored dnssecConfig.state value, or False when it is not set.

    NOTE(review): the stored value is returned as is, so this may not be a
    bool despite the annotation - confirm against callers.
    """
    # Read-only lookup: a getter must not insert keys into the resource data.
    return self._resource_data.get('dnssecConfig', {}).get('state', False)

  @property
  def name(self) -> str:
    """Value of the 'name' key."""
    return self._resource_data['name']

  @property
  def full_path(self) -> str:
    """Self link with the Cloud DNS v1beta2 URL prefix stripped.

    When the self link does not start with that prefix, the self link is
    returned prefixed with '>> ' instead.
    """
    result = re.match(r'https://dns.googleapis.com/dns/v1beta2/(.*)',
                      self.self_link)
    if result:
      return result.group(1)
    return f'>> {self.self_link}'

  @property
  def short_path(self) -> str:
    """Project id and zone name joined by a slash."""
    return self.project_id + '/' + self.name

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' key, or '' when it is missing."""
    return self._resource_data.get('selfLink', '')
ManagedZone(project_id, resource_data)
191  def __init__(self, project_id, resource_data):
192    super().__init__(project_id=project_id)
193    self._resource_data = resource_data
cloud_logging_config: bool
195  @property
196  def cloud_logging_config(self) -> bool:
197    return self._resource_data['cloudLoggingConfig'].get('enableLogging', False)
is_public: bool
199  @property
200  def is_public(self) -> bool:
201    return self._resource_data['visibility'] == 'public'
vpc_attached: bool
203  @property
204  def vpc_attached(self) -> bool:
205    if 'privateVisibilityConfig' not in self._resource_data:
206      self._resource_data['privateVisibilityConfig'] = {}
207
208    return (self._resource_data['privateVisibilityConfig'].get(
209        'networks', False) or
210            self._resource_data['privateVisibilityConfig'].get(
211                'gkeClusters', False))
dnssec_config_state: bool
213  @property
214  def dnssec_config_state(self) -> bool:
215    if 'dnssecConfig' not in self._resource_data:
216      self._resource_data['dnssecConfig'] = {}
217
218    return self._resource_data['dnssecConfig'].get('state', False)
name: str
220  @property
221  def name(self) -> str:
222    return self._resource_data['name']
full_path: str
224  @property
225  def full_path(self) -> str:
226    result = re.match(r'https://dns.googleapis.com/dns/v1beta2/(.*)',
227                      self.self_link)
228    if result:
229      return result.group(1)
230    else:
231      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
233  @property
234  def short_path(self) -> str:
235    path = self.project_id + '/' + self.name
236    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

class Router(gcpdiag.models.Resource):
class Router(models.Resource):
  """A VPC Router."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self._nats = None

  @property
  def full_path(self) -> str:
    """Self link with the Compute API v1 URL prefix stripped.

    When the self link does not start with that prefix, the self link is
    returned prefixed with '>> ' instead.
    """
    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                      self.self_link)
    if result:
      return result.group(1)
    return f'>> {self.self_link}'

  @property
  def short_path(self) -> str:
    """Project id and router name joined by a slash."""
    return self.project_id + '/' + self.name

  @property
  def name(self) -> str:
    """Value of the 'name' key."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' key, or '' when it is missing."""
    return self._resource_data.get('selfLink', '')

  @property
  def network(self) -> str:
    """Value of the 'network' key."""
    return self._resource_data['network']

  def get_network_name(self) -> str:
    """Last '/'-separated component of 'network', or '' if that is empty."""
    # NOTE(review): INFO-level trace looks like leftover debugging output.
    logging.info('inside get_network_name function')
    network = self._resource_data['network']
    if network:
      return network.split('/')[-1]
    return ''

  @property
  def nats(self):
    """Value of the 'nats' key, or an empty list when it is missing."""
    return self._resource_data.get('nats', [])

  def _find_nat(self, nat_gateway) -> Optional[dict]:
    """Return the first 'nats' entry named nat_gateway, or None."""
    for nat in self._resource_data.get('nats', []):
      if nat['name'] == nat_gateway:
        return nat
    return None

  def get_nat_ip_allocate_option(self, nat_gateway) -> str:
    """'natIpAllocateOption' of the NAT entry named nat_gateway.

    Returns '' when the option is not set or when this router has no NAT
    entry with that name (previously the latter raised IndexError).
    """
    nat = self._find_nat(nat_gateway)
    if nat is None:
      return ''
    return nat.get('natIpAllocateOption', '')

  def get_enable_dynamic_port_allocation(self, nat_gateway) -> str:
    """'enableDynamicPortAllocation' of the NAT entry named nat_gateway.

    Returns '' when the option is not set or when this router has no NAT
    entry with that name (previously the latter raised IndexError).
    """
    nat = self._find_nat(nat_gateway)
    if nat is None:
      return ''
    return nat.get('enableDynamicPortAllocation', '')

  def subnet_has_nat(self, subnetwork) -> bool:
    """Report whether a NAT entry of this router covers the subnetwork.

    Args:
      subnetwork: object whose self_link is compared with the 'name' of each
        entry in a NAT's 'subnetworks' list.

    Returns:
      True as soon as an entry either is not restricted to
      'LIST_OF_SUBNETWORKS' or lists the subnetwork; False otherwise,
      including when the router has no NAT entries.
    """
    nats = self._resource_data.get('nats', [])
    if not nats:
      return False
    for nat in nats:
      if nat['sourceSubnetworkIpRangesToNat'] != 'LIST_OF_SUBNETWORKS':
        # Cloud NAT configured for all subnets
        return True
      # Cloud NAT configured for specific subnets
      if any(s['name'] == subnetwork.self_link
             for s in nat.get('subnetworks', [])):
        return True
    return False

A VPC Router.

Router(project_id, resource_data)
248  def __init__(self, project_id, resource_data):
249    super().__init__(project_id=project_id)
250    self._resource_data = resource_data
251    self._nats = None
full_path: str
253  @property
254  def full_path(self) -> str:
255    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
256                      self.self_link)
257    if result:
258      return result.group(1)
259    else:
260      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
262  @property
263  def short_path(self) -> str:
264    path = self.project_id + '/' + self.name
265    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
267  @property
268  def name(self) -> str:
269    return self._resource_data['name']
network: str
275  @property
276  def network(self) -> str:
277    return self._resource_data['network']
def get_network_name(self) -> str:
279  def get_network_name(self) -> str:
280    logging.info('inside get_network_name function')
281    if self._resource_data['network']:
282      return self._resource_data['network'].split('/')[-1]
283    return ''
nats
285  @property
286  def nats(self):
287    return self._resource_data.get('nats', [])
def get_nat_ip_allocate_option(self, nat_gateway) -> str:
289  def get_nat_ip_allocate_option(self, nat_gateway) -> str:
290    nats = self._resource_data.get('nats', [])
291    nat = [n for n in nats if n['name'] == nat_gateway]
292    return nat[0].get('natIpAllocateOption', '')
def get_enable_dynamic_port_allocation(self, nat_gateway) -> str:
294  def get_enable_dynamic_port_allocation(self, nat_gateway) -> str:
295    nats = self._resource_data.get('nats', [])
296    nat = [n for n in nats if n['name'] == nat_gateway]
297    return nat[0].get('enableDynamicPortAllocation', '')
def subnet_has_nat(self, subnetwork):
299  def subnet_has_nat(self, subnetwork):
300    if not self._resource_data.get('nats', []):
301      return False
302    for n in self._resource_data.get('nats', []):
303      if n['sourceSubnetworkIpRangesToNat'] == 'LIST_OF_SUBNETWORKS':
304        # Cloud NAT configure for specific subnets
305        if 'subnetworks' in n and subnetwork.self_link in [
306            s['name'] for s in n['subnetworks']
307        ]:
308          return True
309      else:
310        # Cloud NAT configured for all subnets
311        return True
312    return False
class RouterStatus(gcpdiag.models.Resource):
class RouterStatus(models.Resource):
  """NAT Router Status"""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link with the Compute API v1 URL prefix stripped.

    When the self link does not start with that prefix, the self link is
    returned prefixed with '>> ' instead.
    """
    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                      self.self_link)
    if result:
      return result.group(1)
    return f'>> {self.self_link}'

  @property
  def short_path(self) -> str:
    """Project id and name joined by a slash."""
    return self.project_id + '/' + self.name

  @property
  def name(self) -> str:
    """Value of the 'name' key, or '' when it is missing."""
    return self._resource_data.get('name', '')

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' key, or '' when it is missing."""
    return self._resource_data.get('selfLink', '')

  def _first_nat_status(self) -> dict:
    """Return the first result.natStatus entry, or {} when there is none."""
    nat_status = self._resource_data.get('result', {}).get('natStatus')
    # Indexing [0] on a missing or empty natStatus used to raise
    # KeyError/IndexError; an empty entry lets callers fall back to None.
    return nat_status[0] if nat_status else {}

  @property
  def min_extra_nat_ips_needed(self) -> Optional[str]:
    """'minExtraNatIpsNeeded' of the first NAT status entry.

    Returns None when the key, or the NAT status itself, is absent.
    """
    return self._first_nat_status().get('minExtraNatIpsNeeded', None)

  @property
  def num_vms_with_nat_mappings(self) -> Optional[str]:
    """'numVmEndpointsWithNatMappings' of the first NAT status entry.

    Returns None when the key, or the NAT status itself, is absent.
    """
    return self._first_nat_status().get('numVmEndpointsWithNatMappings', None)

  @property
  def bgp_peer_status(self) -> str:
    """Value of result.bgpPeerStatus, or {} when it is missing.

    NOTE(review): the stored value is returned as is and the fallback is a
    dict, so the `str` annotation looks inaccurate - confirm with callers.
    """
    bgp_peer_status = self._resource_data.get('result',
                                              {}).get('bgpPeerStatus', {})
    return bgp_peer_status

NAT Router Status

RouterStatus(project_id, resource_data)
320  def __init__(self, project_id, resource_data):
321    super().__init__(project_id=project_id)
322    self._resource_data = resource_data
full_path: str
324  @property
325  def full_path(self) -> str:
326    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
327                      self.self_link)
328    if result:
329      return result.group(1)
330    else:
331      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
333  @property
334  def short_path(self) -> str:
335    path = self.project_id + '/' + self.name
336    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
338  @property
339  def name(self) -> str:
340    return self._resource_data.get('name', '')
min_extra_nat_ips_needed: str
346  @property
347  def min_extra_nat_ips_needed(self) -> str:
348    nat_status = self._resource_data.get('result', {}).get('natStatus', {})
349    return nat_status[0].get('minExtraNatIpsNeeded', None)
num_vms_with_nat_mappings: str
351  @property
352  def num_vms_with_nat_mappings(self) -> str:
353    nat_status = self._resource_data.get('result', {}).get('natStatus', {})
354    return nat_status[0].get('numVmEndpointsWithNatMappings', None)
bgp_peer_status: str
356  @property
357  def bgp_peer_status(self) -> str:
358    bgp_peer_status = self._resource_data.get('result',
359                                              {}).get('bgpPeerStatus', {})
360    return bgp_peer_status
class RouterNatIpInfo(gcpdiag.models.Resource):
class RouterNatIpInfo(models.Resource):
  """Read-only view over the resource data of a router NAT IP info reply."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link with the Compute API v1 URL prefix stripped.

    When the self link does not start with that prefix, the self link is
    returned prefixed with '>> ' instead.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    if not matched:
      return f'>> {link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """Project id and name joined by a slash."""
    return '/'.join((self.project_id, self.name))

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' key, or '' when it is missing."""
    return self._resource_data.get('selfLink', '')

  @property
  def name(self) -> str:
    """Value of the 'name' key, or '' when it is missing."""
    return self._resource_data.get('name', '')

  @property
  def result(self) -> str:
    """Value of the 'result' key, or an empty list when it is missing.

    NOTE(review): the fallback is a list, so the `str` annotation looks
    inaccurate - confirm with callers.
    """
    data = self._resource_data
    return data.get('result', [])

NAT IP Info

RouterNatIpInfo(project_id, resource_data)
368  def __init__(self, project_id, resource_data):
369    super().__init__(project_id=project_id)
370    self._resource_data = resource_data
full_path: str
372  @property
373  def full_path(self) -> str:
374    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
375                      self.self_link)
376    if result:
377      return result.group(1)
378    else:
379      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
381  @property
382  def short_path(self) -> str:
383    path = self.project_id + '/' + self.name
384    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
390  @property
391  def name(self) -> str:
392    return self._resource_data.get('name', '')
result: str
394  @property
395  def result(self) -> str:
396    return self._resource_data.get('result', [])
@dataclasses.dataclass
class Peering:
@dataclasses.dataclass
class Peering:
  """One VPC peering entry of a network.

  Network.peerings builds these from the 'peerings' list of the network
  resource data; the comments below name the source key of each field.
  """

  # peering 'name'
  name: str
  # peering 'network'
  url: str
  # peering 'state'
  state: str
  # peering 'exportCustomRoutes'
  exports_custom_routes: bool
  # peering 'importCustomRoutes'
  imports_custom_routes: bool
  # peering 'autoCreateRoutes'
  auto_creates_routes: bool

  def __str__(self):
    # A peering is displayed by its name only.
    label = self.name
    return label

VPC Peerings

Peering( name: str, url: str, state: str, exports_custom_routes: bool, imports_custom_routes: bool, auto_creates_routes: bool)
name: str
url: str
state: str
exports_custom_routes: bool
imports_custom_routes: bool
auto_creates_routes: bool
class Network(gcpdiag.models.Resource):
class Network(models.Resource):
  """Read-only view over the resource data of one VPC network."""
  _resource_data: dict
  _subnetworks: Optional[Dict[str, Subnetwork]]
  _context: models.Context

  def __init__(self, project_id, resource_data, context: models.Context):
    super().__init__(project_id=project_id)
    self._context = context
    self._resource_data = resource_data
    self._subnetworks = None

  @property
  def full_path(self) -> str:
    """Self link with the Compute API v1 URL prefix stripped.

    When the self link does not start with that prefix, the self link is
    returned prefixed with '>> ' instead.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    if not matched:
      return f'>> {link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """Project id and network name joined by a slash."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """Value of the 'name' key."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' key, or '' when it is missing."""
    return self._resource_data.get('selfLink', '')

  @property
  def firewall(self) -> 'EffectiveFirewalls':
    """Result of _get_effective_firewalls() for this network."""
    return _get_effective_firewalls(self)

  @property
  def mtu(self) -> int:
    """Value of the 'mtu' key, or DEFAULT_MTU when it is missing."""
    return self._resource_data.get('mtu', DEFAULT_MTU)

  @property
  def subnetworks(self) -> Dict[str, Subnetwork]:
    """Subnetworks looked up through _batch_get_subnetworks().

    The entries of the 'subnetworks' list (empty when missing) are passed on
    as a frozenset together with the project id and the context.
    """
    subnet_links = frozenset(self._resource_data.get('subnetworks', []))
    return _batch_get_subnetworks(self._project_id, subnet_links,
                                  self._context)

  @property
  def peerings(self) -> List[Peering]:
    """One Peering per entry of the 'peerings' list (empty when missing)."""
    found = []
    for entry in self._resource_data.get('peerings', []):
      found.append(
          Peering(name=entry['name'],
                  url=entry['network'],
                  state=entry['state'],
                  exports_custom_routes=entry['exportCustomRoutes'],
                  imports_custom_routes=entry['importCustomRoutes'],
                  auto_creates_routes=entry['autoCreateRoutes']))
    return found

A VPC network.

Network(project_id, resource_data, context: gcpdiag.models.Context)
420  def __init__(self, project_id, resource_data, context: models.Context):
421    super().__init__(project_id=project_id)
422    self._resource_data = resource_data
423    self._subnetworks = None
424    self._context = context
full_path: str
426  @property
427  def full_path(self) -> str:
428    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
429                      self.self_link)
430    if result:
431      return result.group(1)
432    else:
433      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
435  @property
436  def short_path(self) -> str:
437    path = self.project_id + '/' + self.name
438    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
440  @property
441  def name(self) -> str:
442    return self._resource_data['name']
firewall: EffectiveFirewalls
448  @property
449  def firewall(self) -> 'EffectiveFirewalls':
450    return _get_effective_firewalls(self)
mtu: int
452  @property
453  def mtu(self) -> int:
454    if 'mtu' in self._resource_data:
455      return self._resource_data['mtu']
456    return DEFAULT_MTU
subnetworks: Dict[str, Subnetwork]
458  @property
459  def subnetworks(self) -> Dict[str, Subnetwork]:
460    return _batch_get_subnetworks(
461        self._project_id, frozenset(self._resource_data.get('subnetworks', [])),
462        self._context)
peerings: List[Peering]
464  @property
465  def peerings(self) -> List[Peering]:
466    return [
467        Peering(peer['name'], peer['network'], peer['state'],
468                peer['exportCustomRoutes'], peer['importCustomRoutes'],
469                peer['autoCreateRoutes'])
470        for peer in self._resource_data.get('peerings', [])
471    ]
@dataclasses.dataclass
class FirewallCheckResult:
@dataclasses.dataclass
class FirewallCheckResult:
  """Outcome of a firewall connectivity check.

  `action` is the verdict; the optional fields identify the firewall policy
  or VPC firewall rule that produced it, when one is recorded.
  """

  action: str
  firewall_policy_name: Optional[str] = None
  firewall_policy_rule_description: Optional[str] = None
  vpc_firewall_rule_id: Optional[str] = None
  vpc_firewall_rule_name: Optional[str] = None

  def __str__(self):
    # The string form is just the action.
    verdict = self.action
    return verdict

  @property
  def matched_by_str(self):
    """Short description of what matched, or None when nothing is recorded.

    A firewall policy name takes precedence over a VPC firewall rule name;
    the policy rule description is appended only when it is set.
    """
    if self.firewall_policy_name:
      if not self.firewall_policy_rule_description:
        return f'policy: {self.firewall_policy_name}'
      return f'policy: {self.firewall_policy_name}, rule: {self.firewall_policy_rule_description}'
    if self.vpc_firewall_rule_name:
      return f'vpc firewall rule: {self.vpc_firewall_rule_name}'
    return None

The result of a firewall connectivity check.

FirewallCheckResult( action: str, firewall_policy_name: Optional[str] = None, firewall_policy_rule_description: Optional[str] = None, vpc_firewall_rule_id: Optional[str] = None, vpc_firewall_rule_name: Optional[str] = None)
action: str
firewall_policy_name: Optional[str] = None
firewall_policy_rule_description: Optional[str] = None
vpc_firewall_rule_id: Optional[str] = None
vpc_firewall_rule_name: Optional[str] = None
matched_by_str
592  @property
593  def matched_by_str(self):
594    if self.firewall_policy_name:
595      if self.firewall_policy_rule_description:
596        return f'policy: {self.firewall_policy_name}, rule: {self.firewall_policy_rule_description}'
597      else:
598        return f'policy: {self.firewall_policy_name}'
599    elif self.vpc_firewall_rule_name:
600      return f'vpc firewall rule: {self.vpc_firewall_rule_name}'
class FirewallRuleNotFoundError(builtins.Exception):
class FirewallRuleNotFoundError(Exception):
  """Raised for a firewall rule that could not be found.

  Attributes:
    rule_name: the name given to the constructor.
    disabled: the `disabled` flag given to the constructor (default False).
  """
  rule_name: str

  def __init__(self, name, disabled=False):
    # The exception message always carries the rule name.
    message = f'firewall rule not found: {name}'
    super().__init__(message)
    self.disabled = disabled
    self.rule_name = name

Common base class for all non-exit exceptions.

FirewallRuleNotFoundError(name, disabled=False)
606  def __init__(self, name, disabled=False):
607    # Call the base class constructor with the parameters it needs
608    super().__init__(f'firewall rule not found: {name}')
609    self.rule_name = name
610    self.disabled = disabled
rule_name: str
disabled
class VpcFirewallRule:
class VpcFirewallRule:
  """Thin accessor over the data dict of one firewall rule.

  Every accessor indexes the dict directly, so a missing key raises KeyError.
  """

  def __init__(self, resource_data):
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """Value of the 'name' key."""
    data = self._resource_data
    return data['name']

  @property
  def source_ranges(self) -> List[ipaddress.IPv4Network]:
    """Value of the 'sourceRanges' key, returned as stored.

    NOTE(review): no parsing happens here; the annotation presumably relies
    on the caller storing parsed networks - confirm.
    """
    data = self._resource_data
    return data['sourceRanges']

  @property
  def target_tags(self) -> set:
    """Value of the 'targetTags' key, returned as stored."""
    data = self._resource_data
    return data['targetTags']

  @property
  def allowed(self) -> List[dict]:
    """Value of the 'allowed' key, returned as stored."""
    data = self._resource_data
    return data['allowed']

  def is_enabled(self) -> bool:
    """Logical negation of the 'disabled' value."""
    disabled = self._resource_data['disabled']
    return not disabled

Represents a VPC firewall rule.

VpcFirewallRule(resource_data)
616  def __init__(self, resource_data):
617    self._resource_data = resource_data
name: str
619  @property
620  def name(self) -> str:
621    return self._resource_data['name']
source_ranges: List[ipaddress.IPv4Network]
623  @property
624  def source_ranges(self) -> List[ipaddress.IPv4Network]:
625    return self._resource_data['sourceRanges']
target_tags: set
627  @property
628  def target_tags(self) -> set:
629    return self._resource_data['targetTags']
allowed: List[dict]
631  @property
632  def allowed(self) -> List[dict]:
633    return self._resource_data['allowed']
def is_enabled(self) -> bool:
635  def is_enabled(self) -> bool:
636    return not self._resource_data['disabled']
class EffectiveFirewalls:
 996class EffectiveFirewalls:
 997  """Effective firewall rules for a VPC network or Instance.
 998
 999  Includes org/folder firewall policies)."""
1000  _resource_data: dict
1001  _policies: List[_FirewallPolicy]
1002  _vpc_firewall: _VpcFirewall
1003
1004  def __init__(self, resource_data):
1005    self._resource_data = resource_data
1006    self._policies = []
1007    if 'firewallPolicys' in resource_data:
1008      for policy in resource_data['firewallPolicys']:
1009        self._policies.append(_FirewallPolicy(policy))
1010    self._vpc_firewall = _VpcFirewall(resource_data.get('firewalls', {}))
1011
1012  def check_connectivity_ingress(
1013      self,  #
1014      *,
1015      src_ip: IPAddrOrNet,
1016      ip_protocol: str,
1017      port: Optional[int] = None,
1018      source_service_account: Optional[str] = None,
1019      source_tags: Optional[List[str]] = None,
1020      target_service_account: Optional[str] = None,
1021      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
1022
1023    if ip_protocol != 'ICMP' and port is None:
1024      raise ValueError('TCP and UDP must have port numbers')
1025
1026    # Firewall policies (organization, folders)
1027    for p in self._policies:
1028      result = p.check_connectivity_ingress(
1029          src_ip=src_ip,
1030          ip_protocol=ip_protocol,
1031          port=port,
1032          #target_network=self._network,
1033          target_service_account=target_service_account)
1034      if result.action != 'goto_next':
1035        return result
1036
1037    # VPC firewall rules
1038    return self._vpc_firewall.check_connectivity_ingress(
1039        src_ip=src_ip,
1040        ip_protocol=ip_protocol,
1041        port=port,
1042        source_service_account=source_service_account,
1043        source_tags=source_tags,
1044        target_service_account=target_service_account,
1045        target_tags=target_tags)
1046
1047  def check_connectivity_egress(
1048      self,  #
1049      *,
1050      src_ip: IPAddrOrNet,
1051      ip_protocol: str,
1052      port: Optional[int] = None,
1053      source_service_account: Optional[str] = None,
1054      source_tags: Optional[List[str]] = None,
1055      target_service_account: Optional[str] = None,
1056      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
1057
1058    if ip_protocol != 'ICMP' and port is None:
1059      raise ValueError('TCP and UDP must have port numbers')
1060
1061    # Firewall policies (organization, folders)
1062    for p in self._policies:
1063      result = p.check_connectivity_egress(
1064          src_ip=src_ip,
1065          ip_protocol=ip_protocol,
1066          port=port,
1067          #target_network=self._network,
1068          target_service_account=target_service_account)
1069      if result.action != 'goto_next':
1070        return result
1071
1072    # VPC firewall rules
1073    return self._vpc_firewall.check_connectivity_egress(
1074        src_ip=src_ip,
1075        ip_protocol=ip_protocol,
1076        port=port,
1077        source_service_account=source_service_account,
1078        source_tags=source_tags,
1079        target_service_account=target_service_account,
1080        target_tags=target_tags)
1081
1082  def get_vpc_ingress_rules(
1083      self,
1084      name: Optional[str] = None,
1085      name_pattern: Optional[re.Pattern] = None,
1086      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
1087    """Retrieve the list of ingress firewall rules matching name or name pattern and target tags.
1088
1089    Args:
1090        name (Optional[str], optional): firewall rune name. Defaults to None.
1091        name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
1092        target_tags (Optional[List[str]], optional): firewall target tags
1093          (if not specified any tag will match). Defaults to None.
1094
1095    Returns:
1096        List[VpcFirewallRule]: List of ingress firewall rules
1097    """
1098    rules = self._vpc_firewall.get_vpc_ingress_rules(name, name_pattern,
1099                                                     target_tags)
1100    return rules
1101
1102  def get_vpc_egress_rules(
1103      self,
1104      name: Optional[str] = None,
1105      name_pattern: Optional[re.Pattern] = None,
1106      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
1107    """Retrieve the list of egress firewall rules matching name or name pattern and target tags.
1108
1109    Args:
1110        name (Optional[str], optional): firewall rune name. Defaults to None.
1111        name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
1112        target_tags (Optional[List[str]], optional): firewall target tags
1113          (if not specified any tag will match). Defaults to None.
1114
1115    Returns:
1116        List[VpcFirewallRule]: List of egress firewall rules
1117    """
1118    rules = self._vpc_firewall.get_vpc_egress_rules(name, name_pattern,
1119                                                    target_tags)
1120    return rules
1121
1122  def verify_ingress_rule_exists(self, name: str):
1123    """Verify that a certain VPC rule exists. This is useful to verify
1124    whether maybe a permission was missing on a shared VPC and an
1125    automatic rule couldn't be created."""
1126    return self._vpc_firewall.verify_ingress_rule_exists(name)
1127
1128  def verify_egress_rule_exists(self, name: str):
1129    """Verify that a certain VPC rule exists. This is useful to verify
1130    whether maybe a permission was missing on a shared VPC and an
1131    automatic rule couldn't be created."""
1132    return self._vpc_firewall.verify_egress_rule_exists(name)

Effective firewall rules for a VPC network or Instance.

Includes org/folder firewall policies.

EffectiveFirewalls(resource_data)
1004  def __init__(self, resource_data):
1005    self._resource_data = resource_data
1006    self._policies = []
1007    if 'firewallPolicys' in resource_data:
1008      for policy in resource_data['firewallPolicys']:
1009        self._policies.append(_FirewallPolicy(policy))
1010    self._vpc_firewall = _VpcFirewall(resource_data.get('firewalls', {}))
def check_connectivity_ingress( self, *, src_ip: Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network], ip_protocol: str, port: Optional[int] = None, source_service_account: Optional[str] = None, source_tags: Optional[List[str]] = None, target_service_account: Optional[str] = None, target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
1012  def check_connectivity_ingress(
1013      self,  #
1014      *,
1015      src_ip: IPAddrOrNet,
1016      ip_protocol: str,
1017      port: Optional[int] = None,
1018      source_service_account: Optional[str] = None,
1019      source_tags: Optional[List[str]] = None,
1020      target_service_account: Optional[str] = None,
1021      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
1022
1023    if ip_protocol != 'ICMP' and port is None:
1024      raise ValueError('TCP and UDP must have port numbers')
1025
1026    # Firewall policies (organization, folders)
1027    for p in self._policies:
1028      result = p.check_connectivity_ingress(
1029          src_ip=src_ip,
1030          ip_protocol=ip_protocol,
1031          port=port,
1032          #target_network=self._network,
1033          target_service_account=target_service_account)
1034      if result.action != 'goto_next':
1035        return result
1036
1037    # VPC firewall rules
1038    return self._vpc_firewall.check_connectivity_ingress(
1039        src_ip=src_ip,
1040        ip_protocol=ip_protocol,
1041        port=port,
1042        source_service_account=source_service_account,
1043        source_tags=source_tags,
1044        target_service_account=target_service_account,
1045        target_tags=target_tags)
def check_connectivity_egress( self, *, src_ip: Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network], ip_protocol: str, port: Optional[int] = None, source_service_account: Optional[str] = None, source_tags: Optional[List[str]] = None, target_service_account: Optional[str] = None, target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
1047  def check_connectivity_egress(
1048      self,  #
1049      *,
1050      src_ip: IPAddrOrNet,
1051      ip_protocol: str,
1052      port: Optional[int] = None,
1053      source_service_account: Optional[str] = None,
1054      source_tags: Optional[List[str]] = None,
1055      target_service_account: Optional[str] = None,
1056      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
1057
1058    if ip_protocol != 'ICMP' and port is None:
1059      raise ValueError('TCP and UDP must have port numbers')
1060
1061    # Firewall policies (organization, folders)
1062    for p in self._policies:
1063      result = p.check_connectivity_egress(
1064          src_ip=src_ip,
1065          ip_protocol=ip_protocol,
1066          port=port,
1067          #target_network=self._network,
1068          target_service_account=target_service_account)
1069      if result.action != 'goto_next':
1070        return result
1071
1072    # VPC firewall rules
1073    return self._vpc_firewall.check_connectivity_egress(
1074        src_ip=src_ip,
1075        ip_protocol=ip_protocol,
1076        port=port,
1077        source_service_account=source_service_account,
1078        source_tags=source_tags,
1079        target_service_account=target_service_account,
1080        target_tags=target_tags)
def get_vpc_ingress_rules( self, name: Optional[str] = None, name_pattern: Optional[re.Pattern] = None, target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
1082  def get_vpc_ingress_rules(
1083      self,
1084      name: Optional[str] = None,
1085      name_pattern: Optional[re.Pattern] = None,
1086      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
1087    """Retrieve the list of ingress firewall rules matching name or name pattern and target tags.
1088
1089    Args:
1090        name (Optional[str], optional): firewall rune name. Defaults to None.
1091        name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
1092        target_tags (Optional[List[str]], optional): firewall target tags
1093          (if not specified any tag will match). Defaults to None.
1094
1095    Returns:
1096        List[VpcFirewallRule]: List of ingress firewall rules
1097    """
1098    rules = self._vpc_firewall.get_vpc_ingress_rules(name, name_pattern,
1099                                                     target_tags)
1100    return rules

Retrieve the list of ingress firewall rules matching name or name pattern and target tags.

Arguments:
  • name (Optional[str], optional): firewall rule name. Defaults to None.
  • name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
  • target_tags (Optional[List[str]], optional): firewall target tags (if not specified any tag will match). Defaults to None.
Returns:

List[VpcFirewallRule]: List of ingress firewall rules

def get_vpc_egress_rules( self, name: Optional[str] = None, name_pattern: Optional[re.Pattern] = None, target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
1102  def get_vpc_egress_rules(
1103      self,
1104      name: Optional[str] = None,
1105      name_pattern: Optional[re.Pattern] = None,
1106      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
1107    """Retrieve the list of egress firewall rules matching name or name pattern and target tags.
1108
1109    Args:
1110        name (Optional[str], optional): firewall rune name. Defaults to None.
1111        name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
1112        target_tags (Optional[List[str]], optional): firewall target tags
1113          (if not specified any tag will match). Defaults to None.
1114
1115    Returns:
1116        List[VpcFirewallRule]: List of egress firewall rules
1117    """
1118    rules = self._vpc_firewall.get_vpc_egress_rules(name, name_pattern,
1119                                                    target_tags)
1120    return rules

Retrieve the list of egress firewall rules matching name or name pattern and target tags.

Arguments:
  • name (Optional[str], optional): firewall rule name. Defaults to None.
  • name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
  • target_tags (Optional[List[str]], optional): firewall target tags (if not specified any tag will match). Defaults to None.
Returns:

List[VpcFirewallRule]: List of egress firewall rules

def verify_ingress_rule_exists(self, name: str):
1122  def verify_ingress_rule_exists(self, name: str):
1123    """Verify that a certain VPC rule exists. This is useful to verify
1124    whether maybe a permission was missing on a shared VPC and an
1125    automatic rule couldn't be created."""
1126    return self._vpc_firewall.verify_ingress_rule_exists(name)

Verify that a certain VPC rule exists. This is useful to verify whether maybe a permission was missing on a shared VPC and an automatic rule couldn't be created.

def verify_egress_rule_exists(self, name: str):
1128  def verify_egress_rule_exists(self, name: str):
1129    """Verify that a certain VPC rule exists. This is useful to verify
1130    whether maybe a permission was missing on a shared VPC and an
1131    automatic rule couldn't be created."""
1132    return self._vpc_firewall.verify_egress_rule_exists(name)

Verify that a certain VPC rule exists. This is useful to verify whether maybe a permission was missing on a shared VPC and an automatic rule couldn't be created.

class VPCEffectiveFirewalls(EffectiveFirewalls):
class VPCEffectiveFirewalls(EffectiveFirewalls):
  """Effective firewall rules for a VPC network.

  Includes org/folder firewall policies, and additionally remembers the
  network object the rules were obtained for.
  """
  _network: Network

  def __init__(self, network, resource_data):
    # The base class parses policies and VPC rules out of resource_data.
    super().__init__(resource_data)
    # Owning network, stored unchanged.
    self._network = network

Effective firewall rules for a VPC network.

Includes org/folder firewall policies.

VPCEffectiveFirewalls(network, resource_data)
1142  def __init__(self, network, resource_data):
1143    super().__init__(resource_data)
1144    self._network = network
@caching.cached_api_call(in_memory=True)
def get_network( project_id: str, network_name: str, context: gcpdiag.models.Context) -> Network:
@caching.cached_api_call(in_memory=True)
def get_network(project_id: str, network_name: str,
                context: models.Context) -> Network:
  """Fetch a single VPC network through the Compute v1 API.

  Args:
    project_id: project that owns the network.
    network_name: name of the network to fetch.
    context: context handed on to the resulting Network object.

  Returns:
    A Network built from the API response.
  """
  logging.debug('fetching network: %s/%s', project_id, network_name)
  compute_api = apis.get_api('compute', 'v1', project_id)
  network_data = compute_api.networks().get(
      project=project_id,
      network=network_name).execute(num_retries=config.API_RETRIES)
  return Network(project_id, network_data, context)
def get_subnetwork_from_url(url: str) -> Subnetwork:
def get_subnetwork_from_url(url: str) -> Subnetwork:
  """Returns Subnetwork object given subnetwork url.

  Args:
    url: subnetwork URL of the form
      https://www.googleapis.com/compute/v1/projects/PROJECT/regions/REGION/subnetworks/NAME

  Returns:
    The result of get_subnetwork() for the project, region and subnetwork
    name parsed out of the URL.

  Raises:
    ValueError: if the URL does not have the expected form.
  """
  # Dots in the host name are escaped so that only the literal host matches;
  # the trailing '$' rejects anything after the subnetwork name.
  m = re.match((r'https://www\.googleapis\.com/compute/v1/projects/'
                r'([^/]+)/regions/([^/]+)/subnetworks/([^/]+)$'), url)
  if not m:
    raise ValueError(f"can't parse subnetwork url: {url}")
  project_id, region, subnetwork_name = m.groups()
  return get_subnetwork(project_id, region, subnetwork_name)

Returns Subnetwork object given subnetwork url

def get_network_from_url(url: str) -> Network:
def get_network_from_url(url: str) -> Network:
  """Return the Network object that a network URL points to.

  The pattern is not anchored at the end, so anything following the network
  name in the URL is ignored. The network is fetched with a fresh Context
  created for the project named in the URL.

  Raises:
    ValueError: if the URL does not start with the expected form.
  """
  parsed = re.match(
      r'https://www.googleapis.com/compute/v1/projects/([^/]+)/global/networks/([^/]+)',
      url)
  if parsed is None:
    raise ValueError(f"can't parse network url: {url}")
  project_id, network_name = parsed.groups()
  network_context = models.Context(project_id=project_id)
  return get_network(project_id, network_name, network_context)
@caching.cached_api_call(in_memory=True)
def get_networks(context: gcpdiag.models.Context) -> List[Network]:
@caching.cached_api_call(in_memory=True)
def get_networks(context: models.Context) -> List[Network]:
  """List the VPC networks of the context's project.

  A single list request is issued; a response without an 'items' key
  produces an empty list.
  """
  project_id = context.project_id
  logging.debug('fetching network: %s', project_id)
  compute_api = apis.get_api('compute', 'v1', project_id)
  listing = compute_api.networks().list(project=project_id).execute(
      num_retries=config.API_RETRIES)
  networks = []
  for network_data in listing.get('items', []):
    networks.append(Network(project_id, network_data, context))
  return networks
@caching.cached_api_call(in_memory=True)
def get_subnetwork( project_id: str, region: str, subnetwork_name: str) -> Subnetwork:
@caching.cached_api_call(in_memory=True)
def get_subnetwork(project_id: str, region: str,
                   subnetwork_name: str) -> Subnetwork:
  """Fetch a single subnetwork through the Compute v1 API.

  Args:
    project_id: project that owns the subnetwork.
    region: region of the subnetwork.
    subnetwork_name: name of the subnetwork.

  Returns:
    A Subnetwork built from the API response, with a fresh Context for
    project_id.

  Raises:
    RuntimeError: if the API call returns None.
  """
  # Log the actual resource kind and its full coordinates (the message used
  # to say "network" and dropped the region).
  logging.debug('fetching subnetwork: %s/%s/%s', project_id, region,
                subnetwork_name)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.subnetworks().get(project=project_id,
                                      region=region,
                                      subnetwork=subnetwork_name)
  response = request.execute(num_retries=config.API_RETRIES)
  if response is None:
    raise RuntimeError(
        f'failed to fetch subnetwork: {project_id}/{region}/{subnetwork_name}')
  return Subnetwork(project_id,
                    response,
                    context=models.Context(project_id=project_id))
@caching.cached_api_call(in_memory=True)
def get_routes(project_id: str) -> List[Route]:
@caching.cached_api_call(in_memory=True)
def get_routes(project_id: str) -> List[Route]:
  """List the routes of a project through the Compute v1 API.

  A single list request is issued; a response without an 'items' key
  produces an empty list.
  """
  logging.debug('fetching routes: %s', project_id)
  compute_api = apis.get_api('compute', 'v1', project_id)
  listing = compute_api.routes().list(project=project_id).execute(
      num_retries=config.API_RETRIES)
  routes = []
  for route_data in listing.get('items', []):
    routes.append(Route(project_id, route_data))
  return routes
@caching.cached_api_call(in_memory=True)
def get_zones(project_id: str) -> List[ManagedZone]:
@caching.cached_api_call(in_memory=True)
def get_zones(project_id: str) -> List[ManagedZone]:
  """List the managed DNS zones of a project (DNS API v1beta2).

  The zones are listed first; each listed zone is then fetched individually
  by name and the ManagedZone is built from that second response.
  """
  logging.debug('fetching DNS zones: %s', project_id)
  dns = apis.get_api('dns', 'v1beta2', project_id)
  listing = dns.managedZones().list(project=project_id).execute(
      num_retries=config.API_RETRIES)

  def _fetch_zone(zone_name):
    # One get request per zone.
    return dns.managedZones().get(
        project=project_id,
        managedZone=zone_name).execute(num_retries=config.API_RETRIES)

  return [
      ManagedZone(project_id, _fetch_zone(entry['name']))
      for entry in listing.get('managedZones', [])
  ]
@caching.cached_api_call(in_memory=True)
def get_routers( project_id: str, region: str, network) -> List[Router]:
@caching.cached_api_call(in_memory=True)
def get_routers(project_id: str, region: str, network) -> List[Router]:
  """List the routers of a region that are attached to `network`.

  The list request is filtered on the network's self_link. A response
  without an 'items' key produces an empty list.
  """
  logging.debug('fetching routers: %s/%s', project_id, region)
  compute_api = apis.get_api('compute', 'v1', project_id)
  network_filter = f'network="{network.self_link}"'
  listing = compute_api.routers().list(
      project=project_id, region=region,
      filter=network_filter).execute(num_retries=config.API_RETRIES)
  routers = []
  for router_data in listing.get('items', []):
    routers.append(Router(project_id, router_data))
  return routers
@caching.cached_api_call(in_memory=True)
def get_router(project_id: str, region: str, network) -> Router:
@caching.cached_api_call(in_memory=True)
def get_router(project_id: str, region: str, network) -> Router:
  """Return the first router of a region that is attached to `network`.

  The list request is filtered on the network's self_link.

  Returns:
    A Router built from the first listed item, or a Router built from an
    empty dict when the response lists no routers.
  """
  logging.debug('fetching routers: %s/%s', project_id, region)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.routers().list(project=project_id,
                                   region=region,
                                   filter=f'network="{network.self_link}"')
  response = request.execute(num_retries=config.API_RETRIES)
  # `or` covers both a missing 'items' key and an 'items' key holding an
  # empty list; the latter used to escape as StopIteration.
  items = response.get('items') or [{}]
  return Router(project_id, items[0])
@caching.cached_api_call(in_memory=True)
def get_router_by_name( project_id: str, region: str, router_name: str) -> Router:
@caching.cached_api_call(in_memory=True)
def get_router_by_name(project_id: str, region: str,
                       router_name: str) -> Router:
  """Return the router called `router_name` in the given region.

  All routers of the region are listed and the first one whose 'name'
  equals router_name is wrapped in a Router.

  Raises:
    StopIteration: if no listed router has that name.
  """
  logging.debug('fetching router list: %s/%s in region %s', project_id,
                router_name, region)
  compute_api = apis.get_api('compute', 'v1', project_id)
  listing = compute_api.routers().list(
      project=project_id,
      region=region).execute(num_retries=config.API_RETRIES)
  matching = (Router(project_id, router_data)
              for router_data in listing.get('items', [])
              if router_data['name'] == router_name)
  # No default is given on purpose: an unknown name surfaces as StopIteration.
  return next(matching)
@caching.cached_api_call(in_memory=True)
def nat_router_status( project_id: str, router_name: str, region: str) -> RouterStatus:
@caching.cached_api_call(in_memory=True)
def nat_router_status(project_id: str, router_name: str,
                      region: str) -> RouterStatus:
  """Fetch the status of a router through the Compute v1 API.

  Args:
    project_id: project that owns the router.
    router_name: name of the router.
    region: region of the router.

  Returns:
    A RouterStatus built from the response when it carries a 'result' key,
    otherwise a RouterStatus built from an empty dict.
  """
  logging.debug('fetching router status: %s/%s in region %s', project_id,
                router_name, region)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.routers().getRouterStatus(project=project_id,
                                              router=router_name,
                                              region=region)
  response = request.execute(num_retries=config.API_RETRIES)
  # Test for the key itself: a substring search on str(response) would also
  # match the text "result" inside any nested key or value. The truthiness
  # check keeps an empty/None response on the fallback path.
  if response and 'result' in response:
    return RouterStatus(project_id, response)
  logging.debug('unable to fetch router status: %s/%s in region %s',
                project_id, router_name, region)
  return RouterStatus(project_id, {})
@caching.cached_api_call(in_memory=True)
def get_nat_ip_info( project_id: str, router_name: str, region: str) -> RouterNatIpInfo:
@caching.cached_api_call(in_memory=True)
def get_nat_ip_info(project_id: str, router_name: str,
                    region: str) -> RouterNatIpInfo:
  """Fetch the NAT IP information of a router through the Compute v1 API.

  Args:
    project_id: project that owns the router.
    router_name: name of the router.
    region: region of the router.

  Returns:
    A RouterNatIpInfo built from the response when it carries a 'result'
    key, otherwise a RouterNatIpInfo built from an empty dict.
  """
  logging.debug('fetching NAT IP info for router: %s/%s in region %s',
                project_id, router_name, region)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.routers().getNatIpInfo(project=project_id,
                                           router=router_name,
                                           region=region)
  response = request.execute(num_retries=config.API_RETRIES)
  # Test for the key itself: a substring search on str(response) would also
  # match the text "result" inside any nested key or value. The truthiness
  # check keeps an empty/None response on the fallback path.
  if response and 'result' in response:
    return RouterNatIpInfo(project_id, response)
  logging.debug('unable to fetch Nat IP Info for router: %s/%s in region %s',
                project_id, router_name, region)
  return RouterNatIpInfo(project_id, {})
class VPCSubnetworkIAMPolicy(gcpdiag.queries.iam.BaseIAMPolicy):
class VPCSubnetworkIAMPolicy(iam.BaseIAMPolicy):
  """IAM policy attached to a VPC subnetwork."""

  def _is_resource_permission(self, permission):
    """Report every permission as a resource permission."""
    # The permission argument is deliberately not inspected.
    return True

Common class for IAM policies

@caching.cached_api_call(in_memory=True)
def get_subnetwork_iam_policy( context: gcpdiag.models.Context, region: str, subnetwork_name: str) -> VPCSubnetworkIAMPolicy:
@caching.cached_api_call(in_memory=True)
def get_subnetwork_iam_policy(context: models.Context, region: str,
                              subnetwork_name: str) -> VPCSubnetworkIAMPolicy:
  """Fetch the IAM policy of a subnetwork in the context's project.

  Args:
    context: supplies the project id and is passed on to the policy fetch.
    region: region of the subnetwork.
    subnetwork_name: name of the subnetwork.

  Returns:
    Whatever iam.fetch_iam_policy() produces for VPCSubnetworkIAMPolicy.
  """
  project_id = context.project_id

  compute_api = apis.get_api('compute', 'v1', project_id)
  policy_request = compute_api.subnetworks().getIamPolicy(
      project=project_id, region=region, resource=subnetwork_name)

  # Relative resource name passed along with the request.
  resource_name = (f'projects/{project_id}/regions/{region}/'
                   f'subnetworks/{subnetwork_name}')
  return iam.fetch_iam_policy(policy_request, VPCSubnetworkIAMPolicy,
                              project_id, resource_name, context)
class Address(gcpdiag.models.Resource):
class Address(models.Resource):
  """An IP address resource, backed by its raw resource data."""
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 URL prefix removed.

    When the self link does not start with that prefix, the link is
    returned prefixed with '>> ' instead.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    if not matched:
      return f'>> {link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """Project id and address name joined by a slash."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """Value stored under 'name'."""
    data = self._resource_data
    return data['name']

  @property
  def self_link(self) -> str:
    """Value stored under 'selfLink', or '' when that key is absent."""
    data = self._resource_data
    return data.get('selfLink', '')

  @property
  def subnetwork(self) -> str:
    """Value stored under 'subnetwork' (KeyError when the key is absent)."""
    data = self._resource_data
    return data['subnetwork']

  @property
  def status(self) -> str:
    """Value stored under 'status'."""
    data = self._resource_data
    return data['status']

IP Addresses.

Address(project_id, resource_data)
1370  def __init__(self, project_id, resource_data):
1371    super().__init__(project_id=project_id)
1372    self._resource_data = resource_data
full_path: str
1374  @property
1375  def full_path(self) -> str:
1376    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
1377                      self.self_link)
1378    if result:
1379      return result.group(1)
1380    else:
1381      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
1383  @property
1384  def short_path(self) -> str:
1385    path = self.project_id + '/' + self.name
1386    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
1388  @property
1389  def name(self) -> str:
1390    return self._resource_data['name']
subnetwork: str
1396  @property
1397  def subnetwork(self) -> str:
1398    return self._resource_data['subnetwork']
status: str
1400  @property
1401  def status(self) -> str:
1402    return self._resource_data['status']
@caching.cached_api_call(in_memory=True)
def get_addresses(project_id: str) -> List[Address]:
@caching.cached_api_call(in_memory=True)
def get_addresses(project_id: str) -> List[Address]:
  """List the IP addresses of a project across all scopes.

  Uses the aggregated list call of the Compute v1 API and flattens the
  per-scope entries into one list.

  Args:
    project_id: project whose addresses are listed.

  Returns:
    One Address per entry found under the 'addresses' key of each scope;
    scopes without that key contribute nothing.
  """
  logging.debug('fetching addresses list: %s', project_id)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.addresses().aggregatedList(project=project_id)
  response = request.execute(num_retries=config.API_RETRIES)
  # NOTE(review): only this single response is read; no follow-up request is
  # made for further result pages.
  addresses: List[Address] = []
  # .get() mirrors the other list helpers in this module, which tolerate a
  # response without 'items'; only the per-scope values are needed.
  for scoped_data in response.get('items', {}).values():
    addresses.extend(
        Address(project_id, address)
        for address in scoped_data.get('addresses', []))
  return addresses