gcpdiag.queries.network

Queries related to VPC Networks.
IPv4AddrOrIPv6Addr = typing.Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
IPv4NetOrIPv6Net = typing.Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
IPAddrOrNet = typing.Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network]
DEFAULT_MTU = 1460
class Subnetwork(gcpdiag.models.Resource):
35class Subnetwork(models.Resource):
36  """A VPC subnetwork."""
37
38  _resource_data: dict
39
40  def __init__(self, project_id, resource_data):
41    super().__init__(project_id=project_id)
42    self._resource_data = resource_data
43
44  @property
45  def full_path(self) -> str:
46    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
47                      self.self_link)
48    if result:
49      return result.group(1)
50    else:
51      return f'>> {self.self_link}'
52
53  @property
54  def short_path(self) -> str:
55    path = self.project_id + '/' + self.name
56    return path
57
58  @property
59  def name(self) -> str:
60    return self._resource_data['name']
61
62  @property
63  def self_link(self) -> str:
64    return self._resource_data.get('selfLink', '')
65
66  @property
67  def ip_network(self) -> IPv4NetOrIPv6Net:
68    return ipaddress.ip_network(self._resource_data['ipCidrRange'])
69
70  @property
71  def region(self) -> str:
72    # https://www.googleapis.com/compute/v1/projects/gcpdiag-gke1-aaaa/regions/europe-west4
73    m = re.match(
74        r'https://www.googleapis.com/compute/v1/projects/([^/]+)/regions/([^/]+)',
75        self._resource_data['region'])
76    if not m:
77      raise RuntimeError(
78          f"can't parse region URL: {self._resource_data['region']}")
79    return m.group(2)
80
81  def is_private_ip_google_access(self) -> bool:
82    return self._resource_data.get('privateIpGoogleAccess', False)
83
84  @property
85  def network(self):
86    return self._resource_data['network']

A VPC subnetwork.

Subnetwork(project_id, resource_data)
40  def __init__(self, project_id, resource_data):
41    super().__init__(project_id=project_id)
42    self._resource_data = resource_data
full_path: str
44  @property
45  def full_path(self) -> str:
46    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
47                      self.self_link)
48    if result:
49      return result.group(1)
50    else:
51      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
53  @property
54  def short_path(self) -> str:
55    path = self.project_id + '/' + self.name
56    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
58  @property
59  def name(self) -> str:
60    return self._resource_data['name']
ip_network: Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
66  @property
67  def ip_network(self) -> IPv4NetOrIPv6Net:
68    return ipaddress.ip_network(self._resource_data['ipCidrRange'])
region: str
70  @property
71  def region(self) -> str:
72    # https://www.googleapis.com/compute/v1/projects/gcpdiag-gke1-aaaa/regions/europe-west4
73    m = re.match(
74        r'https://www.googleapis.com/compute/v1/projects/([^/]+)/regions/([^/]+)',
75        self._resource_data['region'])
76    if not m:
77      raise RuntimeError(
78          f"can't parse region URL: {self._resource_data['region']}")
79    return m.group(2)
def is_private_ip_google_access(self) -> bool:
81  def is_private_ip_google_access(self) -> bool:
82    return self._resource_data.get('privateIpGoogleAccess', False)
network
84  @property
85  def network(self):
86    return self._resource_data['network']
class Route(gcpdiag.models.Resource):
 89class Route(models.Resource):
 90  """A VPC Route."""
 91
 92  _resource_data: dict
 93
 94  def __init__(self, project_id, resource_data):
 95    super().__init__(project_id=project_id)
 96    self._resource_data = resource_data
 97
 98  @property
 99  def full_path(self) -> str:
100    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
101                      self.self_link)
102    if result:
103      return result.group(1)
104    else:
105      return f'>> {self.self_link}'
106
107  @property
108  def short_path(self) -> str:
109    path = self.project_id + '/' + self.name
110    return path
111
112  @property
113  def name(self) -> str:
114    return self._resource_data['name']
115
116  @property
117  def kind(self) -> str:
118    return self._resource_data['kind']
119
120  @property
121  def self_link(self) -> str:
122    return self._resource_data['selfLink']
123
124  @property
125  def network(self) -> str:
126    return self._resource_data['network']
127
128  @property
129  def tags(self) -> List[str]:
130    if 'tags' in self._resource_data:
131      return self._resource_data['tags']
132    return []
133
134  @property
135  def dest_range(self) -> str:
136    return self._resource_data['destRange']
137
138  @property
139  def next_hop_gateway(self) -> Optional[str]:
140    if 'nextHopGateway' in self._resource_data:
141      return self._resource_data['nextHopGateway']
142    return None
143
144  @property
145  def next_hop_vpn_tunnel(self) -> Optional[str]:
146    return self._resource_data.get('nextHopVpnTunnel')
147
148  @property
149  def next_hop_hub(self) -> Optional[str]:
150    return self._resource_data.get('nextHopHub')
151
152  @property
153  def priority(self) -> int:
154    return self._resource_data['priority']
155
156  def get_next_hop(self) -> Union[Dict[str, Any], Optional[str]]:
157    hop_types = {
158        'nextHopGateway': 'nextHopGateway',
159        'nextHopVpnTunnel': 'nextHopVpnTunnel',
160        'nextHopHub': 'nextHopHub',
161        'nextHopInstance': 'nextHopInstance',
162        'nextHopAddress': 'nextHopAddress',
163        'nextHopPeering': 'nextHopPeering',
164        'nextHopIlb': 'nextHopIlb',
165        'nextHopNetwork': 'nextHopNetwork',
166        'nextHopIp': 'nextHopIp'
167    }
168
169    for hop_type, value in hop_types.items():
170      if self._resource_data.get(hop_type):
171        return {'type': value, 'link': self._resource_data[hop_type]}
172    return None
173
174  def check_route_match(self, ip1: IPAddrOrNet, ip2: str) -> bool:
175    ip2_list = [ipaddress.ip_network(ip2)]
176    if _ip_match(ip1, ip2_list, 'allow'):
177      return True
178    return False

A VPC Route.

Route(project_id, resource_data)
94  def __init__(self, project_id, resource_data):
95    super().__init__(project_id=project_id)
96    self._resource_data = resource_data
full_path: str
 98  @property
 99  def full_path(self) -> str:
100    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
101                      self.self_link)
102    if result:
103      return result.group(1)
104    else:
105      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
107  @property
108  def short_path(self) -> str:
109    path = self.project_id + '/' + self.name
110    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
112  @property
113  def name(self) -> str:
114    return self._resource_data['name']
kind: str
116  @property
117  def kind(self) -> str:
118    return self._resource_data['kind']
network: str
124  @property
125  def network(self) -> str:
126    return self._resource_data['network']
tags: List[str]
128  @property
129  def tags(self) -> List[str]:
130    if 'tags' in self._resource_data:
131      return self._resource_data['tags']
132    return []
dest_range: str
134  @property
135  def dest_range(self) -> str:
136    return self._resource_data['destRange']
next_hop_gateway: Optional[str]
138  @property
139  def next_hop_gateway(self) -> Optional[str]:
140    if 'nextHopGateway' in self._resource_data:
141      return self._resource_data['nextHopGateway']
142    return None
next_hop_vpn_tunnel: Optional[str]
144  @property
145  def next_hop_vpn_tunnel(self) -> Optional[str]:
146    return self._resource_data.get('nextHopVpnTunnel')
next_hop_hub: Optional[str]
148  @property
149  def next_hop_hub(self) -> Optional[str]:
150    return self._resource_data.get('nextHopHub')
priority: int
152  @property
153  def priority(self) -> int:
154    return self._resource_data['priority']
def get_next_hop(self) -> Union[Dict[str, Any], str, NoneType]:
156  def get_next_hop(self) -> Union[Dict[str, Any], Optional[str]]:
157    hop_types = {
158        'nextHopGateway': 'nextHopGateway',
159        'nextHopVpnTunnel': 'nextHopVpnTunnel',
160        'nextHopHub': 'nextHopHub',
161        'nextHopInstance': 'nextHopInstance',
162        'nextHopAddress': 'nextHopAddress',
163        'nextHopPeering': 'nextHopPeering',
164        'nextHopIlb': 'nextHopIlb',
165        'nextHopNetwork': 'nextHopNetwork',
166        'nextHopIp': 'nextHopIp'
167    }
168
169    for hop_type, value in hop_types.items():
170      if self._resource_data.get(hop_type):
171        return {'type': value, 'link': self._resource_data[hop_type]}
172    return None
def check_route_match( self, ip1: Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network], ip2: str) -> bool:
174  def check_route_match(self, ip1: IPAddrOrNet, ip2: str) -> bool:
175    ip2_list = [ipaddress.ip_network(ip2)]
176    if _ip_match(ip1, ip2_list, 'allow'):
177      return True
178    return False
class ManagedZone(gcpdiag.models.Resource):
181class ManagedZone(models.Resource):
182  """
183  Represent a DNS zone (public or private
184
185  https://cloud.google.com/dns/docs/reference/v1beta2/managedZones
186  """
187  _resource_data: dict
188
189  def __init__(self, project_id, resource_data):
190    super().__init__(project_id=project_id)
191    self._resource_data = resource_data
192
193  @property
194  def cloud_logging_config(self) -> bool:
195    return self._resource_data['cloudLoggingConfig'].get('enableLogging', False)
196
197  @property
198  def is_public(self) -> bool:
199    return self._resource_data['visibility'] == 'public'
200
201  @property
202  def vpc_attached(self) -> bool:
203    if 'privateVisibilityConfig' not in self._resource_data:
204      self._resource_data['privateVisibilityConfig'] = {}
205
206    return (self._resource_data['privateVisibilityConfig'].get(
207        'networks', False) or
208            self._resource_data['privateVisibilityConfig'].get(
209                'gkeClusters', False))
210
211  @property
212  def dnssec_config_state(self) -> bool:
213    if 'dnssecConfig' not in self._resource_data:
214      self._resource_data['dnssecConfig'] = {}
215
216    return self._resource_data['dnssecConfig'].get('state', False)
217
218  @property
219  def name(self) -> str:
220    return self._resource_data['name']
221
222  @property
223  def full_path(self) -> str:
224    result = re.match(r'https://dns.googleapis.com/dns/v1beta2/(.*)',
225                      self.self_link)
226    if result:
227      return result.group(1)
228    else:
229      return f'>> {self.self_link}'
230
231  @property
232  def short_path(self) -> str:
233    path = self.project_id + '/' + self.name
234    return path
235
236  @property
237  def self_link(self) -> str:
238    return self._resource_data.get('selfLink', '')
ManagedZone(project_id, resource_data)
189  def __init__(self, project_id, resource_data):
190    super().__init__(project_id=project_id)
191    self._resource_data = resource_data
cloud_logging_config: bool
193  @property
194  def cloud_logging_config(self) -> bool:
195    return self._resource_data['cloudLoggingConfig'].get('enableLogging', False)
is_public: bool
197  @property
198  def is_public(self) -> bool:
199    return self._resource_data['visibility'] == 'public'
vpc_attached: bool
201  @property
202  def vpc_attached(self) -> bool:
203    if 'privateVisibilityConfig' not in self._resource_data:
204      self._resource_data['privateVisibilityConfig'] = {}
205
206    return (self._resource_data['privateVisibilityConfig'].get(
207        'networks', False) or
208            self._resource_data['privateVisibilityConfig'].get(
209                'gkeClusters', False))
dnssec_config_state: bool
211  @property
212  def dnssec_config_state(self) -> bool:
213    if 'dnssecConfig' not in self._resource_data:
214      self._resource_data['dnssecConfig'] = {}
215
216    return self._resource_data['dnssecConfig'].get('state', False)
name: str
218  @property
219  def name(self) -> str:
220    return self._resource_data['name']
full_path: str
222  @property
223  def full_path(self) -> str:
224    result = re.match(r'https://dns.googleapis.com/dns/v1beta2/(.*)',
225                      self.self_link)
226    if result:
227      return result.group(1)
228    else:
229      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
231  @property
232  def short_path(self) -> str:
233    path = self.project_id + '/' + self.name
234    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

class Router(gcpdiag.models.Resource):
241class Router(models.Resource):
242  """A VPC Router."""
243
244  _resource_data: dict
245
246  def __init__(self, project_id, resource_data):
247    super().__init__(project_id=project_id)
248    self._resource_data = resource_data
249    self._nats = None
250
251  @property
252  def full_path(self) -> str:
253    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
254                      self.self_link)
255    if result:
256      return result.group(1)
257    else:
258      return f'>> {self.self_link}'
259
260  @property
261  def short_path(self) -> str:
262    path = self.project_id + '/' + self.name
263    return path
264
265  @property
266  def name(self) -> str:
267    return self._resource_data['name']
268
269  @property
270  def self_link(self) -> str:
271    return self._resource_data.get('selfLink', '')
272
273  @property
274  def network(self) -> str:
275    return self._resource_data['network']
276
277  def get_network_name(self) -> str:
278    logging.info('inside get_network_name function')
279    if self._resource_data['network']:
280      return self._resource_data['network'].split('/')[-1]
281    return ''
282
283  @property
284  def nats(self):
285    return self._resource_data.get('nats', [])
286
287  def get_nat_ip_allocate_option(self, nat_gateway) -> str:
288    nats = self._resource_data.get('nats', [])
289    nat = [n for n in nats if n['name'] == nat_gateway]
290    return nat[0].get('natIpAllocateOption', '')
291
292  def get_enable_dynamic_port_allocation(self, nat_gateway) -> str:
293    nats = self._resource_data.get('nats', [])
294    nat = [n for n in nats if n['name'] == nat_gateway]
295    return nat[0].get('enableDynamicPortAllocation', '')
296
297  def subnet_has_nat(self, subnetwork):
298    if not self._resource_data.get('nats', []):
299      return False
300    for n in self._resource_data.get('nats', []):
301      if n['sourceSubnetworkIpRangesToNat'] == 'LIST_OF_SUBNETWORKS':
302        # Cloud NAT configure for specific subnets
303        if 'subnetworks' in n and subnetwork.self_link in [
304            s['name'] for s in n['subnetworks']
305        ]:
306          return True
307      else:
308        # Cloud NAT configured for all subnets
309        return True
310    return False

A VPC Router.

Router(project_id, resource_data)
246  def __init__(self, project_id, resource_data):
247    super().__init__(project_id=project_id)
248    self._resource_data = resource_data
249    self._nats = None
full_path: str
251  @property
252  def full_path(self) -> str:
253    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
254                      self.self_link)
255    if result:
256      return result.group(1)
257    else:
258      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
260  @property
261  def short_path(self) -> str:
262    path = self.project_id + '/' + self.name
263    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
265  @property
266  def name(self) -> str:
267    return self._resource_data['name']
network: str
273  @property
274  def network(self) -> str:
275    return self._resource_data['network']
def get_network_name(self) -> str:
277  def get_network_name(self) -> str:
278    logging.info('inside get_network_name function')
279    if self._resource_data['network']:
280      return self._resource_data['network'].split('/')[-1]
281    return ''
nats
283  @property
284  def nats(self):
285    return self._resource_data.get('nats', [])
def get_nat_ip_allocate_option(self, nat_gateway) -> str:
287  def get_nat_ip_allocate_option(self, nat_gateway) -> str:
288    nats = self._resource_data.get('nats', [])
289    nat = [n for n in nats if n['name'] == nat_gateway]
290    return nat[0].get('natIpAllocateOption', '')
def get_enable_dynamic_port_allocation(self, nat_gateway) -> str:
292  def get_enable_dynamic_port_allocation(self, nat_gateway) -> str:
293    nats = self._resource_data.get('nats', [])
294    nat = [n for n in nats if n['name'] == nat_gateway]
295    return nat[0].get('enableDynamicPortAllocation', '')
def subnet_has_nat(self, subnetwork):
297  def subnet_has_nat(self, subnetwork):
298    if not self._resource_data.get('nats', []):
299      return False
300    for n in self._resource_data.get('nats', []):
301      if n['sourceSubnetworkIpRangesToNat'] == 'LIST_OF_SUBNETWORKS':
302        # Cloud NAT configure for specific subnets
303        if 'subnetworks' in n and subnetwork.self_link in [
304            s['name'] for s in n['subnetworks']
305        ]:
306          return True
307      else:
308        # Cloud NAT configured for all subnets
309        return True
310    return False
class RouterStatus(gcpdiag.models.Resource):
313class RouterStatus(models.Resource):
314  """NAT Router Status"""
315
316  _resource_data: dict
317
318  def __init__(self, project_id, resource_data):
319    super().__init__(project_id=project_id)
320    self._resource_data = resource_data
321
322  @property
323  def full_path(self) -> str:
324    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
325                      self.self_link)
326    if result:
327      return result.group(1)
328    else:
329      return f'>> {self.self_link}'
330
331  @property
332  def short_path(self) -> str:
333    path = self.project_id + '/' + self.name
334    return path
335
336  @property
337  def name(self) -> str:
338    return self._resource_data.get('name', '')
339
340  @property
341  def self_link(self) -> str:
342    return self._resource_data.get('selfLink', '')
343
344  @property
345  def min_extra_nat_ips_needed(self) -> str:
346    nat_status = self._resource_data.get('result', {}).get('natStatus', {})
347    return nat_status[0].get('minExtraNatIpsNeeded', None)
348
349  @property
350  def num_vms_with_nat_mappings(self) -> str:
351    nat_status = self._resource_data.get('result', {}).get('natStatus', {})
352    return nat_status[0].get('numVmEndpointsWithNatMappings', None)

NAT Router Status

RouterStatus(project_id, resource_data)
318  def __init__(self, project_id, resource_data):
319    super().__init__(project_id=project_id)
320    self._resource_data = resource_data
full_path: str
322  @property
323  def full_path(self) -> str:
324    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
325                      self.self_link)
326    if result:
327      return result.group(1)
328    else:
329      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
331  @property
332  def short_path(self) -> str:
333    path = self.project_id + '/' + self.name
334    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
336  @property
337  def name(self) -> str:
338    return self._resource_data.get('name', '')
min_extra_nat_ips_needed: str
344  @property
345  def min_extra_nat_ips_needed(self) -> str:
346    nat_status = self._resource_data.get('result', {}).get('natStatus', {})
347    return nat_status[0].get('minExtraNatIpsNeeded', None)
num_vms_with_nat_mappings: str
349  @property
350  def num_vms_with_nat_mappings(self) -> str:
351    nat_status = self._resource_data.get('result', {}).get('natStatus', {})
352    return nat_status[0].get('numVmEndpointsWithNatMappings', None)
class RouterNatIpInfo(gcpdiag.models.Resource):
355class RouterNatIpInfo(models.Resource):
356  """NAT IP Info"""
357
358  _resource_data: dict
359
360  def __init__(self, project_id, resource_data):
361    super().__init__(project_id=project_id)
362    self._resource_data = resource_data
363
364  @property
365  def full_path(self) -> str:
366    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
367                      self.self_link)
368    if result:
369      return result.group(1)
370    else:
371      return f'>> {self.self_link}'
372
373  @property
374  def short_path(self) -> str:
375    path = self.project_id + '/' + self.name
376    return path
377
378  @property
379  def self_link(self) -> str:
380    return self._resource_data.get('selfLink', '')
381
382  @property
383  def name(self) -> str:
384    return self._resource_data.get('name', '')
385
386  @property
387  def result(self) -> str:
388    return self._resource_data.get('result', [])

NAT IP Info

RouterNatIpInfo(project_id, resource_data)
360  def __init__(self, project_id, resource_data):
361    super().__init__(project_id=project_id)
362    self._resource_data = resource_data
full_path: str
364  @property
365  def full_path(self) -> str:
366    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
367                      self.self_link)
368    if result:
369      return result.group(1)
370    else:
371      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
373  @property
374  def short_path(self) -> str:
375    path = self.project_id + '/' + self.name
376    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
382  @property
383  def name(self) -> str:
384    return self._resource_data.get('name', '')
result: str
386  @property
387  def result(self) -> str:
388    return self._resource_data.get('result', [])
@dataclasses.dataclass
class Peering:
391@dataclasses.dataclass
392class Peering:
393  """VPC Peerings"""
394
395  name: str
396  url: str
397  state: str
398  exports_custom_routes: bool
399  imports_custom_routes: bool
400  auto_creates_routes: bool
401
402  def __str__(self):
403    return self.name

VPC Peerings

Peering( name: str, url: str, state: str, exports_custom_routes: bool, imports_custom_routes: bool, auto_creates_routes: bool)
name: str
url: str
state: str
exports_custom_routes: bool
imports_custom_routes: bool
auto_creates_routes: bool
class Network(gcpdiag.models.Resource):
406class Network(models.Resource):
407  """A VPC network."""
408  _resource_data: dict
409  _subnetworks: Optional[Dict[str, Subnetwork]]
410
411  def __init__(self, project_id, resource_data):
412    super().__init__(project_id=project_id)
413    self._resource_data = resource_data
414    self._subnetworks = None
415
416  @property
417  def full_path(self) -> str:
418    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
419                      self.self_link)
420    if result:
421      return result.group(1)
422    else:
423      return f'>> {self.self_link}'
424
425  @property
426  def short_path(self) -> str:
427    path = self.project_id + '/' + self.name
428    return path
429
430  @property
431  def name(self) -> str:
432    return self._resource_data['name']
433
434  @property
435  def self_link(self) -> str:
436    return self._resource_data.get('selfLink', '')
437
438  @property
439  def firewall(self) -> 'EffectiveFirewalls':
440    return _get_effective_firewalls(self)
441
442  @property
443  def mtu(self) -> int:
444    if 'mtu' in self._resource_data:
445      return self._resource_data['mtu']
446    return DEFAULT_MTU
447
448  @property
449  def subnetworks(self) -> Dict[str, Subnetwork]:
450    return _batch_get_subnetworks(
451        self._project_id, frozenset(self._resource_data.get('subnetworks', [])))
452
453  @property
454  def peerings(self) -> List[Peering]:
455    return [
456        Peering(peer['name'], peer['network'], peer['state'],
457                peer['exportCustomRoutes'], peer['importCustomRoutes'],
458                peer['autoCreateRoutes'])
459        for peer in self._resource_data.get('peerings', [])
460    ]

A VPC network.

Network(project_id, resource_data)
411  def __init__(self, project_id, resource_data):
412    super().__init__(project_id=project_id)
413    self._resource_data = resource_data
414    self._subnetworks = None
full_path: str
416  @property
417  def full_path(self) -> str:
418    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
419                      self.self_link)
420    if result:
421      return result.group(1)
422    else:
423      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
425  @property
426  def short_path(self) -> str:
427    path = self.project_id + '/' + self.name
428    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
430  @property
431  def name(self) -> str:
432    return self._resource_data['name']
firewall: EffectiveFirewalls
438  @property
439  def firewall(self) -> 'EffectiveFirewalls':
440    return _get_effective_firewalls(self)
mtu: int
442  @property
443  def mtu(self) -> int:
444    if 'mtu' in self._resource_data:
445      return self._resource_data['mtu']
446    return DEFAULT_MTU
subnetworks: Dict[str, Subnetwork]
448  @property
449  def subnetworks(self) -> Dict[str, Subnetwork]:
450    return _batch_get_subnetworks(
451        self._project_id, frozenset(self._resource_data.get('subnetworks', [])))
peerings: List[Peering]
453  @property
454  def peerings(self) -> List[Peering]:
455    return [
456        Peering(peer['name'], peer['network'], peer['state'],
457                peer['exportCustomRoutes'], peer['importCustomRoutes'],
458                peer['autoCreateRoutes'])
459        for peer in self._resource_data.get('peerings', [])
460    ]
@dataclasses.dataclass
class FirewallCheckResult:
568@dataclasses.dataclass
569class FirewallCheckResult:
570  """The result of a firewall connectivity check."""
571
572  action: str
573  firewall_policy_name: Optional[str] = None
574  firewall_policy_rule_description: Optional[str] = None
575  vpc_firewall_rule_id: Optional[str] = None
576  vpc_firewall_rule_name: Optional[str] = None
577
578  def __str__(self):
579    return self.action
580
581  @property
582  def matched_by_str(self):
583    if self.firewall_policy_name:
584      if self.firewall_policy_rule_description:
585        return f'policy: {self.firewall_policy_name}, rule: {self.firewall_policy_rule_description}'
586      else:
587        return f'policy: {self.firewall_policy_name}'
588    elif self.vpc_firewall_rule_name:
589      return f'vpc firewall rule: {self.vpc_firewall_rule_name}'

The result of a firewall connectivity check.

FirewallCheckResult( action: str, firewall_policy_name: Optional[str] = None, firewall_policy_rule_description: Optional[str] = None, vpc_firewall_rule_id: Optional[str] = None, vpc_firewall_rule_name: Optional[str] = None)
action: str
firewall_policy_name: Optional[str] = None
firewall_policy_rule_description: Optional[str] = None
vpc_firewall_rule_id: Optional[str] = None
vpc_firewall_rule_name: Optional[str] = None
matched_by_str
581  @property
582  def matched_by_str(self):
583    if self.firewall_policy_name:
584      if self.firewall_policy_rule_description:
585        return f'policy: {self.firewall_policy_name}, rule: {self.firewall_policy_rule_description}'
586      else:
587        return f'policy: {self.firewall_policy_name}'
588    elif self.vpc_firewall_rule_name:
589      return f'vpc firewall rule: {self.vpc_firewall_rule_name}'
class FirewallRuleNotFoundError(builtins.Exception):
592class FirewallRuleNotFoundError(Exception):
593  rule_name: str
594
595  def __init__(self, name, disabled=False):
596    # Call the base class constructor with the parameters it needs
597    super().__init__(f'firewall rule not found: {name}')
598    self.rule_name = name
599    self.disabled = disabled

Common base class for all non-exit exceptions.

FirewallRuleNotFoundError(name, disabled=False)
595  def __init__(self, name, disabled=False):
596    # Call the base class constructor with the parameters it needs
597    super().__init__(f'firewall rule not found: {name}')
598    self.rule_name = name
599    self.disabled = disabled
rule_name: str
disabled
class VpcFirewallRule:
602class VpcFirewallRule:
603  """Represents firewall rule"""
604
605  def __init__(self, resource_data):
606    self._resource_data = resource_data
607
608  @property
609  def name(self) -> str:
610    return self._resource_data['name']
611
612  @property
613  def source_ranges(self) -> List[ipaddress.IPv4Network]:
614    return self._resource_data['sourceRanges']
615
616  @property
617  def target_tags(self) -> set:
618    return self._resource_data['targetTags']
619
620  @property
621  def allowed(self) -> List[dict]:
622    return self._resource_data['allowed']
623
624  def is_enabled(self) -> bool:
625    return not self._resource_data['disabled']

Represents firewall rule

VpcFirewallRule(resource_data)
605  def __init__(self, resource_data):
606    self._resource_data = resource_data
name: str
608  @property
609  def name(self) -> str:
610    return self._resource_data['name']
source_ranges: List[ipaddress.IPv4Network]
612  @property
613  def source_ranges(self) -> List[ipaddress.IPv4Network]:
614    return self._resource_data['sourceRanges']
target_tags: set
616  @property
617  def target_tags(self) -> set:
618    return self._resource_data['targetTags']
allowed: List[dict]
620  @property
621  def allowed(self) -> List[dict]:
622    return self._resource_data['allowed']
def is_enabled(self) -> bool:
624  def is_enabled(self) -> bool:
625    return not self._resource_data['disabled']
class EffectiveFirewalls:
 977class EffectiveFirewalls:
 978  """Effective firewall rules for a VPC network or Instance.
 979
 980  Includes org/folder firewall policies)."""
 981  _resource_data: dict
 982  _policies: List[_FirewallPolicy]
 983  _vpc_firewall: _VpcFirewall
 984
 985  def __init__(self, resource_data):
 986    self._resource_data = resource_data
 987    self._policies = []
 988    if 'firewallPolicys' in resource_data:
 989      for policy in resource_data['firewallPolicys']:
 990        self._policies.append(_FirewallPolicy(policy))
 991    self._vpc_firewall = _VpcFirewall(resource_data.get('firewalls', {}))
 992
 993  def check_connectivity_ingress(
 994      self,  #
 995      *,
 996      src_ip: IPAddrOrNet,
 997      ip_protocol: str,
 998      port: Optional[int] = None,
 999      source_service_account: Optional[str] = None,
1000      source_tags: Optional[List[str]] = None,
1001      target_service_account: Optional[str] = None,
1002      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
1003
1004    if ip_protocol != 'ICMP' and port is None:
1005      raise ValueError('TCP and UDP must have port numbers')
1006
1007    # Firewall policies (organization, folders)
1008    for p in self._policies:
1009      result = p.check_connectivity_ingress(
1010          src_ip=src_ip,
1011          ip_protocol=ip_protocol,
1012          port=port,
1013          #target_network=self._network,
1014          target_service_account=target_service_account)
1015      if result.action != 'goto_next':
1016        return result
1017
1018    # VPC firewall rules
1019    return self._vpc_firewall.check_connectivity_ingress(
1020        src_ip=src_ip,
1021        ip_protocol=ip_protocol,
1022        port=port,
1023        source_service_account=source_service_account,
1024        source_tags=source_tags,
1025        target_service_account=target_service_account,
1026        target_tags=target_tags)
1027
1028  def check_connectivity_egress(
1029      self,  #
1030      *,
1031      src_ip: IPAddrOrNet,
1032      ip_protocol: str,
1033      port: Optional[int] = None,
1034      source_service_account: Optional[str] = None,
1035      source_tags: Optional[List[str]] = None,
1036      target_service_account: Optional[str] = None,
1037      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
1038
1039    if ip_protocol != 'ICMP' and port is None:
1040      raise ValueError('TCP and UDP must have port numbers')
1041
1042    # Firewall policies (organization, folders)
1043    for p in self._policies:
1044      result = p.check_connectivity_egress(
1045          src_ip=src_ip,
1046          ip_protocol=ip_protocol,
1047          port=port,
1048          #target_network=self._network,
1049          target_service_account=target_service_account)
1050      if result.action != 'goto_next':
1051        return result
1052
1053    # VPC firewall rules
1054    return self._vpc_firewall.check_connectivity_egress(
1055        src_ip=src_ip,
1056        ip_protocol=ip_protocol,
1057        port=port,
1058        source_service_account=source_service_account,
1059        source_tags=source_tags,
1060        target_service_account=target_service_account,
1061        target_tags=target_tags)
1062
1063  def get_vpc_ingress_rules(
1064      self,
1065      name: Optional[str] = None,
1066      name_pattern: Optional[re.Pattern] = None,
1067      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
1068    """Retrieve the list of ingress firewall rules matching name or name pattern and target tags.
1069
1070    Args:
1071        name (Optional[str], optional): firewall rune name. Defaults to None.
1072        name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
1073        target_tags (Optional[List[str]], optional): firewall target tags
1074          (if not specified any tag will match). Defaults to None.
1075
1076    Returns:
1077        List[VpcFirewallRule]: List of ingress firewall rules
1078    """
1079    rules = self._vpc_firewall.get_vpc_ingress_rules(name, name_pattern,
1080                                                     target_tags)
1081    return rules
1082
1083  def get_vpc_egress_rules(
1084      self,
1085      name: Optional[str] = None,
1086      name_pattern: Optional[re.Pattern] = None,
1087      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
1088    """Retrieve the list of egress firewall rules matching name or name pattern and target tags.
1089
1090    Args:
1091        name (Optional[str], optional): firewall rune name. Defaults to None.
1092        name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
1093        target_tags (Optional[List[str]], optional): firewall target tags
1094          (if not specified any tag will match). Defaults to None.
1095
1096    Returns:
1097        List[VpcFirewallRule]: List of egress firewall rules
1098    """
1099    rules = self._vpc_firewall.get_vpc_egress_rules(name, name_pattern,
1100                                                    target_tags)
1101    return rules
1102
1103  def verify_ingress_rule_exists(self, name: str):
1104    """Verify that a certain VPC rule exists. This is useful to verify
1105    whether maybe a permission was missing on a shared VPC and an
1106    automatic rule couldn't be created."""
1107    return self._vpc_firewall.verify_ingress_rule_exists(name)
1108
1109  def verify_egress_rule_exists(self, name: str):
1110    """Verify that a certain VPC rule exists. This is useful to verify
1111    whether maybe a permission was missing on a shared VPC and an
1112    automatic rule couldn't be created."""
1113    return self._vpc_firewall.verify_egress_rule_exists(name)

Effective firewall rules for a VPC network or Instance.

Includes org/folder firewall policies).

EffectiveFirewalls(resource_data)
985  def __init__(self, resource_data):
986    self._resource_data = resource_data
987    self._policies = []
988    if 'firewallPolicys' in resource_data:
989      for policy in resource_data['firewallPolicys']:
990        self._policies.append(_FirewallPolicy(policy))
991    self._vpc_firewall = _VpcFirewall(resource_data.get('firewalls', {}))
def check_connectivity_ingress( self, *, src_ip: Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network], ip_protocol: str, port: Optional[int] = None, source_service_account: Optional[str] = None, source_tags: Optional[List[str]] = None, target_service_account: Optional[str] = None, target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
 993  def check_connectivity_ingress(
 994      self,  #
 995      *,
 996      src_ip: IPAddrOrNet,
 997      ip_protocol: str,
 998      port: Optional[int] = None,
 999      source_service_account: Optional[str] = None,
1000      source_tags: Optional[List[str]] = None,
1001      target_service_account: Optional[str] = None,
1002      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
1003
1004    if ip_protocol != 'ICMP' and port is None:
1005      raise ValueError('TCP and UDP must have port numbers')
1006
1007    # Firewall policies (organization, folders)
1008    for p in self._policies:
1009      result = p.check_connectivity_ingress(
1010          src_ip=src_ip,
1011          ip_protocol=ip_protocol,
1012          port=port,
1013          #target_network=self._network,
1014          target_service_account=target_service_account)
1015      if result.action != 'goto_next':
1016        return result
1017
1018    # VPC firewall rules
1019    return self._vpc_firewall.check_connectivity_ingress(
1020        src_ip=src_ip,
1021        ip_protocol=ip_protocol,
1022        port=port,
1023        source_service_account=source_service_account,
1024        source_tags=source_tags,
1025        target_service_account=target_service_account,
1026        target_tags=target_tags)
def check_connectivity_egress( self, *, src_ip: Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network], ip_protocol: str, port: Optional[int] = None, source_service_account: Optional[str] = None, source_tags: Optional[List[str]] = None, target_service_account: Optional[str] = None, target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
1028  def check_connectivity_egress(
1029      self,  #
1030      *,
1031      src_ip: IPAddrOrNet,
1032      ip_protocol: str,
1033      port: Optional[int] = None,
1034      source_service_account: Optional[str] = None,
1035      source_tags: Optional[List[str]] = None,
1036      target_service_account: Optional[str] = None,
1037      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
1038
1039    if ip_protocol != 'ICMP' and port is None:
1040      raise ValueError('TCP and UDP must have port numbers')
1041
1042    # Firewall policies (organization, folders)
1043    for p in self._policies:
1044      result = p.check_connectivity_egress(
1045          src_ip=src_ip,
1046          ip_protocol=ip_protocol,
1047          port=port,
1048          #target_network=self._network,
1049          target_service_account=target_service_account)
1050      if result.action != 'goto_next':
1051        return result
1052
1053    # VPC firewall rules
1054    return self._vpc_firewall.check_connectivity_egress(
1055        src_ip=src_ip,
1056        ip_protocol=ip_protocol,
1057        port=port,
1058        source_service_account=source_service_account,
1059        source_tags=source_tags,
1060        target_service_account=target_service_account,
1061        target_tags=target_tags)
def get_vpc_ingress_rules( self, name: Optional[str] = None, name_pattern: Optional[re.Pattern] = None, target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
1063  def get_vpc_ingress_rules(
1064      self,
1065      name: Optional[str] = None,
1066      name_pattern: Optional[re.Pattern] = None,
1067      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
1068    """Retrieve the list of ingress firewall rules matching name or name pattern and target tags.
1069
1070    Args:
1071        name (Optional[str], optional): firewall rune name. Defaults to None.
1072        name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
1073        target_tags (Optional[List[str]], optional): firewall target tags
1074          (if not specified any tag will match). Defaults to None.
1075
1076    Returns:
1077        List[VpcFirewallRule]: List of ingress firewall rules
1078    """
1079    rules = self._vpc_firewall.get_vpc_ingress_rules(name, name_pattern,
1080                                                     target_tags)
1081    return rules

Retrieve the list of ingress firewall rules matching name or name pattern and target tags.

Arguments:
  • name (Optional[str], optional): firewall rune name. Defaults to None.
  • name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
  • target_tags (Optional[List[str]], optional): firewall target tags (if not specified any tag will match). Defaults to None.
Returns:

List[VpcFirewallRule]: List of ingress firewall rules

def get_vpc_egress_rules( self, name: Optional[str] = None, name_pattern: Optional[re.Pattern] = None, target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
1083  def get_vpc_egress_rules(
1084      self,
1085      name: Optional[str] = None,
1086      name_pattern: Optional[re.Pattern] = None,
1087      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
1088    """Retrieve the list of egress firewall rules matching name or name pattern and target tags.
1089
1090    Args:
1091        name (Optional[str], optional): firewall rune name. Defaults to None.
1092        name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
1093        target_tags (Optional[List[str]], optional): firewall target tags
1094          (if not specified any tag will match). Defaults to None.
1095
1096    Returns:
1097        List[VpcFirewallRule]: List of egress firewall rules
1098    """
1099    rules = self._vpc_firewall.get_vpc_egress_rules(name, name_pattern,
1100                                                    target_tags)
1101    return rules

Retrieve the list of egress firewall rules matching name or name pattern and target tags.

Arguments:
  • name (Optional[str], optional): firewall rune name. Defaults to None.
  • name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
  • target_tags (Optional[List[str]], optional): firewall target tags (if not specified any tag will match). Defaults to None.
Returns:

List[VpcFirewallRule]: List of egress firewall rules

def verify_ingress_rule_exists(self, name: str):
1103  def verify_ingress_rule_exists(self, name: str):
1104    """Verify that a certain VPC rule exists. This is useful to verify
1105    whether maybe a permission was missing on a shared VPC and an
1106    automatic rule couldn't be created."""
1107    return self._vpc_firewall.verify_ingress_rule_exists(name)

Verify that a certain VPC rule exists. This is useful to verify whether maybe a permission was missing on a shared VPC and an automatic rule couldn't be created.

def verify_egress_rule_exists(self, name: str):
1109  def verify_egress_rule_exists(self, name: str):
1110    """Verify that a certain VPC rule exists. This is useful to verify
1111    whether maybe a permission was missing on a shared VPC and an
1112    automatic rule couldn't be created."""
1113    return self._vpc_firewall.verify_egress_rule_exists(name)

Verify that a certain VPC rule exists. This is useful to verify whether maybe a permission was missing on a shared VPC and an automatic rule couldn't be created.

class VPCEffectiveFirewalls(EffectiveFirewalls):
1116class VPCEffectiveFirewalls(EffectiveFirewalls):
1117  """Effective firewall rules for a VPC network.
1118
1119  Includes org/folder firewall policies)."""
1120  _network: Network
1121
1122  def __init__(self, network, resource_data):
1123    super().__init__(resource_data)
1124    self._network = network

Effective firewall rules for a VPC network.

Includes org/folder firewall policies).

VPCEffectiveFirewalls(network, resource_data)
1122  def __init__(self, network, resource_data):
1123    super().__init__(resource_data)
1124    self._network = network
@caching.cached_api_call(in_memory=True)
def get_network(project_id: str, network_name: str) -> Network:
1136@caching.cached_api_call(in_memory=True)
1137def get_network(project_id: str, network_name: str) -> Network:
1138  logging.info('fetching network: %s/%s', project_id, network_name)
1139  compute = apis.get_api('compute', 'v1', project_id)
1140  request = compute.networks().get(project=project_id, network=network_name)
1141  response = request.execute(num_retries=config.API_RETRIES)
1142  return Network(project_id, response)
def get_subnetwork_from_url(url: str) -> Subnetwork:
1145def get_subnetwork_from_url(url: str) -> Subnetwork:
1146  """Returns Subnetwork object given subnetwork url"""
1147  m = re.match((r'https://www.googleapis.com/compute/v1/projects/'
1148                r'([^/]+)/regions/([^/]+)/subnetworks/([^/]+)$'), url)
1149  if not m:
1150    raise ValueError(f"can't parse network url: {url}")
1151  (project_id, region, subnetwork_name) = (m.group(1), m.group(2), m.group(3))
1152  return get_subnetwork(project_id, region, subnetwork_name)

Returns Subnetwork object given subnetwork url

def get_network_from_url(url: str) -> Network:
1155def get_network_from_url(url: str) -> Network:
1156  m = re.match(
1157      r'https://www.googleapis.com/compute/v1/projects/([^/]+)/global/networks/([^/]+)',
1158      url)
1159  if not m:
1160    raise ValueError(f"can't parse network url: {url}")
1161  (project_id, network_name) = (m.group(1), m.group(2))
1162  return get_network(project_id, network_name)
@caching.cached_api_call(in_memory=True)
def get_networks(project_id: str) -> List[Network]:
1165@caching.cached_api_call(in_memory=True)
1166def get_networks(project_id: str) -> List[Network]:
1167  logging.info('fetching network: %s', project_id)
1168  compute = apis.get_api('compute', 'v1', project_id)
1169  request = compute.networks().list(project=project_id)
1170  response = request.execute(num_retries=config.API_RETRIES)
1171  return [Network(project_id, item) for item in response.get('items', [])]
@caching.cached_api_call(in_memory=True)
def get_subnetwork( project_id: str, region: str, subnetwork_name: str) -> Subnetwork:
1174@caching.cached_api_call(in_memory=True)
1175def get_subnetwork(project_id: str, region: str,
1176                   subnetwork_name: str) -> Subnetwork:
1177  logging.info('fetching network: %s/%s', project_id, subnetwork_name)
1178  compute = apis.get_api('compute', 'v1', project_id)
1179  request = compute.subnetworks().get(project=project_id,
1180                                      region=region,
1181                                      subnetwork=subnetwork_name)
1182  response = request.execute(num_retries=config.API_RETRIES)
1183  return Subnetwork(project_id, response)
@caching.cached_api_call(in_memory=True)
def get_routes(project_id: str) -> List[Route]:
1212@caching.cached_api_call(in_memory=True)
1213def get_routes(project_id: str) -> List[Route]:
1214  logging.info('fetching routes: %s', project_id)
1215  compute = apis.get_api('compute', 'v1', project_id)
1216  request = compute.routes().list(project=project_id)
1217  response = request.execute(num_retries=config.API_RETRIES)
1218  return [Route(project_id, item) for item in response.get('items', [])]
@caching.cached_api_call(in_memory=True)
def get_zones(project_id: str) -> List[ManagedZone]:
1221@caching.cached_api_call(in_memory=True)
1222def get_zones(project_id: str) -> List[ManagedZone]:
1223  logging.info('fetching DNS zones: %s', project_id)
1224  dns = apis.get_api('dns', 'v1beta2', project_id)
1225  request = dns.managedZones().list(project=project_id)
1226  response = request.execute(num_retries=config.API_RETRIES)
1227  zones = []
1228  for zone in response.get('managedZones', []):
1229    request2 = dns.managedZones().get(project=project_id,
1230                                      managedZone=zone['name'])
1231    response2 = request2.execute(num_retries=config.API_RETRIES)
1232    zones.append(ManagedZone(project_id, response2))
1233  return zones
@caching.cached_api_call(in_memory=True)
def get_routers( project_id: str, region: str, network) -> List[Router]:
1236@caching.cached_api_call(in_memory=True)
1237def get_routers(project_id: str, region: str, network) -> List[Router]:
1238  logging.info('fetching routers: %s/%s', project_id, region)
1239  compute = apis.get_api('compute', 'v1', project_id)
1240  request = compute.routers().list(project=project_id,
1241                                   region=region,
1242                                   filter=f'network="{network.self_link}"')
1243  response = request.execute(num_retries=config.API_RETRIES)
1244  return [Router(project_id, item) for item in response.get('items', [])]
@caching.cached_api_call(in_memory=True)
def get_router(project_id: str, region: str, network) -> Router:
1247@caching.cached_api_call(in_memory=True)
1248def get_router(project_id: str, region: str, network) -> Router:
1249  logging.info('fetching routers: %s/%s', project_id, region)
1250  compute = apis.get_api('compute', 'v1', project_id)
1251  request = compute.routers().list(project=project_id,
1252                                   region=region,
1253                                   filter=f'network="{network.self_link}"')
1254  response = request.execute(num_retries=config.API_RETRIES)
1255  return Router(project_id, next(iter(response.get('items', [{}]))))
@caching.cached_api_call(in_memory=True)
def get_router_by_name( project_id: str, region: str, router_name: str) -> Router:
1258@caching.cached_api_call(in_memory=True)
1259def get_router_by_name(project_id: str, region: str,
1260                       router_name: str) -> Router:
1261  logging.info('fetching router list: %s/%s in region %s', project_id,
1262               router_name, region)
1263  compute = apis.get_api('compute', 'v1', project_id)
1264  request = compute.routers().list(project=project_id, region=region)
1265  response = request.execute(num_retries=config.API_RETRIES)
1266  return next(
1267      Router(project_id, item)
1268      for item in response.get('items', [])
1269      if item['name'] == router_name)
@caching.cached_api_call(in_memory=True)
def nat_router_status( project_id: str, router_name: str, region: str) -> RouterStatus:
1272@caching.cached_api_call(in_memory=True)
1273def nat_router_status(project_id: str, router_name: str,
1274                      region: str) -> RouterStatus:
1275  logging.info('fetching router status: %s/%s in region %s', project_id,
1276               router_name, region)
1277  compute = apis.get_api('compute', 'v1', project_id)
1278  request = compute.routers().getRouterStatus(project=project_id,
1279                                              router=router_name,
1280                                              region=region)
1281  response = request.execute(num_retries=config.API_RETRIES)
1282  if 'result' in str(response):
1283    return RouterStatus(project_id, response)
1284  else:
1285    logging.info('unable to fetch router status: %s/%s in region %s',
1286                 project_id, router_name, region)
1287    return RouterStatus(project_id, {})
@caching.cached_api_call(in_memory=True)
def get_nat_ip_info( project_id: str, router_name: str, region: str) -> RouterNatIpInfo:
1290@caching.cached_api_call(in_memory=True)
1291def get_nat_ip_info(project_id: str, router_name: str,
1292                    region: str) -> RouterNatIpInfo:
1293  logging.info('fetching NAT IP info for router: %s/%s in region %s',
1294               project_id, router_name, region)
1295  compute = apis.get_api('compute', 'v1', project_id)
1296  request = compute.routers().getNatIpInfo(project=project_id,
1297                                           router=router_name,
1298                                           region=region)
1299  response = request.execute(num_retries=config.API_RETRIES)
1300  if 'result' in str(response):
1301    return RouterNatIpInfo(project_id, response)
1302  else:
1303    logging.info('unable to fetch Nat IP Info for router: %s/%s in region %s',
1304                 project_id, router_name, region)
1305    return RouterNatIpInfo(project_id, {})
class VPCSubnetworkIAMPolicy(gcpdiag.queries.iam.BaseIAMPolicy):
1308class VPCSubnetworkIAMPolicy(iam.BaseIAMPolicy):
1309
1310  def _is_resource_permission(self, permission):
1311    return True

Common class for IAM policies

@caching.cached_api_call(in_memory=True)
def get_subnetwork_iam_policy( project_id: str, region: str, subnetwork_name: str) -> VPCSubnetworkIAMPolicy:
1314@caching.cached_api_call(in_memory=True)
1315def get_subnetwork_iam_policy(project_id: str, region: str,
1316                              subnetwork_name: str) -> VPCSubnetworkIAMPolicy:
1317  resource_name = (f'projects/{project_id}/regions/{region}/'
1318                   f'subnetworks/{subnetwork_name}')
1319
1320  compute = apis.get_api('compute', 'v1', project_id)
1321  request = compute.subnetworks().getIamPolicy(project=project_id,
1322                                               region=region,
1323                                               resource=subnetwork_name)
1324
1325  return iam.fetch_iam_policy(request, VPCSubnetworkIAMPolicy, project_id,
1326                              resource_name)
class Address(gcpdiag.models.Resource):
1329class Address(models.Resource):
1330  """IP Addresses."""
1331  _resource_data: dict
1332
1333  def __init__(self, project_id, resource_data):
1334    super().__init__(project_id=project_id)
1335    self._resource_data = resource_data
1336
1337  @property
1338  def full_path(self) -> str:
1339    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
1340                      self.self_link)
1341    if result:
1342      return result.group(1)
1343    else:
1344      return f'>> {self.self_link}'
1345
1346  @property
1347  def short_path(self) -> str:
1348    path = self.project_id + '/' + self.name
1349    return path
1350
1351  @property
1352  def name(self) -> str:
1353    return self._resource_data['name']
1354
1355  @property
1356  def self_link(self) -> str:
1357    return self._resource_data.get('selfLink', '')
1358
1359  @property
1360  def subnetwork(self) -> str:
1361    return self._resource_data['subnetwork']
1362
1363  @property
1364  def status(self) -> str:
1365    return self._resource_data['status']

IP Addresses.

Address(project_id, resource_data)
1333  def __init__(self, project_id, resource_data):
1334    super().__init__(project_id=project_id)
1335    self._resource_data = resource_data
full_path: str
1337  @property
1338  def full_path(self) -> str:
1339    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
1340                      self.self_link)
1341    if result:
1342      return result.group(1)
1343    else:
1344      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
1346  @property
1347  def short_path(self) -> str:
1348    path = self.project_id + '/' + self.name
1349    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
1351  @property
1352  def name(self) -> str:
1353    return self._resource_data['name']
subnetwork: str
1359  @property
1360  def subnetwork(self) -> str:
1361    return self._resource_data['subnetwork']
status: str
1363  @property
1364  def status(self) -> str:
1365    return self._resource_data['status']
@caching.cached_api_call(in_memory=True)
def get_addresses(project_id: str) -> List[Address]:
1368@caching.cached_api_call(in_memory=True)
1369def get_addresses(project_id: str) -> List[Address]:
1370  logging.info('fetching addresses list: %s', project_id)
1371  compute = apis.get_api('compute', 'v1', project_id)
1372  addresses = []
1373  request = compute.addresses().aggregatedList(project=project_id)
1374  response = request.execute(num_retries=config.API_RETRIES)
1375  addresses_by_regions = response['items']
1376  for _, data_ in addresses_by_regions.items():
1377    if 'addresses' not in data_:
1378      continue
1379    addresses.extend(
1380        [Address(project_id, address) for address in data_['addresses']])
1381  return addresses