gcpdiag.queries.network

Queries related to VPC Networks.
IPv4AddrOrIPv6Addr = typing.Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
IPv4NetOrIPv6Net = typing.Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
IPAddrOrNet = typing.Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network]
class Subnetwork(gcpdiag.models.Resource):
class Subnetwork(models.Resource):
  """A VPC subnetwork, wrapping the raw resource dictionary it was built from."""
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """The self link with the Compute v1 URL prefix stripped.

    If the self link does not start with that prefix, it is returned
    unchanged but prefixed with '>> '.
    """
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                       self.self_link)
    if not matched:
      return f'>> {self.self_link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>' of this subnetwork."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """Value of the 'name' field (KeyError if it is missing)."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when the field is absent."""
    return self._resource_data.get('selfLink', '')

  @property
  def ip_network(self) -> IPv4NetOrIPv6Net:
    """The 'ipCidrRange' field parsed with ipaddress.ip_network()."""
    cidr = self._resource_data['ipCidrRange']
    return ipaddress.ip_network(cidr)

  @property
  def region(self) -> str:
    """Region name taken from the last captured segment of the 'region' URL.

    Raises:
      RuntimeError: the 'region' field doesn't look like
        https://www.googleapis.com/compute/v1/projects/<project>/regions/<region>
    """
    region_url = self._resource_data['region']
    # e.g. https://www.googleapis.com/compute/v1/projects/gcpdiag-gke1-aaaa/regions/europe-west4
    parsed = re.match(
        r'https://www.googleapis.com/compute/v1/projects/([^/]+)/regions/([^/]+)',
        region_url)
    if parsed is None:
      raise RuntimeError(f"can't parse region URL: {region_url}")
    return parsed.group(2)

  def is_private_ip_google_access(self) -> bool:
    """Value of 'privateIpGoogleAccess', defaulting to False when absent."""
    return self._resource_data.get('privateIpGoogleAccess', False)

  @property
  def network(self):
    """Raw value of the 'network' field (KeyError if it is missing)."""
    return self._resource_data['network']

A VPC subnetwork.

Subnetwork(project_id, resource_data)
36  def __init__(self, project_id, resource_data):
37    super().__init__(project_id=project_id)
38    self._resource_data = resource_data
full_path: str
40  @property
41  def full_path(self) -> str:
42    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
43                      self.self_link)
44    if result:
45      return result.group(1)
46    else:
47      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
49  @property
50  def short_path(self) -> str:
51    path = self.project_id + '/' + self.name
52    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
54  @property
55  def name(self) -> str:
56    return self._resource_data['name']
ip_network: Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
62  @property
63  def ip_network(self) -> IPv4NetOrIPv6Net:
64    return ipaddress.ip_network(self._resource_data['ipCidrRange'])
region: str
66  @property
67  def region(self) -> str:
68    # https://www.googleapis.com/compute/v1/projects/gcpdiag-gke1-aaaa/regions/europe-west4
69    m = re.match(
70        r'https://www.googleapis.com/compute/v1/projects/([^/]+)/regions/([^/]+)',
71        self._resource_data['region'])
72    if not m:
73      raise RuntimeError(
74          f"can't parse region URL: {self._resource_data['region']}")
75    return m.group(2)
def is_private_ip_google_access(self) -> bool:
77  def is_private_ip_google_access(self) -> bool:
78    return self._resource_data.get('privateIpGoogleAccess', False)
network
80  @property
81  def network(self):
82    return self._resource_data['network']
Inherited Members
gcpdiag.models.Resource
project_id
class Route(gcpdiag.models.Resource):
 85class Route(models.Resource):
 86  """A VPC Route."""
 87  _resource_data: dict
 88
 89  def __init__(self, project_id, resource_data):
 90    super().__init__(project_id=project_id)
 91    self._resource_data = resource_data
 92
 93  @property
 94  def full_path(self) -> str:
 95    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
 96                      self.self_link)
 97    if result:
 98      return result.group(1)
 99    else:
100      return f'>> {self.self_link}'
101
102  @property
103  def short_path(self) -> str:
104    path = self.project_id + '/' + self.name
105    return path
106
107  @property
108  def name(self) -> str:
109    return self._resource_data['name']
110
111  @property
112  def self_link(self) -> str:
113    return self._resource_data['selfLink']
114
115  @property
116  def network(self) -> str:
117    return self._resource_data['network']
118
119  @property
120  def tags(self) -> List[str]:
121    if 'tags' in self._resource_data:
122      return self._resource_data['tags']
123    return []
124
125  @property
126  def dest_range(self) -> str:
127    return self._resource_data['destRange']
128
129  @property
130  def next_hop_gateway(self) -> Optional[str]:
131    if 'nextHopGateway' in self._resource_data:
132      return self._resource_data['nextHopGateway']
133    return None
134
135  @property
136  def next_hop_vpn_tunnel(self) -> Optional[str]:
137    return self._resource_data.get('nextHopVpnTunnel')
138
139  @property
140  def next_hop_hub(self) -> Optional[str]:
141    return self._resource_data.get('nextHopHub')
142
143  @property
144  def priority(self) -> int:
145    return self._resource_data['priority']
146
147  def check_route_match(self, ip1: IPAddrOrNet, ip2: str) -> bool:
148    ip2_list = [ipaddress.ip_network(ip2)]
149    if _ip_match(ip1, ip2_list, 'allow'):
150      return True
151    return False

A VPC Route.

Route(project_id, resource_data)
89  def __init__(self, project_id, resource_data):
90    super().__init__(project_id=project_id)
91    self._resource_data = resource_data
full_path: str
 93  @property
 94  def full_path(self) -> str:
 95    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
 96                      self.self_link)
 97    if result:
 98      return result.group(1)
 99    else:
100      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
102  @property
103  def short_path(self) -> str:
104    path = self.project_id + '/' + self.name
105    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
107  @property
108  def name(self) -> str:
109    return self._resource_data['name']
network: str
115  @property
116  def network(self) -> str:
117    return self._resource_data['network']
tags: List[str]
119  @property
120  def tags(self) -> List[str]:
121    if 'tags' in self._resource_data:
122      return self._resource_data['tags']
123    return []
dest_range: str
125  @property
126  def dest_range(self) -> str:
127    return self._resource_data['destRange']
next_hop_gateway: Optional[str]
129  @property
130  def next_hop_gateway(self) -> Optional[str]:
131    if 'nextHopGateway' in self._resource_data:
132      return self._resource_data['nextHopGateway']
133    return None
next_hop_vpn_tunnel: Optional[str]
135  @property
136  def next_hop_vpn_tunnel(self) -> Optional[str]:
137    return self._resource_data.get('nextHopVpnTunnel')
next_hop_hub: Optional[str]
139  @property
140  def next_hop_hub(self) -> Optional[str]:
141    return self._resource_data.get('nextHopHub')
priority: int
143  @property
144  def priority(self) -> int:
145    return self._resource_data['priority']
def check_route_match( self, ip1: Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network], ip2: str) -> bool:
147  def check_route_match(self, ip1: IPAddrOrNet, ip2: str) -> bool:
148    ip2_list = [ipaddress.ip_network(ip2)]
149    if _ip_match(ip1, ip2_list, 'allow'):
150      return True
151    return False
Inherited Members
gcpdiag.models.Resource
project_id
class ManagedZone(gcpdiag.models.Resource):
class ManagedZone(models.Resource):
  """
  Represent a DNS zone (public or private).

  https://cloud.google.com/dns/docs/reference/v1beta2/managedZones
  """
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def cloud_logging_config(self) -> bool:
    """'enableLogging' from 'cloudLoggingConfig' (False when unset).

    Raises KeyError if the resource has no 'cloudLoggingConfig' entry.
    """
    return self._resource_data['cloudLoggingConfig'].get('enableLogging', False)

  @property
  def is_public(self) -> bool:
    """True when the 'visibility' field equals 'public'."""
    return self._resource_data['visibility'] == 'public'

  @property
  def vpc_attached(self) -> bool:
    """Truthy when the private visibility config lists networks or GKE clusters.

    NOTE(review): despite the bool annotation, the value returned is whatever
    is stored under 'networks' / 'gkeClusters' (or False when neither is set).
    """
    # Read with a default instead of writing an empty dict back into
    # _resource_data: a property read must not modify the resource data.
    visibility = self._resource_data.get('privateVisibilityConfig', {})
    return (visibility.get('networks', False) or
            visibility.get('gkeClusters', False))

  @property
  def dnssec_config_state(self) -> bool:
    """The 'state' entry of 'dnssecConfig', or False when it is not set.

    NOTE(review): despite the bool annotation, the stored 'state' value is
    returned as-is.
    """
    # Same as above: look the section up without mutating _resource_data.
    dnssec = self._resource_data.get('dnssecConfig', {})
    return dnssec.get('state', False)

  @property
  def name(self) -> str:
    """Value of the 'name' field (KeyError if it is missing)."""
    return self._resource_data['name']

  @property
  def full_path(self) -> str:
    """The self link with the Cloud DNS v1beta2 URL prefix stripped.

    If the self link does not start with that prefix, it is returned
    unchanged but prefixed with '>> '.
    """
    result = re.match(r'https://dns.googleapis.com/dns/v1beta2/(.*)',
                      self.self_link)
    if result:
      return result.group(1)
    return f'>> {self.self_link}'

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>' of this zone."""
    return self.project_id + '/' + self.name

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when the field is absent."""
    return self._resource_data.get('selfLink', '')
ManagedZone(project_id, resource_data)
161  def __init__(self, project_id, resource_data):
162    super().__init__(project_id=project_id)
163    self._resource_data = resource_data
cloud_logging_config: bool
165  @property
166  def cloud_logging_config(self) -> bool:
167    return self._resource_data['cloudLoggingConfig'].get('enableLogging', False)
is_public: bool
169  @property
170  def is_public(self) -> bool:
171    return self._resource_data['visibility'] == 'public'
vpc_attached: bool
173  @property
174  def vpc_attached(self) -> bool:
175    if 'privateVisibilityConfig' not in self._resource_data:
176      self._resource_data['privateVisibilityConfig'] = {}
177
178    return (self._resource_data['privateVisibilityConfig'].get(
179        'networks', False) or
180            self._resource_data['privateVisibilityConfig'].get(
181                'gkeClusters', False))
dnssec_config_state: bool
183  @property
184  def dnssec_config_state(self) -> bool:
185    if 'dnssecConfig' not in self._resource_data:
186      self._resource_data['dnssecConfig'] = {}
187
188    return self._resource_data['dnssecConfig'].get('state', False)
name: str
190  @property
191  def name(self) -> str:
192    return self._resource_data['name']
full_path: str
194  @property
195  def full_path(self) -> str:
196    result = re.match(r'https://dns.googleapis.com/dns/v1beta2/(.*)',
197                      self.self_link)
198    if result:
199      return result.group(1)
200    else:
201      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
203  @property
204  def short_path(self) -> str:
205    path = self.project_id + '/' + self.name
206    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

Inherited Members
gcpdiag.models.Resource
project_id
class Router(gcpdiag.models.Resource):
class Router(models.Resource):
  """A VPC router, wrapping the raw resource dictionary it was built from."""
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self._nats = None

  @property
  def full_path(self) -> str:
    """The self link with the Compute v1 URL prefix stripped.

    If the self link does not start with that prefix, it is returned
    unchanged but prefixed with '>> '.
    """
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                       self.self_link)
    if not matched:
      return f'>> {self.self_link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>' of this router."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """Value of the 'name' field (KeyError if it is missing)."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when the field is absent."""
    return self._resource_data.get('selfLink', '')

  def subnet_has_nat(self, subnetwork):
    """Report whether a NAT entry of this router applies to `subnetwork`.

    NAT entries are examined in order. An entry whose
    'sourceSubnetworkIpRangesToNat' is anything other than
    'LIST_OF_SUBNETWORKS' counts as covering every subnet. A
    'LIST_OF_SUBNETWORKS' entry counts only if subnetwork.self_link equals
    the 'name' of one of its listed subnetworks.
    """
    nat_entries = self._resource_data.get('nats', []) or []
    for nat in nat_entries:
      if nat['sourceSubnetworkIpRangesToNat'] != 'LIST_OF_SUBNETWORKS':
        # Cloud NAT configured for all subnets
        return True
      # Cloud NAT configured for specific subnets
      if 'subnetworks' not in nat:
        continue
      wanted_link = subnetwork.self_link
      if wanted_link in [entry['name'] for entry in nat['subnetworks']]:
        return True
    return False

A VPC Router.

Router(project_id, resource_data)
217  def __init__(self, project_id, resource_data):
218    super().__init__(project_id=project_id)
219    self._resource_data = resource_data
220    self._nats = None
full_path: str
222  @property
223  def full_path(self) -> str:
224    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
225                      self.self_link)
226    if result:
227      return result.group(1)
228    else:
229      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
231  @property
232  def short_path(self) -> str:
233    path = self.project_id + '/' + self.name
234    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
236  @property
237  def name(self) -> str:
238    return self._resource_data['name']
def subnet_has_nat(self, subnetwork):
244  def subnet_has_nat(self, subnetwork):
245    if not self._resource_data.get('nats', []):
246      return False
247    for n in self._resource_data.get('nats', []):
248      if n['sourceSubnetworkIpRangesToNat'] == 'LIST_OF_SUBNETWORKS':
249        # Cloud NAT configure for specific subnets
250        if 'subnetworks' in n and subnetwork.self_link in [
251            s['name'] for s in n['subnetworks']
252        ]:
253          return True
254      else:
255        # Cloud NAT configured for all subnets
256        return True
257    return False
Inherited Members
gcpdiag.models.Resource
project_id
@dataclasses.dataclass
class Peering:
@dataclasses.dataclass
class Peering:
  """One VPC peering entry; str() of an instance is its name."""
  # Positional field order is part of the constructor signature.
  name: str
  url: str
  state: str
  exports_custom_routes: bool
  imports_custom_routes: bool
  auto_creates_routes: bool

  def __str__(self) -> str:
    peering_name = self.name
    return peering_name

VPC Peerings

Peering( name: str, url: str, state: str, exports_custom_routes: bool, imports_custom_routes: bool, auto_creates_routes: bool)
name: str
url: str
state: str
exports_custom_routes: bool
imports_custom_routes: bool
auto_creates_routes: bool
class Network(gcpdiag.models.Resource):
class Network(models.Resource):
  """A VPC network, wrapping the raw resource dictionary it was built from."""
  _resource_data: dict
  _subnetworks: Optional[Dict[str, Subnetwork]]

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self._subnetworks = None

  @property
  def full_path(self) -> str:
    """The self link with the Compute v1 URL prefix stripped.

    If the self link does not start with that prefix, it is returned
    unchanged but prefixed with '>> '.
    """
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                       self.self_link)
    if not matched:
      return f'>> {self.self_link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>' of this network."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """Value of the 'name' field (KeyError if it is missing)."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when the field is absent."""
    return self._resource_data.get('selfLink', '')

  @property
  def firewall(self) -> 'EffectiveFirewalls':
    """Result of the module helper _get_effective_firewalls() for this network."""
    return _get_effective_firewalls(self)

  @property
  def subnetworks(self) -> Dict[str, Subnetwork]:
    """Subnetworks of this network, fetched via _batch_get_subnetworks().

    The helper receives the project id and a frozenset of the entries in the
    'subnetworks' field (empty when the field is absent).
    """
    subnetwork_refs = frozenset(self._resource_data.get('subnetworks', []))
    return _batch_get_subnetworks(self._project_id, subnetwork_refs)

  @property
  def peerings(self) -> List[Peering]:
    """One Peering per entry of the 'peerings' field (empty list if absent)."""
    found: List[Peering] = []
    for entry in self._resource_data.get('peerings', []):
      found.append(
          Peering(entry['name'], entry['network'], entry['state'],
                  entry['exportCustomRoutes'], entry['importCustomRoutes'],
                  entry['autoCreateRoutes']))
    return found

A VPC network.

Network(project_id, resource_data)
279  def __init__(self, project_id, resource_data):
280    super().__init__(project_id=project_id)
281    self._resource_data = resource_data
282    self._subnetworks = None
full_path: str
284  @property
285  def full_path(self) -> str:
286    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
287                      self.self_link)
288    if result:
289      return result.group(1)
290    else:
291      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
293  @property
294  def short_path(self) -> str:
295    path = self.project_id + '/' + self.name
296    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
298  @property
299  def name(self) -> str:
300    return self._resource_data['name']
firewall: EffectiveFirewalls
306  @property
307  def firewall(self) -> 'EffectiveFirewalls':
308    return _get_effective_firewalls(self)
subnetworks: Dict[str, Subnetwork]
310  @property
311  def subnetworks(self) -> Dict[str, Subnetwork]:
312    return _batch_get_subnetworks(
313        self._project_id, frozenset(self._resource_data.get('subnetworks', [])))
peerings: List[Peering]
315  @property
316  def peerings(self) -> List[Peering]:
317    return [
318        Peering(peer['name'], peer['network'], peer['state'],
319                peer['exportCustomRoutes'], peer['importCustomRoutes'],
320                peer['autoCreateRoutes'])
321        for peer in self._resource_data.get('peerings', [])
322    ]
Inherited Members
gcpdiag.models.Resource
project_id
@dataclasses.dataclass
class FirewallCheckResult:
@dataclasses.dataclass
class FirewallCheckResult:
  """Outcome of a firewall connectivity check; str() yields the action."""
  action: str
  firewall_policy_name: Optional[str] = None
  firewall_policy_rule_description: Optional[str] = None
  vpc_firewall_rule_id: Optional[str] = None
  vpc_firewall_rule_name: Optional[str] = None

  def __str__(self):
    return self.action

  @property
  def matched_by_str(self) -> Optional[str]:
    """Human-readable description of what produced this result.

    A firewall policy name takes precedence over a VPC firewall rule name.
    Returns None when neither is set.
    """
    policy = self.firewall_policy_name
    if policy:
      description = self.firewall_policy_rule_description
      if not description:
        return f'policy: {policy}'
      return f'policy: {policy}, rule: {description}'
    if self.vpc_firewall_rule_name:
      return f'vpc firewall rule: {self.vpc_firewall_rule_name}'
    return None

The result of a firewall connectivity check.

FirewallCheckResult( action: str, firewall_policy_name: Optional[str] = None, firewall_policy_rule_description: Optional[str] = None, vpc_firewall_rule_id: Optional[str] = None, vpc_firewall_rule_name: Optional[str] = None)
action: str
firewall_policy_name: Optional[str] = None
firewall_policy_rule_description: Optional[str] = None
vpc_firewall_rule_id: Optional[str] = None
vpc_firewall_rule_name: Optional[str] = None
matched_by_str
442  @property
443  def matched_by_str(self):
444    if self.firewall_policy_name:
445      if self.firewall_policy_rule_description:
446        return f'policy: {self.firewall_policy_name}, rule: {self.firewall_policy_rule_description}'
447      else:
448        return f'policy: {self.firewall_policy_name}'
449    elif self.vpc_firewall_rule_name:
450      return f'vpc firewall rule: {self.vpc_firewall_rule_name}'
class FirewallRuleNotFoundError(builtins.Exception):
class FirewallRuleNotFoundError(Exception):
  """Error carrying the name of a firewall rule that was not found.

  Attributes are documented inline; the exception message is always
  'firewall rule not found: <name>'.
  """
  # Name passed to the constructor.
  rule_name: str
  # Flag passed to the constructor; presumably marks a rule that exists but
  # is disabled -- confirm against callers.
  disabled: bool

  def __init__(self, name, disabled=False):
    message = f'firewall rule not found: {name}'
    super().__init__(message)
    self.disabled = disabled
    self.rule_name = name

Common base class for all non-exit exceptions.

FirewallRuleNotFoundError(name, disabled=False)
456  def __init__(self, name, disabled=False):
457    # Call the base class constructor with the parameters it needs
458    super().__init__(f'firewall rule not found: {name}')
459    self.rule_name = name
460    self.disabled = disabled
rule_name: str
disabled
Inherited Members
builtins.BaseException
with_traceback
args
class VpcFirewallRule:
class VpcFirewallRule:
  """Read-only view over one firewall rule's resource dictionary.

  Every accessor indexes the dictionary directly, so a missing key raises
  KeyError.
  """

  def __init__(self, resource_data):
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """Value stored under 'name'."""
    data = self._resource_data
    return data['name']

  @property
  def source_ranges(self) -> List[ipaddress.IPv4Network]:
    """Value stored under 'sourceRanges', returned as-is."""
    data = self._resource_data
    return data['sourceRanges']

  @property
  def target_tags(self) -> set:
    """Value stored under 'targetTags', returned as-is."""
    data = self._resource_data
    return data['targetTags']

  @property
  def allowed(self) -> List[dict]:
    """Value stored under 'allowed', returned as-is."""
    data = self._resource_data
    return data['allowed']

  def is_enabled(self) -> bool:
    """Logical negation of the value stored under 'disabled'."""
    rule_disabled = self._resource_data['disabled']
    return not rule_disabled

Represents firewall rule

VpcFirewallRule(resource_data)
466  def __init__(self, resource_data):
467    self._resource_data = resource_data
name: str
469  @property
470  def name(self) -> str:
471    return self._resource_data['name']
source_ranges: List[ipaddress.IPv4Network]
473  @property
474  def source_ranges(self) -> List[ipaddress.IPv4Network]:
475    return self._resource_data['sourceRanges']
target_tags: set
477  @property
478  def target_tags(self) -> set:
479    return self._resource_data['targetTags']
allowed: List[dict]
481  @property
482  def allowed(self) -> List[dict]:
483    return self._resource_data['allowed']
def is_enabled(self) -> bool:
485  def is_enabled(self) -> bool:
486    return not self._resource_data['disabled']
class EffectiveFirewalls:
class EffectiveFirewalls:
  """Effective firewall rules for a VPC network or Instance.

  Includes org/folder firewall policies.
  """
  _resource_data: dict
  _policies: List[_FirewallPolicy]
  _vpc_firewall: _VpcFirewall

  def __init__(self, resource_data):
    self._resource_data = resource_data
    # One _FirewallPolicy per entry under 'firewallPolicys', in listed order.
    self._policies = [
        _FirewallPolicy(policy_data)
        for policy_data in resource_data.get('firewallPolicys', [])
    ]
    self._vpc_firewall = _VpcFirewall(resource_data.get('firewalls', {}))

  def check_connectivity_ingress(
      self,  #
      *,
      src_ip: IPAddrOrNet,
      ip_protocol: str,
      port: Optional[int] = None,
      source_service_account: Optional[str] = None,
      source_tags: Optional[List[str]] = None,
      target_service_account: Optional[str] = None,
      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
    """Evaluate ingress connectivity against policies, then VPC rules.

    Firewall policies are consulted in order; the first result whose action
    is not 'goto_next' is returned. If no policy decides, the VPC firewall
    result is returned.

    Raises:
      ValueError: port is None and ip_protocol is not 'ICMP'.
    """
    if port is None and ip_protocol != 'ICMP':
      raise ValueError('TCP and UDP must have port numbers')

    # Firewall policies (organization, folders).
    # NOTE: no target network is passed to the policy check.
    for policy in self._policies:
      verdict = policy.check_connectivity_ingress(
          src_ip=src_ip,
          ip_protocol=ip_protocol,
          port=port,
          target_service_account=target_service_account)
      if verdict.action == 'goto_next':
        continue
      return verdict

    # VPC firewall rules
    vpc_verdict = self._vpc_firewall.check_connectivity_ingress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        source_service_account=source_service_account,
        source_tags=source_tags,
        target_service_account=target_service_account,
        target_tags=target_tags)
    return vpc_verdict

  def check_connectivity_egress(
      self,  #
      *,
      src_ip: IPAddrOrNet,
      ip_protocol: str,
      port: Optional[int] = None,
      source_service_account: Optional[str] = None,
      source_tags: Optional[List[str]] = None,
      target_service_account: Optional[str] = None,
      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
    """Evaluate egress connectivity against policies, then VPC rules.

    Firewall policies are consulted in order; the first result whose action
    is not 'goto_next' is returned. If no policy decides, the VPC firewall
    result is returned.

    Raises:
      ValueError: port is None and ip_protocol is not 'ICMP'.
    """
    if port is None and ip_protocol != 'ICMP':
      raise ValueError('TCP and UDP must have port numbers')

    # Firewall policies (organization, folders).
    # NOTE: no target network is passed to the policy check.
    for policy in self._policies:
      verdict = policy.check_connectivity_egress(
          src_ip=src_ip,
          ip_protocol=ip_protocol,
          port=port,
          target_service_account=target_service_account)
      if verdict.action == 'goto_next':
        continue
      return verdict

    # VPC firewall rules
    vpc_verdict = self._vpc_firewall.check_connectivity_egress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        source_service_account=source_service_account,
        source_tags=source_tags,
        target_service_account=target_service_account,
        target_tags=target_tags)
    return vpc_verdict

  def get_vpc_ingress_rules(
      self,
      name: Optional[str] = None,
      name_pattern: Optional[re.Pattern] = None,
      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
    """Retrieve the list of ingress firewall rules matching name or name pattern and target tags.

    Args:
        name (Optional[str], optional): firewall rule name. Defaults to None.
        name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
        target_tags (Optional[List[str]], optional): firewall target tags
          (if not specified any tag will match). Defaults to None.

    Returns:
        List[VpcFirewallRule]: List of ingress firewall rules
    """
    vpc_firewall = self._vpc_firewall
    return vpc_firewall.get_vpc_ingress_rules(name, name_pattern, target_tags)

  def get_vpc_egress_rules(
      self,
      name: Optional[str] = None,
      name_pattern: Optional[re.Pattern] = None,
      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
    """Retrieve the list of egress firewall rules matching name or name pattern and target tags.

    Args:
        name (Optional[str], optional): firewall rule name. Defaults to None.
        name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
        target_tags (Optional[List[str]], optional): firewall target tags
          (if not specified any tag will match). Defaults to None.

    Returns:
        List[VpcFirewallRule]: List of egress firewall rules
    """
    vpc_firewall = self._vpc_firewall
    return vpc_firewall.get_vpc_egress_rules(name, name_pattern, target_tags)

  def verify_ingress_rule_exists(self, name: str):
    """Verify that a certain VPC ingress rule exists.

    This is useful to verify whether a permission was missing on a shared
    VPC and an automatic rule couldn't be created. The check is delegated to
    the VPC firewall object and its result is returned unchanged.
    """
    vpc_firewall = self._vpc_firewall
    return vpc_firewall.verify_ingress_rule_exists(name)

  def verify_egress_rule_exists(self, name: str):
    """Verify that a certain VPC egress rule exists.

    This is useful to verify whether a permission was missing on a shared
    VPC and an automatic rule couldn't be created. The check is delegated to
    the VPC firewall object and its result is returned unchanged.
    """
    vpc_firewall = self._vpc_firewall
    return vpc_firewall.verify_egress_rule_exists(name)

Effective firewall rules for a VPC network or Instance.

Includes org/folder firewall policies.

EffectiveFirewalls(resource_data)
842  def __init__(self, resource_data):
843    self._resource_data = resource_data
844    self._policies = []
845    if 'firewallPolicys' in resource_data:
846      for policy in resource_data['firewallPolicys']:
847        self._policies.append(_FirewallPolicy(policy))
848    self._vpc_firewall = _VpcFirewall(resource_data.get('firewalls', {}))
def check_connectivity_ingress( self, *, src_ip: Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network], ip_protocol: str, port: Optional[int] = None, source_service_account: Optional[str] = None, source_tags: Optional[List[str]] = None, target_service_account: Optional[str] = None, target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
850  def check_connectivity_ingress(
851      self,  #
852      *,
853      src_ip: IPAddrOrNet,
854      ip_protocol: str,
855      port: Optional[int] = None,
856      source_service_account: Optional[str] = None,
857      source_tags: Optional[List[str]] = None,
858      target_service_account: Optional[str] = None,
859      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
860
861    if ip_protocol != 'ICMP' and port is None:
862      raise ValueError('TCP and UDP must have port numbers')
863
864    # Firewall policies (organization, folders)
865    for p in self._policies:
866      result = p.check_connectivity_ingress(
867          src_ip=src_ip,
868          ip_protocol=ip_protocol,
869          port=port,
870          #target_network=self._network,
871          target_service_account=target_service_account)
872      if result.action != 'goto_next':
873        return result
874
875    # VPC firewall rules
876    return self._vpc_firewall.check_connectivity_ingress(
877        src_ip=src_ip,
878        ip_protocol=ip_protocol,
879        port=port,
880        source_service_account=source_service_account,
881        source_tags=source_tags,
882        target_service_account=target_service_account,
883        target_tags=target_tags)
def check_connectivity_egress( self, *, src_ip: Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network], ip_protocol: str, port: Optional[int] = None, source_service_account: Optional[str] = None, source_tags: Optional[List[str]] = None, target_service_account: Optional[str] = None, target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
885  def check_connectivity_egress(
886      self,  #
887      *,
888      src_ip: IPAddrOrNet,
889      ip_protocol: str,
890      port: Optional[int] = None,
891      source_service_account: Optional[str] = None,
892      source_tags: Optional[List[str]] = None,
893      target_service_account: Optional[str] = None,
894      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
895
896    if ip_protocol != 'ICMP' and port is None:
897      raise ValueError('TCP and UDP must have port numbers')
898
899    # Firewall policies (organization, folders)
900    for p in self._policies:
901      result = p.check_connectivity_egress(
902          src_ip=src_ip,
903          ip_protocol=ip_protocol,
904          port=port,
905          #target_network=self._network,
906          target_service_account=target_service_account)
907      if result.action != 'goto_next':
908        return result
909
910    # VPC firewall rules
911    return self._vpc_firewall.check_connectivity_egress(
912        src_ip=src_ip,
913        ip_protocol=ip_protocol,
914        port=port,
915        source_service_account=source_service_account,
916        source_tags=source_tags,
917        target_service_account=target_service_account,
918        target_tags=target_tags)
def get_vpc_ingress_rules( self, name: Optional[str] = None, name_pattern: Optional[re.Pattern] = None, target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
920  def get_vpc_ingress_rules(
921      self,
922      name: Optional[str] = None,
923      name_pattern: Optional[re.Pattern] = None,
924      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
925    """Retrieve the list of ingress firewall rules matching name or name pattern and target tags.
926
927    Args:
928        name (Optional[str], optional): firewall rune name. Defaults to None.
929        name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
930        target_tags (Optional[List[str]], optional): firewall target tags
931          (if not specified any tag will match). Defaults to None.
932
933    Returns:
934        List[VpcFirewallRule]: List of ingress firewall rules
935    """
936    rules = self._vpc_firewall.get_vpc_ingress_rules(name, name_pattern,
937                                                     target_tags)
938    return rules

Retrieve the list of ingress firewall rules matching name or name pattern and target tags.

Arguments:
  • name (Optional[str], optional): firewall rule name. Defaults to None.
  • name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
  • target_tags (Optional[List[str]], optional): firewall target tags (if not specified any tag will match). Defaults to None.
Returns:

List[VpcFirewallRule]: List of ingress firewall rules

def get_vpc_egress_rules( self, name: Optional[str] = None, name_pattern: Optional[re.Pattern] = None, target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
940  def get_vpc_egress_rules(
941      self,
942      name: Optional[str] = None,
943      name_pattern: Optional[re.Pattern] = None,
944      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
945    """Retrieve the list of egress firewall rules matching name or name pattern and target tags.
946
947    Args:
948        name (Optional[str], optional): firewall rune name. Defaults to None.
949        name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
950        target_tags (Optional[List[str]], optional): firewall target tags
951          (if not specified any tag will match). Defaults to None.
952
953    Returns:
954        List[VpcFirewallRule]: List of egress firewall rules
955    """
956    rules = self._vpc_firewall.get_vpc_egress_rules(name, name_pattern,
957                                                    target_tags)
958    return rules

Retrieve the list of egress firewall rules matching name or name pattern and target tags.

Arguments:
  • name (Optional[str], optional): firewall rule name. Defaults to None.
  • name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
  • target_tags (Optional[List[str]], optional): firewall target tags (if not specified any tag will match). Defaults to None.
Returns:

List[VpcFirewallRule]: List of egress firewall rules

def verify_ingress_rule_exists(self, name: str):
960  def verify_ingress_rule_exists(self, name: str):
961    """Verify that a certain VPC rule exists. This is useful to verify
962    whether maybe a permission was missing on a shared VPC and an
963    automatic rule couldn't be created."""
964    return self._vpc_firewall.verify_ingress_rule_exists(name)

Verify that a certain VPC rule exists. This is useful to verify whether maybe a permission was missing on a shared VPC and an automatic rule couldn't be created.

def verify_egress_rule_exists(self, name: str):
966  def verify_egress_rule_exists(self, name: str):
967    """Verify that a certain VPC rule exists. This is useful to verify
968    whether maybe a permission was missing on a shared VPC and an
969    automatic rule couldn't be created."""
970    return self._vpc_firewall.verify_egress_rule_exists(name)

Verify that a certain VPC rule exists. This is useful to verify whether maybe a permission was missing on a shared VPC and an automatic rule couldn't be created.

class VPCEffectiveFirewalls(EffectiveFirewalls):
class VPCEffectiveFirewalls(EffectiveFirewalls):
  """Effective firewall rules for a VPC network.

  Includes org/folder firewall policies.
  """
  # Network object handed to the constructor.
  _network: Network

  def __init__(self, network, resource_data):
    """Store the network and pass resource_data on to EffectiveFirewalls.

    Args:
      network: network these effective firewall rules are associated with.
      resource_data: data forwarded unchanged to the base-class constructor.
    """
    super().__init__(resource_data)
    self._network = network

Effective firewall rules for a VPC network.

Includes org/folder firewall policies.

VPCEffectiveFirewalls(network, resource_data)
979  def __init__(self, network, resource_data):
980    super().__init__(resource_data)
981    self._network = network
@caching.cached_api_call(in_memory=True)
def get_network(project_id: str, network_name: str) -> Network:
@caching.cached_api_call(in_memory=True)
def get_network(project_id: str, network_name: str) -> Network:
  """Fetch one VPC network through the Compute Engine v1 API.

  Args:
    project_id: project that owns the network.
    network_name: name of the network to retrieve.

  Returns:
    A Network built from the API response.
  """
  logging.info('fetching network: %s/%s', project_id, network_name)
  compute_api = apis.get_api('compute', 'v1', project_id)
  network_data = compute_api.networks().get(
      project=project_id,
      network=network_name).execute(num_retries=config.API_RETRIES)
  return Network(project_id, network_data)
def get_subnetwork_from_url(url: str) -> Subnetwork:
def get_subnetwork_from_url(url: str) -> Subnetwork:
  """Returns Subnetwork object given subnetwork url.

  Args:
    url: full Compute Engine v1 subnetwork URL of the form
      https://www.googleapis.com/compute/v1/projects/<project>/regions/<region>/subnetworks/<name>

  Returns:
    The Subnetwork obtained from get_subnetwork() for the parsed project,
    region and subnetwork name.

  Raises:
    ValueError: if url does not have the form above.
  """
  # Dots in the host name are escaped so that they only match a literal '.'.
  m = re.match((r'https://www\.googleapis\.com/compute/v1/projects/'
                r'([^/]+)/regions/([^/]+)/subnetworks/([^/]+)$'), url)
  if not m:
    # This parses a subnetwork URL, so say so in the error message.
    raise ValueError(f"can't parse subnetwork url: {url}")
  project_id, region, subnetwork_name = m.groups()
  return get_subnetwork(project_id, region, subnetwork_name)

Returns Subnetwork object given subnetwork url

def get_network_from_url(url: str) -> Network:
def get_network_from_url(url: str) -> Network:
  """Return the Network object identified by a network URL.

  The pattern is only anchored at the start of the string, so anything
  following the network name after a '/' is ignored.

  Args:
    url: Compute Engine v1 URL of a global network
      (.../projects/<project>/global/networks/<name>).

  Returns:
    The Network obtained from get_network() for the parsed project and name.

  Raises:
    ValueError: if url does not start with the expected form.
  """
  parsed = re.match(
      r'https://www.googleapis.com/compute/v1/projects/([^/]+)/global/networks/([^/]+)',
      url)
  if parsed is None:
    raise ValueError(f"can't parse network url: {url}")
  project_id, network_name = parsed.group(1, 2)
  return get_network(project_id, network_name)
@caching.cached_api_call(in_memory=True)
def get_networks(project_id: str) -> List[Network]:
@caching.cached_api_call(in_memory=True)
def get_networks(project_id: str) -> List[Network]:
  """List the VPC networks of a project through the Compute Engine v1 API.

  Args:
    project_id: project whose networks are listed.

  Returns:
    One Network per entry of the response's 'items' list; an empty list when
    the response has no 'items' key.
  """
  logging.info('fetching network: %s', project_id)
  compute_api = apis.get_api('compute', 'v1', project_id)
  listing = compute_api.networks().list(project=project_id).execute(
      num_retries=config.API_RETRIES)
  # NOTE(review): a single response is read; if the API splits results
  # across pages, later pages are not fetched here. Confirm this is fine.
  networks = []
  for network_data in listing.get('items', []):
    networks.append(Network(project_id, network_data))
  return networks
@caching.cached_api_call(in_memory=True)
def get_subnetwork( project_id: str, region: str, subnetwork_name: str) -> Subnetwork:
@caching.cached_api_call(in_memory=True)
def get_subnetwork(project_id: str, region: str,
                   subnetwork_name: str) -> Subnetwork:
  """Fetch one subnetwork through the Compute Engine v1 API.

  Args:
    project_id: project that owns the subnetwork.
    region: region the subnetwork is located in.
    subnetwork_name: name of the subnetwork to retrieve.

  Returns:
    A Subnetwork built from the API response.
  """
  # Log what is really being fetched (a subnetwork, identified by
  # project/region/name) so log lines can't be mistaken for network fetches.
  logging.info('fetching subnetwork: %s/%s/%s', project_id, region,
               subnetwork_name)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.subnetworks().get(project=project_id,
                                      region=region,
                                      subnetwork=subnetwork_name)
  response = request.execute(num_retries=config.API_RETRIES)
  return Subnetwork(project_id, response)
@caching.cached_api_call(in_memory=True)
def get_routes(project_id: str) -> List[Route]:
@caching.cached_api_call(in_memory=True)
def get_routes(project_id: str) -> List[Route]:
  """List the routes of a project through the Compute Engine v1 API.

  Args:
    project_id: project whose routes are listed.

  Returns:
    One Route per entry of the response's 'items' list; an empty list when
    the response has no 'items' key.
  """
  logging.info('fetching routes: %s', project_id)
  compute_api = apis.get_api('compute', 'v1', project_id)
  listing = compute_api.routes().list(project=project_id).execute(
      num_retries=config.API_RETRIES)
  # NOTE(review): a single response is read; if the API splits results
  # across pages, later pages are not fetched here. Confirm this is fine.
  routes = []
  for route_data in listing.get('items', []):
    routes.append(Route(project_id, route_data))
  return routes
@caching.cached_api_call(in_memory=True)
def get_zones(project_id: str) -> List[ManagedZone]:
@caching.cached_api_call(in_memory=True)
def get_zones(project_id: str) -> List[ManagedZone]:
  """List the Cloud DNS managed zones of a project (DNS API v1beta2).

  The zones are first listed, then every zone is fetched again individually
  by name; the ManagedZone objects are built from those individual responses.

  Args:
    project_id: project whose managed zones are listed.

  Returns:
    One ManagedZone per entry of the list response's 'managedZones' list, in
    the same order; an empty list when that key is absent.
  """
  logging.info('fetching DNS zones: %s', project_id)
  dns_api = apis.get_api('dns', 'v1beta2', project_id)
  listing = dns_api.managedZones().list(project=project_id).execute(
      num_retries=config.API_RETRIES)

  def fetch_zone(zone_name):
    # One extra request per zone to retrieve its individual resource.
    zone_request = dns_api.managedZones().get(project=project_id,
                                              managedZone=zone_name)
    return zone_request.execute(num_retries=config.API_RETRIES)

  return [
      ManagedZone(project_id, fetch_zone(summary['name']))
      for summary in listing.get('managedZones', [])
  ]
@caching.cached_api_call(in_memory=True)
def get_router(project_id: str, region: str, network) -> Router:
@caching.cached_api_call(in_memory=True)
def get_router(project_id: str, region: str, network) -> Router:
  """Fetch the first Cloud Router of a network in a region.

  Args:
    project_id: project to query.
    region: region whose routers are listed.
    network: object exposing a self_link attribute (presumably a Network);
      used to filter routers by network.

  Returns:
    A Router built from the first entry of the response's 'items' list, or a
    Router built from an empty dict when no router is returned.
  """
  logging.info('fetching routers: %s/%s', project_id, region)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.routers().list(project=project_id,
                                   region=region,
                                   filter=f'network="{network.self_link}"')
  response = request.execute(num_retries=config.API_RETRIES)
  # Treat a missing 'items' key and an empty 'items' list the same way.
  # next(iter([])) would raise StopIteration for the empty list.
  routers = response.get('items') or [{}]
  return Router(project_id, routers[0])
class VPCSubnetworkIAMPolicy(gcpdiag.queries.iam.BaseIAMPolicy):
class VPCSubnetworkIAMPolicy(iam.BaseIAMPolicy):
  """IAM policy of a VPC subnetwork."""

  def _is_resource_permission(self, permission):
    # Every permission is accepted, whatever its value.
    return True

Common class for IAM policies

@caching.cached_api_call(in_memory=True)
def get_subnetwork_iam_policy( project_id: str, region: str, subnetwork_name: str) -> VPCSubnetworkIAMPolicy:
1110@caching.cached_api_call(in_memory=True)
1111def get_subnetwork_iam_policy(project_id: str, region: str,
1112                              subnetwork_name: str) -> VPCSubnetworkIAMPolicy:
1113  resource_name = (f'projects/{project_id}/regions/{region}/'
1114                   f'subnetworks/{subnetwork_name}')
1115
1116  compute = apis.get_api('compute', 'v1', project_id)
1117  request = compute.subnetworks().getIamPolicy(project=project_id,
1118                                               region=region,
1119                                               resource=subnetwork_name)
1120
1121  return iam.fetch_iam_policy(request, VPCSubnetworkIAMPolicy, project_id,
1122                              resource_name)
class Address(gcpdiag.models.Resource):
class Address(models.Resource):
  """IP address resource backed by a raw resource dictionary."""
  # Resource dictionary handed to the constructor; all properties read it.
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Keep resource_data and register project_id with the base class."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """self_link without the 'https://www.googleapis.com/compute/v1/' prefix.

    If self_link doesn't start with that prefix, it is returned prefixed
    with '>> ' instead.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    return matched.group(1) if matched else f'>> {link}'

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>' of this address."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """Value of the 'name' entry (KeyError if absent)."""
    data = self._resource_data
    return data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' entry, or '' if absent."""
    data = self._resource_data
    return data.get('selfLink', '')

  @property
  def subnetwork(self) -> str:
    """Value of the 'subnetwork' entry (KeyError if absent)."""
    data = self._resource_data
    return data['subnetwork']

  @property
  def status(self) -> str:
    """Value of the 'status' entry (KeyError if absent)."""
    data = self._resource_data
    return data['status']

IP Addresses.

Address(project_id, resource_data)
1129  def __init__(self, project_id, resource_data):
1130    super().__init__(project_id=project_id)
1131    self._resource_data = resource_data
full_path: str
1133  @property
1134  def full_path(self) -> str:
1135    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
1136                      self.self_link)
1137    if result:
1138      return result.group(1)
1139    else:
1140      return f'>> {self.self_link}'

Returns the full path of this resource.

Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'

short_path: str
1142  @property
1143  def short_path(self) -> str:
1144    path = self.project_id + '/' + self.name
1145    return path

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'

name: str
1147  @property
1148  def name(self) -> str:
1149    return self._resource_data['name']
subnetwork: str
1155  @property
1156  def subnetwork(self) -> str:
1157    return self._resource_data['subnetwork']
status: str
1159  @property
1160  def status(self) -> str:
1161    return self._resource_data['status']
Inherited Members
gcpdiag.models.Resource
project_id
@caching.cached_api_call(in_memory=True)
def get_addresses(project_id: str) -> List[Address]:
@caching.cached_api_call(in_memory=True)
def get_addresses(project_id: str) -> List[Address]:
  """List the IP addresses of a project across all scopes.

  Uses the Compute Engine v1 addresses aggregatedList call, whose 'items'
  mapping groups the addresses per scope.

  Args:
    project_id: project whose addresses are listed.

  Returns:
    One Address per entry found under any scope's 'addresses' list; an empty
    list when the response has no 'items' or no scope carries addresses.
  """
  logging.info('fetching addresses list: %s', project_id)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.addresses().aggregatedList(project=project_id)
  response = request.execute(num_retries=config.API_RETRIES)
  # NOTE(review): a single response is read; if the API splits results
  # across pages, later pages are not fetched here. Confirm this is fine.
  addresses: List[Address] = []
  # A missing 'items' key is treated as "no addresses", like the other list
  # helpers in this module, instead of raising KeyError.
  for scope_data in response.get('items', {}).values():
    # Scopes without any address carry no 'addresses' key.
    addresses.extend(
        Address(project_id, address)
        for address in scope_data.get('addresses', []))
  return addresses