gcpdiag.queries.network
class Subnetwork(models.Resource):
  """A VPC subnetwork, wrapping the resource dict it was constructed with."""
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 URL prefix removed.

    When the self link does not start with that prefix, it is returned
    unchanged behind a '>> ' marker instead.
    """
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                       self.self_link)
    if not matched:
      return f'>> {self.self_link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """Project id and subnetwork name joined by a slash."""
    return self.project_id + '/' + self.name

  @property
  def name(self) -> str:
    """Value of the 'name' field (must be present)."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when it is missing."""
    return self._resource_data.get('selfLink', '')

  @property
  def ip_network(self) -> IPv4NetOrIPv6Net:
    """The 'ipCidrRange' field parsed with ipaddress.ip_network()."""
    cidr = self._resource_data['ipCidrRange']
    return ipaddress.ip_network(cidr)

  @property
  def region(self) -> str:
    """Region name: the path component following '/regions/' in 'region'.

    Raises:
      RuntimeError: the 'region' field does not match the expected URL form.
    """
    # https://www.googleapis.com/compute/v1/projects/gcpdiag-gke1-aaaa/regions/europe-west4
    parsed = re.match(
        r'https://www.googleapis.com/compute/v1/projects/([^/]+)/regions/([^/]+)',
        self._resource_data['region'])
    if parsed:
      return parsed.group(2)
    raise RuntimeError(
        f"can't parse region URL: {self._resource_data['region']}")

  def is_private_ip_google_access(self) -> bool:
    """Value of 'privateIpGoogleAccess', defaulting to False."""
    return self._resource_data.get('privateIpGoogleAccess', False)

  @property
  def network(self):
    """Value of the 'network' field (must be present)."""
    return self._resource_data['network']
A VPC subnetwork.
@property
def full_path(self) -> str:
  """Self link with the Compute v1 URL prefix removed.

  When the self link does not start with that prefix, it is returned
  unchanged behind a '>> ' marker instead.
  """
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                     self.self_link)
  if not matched:
    return f'>> {self.self_link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Project id and resource name joined by a slash."""
  return self.project_id + '/' + self.name
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
@property
def region(self) -> str:
  """Region name: the path component following '/regions/' in 'region'.

  Raises:
    RuntimeError: the 'region' field does not match the expected URL form.
  """
  # https://www.googleapis.com/compute/v1/projects/gcpdiag-gke1-aaaa/regions/europe-west4
  parsed = re.match(
      r'https://www.googleapis.com/compute/v1/projects/([^/]+)/regions/([^/]+)',
      self._resource_data['region'])
  if parsed:
    return parsed.group(2)
  raise RuntimeError(
      f"can't parse region URL: {self._resource_data['region']}")
Inherited Members
- gcpdiag.models.Resource
- project_id
class Route(models.Resource):
  """A VPC route, wrapping the resource dict it was constructed with."""
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 URL prefix removed.

    When the self link does not start with that prefix, it is returned
    unchanged behind a '>> ' marker instead.
    """
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                       self.self_link)
    if not matched:
      return f'>> {self.self_link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """Project id and route name joined by a slash."""
    return self.project_id + '/' + self.name

  @property
  def name(self) -> str:
    """Value of the 'name' field (must be present)."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field (must be present)."""
    return self._resource_data['selfLink']

  @property
  def network(self) -> str:
    """Value of the 'network' field (must be present)."""
    return self._resource_data['network']

  @property
  def tags(self) -> List[str]:
    """Value of the 'tags' field, or an empty list when it is missing."""
    return self._resource_data.get('tags', [])

  @property
  def dest_range(self) -> str:
    """Value of the 'destRange' field (must be present)."""
    return self._resource_data['destRange']

  @property
  def next_hop_gateway(self) -> Optional[str]:
    """Value of the 'nextHopGateway' field, or None when it is missing."""
    return self._resource_data.get('nextHopGateway')

  @property
  def next_hop_vpn_tunnel(self) -> Optional[str]:
    """Value of the 'nextHopVpnTunnel' field, or None when it is missing."""
    return self._resource_data.get('nextHopVpnTunnel')

  @property
  def next_hop_hub(self) -> Optional[str]:
    """Value of the 'nextHopHub' field, or None when it is missing."""
    return self._resource_data.get('nextHopHub')

  @property
  def priority(self) -> int:
    """Value of the 'priority' field (must be present)."""
    return self._resource_data['priority']

  def check_route_match(self, ip1: IPAddrOrNet, ip2: str) -> bool:
    """Report whether _ip_match() accepts ip1 against the network ip2.

    ip2 is parsed with ipaddress.ip_network() and handed to _ip_match() as a
    one-element list together with the 'allow' marker.
    """
    candidate_networks = [ipaddress.ip_network(ip2)]
    return bool(_ip_match(ip1, candidate_networks, 'allow'))
A VPC Route.
@property
def full_path(self) -> str:
  """Self link with the Compute v1 URL prefix removed.

  When the self link does not start with that prefix, it is returned
  unchanged behind a '>> ' marker instead.
  """
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                     self.self_link)
  if not matched:
    return f'>> {self.self_link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Project id and resource name joined by a slash."""
  return self.project_id + '/' + self.name
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
Inherited Members
- gcpdiag.models.Resource
- project_id
class ManagedZone(models.Resource):
  """A DNS managed zone (public or private).

  https://cloud.google.com/dns/docs/reference/v1beta2/managedZones
  """
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def cloud_logging_config(self) -> bool:
    """'enableLogging' flag of 'cloudLoggingConfig'.

    Returns False when either the config block or the flag is missing, in
    line with how the other optional blocks of this class are handled.
    """
    logging_config = self._resource_data.get('cloudLoggingConfig', {})
    return logging_config.get('enableLogging', False)

  @property
  def is_public(self) -> bool:
    """True when the 'visibility' field equals 'public'."""
    return self._resource_data['visibility'] == 'public'

  @property
  def vpc_attached(self) -> bool:
    """True when 'privateVisibilityConfig' lists networks or GKE clusters.

    A missing 'privateVisibilityConfig' block counts as "nothing attached".
    """
    # Read-only lookup: a property access must not insert keys into the
    # resource data.
    visibility = self._resource_data.get('privateVisibilityConfig', {})
    return bool(visibility.get('networks') or visibility.get('gkeClusters'))

  @property
  def dnssec_config_state(self) -> bool:
    """The 'state' entry of 'dnssecConfig', or False when it is missing.

    NOTE(review): the stored value is returned as-is, so it may not be a
    bool despite the annotation -- confirm against callers.
    """
    # Read-only lookup: a property access must not insert keys into the
    # resource data.
    return self._resource_data.get('dnssecConfig', {}).get('state', False)

  @property
  def name(self) -> str:
    """Value of the 'name' field (must be present)."""
    return self._resource_data['name']

  @property
  def full_path(self) -> str:
    """Self link with the DNS v1beta2 URL prefix removed.

    When the self link does not start with that prefix, it is returned
    unchanged behind a '>> ' marker instead.
    """
    result = re.match(r'https://dns.googleapis.com/dns/v1beta2/(.*)',
                      self.self_link)
    if result:
      return result.group(1)
    return f'>> {self.self_link}'

  @property
  def short_path(self) -> str:
    """Project id and zone name joined by a slash."""
    return self.project_id + '/' + self.name

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when it is missing."""
    return self._resource_data.get('selfLink', '')
Represents a DNS zone (public or private). See https://cloud.google.com/dns/docs/reference/v1beta2/managedZones
@property
def vpc_attached(self) -> bool:
  """True when 'privateVisibilityConfig' lists networks or GKE clusters.

  A missing 'privateVisibilityConfig' block counts as "nothing attached".
  """
  # Read-only lookup: a property access must not insert keys into the
  # resource data.
  visibility = self._resource_data.get('privateVisibilityConfig', {})
  return bool(visibility.get('networks') or visibility.get('gkeClusters'))
@property
def full_path(self) -> str:
  """Self link with the DNS v1beta2 URL prefix removed.

  When the self link does not start with that prefix, it is returned
  unchanged behind a '>> ' marker instead.
  """
  matched = re.match(r'https://dns.googleapis.com/dns/v1beta2/(.*)',
                     self.self_link)
  if not matched:
    return f'>> {self.self_link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Project id and resource name joined by a slash."""
  return self.project_id + '/' + self.name
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
Inherited Members
- gcpdiag.models.Resource
- project_id
class Router(models.Resource):
  """A VPC router, wrapping the resource dict it was constructed with."""
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self._nats = None

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 URL prefix removed.

    When the self link does not start with that prefix, it is returned
    unchanged behind a '>> ' marker instead.
    """
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                       self.self_link)
    if not matched:
      return f'>> {self.self_link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """Project id and router name joined by a slash."""
    return self.project_id + '/' + self.name

  @property
  def name(self) -> str:
    """Value of the 'name' field (must be present)."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when it is missing."""
    return self._resource_data.get('selfLink', '')

  def subnet_has_nat(self, subnetwork):
    """Report whether any NAT config of this router covers `subnetwork`.

    A NAT whose 'sourceSubnetworkIpRangesToNat' is not 'LIST_OF_SUBNETWORKS'
    covers every subnet; otherwise the subnetwork's self link must appear
    among the 'name' entries of the NAT's 'subnetworks'.
    """
    nat_configs = self._resource_data.get('nats', [])
    if not nat_configs:
      return False
    for nat in nat_configs:
      if nat['sourceSubnetworkIpRangesToNat'] != 'LIST_OF_SUBNETWORKS':
        # Cloud NAT configured for all subnets
        return True
      # Cloud NAT configured for specific subnets
      if 'subnetworks' in nat:
        listed_names = [entry['name'] for entry in nat['subnetworks']]
        if subnetwork.self_link in listed_names:
          return True
    return False
A VPC Router.
@property
def full_path(self) -> str:
  """Self link with the Compute v1 URL prefix removed.

  When the self link does not start with that prefix, it is returned
  unchanged behind a '>> ' marker instead.
  """
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                     self.self_link)
  if not matched:
    return f'>> {self.self_link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Project id and resource name joined by a slash."""
  return self.project_id + '/' + self.name
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
def subnet_has_nat(self, subnetwork):
  """Report whether any NAT config of this router covers `subnetwork`.

  A NAT whose 'sourceSubnetworkIpRangesToNat' is not 'LIST_OF_SUBNETWORKS'
  covers every subnet; otherwise the subnetwork's self link must appear
  among the 'name' entries of the NAT's 'subnetworks'.
  """
  nat_configs = self._resource_data.get('nats', [])
  if not nat_configs:
    return False
  for nat in nat_configs:
    if nat['sourceSubnetworkIpRangesToNat'] != 'LIST_OF_SUBNETWORKS':
      # Cloud NAT configured for all subnets
      return True
    # Cloud NAT configured for specific subnets
    if 'subnetworks' in nat:
      listed_names = [entry['name'] for entry in nat['subnetworks']]
      if subnetwork.self_link in listed_names:
        return True
  return False
Inherited Members
- gcpdiag.models.Resource
- project_id
@dataclasses.dataclass
class Peering:
  """One VPC peering entry; its string form is the peering name."""
  # Field order is part of the constructor signature.
  name: str
  url: str
  state: str
  exports_custom_routes: bool
  imports_custom_routes: bool
  auto_creates_routes: bool

  def __str__(self):
    return self.name
VPC Peerings
class Network(models.Resource):
  """A VPC network, wrapping the resource dict it was constructed with."""
  _resource_data: dict
  _subnetworks: Optional[Dict[str, Subnetwork]]

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data
    self._subnetworks = None

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 URL prefix removed.

    When the self link does not start with that prefix, it is returned
    unchanged behind a '>> ' marker instead.
    """
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                       self.self_link)
    if not matched:
      return f'>> {self.self_link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """Project id and network name joined by a slash."""
    return self.project_id + '/' + self.name

  @property
  def name(self) -> str:
    """Value of the 'name' field (must be present)."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when it is missing."""
    return self._resource_data.get('selfLink', '')

  @property
  def firewall(self) -> 'EffectiveFirewalls':
    """Result of _get_effective_firewalls() for this network."""
    return _get_effective_firewalls(self)

  @property
  def subnetworks(self) -> Dict[str, Subnetwork]:
    """Result of _batch_get_subnetworks() for the listed subnetworks.

    The 'subnetworks' entries are passed on as a frozenset.
    """
    subnetwork_refs = frozenset(self._resource_data.get('subnetworks', []))
    return _batch_get_subnetworks(self._project_id, subnetwork_refs)

  @property
  def peerings(self) -> List[Peering]:
    """One Peering per entry of the 'peerings' field (empty when missing)."""
    found = []
    for peer in self._resource_data.get('peerings', []):
      found.append(
          Peering(peer['name'], peer['network'], peer['state'],
                  peer['exportCustomRoutes'], peer['importCustomRoutes'],
                  peer['autoCreateRoutes']))
    return found
A VPC network.
@property
def full_path(self) -> str:
  """Self link with the Compute v1 URL prefix removed.

  When the self link does not start with that prefix, it is returned
  unchanged behind a '>> ' marker instead.
  """
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                     self.self_link)
  if not matched:
    return f'>> {self.self_link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Project id and resource name joined by a slash."""
  return self.project_id + '/' + self.name
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
Inherited Members
- gcpdiag.models.Resource
- project_id
@dataclasses.dataclass
class FirewallCheckResult:
  """Outcome of a firewall connectivity check.

  `action` is the verdict and is also the string form of the object; the
  remaining fields optionally identify the policy or VPC rule involved.
  """
  action: str
  firewall_policy_name: Optional[str] = None
  firewall_policy_rule_description: Optional[str] = None
  vpc_firewall_rule_id: Optional[str] = None
  vpc_firewall_rule_name: Optional[str] = None

  def __str__(self):
    return self.action

  @property
  def matched_by_str(self):
    """Description of what matched, or None when nothing is recorded.

    A firewall policy name takes precedence over a VPC firewall rule name.
    """
    if self.firewall_policy_name:
      if not self.firewall_policy_rule_description:
        return f'policy: {self.firewall_policy_name}'
      return f'policy: {self.firewall_policy_name}, rule: {self.firewall_policy_rule_description}'
    if self.vpc_firewall_rule_name:
      return f'vpc firewall rule: {self.vpc_firewall_rule_name}'
    return None
The result of a firewall connectivity check.
@property
def matched_by_str(self):
  """Description of what matched, or None when nothing is recorded.

  A firewall policy name takes precedence over a VPC firewall rule name.
  """
  if self.firewall_policy_name:
    if not self.firewall_policy_rule_description:
      return f'policy: {self.firewall_policy_name}'
    return f'policy: {self.firewall_policy_name}, rule: {self.firewall_policy_rule_description}'
  if self.vpc_firewall_rule_name:
    return f'vpc firewall rule: {self.vpc_firewall_rule_name}'
  return None
class FirewallRuleNotFoundError(Exception):
  """Raised for a firewall rule that could not be found.

  Attributes are set in __init__: `rule_name` is the name that was looked
  up and `disabled` is the flag supplied by the raiser (False by default).
  """
  rule_name: str

  def __init__(self, name, disabled=False):
    # The exception message is derived from the rule name alone.
    message = f'firewall rule not found: {name}'
    super().__init__(message)
    self.rule_name = name
    self.disabled = disabled
Common base class for all non-exit exceptions.
Inherited Members
- builtins.BaseException
- with_traceback
- args
class VpcFirewallRule:
  """Read-only view over one firewall rule resource dict.

  Every accessor indexes the dict directly, so a missing key raises KeyError.
  """

  def __init__(self, resource_data):
    self._resource_data = resource_data

  @property
  def name(self) -> str:
    """Value stored under 'name'."""
    return self._resource_data['name']

  @property
  def source_ranges(self) -> List[ipaddress.IPv4Network]:
    """Value stored under 'sourceRanges', returned as-is."""
    return self._resource_data['sourceRanges']

  @property
  def target_tags(self) -> set:
    """Value stored under 'targetTags', returned as-is."""
    return self._resource_data['targetTags']

  @property
  def allowed(self) -> List[dict]:
    """Value stored under 'allowed', returned as-is."""
    return self._resource_data['allowed']

  def is_enabled(self) -> bool:
    """Logical negation of the value stored under 'disabled'."""
    rule_disabled = self._resource_data['disabled']
    return not rule_disabled
Represents firewall rule
class EffectiveFirewalls:
  """Effective firewall rules for a VPC network or instance.

  Combines the firewall policies (organization, folders) with the VPC
  firewall rules found in the resource data.
  """
  _resource_data: dict
  _policies: List[_FirewallPolicy]
  _vpc_firewall: _VpcFirewall

  def __init__(self, resource_data):
    self._resource_data = resource_data
    # A missing 'firewallPolicys' key simply yields no policies.
    self._policies = [
        _FirewallPolicy(policy_data)
        for policy_data in resource_data.get('firewallPolicys', [])
    ]
    self._vpc_firewall = _VpcFirewall(resource_data.get('firewalls', {}))

  def check_connectivity_ingress(
      self,
      *,
      src_ip: IPAddrOrNet,
      ip_protocol: str,
      port: Optional[int] = None,
      source_service_account: Optional[str] = None,
      source_tags: Optional[List[str]] = None,
      target_service_account: Optional[str] = None,
      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
    """Evaluate ingress connectivity against policies, then VPC rules.

    Raises:
      ValueError: no port was given and ip_protocol is not 'ICMP'.
    """
    if port is None and ip_protocol != 'ICMP':
      raise ValueError('TCP and UDP must have port numbers')

    # Firewall policies (organization, folders) are consulted in order; the
    # first verdict whose action is not 'goto_next' is final.
    for policy in self._policies:
      verdict = policy.check_connectivity_ingress(
          src_ip=src_ip,
          ip_protocol=ip_protocol,
          port=port,
          target_service_account=target_service_account)
      if verdict.action == 'goto_next':
        continue
      return verdict

    # No policy decided: the VPC firewall rules give the answer.
    return self._vpc_firewall.check_connectivity_ingress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        source_service_account=source_service_account,
        source_tags=source_tags,
        target_service_account=target_service_account,
        target_tags=target_tags)

  def check_connectivity_egress(
      self,
      *,
      src_ip: IPAddrOrNet,
      ip_protocol: str,
      port: Optional[int] = None,
      source_service_account: Optional[str] = None,
      source_tags: Optional[List[str]] = None,
      target_service_account: Optional[str] = None,
      target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
    """Evaluate egress connectivity against policies, then VPC rules.

    Raises:
      ValueError: no port was given and ip_protocol is not 'ICMP'.
    """
    if port is None and ip_protocol != 'ICMP':
      raise ValueError('TCP and UDP must have port numbers')

    # Firewall policies (organization, folders) are consulted in order; the
    # first verdict whose action is not 'goto_next' is final.
    for policy in self._policies:
      verdict = policy.check_connectivity_egress(
          src_ip=src_ip,
          ip_protocol=ip_protocol,
          port=port,
          target_service_account=target_service_account)
      if verdict.action == 'goto_next':
        continue
      return verdict

    # No policy decided: the VPC firewall rules give the answer.
    return self._vpc_firewall.check_connectivity_egress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        source_service_account=source_service_account,
        source_tags=source_tags,
        target_service_account=target_service_account,
        target_tags=target_tags)

  def get_vpc_ingress_rules(
      self,
      name: Optional[str] = None,
      name_pattern: Optional[re.Pattern] = None,
      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
    """Retrieve the ingress VPC firewall rules matching the given filters.

    Args:
      name: firewall rule name. Defaults to None.
      name_pattern: firewall rule name pattern. Defaults to None.
      target_tags: firewall target tags (if not specified any tag will
        match). Defaults to None.

    Returns:
      List of ingress firewall rules.
    """
    return self._vpc_firewall.get_vpc_ingress_rules(name, name_pattern,
                                                    target_tags)

  def get_vpc_egress_rules(
      self,
      name: Optional[str] = None,
      name_pattern: Optional[re.Pattern] = None,
      target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
    """Retrieve the egress VPC firewall rules matching the given filters.

    Args:
      name: firewall rule name. Defaults to None.
      name_pattern: firewall rule name pattern. Defaults to None.
      target_tags: firewall target tags (if not specified any tag will
        match). Defaults to None.

    Returns:
      List of egress firewall rules.
    """
    return self._vpc_firewall.get_vpc_egress_rules(name, name_pattern,
                                                   target_tags)

  def verify_ingress_rule_exists(self, name: str):
    """Verify that a certain VPC ingress rule exists.

    Useful to detect that an automatic rule could not be created, e.g.
    because a permission was missing on a shared VPC.
    """
    vpc_firewall = self._vpc_firewall
    return vpc_firewall.verify_ingress_rule_exists(name)

  def verify_egress_rule_exists(self, name: str):
    """Verify that a certain VPC egress rule exists.

    Useful to detect that an automatic rule could not be created, e.g.
    because a permission was missing on a shared VPC.
    """
    vpc_firewall = self._vpc_firewall
    return vpc_firewall.verify_egress_rule_exists(name)
Effective firewall rules for a VPC network or Instance.
Includes org/folder firewall policies.
def __init__(self, resource_data):
  """Keep the resource data and build the policy and VPC rule wrappers."""
  self._resource_data = resource_data
  # A missing 'firewallPolicys' key simply yields no policies.
  self._policies = [
      _FirewallPolicy(policy_data)
      for policy_data in resource_data.get('firewallPolicys', [])
  ]
  self._vpc_firewall = _VpcFirewall(resource_data.get('firewalls', {}))
def check_connectivity_ingress(
    self,
    *,
    src_ip: IPAddrOrNet,
    ip_protocol: str,
    port: Optional[int] = None,
    source_service_account: Optional[str] = None,
    source_tags: Optional[List[str]] = None,
    target_service_account: Optional[str] = None,
    target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
  """Evaluate ingress connectivity against policies, then VPC rules.

  Raises:
    ValueError: no port was given and ip_protocol is not 'ICMP'.
  """
  if port is None and ip_protocol != 'ICMP':
    raise ValueError('TCP and UDP must have port numbers')

  # Firewall policies (organization, folders) are consulted in order; the
  # first verdict whose action is not 'goto_next' is final.
  for policy in self._policies:
    verdict = policy.check_connectivity_ingress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        target_service_account=target_service_account)
    if verdict.action == 'goto_next':
      continue
    return verdict

  # No policy decided: the VPC firewall rules give the answer.
  return self._vpc_firewall.check_connectivity_ingress(
      src_ip=src_ip,
      ip_protocol=ip_protocol,
      port=port,
      source_service_account=source_service_account,
      source_tags=source_tags,
      target_service_account=target_service_account,
      target_tags=target_tags)
def check_connectivity_egress(
    self,
    *,
    src_ip: IPAddrOrNet,
    ip_protocol: str,
    port: Optional[int] = None,
    source_service_account: Optional[str] = None,
    source_tags: Optional[List[str]] = None,
    target_service_account: Optional[str] = None,
    target_tags: Optional[List[str]] = None) -> FirewallCheckResult:
  """Evaluate egress connectivity against policies, then VPC rules.

  Raises:
    ValueError: no port was given and ip_protocol is not 'ICMP'.
  """
  if port is None and ip_protocol != 'ICMP':
    raise ValueError('TCP and UDP must have port numbers')

  # Firewall policies (organization, folders) are consulted in order; the
  # first verdict whose action is not 'goto_next' is final.
  for policy in self._policies:
    verdict = policy.check_connectivity_egress(
        src_ip=src_ip,
        ip_protocol=ip_protocol,
        port=port,
        target_service_account=target_service_account)
    if verdict.action == 'goto_next':
      continue
    return verdict

  # No policy decided: the VPC firewall rules give the answer.
  return self._vpc_firewall.check_connectivity_egress(
      src_ip=src_ip,
      ip_protocol=ip_protocol,
      port=port,
      source_service_account=source_service_account,
      source_tags=source_tags,
      target_service_account=target_service_account,
      target_tags=target_tags)
def get_vpc_ingress_rules(
    self,
    name: Optional[str] = None,
    name_pattern: Optional[re.Pattern] = None,
    target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
  """Retrieve the ingress VPC firewall rules matching the given filters.

  Args:
    name: firewall rule name. Defaults to None.
    name_pattern: firewall rule name pattern. Defaults to None.
    target_tags: firewall target tags (if not specified any tag will
      match). Defaults to None.

  Returns:
    List of ingress firewall rules.
  """
  return self._vpc_firewall.get_vpc_ingress_rules(name, name_pattern,
                                                  target_tags)
Retrieve the list of ingress firewall rules matching name or name pattern and target tags.
Arguments:
- name (Optional[str], optional): firewall rule name. Defaults to None.
- name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
- target_tags (Optional[List[str]], optional): firewall target tags (if not specified any tag will match). Defaults to None.
Returns:
List[VpcFirewallRule]: List of ingress firewall rules
def get_vpc_egress_rules(
    self,
    name: Optional[str] = None,
    name_pattern: Optional[re.Pattern] = None,
    target_tags: Optional[List[str]] = None) -> List[VpcFirewallRule]:
  """Retrieve the egress VPC firewall rules matching the given filters.

  Args:
    name: firewall rule name. Defaults to None.
    name_pattern: firewall rule name pattern. Defaults to None.
    target_tags: firewall target tags (if not specified any tag will
      match). Defaults to None.

  Returns:
    List of egress firewall rules.
  """
  return self._vpc_firewall.get_vpc_egress_rules(name, name_pattern,
                                                 target_tags)
Retrieve the list of egress firewall rules matching name or name pattern and target tags.
Arguments:
- name (Optional[str], optional): firewall rule name. Defaults to None.
- name_pattern (Optional[re.Pattern], optional): firewall rule name pattern. Defaults to None.
- target_tags (Optional[List[str]], optional): firewall target tags (if not specified any tag will match). Defaults to None.
Returns:
List[VpcFirewallRule]: List of egress firewall rules
def verify_ingress_rule_exists(self, name: str):
  """Verify that a certain VPC ingress rule exists.

  Useful to detect that an automatic rule could not be created, e.g.
  because a permission was missing on a shared VPC.
  """
  vpc_firewall = self._vpc_firewall
  return vpc_firewall.verify_ingress_rule_exists(name)
Verify that a certain VPC rule exists. This is useful to verify whether maybe a permission was missing on a shared VPC and an automatic rule couldn't be created.
def verify_egress_rule_exists(self, name: str):
  """Verify that a certain VPC egress rule exists.

  Useful to detect that an automatic rule could not be created, e.g.
  because a permission was missing on a shared VPC.
  """
  vpc_firewall = self._vpc_firewall
  return vpc_firewall.verify_egress_rule_exists(name)
Verify that a certain VPC rule exists. This is useful to verify whether maybe a permission was missing on a shared VPC and an automatic rule couldn't be created.
class VPCEffectiveFirewalls(EffectiveFirewalls):
  """Effective firewall rules for a VPC network.

  Same as EffectiveFirewalls (org/folder firewall policies included), but
  additionally remembers the network the rules belong to.
  """
  _network: Network

  def __init__(self, network, resource_data):
    # The base class handles resource_data; only the network is kept here.
    super().__init__(resource_data)
    self._network = network
Effective firewall rules for a VPC network.
Includes org/folder firewall policies.
@caching.cached_api_call(in_memory=True)
def get_network(project_id: str, network_name: str) -> Network:
  """Fetch one network through the Compute v1 `networks().get` call.

  The response dict is wrapped in a Network object.
  """
  logging.info('fetching network: %s/%s', project_id, network_name)
  compute_api = apis.get_api('compute', 'v1', project_id)
  get_request = compute_api.networks().get(project=project_id,
                                           network=network_name)
  network_data = get_request.execute(num_retries=config.API_RETRIES)
  return Network(project_id, network_data)
def get_subnetwork_from_url(url: str) -> Subnetwork:
  """Returns the Subnetwork object for a full subnetwork URL.

  Args:
    url: URL of the form https://www.googleapis.com/compute/v1/projects/
      <project>/regions/<region>/subnetworks/<name>.

  Raises:
    ValueError: the URL does not have the form above.
  """
  m = re.match((r'https://www.googleapis.com/compute/v1/projects/'
                r'([^/]+)/regions/([^/]+)/subnetworks/([^/]+)$'), url)
  if not m:
    # The message names the resource kind actually being parsed.
    raise ValueError(f"can't parse subnetwork url: {url}")
  project_id, region, subnetwork_name = m.groups()
  return get_subnetwork(project_id, region, subnetwork_name)
Returns Subnetwork object given subnetwork url
def get_network_from_url(url: str) -> Network:
  """Returns the Network object for a full network URL.

  Raises:
    ValueError: the URL does not start with the expected
      .../projects/<project>/global/networks/<name> form.
  """
  parsed = re.match(
      r'https://www.googleapis.com/compute/v1/projects/([^/]+)/global/networks/([^/]+)',
      url)
  if parsed is None:
    raise ValueError(f"can't parse network url: {url}")
  project_id = parsed.group(1)
  network_name = parsed.group(2)
  return get_network(project_id, network_name)
@caching.cached_api_call(in_memory=True)
def get_networks(project_id: str) -> List[Network]:
  """Fetch all networks of a project through Compute v1 `networks().list`.

  Every result page is read, so projects with more networks than fit in one
  page are returned completely.
  """
  logging.info('fetching network: %s', project_id)
  compute = apis.get_api('compute', 'v1', project_id)
  networks: List[Network] = []
  request = compute.networks().list(project=project_id)
  # list_next() returns None once there is no further page.
  while request is not None:
    response = request.execute(num_retries=config.API_RETRIES)
    networks.extend(
        Network(project_id, item) for item in response.get('items', []))
    request = compute.networks().list_next(previous_request=request,
                                           previous_response=response)
  return networks
@caching.cached_api_call(in_memory=True)
def get_subnetwork(project_id: str, region: str,
                   subnetwork_name: str) -> Subnetwork:
  """Fetch one subnetwork through the Compute v1 `subnetworks().get` call.

  The response dict is wrapped in a Subnetwork object.
  """
  # Log the resource kind and region actually being fetched.
  logging.info('fetching subnetwork: %s/%s/%s', project_id, region,
               subnetwork_name)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.subnetworks().get(project=project_id,
                                      region=region,
                                      subnetwork=subnetwork_name)
  response = request.execute(num_retries=config.API_RETRIES)
  return Subnetwork(project_id, response)
@caching.cached_api_call(in_memory=True)
def get_routes(project_id: str) -> List[Route]:
  """Fetch all routes of a project through Compute v1 `routes().list`.

  Every result page is read, so projects with more routes than fit in one
  page are returned completely.
  """
  logging.info('fetching routes: %s', project_id)
  compute = apis.get_api('compute', 'v1', project_id)
  routes: List[Route] = []
  request = compute.routes().list(project=project_id)
  # list_next() returns None once there is no further page.
  while request is not None:
    response = request.execute(num_retries=config.API_RETRIES)
    routes.extend(Route(project_id, item) for item in response.get('items', []))
    request = compute.routes().list_next(previous_request=request,
                                         previous_response=response)
  return routes
@caching.cached_api_call(in_memory=True)
def get_zones(project_id: str) -> List[ManagedZone]:
  """Fetch all DNS managed zones of a project (DNS API v1beta2).

  Zones are listed first, then each one is fetched individually with
  `managedZones().get`, and that second response is what gets wrapped.
  Every page of the listing is read.
  """
  logging.info('fetching DNS zones: %s', project_id)
  dns = apis.get_api('dns', 'v1beta2', project_id)
  zones: List[ManagedZone] = []
  request = dns.managedZones().list(project=project_id)
  # list_next() returns None once there is no further page.
  while request is not None:
    response = request.execute(num_retries=config.API_RETRIES)
    for zone in response.get('managedZones', []):
      zone_request = dns.managedZones().get(project=project_id,
                                            managedZone=zone['name'])
      zone_data = zone_request.execute(num_retries=config.API_RETRIES)
      zones.append(ManagedZone(project_id, zone_data))
    request = dns.managedZones().list_next(previous_request=request,
                                           previous_response=response)
  return zones
@caching.cached_api_call(in_memory=True)
def get_router(project_id: str, region: str, network) -> Router:
  """Fetch the first router of `network` in `region`.

  Routers are listed with a filter on the network's self link. When the
  listing has no entries, a Router wrapping an empty dict is returned.
  """
  logging.info('fetching routers: %s/%s', project_id, region)
  compute = apis.get_api('compute', 'v1', project_id)
  request = compute.routers().list(project=project_id,
                                   region=region,
                                   filter=f'network="{network.self_link}"')
  response = request.execute(num_retries=config.API_RETRIES)
  # Treat an absent and an empty 'items' list alike; taking the first element
  # of an empty list would otherwise leak StopIteration to the caller.
  routers = response.get('items') or [{}]
  return Router(project_id, routers[0])
class VPCSubnetworkIAMPolicy(iam.BaseIAMPolicy):
  """IAM policy of a VPC subnetwork."""

  def _is_resource_permission(self, permission):
    # Every permission is accepted, regardless of its value.
    return True
Common class for IAM policies
Inherited Members
- gcpdiag.queries.iam.BaseIAMPolicy
- BaseIAMPolicy
- full_path
- get_member_permissions
- get_members
- get_member_type
- has_permission
- has_any_permission
- has_role_permissions
- gcpdiag.models.Resource
- project_id
- short_path
@caching.cached_api_call(in_memory=True)
def get_subnetwork_iam_policy(project_id: str, region: str,
                              subnetwork_name: str) -> VPCSubnetworkIAMPolicy:
  """Fetch the IAM policy of one subnetwork.

  Builds the Compute v1 `subnetworks().getIamPolicy` request and hands it,
  with the resource's path, to iam.fetch_iam_policy().
  """
  compute_api = apis.get_api('compute', 'v1', project_id)
  policy_request = compute_api.subnetworks().getIamPolicy(
      project=project_id, region=region, resource=subnetwork_name)

  subnetwork_path = (f'projects/{project_id}/regions/{region}/'
                     f'subnetworks/{subnetwork_name}')
  return iam.fetch_iam_policy(policy_request, VPCSubnetworkIAMPolicy,
                              project_id, subnetwork_path)
class Address(models.Resource):
  """An IP address resource, wrapping the dict it was constructed with."""
  _resource_data: dict

  def __init__(self, project_id, resource_data):
    super().__init__(project_id=project_id)
    self._resource_data = resource_data

  @property
  def full_path(self) -> str:
    """Self link with the Compute v1 URL prefix removed.

    When the self link does not start with that prefix, it is returned
    unchanged behind a '>> ' marker instead.
    """
    matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                       self.self_link)
    if not matched:
      return f'>> {self.self_link}'
    return matched.group(1)

  @property
  def short_path(self) -> str:
    """Project id and address name joined by a slash."""
    return self.project_id + '/' + self.name

  @property
  def name(self) -> str:
    """Value of the 'name' field (must be present)."""
    return self._resource_data['name']

  @property
  def self_link(self) -> str:
    """Value of the 'selfLink' field, or '' when it is missing."""
    return self._resource_data.get('selfLink', '')

  @property
  def subnetwork(self) -> str:
    """Value of the 'subnetwork' field (must be present)."""
    return self._resource_data['subnetwork']

  @property
  def status(self) -> str:
    """Value of the 'status' field (must be present)."""
    return self._resource_data['status']
IP Addresses.
@property
def full_path(self) -> str:
  """Self link with the Compute v1 URL prefix removed.

  When the self link does not start with that prefix, it is returned
  unchanged behind a '>> ' marker instead.
  """
  matched = re.match(r'https://www.googleapis.com/compute/v1/(.*)',
                     self.self_link)
  if not matched:
    return f'>> {self.self_link}'
  return matched.group(1)
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@property
def short_path(self) -> str:
  """Project id and resource name joined by a slash."""
  return self.project_id + '/' + self.name
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
Inherited Members
- gcpdiag.models.Resource
- project_id
@caching.cached_api_call(in_memory=True)
def get_addresses(project_id: str) -> List[Address]:
  """Fetch all addresses of a project via `addresses().aggregatedList`.

  The aggregated response groups results under 'items'; groups without an
  'addresses' entry are skipped. Every result page is read, and a response
  without 'items' contributes nothing instead of raising KeyError.
  """
  logging.info('fetching addresses list: %s', project_id)
  compute = apis.get_api('compute', 'v1', project_id)
  addresses: List[Address] = []
  request = compute.addresses().aggregatedList(project=project_id)
  # aggregatedList_next() returns None once there is no further page.
  while request is not None:
    response = request.execute(num_retries=config.API_RETRIES)
    for scoped_data in response.get('items', {}).values():
      addresses.extend(
          Address(project_id, address)
          for address in scoped_data.get('addresses', []))
    request = compute.addresses().aggregatedList_next(
        previous_request=request, previous_response=response)
  return addresses