gcpdiag.queries.network
class Subnetwork(models.Resource):
  """A VPC subnetwork, wrapping the raw Compute API resource dict."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 URL prefix removed.

    When the self link does not start with that prefix, the value
    '>> <self_link>' is returned instead.
    """
    match = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                     self.self_link)
    if match is None:
      return f'>> {self.self_link}'
    return match.group(1)

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>' of this subnetwork."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """Value of the 'name' field (required)."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when it is absent."""
    return self._resource_data.get('selfLink', '')

  @property
  def ip_network(self) -> IPv4NetOrIPv6Net:
    """The 'ipCidrRange' field parsed with ipaddress.ip_network()."""
    cidr = self._resource_data['ipCidrRange']
    return ipaddress.ip_network(cidr)

  @property
  def region(self) -> str:
    """Region name extracted from the 'region' URL field.

    Raises:
      RuntimeError: if the URL does not have the expected form.
    """
    # Expected form:
    # https://www.googleapis.com/compute/v1/projects/<project>/regions/<region>
    parsed = re.match(
        r'https://www.googleapis.com/compute/v1/projects/([^/]+)/regions/([^/]+)',
        self._resource_data['region'])
    if parsed:
      return parsed.group(2)
    raise RuntimeError(
        f"can't parse region URL: {self._resource_data['region']}")

  def is_private_ip_google_access(self) -> bool:
    """Value of 'privateIpGoogleAccess', defaulting to False."""
    return self._resource_data.get('privateIpGoogleAccess', False)

  @property
  def network(self):
    """Value of the 'network' field (required)."""
    return self._resource_data['network']
A VPC subnetwork.
@property
def full_path(self) -> str:
  """Self link with the Compute v1 URL prefix removed.

  When the self link does not start with that prefix, the value
  '>> <self_link>' is returned instead.
  """
  match = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                   self.self_link)
  if match is None:
    return f'>> {self.self_link}'
  return match.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """'<project_id>/<name>' of this resource."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
@property
def region(self) -> str:
  """Region name extracted from the 'region' URL field.

  Raises:
    RuntimeError: if the URL does not have the expected form.
  """
  # Expected form:
  # https://www.googleapis.com/compute/v1/projects/<project>/regions/<region>
  parsed = re.match(
      r'https://www.googleapis.com/compute/v1/projects/([^/]+)/regions/([^/]+)',
      self._resource_data['region'])
  if parsed:
    return parsed.group(2)
  raise RuntimeError(
      f"can't parse region URL: {self._resource_data['region']}")
class Route(models.Resource):
  """A VPC route, wrapping the raw Compute API resource dict."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 URL prefix removed.

    When the self link does not start with that prefix, the value
    '>> <self_link>' is returned instead.
    """
    match = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                     self.self_link)
    if match is None:
      return f'>> {self.self_link}'
    return match.group(1)

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>' of this route."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """Value of the 'name' field (required)."""
    return self._resource_data['name']

  @property
  def kind(self) -> str:
    """Value of the 'kind' field (required)."""
    return self._resource_data['kind']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field (required; KeyError when absent)."""
    return self._resource_data['selfLink']

  @property
  def network(self) -> str:
    """Value of the 'network' field (required)."""
    return self._resource_data['network']

  @property
  def tags(self) -> List[str]:
    """Value of the 'tags' field, or an empty list when it is absent."""
    return self._resource_data.get('tags', [])

  @property
  def dest_range(self) -> str:
    """Value of the 'destRange' field (required)."""
    return self._resource_data['destRange']

  @property
  def next_hop_gateway(self) -> Optional[str]:
    """Value of 'nextHopGateway', or None when it is absent."""
    return self._resource_data.get('nextHopGateway')

  @property
  def next_hop_vpn_tunnel(self) -> Optional[str]:
    """Value of 'nextHopVpnTunnel', or None when it is absent."""
    return self._resource_data.get('nextHopVpnTunnel')

  @property
  def next_hop_hub(self) -> Optional[str]:
    """Value of 'nextHopHub', or None when it is absent."""
    return self._resource_data.get('nextHopHub')

  @property
  def priority(self) -> int:
    """Value of the 'priority' field (required)."""
    return self._resource_data['priority']

  def get_next_hop(self) -> Union[Dict[str, Any], Optional[str]]:
    """Return the first populated next-hop field of this route.

    Returns:
      {'type': <field name>, 'link': <field value>} for the first field in
      the lookup order below whose value is truthy, or None if none is set.
    """
    lookup_order = (
        'nextHopGateway',
        'nextHopVpnTunnel',
        'nextHopHub',
        'nextHopInstance',
        'nextHopAddress',
        'nextHopPeering',
        'nextHopIlb',
        'nextHopNetwork',
        'nextHopIp',
    )
    for field in lookup_order:
      link = self._resource_data.get(field)
      if link:
        return {'type': field, 'link': link}
    return None

  def check_route_match(self, ip1: IPAddrOrNet, ip2: str) -> bool:
    """Return True if _ip_match() accepts ip1 against the network ip2."""
    candidates = [ipaddress.ip_network(ip2)]
    return bool(_ip_match(ip1, candidates, 'allow'))
A VPC Route.
@property
def full_path(self) -> str:
  """Self link with the Compute v1 URL prefix removed.

  When the self link does not start with that prefix, the value
  '>> <self_link>' is returned instead.
  """
  match = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                   self.self_link)
  if match is None:
    return f'>> {self.self_link}'
  return match.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """'<project_id>/<name>' of this resource."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
def get_next_hop(self) -> Union[Dict[str, Any], Optional[str]]:
  """Return the first populated next-hop field of this route.

  Returns:
    {'type': <field name>, 'link': <field value>} for the first field in
    the lookup order below whose value is truthy, or None if none is set.
  """
  lookup_order = (
      'nextHopGateway',
      'nextHopVpnTunnel',
      'nextHopHub',
      'nextHopInstance',
      'nextHopAddress',
      'nextHopPeering',
      'nextHopIlb',
      'nextHopNetwork',
      'nextHopIp',
  )
  for field in lookup_order:
    link = self._resource_data.get(field)
    if link:
      return {'type': field, 'link': link}
  return None
class ManagedZone(models.Resource):
  """Represents a DNS zone (public or private).

  https://cloud.google.com/dns/docs/reference/v1beta2/managedZones
  """
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def cloud_logging_config(self) -> bool:
    """Value of cloudLoggingConfig.enableLogging, defaulting to False.

    A missing 'cloudLoggingConfig' section is treated as logging disabled
    instead of raising KeyError.
    """
    logging_config = self._resource_data.get('cloudLoggingConfig', {})
    return logging_config.get('enableLogging', False)

  @property
  def is_public(self) -> bool:
    """True when the 'visibility' field equals 'public'."""
    return self._resource_data['visibility'] == 'public'

  @property
  def vpc_attached(self) -> bool:
    """True when privateVisibilityConfig lists any networks or GKE clusters.

    The resource dict is only read, never modified.
    """
    visibility = self._resource_data.get('privateVisibilityConfig', {})
    return bool(visibility.get('networks') or visibility.get('gkeClusters'))

  @property
  def dnssec_config_state(self) -> bool:
    """Value of dnssecConfig.state, or False when it is not set.

    The resource dict is only read, never modified.
    NOTE(review): when present the value is returned as-is from the API
    response, which is presumably a string rather than a bool -- confirm
    and adjust the annotation.
    """
    return self._resource_data.get('dnssecConfig', {}).get('state', False)

  @property
  def name(self) -> str:
    """Value of the 'name' field (required)."""
    return self._resource_data['name']

  @property
  def full_path(self) -> str:
    """Self link with the Cloud DNS v1beta2 URL prefix removed.

    When the self link does not start with that prefix, the value
    '>> <self_link>' is returned instead.
    """
    match = re.match(r'https://dns.googleapis.com/dns/v1beta2/(.*)',
                     self.self_link)
    if match is None:
      return f'>> {self.self_link}'
    return match.group(1)

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>' of this zone."""
    return '/'.join((self.project_id, self.name))

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when it is absent."""
    return self._resource_data.get('selfLink', '')
Represents a DNS zone (public or private).
https://cloud.google.com/dns/docs/reference/v1beta2/managedZones
@property
def vpc_attached(self) -> bool:
  """True when privateVisibilityConfig lists any networks or GKE clusters.

  A missing 'privateVisibilityConfig' section counts as not attached. The
  resource dict is only read, never modified.
  """
  visibility = self._resource_data.get('privateVisibilityConfig', {})
  return bool(visibility.get('networks') or visibility.get('gkeClusters'))
@property
def full_path(self) -> str:
  """Self link with the Cloud DNS v1beta2 URL prefix removed.

  When the self link does not start with that prefix, the value
  '>> <self_link>' is returned instead.
  """
  match = re.match(r'https://dns.googleapis.com/dns/v1beta2/(.*)',
                   self.self_link)
  if match is None:
    return f'>> {self.self_link}'
  return match.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
class Router(models.Resource):
  """A VPC Router, wrapping the raw Compute API resource dict."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self._nats = None

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 URL prefix removed.

    When the self link does not start with that prefix, the value
    '>> <self_link>' is returned instead.
    """
    match = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                     self.self_link)
    if match is None:
      return f'>> {self.self_link}'
    return match.group(1)

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>' of this router."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """Value of the 'name' field (required)."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when it is absent."""
    return self._resource_data.get('selfLink', '')

  @property
  def network(self) -> str:
    """Value of the 'network' field (required)."""
    return self._resource_data['network']

  def get_network_name(self) -> str:
    """Last path component of the 'network' field, or '' if it is empty."""
    network_url = self._resource_data['network']
    if not network_url:
      return ''
    return network_url.split('/')[-1]

  @property
  def nats(self):
    """Value of the 'nats' field, or an empty list when it is absent."""
    return self._resource_data.get('nats', [])

  def _find_nat(self, nat_gateway) -> dict:
    """Return the first NAT config named nat_gateway, or {} if none match."""
    for nat in self._resource_data.get('nats', []):
      if nat['name'] == nat_gateway:
        return nat
    return {}

  def get_nat_ip_allocate_option(self, nat_gateway) -> str:
    """Return 'natIpAllocateOption' of the named NAT gateway.

    Returns '' when the field is absent or no NAT with that name exists
    on this router.
    """
    return self._find_nat(nat_gateway).get('natIpAllocateOption', '')

  def get_enable_dynamic_port_allocation(self, nat_gateway) -> str:
    """Return 'enableDynamicPortAllocation' of the named NAT gateway.

    Returns '' when the field is absent or no NAT with that name exists
    on this router.
    """
    return self._find_nat(nat_gateway).get('enableDynamicPortAllocation', '')

  def subnet_has_nat(self, subnetwork):
    """Return True if a NAT config on this router covers the subnetwork.

    A NAT config whose 'sourceSubnetworkIpRangesToNat' is anything other
    than 'LIST_OF_SUBNETWORKS' is treated as covering every subnet. For
    'LIST_OF_SUBNETWORKS', subnetwork.self_link must equal the 'name' of
    one of the listed subnetworks.
    """
    for nat in self._resource_data.get('nats') or []:
      if nat['sourceSubnetworkIpRangesToNat'] != 'LIST_OF_SUBNETWORKS':
        # Cloud NAT configured for all subnets
        return True
      # Cloud NAT configured for specific subnets
      if 'subnetworks' in nat and subnetwork.self_link in [
          s['name'] for s in nat['subnetworks']
      ]:
        return True
    return False
A VPC Router.
@property
def full_path(self) -> str:
  """Self link with the Compute v1 URL prefix removed.

  When the self link does not start with that prefix, the value
  '>> <self_link>' is returned instead.
  """
  match = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                   self.self_link)
  if match is None:
    return f'>> {self.self_link}'
  return match.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """'<project_id>/<name>' of this resource."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
def subnet_has_nat(self, subnetwork):
  """Return True if a NAT config on this router covers the subnetwork.

  A NAT config whose 'sourceSubnetworkIpRangesToNat' is anything other
  than 'LIST_OF_SUBNETWORKS' is treated as covering every subnet. For
  'LIST_OF_SUBNETWORKS', subnetwork.self_link must equal the 'name' of
  one of the listed subnetworks.
  """
  nat_configs = self._resource_data.get('nats', [])
  if not nat_configs:
    return False
  for nat in nat_configs:
    if nat['sourceSubnetworkIpRangesToNat'] != 'LIST_OF_SUBNETWORKS':
      # Cloud NAT configured for all subnets
      return True
    if 'subnetworks' not in nat:
      continue
    # Cloud NAT configured for specific subnets
    if subnetwork.self_link in [s['name'] for s in nat['subnetworks']]:
      return True
  return False
class RouterStatus(models.Resource):
  """NAT Router Status, wrapping the raw router status response dict."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 URL prefix removed.

    When the self link does not start with that prefix, the value
    '>> <self_link>' is returned instead.
    """
    match = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                     self.self_link)
    if match is None:
      return f'>> {self.self_link}'
    return match.group(1)

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>' of this resource."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """Value of the 'name' field, or '' when it is absent."""
    return self._resource_data.get('name', '')

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when it is absent."""
    return self._resource_data.get('selfLink', '')

  def _first_nat_status(self) -> dict:
    """Return result.natStatus[0], or {} when no NAT status is present."""
    nat_status = self._resource_data.get('result', {}).get('natStatus') or []
    return nat_status[0] if nat_status else {}

  @property
  def min_extra_nat_ips_needed(self) -> str:
    """'minExtraNatIpsNeeded' of the first NAT status entry.

    Returns None when the field or the NAT status list is missing, rather
    than raising.
    NOTE(review): the value is returned as-is from the response; confirm
    whether the 'str' annotation matches the API type.
    """
    return self._first_nat_status().get('minExtraNatIpsNeeded', None)

  @property
  def num_vms_with_nat_mappings(self) -> str:
    """'numVmEndpointsWithNatMappings' of the first NAT status entry.

    Returns None when the field or the NAT status list is missing, rather
    than raising.
    NOTE(review): the value is returned as-is from the response; confirm
    whether the 'str' annotation matches the API type.
    """
    return self._first_nat_status().get('numVmEndpointsWithNatMappings', None)
NAT Router Status
@property
def full_path(self) -> str:
  """Self link with the Compute v1 URL prefix removed.

  When the self link does not start with that prefix, the value
  '>> <self_link>' is returned instead.
  """
  match = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                   self.self_link)
  if match is None:
    return f'>> {self.self_link}'
  return match.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """'<project_id>/<name>' of this resource."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
class RouterNatIpInfo(models.Resource):
  """NAT IP Info, wrapping the raw response dict."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 URL prefix removed.

    When the self link does not start with that prefix, the value
    '>> <self_link>' is returned instead.
    """
    match = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                     self.self_link)
    if match is None:
      return f'>> {self.self_link}'
    return match.group(1)

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>' of this resource."""
    return '/'.join((self.project_id, self.name))

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when it is absent."""
    return self._resource_data.get('selfLink', '')

  @property
  def name(self) -> str:
    """Value of the 'name' field, or '' when it is absent."""
    return self._resource_data.get('name', '')

  @property
  def result(self) -> str:
    """Value of the 'result' field, or an empty list when it is absent.

    NOTE(review): the default is a list although the annotation says str;
    confirm the intended type.
    """
    return self._resource_data.get('result', [])
NAT IP Info
@property
def full_path(self) -> str:
  """Self link with the Compute v1 URL prefix removed.

  When the self link does not start with that prefix, the value
  '>> <self_link>' is returned instead.
  """
  match = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                   self.self_link)
  if match is None:
    return f'>> {self.self_link}'
  return match.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """'<project_id>/<name>' of this resource."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
@dataclasses.dataclass
class Peering:
  """VPC Peerings: plain record describing one peering of a network."""

  # Fields are positional; keep this order stable for existing callers.
  name: str
  url: str
  state: str
  exports_custom_routes: bool
  imports_custom_routes: bool
  auto_creates_routes: bool

  def __str__(self):
    # A peering is displayed by its name only.
    return self.name
VPC Peerings
class Network(models.Resource):
  """A VPC network, wrapping the raw Compute API resource dict."""
  _resource_data: dict
  _subnetworks: Optional[Dict[str, Subnetwork]]

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self._subnetworks = None

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 URL prefix removed.

    When the self link does not start with that prefix, the value
    '>> <self_link>' is returned instead.
    """
    match = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                     self.self_link)
    if match is None:
      return f'>> {self.self_link}'
    return match.group(1)

  @property
  def short_path(self) -> str:
    """'<project_id>/<name>' of this network."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """Value of the 'name' field (required)."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when it is absent."""
    return self._resource_data.get('selfLink', '')

  @property
  def firewall(self) -> 'EffectiveFirewalls':
    """Effective firewalls of this network, via _get_effective_firewalls()."""
    return _get_effective_firewalls(self)

  @property
  def mtu(self) -> int:
    """Value of the 'mtu' field, or DEFAULT_MTU when it is absent."""
    return self._resource_data.get('mtu', DEFAULT_MTU)

  @property
  def subnetworks(self) -> Dict[str, Subnetwork]:
    """Subnetworks listed in the 'subnetworks' field.

    The URLs are passed as a frozenset to _batch_get_subnetworks().
    """
    subnet_urls = frozenset(self._resource_data.get('subnetworks', []))
    return _batch_get_subnetworks(self._project_id, subnet_urls)

  @property
  def peerings(self) -> List[Peering]:
    """One Peering per entry of the 'peerings' field (empty if absent)."""
    found = []
    for peer in self._resource_data.get('peerings', []):
      found.append(
          Peering(name=peer['name'],
                  url=peer['network'],
                  state=peer['state'],
                  exports_custom_routes=peer['exportCustomRoutes'],
                  imports_custom_routes=peer['importCustomRoutes'],
                  auto_creates_routes=peer['autoCreateRoutes']))
    return found
A VPC network.
@property
def full_path(self) -> str:
  """Self link with the Compute v1 URL prefix removed.

  When the self link does not start with that prefix, the value
  '>> <self_link>' is returned instead.
  """
  match = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                   self.self_link)
  if match is None:
    return f'>> {self.self_link}'
  return match.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """'<project_id>/<name>' of this resource."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
@dataclasses.dataclass
class FirewallCheckResult:
  """The result of a firewall connectivity check."""

  action: str
  firewall_policy_name: Optional[str] = None
  firewall_policy_rule_description: Optional[str] = None
  vpc_firewall_rule_id: Optional[str] = None
  vpc_firewall_rule_name: Optional[str] = None

  def __str__(self):
    # The result is displayed as its action only.
    return self.action

  @property
  def matched_by_str(self):
    """Describe what matched: a firewall policy (with the rule description
    if set) takes precedence over a VPC firewall rule name.

    Returns None when neither a policy name nor a rule name is set.
    """
    if self.firewall_policy_name:
      if not self.firewall_policy_rule_description:
        return f'policy: {self.firewall_policy_name}'
      return (f'policy: {self.firewall_policy_name}, '
              f'rule: {self.firewall_policy_rule_description}')
    if self.vpc_firewall_rule_name:
      return f'vpc firewall rule: {self.vpc_firewall_rule_name}'
    return None
The result of a firewall connectivity check.
@property
def matched_by_str(self):
  """Describe what matched: a firewall policy (with the rule description
  if set) takes precedence over a VPC firewall rule name.

  Returns None when neither a policy name nor a rule name is set.
  """
  if self.firewall_policy_name:
    if not self.firewall_policy_rule_description:
      return f'policy: {self.firewall_policy_name}'
    return (f'policy: {self.firewall_policy_name}, '
            f'rule: {self.firewall_policy_rule_description}')
  if self.vpc_firewall_rule_name:
    return f'vpc firewall rule: {self.vpc_firewall_rule_name}'
  return None
class FirewallRuleNotFoundError(Exception):
  """Raised to report a firewall rule that could not be found.

  Attributes are set from the constructor arguments: rule_name is the
  looked-up name and disabled is the flag passed by the raiser.
  """
  rule_name: str

  def __init__(self, name, disabled=False):
    message = f'firewall rule not found: {name}'
    # The base Exception carries the human-readable message.
    super().__init__(message)
    self.rule_name = name
    self.disabled = disabled
Common base class for all non-exit exceptions.
class VpcFirewallRule:
  """Represents firewall rule (read-only view over its resource dict)."""

  def __init__(self, resource_data):
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """Value of the 'name' field (required)."""
    data = self._resource_data
    return data['name']

  @property
  def source_ranges(self) -> List[ipaddress.IPv4Network]:
    """Value of the 'sourceRanges' field (KeyError when absent)."""
    data = self._resource_data
    return data['sourceRanges']

  @property
  def target_tags(self) -> set:
    """Value of the 'targetTags' field (KeyError when absent)."""
    data = self._resource_data
    return data['targetTags']

  @property
  def allowed(self) -> List[dict]:
    """Value of the 'allowed' field (KeyError when absent)."""
    data = self._resource_data
    return data['allowed']

  def is_enabled(self) -> bool:
    """Return the negation of the 'disabled' field."""
    disabled = self._resource_data['disabled']
    return not disabled
Represents firewall rule
class EffectiveFirewalls:
  """Effective firewall rules for a VPC network or Instance.

  Includes org/folder firewall policies.
  """
  _resource_data: dict
  _policies: List[_FirewallPolicy]
  _vpc_firewall: _VpcFirewall

  def __init__(self, resource_data):
    self._resource_data = resource_data
    # One _FirewallPolicy per entry of 'firewallPolicys', in listed order.
    self._policies = []
    if 'firewallPolicys' in resource_data:
      self._policies.extend(
          _FirewallPolicy(entry) for entry in resource_data['firewallPolicys'])
    self._vpc_firewall = _VpcFirewall(resource_data.get('firewalls', {}))

  def check_connectivity_ingress(
      self,  #
      *,
      src_ip: IPAddrOrNet,
      ip_protocol: str,
      port: Optional[int] = None,
      source_service_account: Optional[str] = None,
      source_tags: Optional[List[str]] = None,
      target_service_account: Optional[str] = None,
      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
    """Check ingress connectivity against policies, then VPC rules.

    Policies are consulted in order; the first result whose action is not
    'goto_next' is returned. Otherwise the VPC firewall result is returned.

    Raises:
      ValueError: if port is None and ip_protocol is not 'ICMP'.
    """
    if port is None and ip_protocol != 'ICMP':
      raise ValueError('TCP and UDP must have port numbers')

    # Firewall policies (organization, folders)
    for policy in self._policies:
      verdict = policy.check_connectivity_ingress(
          src_ip=src_ip,
          ip_protocol=ip_protocol,
          port=port,
          target_service_account=target_service_account)
      if verdict.action == 'goto_next':
        continue
      return verdict

    # VPC firewall rules
    return self._vpc_firewall.check_connectivity_ingress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        source_service_account=source_service_account,
        source_tags=source_tags,
        target_service_account=target_service_account,
        target_tags=target_tags)

  def check_connectivity_egress(
      self,  #
      *,
      src_ip: IPAddrOrNet,
      ip_protocol: str,
      port: Optional[int] = None,
      source_service_account: Optional[str] = None,
      source_tags: Optional[List[str]] = None,
      target_service_account: Optional[str] = None,
      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
    """Check egress connectivity against policies, then VPC rules.

    Policies are consulted in order; the first result whose action is not
    'goto_next' is returned. Otherwise the VPC firewall result is returned.

    Raises:
      ValueError: if port is None and ip_protocol is not 'ICMP'.
    """
    if port is None and ip_protocol != 'ICMP':
      raise ValueError('TCP and UDP must have port numbers')

    # Firewall policies (organization, folders)
    for policy in self._policies:
      verdict = policy.check_connectivity_egress(
          src_ip=src_ip,
          ip_protocol=ip_protocol,
          port=port,
          target_service_account=target_service_account)
      if verdict.action == 'goto_next':
        continue
      return verdict

    # VPC firewall rules
    return self._vpc_firewall.check_connectivity_egress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        source_service_account=source_service_account,
        source_tags=source_tags,
        target_service_account=target_service_account,
        target_tags=target_tags)

  def get_vpc_ingress_rules(
      self,
      name: Optional[str] = None,
      name_pattern: Optional[re.Pattern] = None,
      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
    """Retrieve the ingress VPC firewall rules matching the given filters.

    Args:
      name: firewall rule name. Defaults to None.
      name_pattern: firewall rule name pattern. Defaults to None.
      target_tags: firewall target tags (if not specified any tag will
        match). Defaults to None.

    Returns:
      List of ingress firewall rules, as returned by the VPC firewall.
    """
    vpc_firewall = self._vpc_firewall
    return vpc_firewall.get_vpc_ingress_rules(name, name_pattern, target_tags)

  def get_vpc_egress_rules(
      self,
      name: Optional[str] = None,
      name_pattern: Optional[re.Pattern] = None,
      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
    """Retrieve the egress VPC firewall rules matching the given filters.

    Args:
      name: firewall rule name. Defaults to None.
      name_pattern: firewall rule name pattern. Defaults to None.
      target_tags: firewall target tags (if not specified any tag will
        match). Defaults to None.

    Returns:
      List of egress firewall rules, as returned by the VPC firewall.
    """
    vpc_firewall = self._vpc_firewall
    return vpc_firewall.get_vpc_egress_rules(name, name_pattern, target_tags)

  def verify_ingress_rule_exists(self, name: str):
    """Verify that a certain VPC ingress rule exists.

    Useful to check whether a permission was missing on a shared VPC so
    that an automatic rule could not be created.
    """
    vpc_firewall = self._vpc_firewall
    return vpc_firewall.verify_ingress_rule_exists(name)

  def verify_egress_rule_exists(self, name: str):
    """Verify that a certain VPC egress rule exists.

    Useful to check whether a permission was missing on a shared VPC so
    that an automatic rule could not be created.
    """
    vpc_firewall = self._vpc_firewall
    return vpc_firewall.verify_egress_rule_exists(name)
Effective firewall rules for a VPC network or Instance.
Includes org/folder firewall policies.
def __init__(self, resource_data):
  """Build policy and VPC firewall wrappers from the raw response dict."""
  self._resource_data = resource_data
  # One _FirewallPolicy per entry of 'firewallPolicys', in listed order.
  self._policies = []
  if 'firewallPolicys' in resource_data:
    self._policies.extend(
        _FirewallPolicy(entry) for entry in resource_data['firewallPolicys'])
  self._vpc_firewall = _VpcFirewall(resource_data.get('firewalls', {}))
def check_connectivity_ingress(
    self,  #
    *,
    src_ip: IPAddrOrNet,
    ip_protocol: str,
    port: Optional[int] = None,
    source_service_account: Optional[str] = None,
    source_tags: Optional[List[str]] = None,
    target_service_account: Optional[str] = None,
    target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
  """Check ingress connectivity against policies, then VPC rules.

  Policies are consulted in order; the first result whose action is not
  'goto_next' is returned. Otherwise the VPC firewall result is returned.

  Raises:
    ValueError: if port is None and ip_protocol is not 'ICMP'.
  """
  if port is None and ip_protocol != 'ICMP':
    raise ValueError('TCP and UDP must have port numbers')

  # Firewall policies (organization, folders)
  for policy in self._policies:
    verdict = policy.check_connectivity_ingress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        target_service_account=target_service_account)
    if verdict.action == 'goto_next':
      continue
    return verdict

  # VPC firewall rules
  return self._vpc_firewall.check_connectivity_ingress(
      src_ip=src_ip,
      ip_protocol=ip_protocol,
      port=port,
      source_service_account=source_service_account,
      source_tags=source_tags,
      target_service_account=target_service_account,
      target_tags=target_tags)
def check_connectivity_egress(
    self,  #
    *,
    src_ip: IPAddrOrNet,
    ip_protocol: str,
    port: Optional[int] = None,
    source_service_account: Optional[str] = None,
    source_tags: Optional[List[str]] = None,
    target_service_account: Optional[str] = None,
    target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
  """Check egress connectivity against policies, then VPC rules.

  Policies are consulted in order; the first result whose action is not
  'goto_next' is returned. Otherwise the VPC firewall result is returned.

  Raises:
    ValueError: if port is None and ip_protocol is not 'ICMP'.
  """
  if port is None and ip_protocol != 'ICMP':
    raise ValueError('TCP and UDP must have port numbers')

  # Firewall policies (organization, folders)
  for policy in self._policies:
    verdict = policy.check_connectivity_egress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        target_service_account=target_service_account)
    if verdict.action == 'goto_next':
      continue
    return verdict

  # VPC firewall rules
  return self._vpc_firewall.check_connectivity_egress(
      src_ip=src_ip,
      ip_protocol=ip_protocol,
      port=port,
      source_service_account=source_service_account,
      source_tags=source_tags,
      target_service_account=target_service_account,
      target_tags=target_tags)
def get_vpc_ingress_rules(
    self,
    name: Optional[str] = None,
    name_pattern: Optional[re.Pattern] = None,
    target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
  """Retrieve the ingress VPC firewall rules matching the given filters.

  Args:
    name: firewall rule name. Defaults to None.
    name_pattern: firewall rule name pattern. Defaults to None.
    target_tags: firewall target tags (if not specified any tag will
      match). Defaults to None.

  Returns:
    List of ingress firewall rules, as returned by the VPC firewall.
  """
  vpc_firewall = self._vpc_firewall
  return vpc_firewall.get_vpc_ingress_rules(name, name_pattern, target_tags)
Retrieve the list of ingress firewall rules matching name or name pattern and target tags.
Arguments:
- name (Optional[str], optional): firewall rule name. Defaults to None.
- name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
- target_tags (Optional[List[str]], optional): firewall target tags (if not specified any tag will match). Defaults to None.
Returns:
List[VpcFirewallRule]: List of ingress firewall rules
def get_vpc_egress_rules(
    self,
    name: Optional[str] = None,
    name_pattern: Optional[re.Pattern] = None,
    target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
  """Retrieve the egress VPC firewall rules matching the given filters.

  Args:
    name: firewall rule name. Defaults to None.
    name_pattern: firewall rule name pattern. Defaults to None.
    target_tags: firewall target tags (if not specified any tag will
      match). Defaults to None.

  Returns:
    List of egress firewall rules, as returned by the VPC firewall.
  """
  vpc_firewall = self._vpc_firewall
  return vpc_firewall.get_vpc_egress_rules(name, name_pattern, target_tags)
Retrieve the list of egress firewall rules matching name or name pattern and target tags.
Arguments:
- name (Optional[str], optional): firewall rule name. Defaults to None.
- name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
- target_tags (Optional[List[str]], optional): firewall target tags (if not specified any tag will match). Defaults to None.
Returns:
List[VpcFirewallRule]: List of egress firewall rules
def verify_ingress_rule_exists(self, name: str):
  """Verify that a certain VPC ingress rule exists.

  Useful to check whether a permission was missing on a shared VPC so
  that an automatic rule could not be created. The check itself is
  delegated to the VPC firewall object.
  """
  vpc_firewall = self._vpc_firewall
  return vpc_firewall.verify_ingress_rule_exists(name)
Verify that a certain VPC rule exists. This is useful to verify whether maybe a permission was missing on a shared VPC and an automatic rule couldn't be created.
def verify_egress_rule_exists(self, name: str):
  """Verify that a certain VPC egress rule exists.

  Useful to check whether a permission was missing on a shared VPC so
  that an automatic rule could not be created. The check itself is
  delegated to the VPC firewall object.
  """
  vpc_firewall = self._vpc_firewall
  return vpc_firewall.verify_egress_rule_exists(name)
Verify that a certain VPC rule exists. This is useful to verify whether maybe a permission was missing on a shared VPC and an automatic rule couldn't be created.
class VPCEffectiveFirewalls(EffectiveFirewalls):
  """Effective firewall rules for a VPC network.

  Includes org/folder firewall policies.
  """
  _network: Network

  def __init__(self, network, resource_data):
    # The base class parses resource_data; the owning network is kept
    # alongside it.
    super().__init__(resource_data)
    self._network = network
Effective firewall rules for a VPC network.
Includes org/folder firewall policies.
@caching.cached_api_call(in_memory=True)
def get_network(project_id: str, network_name: str) -> Network:
  """Fetch one VPC network by name through the Compute v1 API.

  Args:
    project_id: project that owns the network.
    network_name: name of the network to fetch.

  Returns:
    A Network wrapping the API response.
  """
  logging.info('fetching network: %s/%s', project_id, network_name)
  compute = apis.get_api('compute', 'v1', project_id)
  networks_api = compute.networks()
  response = networks_api.get(
      project=project_id,
      network=network_name).execute(num_retries=config.API_RETRIES)
  return Network(project_id, response)
def get_subnetwork_from_url(url: str) -> Subnetwork:
  """Returns Subnetwork object given subnetwork url.

  Args:
    url: full Compute v1 subnetwork URL of the form
      https://www.googleapis.com/compute/v1/projects/<project>/regions/<region>/subnetworks/<name>

  Returns:
    The Subnetwork returned by get_subnetwork() for the parsed components.

  Raises:
    ValueError: if url does not match the expected form.
  """
  m = re.match((r'https://www.googleapis.com/compute/v1/projects/'
                r'([^/]+)/regions/([^/]+)/subnetworks/([^/]+)$'), url)
  if not m:
    # The message names the resource kind that actually failed to parse.
    raise ValueError(f"can't parse subnetwork url: {url}")
  project_id, region, subnetwork_name = m.groups()
  return get_subnetwork(project_id, region, subnetwork_name)
Returns Subnetwork object given subnetwork url
def get_network_from_url(url: str) -> Network:
  """Return the Network identified by a Compute v1 network URL.

  Args:
    url: URL starting with
      https://www.googleapis.com/compute/v1/projects/<project>/global/networks/<name>

  Raises:
    ValueError: if url does not start with the expected form.
  """
  parsed = re.match(
      r'https://www.googleapis.com/compute/v1/projects/([^/]+)/global/networks/([^/]+)',
      url)
  if parsed is None:
    raise ValueError(f"can't parse network url: {url}")
  project_id, network_name = parsed.group(1), parsed.group(2)
  return get_network(project_id, network_name)
@caching.cached_api_call(in_memory=True)
def get_networks(project_id: str) -> List[Network]:
  """List the VPC networks of a project through the Compute Engine v1 API.

  Only the response of a single `networks().list` call is read.

  Args:
    project_id: project whose networks are listed.

  Returns:
    One Network per entry of the response's 'items' list; an empty list when
    the response has no 'items'.
  """
  logging.info('fetching network: %s', project_id)
  compute_api = apis.get_api('compute', 'v1', project_id)
  listing = compute_api.networks().list(project=project_id).execute(
      num_retries=config.API_RETRIES)
  networks = []
  for network_data in listing.get('items', []):
    networks.append(Network(project_id, network_data))
  return networks
@caching.cached_api_call(in_memory=True)
def get_subnetwork(project_id: str, region: str,
                   subnetwork_name: str) -> Subnetwork:
  """Fetch one subnetwork through the Compute Engine v1 API.

  Args:
    project_id: project that owns the subnetwork.
    region: region the subnetwork is located in.
    subnetwork_name: name of the subnetwork to retrieve.

  Returns:
    A Subnetwork built from the `subnetworks().get` response.
  """
  logging.info('fetching network: %s/%s', project_id, subnetwork_name)
  compute_api = apis.get_api('compute', 'v1', project_id)
  subnetwork_data = compute_api.subnetworks().get(
      project=project_id, region=region,
      subnetwork=subnetwork_name).execute(num_retries=config.API_RETRIES)
  return Subnetwork(project_id, subnetwork_data)
@caching.cached_api_call(in_memory=True)
def get_routes(project_id: str) -> List[Route]:
  """List the routes of a project through the Compute Engine v1 API.

  Only the response of a single `routes().list` call is read.

  Args:
    project_id: project whose routes are listed.

  Returns:
    One Route per entry of the response's 'items' list; an empty list when
    the response has no 'items'.
  """
  logging.info('fetching routes: %s', project_id)
  compute_api = apis.get_api('compute', 'v1', project_id)
  listing = compute_api.routes().list(project=project_id).execute(
      num_retries=config.API_RETRIES)
  routes = []
  for route_data in listing.get('items', []):
    routes.append(Route(project_id, route_data))
  return routes
@caching.cached_api_call(in_memory=True)
def get_zones(project_id: str) -> List[ManagedZone]:
  """Fetch the Cloud DNS managed zones of a project (DNS API v1beta2).

  The zones are listed first, then each one is fetched individually by name
  and the ManagedZone is built from that second response.

  Args:
    project_id: project whose managed zones are fetched.

  Returns:
    One ManagedZone per entry of the list response's 'managedZones'; an
    empty list when the response has no such key.
  """
  logging.info('fetching DNS zones: %s', project_id)
  dns = apis.get_api('dns', 'v1beta2', project_id)
  listing = dns.managedZones().list(project=project_id).execute(
      num_retries=config.API_RETRIES)
  return [
      ManagedZone(
          project_id,
          dns.managedZones().get(
              project=project_id, managedZone=summary['name']).execute(
                  num_retries=config.API_RETRIES))
      for summary in listing.get('managedZones', [])
  ]
@caching.cached_api_call(in_memory=True)
def get_routers(project_id: str, region: str, network) -> List[Router]:
  """List the routers of a region that are attached to a given network.

  Args:
    project_id: project whose routers are listed.
    region: region to list routers in.
    network: object whose `self_link` is used in the API filter
      `network="<self_link>"`.

  Returns:
    One Router per entry of the response's 'items' list; an empty list when
    the response has no 'items'.
  """
  logging.info('fetching routers: %s/%s', project_id, region)
  compute_api = apis.get_api('compute', 'v1', project_id)
  listing = compute_api.routers().list(
      project=project_id,
      region=region,
      filter=f'network="{network.self_link}"').execute(
          num_retries=config.API_RETRIES)
  routers = []
  for router_data in listing.get('items', []):
    routers.append(Router(project_id, router_data))
  return routers
@caching.cached_api_call(in_memory=True)
def get_router(project_id: str, region: str, network) -> Router:
  """Return the first router of a region that is attached to a network.

  Args:
    project_id: project whose routers are queried.
    region: region to look for routers in.
    network: object whose `self_link` is used in the API filter
      `network="<self_link>"`.

  Returns:
    A Router built from the first entry of the response's 'items' list, or a
    Router built from an empty dict when there is no such entry.
  """
  logging.info('fetching routers: %s/%s', project_id, region)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.routers().list(project=project_id,
                                   region=region,
                                   filter=f'network="{network.self_link}"')
  response = request.execute(num_retries=config.API_RETRIES)
  # Treat a present-but-empty 'items' list like a missing one, so that the
  # "no router" case always yields an empty Router instead of StopIteration.
  items = response.get('items') or [{}]
  return Router(project_id, items[0])
@caching.cached_api_call(in_memory=True)
def get_router_by_name(project_id: str, region: str,
                       router_name: str) -> Router:
  """Look up a router by name among the routers listed for a region.

  Args:
    project_id: project whose routers are listed.
    region: region to list routers in.
    router_name: value the 'name' field of the wanted router must equal.

  Returns:
    A Router built from the first listed entry whose 'name' equals
    `router_name`.

  Raises:
    StopIteration: if no listed router has that name.
  """
  logging.info('fetching router list: %s/%s in region %s', project_id,
               router_name, region)
  compute_api = apis.get_api('compute', 'v1', project_id)
  listing = compute_api.routers().list(
      project=project_id,
      region=region).execute(num_retries=config.API_RETRIES)
  matching_routers = (Router(project_id, router_data)
                      for router_data in listing.get('items', [])
                      if router_data['name'] == router_name)
  return next(matching_routers)
@caching.cached_api_call(in_memory=True)
def nat_router_status(project_id: str, router_name: str,
                      region: str) -> RouterStatus:
  """Fetch the status of a router through `routers().getRouterStatus`.

  Args:
    project_id: project that owns the router.
    router_name: name of the router.
    region: region the router is located in.

  Returns:
    A RouterStatus built from the response when it carries a top-level
    'result' field; otherwise a RouterStatus built from an empty dict (the
    failure is logged at info level).
  """
  logging.info('fetching router status: %s/%s in region %s', project_id,
               router_name, region)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.routers().getRouterStatus(project=project_id,
                                              router=router_name,
                                              region=region)
  response = request.execute(num_retries=config.API_RETRIES)
  # Check for the actual top-level key; a substring search on the
  # stringified response would also match unrelated keys or values that
  # merely contain the text "result".
  if isinstance(response, dict) and 'result' in response:
    return RouterStatus(project_id, response)
  logging.info('unable to fetch router status: %s/%s in region %s',
               project_id, router_name, region)
  return RouterStatus(project_id, {})
@caching.cached_api_call(in_memory=True)
def get_nat_ip_info(project_id: str, router_name: str,
                    region: str) -> RouterNatIpInfo:
  """Fetch the NAT IP information of a router (`routers().getNatIpInfo`).

  Args:
    project_id: project that owns the router.
    router_name: name of the router.
    region: region the router is located in.

  Returns:
    A RouterNatIpInfo built from the response when it carries a top-level
    'result' field; otherwise a RouterNatIpInfo built from an empty dict
    (the failure is logged at info level).
  """
  logging.info('fetching NAT IP info for router: %s/%s in region %s',
               project_id, router_name, region)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.routers().getNatIpInfo(project=project_id,
                                           router=router_name,
                                           region=region)
  response = request.execute(num_retries=config.API_RETRIES)
  # Check for the actual top-level key; a substring search on the
  # stringified response would also match unrelated keys or values that
  # merely contain the text "result".
  if isinstance(response, dict) and 'result' in response:
    return RouterNatIpInfo(project_id, response)
  logging.info('unable to fetch Nat IP Info for router: %s/%s in region %s',
               project_id, router_name, region)
  return RouterNatIpInfo(project_id, {})
class VPCSubnetworkIAMPolicy(iam.BaseIAMPolicy):
  """IAM policy of a VPC subnetwork."""

  def _is_resource_permission(self, permission):
    """Always return True, regardless of `permission`.

    Args:
      permission: not inspected.
    """
    return True
Common class for IAM policies
@caching.cached_api_call(in_memory=True)
def get_subnetwork_iam_policy(project_id: str, region: str,
                              subnetwork_name: str) -> VPCSubnetworkIAMPolicy:
  """Fetch the IAM policy of a subnetwork.

  Args:
    project_id: project that owns the subnetwork.
    region: region the subnetwork is located in.
    subnetwork_name: name of the subnetwork.

  Returns:
    The value returned by `iam.fetch_iam_policy` for the subnetwork's
    `getIamPolicy` request, using VPCSubnetworkIAMPolicy as the policy class.
  """
  compute_api = apis.get_api('compute', 'v1', project_id)
  policy_request = compute_api.subnetworks().getIamPolicy(
      project=project_id, region=region, resource=subnetwork_name)
  # Relative resource name handed to fetch_iam_policy with the request.
  subnetwork_path = (f'projects/{project_id}/regions/{region}/'
                     f'subnetworks/{subnetwork_name}')
  return iam.fetch_iam_policy(policy_request, VPCSubnetworkIAMPolicy,
                              project_id, subnetwork_path)
class Address(models.Resource):
  """IP Addresses."""
  # Raw resource dict as passed to the constructor.
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Store the raw address data.

    Args:
      project_id: forwarded to the `models.Resource` initializer.
      resource_data: dict kept as `_resource_data` and read by the
        properties below.
    """
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link without the Compute v1 URL prefix.

    Falls back to '>> ' followed by the self link when the link doesn't
    start with that prefix.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    return matched.group(1) if matched else f'>> {link}'

  @property
  def short_path(self) -> str:
    """Project id and address name joined by a slash."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """The 'name' field of the resource data."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """The 'selfLink' field of the resource data, '' when absent."""
    return self._resource_data.get('selfLink', '')

  @property
  def subnetwork(self) -> str:
    """The 'subnetwork' field of the resource data (KeyError if absent)."""
    return self._resource_data['subnetwork']

  @property
  def status(self) -> str:
    """The 'status' field of the resource data."""
    return self._resource_data['status']
IP Addresses.
@property
def full_path(self) -> str:
  """Self link without the Compute v1 URL prefix.

  Falls back to '>> ' followed by the self link when the link doesn't start
  with that prefix.
  """
  link = self.self_link
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
  return matched.group(1) if matched else f'>> {link}'
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Project id and resource name joined by a slash."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
@caching.cached_api_call(in_memory=True)
def get_addresses(project_id: str) -> List[Address]:
  """List the addresses of a project via `addresses().aggregatedList`.

  Only the response of a single `aggregatedList` call is read.

  Args:
    project_id: project whose addresses are listed.

  Returns:
    One Address per entry found under the 'addresses' key of each value of
    the response's 'items' mapping; values without that key are skipped.
  """
  logging.info('fetching addresses list: %s', project_id)
  compute_api = apis.get_api('compute', 'v1', project_id)
  listing = compute_api.addresses().aggregatedList(
      project=project_id).execute(num_retries=config.API_RETRIES)
  return [
      Address(project_id, address_data)
      for scope_data in listing['items'].values()
      if 'addresses' in scope_data
      for address_data in scope_data['addresses']
  ]