gcpdiag.queries.network
class Subnetwork(models.Resource):
  """A VPC subnetwork, backed by the raw resource dictionary it was built from."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Pass the project id to the base class and keep the raw resource data."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """The 'name' field of the resource (KeyError if it is missing)."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """The 'selfLink' field of the resource, or '' when it is not set."""
    return self._resource_data.get('selfLink', '')

  @property
  def full_path(self) -> str:
    """The self link without the Compute API prefix.

    When the self link does not start with the expected prefix, the value
    '>> ' followed by the unmodified self link is returned instead.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    if matched is None:
      return f'>> {link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """The project id and the resource name joined by '/'."""
    return '/'.join((self.project_id, self.name))

  @property
  def ip_network(self) -> IPv4NetOrIPv6Net:
    """The 'ipCidrRange' field parsed with ipaddress.ip_network()."""
    cidr = self._resource_data['ipCidrRange']
    return ipaddress.ip_network(cidr)

  @property
  def region(self) -> str:
    """The region name taken from the last component of the 'region' URL.

    Raises:
      RuntimeError: the 'region' field is not a projects/*/regions/* URL.
    """
    # e.g. https://www.googleapis.com/compute/v1/projects/gcpdiag-gke1-aaaa/regions/europe-west4
    region_url = self._resource_data['region']
    parsed = re.match(
        r'https://www.googleapis.com/compute/v1/projects/([^/]+)/regions/([^/]+)',
        region_url)
    if parsed is None:
      raise RuntimeError(f"can't parse region URL: {region_url}")
    return parsed.group(2)

  def is_private_ip_google_access(self) -> bool:
    """The 'privateIpGoogleAccess' field, defaulting to False when absent."""
    return self._resource_data.get('privateIpGoogleAccess', False)

  @property
  def network(self):
    """The raw 'network' field of the resource (KeyError if it is missing)."""
    return self._resource_data['network']
A VPC subnetwork.
@property
def full_path(self) -> str:
  """The self link without the Compute API prefix.

  When the self link does not start with the expected prefix, the value
  '>> ' followed by the unmodified self link is returned instead.
  """
  link = self.self_link
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
  if matched is None:
    return f'>> {link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """The project id and the resource name joined by '/'."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
@property
def region(self) -> str:
  """The region name taken from the last component of the 'region' URL.

  Raises:
    RuntimeError: the 'region' field is not a projects/*/regions/* URL.
  """
  # e.g. https://www.googleapis.com/compute/v1/projects/gcpdiag-gke1-aaaa/regions/europe-west4
  region_url = self._resource_data['region']
  parsed = re.match(
      r'https://www.googleapis.com/compute/v1/projects/([^/]+)/regions/([^/]+)',
      region_url)
  if parsed is None:
    raise RuntimeError(f"can't parse region URL: {region_url}")
  return parsed.group(2)
class Route(models.Resource):
  """A VPC Route, backed by the raw resource dictionary it was built from."""

  _resource_data: dict

  # Resource keys that can carry the next hop, in the order they are probed.
  _NEXT_HOP_KEYS = (
      'nextHopGateway',
      'nextHopVpnTunnel',
      'nextHopHub',
      'nextHopInstance',
      'nextHopAddress',
      'nextHopPeering',
      'nextHopIlb',
      'nextHopNetwork',
      'nextHopIp',
  )

  def __init__(self, project_id, resource_data):
    """Pass the project id to the base class and keep the raw resource data."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """The self link without the Compute API prefix.

    When the self link does not start with the expected prefix, the value
    '>> ' followed by the unmodified self link is returned instead.
    """
    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                      self.self_link)
    if result:
      return result.group(1)
    return f'>> {self.self_link}'

  @property
  def short_path(self) -> str:
    """The project id and the route name joined by '/'."""
    return self.project_id + '/' + self.name

  @property
  def name(self) -> str:
    """The 'name' field of the resource (KeyError if it is missing)."""
    return self._resource_data['name']

  @property
  def kind(self) -> str:
    """The 'kind' field of the resource (KeyError if it is missing)."""
    return self._resource_data['kind']

  @property
  def self_link(self) -> str:
    """The 'selfLink' field, or '' when it is not set.

    Uses the same '' fallback as the other resource classes so that
    full_path can report a malformed link instead of raising KeyError.
    """
    return self._resource_data.get('selfLink', '')

  @property
  def network(self) -> str:
    """The 'network' field of the resource (KeyError if it is missing)."""
    return self._resource_data['network']

  @property
  def tags(self) -> List[str]:
    """The 'tags' field, or an empty list when the route has none."""
    return self._resource_data.get('tags', [])

  @property
  def dest_range(self) -> str:
    """The 'destRange' field of the resource (KeyError if it is missing)."""
    return self._resource_data['destRange']

  @property
  def next_hop_gateway(self) -> Optional[str]:
    """The 'nextHopGateway' field, or None when it is not set."""
    return self._resource_data.get('nextHopGateway')

  @property
  def next_hop_vpn_tunnel(self) -> Optional[str]:
    """The 'nextHopVpnTunnel' field, or None when it is not set."""
    return self._resource_data.get('nextHopVpnTunnel')

  @property
  def next_hop_hub(self) -> Optional[str]:
    """The 'nextHopHub' field, or None when it is not set."""
    return self._resource_data.get('nextHopHub')

  @property
  def priority(self) -> int:
    """The 'priority' field of the resource (KeyError if it is missing)."""
    return self._resource_data['priority']

  def get_next_hop(self) -> Optional[Dict[str, Any]]:
    """Return the first populated next hop of this route.

    Returns:
      A dict {'type': <resource key>, 'link': <value>} for the first key of
      _NEXT_HOP_KEYS that has a truthy value, or None if none is set.
    """
    for hop_key in self._NEXT_HOP_KEYS:
      link = self._resource_data.get(hop_key)
      if link:
        return {'type': hop_key, 'link': link}
    return None

  def check_route_match(self, ip1: IPAddrOrNet, ip2: str) -> bool:
    """Return whether _ip_match() accepts ip1 against the network ip2.

    Args:
      ip1: address or network to test.
      ip2: network in string form; parsed with ipaddress.ip_network().
    """
    ip2_list = [ipaddress.ip_network(ip2)]
    return bool(_ip_match(ip1, ip2_list, 'allow'))
A VPC Route.
@property
def full_path(self) -> str:
  """The self link without the Compute API prefix.

  When the self link does not start with the expected prefix, the value
  '>> ' followed by the unmodified self link is returned instead.
  """
  link = self.self_link
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
  if matched is None:
    return f'>> {link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """The project id and the resource name joined by '/'."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
def get_next_hop(self) -> Optional[Dict[str, Any]]:
  """Return the first populated next hop of this route.

  The resource keys are probed in a fixed order; the first one holding a
  truthy value wins.

  Returns:
    A dict {'type': <resource key>, 'link': <value>}, or None when no next
    hop key is set.
  """
  # The reported type is the resource key itself, so a tuple is enough.
  hop_keys = (
      'nextHopGateway',
      'nextHopVpnTunnel',
      'nextHopHub',
      'nextHopInstance',
      'nextHopAddress',
      'nextHopPeering',
      'nextHopIlb',
      'nextHopNetwork',
      'nextHopIp',
  )
  for hop_key in hop_keys:
    link = self._resource_data.get(hop_key)
    if link:
      return {'type': hop_key, 'link': link}
  return None
class ManagedZone(models.Resource):
  """Represents a DNS zone (public or private).

  https://cloud.google.com/dns/docs/reference/v1beta2/managedZones
  """
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Pass the project id to the base class and keep the raw resource data."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def cloud_logging_config(self) -> bool:
    """The 'enableLogging' flag of 'cloudLoggingConfig'.

    False when either the section or the flag is absent.
    """
    logging_config = self._resource_data.get('cloudLoggingConfig', {})
    return logging_config.get('enableLogging', False)

  @property
  def is_public(self) -> bool:
    """True when the 'visibility' field equals 'public'."""
    return self._resource_data['visibility'] == 'public'

  @property
  def vpc_attached(self) -> bool:
    """True when 'privateVisibilityConfig' lists networks or GKE clusters.

    Reads the resource data without modifying it.
    """
    visibility = self._resource_data.get('privateVisibilityConfig', {})
    return bool(visibility.get('networks') or visibility.get('gkeClusters'))

  @property
  def dnssec_config_state(self) -> Union[str, bool]:
    """The 'state' value of 'dnssecConfig', or False when it is not set.

    Reads the resource data without modifying it.
    """
    return self._resource_data.get('dnssecConfig', {}).get('state', False)

  @property
  def name(self) -> str:
    """The 'name' field of the resource (KeyError if it is missing)."""
    return self._resource_data['name']

  @property
  def full_path(self) -> str:
    """The self link without the Cloud DNS v1beta2 API prefix.

    When the self link does not start with the expected prefix, the value
    '>> ' followed by the unmodified self link is returned instead.
    """
    result = re.match(r'https://dns.googleapis.com/dns/v1beta2/(.*)',
                      self.self_link)
    if result:
      return result.group(1)
    return f'>> {self.self_link}'

  @property
  def short_path(self) -> str:
    """The project id and the zone name joined by '/'."""
    return self.project_id + '/' + self.name

  @property
  def self_link(self) -> str:
    """The 'selfLink' field of the resource, or '' when it is not set."""
    return self._resource_data.get('selfLink', '')
Represents a DNS zone (public or private).
https://cloud.google.com/dns/docs/reference/v1beta2/managedZones
@property
def vpc_attached(self) -> bool:
  """True when 'privateVisibilityConfig' lists networks or GKE clusters.

  The resource data is only read: a missing 'privateVisibilityConfig'
  section is treated as empty instead of being inserted into the dict.
  """
  visibility = self._resource_data.get('privateVisibilityConfig', {})
  return bool(visibility.get('networks') or visibility.get('gkeClusters'))
@property
def full_path(self) -> str:
  """The self link without the Cloud DNS v1beta2 API prefix.

  When the self link does not start with the expected prefix, the value
  '>> ' followed by the unmodified self link is returned instead.
  """
  link = self.self_link
  matched = re.match(r'https://dns.googleapis.com/dns/v1beta2/(.*)', link)
  if matched is None:
    return f'>> {link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
class Router(models.Resource):
  """A VPC Router, backed by the raw resource dictionary it was built from."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Pass the project id to the base class and keep the raw resource data."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self._nats = None

  @property
  def full_path(self) -> str:
    """The self link without the Compute API prefix.

    When the self link does not start with the expected prefix, the value
    '>> ' followed by the unmodified self link is returned instead.
    """
    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                      self.self_link)
    if result:
      return result.group(1)
    return f'>> {self.self_link}'

  @property
  def short_path(self) -> str:
    """The project id and the router name joined by '/'."""
    return self.project_id + '/' + self.name

  @property
  def name(self) -> str:
    """The 'name' field of the resource (KeyError if it is missing)."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """The 'selfLink' field of the resource, or '' when it is not set."""
    return self._resource_data.get('selfLink', '')

  @property
  def nats(self):
    """The 'nats' list of the resource, or an empty list when absent."""
    return self._resource_data.get('nats', [])

  def _find_nat(self, nat_gateway) -> dict:
    """Return the NAT entry named nat_gateway, or {} when none matches."""
    for nat in self._resource_data.get('nats', []):
      if nat['name'] == nat_gateway:
        return nat
    return {}

  def get_nat_ip_allocate_option(self, nat_gateway) -> str:
    """Return 'natIpAllocateOption' of the named NAT gateway.

    Returns '' when the option is unset or no NAT gateway has that name.
    """
    return self._find_nat(nat_gateway).get('natIpAllocateOption', '')

  def get_enable_dynamic_port_allocation(self, nat_gateway) -> str:
    """Return 'enableDynamicPortAllocation' of the named NAT gateway.

    Returns '' when the option is unset or no NAT gateway has that name.
    """
    return self._find_nat(nat_gateway).get('enableDynamicPortAllocation', '')

  def subnet_has_nat(self, subnetwork):
    """Return True when one of this router's NAT entries covers subnetwork.

    An entry covers the subnetwork when its 'sourceSubnetworkIpRangesToNat'
    is anything other than 'LIST_OF_SUBNETWORKS', or when the self link of
    the subnetwork is among the 'name' values of its 'subnetworks' list.
    """
    if not self._resource_data.get('nats', []):
      return False
    for n in self._resource_data.get('nats', []):
      if n['sourceSubnetworkIpRangesToNat'] == 'LIST_OF_SUBNETWORKS':
        # Cloud NAT configured for specific subnets
        if 'subnetworks' in n and subnetwork.self_link in [
            s['name'] for s in n['subnetworks']
        ]:
          return True
      else:
        # Cloud NAT configured for all subnets
        return True
    return False
A VPC Router.
@property
def full_path(self) -> str:
  """The self link without the Compute API prefix.

  When the self link does not start with the expected prefix, the value
  '>> ' followed by the unmodified self link is returned instead.
  """
  link = self.self_link
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
  if matched is None:
    return f'>> {link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """The project id and the resource name joined by '/'."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
def subnet_has_nat(self, subnetwork):
  """Return True when one of this router's NAT entries covers subnetwork.

  An entry covers the subnetwork when its 'sourceSubnetworkIpRangesToNat' is
  anything other than 'LIST_OF_SUBNETWORKS', or when the self link of the
  subnetwork is among the 'name' values of its 'subnetworks' list.
  """
  nat_entries = self._resource_data.get('nats', [])
  if not nat_entries:
    return False
  for nat in nat_entries:
    if nat['sourceSubnetworkIpRangesToNat'] != 'LIST_OF_SUBNETWORKS':
      # Cloud NAT configured for all subnets
      return True
    # Cloud NAT configured for specific subnets
    if 'subnetworks' not in nat:
      continue
    listed = [entry['name'] for entry in nat['subnetworks']]
    if subnetwork.self_link in listed:
      return True
  return False
class RouterStatus(models.Resource):
  """NAT Router Status, backed by the raw router status response."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Pass the project id to the base class and keep the raw resource data."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """The self link without the Compute API prefix.

    When the self link does not start with the expected prefix, the value
    '>> ' followed by the unmodified self link is returned instead.
    """
    result = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                      self.self_link)
    if result:
      return result.group(1)
    return f'>> {self.self_link}'

  @property
  def short_path(self) -> str:
    """The project id and the resource name joined by '/'."""
    return self.project_id + '/' + self.name

  @property
  def name(self) -> str:
    """The 'name' field of the resource, or '' when it is not set."""
    return self._resource_data.get('name', '')

  @property
  def self_link(self) -> str:
    """The 'selfLink' field of the resource, or '' when it is not set."""
    return self._resource_data.get('selfLink', '')

  def _first_nat_status(self) -> dict:
    """Return the first entry of result.natStatus, or {} when there is none."""
    nat_status = self._resource_data.get('result', {}).get('natStatus')
    # Covers both a missing key and an empty list, which used to raise.
    return nat_status[0] if nat_status else {}

  @property
  def min_extra_nat_ips_needed(self) -> Optional[int]:
    """'minExtraNatIpsNeeded' of the first NAT status entry, else None."""
    return self._first_nat_status().get('minExtraNatIpsNeeded', None)

  @property
  def num_vms_with_nat_mappings(self) -> Optional[int]:
    """'numVmEndpointsWithNatMappings' of the first NAT status entry, else None."""
    return self._first_nat_status().get('numVmEndpointsWithNatMappings', None)
NAT Router Status
@property
def full_path(self) -> str:
  """The self link without the Compute API prefix.

  When the self link does not start with the expected prefix, the value
  '>> ' followed by the unmodified self link is returned instead.
  """
  link = self.self_link
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
  if matched is None:
    return f'>> {link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """The project id and the resource name joined by '/'."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
class RouterNatIpInfo(models.Resource):
  """NAT IP Info, backed by the raw response dictionary it was built from."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    """Pass the project id to the base class and keep the raw resource data."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """The 'name' field of the resource, or '' when it is not set."""
    return self._resource_data.get('name', '')

  @property
  def self_link(self) -> str:
    """The 'selfLink' field of the resource, or '' when it is not set."""
    return self._resource_data.get('selfLink', '')

  @property
  def full_path(self) -> str:
    """The self link without the Compute API prefix.

    When the self link does not start with the expected prefix, the value
    '>> ' followed by the unmodified self link is returned instead.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    if matched is None:
      return f'>> {link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """The project id and the resource name joined by '/'."""
    return '/'.join((self.project_id, self.name))

  @property
  def result(self) -> str:
    """The 'result' field of the resource, or an empty list when absent."""
    # NOTE(review): annotated as str, yet the fallback value is a list.
    return self._resource_data.get('result', [])
NAT IP Info
@property
def full_path(self) -> str:
  """The self link without the Compute API prefix.

  When the self link does not start with the expected prefix, the value
  '>> ' followed by the unmodified self link is returned instead.
  """
  link = self.self_link
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
  if matched is None:
    return f'>> {link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """The project id and the resource name joined by '/'."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
@dataclasses.dataclass
class Peering:
  """One VPC peering entry of a network; str() yields the peering name."""

  # Fields are positional: name, url, state, then the three route flags.
  name: str
  url: str
  state: str
  exports_custom_routes: bool
  imports_custom_routes: bool
  auto_creates_routes: bool

  def __str__(self):
    peering_name = self.name
    return peering_name
VPC Peerings
class Network(models.Resource):
  """A VPC network, backed by the raw resource dictionary it was built from."""
  _resource_data: dict
  _subnetworks: Optional[Dict[str, Subnetwork]]

  def __init__(self, project_id, resource_data):
    """Pass the project id to the base class and keep the raw resource data."""
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self._subnetworks = None

  @property
  def name(self) -> str:
    """The 'name' field of the resource (KeyError if it is missing)."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """The 'selfLink' field of the resource, or '' when it is not set."""
    return self._resource_data.get('selfLink', '')

  @property
  def full_path(self) -> str:
    """The self link without the Compute API prefix.

    When the self link does not start with the expected prefix, the value
    '>> ' followed by the unmodified self link is returned instead.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    if matched is None:
      return f'>> {link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """The project id and the network name joined by '/'."""
    return '/'.join((self.project_id, self.name))

  @property
  def firewall(self) -> 'EffectiveFirewalls':
    """Effective firewalls of this network, via _get_effective_firewalls()."""
    return _get_effective_firewalls(self)

  @property
  def subnetworks(self) -> Dict[str, Subnetwork]:
    """Subnetworks of this network, via _batch_get_subnetworks().

    The entries of the 'subnetworks' field are passed as a frozenset.
    """
    subnetwork_refs = frozenset(self._resource_data.get('subnetworks', []))
    return _batch_get_subnetworks(self._project_id, subnetwork_refs)

  @property
  def peerings(self) -> List[Peering]:
    """One Peering object per entry of the 'peerings' field."""
    found = []
    for entry in self._resource_data.get('peerings', []):
      found.append(
          Peering(entry['name'], entry['network'], entry['state'],
                  entry['exportCustomRoutes'], entry['importCustomRoutes'],
                  entry['autoCreateRoutes']))
    return found
A VPC network.
@property
def full_path(self) -> str:
  """The self link without the Compute API prefix.

  When the self link does not start with the expected prefix, the value
  '>> ' followed by the unmodified self link is returned instead.
  """
  link = self.self_link
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
  if matched is None:
    return f'>> {link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """The project id and the resource name joined by '/'."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
@dataclasses.dataclass
class FirewallCheckResult:
  """The result of a firewall connectivity check; str() yields the action."""

  action: str
  firewall_policy_name: Optional[str] = None
  firewall_policy_rule_description: Optional[str] = None
  vpc_firewall_rule_id: Optional[str] = None
  vpc_firewall_rule_name: Optional[str] = None

  def __str__(self):
    return self.action

  @property
  def matched_by_str(self):
    """Describe what matched: the policy (and rule) or the VPC firewall rule.

    A firewall policy name takes precedence over a VPC firewall rule name.
    None is returned when neither is set.
    """
    policy = self.firewall_policy_name
    if policy:
      description = self.firewall_policy_rule_description
      if not description:
        return f'policy: {policy}'
      return f'policy: {policy}, rule: {description}'
    if self.vpc_firewall_rule_name:
      return f'vpc firewall rule: {self.vpc_firewall_rule_name}'
    return None
The result of a firewall connectivity check.
@property
def matched_by_str(self):
  """Describe what matched: the policy (and rule) or the VPC firewall rule.

  A firewall policy name takes precedence over a VPC firewall rule name.
  None is returned when neither is set.
  """
  policy = self.firewall_policy_name
  if policy:
    description = self.firewall_policy_rule_description
    if not description:
      return f'policy: {policy}'
    return f'policy: {policy}, rule: {description}'
  if self.vpc_firewall_rule_name:
    return f'vpc firewall rule: {self.vpc_firewall_rule_name}'
  return None
class FirewallRuleNotFoundError(Exception):
  """Error carrying the name of a firewall rule that was not found.

  Attributes are `rule_name` (the name given to the constructor) and
  `disabled` (the flag given to the constructor, False by default).
  """
  rule_name: str

  def __init__(self, name, disabled=False):
    message = f'firewall rule not found: {name}'
    # The base class receives only the formatted message.
    super().__init__(message)
    self.disabled = disabled
    self.rule_name = name
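Exception raised when a firewall rule is not found; it carries the rule name (`rule_name`) and a `disabled` flag. (The rendered text here was the docstring inherited from `Exception`: "Common base class for all non-exit exceptions.")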
class VpcFirewallRule:
  """Represents a firewall rule as a read-only view over its resource data.

  Every accessor indexes the resource dictionary directly, so a missing
  key raises KeyError.
  """

  def __init__(self, resource_data):
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """The 'name' entry of the resource data."""
    rule = self._resource_data
    return rule['name']

  @property
  def source_ranges(self) -> List[ipaddress.IPv4Network]:
    """The 'sourceRanges' entry of the resource data, returned as stored."""
    rule = self._resource_data
    return rule['sourceRanges']

  @property
  def target_tags(self) -> set:
    """The 'targetTags' entry of the resource data, returned as stored."""
    rule = self._resource_data
    return rule['targetTags']

  @property
  def allowed(self) -> List[dict]:
    """The 'allowed' entry of the resource data, returned as stored."""
    rule = self._resource_data
    return rule['allowed']

  def is_enabled(self) -> bool:
    """True unless the 'disabled' entry of the resource data is truthy."""
    disabled = self._resource_data['disabled']
    return not disabled
Represents firewall rule
class EffectiveFirewalls:
  """Effective firewall rules for a VPC network or Instance.

  Combines the firewall policies found under 'firewallPolicys' with the VPC
  firewall rules found under 'firewalls' in the resource data.
  """
  _resource_data: dict
  _policies: List[_FirewallPolicy]
  _vpc_firewall: _VpcFirewall

  def __init__(self, resource_data):
    """Wrap each firewall policy and the VPC firewall rules of resource_data."""
    self._resource_data = resource_data
    self._policies = []
    if 'firewallPolicys' in resource_data:
      self._policies.extend(
          _FirewallPolicy(entry) for entry in resource_data['firewallPolicys'])
    vpc_rules = resource_data.get('firewalls', {})
    self._vpc_firewall = _VpcFirewall(vpc_rules)

  def check_connectivity_ingress(
      self,  #
      *,
      src_ip: IPAddrOrNet,
      ip_protocol: str,
      port: Optional[int] = None,
      source_service_account: Optional[str] = None,
      source_tags: Optional[List[str]] = None,
      target_service_account: Optional[str] = None,
      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
    """Evaluate ingress connectivity against policies, then VPC rules.

    Each firewall policy is consulted in order; the first result whose
    action is not 'goto_next' is returned. Otherwise the result of the VPC
    firewall rules is returned.

    Raises:
      ValueError: ip_protocol is not 'ICMP' and no port was given.
    """
    needs_port = ip_protocol != 'ICMP'
    if needs_port and port is None:
      raise ValueError('TCP and UDP must have port numbers')

    # Firewall policies (organization, folders); source tags, source service
    # account and target tags are not passed to policies.
    for policy in self._policies:
      verdict = policy.check_connectivity_ingress(
          src_ip=src_ip,
          ip_protocol=ip_protocol,
          port=port,
          target_service_account=target_service_account)
      if verdict.action == 'goto_next':
        continue
      return verdict

    # VPC firewall rules
    vpc_firewall = self._vpc_firewall
    return vpc_firewall.check_connectivity_ingress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        source_service_account=source_service_account,
        source_tags=source_tags,
        target_service_account=target_service_account,
        target_tags=target_tags)

  def check_connectivity_egress(
      self,  #
      *,
      src_ip: IPAddrOrNet,
      ip_protocol: str,
      port: Optional[int] = None,
      source_service_account: Optional[str] = None,
      source_tags: Optional[List[str]] = None,
      target_service_account: Optional[str] = None,
      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
    """Evaluate egress connectivity against policies, then VPC rules.

    Each firewall policy is consulted in order; the first result whose
    action is not 'goto_next' is returned. Otherwise the result of the VPC
    firewall rules is returned.

    Raises:
      ValueError: ip_protocol is not 'ICMP' and no port was given.
    """
    needs_port = ip_protocol != 'ICMP'
    if needs_port and port is None:
      raise ValueError('TCP and UDP must have port numbers')

    # Firewall policies (organization, folders); source tags, source service
    # account and target tags are not passed to policies.
    for policy in self._policies:
      verdict = policy.check_connectivity_egress(
          src_ip=src_ip,
          ip_protocol=ip_protocol,
          port=port,
          target_service_account=target_service_account)
      if verdict.action == 'goto_next':
        continue
      return verdict

    # VPC firewall rules
    vpc_firewall = self._vpc_firewall
    return vpc_firewall.check_connectivity_egress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        source_service_account=source_service_account,
        source_tags=source_tags,
        target_service_account=target_service_account,
        target_tags=target_tags)

  def get_vpc_ingress_rules(
      self,
      name: Optional[str] = None,
      name_pattern: Optional[re.Pattern] = None,
      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
    """Retrieve the ingress VPC firewall rules matching the given filters.

    Args:
      name: firewall rule name. Defaults to None.
      name_pattern: firewall rule name pattern. Defaults to None.
      target_tags: firewall target tags (if not specified any tag will
        match). Defaults to None.

    Returns:
      List of ingress firewall rules.
    """
    vpc_firewall = self._vpc_firewall
    return vpc_firewall.get_vpc_ingress_rules(name, name_pattern, target_tags)

  def get_vpc_egress_rules(
      self,
      name: Optional[str] = None,
      name_pattern: Optional[re.Pattern] = None,
      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
    """Retrieve the egress VPC firewall rules matching the given filters.

    Args:
      name: firewall rule name. Defaults to None.
      name_pattern: firewall rule name pattern. Defaults to None.
      target_tags: firewall target tags (if not specified any tag will
        match). Defaults to None.

    Returns:
      List of egress firewall rules.
    """
    vpc_firewall = self._vpc_firewall
    return vpc_firewall.get_vpc_egress_rules(name, name_pattern, target_tags)

  def verify_ingress_rule_exists(self, name: str):
    """Verify that a certain VPC ingress rule exists.

    Useful to find out whether a permission was missing on a shared VPC so
    that an automatic rule couldn't be created.
    """
    vpc_firewall = self._vpc_firewall
    return vpc_firewall.verify_ingress_rule_exists(name)

  def verify_egress_rule_exists(self, name: str):
    """Verify that a certain VPC egress rule exists.

    Useful to find out whether a permission was missing on a shared VPC so
    that an automatic rule couldn't be created.
    """
    vpc_firewall = self._vpc_firewall
    return vpc_firewall.verify_egress_rule_exists(name)
Effective firewall rules for a VPC network or Instance.
Includes org/folder firewall policies.
def __init__(self, resource_data):
  """Wrap each firewall policy and the VPC firewall rules of resource_data.

  Policies are read from the 'firewallPolicys' key when present; VPC
  firewall rules from the 'firewalls' key, defaulting to an empty dict.
  """
  self._resource_data = resource_data
  self._policies = []
  if 'firewallPolicys' in resource_data:
    self._policies.extend(
        _FirewallPolicy(entry) for entry in resource_data['firewallPolicys'])
  vpc_rules = resource_data.get('firewalls', {})
  self._vpc_firewall = _VpcFirewall(vpc_rules)
def check_connectivity_ingress(
    self,  #
    *,
    src_ip: IPAddrOrNet,
    ip_protocol: str,
    port: Optional[int] = None,
    source_service_account: Optional[str] = None,
    source_tags: Optional[List[str]] = None,
    target_service_account: Optional[str] = None,
    target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
  """Evaluate ingress connectivity against policies, then VPC rules.

  Each firewall policy is consulted in order; the first result whose action
  is not 'goto_next' is returned. Otherwise the result of the VPC firewall
  rules is returned.

  Raises:
    ValueError: ip_protocol is not 'ICMP' and no port was given.
  """
  needs_port = ip_protocol != 'ICMP'
  if needs_port and port is None:
    raise ValueError('TCP and UDP must have port numbers')

  # Firewall policies (organization, folders); source tags, source service
  # account and target tags are not passed to policies.
  for policy in self._policies:
    verdict = policy.check_connectivity_ingress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        target_service_account=target_service_account)
    if verdict.action == 'goto_next':
      continue
    return verdict

  # VPC firewall rules
  vpc_firewall = self._vpc_firewall
  return vpc_firewall.check_connectivity_ingress(
      src_ip=src_ip,
      ip_protocol=ip_protocol,
      port=port,
      source_service_account=source_service_account,
      source_tags=source_tags,
      target_service_account=target_service_account,
      target_tags=target_tags)
def check_connectivity_egress(
    self,  #
    *,
    src_ip: IPAddrOrNet,
    ip_protocol: str,
    port: Optional[int] = None,
    source_service_account: Optional[str] = None,
    source_tags: Optional[List[str]] = None,
    target_service_account: Optional[str] = None,
    target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
  """Evaluate egress connectivity against policies, then VPC rules.

  Each firewall policy is consulted in order; the first result whose action
  is not 'goto_next' is returned. Otherwise the result of the VPC firewall
  rules is returned.

  Raises:
    ValueError: ip_protocol is not 'ICMP' and no port was given.
  """
  needs_port = ip_protocol != 'ICMP'
  if needs_port and port is None:
    raise ValueError('TCP and UDP must have port numbers')

  # Firewall policies (organization, folders); source tags, source service
  # account and target tags are not passed to policies.
  for policy in self._policies:
    verdict = policy.check_connectivity_egress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        target_service_account=target_service_account)
    if verdict.action == 'goto_next':
      continue
    return verdict

  # VPC firewall rules
  vpc_firewall = self._vpc_firewall
  return vpc_firewall.check_connectivity_egress(
      src_ip=src_ip,
      ip_protocol=ip_protocol,
      port=port,
      source_service_account=source_service_account,
      source_tags=source_tags,
      target_service_account=target_service_account,
      target_tags=target_tags)
def get_vpc_ingress_rules(
    self,
    name: Optional[str] = None,
    name_pattern: Optional[re.Pattern] = None,
    target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
  """Retrieve the ingress VPC firewall rules matching the given filters.

  The three filters are passed on unchanged to the VPC firewall object.

  Args:
    name: firewall rule name. Defaults to None.
    name_pattern: firewall rule name pattern. Defaults to None.
    target_tags: firewall target tags (if not specified any tag will
      match). Defaults to None.

  Returns:
    List of ingress firewall rules.
  """
  vpc_firewall = self._vpc_firewall
  return vpc_firewall.get_vpc_ingress_rules(name, name_pattern, target_tags)
Retrieve the list of ingress firewall rules matching name or name pattern and target tags.
Arguments:
- name (Optional[str], optional): firewall rule name. Defaults to None.
- name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
- target_tags (Optional[List[str]], optional): firewall target tags (if not specified any tag will match). Defaults to None.
Returns:
List[VpcFirewallRule]: List of ingress firewall rules
def get_vpc_egress_rules(
    self,
    name: Optional[str] = None,
    name_pattern: Optional[re.Pattern] = None,
    target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
  """Retrieve the egress VPC firewall rules matching the given filters.

  The three filters are passed on unchanged to the VPC firewall object.

  Args:
    name: firewall rule name. Defaults to None.
    name_pattern: firewall rule name pattern. Defaults to None.
    target_tags: firewall target tags (if not specified any tag will
      match). Defaults to None.

  Returns:
    List of egress firewall rules.
  """
  vpc_firewall = self._vpc_firewall
  return vpc_firewall.get_vpc_egress_rules(name, name_pattern, target_tags)
Retrieve the list of egress firewall rules matching name or name pattern and target tags.
Arguments:
- name (Optional[str], optional): firewall rule name. Defaults to None.
- name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
- target_tags (Optional[List[str]], optional): firewall target tags (if not specified any tag will match). Defaults to None.
Returns:
List[VpcFirewallRule]: List of egress firewall rules
def verify_ingress_rule_exists(self, name: str):
  """Verify that a certain VPC ingress rule exists.

  Useful to find out whether a permission was missing on a shared VPC so
  that an automatic rule couldn't be created. The answer is whatever the
  VPC firewall object returns for `name`.
  """
  vpc_firewall = self._vpc_firewall
  return vpc_firewall.verify_ingress_rule_exists(name)
Verify that a certain VPC rule exists. This is useful to verify whether maybe a permission was missing on a shared VPC and an automatic rule couldn't be created.
def verify_egress_rule_exists(self, name: str):
  """Verify that a certain VPC egress rule exists.

  Useful to find out whether a permission was missing on a shared VPC so
  that an automatic rule couldn't be created. The answer is whatever the
  VPC firewall object returns for `name`.
  """
  vpc_firewall = self._vpc_firewall
  return vpc_firewall.verify_egress_rule_exists(name)
Verify that a certain VPC rule exists. This is useful to verify whether maybe a permission was missing on a shared VPC and an automatic rule couldn't be created.
class VPCEffectiveFirewalls(EffectiveFirewalls):
  """Effective firewall rules for a VPC network.

  Adds a reference to the owning network on top of EffectiveFirewalls.
  """
  _network: Network

  def __init__(self, network, resource_data):
    """Initialise the base class from resource_data and remember the network."""
    EffectiveFirewalls.__init__(self, resource_data)
    self._network = network
Effective firewall rules for a VPC network.
Includes org/folder firewall policies.
@caching.cached_api_call(in_memory=True)
def get_network(project_id: str, network_name: str) -> Network:
  """Fetch one VPC network through the Compute v1 API.

  Args:
    project_id: project that owns the network.
    network_name: name of the network to fetch.

  Returns:
    A Network built from the API response.
  """
  logging.info('fetching network: %s/%s', project_id, network_name)
  compute_api = apis.get_api('compute', 'v1', project_id)
  network_data = compute_api.networks().get(
      project=project_id,
      network=network_name).execute(num_retries=config.API_RETRIES)
  return Network(project_id, network_data)
def get_subnetwork_from_url(url: str) -> Subnetwork:
  """Returns Subnetwork object given subnetwork url.

  Args:
    url: full Compute v1 URL of the form
      https://www.googleapis.com/compute/v1/projects/P/regions/R/subnetworks/S

  Raises:
    ValueError: the url does not have that form.
  """
  m = re.match((r'https://www.googleapis.com/compute/v1/projects/'
                r'([^/]+)/regions/([^/]+)/subnetworks/([^/]+)$'), url)
  if not m:
    # This parser handles subnetwork URLs, so the message names them.
    raise ValueError(f"can't parse subnetwork url: {url}")
  project_id, region, subnetwork_name = m.groups()
  return get_subnetwork(project_id, region, subnetwork_name)
Returns Subnetwork object given subnetwork url
def get_network_from_url(url: str) -> Network:
  """Return the Network object for a Compute v1 network URL.

  Args:
    url: URL starting with
      https://www.googleapis.com/compute/v1/projects/P/global/networks/N

  Raises:
    ValueError: the url does not start with that form.
  """
  parsed = re.match(
      r'https://www.googleapis.com/compute/v1/projects/([^/]+)/global/networks/([^/]+)',
      url)
  if parsed is None:
    raise ValueError(f"can't parse network url: {url}")
  project_id, network_name = parsed.groups()
  return get_network(project_id, network_name)
@caching.cached_api_call(in_memory=True)
def get_networks(project_id: str) -> List[Network]:
  """Fetch all VPC networks of a project through the Compute v1 API.

  Follows the pagination of the list call so that results beyond the first
  page are included.

  Args:
    project_id: project whose networks are listed.

  Returns:
    One Network per item of every response page.
  """
  logging.info('fetching networks: %s', project_id)
  compute = apis.get_api('compute', 'v1', project_id)
  networks: List[Network] = []
  request = compute.networks().list(project=project_id)
  # list_next() returns None once there is no further page.
  while request is not None:
    response = request.execute(num_retries=config.API_RETRIES)
    networks.extend(
        Network(project_id, item) for item in response.get('items', []))
    request = compute.networks().list_next(previous_request=request,
                                           previous_response=response)
  return networks
@caching.cached_api_call(in_memory=True)
def get_subnetwork(project_id: str, region: str,
                   subnetwork_name: str) -> Subnetwork:
  """Fetches one subnetwork by name through the Compute v1 API.

  Args:
    project_id: project passed to the API as the owner of the subnetwork.
    region: region the subnetwork lives in.
    subnetwork_name: name of the subnetwork to fetch.

  Returns:
    A Subnetwork built from project_id and the API response.
  """
  # Log the resource kind actually being fetched, and include the region so
  # same-named subnetworks in different regions can be told apart in the log.
  logging.info('fetching subnetwork: %s/%s/%s', project_id, region,
               subnetwork_name)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.subnetworks().get(project=project_id,
                                      region=region,
                                      subnetwork=subnetwork_name)
  response = request.execute(num_retries=config.API_RETRIES)
  return Subnetwork(project_id, response)
@caching.cached_api_call(in_memory=True)
def get_routes(project_id: str) -> List[Route]:
  """Lists the routes of a project through the Compute v1 API.

  Only the response of a single list call is read; no page token is followed.

  Args:
    project_id: project whose routes are listed.

  Returns:
    One Route per entry under 'items' in the response, or an empty list when
    the response has no 'items' key.
  """
  logging.info('fetching routes: %s', project_id)
  routes_api = apis.get_api('compute', 'v1', project_id).routes()
  listing = routes_api.list(project=project_id).execute(
      num_retries=config.API_RETRIES)
  raw_routes = listing.get('items', [])
  return [Route(project_id, raw) for raw in raw_routes]
@caching.cached_api_call(in_memory=True)
def get_zones(project_id: str) -> List[ManagedZone]:
  """Fetches the Cloud DNS managed zones of a project (DNS v1beta2 API).

  The zones are first listed, then each one is fetched again individually by
  name; the ManagedZone objects are built from those per-zone responses.

  Args:
    project_id: project whose managed zones are fetched.

  Returns:
    One ManagedZone per entry under 'managedZones' in the list response, in
    the same order; empty when that key is absent.
  """
  logging.info('fetching DNS zones: %s', project_id)
  dns = apis.get_api('dns', 'v1beta2', project_id)
  listing = dns.managedZones().list(project=project_id).execute(
      num_retries=config.API_RETRIES)
  return [
      ManagedZone(
          project_id,
          dns.managedZones().get(
              project=project_id, managedZone=summary['name']).execute(
                  num_retries=config.API_RETRIES))
      for summary in listing.get('managedZones', [])
  ]
@caching.cached_api_call(in_memory=True)
def get_routers(project_id: str, region: str, network) -> List[Router]:
  """Lists the routers of one region that are attached to a network.

  Only the response of a single list call is read; no page token is followed.

  Args:
    project_id: project whose routers are listed.
    region: region to list routers in.
    network: object whose `self_link` is used in the API filter expression.

  Returns:
    One Router per entry under 'items' in the response, or an empty list when
    the response has no 'items' key.
  """
  logging.info('fetching routers: %s/%s', project_id, region)
  routers_api = apis.get_api('compute', 'v1', project_id).routers()
  network_filter = f'network="{network.self_link}"'
  listing = routers_api.list(project=project_id,
                             region=region,
                             filter=network_filter).execute(
                                 num_retries=config.API_RETRIES)
  raw_routers = listing.get('items', [])
  return [Router(project_id, raw) for raw in raw_routers]
@caching.cached_api_call(in_memory=True)
def get_router(project_id: str, region: str, network) -> Router:
  """Returns the first router of a region that is attached to a network.

  Args:
    project_id: project whose routers are listed.
    region: region to list routers in.
    network: object whose `self_link` is used in the API filter expression.

  Returns:
    A Router built from the first entry under 'items' in the list response.
    When there is no such entry (the key is missing or the list is empty),
    a Router built from an empty dict is returned instead.
  """
  logging.info('fetching routers: %s/%s', project_id, region)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.routers().list(project=project_id,
                                   region=region,
                                   filter=f'network="{network.self_link}"')
  response = request.execute(num_retries=config.API_RETRIES)
  # Give next() the fallback so that an 'items' key holding an empty list
  # yields the same empty Router as a missing key, instead of letting
  # StopIteration escape to the caller.
  first_router = next(iter(response.get('items', [])), {})
  return Router(project_id, first_router)
@caching.cached_api_call(in_memory=True)
def nat_router_status(project_id: str, router_name: str,
                      region: str) -> RouterStatus:
  """Fetches the status of a router via routers.getRouterStatus.

  Args:
    project_id: project that owns the router.
    router_name: name of the router to query.
    region: region the router lives in.

  Returns:
    A RouterStatus built from the API response when the response carries a
    'result' entry; otherwise a RouterStatus built from an empty dict.
  """
  logging.info('fetching router status: %s/%s in region %s', project_id,
               router_name, region)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.routers().getRouterStatus(project=project_id,
                                              router=router_name,
                                              region=region)
  response = request.execute(num_retries=config.API_RETRIES)
  # Check for the top-level 'result' key itself. Searching the stringified
  # response would also succeed when the text merely appears inside some
  # other key or value.
  if isinstance(response, dict) and 'result' in response:
    return RouterStatus(project_id, response)
  logging.info('unable to fetch router status: %s/%s in region %s',
               project_id, router_name, region)
  return RouterStatus(project_id, {})
@caching.cached_api_call(in_memory=True)
def get_nat_ip_info(project_id: str, router_name: str,
                    region: str) -> RouterNatIpInfo:
  """Fetches the NAT IP information of a router via routers.getNatIpInfo.

  Args:
    project_id: project that owns the router.
    router_name: name of the router to query.
    region: region the router lives in.

  Returns:
    A RouterNatIpInfo built from the API response when the response carries
    a 'result' entry; otherwise a RouterNatIpInfo built from an empty dict.
  """
  logging.info('fetching NAT IP info for router: %s/%s in region %s',
               project_id, router_name, region)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.routers().getNatIpInfo(project=project_id,
                                           router=router_name,
                                           region=region)
  response = request.execute(num_retries=config.API_RETRIES)
  # Check for the top-level 'result' key itself. Searching the stringified
  # response would also succeed when the text merely appears inside some
  # other key or value.
  if isinstance(response, dict) and 'result' in response:
    return RouterNatIpInfo(project_id, response)
  logging.info('unable to fetch Nat IP Info for router: %s/%s in region %s',
               project_id, router_name, region)
  return RouterNatIpInfo(project_id, {})
class VPCSubnetworkIAMPolicy(iam.BaseIAMPolicy):
  """IAM policy of a VPC subnetwork."""

  def _is_resource_permission(self, permission):
    """Answers True for every permission, ignoring the argument.

    NOTE(review): presumably a hook that iam.BaseIAMPolicy consults to decide
    whether a permission applies to this resource type - confirm there.
    """
    return True
Common class for IAM policies
@caching.cached_api_call(in_memory=True)
def get_subnetwork_iam_policy(project_id: str, region: str,
                              subnetwork_name: str) -> VPCSubnetworkIAMPolicy:
  """Fetches the IAM policy of a subnetwork.

  Builds a subnetworks.getIamPolicy request and hands it to
  iam.fetch_iam_policy together with the policy class, the project id and the
  resource name 'projects/<project>/regions/<region>/subnetworks/<name>'.

  Args:
    project_id: project that owns the subnetwork.
    region: region the subnetwork lives in.
    subnetwork_name: name of the subnetwork.

  Returns:
    Whatever iam.fetch_iam_policy returns for VPCSubnetworkIAMPolicy.
  """
  subnetworks_api = apis.get_api('compute', 'v1', project_id).subnetworks()
  policy_request = subnetworks_api.getIamPolicy(project=project_id,
                                                region=region,
                                                resource=subnetwork_name)
  resource_name = (f'projects/{project_id}/regions/{region}/'
                   f'subnetworks/{subnetwork_name}')
  return iam.fetch_iam_policy(policy_request, VPCSubnetworkIAMPolicy,
                              project_id, resource_name)
class Address(models.Resource):
  """IP Addresses."""
  # Raw address dictionary handed to the constructor.
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """self_link without the Compute v1 URL prefix.

    Falls back to '>> ' followed by self_link when the prefix is not there.
    """
    stripped = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                        self.self_link)
    if stripped is None:
      return f'>> {self.self_link}'
    return stripped.group(1)

  @property
  def short_path(self) -> str:
    """Project id and address name joined by a slash."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """Value of the mandatory 'name' key."""
    data = self._resource_data
    return data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' key, or '' when it is absent."""
    data = self._resource_data
    return data.get('selfLink', '')

  @property
  def subnetwork(self) -> str:
    """Value of the 'subnetwork' key; KeyError when it is absent."""
    data = self._resource_data
    return data['subnetwork']

  @property
  def status(self) -> str:
    """Value of the mandatory 'status' key."""
    data = self._resource_data
    return data['status']
IP Addresses.
@property
def full_path(self) -> str:
  """self_link without the Compute v1 URL prefix.

  Falls back to '>> ' followed by self_link when the prefix is not there.
  """
  stripped = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                      self.self_link)
  if stripped is None:
    return f'>> {self.self_link}'
  return stripped.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Project id and resource name joined by a slash."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
@caching.cached_api_call(in_memory=True)
def get_addresses(project_id: str) -> List[Address]:
  """Lists the addresses of a project via addresses.aggregatedList.

  Only the response of a single aggregatedList call is read; no page token is
  followed.

  Args:
    project_id: project whose addresses are listed.

  Returns:
    One Address per entry found under the 'addresses' key of each value in
    the response's 'items' mapping; values without that key are skipped.
  """
  logging.info('fetching addresses list: %s', project_id)
  addresses_api = apis.get_api('compute', 'v1', project_id).addresses()
  aggregated = addresses_api.aggregatedList(project=project_id).execute(
      num_retries=config.API_RETRIES)
  return [
      Address(project_id, raw_address)
      for scope in aggregated['items'].values()
      if 'addresses' in scope
      for raw_address in scope['addresses']
  ]