gcpdiag.queries.network

Queries related to VPC Networks.
# A single IP address of either version (IPv4 or IPv6).
IPv4AddrOrIPv6Addr = typing.Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
# An IP network of either version (IPv4 or IPv6).
IPv4NetOrIPv6Net = typing.Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
# Either a single address or a whole network, in either IP version.
IPAddrOrNet = typing.Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network]
# Value reported by Network.mtu when the network's resource data has no 'mtu'
# key.
DEFAULT_MTU = 1460
class Subnetwork(gcpdiag.models.Resource):
class Subnetwork(models.Resource):
  """Read-only view over the raw resource dictionary of one VPC subnetwork."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Remember the owning project id and the raw subnetwork resource dict."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link with the Compute API v1 URL prefix stripped.

    If the self link does not start with that prefix, the complete self link
    is returned instead, prefixed with '>> '.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    if not matched:
      return f'>> {link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """Project id and subnetwork name joined by a slash."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """The 'name' field of the resource data (required)."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """The 'selfLink' field of the resource data, or '' when it is absent."""
    return self._resource_data.get('selfLink', '')

  @property
  def ip_network(self) -> IPv4NetOrIPv6Net:
    """The 'ipCidrRange' field parsed into an ipaddress network object."""
    cidr = self._resource_data['ipCidrRange']
    return ipaddress.ip_network(cidr)

  @property
  def region(self) -> str:
    """Region name taken from the last component of the 'region' URL.

    Raises:
      RuntimeError: the 'region' field is not a Compute API region URL.
    """
    # Expected form:
    # https://www.googleapis.com/compute/v1/projects/gcpdiag-gke1-aaaa/regions/europe-west4
    region_url = self._resource_data['region']
    parsed = re.match(
        r'https://www.googleapis.com/compute/v1/projects/([^/]+)/regions/([^/]+)',
        region_url)
    if parsed:
      return parsed.group(2)
    raise RuntimeError(f"can't parse region URL: {region_url}")

  def is_private_ip_google_access(self) -> bool:
    """The 'privateIpGoogleAccess' field, defaulting to False when absent."""
    return self._resource_data.get('privateIpGoogleAccess', False)

  @property
  def network(self):
    """The 'network' field of the resource data, returned as stored."""
    return self._resource_data['network']

A VPC subnetwork.

Subnetwork(project_id, resource_data)
40  def __init__(self, project_id, resource_data):
41    super().__init__(project_id=project_id)
42    self._resource_data = resource_data
full_path: str
44  @property
45  def full_path(self) -> str:
46    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
47                      self.self_link)
48    if result:
49      return result.group(1)
50    else:
51      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
53  @property
54  def short_path(self) -> str:
55    path = self.project_id + '/' + self.name
56    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'my-project/my-subnet' (this class returns '<project_id>/<name>')

name: str
58  @property
59  def name(self) -> str:
60    return self._resource_data['name']
ip_network: Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
66  @property
67  def ip_network(self) -> IPv4NetOrIPv6Net:
68    return ipaddress.ip_network(self._resource_data['ipCidrRange'])
region: str
70  @property
71  def region(self) -> str:
72    # https://www.googleapis.com/compute/v1/projects/gcpdiag-gke1-aaaa/regions/europe-west4
73    m = re.match(
74        r'https://www.googleapis.com/compute/v1/projects/([^/]+)/regions/([^/]+)',
75        self._resource_data['region'])
76    if not m:
77      raise RuntimeError(
78          f"can't parse region URL: {self._resource_data['region']}")
79    return m.group(2)
def is_private_ip_google_access(self) -> bool:
81  def is_private_ip_google_access(self) -> bool:
82    return self._resource_data.get('privateIpGoogleAccess', False)
network
84  @property
85  def network(self):
86    return self._resource_data['network']
class Route(gcpdiag.models.Resource):
 89class Route(models.Resource):
 90  """A VPC Route."""
 91
 92  _resource_data: dict
 93
 94  def __init__(self, project_id, resource_data):
 95    super().__init__(project_id=project_id)
 96    self._resource_data = resource_data
 97
 98  @property
 99  def full_path(self) -> str:
100    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
101                      self.self_link)
102    if result:
103      return result.group(1)
104    else:
105      return f'>> {self.self_link}'
106
107  @property
108  def short_path(self) -> str:
109    path = self.project_id + '/' + self.name
110    return path
111
112  @property
113  def name(self) -> str:
114    return self._resource_data['name']
115
116  @property
117  def kind(self) -> str:
118    return self._resource_data['kind']
119
120  @property
121  def self_link(self) -> str:
122    return self._resource_data['selfLink']
123
124  @property
125  def network(self) -> str:
126    return self._resource_data['network']
127
128  @property
129  def tags(self) -> List[str]:
130    if 'tags' in self._resource_data:
131      return self._resource_data['tags']
132    return []
133
134  @property
135  def dest_range(self) -> str:
136    return self._resource_data['destRange']
137
138  @property
139  def next_hop_gateway(self) -> Optional[str]:
140    if 'nextHopGateway' in self._resource_data:
141      return self._resource_data['nextHopGateway']
142    return None
143
144  @property
145  def next_hop_vpn_tunnel(self) -> Optional[str]:
146    return self._resource_data.get('nextHopVpnTunnel')
147
148  @property
149  def next_hop_hub(self) -> Optional[str]:
150    return self._resource_data.get('nextHopHub')
151
152  @property
153  def priority(self) -> int:
154    return self._resource_data['priority']
155
156  def get_next_hop(self) -> Union[Dict[str, Any], Optional[str]]:
157    hop_types = {
158        'nextHopGateway': 'nextHopGateway',
159        'nextHopVpnTunnel': 'nextHopVpnTunnel',
160        'nextHopHub': 'nextHopHub',
161        'nextHopInstance': 'nextHopInstance',
162        'nextHopAddress': 'nextHopAddress',
163        'nextHopPeering': 'nextHopPeering',
164        'nextHopIlb': 'nextHopIlb',
165        'nextHopNetwork': 'nextHopNetwork',
166        'nextHopIp': 'nextHopIp'
167    }
168
169    for hop_type, value in hop_types.items():
170      if self._resource_data.get(hop_type):
171        return {'type': value, 'link': self._resource_data[hop_type]}
172    return None
173
174  def check_route_match(self, ip1: IPAddrOrNet, ip2: str) -> bool:
175    ip2_list = [ipaddress.ip_network(ip2)]
176    if _ip_match(ip1, ip2_list, 'allow'):
177      return True
178    return False

A VPC Route.

Route(project_id, resource_data)
94  def __init__(self, project_id, resource_data):
95    super().__init__(project_id=project_id)
96    self._resource_data = resource_data
full_path: str
 98  @property
 99  def full_path(self) -> str:
100    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
101                      self.self_link)
102    if result:
103      return result.group(1)
104    else:
105      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
107  @property
108  def short_path(self) -> str:
109    path = self.project_id + '/' + self.name
110    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'my-project/my-route' (this class returns '<project_id>/<name>')

name: str
112  @property
113  def name(self) -> str:
114    return self._resource_data['name']
kind: str
116  @property
117  def kind(self) -> str:
118    return self._resource_data['kind']
network: str
124  @property
125  def network(self) -> str:
126    return self._resource_data['network']
tags: List[str]
128  @property
129  def tags(self) -> List[str]:
130    if 'tags' in self._resource_data:
131      return self._resource_data['tags']
132    return []
dest_range: str
134  @property
135  def dest_range(self) -> str:
136    return self._resource_data['destRange']
next_hop_gateway: Optional[str]
138  @property
139  def next_hop_gateway(self) -> Optional[str]:
140    if 'nextHopGateway' in self._resource_data:
141      return self._resource_data['nextHopGateway']
142    return None
next_hop_vpn_tunnel: Optional[str]
144  @property
145  def next_hop_vpn_tunnel(self) -> Optional[str]:
146    return self._resource_data.get('nextHopVpnTunnel')
next_hop_hub: Optional[str]
148  @property
149  def next_hop_hub(self) -> Optional[str]:
150    return self._resource_data.get('nextHopHub')
priority: int
152  @property
153  def priority(self) -> int:
154    return self._resource_data['priority']
def get_next_hop(self) -> Union[Dict[str, Any], str, NoneType]:
156  def get_next_hop(self) -> Union[Dict[str, Any], Optional[str]]:
157    hop_types = {
158        'nextHopGateway': 'nextHopGateway',
159        'nextHopVpnTunnel': 'nextHopVpnTunnel',
160        'nextHopHub': 'nextHopHub',
161        'nextHopInstance': 'nextHopInstance',
162        'nextHopAddress': 'nextHopAddress',
163        'nextHopPeering': 'nextHopPeering',
164        'nextHopIlb': 'nextHopIlb',
165        'nextHopNetwork': 'nextHopNetwork',
166        'nextHopIp': 'nextHopIp'
167    }
168
169    for hop_type, value in hop_types.items():
170      if self._resource_data.get(hop_type):
171        return {'type': value, 'link': self._resource_data[hop_type]}
172    return None
def check_route_match( self, ip1: Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network], ip2: str) -> bool:
174  def check_route_match(self, ip1: IPAddrOrNet, ip2: str) -> bool:
175    ip2_list = [ipaddress.ip_network(ip2)]
176    if _ip_match(ip1, ip2_list, 'allow'):
177      return True
178    return False
class ManagedZone(gcpdiag.models.Resource):
class ManagedZone(models.Resource):
  """
  Represent a DNS zone (public or private).

  https://cloud.google.com/dns/docs/reference/v1beta2/managedZones
  """
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Remember the owning project id and the raw managed zone dict."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def cloud_logging_config(self) -> bool:
    """The 'enableLogging' flag under 'cloudLoggingConfig'.

    False when either the section or the flag is absent.
    """
    logging_config = self._resource_data.get('cloudLoggingConfig', {})
    return logging_config.get('enableLogging', False)

  @property
  def is_public(self) -> bool:
    """True when the zone's 'visibility' field equals 'public'."""
    return self._resource_data['visibility'] == 'public'

  @property
  def vpc_attached(self) -> bool:
    """True when the zone lists at least one network or GKE cluster.

    Both lists are read from 'privateVisibilityConfig'; a missing section
    counts as not attached. The resource data is only read, never modified.
    """
    visibility = self._resource_data.get('privateVisibilityConfig', {})
    return bool(visibility.get('networks') or visibility.get('gkeClusters'))

  @property
  def dnssec_config_state(self) -> Union[str, bool]:
    """The 'state' value under 'dnssecConfig', or False when not present.

    The stored state value is returned unchanged, so callers can keep
    comparing it against the state names they expect. The resource data is
    only read, never modified.
    """
    dnssec_config = self._resource_data.get('dnssecConfig', {})
    return dnssec_config.get('state', False)

  @property
  def name(self) -> str:
    """The 'name' field of the resource data (required)."""
    return self._resource_data['name']

  @property
  def full_path(self) -> str:
    """Self link with the Cloud DNS v1beta2 URL prefix stripped.

    If the self link does not start with that prefix, the complete self link
    is returned instead, prefixed with '>> '.
    """
    result = re.match(r'https://dns.googleapis.com/dns/v1beta2/(.*)',
                      self.self_link)
    if result:
      return result.group(1)
    return f'>> {self.self_link}'

  @property
  def short_path(self) -> str:
    """Project id and zone name joined by a slash."""
    return self.project_id + '/' + self.name

  @property
  def self_link(self) -> str:
    """The 'selfLink' field of the resource data, or '' when it is absent."""
    return self._resource_data.get('selfLink', '')
ManagedZone(project_id, resource_data)
189  def __init__(self, project_id, resource_data):
190    super().__init__(project_id=project_id)
191    self._resource_data = resource_data
cloud_logging_config: bool
193  @property
194  def cloud_logging_config(self) -> bool:
195    return self._resource_data['cloudLoggingConfig'].get('enableLogging', False)
is_public: bool
197  @property
198  def is_public(self) -> bool:
199    return self._resource_data['visibility'] == 'public'
vpc_attached: bool
201  @property
202  def vpc_attached(self) -> bool:
203    if 'privateVisibilityConfig' not in self._resource_data:
204      self._resource_data['privateVisibilityConfig'] = {}
205
206    return (self._resource_data['privateVisibilityConfig'].get(
207        'networks', False) or
208            self._resource_data['privateVisibilityConfig'].get(
209                'gkeClusters', False))
dnssec_config_state: bool
211  @property
212  def dnssec_config_state(self) -> bool:
213    if 'dnssecConfig' not in self._resource_data:
214      self._resource_data['dnssecConfig'] = {}
215
216    return self._resource_data['dnssecConfig'].get('state', False)
name: str
218  @property
219  def name(self) -> str:
220    return self._resource_data['name']
full_path: str
222  @property
223  def full_path(self) -> str:
224    result = re.match(r'https://dns.googleapis.com/dns/v1beta2/(.*)',
225                      self.self_link)
226    if result:
227      return result.group(1)
228    else:
229      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
231  @property
232  def short_path(self) -> str:
233    path = self.project_id + '/' + self.name
234    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'my-project/my-zone' (this class returns '<project_id>/<name>')

class Router(gcpdiag.models.Resource):
class Router(models.Resource):
  """A VPC Router."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Remember the owning project id and the raw router resource dict."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self._nats = None

  @property
  def full_path(self) -> str:
    """Self link with the Compute API v1 URL prefix stripped.

    If the self link does not start with that prefix, the complete self link
    is returned instead, prefixed with '>> '.
    """
    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                      self.self_link)
    if result:
      return result.group(1)
    return f'>> {self.self_link}'

  @property
  def short_path(self) -> str:
    """Project id and router name joined by a slash."""
    return self.project_id + '/' + self.name

  @property
  def name(self) -> str:
    """The 'name' field of the resource data (required)."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """The 'selfLink' field of the resource data, or '' when it is absent."""
    return self._resource_data.get('selfLink', '')

  @property
  def network(self) -> str:
    """The 'network' field of the resource data (required)."""
    return self._resource_data['network']

  def get_network_name(self) -> str:
    """Last path component of the 'network' field, or '' when it is empty."""
    network = self._resource_data['network']
    if network:
      return network.split('/')[-1]
    return ''

  @property
  def nats(self):
    """The 'nats' list of the resource data, or an empty list when absent."""
    return self._resource_data.get('nats', [])

  def _find_nat(self, nat_gateway) -> Optional[dict]:
    """Return the first NAT entry whose 'name' equals nat_gateway, else None."""
    for nat in self._resource_data.get('nats', []):
      if nat['name'] == nat_gateway:
        return nat
    return None

  def get_nat_ip_allocate_option(self, nat_gateway) -> str:
    """The 'natIpAllocateOption' of the NAT gateway with the given name.

    Returns '' when the option is not set or when the router has no NAT
    gateway with that name.
    """
    nat = self._find_nat(nat_gateway)
    if nat is None:
      return ''
    return nat.get('natIpAllocateOption', '')

  def get_enable_dynamic_port_allocation(self, nat_gateway) -> Union[bool, str]:
    """The 'enableDynamicPortAllocation' value of the named NAT gateway.

    The stored value is returned unchanged (presumably a boolean in API data;
    verify against the API reference). Returns '' when the field is not set or
    when the router has no NAT gateway with that name.
    """
    nat = self._find_nat(nat_gateway)
    if nat is None:
      return ''
    return nat.get('enableDynamicPortAllocation', '')

  def subnet_has_nat(self, subnetwork) -> bool:
    """Tell whether any NAT on this router covers the given subnetwork.

    A NAT whose 'sourceSubnetworkIpRangesToNat' is 'LIST_OF_SUBNETWORKS'
    covers the subnetwork only if the subnetwork's self link appears among the
    'name' values of its 'subnetworks' entries. A NAT with any other setting
    covers every subnetwork.
    """
    for nat in self._resource_data.get('nats') or []:
      if nat['sourceSubnetworkIpRangesToNat'] != 'LIST_OF_SUBNETWORKS':
        # Cloud NAT configured for all subnets
        return True
      # Cloud NAT configured for specific subnets
      if 'subnetworks' in nat and any(
          s['name'] == subnetwork.self_link for s in nat['subnetworks']):
        return True
    return False

A VPC Router.

Router(project_id, resource_data)
246  def __init__(self, project_id, resource_data):
247    super().__init__(project_id=project_id)
248    self._resource_data = resource_data
249    self._nats = None
full_path: str
251  @property
252  def full_path(self) -> str:
253    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
254                      self.self_link)
255    if result:
256      return result.group(1)
257    else:
258      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
260  @property
261  def short_path(self) -> str:
262    path = self.project_id + '/' + self.name
263    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'my-project/my-router' (this class returns '<project_id>/<name>')

name: str
265  @property
266  def name(self) -> str:
267    return self._resource_data['name']
network: str
273  @property
274  def network(self) -> str:
275    return self._resource_data['network']
def get_network_name(self) -> str:
277  def get_network_name(self) -> str:
278    logging.info('inside get_network_name function')
279    if self._resource_data['network']:
280      return self._resource_data['network'].split('/')[-1]
281    return ''
nats
283  @property
284  def nats(self):
285    return self._resource_data.get('nats', [])
def get_nat_ip_allocate_option(self, nat_gateway) -> str:
287  def get_nat_ip_allocate_option(self, nat_gateway) -> str:
288    nats = self._resource_data.get('nats', [])
289    nat = [n for n in nats if n['name'] == nat_gateway]
290    return nat[0].get('natIpAllocateOption', '')
def get_enable_dynamic_port_allocation(self, nat_gateway) -> str:
292  def get_enable_dynamic_port_allocation(self, nat_gateway) -> str:
293    nats = self._resource_data.get('nats', [])
294    nat = [n for n in nats if n['name'] == nat_gateway]
295    return nat[0].get('enableDynamicPortAllocation', '')
def subnet_has_nat(self, subnetwork):
297  def subnet_has_nat(self, subnetwork):
298    if not self._resource_data.get('nats', []):
299      return False
300    for n in self._resource_data.get('nats', []):
301      if n['sourceSubnetworkIpRangesToNat'] == 'LIST_OF_SUBNETWORKS':
302        # Cloud NAT configure for specific subnets
303        if 'subnetworks' in n and subnetwork.self_link in [
304            s['name'] for s in n['subnetworks']
305        ]:
306          return True
307      else:
308        # Cloud NAT configured for all subnets
309        return True
310    return False
class RouterStatus(gcpdiag.models.Resource):
class RouterStatus(models.Resource):
  """NAT Router Status"""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Remember the owning project id and the raw router status dict."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link with the Compute API v1 URL prefix stripped.

    If the self link does not start with that prefix, the complete self link
    is returned instead, prefixed with '>> '.
    """
    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                      self.self_link)
    if result:
      return result.group(1)
    return f'>> {self.self_link}'

  @property
  def short_path(self) -> str:
    """Project id and router name joined by a slash."""
    return self.project_id + '/' + self.name

  @property
  def name(self) -> str:
    """The 'name' field of the resource data, or '' when it is absent."""
    return self._resource_data.get('name', '')

  @property
  def self_link(self) -> str:
    """The 'selfLink' field of the resource data, or '' when it is absent."""
    return self._resource_data.get('selfLink', '')

  def _first_nat_status(self) -> dict:
    """First entry of result.natStatus, or {} when there is no such entry."""
    nat_status = self._resource_data.get('result', {}).get('natStatus')
    # A missing or empty natStatus must not raise: indexing [0] on the old
    # `{}` default failed with KeyError, and on an empty list with IndexError.
    if not nat_status:
      return {}
    return nat_status[0]

  @property
  def min_extra_nat_ips_needed(self) -> Optional[int]:
    """'minExtraNatIpsNeeded' of the first NAT status entry.

    None when the router status carries no NAT status or the field is unset.
    """
    return self._first_nat_status().get('minExtraNatIpsNeeded', None)

  @property
  def num_vms_with_nat_mappings(self) -> Optional[int]:
    """'numVmEndpointsWithNatMappings' of the first NAT status entry.

    None when the router status carries no NAT status or the field is unset.
    """
    return self._first_nat_status().get('numVmEndpointsWithNatMappings', None)

  @property
  def bgp_peer_status(self) -> Union[list, dict]:
    """The 'bgpPeerStatus' value under 'result', or {} when it is absent."""
    return self._resource_data.get('result', {}).get('bgpPeerStatus', {})

NAT Router Status

RouterStatus(project_id, resource_data)
318  def __init__(self, project_id, resource_data):
319    super().__init__(project_id=project_id)
320    self._resource_data = resource_data
full_path: str
322  @property
323  def full_path(self) -> str:
324    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
325                      self.self_link)
326    if result:
327      return result.group(1)
328    else:
329      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
331  @property
332  def short_path(self) -> str:
333    path = self.project_id + '/' + self.name
334    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'my-project/my-router' (this class returns '<project_id>/<name>')

name: str
336  @property
337  def name(self) -> str:
338    return self._resource_data.get('name', '')
min_extra_nat_ips_needed: str
344  @property
345  def min_extra_nat_ips_needed(self) -> str:
346    nat_status = self._resource_data.get('result', {}).get('natStatus', {})
347    return nat_status[0].get('minExtraNatIpsNeeded', None)
num_vms_with_nat_mappings: str
349  @property
350  def num_vms_with_nat_mappings(self) -> str:
351    nat_status = self._resource_data.get('result', {}).get('natStatus', {})
352    return nat_status[0].get('numVmEndpointsWithNatMappings', None)
bgp_peer_status: str
354  @property
355  def bgp_peer_status(self) -> str:
356    bgp_peer_status = self._resource_data.get('result',
357                                              {}).get('bgpPeerStatus', {})
358    return bgp_peer_status
class RouterNatIpInfo(gcpdiag.models.Resource):
class RouterNatIpInfo(models.Resource):
  """NAT IP Info"""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Remember the owning project id and the raw NAT IP info dict."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link with the Compute API v1 URL prefix stripped.

    If the self link does not start with that prefix, the complete self link
    is returned instead, prefixed with '>> '.
    """
    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                      self.self_link)
    if result:
      return result.group(1)
    return f'>> {self.self_link}'

  @property
  def short_path(self) -> str:
    """Project id and name joined by a slash."""
    return self.project_id + '/' + self.name

  @property
  def self_link(self) -> str:
    """The 'selfLink' field of the resource data, or '' when it is absent."""
    return self._resource_data.get('selfLink', '')

  @property
  def name(self) -> str:
    """The 'name' field of the resource data, or '' when it is absent."""
    return self._resource_data.get('name', '')

  @property
  def result(self) -> list:
    """The 'result' field of the resource data, or [] when it is absent."""
    # Annotated as a list: the fallback value is a list, not a string.
    return self._resource_data.get('result', [])

NAT IP Info

RouterNatIpInfo(project_id, resource_data)
366  def __init__(self, project_id, resource_data):
367    super().__init__(project_id=project_id)
368    self._resource_data = resource_data
full_path: str
370  @property
371  def full_path(self) -> str:
372    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
373                      self.self_link)
374    if result:
375      return result.group(1)
376    else:
377      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
379  @property
380  def short_path(self) -> str:
381    path = self.project_id + '/' + self.name
382    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'my-project/my-router' (this class returns '<project_id>/<name>')

name: str
388  @property
389  def name(self) -> str:
390    return self._resource_data.get('name', '')
result: str
392  @property
393  def result(self) -> str:
394    return self._resource_data.get('result', [])
@dataclasses.dataclass
class Peering:
@dataclasses.dataclass
class Peering:
  """Plain record describing one VPC peering; str() yields its name."""

  # Identification and current state of the peering.
  name: str
  url: str
  state: str

  # Route exchange settings of the peering.
  exports_custom_routes: bool
  imports_custom_routes: bool
  auto_creates_routes: bool

  def __str__(self):
    """Use the peering name as the string form."""
    peering_name = self.name
    return peering_name

VPC Peerings

Peering( name: str, url: str, state: str, exports_custom_routes: bool, imports_custom_routes: bool, auto_creates_routes: bool)
name: str
url: str
state: str
exports_custom_routes: bool
imports_custom_routes: bool
auto_creates_routes: bool
class Network(gcpdiag.models.Resource):
class Network(models.Resource):
  """Read-only view over the raw resource dictionary of one VPC network."""
  _resource_data: dict
  _subnetworks: Optional[Dict[str, Subnetwork]]

  def __init__(self, project_id, resource_data):
    """Remember the owning project id and the raw network resource dict."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self._subnetworks = None

  @property
  def full_path(self) -> str:
    """Self link with the Compute API v1 URL prefix stripped.

    If the self link does not start with that prefix, the complete self link
    is returned instead, prefixed with '>> '.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    if not matched:
      return f'>> {link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """Project id and network name joined by a slash."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """The 'name' field of the resource data (required)."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """The 'selfLink' field of the resource data, or '' when it is absent."""
    return self._resource_data.get('selfLink', '')

  @property
  def firewall(self) -> 'EffectiveFirewalls':
    """Effective firewalls, obtained from _get_effective_firewalls."""
    return _get_effective_firewalls(self)

  @property
  def mtu(self) -> int:
    """The 'mtu' field, or DEFAULT_MTU when the key is absent."""
    return self._resource_data.get('mtu', DEFAULT_MTU)

  @property
  def subnetworks(self) -> Dict[str, Subnetwork]:
    """Subnetworks listed under 'subnetworks', via _batch_get_subnetworks."""
    subnet_links = frozenset(self._resource_data.get('subnetworks', []))
    return _batch_get_subnetworks(self._project_id, subnet_links)

  @property
  def peerings(self) -> List[Peering]:
    """One Peering record per entry of the 'peerings' list (may be empty)."""
    records: List[Peering] = []
    for entry in self._resource_data.get('peerings', []):
      records.append(
          Peering(name=entry['name'],
                  url=entry['network'],
                  state=entry['state'],
                  exports_custom_routes=entry['exportCustomRoutes'],
                  imports_custom_routes=entry['importCustomRoutes'],
                  auto_creates_routes=entry['autoCreateRoutes']))
    return records

A VPC network.

Network(project_id, resource_data)
417  def __init__(self, project_id, resource_data):
418    super().__init__(project_id=project_id)
419    self._resource_data = resource_data
420    self._subnetworks = None
full_path: str
422  @property
423  def full_path(self) -> str:
424    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
425                      self.self_link)
426    if result:
427      return result.group(1)
428    else:
429      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
431  @property
432  def short_path(self) -> str:
433    path = self.project_id + '/' + self.name
434    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'my-project/my-network' (this class returns '<project_id>/<name>')

name: str
436  @property
437  def name(self) -> str:
438    return self._resource_data['name']
firewall: EffectiveFirewalls
444  @property
445  def firewall(self) -> 'EffectiveFirewalls':
446    return _get_effective_firewalls(self)
mtu: int
448  @property
449  def mtu(self) -> int:
450    if 'mtu' in self._resource_data:
451      return self._resource_data['mtu']
452    return DEFAULT_MTU
subnetworks: Dict[str, Subnetwork]
454  @property
455  def subnetworks(self) -> Dict[str, Subnetwork]:
456    return _batch_get_subnetworks(
457        self._project_id, frozenset(self._resource_data.get('subnetworks', [])))
peerings: List[Peering]
459  @property
460  def peerings(self) -> List[Peering]:
461    return [
462        Peering(peer['name'], peer['network'], peer['state'],
463                peer['exportCustomRoutes'], peer['importCustomRoutes'],
464                peer['autoCreateRoutes'])
465        for peer in self._resource_data.get('peerings', [])
466    ]
@dataclasses.dataclass
class FirewallCheckResult:
@dataclasses.dataclass
class FirewallCheckResult:
  """Outcome of a firewall connectivity check; str() yields the action."""

  action: str
  # Set when a firewall policy produced the result.
  firewall_policy_name: Optional[str] = None
  firewall_policy_rule_description: Optional[str] = None
  # Set when a VPC firewall rule produced the result.
  vpc_firewall_rule_id: Optional[str] = None
  vpc_firewall_rule_name: Optional[str] = None

  def __str__(self):
    """Use the action as the string form."""
    return self.action

  @property
  def matched_by_str(self):
    """Human-readable description of what matched.

    A firewall policy name takes precedence over a VPC firewall rule name.
    Returns None when neither of the two names is set.
    """
    if not self.firewall_policy_name:
      if self.vpc_firewall_rule_name:
        return f'vpc firewall rule: {self.vpc_firewall_rule_name}'
      return None
    if self.firewall_policy_rule_description:
      return f'policy: {self.firewall_policy_name}, rule: {self.firewall_policy_rule_description}'
    return f'policy: {self.firewall_policy_name}'

The result of a firewall connectivity check.

FirewallCheckResult( action: str, firewall_policy_name: Optional[str] = None, firewall_policy_rule_description: Optional[str] = None, vpc_firewall_rule_id: Optional[str] = None, vpc_firewall_rule_name: Optional[str] = None)
action: str
firewall_policy_name: Optional[str] = None
firewall_policy_rule_description: Optional[str] = None
vpc_firewall_rule_id: Optional[str] = None
vpc_firewall_rule_name: Optional[str] = None
matched_by_str
587  @property
588  def matched_by_str(self):
589    if self.firewall_policy_name:
590      if self.firewall_policy_rule_description:
591        return f'policy: {self.firewall_policy_name}, rule: {self.firewall_policy_rule_description}'
592      else:
593        return f'policy: {self.firewall_policy_name}'
594    elif self.vpc_firewall_rule_name:
595      return f'vpc firewall rule: {self.vpc_firewall_rule_name}'
class FirewallRuleNotFoundError(builtins.Exception):
class FirewallRuleNotFoundError(Exception):
  """Error carrying the name of a firewall rule that could not be found.

  The rule name is kept in `rule_name` and the `disabled` argument is kept in
  `disabled`.
  """
  rule_name: str

  def __init__(self, name, disabled=False):
    # The base Exception receives the ready-made message text.
    message = f'firewall rule not found: {name}'
    super().__init__(message)
    self.disabled = disabled
    self.rule_name = name

Common base class for all non-exit exceptions.

FirewallRuleNotFoundError(name, disabled=False)
601  def __init__(self, name, disabled=False):
602    # Call the base class constructor with the parameters it needs
603    super().__init__(f'firewall rule not found: {name}')
604    self.rule_name = name
605    self.disabled = disabled
rule_name: str
disabled
class VpcFirewallRule:
class VpcFirewallRule:
  """Thin accessor wrapper around the data dictionary of one firewall rule."""

  def __init__(self, resource_data):
    """Keep a reference to the rule's data dictionary."""
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """The 'name' entry of the rule data (required)."""
    rule_name = self._resource_data['name']
    return rule_name

  @property
  def source_ranges(self) -> List[ipaddress.IPv4Network]:
    """The 'sourceRanges' entry of the rule data, returned as stored."""
    ranges = self._resource_data['sourceRanges']
    return ranges

  @property
  def target_tags(self) -> set:
    """The 'targetTags' entry of the rule data, returned as stored."""
    tags = self._resource_data['targetTags']
    return tags

  @property
  def allowed(self) -> List[dict]:
    """The 'allowed' entry of the rule data, returned as stored."""
    allowed_entries = self._resource_data['allowed']
    return allowed_entries

  def is_enabled(self) -> bool:
    """True unless the rule's 'disabled' entry is truthy."""
    disabled = self._resource_data['disabled']
    return not disabled

Represents a firewall rule.

VpcFirewallRule(resource_data)
611  def __init__(self, resource_data):
612    self._resource_data = resource_data
name: str
614  @property
615  def name(self) -> str:
616    return self._resource_data['name']
source_ranges: List[ipaddress.IPv4Network]
618  @property
619  def source_ranges(self) -> List[ipaddress.IPv4Network]:
620    return self._resource_data['sourceRanges']
target_tags: set
622  @property
623  def target_tags(self) -> set:
624    return self._resource_data['targetTags']
allowed: List[dict]
626  @property
627  def allowed(self) -> List[dict]:
628    return self._resource_data['allowed']
def is_enabled(self) -> bool:
630  def is_enabled(self) -> bool:
631    return not self._resource_data['disabled']
class EffectiveFirewalls:
 983class EffectiveFirewalls:
 984  """Effective firewall rules for a VPC network or Instance.
 985
 986  Includes org/folder firewall policies)."""
 987  _resource_data: dict
 988  _policies: List[_FirewallPolicy]
 989  _vpc_firewall: _VpcFirewall
 990
 991  def __init__(self, resource_data):
 992    self._resource_data = resource_data
 993    self._policies = []
 994    if 'firewallPolicys' in resource_data:
 995      for policy in resource_data['firewallPolicys']:
 996        self._policies.append(_FirewallPolicy(policy))
 997    self._vpc_firewall = _VpcFirewall(resource_data.get('firewalls', {}))
 998
 999  def check_connectivity_ingress(
1000      self,  #
1001      *,
1002      src_ip: IPAddrOrNet,
1003      ip_protocol: str,
1004      port: Optional[int] = None,
1005      source_service_account: Optional[str] = None,
1006      source_tags: Optional[List[str]] = None,
1007      target_service_account: Optional[str] = None,
1008      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
1009
1010    if ip_protocol != 'ICMP' and port is None:
1011      raise ValueError('TCP and UDP must have port numbers')
1012
1013    # Firewall policies (organization, folders)
1014    for p in self._policies:
1015      result = p.check_connectivity_ingress(
1016          src_ip=src_ip,
1017          ip_protocol=ip_protocol,
1018          port=port,
1019          #target_network=self._network,
1020          target_service_account=target_service_account)
1021      if result.action != 'goto_next':
1022        return result
1023
1024    # VPC firewall rules
1025    return self._vpc_firewall.check_connectivity_ingress(
1026        src_ip=src_ip,
1027        ip_protocol=ip_protocol,
1028        port=port,
1029        source_service_account=source_service_account,
1030        source_tags=source_tags,
1031        target_service_account=target_service_account,
1032        target_tags=target_tags)
1033
1034  def check_connectivity_egress(
1035      self,  #
1036      *,
1037      src_ip: IPAddrOrNet,
1038      ip_protocol: str,
1039      port: Optional[int] = None,
1040      source_service_account: Optional[str] = None,
1041      source_tags: Optional[List[str]] = None,
1042      target_service_account: Optional[str] = None,
1043      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
1044
1045    if ip_protocol != 'ICMP' and port is None:
1046      raise ValueError('TCP and UDP must have port numbers')
1047
1048    # Firewall policies (organization, folders)
1049    for p in self._policies:
1050      result = p.check_connectivity_egress(
1051          src_ip=src_ip,
1052          ip_protocol=ip_protocol,
1053          port=port,
1054          #target_network=self._network,
1055          target_service_account=target_service_account)
1056      if result.action != 'goto_next':
1057        return result
1058
1059    # VPC firewall rules
1060    return self._vpc_firewall.check_connectivity_egress(
1061        src_ip=src_ip,
1062        ip_protocol=ip_protocol,
1063        port=port,
1064        source_service_account=source_service_account,
1065        source_tags=source_tags,
1066        target_service_account=target_service_account,
1067        target_tags=target_tags)
1068
1069  def get_vpc_ingress_rules(
1070      self,
1071      name: Optional[str] = None,
1072      name_pattern: Optional[re.Pattern] = None,
1073      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
1074    """Retrieve the list of ingress firewall rules matching name or name pattern and target tags.
1075
1076    Args:
1077        name (Optional[str], optional): firewall rune name. Defaults to None.
1078        name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
1079        target_tags (Optional[List[str]], optional): firewall target tags
1080          (if not specified any tag will match). Defaults to None.
1081
1082    Returns:
1083        List[VpcFirewallRule]: List of ingress firewall rules
1084    """
1085    rules = self._vpc_firewall.get_vpc_ingress_rules(name, name_pattern,
1086                                                     target_tags)
1087    return rules
1088
1089  def get_vpc_egress_rules(
1090      self,
1091      name: Optional[str] = None,
1092      name_pattern: Optional[re.Pattern] = None,
1093      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
1094    """Retrieve the list of egress firewall rules matching name or name pattern and target tags.
1095
1096    Args:
1097        name (Optional[str], optional): firewall rune name. Defaults to None.
1098        name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
1099        target_tags (Optional[List[str]], optional): firewall target tags
1100          (if not specified any tag will match). Defaults to None.
1101
1102    Returns:
1103        List[VpcFirewallRule]: List of egress firewall rules
1104    """
1105    rules = self._vpc_firewall.get_vpc_egress_rules(name, name_pattern,
1106                                                    target_tags)
1107    return rules
1108
1109  def verify_ingress_rule_exists(self, name: str):
1110    """Verify that a certain VPC rule exists. This is useful to verify
1111    whether maybe a permission was missing on a shared VPC and an
1112    automatic rule couldn't be created."""
1113    return self._vpc_firewall.verify_ingress_rule_exists(name)
1114
1115  def verify_egress_rule_exists(self, name: str):
1116    """Verify that a certain VPC rule exists. This is useful to verify
1117    whether maybe a permission was missing on a shared VPC and an
1118    automatic rule couldn't be created."""
1119    return self._vpc_firewall.verify_egress_rule_exists(name)

Effective firewall rules for a VPC network or Instance.

Includes org/folder firewall policies.

EffectiveFirewalls(resource_data)
991  def __init__(self, resource_data):
992    self._resource_data = resource_data
993    self._policies = []
994    if 'firewallPolicys' in resource_data:
995      for policy in resource_data['firewallPolicys']:
996        self._policies.append(_FirewallPolicy(policy))
997    self._vpc_firewall = _VpcFirewall(resource_data.get('firewalls', {}))
def check_connectivity_ingress( self, *, src_ip: Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network], ip_protocol: str, port: Optional[int] = None, source_service_account: Optional[str] = None, source_tags: Optional[List[str]] = None, target_service_account: Optional[str] = None, target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
 999  def check_connectivity_ingress(
1000      self,  #
1001      *,
1002      src_ip: IPAddrOrNet,
1003      ip_protocol: str,
1004      port: Optional[int] = None,
1005      source_service_account: Optional[str] = None,
1006      source_tags: Optional[List[str]] = None,
1007      target_service_account: Optional[str] = None,
1008      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
1009
1010    if ip_protocol != 'ICMP' and port is None:
1011      raise ValueError('TCP and UDP must have port numbers')
1012
1013    # Firewall policies (organization, folders)
1014    for p in self._policies:
1015      result = p.check_connectivity_ingress(
1016          src_ip=src_ip,
1017          ip_protocol=ip_protocol,
1018          port=port,
1019          #target_network=self._network,
1020          target_service_account=target_service_account)
1021      if result.action != 'goto_next':
1022        return result
1023
1024    # VPC firewall rules
1025    return self._vpc_firewall.check_connectivity_ingress(
1026        src_ip=src_ip,
1027        ip_protocol=ip_protocol,
1028        port=port,
1029        source_service_account=source_service_account,
1030        source_tags=source_tags,
1031        target_service_account=target_service_account,
1032        target_tags=target_tags)
def check_connectivity_egress( self, *, src_ip: Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network], ip_protocol: str, port: Optional[int] = None, source_service_account: Optional[str] = None, source_tags: Optional[List[str]] = None, target_service_account: Optional[str] = None, target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
1034  def check_connectivity_egress(
1035      self,  #
1036      *,
1037      src_ip: IPAddrOrNet,
1038      ip_protocol: str,
1039      port: Optional[int] = None,
1040      source_service_account: Optional[str] = None,
1041      source_tags: Optional[List[str]] = None,
1042      target_service_account: Optional[str] = None,
1043      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
1044
1045    if ip_protocol != 'ICMP' and port is None:
1046      raise ValueError('TCP and UDP must have port numbers')
1047
1048    # Firewall policies (organization, folders)
1049    for p in self._policies:
1050      result = p.check_connectivity_egress(
1051          src_ip=src_ip,
1052          ip_protocol=ip_protocol,
1053          port=port,
1054          #target_network=self._network,
1055          target_service_account=target_service_account)
1056      if result.action != 'goto_next':
1057        return result
1058
1059    # VPC firewall rules
1060    return self._vpc_firewall.check_connectivity_egress(
1061        src_ip=src_ip,
1062        ip_protocol=ip_protocol,
1063        port=port,
1064        source_service_account=source_service_account,
1065        source_tags=source_tags,
1066        target_service_account=target_service_account,
1067        target_tags=target_tags)
def get_vpc_ingress_rules( self, name: Optional[str] = None, name_pattern: Optional[re.Pattern] = None, target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
1069  def get_vpc_ingress_rules(
1070      self,
1071      name: Optional[str] = None,
1072      name_pattern: Optional[re.Pattern] = None,
1073      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
1074    """Retrieve the list of ingress firewall rules matching name or name pattern and target tags.
1075
1076    Args:
1077        name (Optional[str], optional): firewall rune name. Defaults to None.
1078        name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
1079        target_tags (Optional[List[str]], optional): firewall target tags
1080          (if not specified any tag will match). Defaults to None.
1081
1082    Returns:
1083        List[VpcFirewallRule]: List of ingress firewall rules
1084    """
1085    rules = self._vpc_firewall.get_vpc_ingress_rules(name, name_pattern,
1086                                                     target_tags)
1087    return rules

Retrieve the list of ingress firewall rules matching name or name pattern and target tags.

Arguments:
  • name (Optional[str], optional): firewall rule name. Defaults to None.
  • name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
  • target_tags (Optional[List[str]], optional): firewall target tags (if not specified any tag will match). Defaults to None.
Returns:

List[VpcFirewallRule]: List of ingress firewall rules

def get_vpc_egress_rules( self, name: Optional[str] = None, name_pattern: Optional[re.Pattern] = None, target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
1089  def get_vpc_egress_rules(
1090      self,
1091      name: Optional[str] = None,
1092      name_pattern: Optional[re.Pattern] = None,
1093      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
1094    """Retrieve the list of egress firewall rules matching name or name pattern and target tags.
1095
1096    Args:
1097        name (Optional[str], optional): firewall rune name. Defaults to None.
1098        name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
1099        target_tags (Optional[List[str]], optional): firewall target tags
1100          (if not specified any tag will match). Defaults to None.
1101
1102    Returns:
1103        List[VpcFirewallRule]: List of egress firewall rules
1104    """
1105    rules = self._vpc_firewall.get_vpc_egress_rules(name, name_pattern,
1106                                                    target_tags)
1107    return rules

Retrieve the list of egress firewall rules matching name or name pattern and target tags.

Arguments:
  • name (Optional[str], optional): firewall rule name. Defaults to None.
  • name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
  • target_tags (Optional[List[str]], optional): firewall target tags (if not specified any tag will match). Defaults to None.
Returns:

List[VpcFirewallRule]: List of egress firewall rules

def verify_ingress_rule_exists(self, name: str):
1109  def verify_ingress_rule_exists(self, name: str):
1110    """Verify that a certain VPC rule exists. This is useful to verify
1111    whether maybe a permission was missing on a shared VPC and an
1112    automatic rule couldn't be created."""
1113    return self._vpc_firewall.verify_ingress_rule_exists(name)

Verify that a certain VPC rule exists. This is useful to verify whether maybe a permission was missing on a shared VPC and an automatic rule couldn't be created.

def verify_egress_rule_exists(self, name: str):
1115  def verify_egress_rule_exists(self, name: str):
1116    """Verify that a certain VPC rule exists. This is useful to verify
1117    whether maybe a permission was missing on a shared VPC and an
1118    automatic rule couldn't be created."""
1119    return self._vpc_firewall.verify_egress_rule_exists(name)

Verify that a certain VPC rule exists. This is useful to verify whether maybe a permission was missing on a shared VPC and an automatic rule couldn't be created.

class VPCEffectiveFirewalls(EffectiveFirewalls):
class VPCEffectiveFirewalls(EffectiveFirewalls):
  """Effective firewall rules for a VPC network.

  Includes org/folder firewall policies. In addition to what the base class
  holds, the instance remembers the network it was created for.
  """
  _network: Network

  def __init__(self, network, resource_data):
    """Initialize the base class with `resource_data` and store `network`."""
    super().__init__(resource_data)
    self._network = network

Effective firewall rules for a VPC network.

Includes org/folder firewall policies.

VPCEffectiveFirewalls(network, resource_data)
1128  def __init__(self, network, resource_data):
1129    super().__init__(resource_data)
1130    self._network = network
@caching.cached_api_call(in_memory=True)
def get_network(project_id: str, network_name: str) -> Network:
@caching.cached_api_call(in_memory=True)
def get_network(project_id: str, network_name: str) -> Network:
  """Fetch one VPC network by name through the Compute Engine v1 API.

  Args:
    project_id: project used for the API client and the request.
    network_name: name of the network to retrieve.

  Returns:
    Network built from `project_id` and the API response.
  """
  logging.debug('fetching network: %s/%s', project_id, network_name)
  compute_api = apis.get_api('compute', 'v1', project_id)
  network_request = compute_api.networks().get(project=project_id,
                                               network=network_name)
  network_data = network_request.execute(num_retries=config.API_RETRIES)
  return Network(project_id, network_data)
def get_subnetwork_from_url(url: str) -> Subnetwork:
def get_subnetwork_from_url(url: str) -> Subnetwork:
  """Returns Subnetwork object given subnetwork url.

  Args:
    url: subnetwork URL of the form
      https://www.googleapis.com/compute/v1/projects/<project>/regions/<region>/subnetworks/<name>

  Returns:
    The result of get_subnetwork() for the project, region and subnetwork
    name extracted from `url`.

  Raises:
    ValueError: if `url` does not match the expected form.
  """
  m = re.match((r'https://www.googleapis.com/compute/v1/projects/'
                r'([^/]+)/regions/([^/]+)/subnetworks/([^/]+)$'), url)
  if not m:
    # This function parses a *subnetwork* URL; the message previously said
    # 'network url', which pointed at the wrong kind of resource.
    raise ValueError(f"can't parse subnetwork url: {url}")
  project_id, region, subnetwork_name = m.groups()
  return get_subnetwork(project_id, region, subnetwork_name)

Returns Subnetwork object given subnetwork url

def get_network_from_url(url: str) -> Network:
def get_network_from_url(url: str) -> Network:
  """Return the Network object identified by a network URL.

  Args:
    url: URL starting with
      https://www.googleapis.com/compute/v1/projects/<project>/global/networks/<name>

  Returns:
    The result of get_network() for the extracted project and network name.

  Raises:
    ValueError: if `url` does not start with the expected form.
  """
  parsed = re.match(
      r'https://www.googleapis.com/compute/v1/projects/([^/]+)/global/networks/([^/]+)',
      url)
  if parsed is None:
    raise ValueError(f"can't parse network url: {url}")
  project_id, network_name = parsed.groups()
  return get_network(project_id, network_name)
@caching.cached_api_call(in_memory=True)
def get_networks(project_id: str) -> List[Network]:
@caching.cached_api_call(in_memory=True)
def get_networks(project_id: str) -> List[Network]:
  """List the VPC networks of a project through the Compute Engine v1 API.

  Args:
    project_id: project whose networks are listed.

  Returns:
    One Network per entry under the response's 'items' key; an empty list
    when the key is absent.
  """
  logging.debug('fetching network: %s', project_id)
  compute_api = apis.get_api('compute', 'v1', project_id)
  # NOTE(review): a single list call is executed; confirm whether results
  # beyond the first response page need to be fetched as well.
  listing = compute_api.networks().list(project=project_id).execute(
      num_retries=config.API_RETRIES)
  networks = []
  for network_data in listing.get('items', []):
    networks.append(Network(project_id, network_data))
  return networks
@caching.cached_api_call(in_memory=True)
def get_subnetwork( project_id: str, region: str, subnetwork_name: str) -> Subnetwork:
@caching.cached_api_call(in_memory=True)
def get_subnetwork(project_id: str, region: str,
                   subnetwork_name: str) -> Subnetwork:
  """Fetch one subnetwork through the Compute Engine v1 API.

  Args:
    project_id: project used for the API client and the request.
    region: region passed to the subnetworks().get() request.
    subnetwork_name: name of the subnetwork to retrieve.

  Returns:
    Subnetwork built from `project_id` and the API response.
  """
  # The message previously read 'fetching network' and left out the region,
  # which made subnetwork fetches indistinguishable from network fetches in
  # debug logs.
  logging.debug('fetching subnetwork: %s/%s/%s', project_id, region,
                subnetwork_name)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.subnetworks().get(project=project_id,
                                      region=region,
                                      subnetwork=subnetwork_name)
  response = request.execute(num_retries=config.API_RETRIES)
  return Subnetwork(project_id, response)
@caching.cached_api_call(in_memory=True)
def get_routes(project_id: str) -> List[Route]:
@caching.cached_api_call(in_memory=True)
def get_routes(project_id: str) -> List[Route]:
  """List the routes of a project through the Compute Engine v1 API.

  Args:
    project_id: project whose routes are listed.

  Returns:
    One Route per entry under the response's 'items' key; an empty list
    when the key is absent.
  """
  logging.debug('fetching routes: %s', project_id)
  compute_api = apis.get_api('compute', 'v1', project_id)
  listing = compute_api.routes().list(project=project_id).execute(
      num_retries=config.API_RETRIES)
  routes = []
  for route_data in listing.get('items', []):
    routes.append(Route(project_id, route_data))
  return routes
@caching.cached_api_call(in_memory=True)
def get_zones(project_id: str) -> List[ManagedZone]:
@caching.cached_api_call(in_memory=True)
def get_zones(project_id: str) -> List[ManagedZone]:
  """List the DNS managed zones of a project (DNS API v1beta2).

  The zones are listed first; then each listed zone is fetched individually
  by name and the ManagedZone is built from that second response.

  Args:
    project_id: project whose managed zones are listed.

  Returns:
    One ManagedZone per entry under the list response's 'managedZones' key;
    an empty list when the key is absent.
  """
  logging.debug('fetching DNS zones: %s', project_id)
  dns_api = apis.get_api('dns', 'v1beta2', project_id)
  listing = dns_api.managedZones().list(project=project_id).execute(
      num_retries=config.API_RETRIES)
  managed_zones = []
  for summary in listing.get('managedZones', []):
    # One extra get() per zone; the list entry is only used for its name.
    zone_request = dns_api.managedZones().get(project=project_id,
                                              managedZone=summary['name'])
    zone_data = zone_request.execute(num_retries=config.API_RETRIES)
    managed_zones.append(ManagedZone(project_id, zone_data))
  return managed_zones
@caching.cached_api_call(in_memory=True)
def get_routers( project_id: str, region: str, network) -> List[Router]:
@caching.cached_api_call(in_memory=True)
def get_routers(project_id: str, region: str, network) -> List[Router]:
  """List the routers of a region that are attached to `network`.

  Args:
    project_id: project used for the API client and the request.
    region: region passed to the routers().list() request.
    network: object whose `self_link` is used in the request filter.

  Returns:
    One Router per entry under the response's 'items' key; an empty list
    when the key is absent.
  """
  logging.debug('fetching routers: %s/%s', project_id, region)
  compute_api = apis.get_api('compute', 'v1', project_id)
  network_filter = f'network="{network.self_link}"'
  listing = compute_api.routers().list(
      project=project_id, region=region,
      filter=network_filter).execute(num_retries=config.API_RETRIES)
  routers = []
  for router_data in listing.get('items', []):
    routers.append(Router(project_id, router_data))
  return routers
@caching.cached_api_call(in_memory=True)
def get_router(project_id: str, region: str, network) -> Router:
@caching.cached_api_call(in_memory=True)
def get_router(project_id: str, region: str, network) -> Router:
  """Return the first router of a region that is attached to `network`.

  Args:
    project_id: project used for the API client and the request.
    region: region passed to the routers().list() request.
    network: object whose `self_link` is used in the request filter.

  Returns:
    Router built from the first entry under the response's 'items' key, or
    from an empty dict when there are no entries.
  """
  logging.debug('fetching routers: %s/%s', project_id, region)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.routers().list(project=project_id,
                                   region=region,
                                   filter=f'network="{network.self_link}"')
  response = request.execute(num_retries=config.API_RETRIES)
  # `or [{}]` covers both a missing 'items' key and an empty list. The
  # previous next(iter(...)) form only handled the missing key and raised
  # StopIteration when 'items' was present but empty.
  items = response.get('items') or [{}]
  return Router(project_id, items[0])
@caching.cached_api_call(in_memory=True)
def get_router_by_name( project_id: str, region: str, router_name: str) -> Router:
@caching.cached_api_call(in_memory=True)
def get_router_by_name(project_id: str, region: str,
                       router_name: str) -> Router:
  """Return the router of a region whose name equals `router_name`.

  All routers of the region are listed and the first entry with a matching
  'name' is wrapped in a Router.

  NOTE(review): when no entry matches, StopIteration propagates to the
  caller - confirm that callers expect this.

  Args:
    project_id: project used for the API client and the request.
    region: region passed to the routers().list() request.
    router_name: name compared against each entry's 'name'.
  """
  logging.debug('fetching router list: %s/%s in region %s', project_id,
                router_name, region)
  compute_api = apis.get_api('compute', 'v1', project_id)
  listing = compute_api.routers().list(
      project=project_id,
      region=region).execute(num_retries=config.API_RETRIES)
  same_name = filter(lambda entry: entry['name'] == router_name,
                     listing.get('items', []))
  return Router(project_id, next(same_name))
@caching.cached_api_call(in_memory=True)
def nat_router_status( project_id: str, router_name: str, region: str) -> RouterStatus:
@caching.cached_api_call(in_memory=True)
def nat_router_status(project_id: str, router_name: str,
                      region: str) -> RouterStatus:
  """Fetch the status of a router through routers().getRouterStatus().

  Args:
    project_id: project used for the API client and the request.
    router_name: router passed to the request.
    region: region passed to the request.

  Returns:
    RouterStatus built from the response when it has a top-level 'result'
    key, otherwise a RouterStatus built from an empty dict.
  """
  logging.debug('fetching router status: %s/%s in region %s', project_id,
                router_name, region)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.routers().getRouterStatus(project=project_id,
                                              router=router_name,
                                              region=region)
  response = request.execute(num_retries=config.API_RETRIES)
  # Check for the top-level key itself. The previous substring test on
  # str(response) also passed when 'result' merely appeared inside some
  # nested key or value.
  if response and 'result' in response:
    return RouterStatus(project_id, response)
  logging.debug('unable to fetch router status: %s/%s in region %s',
                project_id, router_name, region)
  return RouterStatus(project_id, {})
@caching.cached_api_call(in_memory=True)
def get_nat_ip_info( project_id: str, router_name: str, region: str) -> RouterNatIpInfo:
@caching.cached_api_call(in_memory=True)
def get_nat_ip_info(project_id: str, router_name: str,
                    region: str) -> RouterNatIpInfo:
  """Fetch NAT IP information of a router through routers().getNatIpInfo().

  Args:
    project_id: project used for the API client and the request.
    router_name: router passed to the request.
    region: region passed to the request.

  Returns:
    RouterNatIpInfo built from the response when it has a top-level 'result'
    key, otherwise a RouterNatIpInfo built from an empty dict.
  """
  logging.debug('fetching NAT IP info for router: %s/%s in region %s',
                project_id, router_name, region)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.routers().getNatIpInfo(project=project_id,
                                           router=router_name,
                                           region=region)
  response = request.execute(num_retries=config.API_RETRIES)
  # Check for the top-level key itself. The previous substring test on
  # str(response) also passed when 'result' merely appeared inside some
  # nested key or value.
  if response and 'result' in response:
    return RouterNatIpInfo(project_id, response)
  logging.debug('unable to fetch Nat IP Info for router: %s/%s in region %s',
                project_id, router_name, region)
  return RouterNatIpInfo(project_id, {})
class VPCSubnetworkIAMPolicy(gcpdiag.queries.iam.BaseIAMPolicy):
class VPCSubnetworkIAMPolicy(iam.BaseIAMPolicy):
  """IAM policy type used for VPC subnetworks."""

  def _is_resource_permission(self, permission):
    """Report every `permission` as a resource permission (always True)."""
    return True

Common class for IAM policies

@caching.cached_api_call(in_memory=True)
def get_subnetwork_iam_policy( project_id: str, region: str, subnetwork_name: str) -> VPCSubnetworkIAMPolicy:
@caching.cached_api_call(in_memory=True)
def get_subnetwork_iam_policy(project_id: str, region: str,
                              subnetwork_name: str) -> VPCSubnetworkIAMPolicy:
  """Fetch the IAM policy of a subnetwork.

  Builds a subnetworks().getIamPolicy() request and hands it, together with
  the subnetwork's resource path, to iam.fetch_iam_policy().

  Args:
    project_id: project used for the API client and the request.
    region: region passed to the request.
    subnetwork_name: subnetwork passed to the request as `resource`.

  Returns:
    Whatever iam.fetch_iam_policy() returns for VPCSubnetworkIAMPolicy.
  """
  compute_api = apis.get_api('compute', 'v1', project_id)
  policy_request = compute_api.subnetworks().getIamPolicy(
      project=project_id, region=region, resource=subnetwork_name)

  resource_path = (f'projects/{project_id}/regions/{region}/'
                   f'subnetworks/{subnetwork_name}')
  return iam.fetch_iam_policy(policy_request, VPCSubnetworkIAMPolicy,
                              project_id, resource_path)
class Address(gcpdiag.models.Resource):
class Address(models.Resource):
  """IP Addresses.

  Thin wrapper over the resource data dictionary of one address.
  """
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Initialize the base resource and keep `resource_data` by reference."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link without the Compute v1 URL prefix.

    When the self link does not start with that prefix, the link is
    returned prefixed with '>> ' instead.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    if not matched:
      return f'>> {link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """Project id and address name joined by '/'."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """Value stored under the 'name' key."""
    data = self._resource_data
    return data['name']

  @property
  def self_link(self) -> str:
    """Value stored under 'selfLink', or '' when the key is absent."""
    data = self._resource_data
    return data.get('selfLink', '')

  @property
  def subnetwork(self) -> str:
    """Value stored under the 'subnetwork' key."""
    data = self._resource_data
    return data['subnetwork']

  @property
  def status(self) -> str:
    """Value stored under the 'status' key."""
    data = self._resource_data
    return data['status']

IP Addresses.

Address(project_id, resource_data)
1339  def __init__(self, project_id, resource_data):
1340    super().__init__(project_id=project_id)
1341    self._resource_data = resource_data
full_path: str
1343  @property
1344  def full_path(self) -> str:
1345    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
1346                      self.self_link)
1347    if result:
1348      return result.group(1)
1349    else:
1350      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
1352  @property
1353  def short_path(self) -> str:
1354    path = self.project_id + '/' + self.name
1355    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
1357  @property
1358  def name(self) -> str:
1359    return self._resource_data['name']
subnetwork: str
1365  @property
1366  def subnetwork(self) -> str:
1367    return self._resource_data['subnetwork']
status: str
1369  @property
1370  def status(self) -> str:
1371    return self._resource_data['status']
@caching.cached_api_call(in_memory=True)
def get_addresses(project_id: str) -> List[Address]:
@caching.cached_api_call(in_memory=True)
def get_addresses(project_id: str) -> List[Address]:
  """List the addresses of a project via addresses().aggregatedList().

  Args:
    project_id: project whose addresses are listed.

  Returns:
    One Address per entry found under any scope's 'addresses' key in the
    response's 'items' mapping; scopes without that key contribute nothing.
    An empty list is returned when the response has no 'items'.
  """
  logging.debug('fetching addresses list: %s', project_id)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.addresses().aggregatedList(project=project_id)
  # NOTE(review): a single call is executed; confirm whether results beyond
  # the first response page need to be fetched as well.
  response = request.execute(num_retries=config.API_RETRIES)
  addresses: List[Address] = []
  # .get() avoids a KeyError on a response without 'items'; only the
  # per-scope values are needed, not the scope names.
  for scope_data in response.get('items', {}).values():
    addresses.extend(
        Address(project_id, address)
        for address in scope_data.get('addresses', []))
  return addresses