gcpdiag.queries.vpn
Queries related to VPN tunnel.
class
Vpn(gcpdiag.models.Resource):
27class Vpn(models.Resource): 28 """ 29 Represents a Vpn Tunnel. 30 https://cloud.google.com/compute/docs/reference/rest/v1/vpnTunnels 31 """ 32 _resource_data: dict 33 34 def __init__(self, project_id, resource_data): 35 super().__init__(project_id=project_id) 36 self._resource_data = resource_data 37 38 @property 39 def name(self) -> str: 40 return self._resource_data['name'] 41 42 @property 43 def under_maintenance(self) -> bool: 44 return self._resource_data.get('status') == 'UNDER_MAINTENANCE' 45 46 @property 47 def peer_ip(self) -> str: 48 return self._resource_data['peerIp'] 49 50 @property 51 def status(self) -> str: 52 return self._resource_data['status'] 53 54 @property 55 def router(self) -> str: 56 return self._resource_data['router'] 57 58 @property 59 def id(self) -> str: 60 return self._resource_data['id'] 61 62 @property 63 def local_traffic_selector(self) -> List[str]: 64 return self._resource_data.get('localTrafficSelector', []) 65 66 @property 67 def remote_traffic_selector(self) -> List[str]: 68 return self._resource_data.get('remoteTrafficSelector', []) 69 70 @property 71 def self_link(self) -> str: 72 return self._resource_data['selfLink'] 73 74 @property 75 def full_path(self) -> str: 76 result = re.match(r'https://www.googleapis.com/compute/v1/(.*)', 77 self.self_link) 78 if result: 79 return result.group(1) 80 else: 81 return f'>> {self.self_link}' 82 83 @property 84 def short_path(self) -> str: 85 path = self.project_id + '/' + self.name 86 return path
Represents a Vpn Tunnel. https://cloud.google.com/compute/docs/reference/rest/v1/vpnTunnels
full_path: str
74 @property 75 def full_path(self) -> str: 76 result = re.match(r'https://www.googleapis.com/compute/v1/(.*)', 77 self.self_link) 78 if result: 79 return result.group(1) 80 else: 81 return f'>> {self.self_link}'
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
@caching.cached_api_call(in_memory=True)
def
get_vpn(project_id: str, vpn_name: str, region: str) -> Vpn:
89@caching.cached_api_call(in_memory=True) 90def get_vpn(project_id: str, vpn_name: str, region: str) -> Vpn: 91 logging.debug('fetching VPN: %s', vpn_name) 92 compute = apis.get_api('compute', 'v1', project_id) 93 94 request = compute.vpnTunnels().get(project=project_id, 95 vpnTunnel=vpn_name, 96 region=region) 97 try: 98 response = request.execute(num_retries=config.API_RETRIES) 99 except googleapiclient_errors.HttpError as err: 100 raise utils.GcpApiError(err) 101 return Vpn(project_id, response)