gcpdiag.queries.network

Queries related to VPC Networks.
IPv4AddrOrIPv6Addr = typing.Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
IPv4NetOrIPv6Net = typing.Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
IPAddrOrNet = typing.Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network]
class Subnetwork(gcpdiag.models.Resource):
class Subnetwork(models.Resource):
  """A VPC subnetwork, backed by the resource dict it was created from."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Store the raw subnetwork resource.

    Args:
      project_id: forwarded to the models.Resource constructor.
      resource_data: dict describing the subnetwork; read lazily by the
        properties below.
    """
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 URL prefix stripped.

    If the self link does not start with that prefix, the raw self link is
    returned prefixed with '>> '.
    """
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                       self.self_link)
    if not matched:
      return f'>> {self.self_link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>'."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """The 'name' field of the resource (KeyError if absent)."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """The 'selfLink' field of the resource, or '' when it is missing."""
    return self._resource_data.get('selfLink', '')

  @property
  def ip_network(self) -> IPv4NetOrIPv6Net:
    """The 'ipCidrRange' field parsed with ipaddress.ip_network."""
    cidr = self._resource_data['ipCidrRange']
    return ipaddress.ip_network(cidr)

  @property
  def region(self) -> str:
    """Region name taken from the last component of the 'region' URL.

    Raises:
      RuntimeError: the 'region' field is not a
        .../projects/<project>/regions/<region> URL.
    """
    # Example value:
    # https://www.googleapis.com/compute/v1/projects/gcpdiag-gke1-aaaa/regions/europe-west4
    region_url = self._resource_data['region']
    parsed = re.match(
        r'https://www.googleapis.com/compute/v1/projects/([^/]+)/regions/([^/]+)',
        region_url)
    if parsed is None:
      raise RuntimeError(f"can't parse region URL: {region_url}")
    return parsed.group(2)

  def is_private_ip_google_access(self) -> bool:
    """The 'privateIpGoogleAccess' field, False when it is not set."""
    return self._resource_data.get('privateIpGoogleAccess', False)

  @property
  def network(self):
    """The 'network' field of the resource (KeyError if absent)."""
    return self._resource_data['network']

A VPC subnetwork.

Subnetwork(project_id, resource_data)
37  def __init__(self, project_id, resource_data):
38    super().__init__(project_id=project_id)
39    self._resource_data = resource_data
full_path: str
41  @property
42  def full_path(self) -> str:
43    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
44                      self.self_link)
45    if result:
46      return result.group(1)
47    else:
48      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
50  @property
51  def short_path(self) -> str:
52    path = self.project_id + '/' + self.name
53    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
55  @property
56  def name(self) -> str:
57    return self._resource_data['name']
ip_network: Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
63  @property
64  def ip_network(self) -> IPv4NetOrIPv6Net:
65    return ipaddress.ip_network(self._resource_data['ipCidrRange'])
region: str
67  @property
68  def region(self) -> str:
69    # https://www.googleapis.com/compute/v1/projects/gcpdiag-gke1-aaaa/regions/europe-west4
70    m = re.match(
71        r'https://www.googleapis.com/compute/v1/projects/([^/]+)/regions/([^/]+)',
72        self._resource_data['region'])
73    if not m:
74      raise RuntimeError(
75          f"can't parse region URL: {self._resource_data['region']}")
76    return m.group(2)
def is_private_ip_google_access(self) -> bool:
78  def is_private_ip_google_access(self) -> bool:
79    return self._resource_data.get('privateIpGoogleAccess', False)
network
81  @property
82  def network(self):
83    return self._resource_data['network']
class Route(gcpdiag.models.Resource):
 86class Route(models.Resource):
 87  """A VPC Route."""
 88
 89  _resource_data: dict
 90
 91  def __init__(self, project_id, resource_data):
 92    super().__init__(project_id=project_id)
 93    self._resource_data = resource_data
 94
 95  @property
 96  def full_path(self) -> str:
 97    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
 98                      self.self_link)
 99    if result:
100      return result.group(1)
101    else:
102      return f'>> {self.self_link}'
103
104  @property
105  def short_path(self) -> str:
106    path = self.project_id + '/' + self.name
107    return path
108
109  @property
110  def name(self) -> str:
111    return self._resource_data['name']
112
113  @property
114  def kind(self) -> str:
115    return self._resource_data['kind']
116
117  @property
118  def self_link(self) -> str:
119    return self._resource_data['selfLink']
120
121  @property
122  def network(self) -> str:
123    return self._resource_data['network']
124
125  @property
126  def tags(self) -> List[str]:
127    if 'tags' in self._resource_data:
128      return self._resource_data['tags']
129    return []
130
131  @property
132  def dest_range(self) -> str:
133    return self._resource_data['destRange']
134
135  @property
136  def next_hop_gateway(self) -> Optional[str]:
137    if 'nextHopGateway' in self._resource_data:
138      return self._resource_data['nextHopGateway']
139    return None
140
141  @property
142  def next_hop_vpn_tunnel(self) -> Optional[str]:
143    return self._resource_data.get('nextHopVpnTunnel')
144
145  @property
146  def next_hop_hub(self) -> Optional[str]:
147    return self._resource_data.get('nextHopHub')
148
149  @property
150  def priority(self) -> int:
151    return self._resource_data['priority']
152
153  def get_next_hop(self) -> Union[Dict[str, Any], Optional[str]]:
154    hop_types = {
155        'nextHopGateway': 'nextHopGateway',
156        'nextHopVpnTunnel': 'nextHopVpnTunnel',
157        'nextHopHub': 'nextHopHub',
158        'nextHopInstance': 'nextHopInstance',
159        'nextHopAddress': 'nextHopAddress',
160        'nextHopPeering': 'nextHopPeering',
161        'nextHopIlb': 'nextHopIlb',
162        'nextHopNetwork': 'nextHopNetwork',
163        'nextHopIp': 'nextHopIp'
164    }
165
166    for hop_type, value in hop_types.items():
167      if self._resource_data.get(hop_type):
168        return {'type': value, 'link': self._resource_data[hop_type]}
169    return None
170
171  def check_route_match(self, ip1: IPAddrOrNet, ip2: str) -> bool:
172    ip2_list = [ipaddress.ip_network(ip2)]
173    if _ip_match(ip1, ip2_list, 'allow'):
174      return True
175    return False

A VPC Route.

Route(project_id, resource_data)
91  def __init__(self, project_id, resource_data):
92    super().__init__(project_id=project_id)
93    self._resource_data = resource_data
full_path: str
 95  @property
 96  def full_path(self) -> str:
 97    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
 98                      self.self_link)
 99    if result:
100      return result.group(1)
101    else:
102      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
104  @property
105  def short_path(self) -> str:
106    path = self.project_id + '/' + self.name
107    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
109  @property
110  def name(self) -> str:
111    return self._resource_data['name']
kind: str
113  @property
114  def kind(self) -> str:
115    return self._resource_data['kind']
network: str
121  @property
122  def network(self) -> str:
123    return self._resource_data['network']
tags: List[str]
125  @property
126  def tags(self) -> List[str]:
127    if 'tags' in self._resource_data:
128      return self._resource_data['tags']
129    return []
dest_range: str
131  @property
132  def dest_range(self) -> str:
133    return self._resource_data['destRange']
next_hop_gateway: Optional[str]
135  @property
136  def next_hop_gateway(self) -> Optional[str]:
137    if 'nextHopGateway' in self._resource_data:
138      return self._resource_data['nextHopGateway']
139    return None
next_hop_vpn_tunnel: Optional[str]
141  @property
142  def next_hop_vpn_tunnel(self) -> Optional[str]:
143    return self._resource_data.get('nextHopVpnTunnel')
next_hop_hub: Optional[str]
145  @property
146  def next_hop_hub(self) -> Optional[str]:
147    return self._resource_data.get('nextHopHub')
priority: int
149  @property
150  def priority(self) -> int:
151    return self._resource_data['priority']
def get_next_hop(self) -> Union[Dict[str, Any], str, NoneType]:
153  def get_next_hop(self) -> Union[Dict[str, Any], Optional[str]]:
154    hop_types = {
155        'nextHopGateway': 'nextHopGateway',
156        'nextHopVpnTunnel': 'nextHopVpnTunnel',
157        'nextHopHub': 'nextHopHub',
158        'nextHopInstance': 'nextHopInstance',
159        'nextHopAddress': 'nextHopAddress',
160        'nextHopPeering': 'nextHopPeering',
161        'nextHopIlb': 'nextHopIlb',
162        'nextHopNetwork': 'nextHopNetwork',
163        'nextHopIp': 'nextHopIp'
164    }
165
166    for hop_type, value in hop_types.items():
167      if self._resource_data.get(hop_type):
168        return {'type': value, 'link': self._resource_data[hop_type]}
169    return None
def check_route_match( self, ip1: Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network], ip2: str) -> bool:
171  def check_route_match(self, ip1: IPAddrOrNet, ip2: str) -> bool:
172    ip2_list = [ipaddress.ip_network(ip2)]
173    if _ip_match(ip1, ip2_list, 'allow'):
174      return True
175    return False
class ManagedZone(gcpdiag.models.Resource):
class ManagedZone(models.Resource):
  """
  Represent a DNS zone (public or private).

  https://cloud.google.com/dns/docs/reference/v1beta2/managedZones
  """
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Store the raw managed-zone resource.

    Args:
      project_id: forwarded to the models.Resource constructor.
      resource_data: dict describing the managed zone. It is only read, never
        modified, by this class.
    """
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def cloud_logging_config(self) -> bool:
    """'enableLogging' of 'cloudLoggingConfig'; False when either is missing."""
    # Tolerate a missing section, as the other optional-section properties do.
    logging_config = self._resource_data.get('cloudLoggingConfig', {})
    return logging_config.get('enableLogging', False)

  @property
  def is_public(self) -> bool:
    """True when the 'visibility' field equals 'public'."""
    return self._resource_data['visibility'] == 'public'

  @property
  def vpc_attached(self) -> bool:
    """Truthy when the zone lists networks or GKE clusters.

    Returns the 'networks' value of 'privateVisibilityConfig' if it is truthy,
    otherwise the 'gkeClusters' value, otherwise False. The value is returned
    as stored (not coerced to bool), so use it in a boolean context.
    """
    # Read with a default instead of inserting an empty section: a getter
    # must not modify the underlying resource data.
    visibility = self._resource_data.get('privateVisibilityConfig', {})
    return (visibility.get('networks', False) or
            visibility.get('gkeClusters', False))

  @property
  def dnssec_config_state(self) -> bool:
    """The 'state' value of 'dnssecConfig', or False when it is not set.

    NOTE(review): the value is returned as stored, so it may not be a bool
    despite the annotation -- confirm against callers.
    """
    # Same as above: do not add an empty 'dnssecConfig' to the resource data.
    dnssec = self._resource_data.get('dnssecConfig', {})
    return dnssec.get('state', False)

  @property
  def name(self) -> str:
    """The 'name' field of the resource."""
    return self._resource_data['name']

  @property
  def full_path(self) -> str:
    """Self link with the Cloud DNS v1beta2 URL prefix stripped.

    If the self link does not start with that prefix, the raw self link is
    returned prefixed with '>> '.
    """
    result = re.match(r'https://dns.googleapis.com/dns/v1beta2/(.*)',
                      self.self_link)
    if result:
      return result.group(1)
    else:
      return f'>> {self.self_link}'

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>'."""
    path = self.project_id + '/' + self.name
    return path

  @property
  def self_link(self) -> str:
    """The 'selfLink' field of the resource, or '' when it is missing."""
    return self._resource_data.get('selfLink', '')
ManagedZone(project_id, resource_data)
186  def __init__(self, project_id, resource_data):
187    super().__init__(project_id=project_id)
188    self._resource_data = resource_data
cloud_logging_config: bool
190  @property
191  def cloud_logging_config(self) -> bool:
192    return self._resource_data['cloudLoggingConfig'].get('enableLogging', False)
is_public: bool
194  @property
195  def is_public(self) -> bool:
196    return self._resource_data['visibility'] == 'public'
vpc_attached: bool
198  @property
199  def vpc_attached(self) -> bool:
200    if 'privateVisibilityConfig' not in self._resource_data:
201      self._resource_data['privateVisibilityConfig'] = {}
202
203    return (self._resource_data['privateVisibilityConfig'].get(
204        'networks', False) or
205            self._resource_data['privateVisibilityConfig'].get(
206                'gkeClusters', False))
dnssec_config_state: bool
208  @property
209  def dnssec_config_state(self) -> bool:
210    if 'dnssecConfig' not in self._resource_data:
211      self._resource_data['dnssecConfig'] = {}
212
213    return self._resource_data['dnssecConfig'].get('state', False)
name: str
215  @property
216  def name(self) -> str:
217    return self._resource_data['name']
full_path: str
219  @property
220  def full_path(self) -> str:
221    result = re.match(r'https://dns.googleapis.com/dns/v1beta2/(.*)',
222                      self.self_link)
223    if result:
224      return result.group(1)
225    else:
226      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
228  @property
229  def short_path(self) -> str:
230    path = self.project_id + '/' + self.name
231    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

class Router(gcpdiag.models.Resource):
class Router(models.Resource):
  """A VPC Router."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Store the raw router resource.

    Args:
      project_id: forwarded to the models.Resource constructor.
      resource_data: dict describing the router, including its optional
        'nats' list.
    """
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self._nats = None

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 URL prefix stripped.

    If the self link does not start with that prefix, the raw self link is
    returned prefixed with '>> '.
    """
    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                      self.self_link)
    if result:
      return result.group(1)
    else:
      return f'>> {self.self_link}'

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>'."""
    path = self.project_id + '/' + self.name
    return path

  @property
  def name(self) -> str:
    """The 'name' field of the resource."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """The 'selfLink' field of the resource, or '' when it is missing."""
    return self._resource_data.get('selfLink', '')

  @property
  def nats(self):
    """The 'nats' list of the resource, or an empty list when missing."""
    return self._resource_data.get('nats', [])

  def get_nat_ip_allocate_option(self, nat_gateway) -> str:
    """Return 'natIpAllocateOption' of the NAT gateway named nat_gateway.

    Returns '' when the option is not set or when the router has no NAT
    gateway with that name (previously the latter raised IndexError).
    """
    nats = self._resource_data.get('nats') or []
    nat = next((n for n in nats if n['name'] == nat_gateway), None)
    if nat is None:
      return ''
    return nat.get('natIpAllocateOption', '')

  def get_enable_dynamic_port_allocation(self, nat_gateway) -> str:
    """Return 'enableDynamicPortAllocation' of the NAT gateway nat_gateway.

    Returns '' when the option is not set or when the router has no NAT
    gateway with that name (previously the latter raised IndexError).
    """
    nats = self._resource_data.get('nats') or []
    nat = next((n for n in nats if n['name'] == nat_gateway), None)
    if nat is None:
      return ''
    return nat.get('enableDynamicPortAllocation', '')

  def subnet_has_nat(self, subnetwork):
    """Tell whether one of this router's NAT gateways covers subnetwork.

    Args:
      subnetwork: object exposing a self_link attribute.

    Returns:
      True when a NAT gateway is not restricted to a subnetwork list, or when
      a restricted gateway lists the subnetwork's self link; False otherwise
      (including when the router has no NAT gateways).
    """
    if not self._resource_data.get('nats', []):
      return False
    for n in self._resource_data.get('nats', []):
      if n['sourceSubnetworkIpRangesToNat'] == 'LIST_OF_SUBNETWORKS':
        # Cloud NAT configured for specific subnets
        if 'subnetworks' in n and subnetwork.self_link in [
            s['name'] for s in n['subnetworks']
        ]:
          return True
      else:
        # Cloud NAT configured for all subnets
        return True
    return False

A VPC Router.

Router(project_id, resource_data)
243  def __init__(self, project_id, resource_data):
244    super().__init__(project_id=project_id)
245    self._resource_data = resource_data
246    self._nats = None
full_path: str
248  @property
249  def full_path(self) -> str:
250    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
251                      self.self_link)
252    if result:
253      return result.group(1)
254    else:
255      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
257  @property
258  def short_path(self) -> str:
259    path = self.project_id + '/' + self.name
260    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
262  @property
263  def name(self) -> str:
264    return self._resource_data['name']
nats
270  @property
271  def nats(self):
272    return self._resource_data.get('nats', [])
def get_nat_ip_allocate_option(self, nat_gateway) -> str:
274  def get_nat_ip_allocate_option(self, nat_gateway) -> str:
275    nats = self._resource_data.get('nats', [])
276    nat = [n for n in nats if n['name'] == nat_gateway]
277    return nat[0].get('natIpAllocateOption', '')
def get_enable_dynamic_port_allocation(self, nat_gateway) -> str:
279  def get_enable_dynamic_port_allocation(self, nat_gateway) -> str:
280    nats = self._resource_data.get('nats', [])
281    nat = [n for n in nats if n['name'] == nat_gateway]
282    return nat[0].get('enableDynamicPortAllocation', '')
def subnet_has_nat(self, subnetwork):
284  def subnet_has_nat(self, subnetwork):
285    if not self._resource_data.get('nats', []):
286      return False
287    for n in self._resource_data.get('nats', []):
288      if n['sourceSubnetworkIpRangesToNat'] == 'LIST_OF_SUBNETWORKS':
289        # Cloud NAT configure for specific subnets
290        if 'subnetworks' in n and subnetwork.self_link in [
291            s['name'] for s in n['subnetworks']
292        ]:
293          return True
294      else:
295        # Cloud NAT configured for all subnets
296        return True
297    return False
class RouterStatus(gcpdiag.models.Resource):
class RouterStatus(models.Resource):
  """NAT Router Status"""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Store the raw router-status response.

    Args:
      project_id: forwarded to the models.Resource constructor.
      resource_data: dict whose 'result' entry may carry a 'natStatus' list.
    """
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 URL prefix stripped.

    If the self link does not start with that prefix, the raw self link is
    returned prefixed with '>> '.
    """
    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                      self.self_link)
    if result:
      return result.group(1)
    else:
      return f'>> {self.self_link}'

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>'."""
    path = self.project_id + '/' + self.name
    return path

  @property
  def name(self) -> str:
    """The 'name' field of the resource, or '' when it is missing."""
    return self._resource_data.get('name', '')

  @property
  def self_link(self) -> str:
    """The 'selfLink' field of the resource, or '' when it is missing."""
    return self._resource_data.get('selfLink', '')

  def _first_nat_status(self) -> dict:
    """Return the first 'natStatus' entry, or {} when there is none."""
    # 'natStatus' is indexed as a list; the old `{}` default made `[0]` raise
    # KeyError when the field was missing (IndexError when it was empty).
    nat_status = self._resource_data.get('result', {}).get('natStatus') or []
    return nat_status[0] if nat_status else {}

  @property
  def min_extra_nat_ips_needed(self) -> str:
    """'minExtraNatIpsNeeded' of the first NAT status entry.

    Returns None when the field, or the NAT status itself, is missing.
    NOTE(review): the value is returned as stored; confirm it really is a
    str as annotated.
    """
    return self._first_nat_status().get('minExtraNatIpsNeeded', None)

  @property
  def num_vms_with_nat_mappings(self) -> str:
    """'numVmEndpointsWithNatMappings' of the first NAT status entry.

    Returns None when the field, or the NAT status itself, is missing.
    NOTE(review): the value is returned as stored; confirm it really is a
    str as annotated.
    """
    return self._first_nat_status().get('numVmEndpointsWithNatMappings', None)

NAT Router Status

RouterStatus(project_id, resource_data)
305  def __init__(self, project_id, resource_data):
306    super().__init__(project_id=project_id)
307    self._resource_data = resource_data
full_path: str
309  @property
310  def full_path(self) -> str:
311    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
312                      self.self_link)
313    if result:
314      return result.group(1)
315    else:
316      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
318  @property
319  def short_path(self) -> str:
320    path = self.project_id + '/' + self.name
321    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
323  @property
324  def name(self) -> str:
325    return self._resource_data.get('name', '')
min_extra_nat_ips_needed: str
331  @property
332  def min_extra_nat_ips_needed(self) -> str:
333    nat_status = self._resource_data.get('result', {}).get('natStatus', {})
334    return nat_status[0].get('minExtraNatIpsNeeded', None)
num_vms_with_nat_mappings: str
336  @property
337  def num_vms_with_nat_mappings(self) -> str:
338    nat_status = self._resource_data.get('result', {}).get('natStatus', {})
339    return nat_status[0].get('numVmEndpointsWithNatMappings', None)
class RouterNatIpInfo(gcpdiag.models.Resource):
class RouterNatIpInfo(models.Resource):
  """NAT IP information, backed by the response dict it was created from."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Store the raw NAT IP info response.

    Args:
      project_id: forwarded to the models.Resource constructor.
      resource_data: dict holding the response.
    """
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 URL prefix stripped.

    If the self link does not start with that prefix, the raw self link is
    returned prefixed with '>> '.
    """
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                       self.self_link)
    if not matched:
      return f'>> {self.self_link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>'."""
    return '/'.join((self.project_id, self.name))

  @property
  def self_link(self) -> str:
    """The 'selfLink' field, or '' when it is missing."""
    return self._resource_data.get('selfLink', '')

  @property
  def name(self) -> str:
    """The 'name' field, or '' when it is missing."""
    return self._resource_data.get('name', '')

  @property
  def result(self) -> str:
    """The 'result' field, or an empty list when it is missing.

    NOTE(review): the default is a list although the annotation says str --
    confirm the intended type.
    """
    return self._resource_data.get('result', [])

NAT IP Info

RouterNatIpInfo(project_id, resource_data)
347  def __init__(self, project_id, resource_data):
348    super().__init__(project_id=project_id)
349    self._resource_data = resource_data
full_path: str
351  @property
352  def full_path(self) -> str:
353    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
354                      self.self_link)
355    if result:
356      return result.group(1)
357    else:
358      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
360  @property
361  def short_path(self) -> str:
362    path = self.project_id + '/' + self.name
363    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
369  @property
370  def name(self) -> str:
371    return self._resource_data.get('name', '')
result: str
373  @property
374  def result(self) -> str:
375    return self._resource_data.get('result', [])
@dataclasses.dataclass
class Peering:
@dataclasses.dataclass
class Peering:
  """One VPC network peering entry.

  Plain value object; str() of an instance is its name.
  """

  # Fields are positional in the generated constructor -- keep this order.
  name: str
  url: str
  state: str
  exports_custom_routes: bool
  imports_custom_routes: bool
  auto_creates_routes: bool

  def __str__(self) -> str:
    """Return the peering name."""
    return self.name

VPC Peerings

Peering( name: str, url: str, state: str, exports_custom_routes: bool, imports_custom_routes: bool, auto_creates_routes: bool)
name: str
url: str
state: str
exports_custom_routes: bool
imports_custom_routes: bool
auto_creates_routes: bool
class Network(gcpdiag.models.Resource):
class Network(models.Resource):
  """A VPC network, backed by the resource dict it was created from."""
  _resource_data: dict
  _subnetworks: Optional[Dict[str, Subnetwork]]

  def __init__(self, project_id, resource_data):
    """Store the raw network resource.

    Args:
      project_id: forwarded to the models.Resource constructor.
      resource_data: dict describing the network.
    """
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self._subnetworks = None

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 URL prefix stripped.

    If the self link does not start with that prefix, the raw self link is
    returned prefixed with '>> '.
    """
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                       self.self_link)
    if not matched:
      return f'>> {self.self_link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>'."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """The 'name' field of the resource."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """The 'selfLink' field of the resource, or '' when it is missing."""
    return self._resource_data.get('selfLink', '')

  @property
  def firewall(self) -> 'EffectiveFirewalls':
    """Result of _get_effective_firewalls for this network."""
    return _get_effective_firewalls(self)

  @property
  def subnetworks(self) -> Dict[str, Subnetwork]:
    """Subnetworks looked up via _batch_get_subnetworks.

    The entries of the 'subnetworks' field (none if the field is missing) are
    passed to the helper as a frozenset.
    """
    subnet_refs = frozenset(self._resource_data.get('subnetworks', []))
    return _batch_get_subnetworks(self._project_id, subnet_refs)

  @property
  def peerings(self) -> List[Peering]:
    """One Peering per entry of the 'peerings' field (empty if missing)."""
    return [
        Peering(name=entry['name'],
                url=entry['network'],
                state=entry['state'],
                exports_custom_routes=entry['exportCustomRoutes'],
                imports_custom_routes=entry['importCustomRoutes'],
                auto_creates_routes=entry['autoCreateRoutes'])
        for entry in self._resource_data.get('peerings', [])
    ]

A VPC network.

Network(project_id, resource_data)
398  def __init__(self, project_id, resource_data):
399    super().__init__(project_id=project_id)
400    self._resource_data = resource_data
401    self._subnetworks = None
full_path: str
403  @property
404  def full_path(self) -> str:
405    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
406                      self.self_link)
407    if result:
408      return result.group(1)
409    else:
410      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
412  @property
413  def short_path(self) -> str:
414    path = self.project_id + '/' + self.name
415    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
417  @property
418  def name(self) -> str:
419    return self._resource_data['name']
firewall: EffectiveFirewalls
425  @property
426  def firewall(self) -> 'EffectiveFirewalls':
427    return _get_effective_firewalls(self)
subnetworks: Dict[str, Subnetwork]
429  @property
430  def subnetworks(self) -> Dict[str, Subnetwork]:
431    return _batch_get_subnetworks(
432        self._project_id, frozenset(self._resource_data.get('subnetworks', [])))
peerings: List[Peering]
434  @property
435  def peerings(self) -> List[Peering]:
436    return [
437        Peering(peer['name'], peer['network'], peer['state'],
438                peer['exportCustomRoutes'], peer['importCustomRoutes'],
439                peer['autoCreateRoutes'])
440        for peer in self._resource_data.get('peerings', [])
441    ]
@dataclasses.dataclass
class FirewallCheckResult:
@dataclasses.dataclass
class FirewallCheckResult:
  """Outcome of a firewall connectivity check.

  str() of an instance is its action; the optional fields record which
  policy or VPC firewall rule produced it.
  """

  action: str
  firewall_policy_name: Optional[str] = None
  firewall_policy_rule_description: Optional[str] = None
  vpc_firewall_rule_id: Optional[str] = None
  vpc_firewall_rule_name: Optional[str] = None

  def __str__(self):
    return self.action

  @property
  def matched_by_str(self):
    """Describe what matched: a firewall policy (preferred) or a VPC rule.

    Returns None when neither a policy name nor a VPC rule name is set.
    """
    policy = self.firewall_policy_name
    if policy:
      description = self.firewall_policy_rule_description
      if description:
        return f'policy: {policy}, rule: {description}'
      return f'policy: {policy}'
    if self.vpc_firewall_rule_name:
      return f'vpc firewall rule: {self.vpc_firewall_rule_name}'
    return None

The result of a firewall connectivity check.

FirewallCheckResult( action: str, firewall_policy_name: Optional[str] = None, firewall_policy_rule_description: Optional[str] = None, vpc_firewall_rule_id: Optional[str] = None, vpc_firewall_rule_name: Optional[str] = None)
action: str
firewall_policy_name: Optional[str] = None
firewall_policy_rule_description: Optional[str] = None
vpc_firewall_rule_id: Optional[str] = None
vpc_firewall_rule_name: Optional[str] = None
matched_by_str
562  @property
563  def matched_by_str(self):
564    if self.firewall_policy_name:
565      if self.firewall_policy_rule_description:
566        return f'policy: {self.firewall_policy_name}, rule: {self.firewall_policy_rule_description}'
567      else:
568        return f'policy: {self.firewall_policy_name}'
569    elif self.vpc_firewall_rule_name:
570      return f'vpc firewall rule: {self.vpc_firewall_rule_name}'
class FirewallRuleNotFoundError(builtins.Exception):
class FirewallRuleNotFoundError(Exception):
  """Raised when a firewall rule cannot be found by name.

  The rule name and the `disabled` flag given to the constructor are kept as
  the attributes rule_name and disabled.
  """
  rule_name: str

  def __init__(self, name, disabled=False):
    # The base Exception carries the human-readable message.
    message = f'firewall rule not found: {name}'
    super().__init__(message)
    self.rule_name, self.disabled = name, disabled

Raised when a firewall rule with the given name cannot be found. Derives from Exception, the common base class for all non-exit exceptions.

FirewallRuleNotFoundError(name, disabled=False)
576  def __init__(self, name, disabled=False):
577    # Call the base class constructor with the parameters it needs
578    super().__init__(f'firewall rule not found: {name}')
579    self.rule_name = name
580    self.disabled = disabled
rule_name: str
disabled
class VpcFirewallRule:
class VpcFirewallRule:
  """Thin read-only view over one VPC firewall rule dict.

  Every accessor indexes the dict directly, so a missing key raises KeyError.
  """

  def __init__(self, resource_data):
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """The 'name' entry of the rule."""
    rule = self._resource_data
    return rule['name']

  @property
  def source_ranges(self) -> List[ipaddress.IPv4Network]:
    """The 'sourceRanges' entry of the rule, returned as stored."""
    rule = self._resource_data
    return rule['sourceRanges']

  @property
  def target_tags(self) -> set:
    """The 'targetTags' entry of the rule, returned as stored."""
    rule = self._resource_data
    return rule['targetTags']

  @property
  def allowed(self) -> List[dict]:
    """The 'allowed' entry of the rule."""
    rule = self._resource_data
    return rule['allowed']

  def is_enabled(self) -> bool:
    """True unless the rule's 'disabled' entry is truthy."""
    disabled = self._resource_data['disabled']
    return not disabled

Represents firewall rule

VpcFirewallRule(resource_data)
586  def __init__(self, resource_data):
587    self._resource_data = resource_data
name: str
589  @property
590  def name(self) -> str:
591    return self._resource_data['name']
source_ranges: List[ipaddress.IPv4Network]
593  @property
594  def source_ranges(self) -> List[ipaddress.IPv4Network]:
595    return self._resource_data['sourceRanges']
target_tags: set
597  @property
598  def target_tags(self) -> set:
599    return self._resource_data['targetTags']
allowed: List[dict]
601  @property
602  def allowed(self) -> List[dict]:
603    return self._resource_data['allowed']
def is_enabled(self) -> bool:
605  def is_enabled(self) -> bool:
606    return not self._resource_data['disabled']
class EffectiveFirewalls:
 958class EffectiveFirewalls:
 959  """Effective firewall rules for a VPC network or Instance.
 960
 961  Includes org/folder firewall policies)."""
 962  _resource_data: dict
 963  _policies: List[_FirewallPolicy]
 964  _vpc_firewall: _VpcFirewall
 965
 966  def __init__(self, resource_data):
 967    self._resource_data = resource_data
 968    self._policies = []
 969    if 'firewallPolicys' in resource_data:
 970      for policy in resource_data['firewallPolicys']:
 971        self._policies.append(_FirewallPolicy(policy))
 972    self._vpc_firewall = _VpcFirewall(resource_data.get('firewalls', {}))
 973
 974  def check_connectivity_ingress(
 975      self,  #
 976      *,
 977      src_ip: IPAddrOrNet,
 978      ip_protocol: str,
 979      port: Optional[int] = None,
 980      source_service_account: Optional[str] = None,
 981      source_tags: Optional[List[str]] = None,
 982      target_service_account: Optional[str] = None,
 983      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
 984
 985    if ip_protocol != 'ICMP' and port is None:
 986      raise ValueError('TCP and UDP must have port numbers')
 987
 988    # Firewall policies (organization, folders)
 989    for p in self._policies:
 990      result = p.check_connectivity_ingress(
 991          src_ip=src_ip,
 992          ip_protocol=ip_protocol,
 993          port=port,
 994          #target_network=self._network,
 995          target_service_account=target_service_account)
 996      if result.action != 'goto_next':
 997        return result
 998
 999    # VPC firewall rules
1000    return self._vpc_firewall.check_connectivity_ingress(
1001        src_ip=src_ip,
1002        ip_protocol=ip_protocol,
1003        port=port,
1004        source_service_account=source_service_account,
1005        source_tags=source_tags,
1006        target_service_account=target_service_account,
1007        target_tags=target_tags)
1008
1009  def check_connectivity_egress(
1010      self,  #
1011      *,
1012      src_ip: IPAddrOrNet,
1013      ip_protocol: str,
1014      port: Optional[int] = None,
1015      source_service_account: Optional[str] = None,
1016      source_tags: Optional[List[str]] = None,
1017      target_service_account: Optional[str] = None,
1018      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
1019
1020    if ip_protocol != 'ICMP' and port is None:
1021      raise ValueError('TCP and UDP must have port numbers')
1022
1023    # Firewall policies (organization, folders)
1024    for p in self._policies:
1025      result = p.check_connectivity_egress(
1026          src_ip=src_ip,
1027          ip_protocol=ip_protocol,
1028          port=port,
1029          #target_network=self._network,
1030          target_service_account=target_service_account)
1031      if result.action != 'goto_next':
1032        return result
1033
1034    # VPC firewall rules
1035    return self._vpc_firewall.check_connectivity_egress(
1036        src_ip=src_ip,
1037        ip_protocol=ip_protocol,
1038        port=port,
1039        source_service_account=source_service_account,
1040        source_tags=source_tags,
1041        target_service_account=target_service_account,
1042        target_tags=target_tags)
1043
1044  def get_vpc_ingress_rules(
1045      self,
1046      name: Optional[str] = None,
1047      name_pattern: Optional[re.Pattern] = None,
1048      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
1049    """Retrieve the list of ingress firewall rules matching name or name pattern and target tags.
1050
1051    Args:
1052        name (Optional[str], optional): firewall rune name. Defaults to None.
1053        name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
1054        target_tags (Optional[List[str]], optional): firewall target tags
1055          (if not specified any tag will match). Defaults to None.
1056
1057    Returns:
1058        List[VpcFirewallRule]: List of ingress firewall rules
1059    """
1060    rules = self._vpc_firewall.get_vpc_ingress_rules(name, name_pattern,
1061                                                     target_tags)
1062    return rules
1063
1064  def get_vpc_egress_rules(
1065      self,
1066      name: Optional[str] = None,
1067      name_pattern: Optional[re.Pattern] = None,
1068      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
1069    """Retrieve the list of egress firewall rules matching name or name pattern and target tags.
1070
1071    Args:
1072        name (Optional[str], optional): firewall rune name. Defaults to None.
1073        name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
1074        target_tags (Optional[List[str]], optional): firewall target tags
1075          (if not specified any tag will match). Defaults to None.
1076
1077    Returns:
1078        List[VpcFirewallRule]: List of egress firewall rules
1079    """
1080    rules = self._vpc_firewall.get_vpc_egress_rules(name, name_pattern,
1081                                                    target_tags)
1082    return rules
1083
1084  def verify_ingress_rule_exists(self, name: str):
1085    """Verify that a certain VPC rule exists. This is useful to verify
1086    whether maybe a permission was missing on a shared VPC and an
1087    automatic rule couldn't be created."""
1088    return self._vpc_firewall.verify_ingress_rule_exists(name)
1089
1090  def verify_egress_rule_exists(self, name: str):
1091    """Verify that a certain VPC rule exists. This is useful to verify
1092    whether maybe a permission was missing on a shared VPC and an
1093    automatic rule couldn't be created."""
1094    return self._vpc_firewall.verify_egress_rule_exists(name)

Effective firewall rules for a VPC network or Instance.

(Includes org/folder firewall policies.)

EffectiveFirewalls(resource_data)
966  def __init__(self, resource_data):
967    self._resource_data = resource_data
968    self._policies = []
969    if 'firewallPolicys' in resource_data:
970      for policy in resource_data['firewallPolicys']:
971        self._policies.append(_FirewallPolicy(policy))
972    self._vpc_firewall = _VpcFirewall(resource_data.get('firewalls', {}))
def check_connectivity_ingress( self, *, src_ip: Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network], ip_protocol: str, port: Optional[int] = None, source_service_account: Optional[str] = None, source_tags: Optional[List[str]] = None, target_service_account: Optional[str] = None, target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
 974  def check_connectivity_ingress(
 975      self,  #
 976      *,
 977      src_ip: IPAddrOrNet,
 978      ip_protocol: str,
 979      port: Optional[int] = None,
 980      source_service_account: Optional[str] = None,
 981      source_tags: Optional[List[str]] = None,
 982      target_service_account: Optional[str] = None,
 983      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
 984
 985    if ip_protocol != 'ICMP' and port is None:
 986      raise ValueError('TCP and UDP must have port numbers')
 987
 988    # Firewall policies (organization, folders)
 989    for p in self._policies:
 990      result = p.check_connectivity_ingress(
 991          src_ip=src_ip,
 992          ip_protocol=ip_protocol,
 993          port=port,
 994          #target_network=self._network,
 995          target_service_account=target_service_account)
 996      if result.action != 'goto_next':
 997        return result
 998
 999    # VPC firewall rules
1000    return self._vpc_firewall.check_connectivity_ingress(
1001        src_ip=src_ip,
1002        ip_protocol=ip_protocol,
1003        port=port,
1004        source_service_account=source_service_account,
1005        source_tags=source_tags,
1006        target_service_account=target_service_account,
1007        target_tags=target_tags)
def check_connectivity_egress( self, *, src_ip: Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network], ip_protocol: str, port: Optional[int] = None, source_service_account: Optional[str] = None, source_tags: Optional[List[str]] = None, target_service_account: Optional[str] = None, target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
1009  def check_connectivity_egress(
1010      self,  #
1011      *,
1012      src_ip: IPAddrOrNet,
1013      ip_protocol: str,
1014      port: Optional[int] = None,
1015      source_service_account: Optional[str] = None,
1016      source_tags: Optional[List[str]] = None,
1017      target_service_account: Optional[str] = None,
1018      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
1019
1020    if ip_protocol != 'ICMP' and port is None:
1021      raise ValueError('TCP and UDP must have port numbers')
1022
1023    # Firewall policies (organization, folders)
1024    for p in self._policies:
1025      result = p.check_connectivity_egress(
1026          src_ip=src_ip,
1027          ip_protocol=ip_protocol,
1028          port=port,
1029          #target_network=self._network,
1030          target_service_account=target_service_account)
1031      if result.action != 'goto_next':
1032        return result
1033
1034    # VPC firewall rules
1035    return self._vpc_firewall.check_connectivity_egress(
1036        src_ip=src_ip,
1037        ip_protocol=ip_protocol,
1038        port=port,
1039        source_service_account=source_service_account,
1040        source_tags=source_tags,
1041        target_service_account=target_service_account,
1042        target_tags=target_tags)
def get_vpc_ingress_rules( self, name: Optional[str] = None, name_pattern: Optional[re.Pattern] = None, target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
1044  def get_vpc_ingress_rules(
1045      self,
1046      name: Optional[str] = None,
1047      name_pattern: Optional[re.Pattern] = None,
1048      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
1049    """Retrieve the list of ingress firewall rules matching name or name pattern and target tags.
1050
1051    Args:
1052        name (Optional[str], optional): firewall rune name. Defaults to None.
1053        name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
1054        target_tags (Optional[List[str]], optional): firewall target tags
1055          (if not specified any tag will match). Defaults to None.
1056
1057    Returns:
1058        List[VpcFirewallRule]: List of ingress firewall rules
1059    """
1060    rules = self._vpc_firewall.get_vpc_ingress_rules(name, name_pattern,
1061                                                     target_tags)
1062    return rules

Retrieve the list of ingress firewall rules matching name or name pattern and target tags.

Arguments:
  • name (Optional[str], optional): firewall rule name. Defaults to None.
  • name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
  • target_tags (Optional[List[str]], optional): firewall target tags (if not specified any tag will match). Defaults to None.
Returns:

List[VpcFirewallRule]: List of ingress firewall rules

def get_vpc_egress_rules( self, name: Optional[str] = None, name_pattern: Optional[re.Pattern] = None, target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
1064  def get_vpc_egress_rules(
1065      self,
1066      name: Optional[str] = None,
1067      name_pattern: Optional[re.Pattern] = None,
1068      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
1069    """Retrieve the list of egress firewall rules matching name or name pattern and target tags.
1070
1071    Args:
1072        name (Optional[str], optional): firewall rune name. Defaults to None.
1073        name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
1074        target_tags (Optional[List[str]], optional): firewall target tags
1075          (if not specified any tag will match). Defaults to None.
1076
1077    Returns:
1078        List[VpcFirewallRule]: List of egress firewall rules
1079    """
1080    rules = self._vpc_firewall.get_vpc_egress_rules(name, name_pattern,
1081                                                    target_tags)
1082    return rules

Retrieve the list of egress firewall rules matching name or name pattern and target tags.

Arguments:
  • name (Optional[str], optional): firewall rule name. Defaults to None.
  • name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
  • target_tags (Optional[List[str]], optional): firewall target tags (if not specified any tag will match). Defaults to None.
Returns:

List[VpcFirewallRule]: List of egress firewall rules

def verify_ingress_rule_exists(self, name: str):
1084  def verify_ingress_rule_exists(self, name: str):
1085    """Verify that a certain VPC rule exists. This is useful to verify
1086    whether maybe a permission was missing on a shared VPC and an
1087    automatic rule couldn't be created."""
1088    return self._vpc_firewall.verify_ingress_rule_exists(name)

Verify that a certain VPC rule exists. This is useful to verify whether maybe a permission was missing on a shared VPC and an automatic rule couldn't be created.

def verify_egress_rule_exists(self, name: str):
1090  def verify_egress_rule_exists(self, name: str):
1091    """Verify that a certain VPC rule exists. This is useful to verify
1092    whether maybe a permission was missing on a shared VPC and an
1093    automatic rule couldn't be created."""
1094    return self._vpc_firewall.verify_egress_rule_exists(name)

Verify that a certain VPC rule exists. This is useful to verify whether maybe a permission was missing on a shared VPC and an automatic rule couldn't be created.

class VPCEffectiveFirewalls(EffectiveFirewalls):
class VPCEffectiveFirewalls(EffectiveFirewalls):
  """Effective firewall rules of one VPC network.

  Behaves like EffectiveFirewalls (org/folder firewall policies included)
  and additionally keeps a reference to the network it was created for.
  """
  _network: Network

  def __init__(self, network, resource_data):
    """Store `network` and let the base class process `resource_data`."""
    self._network = network
    super().__init__(resource_data)

Effective firewall rules for a VPC network.

(Includes org/folder firewall policies.)

VPCEffectiveFirewalls(network, resource_data)
1103  def __init__(self, network, resource_data):
1104    super().__init__(resource_data)
1105    self._network = network
@caching.cached_api_call(in_memory=True)
def get_network(project_id: str, network_name: str) -> Network:
@caching.cached_api_call(in_memory=True)
def get_network(project_id: str, network_name: str) -> Network:
  """Fetch one VPC network through the Compute v1 API.

  Args:
    project_id: project that owns the network.
    network_name: name of the network to fetch.

  Returns:
    A Network wrapping the API response.
  """
  logging.info('fetching network: %s/%s', project_id, network_name)
  networks_api = apis.get_api('compute', 'v1', project_id).networks()
  network_data = networks_api.get(
      project=project_id,
      network=network_name).execute(num_retries=config.API_RETRIES)
  return Network(project_id, network_data)
def get_subnetwork_from_url(url: str) -> Subnetwork:
1126def get_subnetwork_from_url(url: str) -> Subnetwork:
1127  """Returns Subnetwork object given subnetwork url"""
1128  m = re.match((r'https://www.googleapis.com/compute/v1/projects/'
1129                r'([^/]+)/regions/([^/]+)/subnetworks/([^/]+)$'), url)
1130  if not m:
1131    raise ValueError(f"can't parse network url: {url}")
1132  (project_id, region, subnetwork_name) = (m.group(1), m.group(2), m.group(3))
1133  return get_subnetwork(project_id, region, subnetwork_name)

Returns Subnetwork object given subnetwork url

def get_network_from_url(url: str) -> Network:
1136def get_network_from_url(url: str) -> Network:
1137  m = re.match(
1138      r'https://www.googleapis.com/compute/v1/projects/([^/]+)/global/networks/([^/]+)',
1139      url)
1140  if not m:
1141    raise ValueError(f"can't parse network url: {url}")
1142  (project_id, network_name) = (m.group(1), m.group(2))
1143  return get_network(project_id, network_name)
@caching.cached_api_call(in_memory=True)
def get_networks(project_id: str) -> List[Network]:
@caching.cached_api_call(in_memory=True)
def get_networks(project_id: str) -> List[Network]:
  """List the VPC networks of a project through the Compute v1 API.

  Args:
    project_id: project whose networks are listed.

  Returns:
    One Network per entry of the response's 'items' list; an empty list
    when the response has no 'items'.
  """
  # The message names the plural: this call lists every network of the
  # project rather than fetching a single one.
  logging.info('fetching networks: %s', project_id)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.networks().list(project=project_id)
  response = request.execute(num_retries=config.API_RETRIES)
  return [Network(project_id, item) for item in response.get('items', [])]
@caching.cached_api_call(in_memory=True)
def get_subnetwork( project_id: str, region: str, subnetwork_name: str) -> Subnetwork:
@caching.cached_api_call(in_memory=True)
def get_subnetwork(project_id: str, region: str,
                   subnetwork_name: str) -> Subnetwork:
  """Fetch one subnetwork through the Compute v1 API.

  Args:
    project_id: project that owns the subnetwork.
    region: region of the subnetwork.
    subnetwork_name: name of the subnetwork to fetch.

  Returns:
    A Subnetwork wrapping the API response.
  """
  # Log what is really fetched (a subnetwork) and include the region, which
  # is part of the lookup key.
  logging.info('fetching subnetwork: %s/%s/%s', project_id, region,
               subnetwork_name)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.subnetworks().get(project=project_id,
                                      region=region,
                                      subnetwork=subnetwork_name)
  response = request.execute(num_retries=config.API_RETRIES)
  return Subnetwork(project_id, response)
@caching.cached_api_call(in_memory=True)
def get_routes(project_id: str) -> List[Route]:
@caching.cached_api_call(in_memory=True)
def get_routes(project_id: str) -> List[Route]:
  """List the routes of a project through the Compute v1 API.

  Args:
    project_id: project whose routes are listed.

  Returns:
    One Route per entry of the response's 'items' list; an empty list when
    the response has no 'items'.
  """
  logging.info('fetching routes: %s', project_id)
  routes_api = apis.get_api('compute', 'v1', project_id).routes()
  listing = routes_api.list(project=project_id).execute(
      num_retries=config.API_RETRIES)
  routes = []
  for route_data in listing.get('items', []):
    routes.append(Route(project_id, route_data))
  return routes
@caching.cached_api_call(in_memory=True)
def get_zones(project_id: str) -> List[ManagedZone]:
@caching.cached_api_call(in_memory=True)
def get_zones(project_id: str) -> List[ManagedZone]:
  """List the Cloud DNS managed zones of a project (DNS v1beta2 API).

  The zones are listed first; then every zone is fetched again by name and
  the ManagedZone objects are built from those individual responses.

  Args:
    project_id: project whose managed zones are listed.

  Returns:
    One ManagedZone per entry of the listing's 'managedZones' list.
  """
  logging.info('fetching DNS zones: %s', project_id)
  dns = apis.get_api('dns', 'v1beta2', project_id)
  listing = dns.managedZones().list(project=project_id).execute(
      num_retries=config.API_RETRIES)

  def _fetch_zone(zone_name):
    # One extra "get" call per zone returned by the listing.
    return dns.managedZones().get(
        project=project_id,
        managedZone=zone_name).execute(num_retries=config.API_RETRIES)

  return [
      ManagedZone(project_id, _fetch_zone(summary['name']))
      for summary in listing.get('managedZones', [])
  ]
@caching.cached_api_call(in_memory=True)
def get_routers( project_id: str, region: str, network) -> List[Router]:
@caching.cached_api_call(in_memory=True)
def get_routers(project_id: str, region: str, network) -> List[Router]:
  """List the routers of a region that are attached to a network.

  Args:
    project_id: project whose routers are listed.
    region: region whose routers are listed.
    network: object whose `self_link` is used to filter routers by network.

  Returns:
    One Router per entry of the response's 'items' list; an empty list when
    the response has no 'items'.
  """
  logging.info('fetching routers: %s/%s', project_id, region)
  network_filter = f'network="{network.self_link}"'
  routers_api = apis.get_api('compute', 'v1', project_id).routers()
  listing = routers_api.list(
      project=project_id, region=region,
      filter=network_filter).execute(num_retries=config.API_RETRIES)
  routers = []
  for router_data in listing.get('items', []):
    routers.append(Router(project_id, router_data))
  return routers
@caching.cached_api_call(in_memory=True)
def get_router(project_id: str, region: str, network) -> Router:
@caching.cached_api_call(in_memory=True)
def get_router(project_id: str, region: str, network) -> Router:
  """Return the first router of a region that is attached to a network.

  Args:
    project_id: project whose routers are listed.
    region: region whose routers are listed.
    network: object whose `self_link` is used to filter routers by network.

  Returns:
    A Router built from the first entry of the response's 'items' list, or
    from an empty dict when the response has no routers.
  """
  logging.info('fetching routers: %s/%s', project_id, region)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.routers().list(project=project_id,
                                   region=region,
                                   filter=f'network="{network.self_link}"')
  response = request.execute(num_retries=config.API_RETRIES)
  # Fall back to an empty router both when 'items' is missing and when it is
  # present but empty; next(iter([])) would raise StopIteration instead.
  items = response.get('items') or [{}]
  return Router(project_id, items[0])
@caching.cached_api_call(in_memory=True)
def nat_router_status( project_id: str, router_name: str, region: str) -> RouterStatus:
@caching.cached_api_call(in_memory=True)
def nat_router_status(project_id: str, router_name: str,
                      region: str) -> RouterStatus:
  """Fetch the status of a router through the Compute v1 API.

  Args:
    project_id: project that owns the router.
    router_name: name of the router.
    region: region of the router.

  Returns:
    A RouterStatus wrapping the API response when it has a 'result' key,
    otherwise a RouterStatus wrapping an empty dict.
  """
  logging.info('fetching router status: %s/%s in region %s', project_id,
               router_name, region)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.routers().getRouterStatus(project=project_id,
                                              router=router_name,
                                              region=region)
  response = request.execute(num_retries=config.API_RETRIES)
  # Test for the key itself: a substring search on str(response) would also
  # succeed when 'result' merely appears inside some other key or value.
  if 'result' in response:
    return RouterStatus(project_id, response)
  logging.info('unable to fetch router status: %s/%s in region %s',
               project_id, router_name, region)
  return RouterStatus(project_id, {})
@caching.cached_api_call(in_memory=True)
def get_nat_ip_info( project_id: str, router_name: str, region: str) -> RouterNatIpInfo:
@caching.cached_api_call(in_memory=True)
def get_nat_ip_info(project_id: str, router_name: str,
                    region: str) -> RouterNatIpInfo:
  """Fetch the NAT IP information of a router through the Compute v1 API.

  Args:
    project_id: project that owns the router.
    router_name: name of the router.
    region: region of the router.

  Returns:
    A RouterNatIpInfo wrapping the API response when it has a 'result' key,
    otherwise a RouterNatIpInfo wrapping an empty dict.
  """
  logging.info('fetching NAT IP info for router: %s/%s in region %s',
               project_id, router_name, region)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.routers().getNatIpInfo(project=project_id,
                                           router=router_name,
                                           region=region)
  response = request.execute(num_retries=config.API_RETRIES)
  # Test for the key itself: a substring search on str(response) would also
  # succeed when 'result' merely appears inside some other key or value.
  if 'result' in response:
    return RouterNatIpInfo(project_id, response)
  logging.info('unable to fetch Nat IP Info for router: %s/%s in region %s',
               project_id, router_name, region)
  return RouterNatIpInfo(project_id, {})
class VPCSubnetworkIAMPolicy(gcpdiag.queries.iam.BaseIAMPolicy):
class VPCSubnetworkIAMPolicy(iam.BaseIAMPolicy):
  """IAM policy of a VPC subnetwork."""

  def _is_resource_permission(self, permission):
    """Report every permission as a resource permission."""
    # The argument is deliberately ignored: the answer is always True.
    return True

Common class for IAM policies

@caching.cached_api_call(in_memory=True)
def get_subnetwork_iam_policy( project_id: str, region: str, subnetwork_name: str) -> VPCSubnetworkIAMPolicy:
@caching.cached_api_call(in_memory=True)
def get_subnetwork_iam_policy(project_id: str, region: str,
                              subnetwork_name: str) -> VPCSubnetworkIAMPolicy:
  """Fetch the IAM policy of a subnetwork.

  Args:
    project_id: project that owns the subnetwork.
    region: region of the subnetwork.
    subnetwork_name: name of the subnetwork.

  Returns:
    Whatever iam.fetch_iam_policy() returns for the getIamPolicy request,
    the VPCSubnetworkIAMPolicy class, the project id and the resource name.
  """
  subnetworks_api = apis.get_api('compute', 'v1', project_id).subnetworks()
  policy_request = subnetworks_api.getIamPolicy(project=project_id,
                                                region=region,
                                                resource=subnetwork_name)

  # Path-style identifier of the subnetwork handed to the IAM helper.
  resource_name = '/'.join((f'projects/{project_id}', f'regions/{region}',
                            f'subnetworks/{subnetwork_name}'))
  return iam.fetch_iam_policy(policy_request, VPCSubnetworkIAMPolicy,
                              project_id, resource_name)
class Address(gcpdiag.models.Resource):
class Address(models.Resource):
  """An IP address resource, backed by its API resource dictionary."""
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link without the Compute v1 URL prefix.

    When the self link doesn't start with that prefix, the self link is
    returned prefixed with '>> ' instead.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    return matched.group(1) if matched else f'>> {link}'

  @property
  def short_path(self) -> str:
    """Project id and address name joined by a slash."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """Value of the 'name' key."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' key, or '' when it is absent."""
    return self._resource_data.get('selfLink', '')

  @property
  def subnetwork(self) -> str:
    """Value of the 'subnetwork' key."""
    return self._resource_data['subnetwork']

  @property
  def status(self) -> str:
    """Value of the 'status' key."""
    return self._resource_data['status']

IP Addresses.

Address(project_id, resource_data)
1300  def __init__(self, project_id, resource_data):
1301    super().__init__(project_id=project_id)
1302    self._resource_data = resource_data
full_path: str
1304  @property
1305  def full_path(self) -> str:
1306    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
1307                      self.self_link)
1308    if result:
1309      return result.group(1)
1310    else:
1311      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
1313  @property
1314  def short_path(self) -> str:
1315    path = self.project_id + '/' + self.name
1316    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
1318  @property
1319  def name(self) -> str:
1320    return self._resource_data['name']
subnetwork: str
1326  @property
1327  def subnetwork(self) -> str:
1328    return self._resource_data['subnetwork']
status: str
1330  @property
1331  def status(self) -> str:
1332    return self._resource_data['status']
@caching.cached_api_call(in_memory=True)
def get_addresses(project_id: str) -> List[Address]:
@caching.cached_api_call(in_memory=True)
def get_addresses(project_id: str) -> List[Address]:
  """List all addresses of a project with the aggregatedList API call.

  Args:
    project_id: project whose addresses are listed.

  Returns:
    One Address per entry found in the 'addresses' list of any value of the
    response's 'items' mapping; an empty list when there is none.
  """
  logging.info('fetching addresses list: %s', project_id)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.addresses().aggregatedList(project=project_id)
  response = request.execute(num_retries=config.API_RETRIES)
  addresses: List[Address] = []
  # A response without 'items' yields an empty list, like the other list
  # helpers, instead of raising KeyError. Only the per-scope values matter;
  # scopes without an 'addresses' list contribute nothing.
  for scoped in response.get('items', {}).values():
    addresses.extend(
        Address(project_id, address)
        for address in scoped.get('addresses', []))
  return addresses