gcpdiag.queries.network

Queries related to VPC Networks.
IPv4AddrOrIPv6Addr = typing.Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
IPv4NetOrIPv6Net = typing.Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
IPAddrOrNet = typing.Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network]
class Subnetwork(gcpdiag.models.Resource):
32class Subnetwork(models.Resource):
33  """A VPC subnetwork."""
34
35  _resource_data: dict
36
37  def __init__(self, project_id, resource_data):
38    super().__init__(project_id=project_id)
39    self._resource_data = resource_data
40
41  @property
42  def full_path(self) -> str:
43    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
44                      self.self_link)
45    if result:
46      return result.group(1)
47    else:
48      return f'>> {self.self_link}'
49
50  @property
51  def short_path(self) -> str:
52    path = self.project_id + '/' + self.name
53    return path
54
55  @property
56  def name(self) -> str:
57    return self._resource_data['name']
58
59  @property
60  def self_link(self) -> str:
61    return self._resource_data.get('selfLink', '')
62
63  @property
64  def ip_network(self) -> IPv4NetOrIPv6Net:
65    return ipaddress.ip_network(self._resource_data['ipCidrRange'])
66
67  @property
68  def region(self) -> str:
69    # https://www.googleapis.com/compute/v1/projects/gcpdiag-gke1-aaaa/regions/europe-west4
70    m = re.match(
71        r'https://www.googleapis.com/compute/v1/projects/([^/]+)/regions/([^/]+)',
72        self._resource_data['region'])
73    if not m:
74      raise RuntimeError(
75          f"can't parse region URL: {self._resource_data['region']}")
76    return m.group(2)
77
78  def is_private_ip_google_access(self) -> bool:
79    return self._resource_data.get('privateIpGoogleAccess', False)
80
81  @property
82  def network(self):
83    return self._resource_data['network']

A VPC subnetwork.

Subnetwork(project_id, resource_data)
37  def __init__(self, project_id, resource_data):
38    super().__init__(project_id=project_id)
39    self._resource_data = resource_data
full_path: str
41  @property
42  def full_path(self) -> str:
43    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
44                      self.self_link)
45    if result:
46      return result.group(1)
47    else:
48      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
50  @property
51  def short_path(self) -> str:
52    path = self.project_id + '/' + self.name
53    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
55  @property
56  def name(self) -> str:
57    return self._resource_data['name']
ip_network: Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
63  @property
64  def ip_network(self) -> IPv4NetOrIPv6Net:
65    return ipaddress.ip_network(self._resource_data['ipCidrRange'])
region: str
67  @property
68  def region(self) -> str:
69    # https://www.googleapis.com/compute/v1/projects/gcpdiag-gke1-aaaa/regions/europe-west4
70    m = re.match(
71        r'https://www.googleapis.com/compute/v1/projects/([^/]+)/regions/([^/]+)',
72        self._resource_data['region'])
73    if not m:
74      raise RuntimeError(
75          f"can't parse region URL: {self._resource_data['region']}")
76    return m.group(2)
def is_private_ip_google_access(self) -> bool:
78  def is_private_ip_google_access(self) -> bool:
79    return self._resource_data.get('privateIpGoogleAccess', False)
network
81  @property
82  def network(self):
83    return self._resource_data['network']
class Route(gcpdiag.models.Resource):
 86class Route(models.Resource):
 87  """A VPC Route."""
 88
 89  _resource_data: dict
 90
 91  def __init__(self, project_id, resource_data):
 92    super().__init__(project_id=project_id)
 93    self._resource_data = resource_data
 94
 95  @property
 96  def full_path(self) -> str:
 97    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
 98                      self.self_link)
 99    if result:
100      return result.group(1)
101    else:
102      return f'>> {self.self_link}'
103
104  @property
105  def short_path(self) -> str:
106    path = self.project_id + '/' + self.name
107    return path
108
109  @property
110  def name(self) -> str:
111    return self._resource_data['name']
112
113  @property
114  def kind(self) -> str:
115    return self._resource_data['kind']
116
117  @property
118  def self_link(self) -> str:
119    return self._resource_data['selfLink']
120
121  @property
122  def network(self) -> str:
123    return self._resource_data['network']
124
125  @property
126  def tags(self) -> List[str]:
127    if 'tags' in self._resource_data:
128      return self._resource_data['tags']
129    return []
130
131  @property
132  def dest_range(self) -> str:
133    return self._resource_data['destRange']
134
135  @property
136  def next_hop_gateway(self) -> Optional[str]:
137    if 'nextHopGateway' in self._resource_data:
138      return self._resource_data['nextHopGateway']
139    return None
140
141  @property
142  def next_hop_vpn_tunnel(self) -> Optional[str]:
143    return self._resource_data.get('nextHopVpnTunnel')
144
145  @property
146  def next_hop_hub(self) -> Optional[str]:
147    return self._resource_data.get('nextHopHub')
148
149  @property
150  def priority(self) -> int:
151    return self._resource_data['priority']
152
153  def get_next_hop(self) -> Union[Dict[str, Any], Optional[str]]:
154    hop_types = {
155        'nextHopGateway': 'nextHopGateway',
156        'nextHopVpnTunnel': 'nextHopVpnTunnel',
157        'nextHopHub': 'nextHopHub',
158        'nextHopInstance': 'nextHopInstance',
159        'nextHopAddress': 'nextHopAddress',
160        'nextHopPeering': 'nextHopPeering',
161        'nextHopIlb': 'nextHopIlb',
162        'nextHopNetwork': 'nextHopNetwork',
163        'nextHopIp': 'nextHopIp'
164    }
165
166    for hop_type, value in hop_types.items():
167      if self._resource_data.get(hop_type):
168        return {'type': value, 'link': self._resource_data[hop_type]}
169    return None
170
171  def check_route_match(self, ip1: IPAddrOrNet, ip2: str) -> bool:
172    ip2_list = [ipaddress.ip_network(ip2)]
173    if _ip_match(ip1, ip2_list, 'allow'):
174      return True
175    return False

A VPC Route.

Route(project_id, resource_data)
91  def __init__(self, project_id, resource_data):
92    super().__init__(project_id=project_id)
93    self._resource_data = resource_data
full_path: str
 95  @property
 96  def full_path(self) -> str:
 97    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
 98                      self.self_link)
 99    if result:
100      return result.group(1)
101    else:
102      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
104  @property
105  def short_path(self) -> str:
106    path = self.project_id + '/' + self.name
107    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
109  @property
110  def name(self) -> str:
111    return self._resource_data['name']
kind: str
113  @property
114  def kind(self) -> str:
115    return self._resource_data['kind']
network: str
121  @property
122  def network(self) -> str:
123    return self._resource_data['network']
tags: List[str]
125  @property
126  def tags(self) -> List[str]:
127    if 'tags' in self._resource_data:
128      return self._resource_data['tags']
129    return []
dest_range: str
131  @property
132  def dest_range(self) -> str:
133    return self._resource_data['destRange']
next_hop_gateway: Optional[str]
135  @property
136  def next_hop_gateway(self) -> Optional[str]:
137    if 'nextHopGateway' in self._resource_data:
138      return self._resource_data['nextHopGateway']
139    return None
next_hop_vpn_tunnel: Optional[str]
141  @property
142  def next_hop_vpn_tunnel(self) -> Optional[str]:
143    return self._resource_data.get('nextHopVpnTunnel')
next_hop_hub: Optional[str]
145  @property
146  def next_hop_hub(self) -> Optional[str]:
147    return self._resource_data.get('nextHopHub')
priority: int
149  @property
150  def priority(self) -> int:
151    return self._resource_data['priority']
def get_next_hop(self) -> Union[Dict[str, Any], str, NoneType]:
153  def get_next_hop(self) -> Union[Dict[str, Any], Optional[str]]:
154    hop_types = {
155        'nextHopGateway': 'nextHopGateway',
156        'nextHopVpnTunnel': 'nextHopVpnTunnel',
157        'nextHopHub': 'nextHopHub',
158        'nextHopInstance': 'nextHopInstance',
159        'nextHopAddress': 'nextHopAddress',
160        'nextHopPeering': 'nextHopPeering',
161        'nextHopIlb': 'nextHopIlb',
162        'nextHopNetwork': 'nextHopNetwork',
163        'nextHopIp': 'nextHopIp'
164    }
165
166    for hop_type, value in hop_types.items():
167      if self._resource_data.get(hop_type):
168        return {'type': value, 'link': self._resource_data[hop_type]}
169    return None
def check_route_match( self, ip1: Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network], ip2: str) -> bool:
171  def check_route_match(self, ip1: IPAddrOrNet, ip2: str) -> bool:
172    ip2_list = [ipaddress.ip_network(ip2)]
173    if _ip_match(ip1, ip2_list, 'allow'):
174      return True
175    return False
class ManagedZone(gcpdiag.models.Resource):
178class ManagedZone(models.Resource):
179  """
180  Represent a DNS zone (public or private
181
182  https://cloud.google.com/dns/docs/reference/v1beta2/managedZones
183  """
184  _resource_data: dict
185
186  def __init__(self, project_id, resource_data):
187    super().__init__(project_id=project_id)
188    self._resource_data = resource_data
189
190  @property
191  def cloud_logging_config(self) -> bool:
192    return self._resource_data['cloudLoggingConfig'].get('enableLogging', False)
193
194  @property
195  def is_public(self) -> bool:
196    return self._resource_data['visibility'] == 'public'
197
198  @property
199  def vpc_attached(self) -> bool:
200    if 'privateVisibilityConfig' not in self._resource_data:
201      self._resource_data['privateVisibilityConfig'] = {}
202
203    return (self._resource_data['privateVisibilityConfig'].get(
204        'networks', False) or
205            self._resource_data['privateVisibilityConfig'].get(
206                'gkeClusters', False))
207
208  @property
209  def dnssec_config_state(self) -> bool:
210    if 'dnssecConfig' not in self._resource_data:
211      self._resource_data['dnssecConfig'] = {}
212
213    return self._resource_data['dnssecConfig'].get('state', False)
214
215  @property
216  def name(self) -> str:
217    return self._resource_data['name']
218
219  @property
220  def full_path(self) -> str:
221    result = re.match(r'https://dns.googleapis.com/dns/v1beta2/(.*)',
222                      self.self_link)
223    if result:
224      return result.group(1)
225    else:
226      return f'>> {self.self_link}'
227
228  @property
229  def short_path(self) -> str:
230    path = self.project_id + '/' + self.name
231    return path
232
233  @property
234  def self_link(self) -> str:
235    return self._resource_data.get('selfLink', '')
ManagedZone(project_id, resource_data)
186  def __init__(self, project_id, resource_data):
187    super().__init__(project_id=project_id)
188    self._resource_data = resource_data
cloud_logging_config: bool
190  @property
191  def cloud_logging_config(self) -> bool:
192    return self._resource_data['cloudLoggingConfig'].get('enableLogging', False)
is_public: bool
194  @property
195  def is_public(self) -> bool:
196    return self._resource_data['visibility'] == 'public'
vpc_attached: bool
198  @property
199  def vpc_attached(self) -> bool:
200    if 'privateVisibilityConfig' not in self._resource_data:
201      self._resource_data['privateVisibilityConfig'] = {}
202
203    return (self._resource_data['privateVisibilityConfig'].get(
204        'networks', False) or
205            self._resource_data['privateVisibilityConfig'].get(
206                'gkeClusters', False))
dnssec_config_state: bool
208  @property
209  def dnssec_config_state(self) -> bool:
210    if 'dnssecConfig' not in self._resource_data:
211      self._resource_data['dnssecConfig'] = {}
212
213    return self._resource_data['dnssecConfig'].get('state', False)
name: str
215  @property
216  def name(self) -> str:
217    return self._resource_data['name']
full_path: str
219  @property
220  def full_path(self) -> str:
221    result = re.match(r'https://dns.googleapis.com/dns/v1beta2/(.*)',
222                      self.self_link)
223    if result:
224      return result.group(1)
225    else:
226      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
228  @property
229  def short_path(self) -> str:
230    path = self.project_id + '/' + self.name
231    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

class Router(gcpdiag.models.Resource):
238class Router(models.Resource):
239  """A VPC Router."""
240
241  _resource_data: dict
242
243  def __init__(self, project_id, resource_data):
244    super().__init__(project_id=project_id)
245    self._resource_data = resource_data
246    self._nats = None
247
248  @property
249  def full_path(self) -> str:
250    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
251                      self.self_link)
252    if result:
253      return result.group(1)
254    else:
255      return f'>> {self.self_link}'
256
257  @property
258  def short_path(self) -> str:
259    path = self.project_id + '/' + self.name
260    return path
261
262  @property
263  def name(self) -> str:
264    return self._resource_data['name']
265
266  @property
267  def self_link(self) -> str:
268    return self._resource_data.get('selfLink', '')
269
270  def subnet_has_nat(self, subnetwork):
271    if not self._resource_data.get('nats', []):
272      return False
273    for n in self._resource_data.get('nats', []):
274      if n['sourceSubnetworkIpRangesToNat'] == 'LIST_OF_SUBNETWORKS':
275        # Cloud NAT configure for specific subnets
276        if 'subnetworks' in n and subnetwork.self_link in [
277            s['name'] for s in n['subnetworks']
278        ]:
279          return True
280      else:
281        # Cloud NAT configured for all subnets
282        return True
283    return False

A VPC Router.

Router(project_id, resource_data)
243  def __init__(self, project_id, resource_data):
244    super().__init__(project_id=project_id)
245    self._resource_data = resource_data
246    self._nats = None
full_path: str
248  @property
249  def full_path(self) -> str:
250    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
251                      self.self_link)
252    if result:
253      return result.group(1)
254    else:
255      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
257  @property
258  def short_path(self) -> str:
259    path = self.project_id + '/' + self.name
260    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
262  @property
263  def name(self) -> str:
264    return self._resource_data['name']
def subnet_has_nat(self, subnetwork):
270  def subnet_has_nat(self, subnetwork):
271    if not self._resource_data.get('nats', []):
272      return False
273    for n in self._resource_data.get('nats', []):
274      if n['sourceSubnetworkIpRangesToNat'] == 'LIST_OF_SUBNETWORKS':
275        # Cloud NAT configure for specific subnets
276        if 'subnetworks' in n and subnetwork.self_link in [
277            s['name'] for s in n['subnetworks']
278        ]:
279          return True
280      else:
281        # Cloud NAT configured for all subnets
282        return True
283    return False
@dataclasses.dataclass
class Peering:
286@dataclasses.dataclass
287class Peering:
288  """VPC Peerings"""
289
290  name: str
291  url: str
292  state: str
293  exports_custom_routes: bool
294  imports_custom_routes: bool
295  auto_creates_routes: bool
296
297  def __str__(self):
298    return self.name

VPC Peerings

Peering( name: str, url: str, state: str, exports_custom_routes: bool, imports_custom_routes: bool, auto_creates_routes: bool)
name: str
url: str
state: str
exports_custom_routes: bool
imports_custom_routes: bool
auto_creates_routes: bool
class Network(gcpdiag.models.Resource):
301class Network(models.Resource):
302  """A VPC network."""
303  _resource_data: dict
304  _subnetworks: Optional[Dict[str, Subnetwork]]
305
306  def __init__(self, project_id, resource_data):
307    super().__init__(project_id=project_id)
308    self._resource_data = resource_data
309    self._subnetworks = None
310
311  @property
312  def full_path(self) -> str:
313    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
314                      self.self_link)
315    if result:
316      return result.group(1)
317    else:
318      return f'>> {self.self_link}'
319
320  @property
321  def short_path(self) -> str:
322    path = self.project_id + '/' + self.name
323    return path
324
325  @property
326  def name(self) -> str:
327    return self._resource_data['name']
328
329  @property
330  def self_link(self) -> str:
331    return self._resource_data.get('selfLink', '')
332
333  @property
334  def firewall(self) -> 'EffectiveFirewalls':
335    return _get_effective_firewalls(self)
336
337  @property
338  def subnetworks(self) -> Dict[str, Subnetwork]:
339    return _batch_get_subnetworks(
340        self._project_id, frozenset(self._resource_data.get('subnetworks', [])))
341
342  @property
343  def peerings(self) -> List[Peering]:
344    return [
345        Peering(peer['name'], peer['network'], peer['state'],
346                peer['exportCustomRoutes'], peer['importCustomRoutes'],
347                peer['autoCreateRoutes'])
348        for peer in self._resource_data.get('peerings', [])
349    ]

A VPC network.

Network(project_id, resource_data)
306  def __init__(self, project_id, resource_data):
307    super().__init__(project_id=project_id)
308    self._resource_data = resource_data
309    self._subnetworks = None
full_path: str
311  @property
312  def full_path(self) -> str:
313    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
314                      self.self_link)
315    if result:
316      return result.group(1)
317    else:
318      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
320  @property
321  def short_path(self) -> str:
322    path = self.project_id + '/' + self.name
323    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
325  @property
326  def name(self) -> str:
327    return self._resource_data['name']
firewall: EffectiveFirewalls
333  @property
334  def firewall(self) -> 'EffectiveFirewalls':
335    return _get_effective_firewalls(self)
subnetworks: Dict[str, Subnetwork]
337  @property
338  def subnetworks(self) -> Dict[str, Subnetwork]:
339    return _batch_get_subnetworks(
340        self._project_id, frozenset(self._resource_data.get('subnetworks', [])))
peerings: List[Peering]
342  @property
343  def peerings(self) -> List[Peering]:
344    return [
345        Peering(peer['name'], peer['network'], peer['state'],
346                peer['exportCustomRoutes'], peer['importCustomRoutes'],
347                peer['autoCreateRoutes'])
348        for peer in self._resource_data.get('peerings', [])
349    ]
@dataclasses.dataclass
class FirewallCheckResult:
457@dataclasses.dataclass
458class FirewallCheckResult:
459  """The result of a firewall connectivity check."""
460
461  action: str
462  firewall_policy_name: Optional[str] = None
463  firewall_policy_rule_description: Optional[str] = None
464  vpc_firewall_rule_id: Optional[str] = None
465  vpc_firewall_rule_name: Optional[str] = None
466
467  def __str__(self):
468    return self.action
469
470  @property
471  def matched_by_str(self):
472    if self.firewall_policy_name:
473      if self.firewall_policy_rule_description:
474        return f'policy: {self.firewall_policy_name}, rule: {self.firewall_policy_rule_description}'
475      else:
476        return f'policy: {self.firewall_policy_name}'
477    elif self.vpc_firewall_rule_name:
478      return f'vpc firewall rule: {self.vpc_firewall_rule_name}'

The result of a firewall connectivity check.

FirewallCheckResult( action: str, firewall_policy_name: Optional[str] = None, firewall_policy_rule_description: Optional[str] = None, vpc_firewall_rule_id: Optional[str] = None, vpc_firewall_rule_name: Optional[str] = None)
action: str
firewall_policy_name: Optional[str] = None
firewall_policy_rule_description: Optional[str] = None
vpc_firewall_rule_id: Optional[str] = None
vpc_firewall_rule_name: Optional[str] = None
matched_by_str
470  @property
471  def matched_by_str(self):
472    if self.firewall_policy_name:
473      if self.firewall_policy_rule_description:
474        return f'policy: {self.firewall_policy_name}, rule: {self.firewall_policy_rule_description}'
475      else:
476        return f'policy: {self.firewall_policy_name}'
477    elif self.vpc_firewall_rule_name:
478      return f'vpc firewall rule: {self.vpc_firewall_rule_name}'
class FirewallRuleNotFoundError(builtins.Exception):
481class FirewallRuleNotFoundError(Exception):
482  rule_name: str
483
484  def __init__(self, name, disabled=False):
485    # Call the base class constructor with the parameters it needs
486    super().__init__(f'firewall rule not found: {name}')
487    self.rule_name = name
488    self.disabled = disabled

Common base class for all non-exit exceptions.

FirewallRuleNotFoundError(name, disabled=False)
484  def __init__(self, name, disabled=False):
485    # Call the base class constructor with the parameters it needs
486    super().__init__(f'firewall rule not found: {name}')
487    self.rule_name = name
488    self.disabled = disabled
rule_name: str
disabled
class VpcFirewallRule:
491class VpcFirewallRule:
492  """Represents firewall rule"""
493
494  def __init__(self, resource_data):
495    self._resource_data = resource_data
496
497  @property
498  def name(self) -> str:
499    return self._resource_data['name']
500
501  @property
502  def source_ranges(self) -> List[ipaddress.IPv4Network]:
503    return self._resource_data['sourceRanges']
504
505  @property
506  def target_tags(self) -> set:
507    return self._resource_data['targetTags']
508
509  @property
510  def allowed(self) -> List[dict]:
511    return self._resource_data['allowed']
512
513  def is_enabled(self) -> bool:
514    return not self._resource_data['disabled']

Represents firewall rule

VpcFirewallRule(resource_data)
494  def __init__(self, resource_data):
495    self._resource_data = resource_data
name: str
497  @property
498  def name(self) -> str:
499    return self._resource_data['name']
source_ranges: List[ipaddress.IPv4Network]
501  @property
502  def source_ranges(self) -> List[ipaddress.IPv4Network]:
503    return self._resource_data['sourceRanges']
target_tags: set
505  @property
506  def target_tags(self) -> set:
507    return self._resource_data['targetTags']
allowed: List[dict]
509  @property
510  def allowed(self) -> List[dict]:
511    return self._resource_data['allowed']
def is_enabled(self) -> bool:
513  def is_enabled(self) -> bool:
514    return not self._resource_data['disabled']
class EffectiveFirewalls:
 866class EffectiveFirewalls:
 867  """Effective firewall rules for a VPC network or Instance.
 868
 869  Includes org/folder firewall policies)."""
 870  _resource_data: dict
 871  _policies: List[_FirewallPolicy]
 872  _vpc_firewall: _VpcFirewall
 873
 874  def __init__(self, resource_data):
 875    self._resource_data = resource_data
 876    self._policies = []
 877    if 'firewallPolicys' in resource_data:
 878      for policy in resource_data['firewallPolicys']:
 879        self._policies.append(_FirewallPolicy(policy))
 880    self._vpc_firewall = _VpcFirewall(resource_data.get('firewalls', {}))
 881
 882  def check_connectivity_ingress(
 883      self,  #
 884      *,
 885      src_ip: IPAddrOrNet,
 886      ip_protocol: str,
 887      port: Optional[int] = None,
 888      source_service_account: Optional[str] = None,
 889      source_tags: Optional[List[str]] = None,
 890      target_service_account: Optional[str] = None,
 891      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
 892
 893    if ip_protocol != 'ICMP' and port is None:
 894      raise ValueError('TCP and UDP must have port numbers')
 895
 896    # Firewall policies (organization, folders)
 897    for p in self._policies:
 898      result = p.check_connectivity_ingress(
 899          src_ip=src_ip,
 900          ip_protocol=ip_protocol,
 901          port=port,
 902          #target_network=self._network,
 903          target_service_account=target_service_account)
 904      if result.action != 'goto_next':
 905        return result
 906
 907    # VPC firewall rules
 908    return self._vpc_firewall.check_connectivity_ingress(
 909        src_ip=src_ip,
 910        ip_protocol=ip_protocol,
 911        port=port,
 912        source_service_account=source_service_account,
 913        source_tags=source_tags,
 914        target_service_account=target_service_account,
 915        target_tags=target_tags)
 916
 917  def check_connectivity_egress(
 918      self,  #
 919      *,
 920      src_ip: IPAddrOrNet,
 921      ip_protocol: str,
 922      port: Optional[int] = None,
 923      source_service_account: Optional[str] = None,
 924      source_tags: Optional[List[str]] = None,
 925      target_service_account: Optional[str] = None,
 926      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
 927
 928    if ip_protocol != 'ICMP' and port is None:
 929      raise ValueError('TCP and UDP must have port numbers')
 930
 931    # Firewall policies (organization, folders)
 932    for p in self._policies:
 933      result = p.check_connectivity_egress(
 934          src_ip=src_ip,
 935          ip_protocol=ip_protocol,
 936          port=port,
 937          #target_network=self._network,
 938          target_service_account=target_service_account)
 939      if result.action != 'goto_next':
 940        return result
 941
 942    # VPC firewall rules
 943    return self._vpc_firewall.check_connectivity_egress(
 944        src_ip=src_ip,
 945        ip_protocol=ip_protocol,
 946        port=port,
 947        source_service_account=source_service_account,
 948        source_tags=source_tags,
 949        target_service_account=target_service_account,
 950        target_tags=target_tags)
 951
 952  def get_vpc_ingress_rules(
 953      self,
 954      name: Optional[str] = None,
 955      name_pattern: Optional[re.Pattern] = None,
 956      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
 957    """Retrieve the list of ingress firewall rules matching name or name pattern and target tags.
 958
 959    Args:
 960        name (Optional[str], optional): firewall rune name. Defaults to None.
 961        name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
 962        target_tags (Optional[List[str]], optional): firewall target tags
 963          (if not specified any tag will match). Defaults to None.
 964
 965    Returns:
 966        List[VpcFirewallRule]: List of ingress firewall rules
 967    """
 968    rules = self._vpc_firewall.get_vpc_ingress_rules(name, name_pattern,
 969                                                     target_tags)
 970    return rules
 971
 972  def get_vpc_egress_rules(
 973      self,
 974      name: Optional[str] = None,
 975      name_pattern: Optional[re.Pattern] = None,
 976      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
 977    """Retrieve the list of egress firewall rules matching name or name pattern and target tags.
 978
 979    Args:
 980        name (Optional[str], optional): firewall rune name. Defaults to None.
 981        name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
 982        target_tags (Optional[List[str]], optional): firewall target tags
 983          (if not specified any tag will match). Defaults to None.
 984
 985    Returns:
 986        List[VpcFirewallRule]: List of egress firewall rules
 987    """
 988    rules = self._vpc_firewall.get_vpc_egress_rules(name, name_pattern,
 989                                                    target_tags)
 990    return rules
 991
 992  def verify_ingress_rule_exists(self, name: str):
 993    """Verify that a certain VPC rule exists. This is useful to verify
 994    whether maybe a permission was missing on a shared VPC and an
 995    automatic rule couldn't be created."""
 996    return self._vpc_firewall.verify_ingress_rule_exists(name)
 997
 998  def verify_egress_rule_exists(self, name: str):
 999    """Verify that a certain VPC rule exists. This is useful to verify
1000    whether maybe a permission was missing on a shared VPC and an
1001    automatic rule couldn't be created."""
1002    return self._vpc_firewall.verify_egress_rule_exists(name)

Effective firewall rules for a VPC network or Instance.

Includes org/folder firewall policies).

EffectiveFirewalls(resource_data)
874  def __init__(self, resource_data):
875    self._resource_data = resource_data
876    self._policies = []
877    if 'firewallPolicys' in resource_data:
878      for policy in resource_data['firewallPolicys']:
879        self._policies.append(_FirewallPolicy(policy))
880    self._vpc_firewall = _VpcFirewall(resource_data.get('firewalls', {}))
def check_connectivity_ingress( self, *, src_ip: Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network], ip_protocol: str, port: Optional[int] = None, source_service_account: Optional[str] = None, source_tags: Optional[List[str]] = None, target_service_account: Optional[str] = None, target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
882  def check_connectivity_ingress(
883      self,  #
884      *,
885      src_ip: IPAddrOrNet,
886      ip_protocol: str,
887      port: Optional[int] = None,
888      source_service_account: Optional[str] = None,
889      source_tags: Optional[List[str]] = None,
890      target_service_account: Optional[str] = None,
891      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
892
893    if ip_protocol != 'ICMP' and port is None:
894      raise ValueError('TCP and UDP must have port numbers')
895
896    # Firewall policies (organization, folders)
897    for p in self._policies:
898      result = p.check_connectivity_ingress(
899          src_ip=src_ip,
900          ip_protocol=ip_protocol,
901          port=port,
902          #target_network=self._network,
903          target_service_account=target_service_account)
904      if result.action != 'goto_next':
905        return result
906
907    # VPC firewall rules
908    return self._vpc_firewall.check_connectivity_ingress(
909        src_ip=src_ip,
910        ip_protocol=ip_protocol,
911        port=port,
912        source_service_account=source_service_account,
913        source_tags=source_tags,
914        target_service_account=target_service_account,
915        target_tags=target_tags)
def check_connectivity_egress( self, *, src_ip: Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network], ip_protocol: str, port: Optional[int] = None, source_service_account: Optional[str] = None, source_tags: Optional[List[str]] = None, target_service_account: Optional[str] = None, target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
917  def check_connectivity_egress(
918      self,  #
919      *,
920      src_ip: IPAddrOrNet,
921      ip_protocol: str,
922      port: Optional[int] = None,
923      source_service_account: Optional[str] = None,
924      source_tags: Optional[List[str]] = None,
925      target_service_account: Optional[str] = None,
926      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
927
928    if ip_protocol != 'ICMP' and port is None:
929      raise ValueError('TCP and UDP must have port numbers')
930
931    # Firewall policies (organization, folders)
932    for p in self._policies:
933      result = p.check_connectivity_egress(
934          src_ip=src_ip,
935          ip_protocol=ip_protocol,
936          port=port,
937          #target_network=self._network,
938          target_service_account=target_service_account)
939      if result.action != 'goto_next':
940        return result
941
942    # VPC firewall rules
943    return self._vpc_firewall.check_connectivity_egress(
944        src_ip=src_ip,
945        ip_protocol=ip_protocol,
946        port=port,
947        source_service_account=source_service_account,
948        source_tags=source_tags,
949        target_service_account=target_service_account,
950        target_tags=target_tags)
def get_vpc_ingress_rules( self, name: Optional[str] = None, name_pattern: Optional[re.Pattern] = None, target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
952  def get_vpc_ingress_rules(
953      self,
954      name: Optional[str] = None,
955      name_pattern: Optional[re.Pattern] = None,
956      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
957    """Retrieve the list of ingress firewall rules matching name or name pattern and target tags.
958
959    Args:
960        name (Optional[str], optional): firewall rune name. Defaults to None.
961        name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
962        target_tags (Optional[List[str]], optional): firewall target tags
963          (if not specified any tag will match). Defaults to None.
964
965    Returns:
966        List[VpcFirewallRule]: List of ingress firewall rules
967    """
968    rules = self._vpc_firewall.get_vpc_ingress_rules(name, name_pattern,
969                                                     target_tags)
970    return rules

Retrieve the list of ingress firewall rules matching name or name pattern and target tags.

Arguments:
  • name (Optional[str], optional): firewall rune name. Defaults to None.
  • name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
  • target_tags (Optional[List[str]], optional): firewall target tags (if not specified any tag will match). Defaults to None.
Returns:

List[VpcFirewallRule]: List of ingress firewall rules

def get_vpc_egress_rules( self, name: Optional[str] = None, name_pattern: Optional[re.Pattern] = None, target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
972  def get_vpc_egress_rules(
973      self,
974      name: Optional[str] = None,
975      name_pattern: Optional[re.Pattern] = None,
976      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
977    """Retrieve the list of egress firewall rules matching name or name pattern and target tags.
978
979    Args:
980        name (Optional[str], optional): firewall rune name. Defaults to None.
981        name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
982        target_tags (Optional[List[str]], optional): firewall target tags
983          (if not specified any tag will match). Defaults to None.
984
985    Returns:
986        List[VpcFirewallRule]: List of egress firewall rules
987    """
988    rules = self._vpc_firewall.get_vpc_egress_rules(name, name_pattern,
989                                                    target_tags)
990    return rules

Retrieve the list of egress firewall rules matching name or name pattern and target tags.

Arguments:
  • name (Optional[str], optional): firewall rune name. Defaults to None.
  • name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
  • target_tags (Optional[List[str]], optional): firewall target tags (if not specified any tag will match). Defaults to None.
Returns:

List[VpcFirewallRule]: List of egress firewall rules

def verify_ingress_rule_exists(self, name: str):
992  def verify_ingress_rule_exists(self, name: str):
993    """Verify that a certain VPC rule exists. This is useful to verify
994    whether maybe a permission was missing on a shared VPC and an
995    automatic rule couldn't be created."""
996    return self._vpc_firewall.verify_ingress_rule_exists(name)

Verify that a certain VPC rule exists. This is useful to verify whether maybe a permission was missing on a shared VPC and an automatic rule couldn't be created.

def verify_egress_rule_exists(self, name: str):
 998  def verify_egress_rule_exists(self, name: str):
 999    """Verify that a certain VPC rule exists. This is useful to verify
1000    whether maybe a permission was missing on a shared VPC and an
1001    automatic rule couldn't be created."""
1002    return self._vpc_firewall.verify_egress_rule_exists(name)

Verify that a certain VPC rule exists. This is useful to verify whether maybe a permission was missing on a shared VPC and an automatic rule couldn't be created.

class VPCEffectiveFirewalls(EffectiveFirewalls):
1005class VPCEffectiveFirewalls(EffectiveFirewalls):
1006  """Effective firewall rules for a VPC network.
1007
1008  Includes org/folder firewall policies)."""
1009  _network: Network
1010
1011  def __init__(self, network, resource_data):
1012    super().__init__(resource_data)
1013    self._network = network

Effective firewall rules for a VPC network.

Includes org/folder firewall policies).

VPCEffectiveFirewalls(network, resource_data)
1011  def __init__(self, network, resource_data):
1012    super().__init__(resource_data)
1013    self._network = network
@caching.cached_api_call(in_memory=True)
def get_network(project_id: str, network_name: str) -> Network:
1025@caching.cached_api_call(in_memory=True)
1026def get_network(project_id: str, network_name: str) -> Network:
1027  logging.info('fetching network: %s/%s', project_id, network_name)
1028  compute = apis.get_api('compute', 'v1', project_id)
1029  request = compute.networks().get(project=project_id, network=network_name)
1030  response = request.execute(num_retries=config.API_RETRIES)
1031  return Network(project_id, response)
def get_subnetwork_from_url(url: str) -> Subnetwork:
1034def get_subnetwork_from_url(url: str) -> Subnetwork:
1035  """Returns Subnetwork object given subnetwork url"""
1036  m = re.match((r'https://www.googleapis.com/compute/v1/projects/'
1037                r'([^/]+)/regions/([^/]+)/subnetworks/([^/]+)$'), url)
1038  if not m:
1039    raise ValueError(f"can't parse network url: {url}")
1040  (project_id, region, subnetwork_name) = (m.group(1), m.group(2), m.group(3))
1041  return get_subnetwork(project_id, region, subnetwork_name)

Returns Subnetwork object given subnetwork url

def get_network_from_url(url: str) -> Network:
1044def get_network_from_url(url: str) -> Network:
1045  m = re.match(
1046      r'https://www.googleapis.com/compute/v1/projects/([^/]+)/global/networks/([^/]+)',
1047      url)
1048  if not m:
1049    raise ValueError(f"can't parse network url: {url}")
1050  (project_id, network_name) = (m.group(1), m.group(2))
1051  return get_network(project_id, network_name)
@caching.cached_api_call(in_memory=True)
def get_networks(project_id: str) -> List[Network]:
1054@caching.cached_api_call(in_memory=True)
1055def get_networks(project_id: str) -> List[Network]:
1056  logging.info('fetching network: %s', project_id)
1057  compute = apis.get_api('compute', 'v1', project_id)
1058  request = compute.networks().list(project=project_id)
1059  response = request.execute(num_retries=config.API_RETRIES)
1060  return [Network(project_id, item) for item in response.get('items', [])]
@caching.cached_api_call(in_memory=True)
def get_subnetwork( project_id: str, region: str, subnetwork_name: str) -> Subnetwork:
1063@caching.cached_api_call(in_memory=True)
1064def get_subnetwork(project_id: str, region: str,
1065                   subnetwork_name: str) -> Subnetwork:
1066  logging.info('fetching network: %s/%s', project_id, subnetwork_name)
1067  compute = apis.get_api('compute', 'v1', project_id)
1068  request = compute.subnetworks().get(project=project_id,
1069                                      region=region,
1070                                      subnetwork=subnetwork_name)
1071  response = request.execute(num_retries=config.API_RETRIES)
1072  return Subnetwork(project_id, response)
@caching.cached_api_call(in_memory=True)
def get_routes(project_id: str) -> List[Route]:
1101@caching.cached_api_call(in_memory=True)
1102def get_routes(project_id: str) -> List[Route]:
1103  logging.info('fetching routes: %s', project_id)
1104  compute = apis.get_api('compute', 'v1', project_id)
1105  request = compute.routes().list(project=project_id)
1106  response = request.execute(num_retries=config.API_RETRIES)
1107  return [Route(project_id, item) for item in response.get('items', [])]
@caching.cached_api_call(in_memory=True)
def get_zones(project_id: str) -> List[ManagedZone]:
1110@caching.cached_api_call(in_memory=True)
1111def get_zones(project_id: str) -> List[ManagedZone]:
1112  logging.info('fetching DNS zones: %s', project_id)
1113  dns = apis.get_api('dns', 'v1beta2', project_id)
1114  request = dns.managedZones().list(project=project_id)
1115  response = request.execute(num_retries=config.API_RETRIES)
1116  zones = []
1117  for zone in response.get('managedZones', []):
1118    request2 = dns.managedZones().get(project=project_id,
1119                                      managedZone=zone['name'])
1120    response2 = request2.execute(num_retries=config.API_RETRIES)
1121    zones.append(ManagedZone(project_id, response2))
1122  return zones
@caching.cached_api_call(in_memory=True)
def get_router(project_id: str, region: str, network) -> Router:
1125@caching.cached_api_call(in_memory=True)
1126def get_router(project_id: str, region: str, network) -> Router:
1127  logging.info('fetching routers: %s/%s', project_id, region)
1128  compute = apis.get_api('compute', 'v1', project_id)
1129  request = compute.routers().list(project=project_id,
1130                                   region=region,
1131                                   filter=f'network="{network.self_link}"')
1132  response = request.execute(num_retries=config.API_RETRIES)
1133  return Router(project_id, next(iter(response.get('items', [{}]))))
class VPCSubnetworkIAMPolicy(gcpdiag.queries.iam.BaseIAMPolicy):
1136class VPCSubnetworkIAMPolicy(iam.BaseIAMPolicy):
1137
1138  def _is_resource_permission(self, permission):
1139    return True

Common class for IAM policies

@caching.cached_api_call(in_memory=True)
def get_subnetwork_iam_policy( project_id: str, region: str, subnetwork_name: str) -> VPCSubnetworkIAMPolicy:
1142@caching.cached_api_call(in_memory=True)
1143def get_subnetwork_iam_policy(project_id: str, region: str,
1144                              subnetwork_name: str) -> VPCSubnetworkIAMPolicy:
1145  resource_name = (f'projects/{project_id}/regions/{region}/'
1146                   f'subnetworks/{subnetwork_name}')
1147
1148  compute = apis.get_api('compute', 'v1', project_id)
1149  request = compute.subnetworks().getIamPolicy(project=project_id,
1150                                               region=region,
1151                                               resource=subnetwork_name)
1152
1153  return iam.fetch_iam_policy(request, VPCSubnetworkIAMPolicy, project_id,
1154                              resource_name)
class Address(gcpdiag.models.Resource):
1157class Address(models.Resource):
1158  """IP Addresses."""
1159  _resource_data: dict
1160
1161  def __init__(self, project_id, resource_data):
1162    super().__init__(project_id=project_id)
1163    self._resource_data = resource_data
1164
1165  @property
1166  def full_path(self) -> str:
1167    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
1168                      self.self_link)
1169    if result:
1170      return result.group(1)
1171    else:
1172      return f'>> {self.self_link}'
1173
1174  @property
1175  def short_path(self) -> str:
1176    path = self.project_id + '/' + self.name
1177    return path
1178
1179  @property
1180  def name(self) -> str:
1181    return self._resource_data['name']
1182
1183  @property
1184  def self_link(self) -> str:
1185    return self._resource_data.get('selfLink', '')
1186
1187  @property
1188  def subnetwork(self) -> str:
1189    return self._resource_data['subnetwork']
1190
1191  @property
1192  def status(self) -> str:
1193    return self._resource_data['status']

IP Addresses.

Address(project_id, resource_data)
1161  def __init__(self, project_id, resource_data):
1162    super().__init__(project_id=project_id)
1163    self._resource_data = resource_data
full_path: str
1165  @property
1166  def full_path(self) -> str:
1167    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
1168                      self.self_link)
1169    if result:
1170      return result.group(1)
1171    else:
1172      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
1174  @property
1175  def short_path(self) -> str:
1176    path = self.project_id + '/' + self.name
1177    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
1179  @property
1180  def name(self) -> str:
1181    return self._resource_data['name']
subnetwork: str
1187  @property
1188  def subnetwork(self) -> str:
1189    return self._resource_data['subnetwork']
status: str
1191  @property
1192  def status(self) -> str:
1193    return self._resource_data['status']
@caching.cached_api_call(in_memory=True)
def get_addresses(project_id: str) -> List[Address]:
1196@caching.cached_api_call(in_memory=True)
1197def get_addresses(project_id: str) -> List[Address]:
1198  logging.info('fetching addresses list: %s', project_id)
1199  compute = apis.get_api('compute', 'v1', project_id)
1200  addresses = []
1201  request = compute.addresses().aggregatedList(project=project_id)
1202  response = request.execute(num_retries=config.API_RETRIES)
1203  addresses_by_regions = response['items']
1204  for _, data_ in addresses_by_regions.items():
1205    if 'addresses' not in data_:
1206      continue
1207    addresses.extend(
1208        [Address(project_id, address) for address in data_['addresses']])
1209  return addresses