gcpdiag.queries.network
class Subnetwork(models.Resource):
  """A VPC subnetwork, backed by the raw resource dict given at construction."""

  _resource_data: dict
  _context: models.Context

  def __init__(self, project_id, resource_data, context: models.Context):
    super().__init__(project_id=project_id)
    self._context = context
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 URL prefix removed.

    Falls back to '>> <self link>' when the self link lacks that prefix.
    """
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                       self.self_link)
    if not matched:
      return f'>> {self.self_link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """'<project id>/<name>' of this subnetwork."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """Value of the mandatory 'name' field."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of 'selfLink', or '' when the field is absent."""
    return self._resource_data.get('selfLink', '')

  @property
  def ip_network(self) -> IPv4NetOrIPv6Net:
    """The 'ipCidrRange' field parsed with ipaddress.ip_network."""
    return ipaddress.ip_network(self._resource_data['ipCidrRange'])

  @property
  def region(self) -> str:
    """Region name taken from the last component of the 'region' URL.

    Raises:
      RuntimeError: if the 'region' field is not a Compute v1 region URL.
    """
    # e.g. https://www.googleapis.com/compute/v1/projects/gcpdiag-gke1-aaaa/regions/europe-west4
    region_url = self._resource_data['region']
    matched = re.match(
        r'https://www.googleapis.com/compute/v1/projects/([^/]+)/regions/([^/]+)',
        region_url)
    if matched:
      return matched.group(2)
    raise RuntimeError(f"can't parse region URL: {region_url}")

  def is_private_ip_google_access(self) -> bool:
    """Value of 'privateIpGoogleAccess' (False when absent)."""
    return self._resource_data.get('privateIpGoogleAccess', False)

  @property
  def network(self):
    """Raw value of the 'network' field."""
    return self._resource_data['network']
A VPC subnetwork.
@property
def full_path(self) -> str:
  """Self link with the Compute v1 URL prefix removed.

  Falls back to '>> <self link>' when the self link lacks that prefix.
  """
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                     self.self_link)
  if not matched:
    return f'>> {self.self_link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """'<project id>/<name>' of this resource."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
@property
def region(self) -> str:
  """Region name taken from the last component of the 'region' URL.

  Raises:
    RuntimeError: if the 'region' field is not a Compute v1 region URL.
  """
  # e.g. https://www.googleapis.com/compute/v1/projects/gcpdiag-gke1-aaaa/regions/europe-west4
  region_url = self._resource_data['region']
  matched = re.match(
      r'https://www.googleapis.com/compute/v1/projects/([^/]+)/regions/([^/]+)',
      region_url)
  if matched:
    return matched.group(2)
  raise RuntimeError(f"can't parse region URL: {region_url}")
class Route(models.Resource):
  """A VPC route, backed by the raw resource dict given at construction."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 URL prefix removed.

    Falls back to '>> <self link>' when the self link lacks that prefix.
    """
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                       self.self_link)
    if not matched:
      return f'>> {self.self_link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """'<project id>/<name>' of this route."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    return self._resource_data['name']

  @property
  def kind(self) -> str:
    return self._resource_data['kind']

  @property
  def self_link(self) -> str:
    # Unlike the sibling resource classes this lookup is mandatory.
    return self._resource_data['selfLink']

  @property
  def network(self) -> str:
    return self._resource_data['network']

  @property
  def tags(self) -> List[str]:
    """Value of 'tags', or an empty list when the field is absent."""
    return self._resource_data.get('tags', [])

  @property
  def dest_range(self) -> str:
    return self._resource_data['destRange']

  @property
  def next_hop_gateway(self) -> Optional[str]:
    """Value of 'nextHopGateway', or None when absent."""
    return self._resource_data.get('nextHopGateway')

  @property
  def next_hop_vpn_tunnel(self) -> Optional[str]:
    """Value of 'nextHopVpnTunnel', or None when absent."""
    return self._resource_data.get('nextHopVpnTunnel')

  @property
  def next_hop_hub(self) -> Optional[str]:
    """Value of 'nextHopHub', or None when absent."""
    return self._resource_data.get('nextHopHub')

  @property
  def priority(self) -> int:
    return self._resource_data['priority']

  def get_next_hop(self) -> Union[Dict[str, Any], Optional[str]]:
    """Return the first populated next-hop field of this route.

    Returns:
      {'type': <field name>, 'link': <field value>} for the first field
      (in the order below) holding a truthy value, or None if none does.
    """
    candidate_fields = (
        'nextHopGateway',
        'nextHopVpnTunnel',
        'nextHopHub',
        'nextHopInstance',
        'nextHopAddress',
        'nextHopPeering',
        'nextHopIlb',
        'nextHopNetwork',
        'nextHopIp',
    )
    for field in candidate_fields:
      if self._resource_data.get(field):
        return {'type': field, 'link': self._resource_data[field]}
    return None

  def check_route_match(self, ip1: IPAddrOrNet, ip2: str) -> bool:
    """Report whether _ip_match accepts ip1 against the single network ip2."""
    candidate_networks = [ipaddress.ip_network(ip2)]
    return bool(_ip_match(ip1, candidate_networks, 'allow'))
A VPC Route.
@property
def full_path(self) -> str:
  """Self link with the Compute v1 URL prefix removed.

  Falls back to '>> <self link>' when the self link lacks that prefix.
  """
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                     self.self_link)
  if not matched:
    return f'>> {self.self_link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """'<project id>/<name>' of this resource."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
def get_next_hop(self) -> Union[Dict[str, Any], Optional[str]]:
  """Return the first populated next-hop field of this route.

  Returns:
    {'type': <field name>, 'link': <field value>} for the first field
    (in the order below) holding a truthy value, or None if none does.
  """
  candidate_fields = (
      'nextHopGateway',
      'nextHopVpnTunnel',
      'nextHopHub',
      'nextHopInstance',
      'nextHopAddress',
      'nextHopPeering',
      'nextHopIlb',
      'nextHopNetwork',
      'nextHopIp',
  )
  for field in candidate_fields:
    if self._resource_data.get(field):
      return {'type': field, 'link': self._resource_data[field]}
  return None
class ManagedZone(models.Resource):
  """Represents a DNS managed zone (public or private).

  https://cloud.google.com/dns/docs/reference/v1beta2/managedZones
  """
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def cloud_logging_config(self) -> bool:
    """Value of cloudLoggingConfig.enableLogging (False when unset)."""
    return self._resource_data['cloudLoggingConfig'].get('enableLogging', False)

  @property
  def is_public(self) -> bool:
    """True when the zone's 'visibility' field equals 'public'."""
    return self._resource_data['visibility'] == 'public'

  @property
  def vpc_attached(self) -> bool:
    """True when the zone lists at least one network or GKE cluster.

    Reads privateVisibilityConfig.networks / .gkeClusters. A missing
    privateVisibilityConfig is treated as empty; the underlying resource
    dict is not modified by this getter.
    """
    visibility = self._resource_data.get('privateVisibilityConfig', {})
    return bool(visibility.get('networks') or visibility.get('gkeClusters'))

  @property
  def dnssec_config_state(self) -> bool:
    """Value of dnssecConfig.state, or False when it is not set.

    The underlying resource dict is not modified by this getter.
    NOTE(review): the stored state looks like a string rather than a bool;
    confirm against callers before tightening the annotation.
    """
    return self._resource_data.get('dnssecConfig', {}).get('state', False)

  @property
  def name(self) -> str:
    return self._resource_data['name']

  @property
  def full_path(self) -> str:
    """Self link with the Cloud DNS v1beta2 URL prefix removed.

    Falls back to '>> <self link>' when the self link lacks that prefix.
    """
    matched = re.match(r'https://dns.googleapis.com/dns/v1beta2/(.*)',
                       self.self_link)
    if not matched:
      return f'>> {self.self_link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """'<project id>/<name>' of this zone."""
    return '/'.join((self.project_id, self.name))

  @property
  def self_link(self) -> str:
    """Value of 'selfLink', or '' when the field is absent."""
    return self._resource_data.get('selfLink', '')
Represents a DNS zone (public or private).
https://cloud.google.com/dns/docs/reference/v1beta2/managedZones
@property
def vpc_attached(self) -> bool:
  """True when the zone lists at least one network or GKE cluster.

  Reads privateVisibilityConfig.networks / .gkeClusters. A missing
  privateVisibilityConfig is treated as empty; the underlying resource
  dict is not modified by this getter.
  """
  visibility = self._resource_data.get('privateVisibilityConfig', {})
  return bool(visibility.get('networks') or visibility.get('gkeClusters'))
@property
def full_path(self) -> str:
  """Self link with the Cloud DNS v1beta2 URL prefix removed.

  Falls back to '>> <self link>' when the self link lacks that prefix.
  """
  matched = re.match(r'https://dns.googleapis.com/dns/v1beta2/(.*)',
                     self.self_link)
  if not matched:
    return f'>> {self.self_link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
class Router(models.Resource):
  """A VPC (Cloud) Router, backed by the raw resource dict."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self._nats = None

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 URL prefix removed.

    Falls back to '>> <self link>' when the self link lacks that prefix.
    """
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                       self.self_link)
    if not matched:
      return f'>> {self.self_link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """'<project id>/<name>' of this router."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of 'selfLink', or '' when the field is absent."""
    return self._resource_data.get('selfLink', '')

  @property
  def network(self) -> str:
    return self._resource_data['network']

  def get_network_name(self) -> str:
    """Last path component of the 'network' URL, or '' when it is empty."""
    # Trace message kept, but at debug level so it no longer shows up in
    # normal INFO output.
    logging.debug('inside get_network_name function')
    network_url = self._resource_data['network']
    if network_url:
      return network_url.split('/')[-1]
    return ''

  @property
  def nats(self):
    """List stored under 'nats' (empty list when absent)."""
    return self._resource_data.get('nats', [])

  def _find_nat(self, nat_gateway):
    """Return the NAT config dict whose 'name' equals nat_gateway, or None."""
    for nat in self._resource_data.get('nats', []):
      if nat['name'] == nat_gateway:
        return nat
    return None

  def get_nat_ip_allocate_option(self, nat_gateway) -> str:
    """'natIpAllocateOption' of the named NAT gateway.

    Returns '' when the field is unset or when this router has no NAT
    gateway with that name (previously the latter raised IndexError).
    """
    nat = self._find_nat(nat_gateway)
    if nat is None:
      return ''
    return nat.get('natIpAllocateOption', '')

  def get_enable_dynamic_port_allocation(self, nat_gateway) -> str:
    """'enableDynamicPortAllocation' of the named NAT gateway.

    Returns '' when the field is unset or when this router has no NAT
    gateway with that name (previously the latter raised IndexError).
    """
    nat = self._find_nat(nat_gateway)
    if nat is None:
      return ''
    return nat.get('enableDynamicPortAllocation', '')

  def subnet_has_nat(self, subnetwork):
    """Report whether any NAT config on this router covers `subnetwork`.

    A NAT whose 'sourceSubnetworkIpRangesToNat' is not 'LIST_OF_SUBNETWORKS'
    is treated as covering every subnet. Otherwise the subnetwork's self
    link must equal the 'name' of an entry in the NAT's 'subnetworks' list.
    """
    nat_configs = self._resource_data.get('nats', [])
    if not nat_configs:
      return False
    for nat in nat_configs:
      if nat['sourceSubnetworkIpRangesToNat'] != 'LIST_OF_SUBNETWORKS':
        # Cloud NAT configured for all subnets
        return True
      # Cloud NAT configured for specific subnets
      if 'subnetworks' in nat:
        link = subnetwork.self_link
        if link in [entry['name'] for entry in nat['subnetworks']]:
          return True
    return False
A VPC Router.
@property
def full_path(self) -> str:
  """Self link with the Compute v1 URL prefix removed.

  Falls back to '>> <self link>' when the self link lacks that prefix.
  """
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                     self.self_link)
  if not matched:
    return f'>> {self.self_link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """'<project id>/<name>' of this resource."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
def subnet_has_nat(self, subnetwork):
  """Report whether any NAT config on this router covers `subnetwork`.

  A NAT whose 'sourceSubnetworkIpRangesToNat' is not 'LIST_OF_SUBNETWORKS'
  is treated as covering every subnet. Otherwise the subnetwork's self
  link must equal the 'name' of an entry in the NAT's 'subnetworks' list.
  """
  nat_configs = self._resource_data.get('nats', [])
  if not nat_configs:
    return False
  for nat in nat_configs:
    if nat['sourceSubnetworkIpRangesToNat'] != 'LIST_OF_SUBNETWORKS':
      # Cloud NAT configured for all subnets
      return True
    # Cloud NAT configured for specific subnets
    if 'subnetworks' not in nat:
      continue
    link = subnetwork.self_link
    listed_names = [entry['name'] for entry in nat['subnetworks']]
    if link in listed_names:
      return True
  return False
class RouterStatus(models.Resource):
  """NAT router status, backed by the raw router status response dict."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 URL prefix removed.

    Falls back to '>> <self link>' when the self link lacks that prefix.
    """
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                       self.self_link)
    if not matched:
      return f'>> {self.self_link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """'<project id>/<name>' of this router status."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """Value of 'name', or '' when absent."""
    return self._resource_data.get('name', '')

  @property
  def self_link(self) -> str:
    """Value of 'selfLink', or '' when absent."""
    return self._resource_data.get('selfLink', '')

  def _first_nat_status(self) -> dict:
    """Return the first entry of result.natStatus, or {} when there is none."""
    nat_status = self._resource_data.get('result', {}).get('natStatus')
    # A missing or empty natStatus used to blow up on the [0] lookup.
    if not nat_status:
      return {}
    return nat_status[0]

  @property
  def min_extra_nat_ips_needed(self) -> str:
    """'minExtraNatIpsNeeded' of the first NAT status entry.

    Returns None when the field or the NAT status itself is absent.
    """
    return self._first_nat_status().get('minExtraNatIpsNeeded', None)

  @property
  def num_vms_with_nat_mappings(self) -> str:
    """'numVmEndpointsWithNatMappings' of the first NAT status entry.

    Returns None when the field or the NAT status itself is absent.
    """
    return self._first_nat_status().get('numVmEndpointsWithNatMappings', None)

  @property
  def bgp_peer_status(self) -> str:
    """Value of result.bgpPeerStatus ({} when absent)."""
    return self._resource_data.get('result', {}).get('bgpPeerStatus', {})
NAT Router Status
@property
def full_path(self) -> str:
  """Self link with the Compute v1 URL prefix removed.

  Falls back to '>> <self link>' when the self link lacks that prefix.
  """
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                     self.self_link)
  if not matched:
    return f'>> {self.self_link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """'<project id>/<name>' of this resource."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
class RouterNatIpInfo(models.Resource):
  """NAT IP info, backed by the raw response dict."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 URL prefix removed.

    Falls back to '>> <self link>' when the self link lacks that prefix.
    """
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                       self.self_link)
    if not matched:
      return f'>> {self.self_link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """'<project id>/<name>' of this resource."""
    return '/'.join((self.project_id, self.name))

  @property
  def self_link(self) -> str:
    """Value of 'selfLink', or '' when absent."""
    return self._resource_data.get('selfLink', '')

  @property
  def name(self) -> str:
    """Value of 'name', or '' when absent."""
    return self._resource_data.get('name', '')

  @property
  def result(self) -> str:
    """Value of 'result'; the fallback when absent is an empty list."""
    return self._resource_data.get('result', [])
NAT IP Info
@property
def full_path(self) -> str:
  """Self link with the Compute v1 URL prefix removed.

  Falls back to '>> <self link>' when the self link lacks that prefix.
  """
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                     self.self_link)
  if not matched:
    return f'>> {self.self_link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """'<project id>/<name>' of this resource."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
@dataclasses.dataclass
class Peering:
  """A VPC peering entry; str() of an instance is its name."""

  # Positional field order is part of the constructor signature.
  name: str
  url: str
  state: str
  exports_custom_routes: bool
  imports_custom_routes: bool
  auto_creates_routes: bool

  def __str__(self):
    # Only the name is shown; the other fields stay available via repr().
    return self.name
VPC Peerings
class Network(models.Resource):
  """A VPC network, backed by the raw resource dict given at construction."""
  _resource_data: dict
  _subnetworks: Optional[Dict[str, Subnetwork]]
  _context: models.Context

  def __init__(self, project_id, resource_data, context: models.Context):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self._subnetworks = None
    self._context = context

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 URL prefix removed.

    Falls back to '>> <self link>' when the self link lacks that prefix.
    """
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                       self.self_link)
    if not matched:
      return f'>> {self.self_link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """'<project id>/<name>' of this network."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of 'selfLink', or '' when absent."""
    return self._resource_data.get('selfLink', '')

  @property
  def firewall(self) -> 'EffectiveFirewalls':
    """Effective firewalls for this network, via _get_effective_firewalls."""
    return _get_effective_firewalls(self)

  @property
  def mtu(self) -> int:
    """Value of 'mtu', or DEFAULT_MTU when the field is absent."""
    return self._resource_data.get('mtu', DEFAULT_MTU)

  @property
  def subnetworks(self) -> Dict[str, Subnetwork]:
    """Subnetworks listed under 'subnetworks', via _batch_get_subnetworks."""
    subnetwork_urls = frozenset(self._resource_data.get('subnetworks', []))
    return _batch_get_subnetworks(self._project_id, subnetwork_urls,
                                  self._context)

  @property
  def peerings(self) -> List[Peering]:
    """One Peering per entry of the 'peerings' list (empty when absent)."""
    found = []
    for entry in self._resource_data.get('peerings', []):
      found.append(
          Peering(entry['name'], entry['network'], entry['state'],
                  entry['exportCustomRoutes'], entry['importCustomRoutes'],
                  entry['autoCreateRoutes']))
    return found
A VPC network.
@property
def full_path(self) -> str:
  """Self link with the Compute v1 URL prefix removed.

  Falls back to '>> <self link>' when the self link lacks that prefix.
  """
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                     self.self_link)
  if not matched:
    return f'>> {self.self_link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """'<project id>/<name>' of this resource."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
@dataclasses.dataclass
class FirewallCheckResult:
  """The result of a firewall connectivity check.

  str() of an instance is its `action`; the optional fields record which
  policy or VPC rule produced the result.
  """

  action: str
  firewall_policy_name: Optional[str] = None
  firewall_policy_rule_description: Optional[str] = None
  vpc_firewall_rule_id: Optional[str] = None
  vpc_firewall_rule_name: Optional[str] = None

  def __str__(self):
    return self.action

  @property
  def matched_by_str(self):
    """Human-readable description of what matched.

    A firewall policy name takes precedence over a VPC rule name. Returns
    None when neither is set.
    """
    if self.firewall_policy_name:
      label = f'policy: {self.firewall_policy_name}'
      if self.firewall_policy_rule_description:
        label += f', rule: {self.firewall_policy_rule_description}'
      return label
    if self.vpc_firewall_rule_name:
      return f'vpc firewall rule: {self.vpc_firewall_rule_name}'
    return None
The result of a firewall connectivity check.
@property
def matched_by_str(self):
  """Human-readable description of what matched.

  A firewall policy name takes precedence over a VPC rule name. Returns
  None when neither is set.
  """
  if self.firewall_policy_name:
    label = f'policy: {self.firewall_policy_name}'
    if self.firewall_policy_rule_description:
      label += f', rule: {self.firewall_policy_rule_description}'
    return label
  if self.vpc_firewall_rule_name:
    return f'vpc firewall rule: {self.vpc_firewall_rule_name}'
  return None
class FirewallRuleNotFoundError(Exception):
  """Raised to signal that a named firewall rule could not be found.

  Attributes are `rule_name` (the name that was looked up) and `disabled`
  (the flag passed by the raiser, False by default).
  """
  rule_name: str

  def __init__(self, name, disabled=False):
    message = f'firewall rule not found: {name}'
    # The base Exception carries only the message; the details are kept as
    # attributes for callers that need them.
    super().__init__(message)
    self.disabled = disabled
    self.rule_name = name
Common base class for all non-exit exceptions.
class VpcFirewallRule:
  """Read-only view over one firewall rule resource dict.

  Every accessor is a direct lookup of a mandatory key, so a missing key
  raises KeyError.
  """

  def __init__(self, resource_data):
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    data = self._resource_data
    return data['name']

  @property
  def source_ranges(self) -> List[ipaddress.IPv4Network]:
    # Returned exactly as stored under 'sourceRanges'; no parsing happens here.
    data = self._resource_data
    return data['sourceRanges']

  @property
  def target_tags(self) -> set:
    # Returned exactly as stored under 'targetTags'.
    data = self._resource_data
    return data['targetTags']

  @property
  def allowed(self) -> List[dict]:
    data = self._resource_data
    return data['allowed']

  def is_enabled(self) -> bool:
    """True when the rule's 'disabled' field is falsy."""
    is_disabled = self._resource_data['disabled']
    return not is_disabled
Represents firewall rule
class EffectiveFirewalls:
  """Effective firewall rules for a VPC network or Instance.

  Includes org/folder firewall policies.
  """
  _resource_data: dict
  _policies: List[_FirewallPolicy]
  _vpc_firewall: _VpcFirewall

  def __init__(self, resource_data):
    self._resource_data = resource_data
    self._policies = []
    # 'firewallPolicys' is the key spelling used by the resource data.
    if 'firewallPolicys' in resource_data:
      self._policies.extend(
          _FirewallPolicy(entry) for entry in resource_data['firewallPolicys'])
    self._vpc_firewall = _VpcFirewall(resource_data.get('firewalls', {}))

  def check_connectivity_ingress(
      self,  #
      *,
      src_ip: IPAddrOrNet,
      ip_protocol: str,
      port: Optional[int] = None,
      source_service_account: Optional[str] = None,
      source_tags: Optional[List[str]] = None,
      target_service_account: Optional[str] = None,
      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
    """Evaluate ingress connectivity against policies, then VPC rules.

    Firewall policies are consulted in order; the first result whose action
    is not 'goto_next' is returned. Otherwise the VPC firewall decides.

    Raises:
      ValueError: if ip_protocol is not 'ICMP' and no port is given.
    """
    port_missing = port is None
    if port_missing and ip_protocol != 'ICMP':
      raise ValueError('TCP and UDP must have port numbers')

    # Firewall policies (organization, folders); the target network is not
    # passed to the policy check.
    for policy in self._policies:
      verdict = policy.check_connectivity_ingress(
          src_ip=src_ip,
          ip_protocol=ip_protocol,
          port=port,
          target_service_account=target_service_account)
      if verdict.action == 'goto_next':
        continue
      return verdict

    # VPC firewall rules
    return self._vpc_firewall.check_connectivity_ingress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        source_service_account=source_service_account,
        source_tags=source_tags,
        target_service_account=target_service_account,
        target_tags=target_tags)

  def check_connectivity_egress(
      self,  #
      *,
      src_ip: IPAddrOrNet,
      ip_protocol: str,
      port: Optional[int] = None,
      source_service_account: Optional[str] = None,
      source_tags: Optional[List[str]] = None,
      target_service_account: Optional[str] = None,
      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
    """Evaluate egress connectivity against policies, then VPC rules.

    Firewall policies are consulted in order; the first result whose action
    is not 'goto_next' is returned. Otherwise the VPC firewall decides.

    Raises:
      ValueError: if ip_protocol is not 'ICMP' and no port is given.
    """
    port_missing = port is None
    if port_missing and ip_protocol != 'ICMP':
      raise ValueError('TCP and UDP must have port numbers')

    # Firewall policies (organization, folders); the target network is not
    # passed to the policy check.
    for policy in self._policies:
      verdict = policy.check_connectivity_egress(
          src_ip=src_ip,
          ip_protocol=ip_protocol,
          port=port,
          target_service_account=target_service_account)
      if verdict.action == 'goto_next':
        continue
      return verdict

    # VPC firewall rules
    return self._vpc_firewall.check_connectivity_egress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        source_service_account=source_service_account,
        source_tags=source_tags,
        target_service_account=target_service_account,
        target_tags=target_tags)

  def get_vpc_ingress_rules(
      self,
      name: Optional[str] = None,
      name_pattern: Optional[re.Pattern] = None,
      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
    """Retrieve the list of ingress firewall rules matching name or name pattern and target tags.

    Args:
      name (Optional[str], optional): firewall rule name. Defaults to None.
      name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
      target_tags (Optional[List[str]], optional): firewall target tags
        (if not specified any tag will match). Defaults to None.

    Returns:
      List[VpcFirewallRule]: List of ingress firewall rules
    """
    return self._vpc_firewall.get_vpc_ingress_rules(name, name_pattern,
                                                    target_tags)

  def get_vpc_egress_rules(
      self,
      name: Optional[str] = None,
      name_pattern: Optional[re.Pattern] = None,
      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
    """Retrieve the list of egress firewall rules matching name or name pattern and target tags.

    Args:
      name (Optional[str], optional): firewall rule name. Defaults to None.
      name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
      target_tags (Optional[List[str]], optional): firewall target tags
        (if not specified any tag will match). Defaults to None.

    Returns:
      List[VpcFirewallRule]: List of egress firewall rules
    """
    return self._vpc_firewall.get_vpc_egress_rules(name, name_pattern,
                                                   target_tags)

  def verify_ingress_rule_exists(self, name: str):
    """Verify that a certain VPC rule exists. This is useful to verify
    whether maybe a permission was missing on a shared VPC and an
    automatic rule couldn't be created."""
    vpc_firewall = self._vpc_firewall
    return vpc_firewall.verify_ingress_rule_exists(name)

  def verify_egress_rule_exists(self, name: str):
    """Verify that a certain VPC rule exists. This is useful to verify
    whether maybe a permission was missing on a shared VPC and an
    automatic rule couldn't be created."""
    vpc_firewall = self._vpc_firewall
    return vpc_firewall.verify_egress_rule_exists(name)
Effective firewall rules for a VPC network or Instance.
Includes org/folder firewall policies.
def __init__(self, resource_data):
  """Wrap effective-firewall resource data.

  Builds one _FirewallPolicy per entry of 'firewallPolicys' (if present)
  and a _VpcFirewall from the 'firewalls' entry ({} when absent).
  """
  self._resource_data = resource_data
  self._policies = []
  # 'firewallPolicys' is the key spelling used by the resource data.
  if 'firewallPolicys' in resource_data:
    self._policies.extend(
        _FirewallPolicy(entry) for entry in resource_data['firewallPolicys'])
  self._vpc_firewall = _VpcFirewall(resource_data.get('firewalls', {}))
def check_connectivity_ingress(
    self,  #
    *,
    src_ip: IPAddrOrNet,
    ip_protocol: str,
    port: Optional[int] = None,
    source_service_account: Optional[str] = None,
    source_tags: Optional[List[str]] = None,
    target_service_account: Optional[str] = None,
    target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
  """Evaluate ingress connectivity against policies, then VPC rules.

  Firewall policies are consulted in order; the first result whose action
  is not 'goto_next' is returned. Otherwise the VPC firewall decides.

  Raises:
    ValueError: if ip_protocol is not 'ICMP' and no port is given.
  """
  port_missing = port is None
  if port_missing and ip_protocol != 'ICMP':
    raise ValueError('TCP and UDP must have port numbers')

  # Firewall policies (organization, folders); the target network is not
  # passed to the policy check.
  for policy in self._policies:
    verdict = policy.check_connectivity_ingress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        target_service_account=target_service_account)
    if verdict.action == 'goto_next':
      continue
    return verdict

  # VPC firewall rules
  return self._vpc_firewall.check_connectivity_ingress(
      src_ip=src_ip,
      ip_protocol=ip_protocol,
      port=port,
      source_service_account=source_service_account,
      source_tags=source_tags,
      target_service_account=target_service_account,
      target_tags=target_tags)
def check_connectivity_egress(
    self,  #
    *,
    src_ip: IPAddrOrNet,
    ip_protocol: str,
    port: Optional[int] = None,
    source_service_account: Optional[str] = None,
    source_tags: Optional[List[str]] = None,
    target_service_account: Optional[str] = None,
    target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
  """Evaluate egress connectivity against policies, then VPC rules.

  Firewall policies are consulted in order; the first result whose action
  is not 'goto_next' is returned. Otherwise the VPC firewall decides.

  Raises:
    ValueError: if ip_protocol is not 'ICMP' and no port is given.
  """
  port_missing = port is None
  if port_missing and ip_protocol != 'ICMP':
    raise ValueError('TCP and UDP must have port numbers')

  # Firewall policies (organization, folders); the target network is not
  # passed to the policy check.
  for policy in self._policies:
    verdict = policy.check_connectivity_egress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        target_service_account=target_service_account)
    if verdict.action == 'goto_next':
      continue
    return verdict

  # VPC firewall rules
  return self._vpc_firewall.check_connectivity_egress(
      src_ip=src_ip,
      ip_protocol=ip_protocol,
      port=port,
      source_service_account=source_service_account,
      source_tags=source_tags,
      target_service_account=target_service_account,
      target_tags=target_tags)
def get_vpc_ingress_rules(
    self,
    name: Optional[str] = None,
    name_pattern: Optional[re.Pattern] = None,
    target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
  """Retrieve the list of ingress firewall rules matching name or name pattern and target tags.

  Args:
    name (Optional[str], optional): firewall rule name. Defaults to None.
    name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
    target_tags (Optional[List[str]], optional): firewall target tags
      (if not specified any tag will match). Defaults to None.

  Returns:
    List[VpcFirewallRule]: List of ingress firewall rules
  """
  # Pure delegation to the wrapped VPC firewall; arguments go positionally.
  return self._vpc_firewall.get_vpc_ingress_rules(name, name_pattern,
                                                  target_tags)
Retrieve the list of ingress firewall rules matching name or name pattern and target tags.
Arguments:
- name (Optional[str], optional): firewall rule name. Defaults to None.
- name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
- target_tags (Optional[List[str]], optional): firewall target tags (if not specified any tag will match). Defaults to None.
Returns:
List[VpcFirewallRule]: List of ingress firewall rules
def get_vpc_egress_rules(
    self,
    name: Optional[str] = None,
    name_pattern: Optional[re.Pattern] = None,
    target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
  """Retrieve the list of egress firewall rules matching name or name pattern and target tags.

  Args:
    name (Optional[str], optional): firewall rule name. Defaults to None.
    name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
    target_tags (Optional[List[str]], optional): firewall target tags
      (if not specified any tag will match). Defaults to None.

  Returns:
    List[VpcFirewallRule]: List of egress firewall rules
  """
  # Pure delegation to the wrapped VPC firewall; arguments go positionally.
  return self._vpc_firewall.get_vpc_egress_rules(name, name_pattern,
                                                 target_tags)
Retrieve the list of egress firewall rules matching name or name pattern and target tags.
Arguments:
- name (Optional[str], optional): firewall rule name. Defaults to None.
- name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
- target_tags (Optional[List[str]], optional): firewall target tags (if not specified any tag will match). Defaults to None.
Returns:
List[VpcFirewallRule]: List of egress firewall rules
def verify_ingress_rule_exists(self, name: str):
  """Verify that a certain VPC rule exists. This is useful to verify
  whether maybe a permission was missing on a shared VPC and an
  automatic rule couldn't be created."""
  # Delegates to the wrapped VPC firewall and returns its answer unchanged.
  vpc_firewall = self._vpc_firewall
  return vpc_firewall.verify_ingress_rule_exists(name)
Verify that a certain VPC rule exists. This is useful to verify whether maybe a permission was missing on a shared VPC and an automatic rule couldn't be created.
def verify_egress_rule_exists(self, name: str):
  """Verify that a certain VPC rule exists. This is useful to verify
  whether maybe a permission was missing on a shared VPC and an
  automatic rule couldn't be created."""
  # Delegates to the wrapped VPC firewall and returns its answer unchanged.
  vpc_firewall = self._vpc_firewall
  return vpc_firewall.verify_egress_rule_exists(name)
Verify that a certain VPC rule exists. This is useful to verify whether maybe a permission was missing on a shared VPC and an automatic rule couldn't be created.
class VPCEffectiveFirewalls(EffectiveFirewalls):
  """Effective firewall rules for a VPC network.

  Includes org/folder firewall policies. Also keeps a reference to the
  network the rules belong to.
  """
  _network: Network

  def __init__(self, network, resource_data):
    # The parent parses resource_data; only the network is stored here.
    super().__init__(resource_data)
    self._network = network
Effective firewall rules for a VPC network.
Includes org/folder firewall policies.
@caching.cached_api_call(in_memory=True)
def get_network(project_id: str, network_name: str,
                context: models.Context) -> Network:
  """Fetch one network through the Compute v1 API and wrap it in a Network.

  Args:
    project_id: project that owns the network.
    network_name: name of the network to fetch.
    context: context stored on the returned Network.
  """
  logging.debug('fetching network: %s/%s', project_id, network_name)
  compute_api = apis.get_api('compute', 'v1', project_id)
  network_request = compute_api.networks().get(project=project_id,
                                               network=network_name)
  network_data = network_request.execute(num_retries=config.API_RETRIES)
  return Network(project_id, network_data, context)
def get_subnetwork_from_url(url: str) -> Subnetwork:
  """Returns Subnetwork object given subnetwork url.

  Args:
    url: full Compute v1 subnetwork URL of the form
      https://www.googleapis.com/compute/v1/projects/<project>/regions/<region>/subnetworks/<name>

  Raises:
    ValueError: if the URL does not have that form.
  """
  m = re.match((r'https://www.googleapis.com/compute/v1/projects/'
                r'([^/]+)/regions/([^/]+)/subnetworks/([^/]+)$'), url)
  if not m:
    # The message previously said "network url", which pointed readers at
    # the wrong kind of resource.
    raise ValueError(f"can't parse subnetwork url: {url}")
  project_id, region, subnetwork_name = m.groups()
  return get_subnetwork(project_id, region, subnetwork_name)
Returns Subnetwork object given subnetwork url
def get_network_from_url(url: str) -> Network:
  """Return the Network object identified by a global network URL.

  Args:
    url: Compute API v1 URL starting with
      https://www.googleapis.com/compute/v1/projects/<project>/global/networks/<name>

  Returns:
    The Network obtained from get_network(), using a fresh Context scoped to
    the project parsed out of the URL.

  Raises:
    ValueError: if url does not start with the expected form.
  """
  pattern = (r'https://www.googleapis.com/compute/v1/projects/([^/]+)'
             r'/global/networks/([^/]+)')
  parsed = re.match(pattern, url)
  if parsed is None:
    raise ValueError(f"can't parse network url: {url}")
  project_id, network_name = parsed.group(1, 2)
  network_context = models.Context(project_id=project_id)
  return get_network(project_id, network_name, network_context)
@caching.cached_api_call(in_memory=True)
def get_networks(context: models.Context) -> List[Network]:
  """List the VPC networks of the project named by context.

  A single networks.list request is issued; no further pages are requested
  here.

  Args:
    context: context whose project_id selects the project; it is also handed
      to every returned Network.

  Returns:
    One Network per entry in the response's 'items' (empty list if absent).
  """
  project_id = context.project_id
  logging.debug('fetching network: %s', project_id)
  compute = apis.get_api('compute', 'v1', project_id)
  listing = compute.networks().list(project=project_id).execute(
      num_retries=config.API_RETRIES)
  networks = []
  for network_data in listing.get('items', []):
    networks.append(Network(project_id, network_data, context))
  return networks
@caching.cached_api_call(in_memory=True)
def get_subnetwork(project_id: str, region: str,
                   subnetwork_name: str) -> Subnetwork:
  """Fetch one subnetwork through the Compute v1 API and wrap it.

  Args:
    project_id: project that owns the subnetwork.
    region: region the subnetwork lives in.
    subnetwork_name: name of the subnetwork to retrieve.

  Returns:
    A Subnetwork built from the API response, with a Context scoped to
    project_id.

  Raises:
    RuntimeError: if the API call yields no response.
  """
  # Label the log line as a subnetwork fetch and include the region, so that
  # same-named subnetworks in different regions can be told apart.
  logging.debug('fetching subnetwork: %s/%s/%s', project_id, region,
                subnetwork_name)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.subnetworks().get(project=project_id,
                                      region=region,
                                      subnetwork=subnetwork_name)
  response = request.execute(num_retries=config.API_RETRIES)
  if response is None:
    raise RuntimeError(
        f'failed to fetch subnetwork: {project_id}/{region}/{subnetwork_name}')
  return Subnetwork(project_id,
                    response,
                    context=models.Context(project_id=project_id))
@caching.cached_api_call(in_memory=True)
def get_routes(project_id: str) -> List[Route]:
  """List the routes of a project through the Compute v1 API.

  A single routes.list request is issued; no further pages are requested
  here.

  Args:
    project_id: project whose routes are listed.

  Returns:
    One Route per entry in the response's 'items' (empty list if absent).
  """
  logging.debug('fetching routes: %s', project_id)
  compute = apis.get_api('compute', 'v1', project_id)
  listing = compute.routes().list(project=project_id).execute(
      num_retries=config.API_RETRIES)
  routes = []
  for route_data in listing.get('items', []):
    routes.append(Route(project_id, route_data))
  return routes
@caching.cached_api_call(in_memory=True)
def get_zones(project_id: str) -> List[ManagedZone]:
  """List the Cloud DNS managed zones of a project (DNS API v1beta2).

  The zones are first listed, then each one is fetched individually by name;
  the ManagedZone objects are built from the individual responses.

  Args:
    project_id: project whose managed zones are fetched.

  Returns:
    One ManagedZone per entry in the list response's 'managedZones'
    (empty list if absent), in the order the list call returned them.
  """
  logging.debug('fetching DNS zones: %s', project_id)
  dns = apis.get_api('dns', 'v1beta2', project_id)
  listing = dns.managedZones().list(project=project_id).execute(
      num_retries=config.API_RETRIES)

  def _fetch_zone(zone_name):
    # One extra GET per zone to obtain that zone's own resource.
    zone_request = dns.managedZones().get(project=project_id,
                                          managedZone=zone_name)
    return zone_request.execute(num_retries=config.API_RETRIES)

  return [
      ManagedZone(project_id, _fetch_zone(summary['name']))
      for summary in listing.get('managedZones', [])
  ]
@caching.cached_api_call(in_memory=True)
def get_routers(project_id: str, region: str, network) -> List[Router]:
  """List the routers in a region that are attached to a given network.

  Args:
    project_id: project whose routers are listed.
    region: region to list routers in.
    network: object whose self_link is used in the list filter.

  Returns:
    One Router per entry in the response's 'items' (empty list if absent).
  """
  logging.debug('fetching routers: %s/%s', project_id, region)
  compute = apis.get_api('compute', 'v1', project_id)
  network_filter = f'network="{network.self_link}"'
  listing = compute.routers().list(
      project=project_id, region=region,
      filter=network_filter).execute(num_retries=config.API_RETRIES)
  routers = []
  for router_data in listing.get('items', []):
    routers.append(Router(project_id, router_data))
  return routers
@caching.cached_api_call(in_memory=True)
def get_router(project_id: str, region: str, network) -> Router:
  """Return the first router in a region that is attached to a network.

  Args:
    project_id: project whose routers are listed.
    region: region to list routers in.
    network: object whose self_link is used in the list filter.

  Returns:
    A Router built from the first entry of the response's 'items'. When the
    response has no 'items' key or an empty 'items' list, a Router built
    from an empty dict is returned.
  """
  logging.debug('fetching routers: %s/%s', project_id, region)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.routers().list(project=project_id,
                                   region=region,
                                   filter=f'network="{network.self_link}"')
  response = request.execute(num_retries=config.API_RETRIES)
  # `or` covers both a missing key and an empty list; a plain .get() default
  # would let an empty 'items' list escape as StopIteration.
  items = response.get('items') or [{}]
  return Router(project_id, items[0])
@caching.cached_api_call(in_memory=True)
def get_router_by_name(project_id: str, region: str,
                       router_name: str) -> Router:
  """Return the router with the given name from a region's router list.

  All routers of the region are listed and the first entry whose 'name'
  equals router_name is wrapped.

  Args:
    project_id: project whose routers are listed.
    region: region to list routers in.
    router_name: name of the router to pick.

  Returns:
    A Router built from the first matching entry.

  Raises:
    StopIteration: if no listed router has that name.
  """
  logging.debug('fetching router list: %s/%s in region %s', project_id,
                router_name, region)
  compute = apis.get_api('compute', 'v1', project_id)
  listing = compute.routers().list(
      project=project_id,
      region=region).execute(num_retries=config.API_RETRIES)
  matching = (Router(project_id, router_data)
              for router_data in listing.get('items', [])
              if router_data['name'] == router_name)
  # No default on purpose: a missing router surfaces as StopIteration.
  return next(matching)
@caching.cached_api_call(in_memory=True)
def nat_router_status(project_id: str, router_name: str,
                      region: str) -> RouterStatus:
  """Fetch the runtime status of a router (routers.getRouterStatus).

  Args:
    project_id: project that owns the router.
    router_name: name of the router.
    region: region of the router.

  Returns:
    A RouterStatus built from the API response when the response carries a
    top-level 'result' key; otherwise a RouterStatus built from an empty
    dict (a debug message is logged in that case).
  """
  logging.debug('fetching router status: %s/%s in region %s', project_id,
                router_name, region)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.routers().getRouterStatus(project=project_id,
                                              router=router_name,
                                              region=region)
  response = request.execute(num_retries=config.API_RETRIES)
  # Test for the key itself. A substring search on str(response) would also
  # succeed when 'result' merely occurs inside some value (e.g. a name).
  if isinstance(response, dict) and 'result' in response:
    return RouterStatus(project_id, response)
  logging.debug('unable to fetch router status: %s/%s in region %s',
                project_id, router_name, region)
  return RouterStatus(project_id, {})
@caching.cached_api_call(in_memory=True)
def get_nat_ip_info(project_id: str, router_name: str,
                    region: str) -> RouterNatIpInfo:
  """Fetch the NAT IP information of a router (routers.getNatIpInfo).

  Args:
    project_id: project that owns the router.
    router_name: name of the router.
    region: region of the router.

  Returns:
    A RouterNatIpInfo built from the API response when the response carries
    a top-level 'result' key; otherwise a RouterNatIpInfo built from an
    empty dict (a debug message is logged in that case).
  """
  logging.debug('fetching NAT IP info for router: %s/%s in region %s',
                project_id, router_name, region)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.routers().getNatIpInfo(project=project_id,
                                           router=router_name,
                                           region=region)
  response = request.execute(num_retries=config.API_RETRIES)
  # Test for the key itself. A substring search on str(response) would also
  # succeed when 'result' merely occurs inside some value (e.g. a name).
  if isinstance(response, dict) and 'result' in response:
    return RouterNatIpInfo(project_id, response)
  logging.debug('unable to fetch Nat IP Info for router: %s/%s in region %s',
                project_id, router_name, region)
  return RouterNatIpInfo(project_id, {})
class VPCSubnetworkIAMPolicy(iam.BaseIAMPolicy):
  """IAM policy attached to a VPC subnetwork."""

  def _is_resource_permission(self, permission):
    # Unconditionally True, whatever permission is passed in.
    # NOTE(review): presumably this tells the base class that every
    # permission is to be evaluated at the subnetwork (resource) level;
    # confirm against iam.BaseIAMPolicy.
    return True
Common class for IAM policies
@caching.cached_api_call(in_memory=True)
def get_subnetwork_iam_policy(context: models.Context, region: str,
                              subnetwork_name: str) -> VPCSubnetworkIAMPolicy:
  """Fetch the IAM policy of a subnetwork.

  Args:
    context: context whose project_id selects the project; it is also passed
      on to iam.fetch_iam_policy.
    region: region of the subnetwork.
    subnetwork_name: name of the subnetwork.

  Returns:
    The result of iam.fetch_iam_policy for the subnetworks.getIamPolicy
    request, using VPCSubnetworkIAMPolicy as the policy class.
  """
  project_id = context.project_id
  compute = apis.get_api('compute', 'v1', project_id)
  policy_request = compute.subnetworks().getIamPolicy(
      project=project_id, region=region, resource=subnetwork_name)
  # Relative resource path handed to fetch_iam_policy alongside the request.
  resource_name = (f'projects/{project_id}/regions/{region}/'
                   f'subnetworks/{subnetwork_name}')
  return iam.fetch_iam_policy(policy_request, VPCSubnetworkIAMPolicy,
                              project_id, resource_name, context)
class Address(models.Resource):
  """IP address resource wrapping the raw API resource dict."""
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link without the Compute v1 URL prefix.

    Falls back to '>> ' followed by the self link when the prefix is absent.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    if matched is None:
      return f'>> {link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """Project id and address name joined by a slash."""
    return f'{self.project_id}/{self.name}'

  @property
  def name(self) -> str:
    """Value of the 'name' field; KeyError if the field is missing."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' if the field is missing."""
    return self._resource_data.get('selfLink', '')

  @property
  def subnetwork(self) -> str:
    """Value of the 'subnetwork' field; KeyError if the field is missing."""
    return self._resource_data['subnetwork']

  @property
  def status(self) -> str:
    """Value of the 'status' field; KeyError if the field is missing."""
    return self._resource_data['status']
IP Addresses.
@property
def full_path(self) -> str:
  """Self link without the Compute v1 URL prefix.

  Falls back to '>> ' followed by the self link when the prefix is absent.
  """
  link = self.self_link
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
  if matched is None:
    return f'>> {link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Project id and resource name joined by a slash."""
  return f'{self.project_id}/{self.name}'
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
@caching.cached_api_call(in_memory=True)
def get_addresses(project_id: str) -> List[Address]:
  """List the addresses of a project via addresses.aggregatedList.

  A single aggregatedList request is issued; no further pages are requested
  here.

  Args:
    project_id: project whose addresses are listed.

  Returns:
    One Address per entry found under the 'addresses' key of each value in
    the response's 'items' mapping; values without that key are skipped.

  Raises:
    KeyError: if the response has no 'items' key.
  """
  logging.debug('fetching addresses list: %s', project_id)
  compute = apis.get_api('compute', 'v1', project_id)
  aggregated = compute.addresses().aggregatedList(
      project=project_id).execute(num_retries=config.API_RETRIES)
  found = []
  for scoped in aggregated['items'].values():
    if 'addresses' in scoped:
      found += [Address(project_id, item) for item in scoped['addresses']]
  return found