gcpdiag.queries.network
class Subnetwork(models.Resource):
  """A VPC subnetwork wrapping the raw resource dictionary it was built from."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 API prefix removed.

    If the self link does not start with that prefix, the link is returned
    unchanged behind a '>> ' marker.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    return matched.group(1) if matched else f'>> {link}'

  @property
  def short_path(self) -> str:
    """Identifier of the form '<project_id>/<name>'."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """Value of the 'name' field."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when it is absent."""
    return self._resource_data.get('selfLink', '')

  @property
  def ip_network(self) -> IPv4NetOrIPv6Net:
    """The 'ipCidrRange' field parsed with ipaddress.ip_network()."""
    cidr = self._resource_data['ipCidrRange']
    return ipaddress.ip_network(cidr)

  @property
  def region(self) -> str:
    """Region name taken from the last component of the 'region' URL.

    Raises:
      RuntimeError: the 'region' field is not a Compute v1 region URL.
    """
    # Expected shape:
    # https://www.googleapis.com/compute/v1/projects/gcpdiag-gke1-aaaa/regions/europe-west4
    region_url = self._resource_data['region']
    parsed = re.match(
        r'https://www.googleapis.com/compute/v1/projects/([^/]+)/regions/([^/]+)',
        region_url)
    if parsed:
      return parsed.group(2)
    raise RuntimeError(f"can't parse region URL: {region_url}")

  def is_private_ip_google_access(self) -> bool:
    """Value of 'privateIpGoogleAccess', defaulting to False when absent."""
    return self._resource_data.get('privateIpGoogleAccess', False)

  @property
  def network(self):
    """Value of the 'network' field."""
    return self._resource_data['network']
A VPC subnetwork.
@property
def full_path(self) -> str:
  """Self link with the Compute v1 API prefix removed.

  If the self link does not start with that prefix, the link is returned
  unchanged behind a '>> ' marker.
  """
  link = self.self_link
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
  return matched.group(1) if matched else f'>> {link}'
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Identifier of the form '<project_id>/<name>'."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
@property
def region(self) -> str:
  """Region name taken from the last component of the 'region' URL.

  Raises:
    RuntimeError: the 'region' field is not a Compute v1 region URL.
  """
  # Expected shape:
  # https://www.googleapis.com/compute/v1/projects/gcpdiag-gke1-aaaa/regions/europe-west4
  region_url = self._resource_data['region']
  parsed = re.match(
      r'https://www.googleapis.com/compute/v1/projects/([^/]+)/regions/([^/]+)',
      region_url)
  if parsed:
    return parsed.group(2)
  raise RuntimeError(f"can't parse region URL: {region_url}")
class Route(models.Resource):
  """A VPC route wrapping the raw resource dictionary it was built from."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 API prefix removed.

    If the self link does not start with that prefix, the link is returned
    unchanged behind a '>> ' marker.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    return matched.group(1) if matched else f'>> {link}'

  @property
  def short_path(self) -> str:
    """Identifier of the form '<project_id>/<name>'."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """Value of the 'name' field."""
    return self._resource_data['name']

  @property
  def kind(self) -> str:
    """Value of the 'kind' field."""
    return self._resource_data['kind']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field (KeyError when it is absent)."""
    return self._resource_data['selfLink']

  @property
  def network(self) -> str:
    """Value of the 'network' field."""
    return self._resource_data['network']

  @property
  def tags(self) -> List[str]:
    """Value of the 'tags' field, or an empty list when it is absent."""
    return self._resource_data.get('tags', [])

  @property
  def dest_range(self) -> str:
    """Value of the 'destRange' field."""
    return self._resource_data['destRange']

  @property
  def next_hop_gateway(self) -> Optional[str]:
    """Value of the 'nextHopGateway' field, or None when it is absent."""
    return self._resource_data.get('nextHopGateway')

  @property
  def next_hop_vpn_tunnel(self) -> Optional[str]:
    """Value of the 'nextHopVpnTunnel' field, or None when it is absent."""
    return self._resource_data.get('nextHopVpnTunnel')

  @property
  def next_hop_hub(self) -> Optional[str]:
    """Value of the 'nextHopHub' field, or None when it is absent."""
    return self._resource_data.get('nextHopHub')

  @property
  def priority(self) -> int:
    """Value of the 'priority' field."""
    return self._resource_data['priority']

  def get_next_hop(self) -> Union[Dict[str, Any], Optional[str]]:
    """Return the first populated next-hop field of this route.

    Returns:
      {'type': <field name>, 'link': <field value>} for the first next-hop
      field with a truthy value, checked in the order listed below, or None
      when none of them is set.
    """
    hop_fields = (
        'nextHopGateway',
        'nextHopVpnTunnel',
        'nextHopHub',
        'nextHopInstance',
        'nextHopAddress',
        'nextHopPeering',
        'nextHopIlb',
        'nextHopNetwork',
        'nextHopIp',
    )
    for field in hop_fields:
      link = self._resource_data.get(field)
      if link:
        return {'type': field, 'link': link}
    return None

  def check_route_match(self, ip1: IPAddrOrNet, ip2: str) -> bool:
    """Return True when _ip_match() accepts ip1 against the network ip2."""
    candidates = [ipaddress.ip_network(ip2)]
    return bool(_ip_match(ip1, candidates, 'allow'))
A VPC Route.
@property
def full_path(self) -> str:
  """Self link with the Compute v1 API prefix removed.

  If the self link does not start with that prefix, the link is returned
  unchanged behind a '>> ' marker.
  """
  link = self.self_link
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
  return matched.group(1) if matched else f'>> {link}'
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Identifier of the form '<project_id>/<name>'."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
def get_next_hop(self) -> Union[Dict[str, Any], Optional[str]]:
  """Return the first populated next-hop field of this route.

  Returns:
    {'type': <field name>, 'link': <field value>} for the first next-hop
    field with a truthy value, checked in the order listed below, or None
    when none of them is set.
  """
  hop_fields = (
      'nextHopGateway',
      'nextHopVpnTunnel',
      'nextHopHub',
      'nextHopInstance',
      'nextHopAddress',
      'nextHopPeering',
      'nextHopIlb',
      'nextHopNetwork',
      'nextHopIp',
  )
  for field in hop_fields:
    link = self._resource_data.get(field)
    if link:
      return {'type': field, 'link': link}
  return None
class ManagedZone(models.Resource):
  """Represents a DNS zone (public or private).

  https://cloud.google.com/dns/docs/reference/v1beta2/managedZones
  """
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def cloud_logging_config(self) -> bool:
    """Value of cloudLoggingConfig.enableLogging, defaulting to False."""
    return self._resource_data['cloudLoggingConfig'].get('enableLogging', False)

  @property
  def is_public(self) -> bool:
    """True when the 'visibility' field equals 'public'."""
    return self._resource_data['visibility'] == 'public'

  @property
  def vpc_attached(self) -> bool:
    """Truthy when the zone lists any networks or GKE clusters.

    Returns the 'networks' entry of privateVisibilityConfig when it is truthy,
    otherwise the 'gkeClusters' entry, otherwise False. A missing
    privateVisibilityConfig is treated as empty.
    """
    # Read-only lookup: a getter must not add keys to the resource data.
    visibility = self._resource_data.get('privateVisibilityConfig', {})
    return (visibility.get('networks', False) or
            visibility.get('gkeClusters', False))

  @property
  def dnssec_config_state(self) -> bool:
    """Value of dnssecConfig.state, or False when it is not present.

    NOTE(review): the annotation says bool, but the stored 'state' value is
    returned as-is -- confirm what callers expect.
    """
    # Read-only lookup: a getter must not add keys to the resource data.
    return self._resource_data.get('dnssecConfig', {}).get('state', False)

  @property
  def name(self) -> str:
    """Value of the 'name' field."""
    return self._resource_data['name']

  @property
  def full_path(self) -> str:
    """Self link with the DNS v1beta2 API prefix removed.

    If the self link does not start with that prefix, the link is returned
    unchanged behind a '>> ' marker.
    """
    link = self.self_link
    matched = re.match(r'https://dns.googleapis.com/dns/v1beta2/(.*)', link)
    return matched.group(1) if matched else f'>> {link}'

  @property
  def short_path(self) -> str:
    """Identifier of the form '<project_id>/<name>'."""
    return self.project_id + '/' + self.name

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when it is absent."""
    return self._resource_data.get('selfLink', '')
Represents a DNS zone (public or private).
https://cloud.google.com/dns/docs/reference/v1beta2/managedZones
@property
def vpc_attached(self) -> bool:
  """Truthy when the zone lists any networks or GKE clusters.

  Returns the 'networks' entry of privateVisibilityConfig when it is truthy,
  otherwise the 'gkeClusters' entry, otherwise False. A missing
  privateVisibilityConfig is treated as empty.
  """
  # Read-only lookup: a getter must not add keys to the resource data.
  visibility = self._resource_data.get('privateVisibilityConfig', {})
  return (visibility.get('networks', False) or
          visibility.get('gkeClusters', False))
@property
def full_path(self) -> str:
  """Self link with the DNS v1beta2 API prefix removed.

  If the self link does not start with that prefix, the link is returned
  unchanged behind a '>> ' marker.
  """
  link = self.self_link
  matched = re.match(r'https://dns.googleapis.com/dns/v1beta2/(.*)', link)
  return matched.group(1) if matched else f'>> {link}'
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
class Router(models.Resource):
  """A VPC router wrapping the raw resource dictionary it was built from."""

  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self._nats = None

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 API prefix removed.

    If the self link does not start with that prefix, the link is returned
    unchanged behind a '>> ' marker.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    return matched.group(1) if matched else f'>> {link}'

  @property
  def short_path(self) -> str:
    """Identifier of the form '<project_id>/<name>'."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """Value of the 'name' field."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when it is absent."""
    return self._resource_data.get('selfLink', '')

  def subnet_has_nat(self, subnetwork):
    """Tell whether any NAT entry of this router covers the given subnetwork.

    Args:
      subnetwork: object whose self_link is compared with the 'name' of each
        entry in a NAT's 'subnetworks' list.

    Returns:
      True when a NAT is not restricted to a subnetwork list, or when a
      restricted NAT lists the subnetwork; False otherwise (including when
      the router has no NATs).
    """
    for nat in self._resource_data.get('nats') or ():
      if nat['sourceSubnetworkIpRangesToNat'] != 'LIST_OF_SUBNETWORKS':
        # Cloud NAT configured for all subnets
        return True
      # Cloud NAT configured for specific subnets
      if 'subnetworks' in nat:
        wanted = subnetwork.self_link
        listed = [entry['name'] for entry in nat['subnetworks']]
        if wanted in listed:
          return True
    return False
A VPC Router.
@property
def full_path(self) -> str:
  """Self link with the Compute v1 API prefix removed.

  If the self link does not start with that prefix, the link is returned
  unchanged behind a '>> ' marker.
  """
  link = self.self_link
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
  return matched.group(1) if matched else f'>> {link}'
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Identifier of the form '<project_id>/<name>'."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
def subnet_has_nat(self, subnetwork):
  """Tell whether any NAT entry of this router covers the given subnetwork.

  Args:
    subnetwork: object whose self_link is compared with the 'name' of each
      entry in a NAT's 'subnetworks' list.

  Returns:
    True when a NAT is not restricted to a subnetwork list, or when a
    restricted NAT lists the subnetwork; False otherwise (including when the
    router has no NATs).
  """
  for nat in self._resource_data.get('nats') or ():
    if nat['sourceSubnetworkIpRangesToNat'] != 'LIST_OF_SUBNETWORKS':
      # Cloud NAT configured for all subnets
      return True
    # Cloud NAT configured for specific subnets
    if 'subnetworks' in nat:
      wanted = subnetwork.self_link
      listed = [entry['name'] for entry in nat['subnetworks']]
      if wanted in listed:
        return True
  return False
@dataclasses.dataclass
class Peering:
  """A single VPC peering entry; str() yields the peering name."""

  # Field order is part of the constructor signature -- do not reorder.
  name: str
  url: str
  state: str
  exports_custom_routes: bool
  imports_custom_routes: bool
  auto_creates_routes: bool

  def __str__(self):
    label = self.name
    return label
VPC Peerings
class Network(models.Resource):
  """A VPC network wrapping the raw resource dictionary it was built from."""
  _resource_data: dict
  _subnetworks: Optional[Dict[str, Subnetwork]]

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self._subnetworks = None

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 API prefix removed.

    If the self link does not start with that prefix, the link is returned
    unchanged behind a '>> ' marker.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    return matched.group(1) if matched else f'>> {link}'

  @property
  def short_path(self) -> str:
    """Identifier of the form '<project_id>/<name>'."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """Value of the 'name' field."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when it is absent."""
    return self._resource_data.get('selfLink', '')

  @property
  def firewall(self) -> 'EffectiveFirewalls':
    """Effective firewalls of this network, via _get_effective_firewalls()."""
    return _get_effective_firewalls(self)

  @property
  def subnetworks(self) -> Dict[str, Subnetwork]:
    """Subnetworks listed under 'subnetworks', via _batch_get_subnetworks()."""
    # A frozenset is passed rather than the raw list.
    subnet_urls = frozenset(self._resource_data.get('subnetworks', []))
    return _batch_get_subnetworks(self._project_id, subnet_urls)

  @property
  def peerings(self) -> List[Peering]:
    """One Peering per entry of the 'peerings' field (empty when absent)."""
    result = []
    for peer in self._resource_data.get('peerings', []):
      result.append(
          Peering(peer['name'], peer['network'], peer['state'],
                  peer['exportCustomRoutes'], peer['importCustomRoutes'],
                  peer['autoCreateRoutes']))
    return result
A VPC network.
@property
def full_path(self) -> str:
  """Self link with the Compute v1 API prefix removed.

  If the self link does not start with that prefix, the link is returned
  unchanged behind a '>> ' marker.
  """
  link = self.self_link
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
  return matched.group(1) if matched else f'>> {link}'
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Identifier of the form '<project_id>/<name>'."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
@dataclasses.dataclass
class FirewallCheckResult:
  """Outcome of a firewall connectivity check; str() yields the action."""

  action: str
  firewall_policy_name: Optional[str] = None
  firewall_policy_rule_description: Optional[str] = None
  vpc_firewall_rule_id: Optional[str] = None
  vpc_firewall_rule_name: Optional[str] = None

  def __str__(self):
    return self.action

  @property
  def matched_by_str(self):
    """Describe what matched: a policy (optionally with rule) or a VPC rule.

    Returns None when neither a policy name nor a VPC rule name is set.
    """
    if self.firewall_policy_name:
      if not self.firewall_policy_rule_description:
        return f'policy: {self.firewall_policy_name}'
      return f'policy: {self.firewall_policy_name}, rule: {self.firewall_policy_rule_description}'
    if self.vpc_firewall_rule_name:
      return f'vpc firewall rule: {self.vpc_firewall_rule_name}'
    return None
The result of a firewall connectivity check.
@property
def matched_by_str(self):
  """Describe what matched: a policy (optionally with rule) or a VPC rule.

  Returns None when neither a policy name nor a VPC rule name is set.
  """
  if self.firewall_policy_name:
    if not self.firewall_policy_rule_description:
      return f'policy: {self.firewall_policy_name}'
    return f'policy: {self.firewall_policy_name}, rule: {self.firewall_policy_rule_description}'
  if self.vpc_firewall_rule_name:
    return f'vpc firewall rule: {self.vpc_firewall_rule_name}'
  return None
class FirewallRuleNotFoundError(Exception):
  """Raised for a firewall rule that could not be found.

  Attributes are set from the constructor arguments: rule_name holds the
  name, disabled holds the flag passed by the raiser (False by default).
  """
  rule_name: str

  def __init__(self, name, disabled=False):
    message = f'firewall rule not found: {name}'
    # The base Exception receives the formatted message only.
    super().__init__(message)
    self.rule_name = name
    self.disabled = disabled
Common base class for all non-exit exceptions.
class VpcFirewallRule:
  """Read-only view over one firewall rule's resource dictionary.

  Every accessor indexes the dictionary directly, so a missing field raises
  KeyError.
  """

  def __init__(self, resource_data):
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """Value of the 'name' field."""
    data = self._resource_data
    return data['name']

  @property
  def source_ranges(self) -> List[ipaddress.IPv4Network]:
    """Value of the 'sourceRanges' field, returned as stored."""
    data = self._resource_data
    return data['sourceRanges']

  @property
  def target_tags(self) -> set:
    """Value of the 'targetTags' field, returned as stored."""
    data = self._resource_data
    return data['targetTags']

  @property
  def allowed(self) -> List[dict]:
    """Value of the 'allowed' field, returned as stored."""
    data = self._resource_data
    return data['allowed']

  def is_enabled(self) -> bool:
    """True unless the 'disabled' field is truthy."""
    disabled = self._resource_data['disabled']
    return not disabled
Represents firewall rule
class EffectiveFirewalls:
  """Effective firewall rules for a VPC network or Instance.

  Includes org/folder firewall policies, which are evaluated before the VPC
  firewall rules.
  """
  _resource_data: dict
  _policies: List[_FirewallPolicy]
  _vpc_firewall: _VpcFirewall

  def __init__(self, resource_data):
    self._resource_data = resource_data
    # One _FirewallPolicy per 'firewallPolicys' entry; none when absent.
    self._policies = [
        _FirewallPolicy(entry)
        for entry in resource_data.get('firewallPolicys', [])
    ]
    self._vpc_firewall = _VpcFirewall(resource_data.get('firewalls', {}))

  def check_connectivity_ingress(
      self,  #
      *,
      src_ip: IPAddrOrNet,
      ip_protocol: str,
      port: Optional[int] = None,
      source_service_account: Optional[str] = None,
      source_tags: Optional[List[str]] = None,
      target_service_account: Optional[str] = None,
      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
    """Check ingress connectivity against policies, then VPC firewall rules.

    Each firewall policy is consulted in order; the first result whose action
    is not 'goto_next' is returned. Otherwise the VPC firewall decides.

    Raises:
      ValueError: port is None and ip_protocol is not 'ICMP'.
    """
    if port is None and ip_protocol != 'ICMP':
      raise ValueError('TCP and UDP must have port numbers')

    # Firewall policies (organization, folders). Only the source IP,
    # protocol, port and target service account are passed to policies.
    for policy in self._policies:
      verdict = policy.check_connectivity_ingress(
          src_ip=src_ip,
          ip_protocol=ip_protocol,
          port=port,
          target_service_account=target_service_account)
      if verdict.action == 'goto_next':
        continue
      return verdict

    # VPC firewall rules
    return self._vpc_firewall.check_connectivity_ingress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        source_service_account=source_service_account,
        source_tags=source_tags,
        target_service_account=target_service_account,
        target_tags=target_tags)

  def check_connectivity_egress(
      self,  #
      *,
      src_ip: IPAddrOrNet,
      ip_protocol: str,
      port: Optional[int] = None,
      source_service_account: Optional[str] = None,
      source_tags: Optional[List[str]] = None,
      target_service_account: Optional[str] = None,
      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
    """Check egress connectivity against policies, then VPC firewall rules.

    Each firewall policy is consulted in order; the first result whose action
    is not 'goto_next' is returned. Otherwise the VPC firewall decides.

    Raises:
      ValueError: port is None and ip_protocol is not 'ICMP'.
    """
    if port is None and ip_protocol != 'ICMP':
      raise ValueError('TCP and UDP must have port numbers')

    # Firewall policies (organization, folders). Only the source IP,
    # protocol, port and target service account are passed to policies.
    for policy in self._policies:
      verdict = policy.check_connectivity_egress(
          src_ip=src_ip,
          ip_protocol=ip_protocol,
          port=port,
          target_service_account=target_service_account)
      if verdict.action == 'goto_next':
        continue
      return verdict

    # VPC firewall rules
    return self._vpc_firewall.check_connectivity_egress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        source_service_account=source_service_account,
        source_tags=source_tags,
        target_service_account=target_service_account,
        target_tags=target_tags)

  def get_vpc_ingress_rules(
      self,
      name: Optional[str] = None,
      name_pattern: Optional[re.Pattern] = None,
      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
    """Retrieve the ingress firewall rules matching name/pattern and tags.

    Args:
      name (Optional[str], optional): firewall rule name. Defaults to None.
      name_pattern (Optional[re.Pattern], optional): firewall rule name
        pattern. Defaults to None.
      target_tags (Optional[List[str]], optional): firewall target tags
        (if not specified any tag will match). Defaults to None.

    Returns:
      List[VpcFirewallRule]: List of ingress firewall rules
    """
    return self._vpc_firewall.get_vpc_ingress_rules(name, name_pattern,
                                                    target_tags)

  def get_vpc_egress_rules(
      self,
      name: Optional[str] = None,
      name_pattern: Optional[re.Pattern] = None,
      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
    """Retrieve the egress firewall rules matching name/pattern and tags.

    Args:
      name (Optional[str], optional): firewall rule name. Defaults to None.
      name_pattern (Optional[re.Pattern], optional): firewall rule name
        pattern. Defaults to None.
      target_tags (Optional[List[str]], optional): firewall target tags
        (if not specified any tag will match). Defaults to None.

    Returns:
      List[VpcFirewallRule]: List of egress firewall rules
    """
    return self._vpc_firewall.get_vpc_egress_rules(name, name_pattern,
                                                   target_tags)

  def verify_ingress_rule_exists(self, name: str):
    """Verify that a certain VPC ingress rule exists.

    This is useful to find out whether a permission was missing on a shared
    VPC and an automatic rule couldn't be created. The check is delegated to
    the VPC firewall object.
    """
    return self._vpc_firewall.verify_ingress_rule_exists(name)

  def verify_egress_rule_exists(self, name: str):
    """Verify that a certain VPC egress rule exists.

    This is useful to find out whether a permission was missing on a shared
    VPC and an automatic rule couldn't be created. The check is delegated to
    the VPC firewall object.
    """
    return self._vpc_firewall.verify_egress_rule_exists(name)
Effective firewall rules for a VPC network or Instance.
Includes org/folder firewall policies.
def __init__(self, resource_data):
  """Build policy and VPC firewall wrappers from the resource data.

  One _FirewallPolicy is created per entry under 'firewallPolicys' (none when
  the key is absent); the 'firewalls' entry (default {}) feeds _VpcFirewall.
  """
  self._resource_data = resource_data
  self._policies = [
      _FirewallPolicy(entry)
      for entry in resource_data.get('firewallPolicys', [])
  ]
  self._vpc_firewall = _VpcFirewall(resource_data.get('firewalls', {}))
def check_connectivity_ingress(
    self,  #
    *,
    src_ip: IPAddrOrNet,
    ip_protocol: str,
    port: Optional[int] = None,
    source_service_account: Optional[str] = None,
    source_tags: Optional[List[str]] = None,
    target_service_account: Optional[str] = None,
    target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
  """Check ingress connectivity against policies, then VPC firewall rules.

  Each firewall policy is consulted in order; the first result whose action
  is not 'goto_next' is returned. Otherwise the VPC firewall decides.

  Raises:
    ValueError: port is None and ip_protocol is not 'ICMP'.
  """
  if port is None and ip_protocol != 'ICMP':
    raise ValueError('TCP and UDP must have port numbers')

  # Firewall policies (organization, folders). Only the source IP, protocol,
  # port and target service account are passed to policies.
  for policy in self._policies:
    verdict = policy.check_connectivity_ingress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        target_service_account=target_service_account)
    if verdict.action == 'goto_next':
      continue
    return verdict

  # VPC firewall rules
  return self._vpc_firewall.check_connectivity_ingress(
      src_ip=src_ip,
      ip_protocol=ip_protocol,
      port=port,
      source_service_account=source_service_account,
      source_tags=source_tags,
      target_service_account=target_service_account,
      target_tags=target_tags)
def check_connectivity_egress(
    self,  #
    *,
    src_ip: IPAddrOrNet,
    ip_protocol: str,
    port: Optional[int] = None,
    source_service_account: Optional[str] = None,
    source_tags: Optional[List[str]] = None,
    target_service_account: Optional[str] = None,
    target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
  """Check egress connectivity against policies, then VPC firewall rules.

  Each firewall policy is consulted in order; the first result whose action
  is not 'goto_next' is returned. Otherwise the VPC firewall decides.

  Raises:
    ValueError: port is None and ip_protocol is not 'ICMP'.
  """
  if port is None and ip_protocol != 'ICMP':
    raise ValueError('TCP and UDP must have port numbers')

  # Firewall policies (organization, folders). Only the source IP, protocol,
  # port and target service account are passed to policies.
  for policy in self._policies:
    verdict = policy.check_connectivity_egress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        target_service_account=target_service_account)
    if verdict.action == 'goto_next':
      continue
    return verdict

  # VPC firewall rules
  return self._vpc_firewall.check_connectivity_egress(
      src_ip=src_ip,
      ip_protocol=ip_protocol,
      port=port,
      source_service_account=source_service_account,
      source_tags=source_tags,
      target_service_account=target_service_account,
      target_tags=target_tags)
def get_vpc_ingress_rules(
    self,
    name: Optional[str] = None,
    name_pattern: Optional[re.Pattern] = None,
    target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
  """Retrieve the ingress firewall rules matching name/pattern and tags.

  Args:
    name (Optional[str], optional): firewall rule name. Defaults to None.
    name_pattern (Optional[re.Pattern], optional): firewall rule name
      pattern. Defaults to None.
    target_tags (Optional[List[str]], optional): firewall target tags
      (if not specified any tag will match). Defaults to None.

  Returns:
    List[VpcFirewallRule]: List of ingress firewall rules
  """
  return self._vpc_firewall.get_vpc_ingress_rules(name, name_pattern,
                                                  target_tags)
Retrieve the list of ingress firewall rules matching name or name pattern and target tags.
Arguments:
- name (Optional[str], optional): firewall rule name. Defaults to None.
- name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
- target_tags (Optional[List[str]], optional): firewall target tags (if not specified any tag will match). Defaults to None.
Returns:
List[VpcFirewallRule]: List of ingress firewall rules
def get_vpc_egress_rules(
    self,
    name: Optional[str] = None,
    name_pattern: Optional[re.Pattern] = None,
    target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
  """Retrieve the egress firewall rules matching name/pattern and tags.

  Args:
    name (Optional[str], optional): firewall rule name. Defaults to None.
    name_pattern (Optional[re.Pattern], optional): firewall rule name
      pattern. Defaults to None.
    target_tags (Optional[List[str]], optional): firewall target tags
      (if not specified any tag will match). Defaults to None.

  Returns:
    List[VpcFirewallRule]: List of egress firewall rules
  """
  return self._vpc_firewall.get_vpc_egress_rules(name, name_pattern,
                                                 target_tags)
Retrieve the list of egress firewall rules matching name or name pattern and target tags.
Arguments:
- name (Optional[str], optional): firewall rule name. Defaults to None.
- name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
- target_tags (Optional[List[str]], optional): firewall target tags (if not specified any tag will match). Defaults to None.
Returns:
List[VpcFirewallRule]: List of egress firewall rules
def verify_ingress_rule_exists(self, name: str):
  """Verify that a certain VPC ingress rule exists.

  This is useful to find out whether a permission was missing on a shared VPC
  and an automatic rule couldn't be created. The check is delegated to the
  VPC firewall object and its result is returned unchanged.
  """
  vpc_firewall = self._vpc_firewall
  return vpc_firewall.verify_ingress_rule_exists(name)
Verify that a certain VPC rule exists. This is useful to verify whether maybe a permission was missing on a shared VPC and an automatic rule couldn't be created.
def verify_egress_rule_exists(self, name: str):
  """Verify that a certain VPC egress rule exists.

  This is useful to find out whether a permission was missing on a shared VPC
  and an automatic rule couldn't be created. The check is delegated to the
  VPC firewall object and its result is returned unchanged.
  """
  vpc_firewall = self._vpc_firewall
  return vpc_firewall.verify_egress_rule_exists(name)
Verify that a certain VPC rule exists. This is useful to verify whether maybe a permission was missing on a shared VPC and an automatic rule couldn't be created.
class VPCEffectiveFirewalls(EffectiveFirewalls):
  """Effective firewall rules for a VPC network.

  Includes org/folder firewall policies. Keeps a reference to the network the
  rules belong to.
  """
  _network: Network

  def __init__(self, network, resource_data):
    # The base class builds the policy and VPC firewall wrappers.
    super().__init__(resource_data)
    self._network = network
Effective firewall rules for a VPC network.
Includes org/folder firewall policies.
@caching.cached_api_call(in_memory=True)
def get_network(project_id: str, network_name: str) -> Network:
  """Fetch one network through the Compute v1 API and wrap it in a Network."""
  logging.info('fetching network: %s/%s', project_id, network_name)
  compute = apis.get_api('compute', 'v1', project_id)
  network_data = compute.networks().get(
      project=project_id,
      network=network_name).execute(num_retries=config.API_RETRIES)
  return Network(project_id, network_data)
def get_subnetwork_from_url(url: str) -> Subnetwork:
  """Return the Subnetwork object for a subnetwork URL.

  Args:
    url: full Compute v1 subnetwork URL of the form
      https://www.googleapis.com/compute/v1/projects/<project>/regions/<region>/subnetworks/<name>

  Raises:
    ValueError: the URL does not have that form.
  """
  m = re.match((r'https://www.googleapis.com/compute/v1/projects/'
                r'([^/]+)/regions/([^/]+)/subnetworks/([^/]+)$'), url)
  if not m:
    # The message names the resource kind that was actually being parsed.
    raise ValueError(f"can't parse subnetwork url: {url}")
  project_id, region, subnetwork_name = m.groups()
  return get_subnetwork(project_id, region, subnetwork_name)
Returns Subnetwork object given subnetwork url
def get_network_from_url(url: str) -> Network:
  """Return the Network object for a Compute v1 network URL.

  Raises:
    ValueError: the URL does not start with a
      .../projects/<project>/global/networks/<name> path.
  """
  parsed = re.match(
      r'https://www.googleapis.com/compute/v1/projects/([^/]+)/global/networks/([^/]+)',
      url)
  if parsed is None:
    raise ValueError(f"can't parse network url: {url}")
  project_id = parsed.group(1)
  network_name = parsed.group(2)
  return get_network(project_id, network_name)
@caching.cached_api_call(in_memory=True)
def get_networks(project_id: str) -> List[Network]:
  """List the project's networks through the Compute v1 API.

  NOTE(review): a single list response is read; a next-page token, if the API
  returns one, is not followed -- confirm whether pagination is needed.
  """
  logging.info('fetching network: %s', project_id)
  compute = apis.get_api('compute', 'v1', project_id)
  listing = compute.networks().list(project=project_id).execute(
      num_retries=config.API_RETRIES)
  networks = []
  for item in listing.get('items', []):
    networks.append(Network(project_id, item))
  return networks
@caching.cached_api_call(in_memory=True)
def get_subnetwork(project_id: str, region: str,
                   subnetwork_name: str) -> Subnetwork:
  """Fetch one subnetwork through the Compute v1 API.

  Args:
    project_id: project that owns the subnetwork.
    region: region the subnetwork lives in.
    subnetwork_name: name of the subnetwork.

  Returns:
    A Subnetwork wrapping the API response.
  """
  # Log the resource kind actually fetched, including its region.
  logging.info('fetching subnetwork: %s/%s/%s', project_id, region,
               subnetwork_name)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.subnetworks().get(project=project_id,
                                      region=region,
                                      subnetwork=subnetwork_name)
  response = request.execute(num_retries=config.API_RETRIES)
  return Subnetwork(project_id, response)
@caching.cached_api_call(in_memory=True)
def get_routes(project_id: str) -> List[Route]:
  """List the project's routes through the Compute v1 API.

  NOTE(review): a single list response is read; a next-page token, if the API
  returns one, is not followed -- confirm whether pagination is needed.
  """
  logging.info('fetching routes: %s', project_id)
  compute = apis.get_api('compute', 'v1', project_id)
  listing = compute.routes().list(project=project_id).execute(
      num_retries=config.API_RETRIES)
  routes = []
  for item in listing.get('items', []):
    routes.append(Route(project_id, item))
  return routes
@caching.cached_api_call(in_memory=True)
def get_zones(project_id: str) -> List[ManagedZone]:
  """List the project's DNS managed zones (v1beta2 API).

  Zones are listed first, then each zone is fetched individually by name and
  the per-zone response is what gets wrapped in ManagedZone.

  NOTE(review): a single list response is read; a next-page token, if the API
  returns one, is not followed -- confirm whether pagination is needed.
  """
  logging.info('fetching DNS zones: %s', project_id)
  dns = apis.get_api('dns', 'v1beta2', project_id)
  listing = dns.managedZones().list(project=project_id).execute(
      num_retries=config.API_RETRIES)

  def fetch_zone(zone_name):
    zone_request = dns.managedZones().get(project=project_id,
                                          managedZone=zone_name)
    return zone_request.execute(num_retries=config.API_RETRIES)

  return [
      ManagedZone(project_id, fetch_zone(summary['name']))
      for summary in listing.get('managedZones', [])
  ]
@caching.cached_api_call(in_memory=True)
def get_router(project_id: str, region: str, network) -> Router:
  """Return the first router of a region that is attached to the network.

  Args:
    project_id: project to query.
    region: region whose routers are listed.
    network: object whose self_link is used in the list filter.

  Returns:
    A Router wrapping the first listed item, or a Router with empty resource
    data when no router is listed.
  """
  logging.info('fetching routers: %s/%s', project_id, region)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.routers().list(project=project_id,
                                   region=region,
                                   filter=f'network="{network.self_link}"')
  response = request.execute(num_retries=config.API_RETRIES)
  # A missing 'items' key and an empty 'items' list both mean "no router";
  # next(iter([])) would raise StopIteration for the empty list.
  items = response.get('items') or [{}]
  return Router(project_id, items[0])
class VPCSubnetworkIAMPolicy(iam.BaseIAMPolicy):
  """IAM policy of a VPC subnetwork."""

  def _is_resource_permission(self, permission):
    # Every permission is reported as a resource permission, regardless of
    # the argument.
    return True
Common class for IAM policies
@caching.cached_api_call(in_memory=True)
def get_subnetwork_iam_policy(project_id: str, region: str,
                              subnetwork_name: str) -> VPCSubnetworkIAMPolicy:
  """Fetch the IAM policy of a subnetwork via iam.fetch_iam_policy().

  The getIamPolicy request is built here and handed over together with the
  resource name 'projects/<project>/regions/<region>/subnetworks/<name>'.
  """
  compute = apis.get_api('compute', 'v1', project_id)
  policy_request = compute.subnetworks().getIamPolicy(
      project=project_id, region=region, resource=subnetwork_name)
  resource_name = (f'projects/{project_id}/regions/{region}/'
                   f'subnetworks/{subnetwork_name}')
  return iam.fetch_iam_policy(policy_request, VPCSubnetworkIAMPolicy,
                              project_id, resource_name)
class Address(models.Resource):
  """An IP address resource wrapping the raw dictionary it was built from."""
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 API prefix removed.

    If the self link does not start with that prefix, the link is returned
    unchanged behind a '>> ' marker.
    """
    link = self.self_link
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
    return matched.group(1) if matched else f'>> {link}'

  @property
  def short_path(self) -> str:
    """Identifier of the form '<project_id>/<name>'."""
    return '/'.join((self.project_id, self.name))

  @property
  def name(self) -> str:
    """Value of the 'name' field."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when it is absent."""
    return self._resource_data.get('selfLink', '')

  @property
  def subnetwork(self) -> str:
    """Value of the 'subnetwork' field (KeyError when it is absent)."""
    return self._resource_data['subnetwork']

  @property
  def status(self) -> str:
    """Value of the 'status' field."""
    return self._resource_data['status']
IP Addresses.
@property
def full_path(self) -> str:
  """Self link with the Compute v1 API prefix removed.

  If the self link does not start with that prefix, the link is returned
  unchanged behind a '>> ' marker.
  """
  link = self.self_link
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)', link)
  return matched.group(1) if matched else f'>> {link}'
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Identifier of the form '<project_id>/<name>'."""
  return '/'.join((self.project_id, self.name))
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
@caching.cached_api_call(in_memory=True)
def get_addresses(project_id: str) -> List[Address]:
  """Collect the project's addresses from a Compute v1 aggregated list.

  Every value under the response's 'items' mapping that carries an
  'addresses' list contributes one Address per entry; other values are
  skipped.

  NOTE(review): a single aggregatedList response is read; a next-page token,
  if the API returns one, is not followed -- confirm whether pagination is
  needed.
  """
  logging.info('fetching addresses list: %s', project_id)
  compute = apis.get_api('compute', 'v1', project_id)
  listing = compute.addresses().aggregatedList(project=project_id).execute(
      num_retries=config.API_RETRIES)
  collected = []
  for scope_data in listing['items'].values():
    for raw_address in scope_data.get('addresses', []):
      collected.append(Address(project_id, raw_address))
  return collected