gcpdiag.queries.network
class Subnetwork(models.Resource):
  """A VPC subnetwork, wrapping the raw API resource dictionary."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link relative to the Compute v1 API root.

    If the self link does not start with the Compute v1 prefix, the raw self
    link is returned prefixed with '>> '.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    if matched is None:
      return f'>> {link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>' of this subnetwork."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """Value of the 'name' field (required)."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when it is absent."""
    return self._resource_data.get('selfLink', '')

  @property
  def ip_network(self) -> IPv4NetOrIPv6Net:
    """The 'ipCidrRange' field parsed with ipaddress.ip_network()."""
    cidr = self._resource_data['ipCidrRange']
    return ipaddress.ip_network(cidr)

  @property
  def region(self) -> str:
    """Region name extracted from the 'region' URL.

    Raises:
      RuntimeError: if the URL does not look like
        https://www.googleapis.com/compute/v1/projects/<project>/regions/<region>
    """
    region_url = self._resource_data['region']
    parsed = re.match(
        r'https://www.googleapis.com/compute/v1/projects/([^/]+)/regions/([^/]+)',
        region_url)
    if parsed:
      return parsed.group(2)
    raise RuntimeError(f"can't parse region URL: {region_url}")

  def is_private_ip_google_access(self) -> bool:
    """Value of 'privateIpGoogleAccess' (False when absent)."""
    return self._resource_data.get('privateIpGoogleAccess', False)

  @property
  def network(self):
    """Value of the 'network' field (required)."""
    return self._resource_data['network']
A VPC subnetwork.
@property
def full_path(self) -> str:
  """Self link relative to the Compute v1 API root.

  If the self link does not start with the Compute v1 prefix, the raw self
  link is returned prefixed with '>> '.
  """
  link = self.self_link
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
  if matched is None:
    return f'>> {link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Return '<project_id>/<name>' for this resource."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
@property
def region(self) -> str:
  """Region name extracted from the 'region' URL of the resource.

  Raises:
    RuntimeError: if the URL does not look like
      https://www.googleapis.com/compute/v1/projects/<project>/regions/<region>
  """
  region_url = self._resource_data['region']
  parsed = re.match(
      r'https://www.googleapis.com/compute/v1/projects/([^/]+)/regions/([^/]+)',
      region_url)
  if parsed:
    return parsed.group(2)
  raise RuntimeError(f"can't parse region URL: {region_url}")
class Route(models.Resource):
  """A VPC route, wrapping the raw API resource dictionary."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link relative to the Compute v1 API root.

    If the self link does not start with the Compute v1 prefix, the raw self
    link is returned prefixed with '>> '.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    if matched is None:
      return f'>> {link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>' of this route."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    return self._resource_data['name']

  @property
  def kind(self) -> str:
    return self._resource_data['kind']

  @property
  def self_link(self) -> str:
    return self._resource_data['selfLink']

  @property
  def network(self) -> str:
    return self._resource_data['network']

  @property
  def tags(self) -> List[str]:
    """Value of the 'tags' field, or an empty list when it is absent."""
    return self._resource_data.get('tags', [])

  @property
  def dest_range(self) -> str:
    return self._resource_data['destRange']

  @property
  def next_hop_gateway(self) -> Optional[str]:
    """Value of 'nextHopGateway', or None when it is absent."""
    return self._resource_data.get('nextHopGateway')

  @property
  def next_hop_vpn_tunnel(self) -> Optional[str]:
    """Value of 'nextHopVpnTunnel', or None when it is absent."""
    return self._resource_data.get('nextHopVpnTunnel')

  @property
  def next_hop_hub(self) -> Optional[str]:
    """Value of 'nextHopHub', or None when it is absent."""
    return self._resource_data.get('nextHopHub')

  @property
  def priority(self) -> int:
    return self._resource_data['priority']

  def get_next_hop(self) -> Union[Dict[str, Any], Optional[str]]:
    """Return the first populated next-hop field of the route.

    Returns:
      {'type': <field name>, 'link': <field value>} for the first field (in the
      order listed below) that holds a truthy value, or None if none does.
    """
    # The order is significant: the first truthy field wins.
    hop_fields = (
        'nextHopGateway',
        'nextHopVpnTunnel',
        'nextHopHub',
        'nextHopInstance',
        'nextHopAddress',
        'nextHopPeering',
        'nextHopIlb',
        'nextHopNetwork',
        'nextHopIp',
    )
    for field in hop_fields:
      if self._resource_data.get(field):
        return {'type': field, 'link': self._resource_data[field]}
    return None

  def check_route_match(self, ip1: IPAddrOrNet, ip2: str) -> bool:
    """Return True if _ip_match() accepts ip1 against the network ip2."""
    candidates = [ipaddress.ip_network(ip2)]
    return bool(_ip_match(ip1, candidates, 'allow'))
A VPC Route.
@property
def full_path(self) -> str:
  """Self link relative to the Compute v1 API root.

  If the self link does not start with the Compute v1 prefix, the raw self
  link is returned prefixed with '>> '.
  """
  link = self.self_link
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
  if matched is None:
    return f'>> {link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Return '<project_id>/<name>' for this resource."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
def get_next_hop(self) -> Union[Dict[str, Any], Optional[str]]:
  """Return the first populated next-hop field of the route.

  Returns:
    {'type': <field name>, 'link': <field value>} for the first field (in the
    order listed below) that holds a truthy value, or None if none does.
  """
  # The order is significant: the first truthy field wins.
  hop_fields = (
      'nextHopGateway',
      'nextHopVpnTunnel',
      'nextHopHub',
      'nextHopInstance',
      'nextHopAddress',
      'nextHopPeering',
      'nextHopIlb',
      'nextHopNetwork',
      'nextHopIp',
  )
  for field in hop_fields:
    if self._resource_data.get(field):
      return {'type': field, 'link': self._resource_data[field]}
  return None
class ManagedZone(models.Resource):
  """Represents a DNS managed zone (public or private).

  https://cloud.google.com/dns/docs/reference/v1beta2/managedZones

  All accessors are read-only views of the API response: none of them modifies
  the underlying resource dictionary.
  """
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def cloud_logging_config(self) -> bool:
    """Value of 'cloudLoggingConfig.enableLogging' (False when absent)."""
    logging_config = self._resource_data.get('cloudLoggingConfig', {})
    return logging_config.get('enableLogging', False)

  @property
  def is_public(self) -> bool:
    """True if the 'visibility' field equals 'public'."""
    return self._resource_data['visibility'] == 'public'

  @property
  def vpc_attached(self) -> bool:
    """True if 'privateVisibilityConfig' lists any networks or GKE clusters."""
    # Read with a default instead of inserting an empty dict into the
    # (possibly cached and shared) API response.
    visibility = self._resource_data.get('privateVisibilityConfig', {})
    return bool(visibility.get('networks') or visibility.get('gkeClusters'))

  @property
  def dnssec_config_state(self) -> Union[str, bool]:
    """Value of 'dnssecConfig.state', or False when it is not present."""
    return self._resource_data.get('dnssecConfig', {}).get('state', False)

  @property
  def name(self) -> str:
    return self._resource_data['name']

  @property
  def full_path(self) -> str:
    """Self link relative to the DNS v1beta2 API root.

    If the self link does not start with the DNS v1beta2 prefix, the raw self
    link is returned prefixed with '>> '.
    """
    result = re.match(r'https://dns.googleapis.com/dns/v1beta2/(.*)',
                      self.self_link)
    if result:
      return result.group(1)
    return f'>> {self.self_link}'

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>' of this zone."""
    return self.project_id + '/' + self.name

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when it is absent."""
    return self._resource_data.get('selfLink', '')
Represents a DNS zone (public or private).
https://cloud.google.com/dns/docs/reference/v1beta2/managedZones
@property
def vpc_attached(self) -> bool:
  """True if 'privateVisibilityConfig' lists any networks or GKE clusters.

  The resource dictionary is only read; a missing 'privateVisibilityConfig'
  is treated as empty without being written back.
  """
  # Read with a default instead of inserting an empty dict into the
  # (possibly cached and shared) API response.
  visibility = self._resource_data.get('privateVisibilityConfig', {})
  return bool(visibility.get('networks') or visibility.get('gkeClusters'))
@property
def full_path(self) -> str:
  """Self link relative to the DNS v1beta2 API root.

  If the self link does not start with the DNS v1beta2 prefix, the raw self
  link is returned prefixed with '>> '.
  """
  link = self.self_link
  matched = re.match(r'https://dns.googleapis.com/dns/v1beta2/(.*)', link)
  if matched is None:
    return f'>> {link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
class Router(models.Resource):
  """A VPC Router, wrapping the raw API resource dictionary."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self._nats = None

  @property
  def full_path(self) -> str:
    """Self link relative to the Compute v1 API root.

    If the self link does not start with the Compute v1 prefix, the raw self
    link is returned prefixed with '>> '.
    """
    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                      self.self_link)
    if result:
      return result.group(1)
    return f'>> {self.self_link}'

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>' of this router."""
    return self.project_id + '/' + self.name

  @property
  def name(self) -> str:
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when it is absent."""
    return self._resource_data.get('selfLink', '')

  @property
  def network(self) -> str:
    return self._resource_data['network']

  def get_network_name(self) -> str:
    """Last path component of the 'network' URL ('' if the URL is empty)."""
    # Trace message kept for troubleshooting, but at debug level so that it
    # does not show up in normal output.
    logging.debug('inside get_network_name function')
    network_url = self._resource_data['network']
    if network_url:
      return network_url.split('/')[-1]
    return ''

  @property
  def nats(self):
    """Value of the 'nats' field, or an empty list when it is absent."""
    return self._resource_data.get('nats', [])

  def _find_nat(self, nat_gateway) -> dict:
    """Return the NAT entry named nat_gateway, or {} if there is none."""
    for nat in self._resource_data.get('nats', []):
      if nat['name'] == nat_gateway:
        return nat
    return {}

  def get_nat_ip_allocate_option(self, nat_gateway) -> str:
    """'natIpAllocateOption' of the named NAT gateway.

    Returns '' when the option is not set or when the router has no NAT
    gateway with that name.
    """
    return self._find_nat(nat_gateway).get('natIpAllocateOption', '')

  def get_enable_dynamic_port_allocation(self, nat_gateway) -> str:
    """'enableDynamicPortAllocation' of the named NAT gateway.

    Returns '' when the option is not set or when the router has no NAT
    gateway with that name.
    """
    return self._find_nat(nat_gateway).get('enableDynamicPortAllocation', '')

  def subnet_has_nat(self, subnetwork):
    """Return True if one of this router's NAT entries covers subnetwork.

    A NAT entry covers the subnetwork when it is not restricted to
    'LIST_OF_SUBNETWORKS', or when the subnetwork's self link appears among the
    'name' values of its 'subnetworks' list.
    """
    for nat in self._resource_data.get('nats') or []:
      if nat['sourceSubnetworkIpRangesToNat'] != 'LIST_OF_SUBNETWORKS':
        # Cloud NAT configured for all subnets
        return True
      # Cloud NAT configured for specific subnets
      if 'subnetworks' in nat and subnetwork.self_link in [
          s['name'] for s in nat['subnetworks']
      ]:
        return True
    return False
A VPC Router.
@property
def full_path(self) -> str:
  """Self link relative to the Compute v1 API root.

  If the self link does not start with the Compute v1 prefix, the raw self
  link is returned prefixed with '>> '.
  """
  link = self.self_link
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
  if matched is None:
    return f'>> {link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Return '<project_id>/<name>' for this resource."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
def subnet_has_nat(self, subnetwork):
  """Return True if one of this router's NAT entries covers subnetwork.

  A NAT entry covers the subnetwork when it is not restricted to
  'LIST_OF_SUBNETWORKS', or when the subnetwork's self link appears among the
  'name' values of its 'subnetworks' list.
  """
  for nat in self._resource_data.get('nats') or []:
    if nat['sourceSubnetworkIpRangesToNat'] != 'LIST_OF_SUBNETWORKS':
      # Cloud NAT configured for all subnets
      return True
    # Cloud NAT configured for specific subnets
    if 'subnetworks' not in nat:
      continue
    listed_names = [entry['name'] for entry in nat['subnetworks']]
    if subnetwork.self_link in listed_names:
      return True
  return False
class RouterStatus(models.Resource):
  """NAT Router Status, wrapping the raw router status response."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link relative to the Compute v1 API root.

    If the self link does not start with the Compute v1 prefix, the raw self
    link is returned prefixed with '>> '.
    """
    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                      self.self_link)
    if result:
      return result.group(1)
    return f'>> {self.self_link}'

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>' of this router status."""
    return self.project_id + '/' + self.name

  @property
  def name(self) -> str:
    """Value of the 'name' field, or '' when it is absent."""
    return self._resource_data.get('name', '')

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when it is absent."""
    return self._resource_data.get('selfLink', '')

  def _first_nat_status(self) -> dict:
    """Return the first 'result.natStatus' entry, or {} if there is none."""
    # 'natStatus' is indexed like a list; routers without NAT may omit it or
    # return it empty, which must not raise.
    nat_status = self._resource_data.get('result', {}).get('natStatus') or []
    return nat_status[0] if nat_status else {}

  @property
  def min_extra_nat_ips_needed(self) -> Optional[int]:
    """'minExtraNatIpsNeeded' of the first NAT status entry, or None."""
    return self._first_nat_status().get('minExtraNatIpsNeeded')

  @property
  def num_vms_with_nat_mappings(self) -> Optional[int]:
    """'numVmEndpointsWithNatMappings' of the first NAT status entry, or None."""
    return self._first_nat_status().get('numVmEndpointsWithNatMappings')

  @property
  def bgp_peer_status(self) -> str:
    """Raw 'result.bgpPeerStatus' value ({} when absent).

    NOTE(review): annotated as str, but the value is returned exactly as it
    appears in the API response -- confirm the intended type.
    """
    return self._resource_data.get('result', {}).get('bgpPeerStatus', {})
NAT Router Status
@property
def full_path(self) -> str:
  """Self link relative to the Compute v1 API root.

  If the self link does not start with the Compute v1 prefix, the raw self
  link is returned prefixed with '>> '.
  """
  link = self.self_link
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
  if matched is None:
    return f'>> {link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Return '<project_id>/<name>' for this resource."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
class RouterNatIpInfo(models.Resource):
  """NAT IP Info, wrapping the raw API response dictionary."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link relative to the Compute v1 API root.

    If the self link does not start with the Compute v1 prefix, the raw self
    link is returned prefixed with '>> '.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    if matched is None:
      return f'>> {link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>' of this resource."""
    return '/'.join((self.project_id, self.name))

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when it is absent."""
    return self._resource_data.get('selfLink', '')

  @property
  def name(self) -> str:
    """Value of the 'name' field, or '' when it is absent."""
    return self._resource_data.get('name', '')

  @property
  def result(self) -> str:
    """Value of the 'result' field, or an empty list when it is absent."""
    return self._resource_data.get('result', [])
NAT IP Info
@property
def full_path(self) -> str:
  """Self link relative to the Compute v1 API root.

  If the self link does not start with the Compute v1 prefix, the raw self
  link is returned prefixed with '>> '.
  """
  link = self.self_link
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
  if matched is None:
    return f'>> {link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Return '<project_id>/<name>' for this resource."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
@dataclasses.dataclass
class Peering:
  """A single VPC network peering; str() of an instance is its name."""

  # Peering name (also used as the string representation).
  name: str
  # URL of the peer network.
  url: str
  # Peering state as reported by the API.
  state: str
  exports_custom_routes: bool
  imports_custom_routes: bool
  auto_creates_routes: bool

  def __str__(self):
    peering_name = self.name
    return peering_name
VPC Peerings
class Network(models.Resource):
  """A VPC network, wrapping the raw API resource dictionary."""
  _resource_data: dict
  _subnetworks: Optional[Dict[str, Subnetwork]]

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self._subnetworks = None

  @property
  def full_path(self) -> str:
    """Self link relative to the Compute v1 API root.

    If the self link does not start with the Compute v1 prefix, the raw self
    link is returned prefixed with '>> '.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    if matched is None:
      return f'>> {link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>' of this network."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when it is absent."""
    return self._resource_data.get('selfLink', '')

  @property
  def firewall(self) -> 'EffectiveFirewalls':
    """Effective firewalls of this network (via _get_effective_firewalls)."""
    return _get_effective_firewalls(self)

  @property
  def mtu(self) -> int:
    """Value of the 'mtu' field, or DEFAULT_MTU when it is absent."""
    return self._resource_data.get('mtu', DEFAULT_MTU)

  @property
  def subnetworks(self) -> Dict[str, Subnetwork]:
    """Subnetworks listed in the 'subnetworks' field, fetched as a batch."""
    subnetwork_urls = frozenset(self._resource_data.get('subnetworks', []))
    return _batch_get_subnetworks(self._project_id, subnetwork_urls)

  @property
  def peerings(self) -> List[Peering]:
    """One Peering per entry of the 'peerings' field (empty list if absent)."""
    found = []
    for entry in self._resource_data.get('peerings', []):
      found.append(
          Peering(name=entry['name'],
                  url=entry['network'],
                  state=entry['state'],
                  exports_custom_routes=entry['exportCustomRoutes'],
                  imports_custom_routes=entry['importCustomRoutes'],
                  auto_creates_routes=entry['autoCreateRoutes']))
    return found
A VPC network.
@property
def full_path(self) -> str:
  """Self link relative to the Compute v1 API root.

  If the self link does not start with the Compute v1 prefix, the raw self
  link is returned prefixed with '>> '.
  """
  link = self.self_link
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
  if matched is None:
    return f'>> {link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Return '<project_id>/<name>' for this resource."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
@dataclasses.dataclass
class FirewallCheckResult:
  """The result of a firewall connectivity check.

  str() of an instance is its action; the optional fields identify the policy
  or VPC firewall rule that produced the result.
  """

  action: str
  firewall_policy_name: Optional[str] = None
  firewall_policy_rule_description: Optional[str] = None
  vpc_firewall_rule_id: Optional[str] = None
  vpc_firewall_rule_name: Optional[str] = None

  def __str__(self):
    return self.action

  @property
  def matched_by_str(self):
    """Describe what matched, or None if no policy/rule name is set.

    A firewall policy name takes precedence over a VPC firewall rule name.
    """
    if self.firewall_policy_name and self.firewall_policy_rule_description:
      return f'policy: {self.firewall_policy_name}, rule: {self.firewall_policy_rule_description}'
    if self.firewall_policy_name:
      return f'policy: {self.firewall_policy_name}'
    if self.vpc_firewall_rule_name:
      return f'vpc firewall rule: {self.vpc_firewall_rule_name}'
    return None
The result of a firewall connectivity check.
@property
def matched_by_str(self):
  """Describe what matched, or None if no policy/rule name is set.

  A firewall policy name takes precedence over a VPC firewall rule name.
  """
  if self.firewall_policy_name and self.firewall_policy_rule_description:
    return f'policy: {self.firewall_policy_name}, rule: {self.firewall_policy_rule_description}'
  if self.firewall_policy_name:
    return f'policy: {self.firewall_policy_name}'
  if self.vpc_firewall_rule_name:
    return f'vpc firewall rule: {self.vpc_firewall_rule_name}'
  return None
class FirewallRuleNotFoundError(Exception):
  """Raised when a firewall rule with the given name cannot be found.

  Attributes:
    rule_name: name of the rule that was looked up.
    disabled: flag passed by the raiser (False by default).
  """
  rule_name: str

  def __init__(self, name, disabled=False):
    message = f'firewall rule not found: {name}'
    super().__init__(message)
    self.rule_name, self.disabled = name, disabled
Common base class for all non-exit exceptions.
class VpcFirewallRule:
  """Read-only view of a single firewall rule dictionary.

  Every accessor indexes the dictionary directly, so a missing key raises
  KeyError.
  """

  def __init__(self, resource_data):
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    rule = self._resource_data
    return rule['name']

  @property
  def source_ranges(self) -> List[ipaddress.IPv4Network]:
    rule = self._resource_data
    return rule['sourceRanges']

  @property
  def target_tags(self) -> set:
    rule = self._resource_data
    return rule['targetTags']

  @property
  def allowed(self) -> List[dict]:
    rule = self._resource_data
    return rule['allowed']

  def is_enabled(self) -> bool:
    """Return the negation of the rule's 'disabled' flag."""
    disabled = self._resource_data['disabled']
    return not disabled
Represents firewall rule
class EffectiveFirewalls:
  """Effective firewall rules for a VPC network or Instance.

  Includes org/folder firewall policies, which are evaluated before the VPC
  firewall rules.
  """
  _resource_data: dict
  _policies: List[_FirewallPolicy]
  _vpc_firewall: _VpcFirewall

  def __init__(self, resource_data):
    self._resource_data = resource_data
    self._policies = [
        _FirewallPolicy(policy_data)
        for policy_data in resource_data.get('firewallPolicys', [])
    ]
    self._vpc_firewall = _VpcFirewall(resource_data.get('firewalls', {}))

  def check_connectivity_ingress(
      self,  #
      *,
      src_ip: IPAddrOrNet,
      ip_protocol: str,
      port: Optional[int] = None,
      source_service_account: Optional[str] = None,
      source_tags: Optional[List[str]] = None,
      target_service_account: Optional[str] = None,
      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
    """Check ingress connectivity against policies, then VPC firewall rules.

    Each firewall policy is consulted in order; the first result whose action
    is not 'goto_next' is returned. If no policy decides, the VPC firewall
    rules are evaluated.

    Raises:
      ValueError: if port is None and ip_protocol is not 'ICMP'.
    """
    if port is None and ip_protocol != 'ICMP':
      raise ValueError('TCP and UDP must have port numbers')

    # Firewall policies (organization, folders)
    for policy in self._policies:
      verdict = policy.check_connectivity_ingress(
          src_ip=src_ip,
          ip_protocol=ip_protocol,
          port=port,
          target_service_account=target_service_account)
      if verdict.action == 'goto_next':
        continue
      return verdict

    # VPC firewall rules
    return self._vpc_firewall.check_connectivity_ingress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        source_service_account=source_service_account,
        source_tags=source_tags,
        target_service_account=target_service_account,
        target_tags=target_tags)

  def check_connectivity_egress(
      self,  #
      *,
      src_ip: IPAddrOrNet,
      ip_protocol: str,
      port: Optional[int] = None,
      source_service_account: Optional[str] = None,
      source_tags: Optional[List[str]] = None,
      target_service_account: Optional[str] = None,
      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
    """Check egress connectivity against policies, then VPC firewall rules.

    Each firewall policy is consulted in order; the first result whose action
    is not 'goto_next' is returned. If no policy decides, the VPC firewall
    rules are evaluated.

    Raises:
      ValueError: if port is None and ip_protocol is not 'ICMP'.
    """
    if port is None and ip_protocol != 'ICMP':
      raise ValueError('TCP and UDP must have port numbers')

    # Firewall policies (organization, folders)
    for policy in self._policies:
      verdict = policy.check_connectivity_egress(
          src_ip=src_ip,
          ip_protocol=ip_protocol,
          port=port,
          target_service_account=target_service_account)
      if verdict.action == 'goto_next':
        continue
      return verdict

    # VPC firewall rules
    return self._vpc_firewall.check_connectivity_egress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        source_service_account=source_service_account,
        source_tags=source_tags,
        target_service_account=target_service_account,
        target_tags=target_tags)

  def get_vpc_ingress_rules(
      self,
      name: Optional[str] = None,
      name_pattern: Optional[re.Pattern] = None,
      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
    """Retrieve the list of ingress firewall rules matching name or name pattern and target tags.

    Args:
      name (Optional[str], optional): firewall rule name. Defaults to None.
      name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
      target_tags (Optional[List[str]], optional): firewall target tags
        (if not specified any tag will match). Defaults to None.

    Returns:
      List[VpcFirewallRule]: List of ingress firewall rules
    """
    return self._vpc_firewall.get_vpc_ingress_rules(name, name_pattern,
                                                    target_tags)

  def get_vpc_egress_rules(
      self,
      name: Optional[str] = None,
      name_pattern: Optional[re.Pattern] = None,
      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
    """Retrieve the list of egress firewall rules matching name or name pattern and target tags.

    Args:
      name (Optional[str], optional): firewall rule name. Defaults to None.
      name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
      target_tags (Optional[List[str]], optional): firewall target tags
        (if not specified any tag will match). Defaults to None.

    Returns:
      List[VpcFirewallRule]: List of egress firewall rules
    """
    return self._vpc_firewall.get_vpc_egress_rules(name, name_pattern,
                                                   target_tags)

  def verify_ingress_rule_exists(self, name: str):
    """Verify that a certain VPC rule exists. This is useful to verify
    whether maybe a permission was missing on a shared VPC and an
    automatic rule couldn't be created."""
    vpc_firewall = self._vpc_firewall
    return vpc_firewall.verify_ingress_rule_exists(name)

  def verify_egress_rule_exists(self, name: str):
    """Verify that a certain VPC rule exists. This is useful to verify
    whether maybe a permission was missing on a shared VPC and an
    automatic rule couldn't be created."""
    vpc_firewall = self._vpc_firewall
    return vpc_firewall.verify_egress_rule_exists(name)
Effective firewall rules for a VPC network or Instance.
Includes org/folder firewall policies.
def __init__(self, resource_data):
  """Build policy and VPC firewall wrappers from the raw response.

  One _FirewallPolicy is created per entry of 'firewallPolicys' (if present),
  and a _VpcFirewall is created from 'firewalls' ({} when absent).
  """
  self._resource_data = resource_data
  self._policies = [
      _FirewallPolicy(policy_data)
      for policy_data in resource_data.get('firewallPolicys', [])
  ]
  self._vpc_firewall = _VpcFirewall(resource_data.get('firewalls', {}))
def check_connectivity_ingress(
    self,  #
    *,
    src_ip: IPAddrOrNet,
    ip_protocol: str,
    port: Optional[int] = None,
    source_service_account: Optional[str] = None,
    source_tags: Optional[List[str]] = None,
    target_service_account: Optional[str] = None,
    target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
  """Check ingress connectivity against policies, then VPC firewall rules.

  Each firewall policy is consulted in order; the first result whose action
  is not 'goto_next' is returned. If no policy decides, the VPC firewall
  rules are evaluated.

  Raises:
    ValueError: if port is None and ip_protocol is not 'ICMP'.
  """
  if port is None and ip_protocol != 'ICMP':
    raise ValueError('TCP and UDP must have port numbers')

  # Firewall policies (organization, folders)
  for policy in self._policies:
    verdict = policy.check_connectivity_ingress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        target_service_account=target_service_account)
    if verdict.action == 'goto_next':
      continue
    return verdict

  # VPC firewall rules
  return self._vpc_firewall.check_connectivity_ingress(
      src_ip=src_ip,
      ip_protocol=ip_protocol,
      port=port,
      source_service_account=source_service_account,
      source_tags=source_tags,
      target_service_account=target_service_account,
      target_tags=target_tags)
def check_connectivity_egress(
    self,  #
    *,
    src_ip: IPAddrOrNet,
    ip_protocol: str,
    port: Optional[int] = None,
    source_service_account: Optional[str] = None,
    source_tags: Optional[List[str]] = None,
    target_service_account: Optional[str] = None,
    target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
  """Check egress connectivity against policies, then VPC firewall rules.

  Each firewall policy is consulted in order; the first result whose action
  is not 'goto_next' is returned. If no policy decides, the VPC firewall
  rules are evaluated.

  Raises:
    ValueError: if port is None and ip_protocol is not 'ICMP'.
  """
  if port is None and ip_protocol != 'ICMP':
    raise ValueError('TCP and UDP must have port numbers')

  # Firewall policies (organization, folders)
  for policy in self._policies:
    verdict = policy.check_connectivity_egress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        target_service_account=target_service_account)
    if verdict.action == 'goto_next':
      continue
    return verdict

  # VPC firewall rules
  return self._vpc_firewall.check_connectivity_egress(
      src_ip=src_ip,
      ip_protocol=ip_protocol,
      port=port,
      source_service_account=source_service_account,
      source_tags=source_tags,
      target_service_account=target_service_account,
      target_tags=target_tags)
def get_vpc_ingress_rules(
    self,
    name: Optional[str] = None,
    name_pattern: Optional[re.Pattern] = None,
    target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
  """Retrieve the list of ingress firewall rules matching name or name pattern and target tags.

  Args:
    name (Optional[str], optional): firewall rule name. Defaults to None.
    name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
    target_tags (Optional[List[str]], optional): firewall target tags
      (if not specified any tag will match). Defaults to None.

  Returns:
    List[VpcFirewallRule]: List of ingress firewall rules
  """
  return self._vpc_firewall.get_vpc_ingress_rules(name, name_pattern,
                                                  target_tags)
Retrieve the list of ingress firewall rules matching name or name pattern and target tags.
Arguments:
- name (Optional[str], optional): firewall rule name. Defaults to None.
- name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
- target_tags (Optional[List[str]], optional): firewall target tags (if not specified any tag will match). Defaults to None.
Returns:
List[VpcFirewallRule]: List of ingress firewall rules
def get_vpc_egress_rules(
    self,
    name: Optional[str] = None,
    name_pattern: Optional[re.Pattern] = None,
    target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
  """Retrieve the list of egress firewall rules matching name or name pattern and target tags.

  Args:
    name (Optional[str], optional): firewall rule name. Defaults to None.
    name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
    target_tags (Optional[List[str]], optional): firewall target tags
      (if not specified any tag will match). Defaults to None.

  Returns:
    List[VpcFirewallRule]: List of egress firewall rules
  """
  return self._vpc_firewall.get_vpc_egress_rules(name, name_pattern,
                                                 target_tags)
Retrieve the list of egress firewall rules matching name or name pattern and target tags.
Arguments:
- name (Optional[str], optional): firewall rule name. Defaults to None.
- name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
- target_tags (Optional[List[str]], optional): firewall target tags (if not specified any tag will match). Defaults to None.
Returns:
List[VpcFirewallRule]: List of egress firewall rules
def verify_ingress_rule_exists(self, name: str):
  """Verify that a certain VPC rule exists. This is useful to verify
  whether maybe a permission was missing on a shared VPC and an
  automatic rule couldn't be created.

  The check is delegated to the wrapped VPC firewall object and its result
  is returned unchanged."""
  vpc_firewall = self._vpc_firewall
  return vpc_firewall.verify_ingress_rule_exists(name)
Verify that a certain VPC rule exists. This is useful to verify whether maybe a permission was missing on a shared VPC and an automatic rule couldn't be created.
def verify_egress_rule_exists(self, name: str):
  """Verify that a certain VPC rule exists. This is useful to verify
  whether maybe a permission was missing on a shared VPC and an
  automatic rule couldn't be created.

  The check is delegated to the wrapped VPC firewall object and its result
  is returned unchanged."""
  vpc_firewall = self._vpc_firewall
  return vpc_firewall.verify_egress_rule_exists(name)
Verify that a certain VPC rule exists. This is useful to verify whether maybe a permission was missing on a shared VPC and an automatic rule couldn't be created.
class VPCEffectiveFirewalls(EffectiveFirewalls):
  """Effective firewall rules for a VPC network.

  Includes org/folder firewall policies. Additionally keeps a reference to the
  Network the rules belong to.
  """
  _network: Network

  def __init__(self, network, resource_data):
    # The base class builds the policy and VPC firewall wrappers.
    super().__init__(resource_data)
    self._network = network
Effective firewall rules for a VPC network.
Includes org/folder firewall policies.
@caching.cached_api_call(in_memory=True)
def get_network(project_id: str, network_name: str) -> Network:
  """Fetch one VPC network with the Compute v1 API and wrap it in a Network."""
  logging.debug('fetching network: %s/%s', project_id, network_name)
  compute_api = apis.get_api('compute', 'v1', project_id)
  network_data = compute_api.networks().get(
      project=project_id,
      network=network_name).execute(num_retries=config.API_RETRIES)
  return Network(project_id, network_data)
1151def get_subnetwork_from_url(url: str) -> Subnetwork: 1152 """Returns Subnetwork object given subnetwork url""" 1153 m = re.match((r'https://www.googleapis.com/compute/v1/projects/' 1154 r'([^/]+)/regions/([^/]+)/subnetworks/([^/]+)$'), url) 1155 if not m: 1156 raise ValueError(f"can't parse network url: {url}") 1157 (project_id, region, subnetwork_name) = (m.group(1), m.group(2), m.group(3)) 1158 return get_subnetwork(project_id, region, subnetwork_name)
Returns the Subnetwork object for a given subnetwork URL.
1161def get_network_from_url(url: str) -> Network: 1162 m = re.match( 1163 r'https://www.googleapis.com/compute/v1/projects/([^/]+)/global/networks/([^/]+)', 1164 url) 1165 if not m: 1166 raise ValueError(f"can't parse network url: {url}") 1167 (project_id, network_name) = (m.group(1), m.group(2)) 1168 return get_network(project_id, network_name)
@caching.cached_api_call(in_memory=True)
def get_networks(project_id: str) -> List[Network]:
  """List the VPC networks of a project through the Compute Engine v1 API.

  Args:
    project_id: project whose networks are listed.

  Returns:
    One Network per entry of the response's 'items' list; an empty list when
    the response has no 'items' key.
  """
  logging.debug('fetching network: %s', project_id)
  compute_api = apis.get_api('compute', 'v1', project_id)
  list_request = compute_api.networks().list(project=project_id)
  # NOTE(review): only this single response is read - confirm whether
  # paginated results need to be followed.
  listing = list_request.execute(num_retries=config.API_RETRIES)
  networks = []
  for network_data in listing.get('items', []):
    networks.append(Network(project_id, network_data))
  return networks
@caching.cached_api_call(in_memory=True)
def get_subnetwork(project_id: str, region: str,
                   subnetwork_name: str) -> Subnetwork:
  """Fetch one subnetwork through the Compute Engine v1 API.

  Args:
    project_id: project that owns the subnetwork.
    region: region the subnetwork is in.
    subnetwork_name: name of the subnetwork to fetch.

  Returns:
    A Subnetwork wrapping the API response.
  """
  logging.debug('fetching network: %s/%s', project_id, subnetwork_name)
  compute_api = apis.get_api('compute', 'v1', project_id)
  get_request = compute_api.subnetworks().get(
      project=project_id,
      region=region,
      subnetwork=subnetwork_name,
  )
  subnetwork_data = get_request.execute(num_retries=config.API_RETRIES)
  return Subnetwork(project_id, subnetwork_data)
@caching.cached_api_call(in_memory=True)
def get_routes(project_id: str) -> List[Route]:
  """List the routes of a project through the Compute Engine v1 API.

  Args:
    project_id: project whose routes are listed.

  Returns:
    One Route per entry of the response's 'items' list; an empty list when
    the response has no 'items' key.
  """
  logging.debug('fetching routes: %s', project_id)
  compute_api = apis.get_api('compute', 'v1', project_id)
  list_request = compute_api.routes().list(project=project_id)
  # NOTE(review): only this single response is read - confirm whether
  # paginated results need to be followed.
  listing = list_request.execute(num_retries=config.API_RETRIES)
  routes = []
  for route_data in listing.get('items', []):
    routes.append(Route(project_id, route_data))
  return routes
@caching.cached_api_call(in_memory=True)
def get_zones(project_id: str) -> List[ManagedZone]:
  """List the Cloud DNS managed zones of a project (DNS API v1beta2).

  The zones are first listed, then each one is fetched again individually by
  name; the ManagedZone objects wrap those individual responses.

  Args:
    project_id: project whose managed zones are fetched.

  Returns:
    One ManagedZone per entry of the listing's 'managedZones' list, in
    listing order; an empty list when that key is absent.
  """
  logging.debug('fetching DNS zones: %s', project_id)
  dns_api = apis.get_api('dns', 'v1beta2', project_id)
  list_request = dns_api.managedZones().list(project=project_id)
  listing = list_request.execute(num_retries=config.API_RETRIES)

  def _fetch_zone(zone_name):
    # One extra `get` call per zone returned by the listing.
    get_request = dns_api.managedZones().get(project=project_id,
                                             managedZone=zone_name)
    return get_request.execute(num_retries=config.API_RETRIES)

  return [
      ManagedZone(project_id, _fetch_zone(listed_zone['name']))
      for listed_zone in listing.get('managedZones', [])
  ]
@caching.cached_api_call(in_memory=True)
def get_routers(project_id: str, region: str, network) -> List[Router]:
  """List the routers of a region that are attached to `network`.

  Args:
    project_id: project whose routers are listed.
    region: region to list routers in.
    network: object exposing a `self_link` attribute; the list request is
      filtered on that link.

  Returns:
    One Router per entry of the response's 'items' list; an empty list when
    the response has no 'items' key.
  """
  logging.debug('fetching routers: %s/%s', project_id, region)
  compute_api = apis.get_api('compute', 'v1', project_id)
  network_filter = f'network="{network.self_link}"'
  list_request = compute_api.routers().list(project=project_id,
                                            region=region,
                                            filter=network_filter)
  listing = list_request.execute(num_retries=config.API_RETRIES)
  routers = []
  for router_data in listing.get('items', []):
    routers.append(Router(project_id, router_data))
  return routers
@caching.cached_api_call(in_memory=True)
def get_router(project_id: str, region: str, network) -> Router:
  """Return the first router of a region that is attached to `network`.

  Args:
    project_id: project whose routers are listed.
    region: region to list routers in.
    network: object exposing a `self_link` attribute; the list request is
      filtered on that link.

  Returns:
    A Router wrapping the first entry of the response's 'items' list, or a
    Router wrapping an empty dict when the response has no items.
  """
  logging.debug('fetching routers: %s/%s', project_id, region)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.routers().list(project=project_id,
                                   region=region,
                                   filter=f'network="{network.self_link}"')
  response = request.execute(num_retries=config.API_RETRIES)
  # `or` also covers a present-but-empty 'items' list, which would otherwise
  # make next() raise StopIteration instead of yielding the empty fallback.
  items = response.get('items') or [{}]
  return Router(project_id, next(iter(items)))
@caching.cached_api_call(in_memory=True)
def get_router_by_name(project_id: str, region: str,
                       router_name: str) -> Router:
  """Return the router of a region whose name equals `router_name`.

  All routers of the region are listed and the first one with a matching
  'name' is wrapped.

  Args:
    project_id: project whose routers are listed.
    region: region to list routers in.
    router_name: name of the router to return.

  Returns:
    A Router wrapping the first matching entry.

  Raises:
    StopIteration: if no listed router has that name.
  """
  logging.debug('fetching router list: %s/%s in region %s', project_id,
                router_name, region)
  compute_api = apis.get_api('compute', 'v1', project_id)
  list_request = compute_api.routers().list(project=project_id, region=region)
  listing = list_request.execute(num_retries=config.API_RETRIES)
  matching = (router_data for router_data in listing.get('items', [])
              if router_data['name'] == router_name)
  # next() without a default: an exhausted generator raises StopIteration.
  return Router(project_id, next(matching))
@caching.cached_api_call(in_memory=True)
def nat_router_status(project_id: str, router_name: str,
                      region: str) -> RouterStatus:
  """Fetch the status of a router through the Compute Engine v1 API.

  Args:
    project_id: project that owns the router.
    router_name: name of the router.
    region: region the router is in.

  Returns:
    A RouterStatus wrapping the API response when the response's string form
    contains 'result'; otherwise a RouterStatus wrapping an empty dict.
  """
  logging.debug('fetching router status: %s/%s in region %s', project_id,
                router_name, region)
  compute_api = apis.get_api('compute', 'v1', project_id)
  status_request = compute_api.routers().getRouterStatus(project=project_id,
                                                         router=router_name,
                                                         region=region)
  status_data = status_request.execute(num_retries=config.API_RETRIES)
  # Substring search over the stringified response, not a key lookup.
  if 'result' not in str(status_data):
    logging.debug('unable to fetch router status: %s/%s in region %s',
                  project_id, router_name, region)
    return RouterStatus(project_id, {})
  return RouterStatus(project_id, status_data)
@caching.cached_api_call(in_memory=True)
def get_nat_ip_info(project_id: str, router_name: str,
                    region: str) -> RouterNatIpInfo:
  """Fetch the NAT IP information of a router (Compute Engine v1 API).

  Args:
    project_id: project that owns the router.
    router_name: name of the router.
    region: region the router is in.

  Returns:
    A RouterNatIpInfo wrapping the API response when the response's string
    form contains 'result'; otherwise a RouterNatIpInfo wrapping an empty
    dict.
  """
  logging.debug('fetching NAT IP info for router: %s/%s in region %s',
                project_id, router_name, region)
  compute_api = apis.get_api('compute', 'v1', project_id)
  info_request = compute_api.routers().getNatIpInfo(project=project_id,
                                                    router=router_name,
                                                    region=region)
  info_data = info_request.execute(num_retries=config.API_RETRIES)
  # Substring search over the stringified response, not a key lookup.
  if 'result' not in str(info_data):
    logging.debug('unable to fetch Nat IP Info for router: %s/%s in region %s',
                  project_id, router_name, region)
    return RouterNatIpInfo(project_id, {})
  return RouterNatIpInfo(project_id, info_data)
class VPCSubnetworkIAMPolicy(iam.BaseIAMPolicy):
  """IAM policy of a VPC subnetwork."""

  def _is_resource_permission(self, permission) -> bool:
    """Answer True for every permission, whatever `permission` is."""
    return True
Common class for IAM policies
@caching.cached_api_call(in_memory=True)
def get_subnetwork_iam_policy(project_id: str, region: str,
                              subnetwork_name: str) -> VPCSubnetworkIAMPolicy:
  """Fetch the IAM policy of a subnetwork.

  Builds the Compute Engine v1 `getIamPolicy` request for the subnetwork and
  hands it to iam.fetch_iam_policy() together with the policy class, the
  project id and the subnetwork's resource name.

  Args:
    project_id: project that owns the subnetwork.
    region: region the subnetwork is in.
    subnetwork_name: name of the subnetwork.

  Returns:
    Whatever iam.fetch_iam_policy() returns for VPCSubnetworkIAMPolicy.
  """
  compute_api = apis.get_api('compute', 'v1', project_id)
  policy_request = compute_api.subnetworks().getIamPolicy(
      project=project_id,
      region=region,
      resource=subnetwork_name,
  )
  resource_name = (f'projects/{project_id}/regions/{region}/'
                   f'subnetworks/{subnetwork_name}')
  return iam.fetch_iam_policy(policy_request, VPCSubnetworkIAMPolicy,
                              project_id, resource_name)
class Address(models.Resource):
  """IP Addresses."""

  # Raw resource dict handed to the constructor.
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Register `project_id` with the base class and keep `resource_data`."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link without the Compute Engine v1 URL prefix.

    When the self link doesn't start with that prefix, '>> ' followed by the
    unmodified self link is returned instead.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    if matched is None:
      return f'>> {link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """Project id and address name joined as '<project_id>/<name>'."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """Value of the 'name' key (KeyError if it is missing)."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' key, or '' if it is missing."""
    return self._resource_data.get('selfLink', '')

  @property
  def subnetwork(self) -> str:
    """Value of the 'subnetwork' key (KeyError if it is missing)."""
    return self._resource_data['subnetwork']

  @property
  def status(self) -> str:
    """Value of the 'status' key (KeyError if it is missing)."""
    return self._resource_data['status']
IP Addresses.
@property
def full_path(self) -> str:
  """Self link without the Compute Engine v1 URL prefix.

  When the self link doesn't start with that prefix, '>> ' followed by the
  unmodified self link is returned instead.
  """
  link = self.self_link
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
  if matched is None:
    return f'>> {link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Project id and resource name joined as '<project_id>/<name>'."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
@caching.cached_api_call(in_memory=True)
def get_addresses(project_id: str) -> List[Address]:
  """List the addresses of a project via the aggregated list call.

  Args:
    project_id: project whose addresses are listed.

  Returns:
    One Address per entry found under an 'addresses' key in the values of
    the response's 'items' mapping; values without that key are skipped.
  """
  logging.debug('fetching addresses list: %s', project_id)
  compute_api = apis.get_api('compute', 'v1', project_id)
  list_request = compute_api.addresses().aggregatedList(project=project_id)
  listing = list_request.execute(num_retries=config.API_RETRIES)
  collected = []
  # Only the values of the mapping are needed (presumably one per region).
  for scope_data in listing['items'].values():
    if 'addresses' in scope_data:
      collected.extend(
          Address(project_id, entry) for entry in scope_data['addresses'])
  return collected