gcpdiag.queries.networkmanagement
Queries related networkmanagement API.
IPv4AddrOrIPv6Addr =
typing.Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
IPv4NetOrIPv6Net =
typing.Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
IPAddrOrNet =
typing.Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network]
@caching.cached_api_call(in_memory=False)
def
run_connectivity_test( project_id: str, src_ip: str, dest_ip: str, dest_port: int, protocol: str):
32@caching.cached_api_call(in_memory=False) 33def run_connectivity_test(project_id: str, src_ip: str, dest_ip: str, 34 dest_port: int, protocol: str): 35 """Method to create/run an idempotent connectivity test""" 36 test_id = f'gcpdiag-connectivity-test-{uuid.uuid4()}' 37 # initialize the networkmanagement api 38 networkmanagement = apis.get_api('networkmanagement', 'v1', project_id) 39 40 # test input 41 test_input = { 42 'source': { 43 'ipAddress': src_ip, 44 'networkType': 'GCP_NETWORK' 45 }, 46 'destination': { 47 'ipAddress': dest_ip, 48 'port': dest_port 49 }, 50 'protocol': protocol 51 } 52 53 create_request = (networkmanagement.projects().locations().global_( 54 ).connectivityTests().create(parent=f'projects/{project_id}/locations/global', 55 testId=test_id, 56 body=test_input)).execute() 57 logging.info('Running a new connectivity test..') 58 59 # Wait a max of 60 seconds to fetch the request_status. 60 count = 0 61 create_status = networkmanagement.projects().locations().global_().operations( 62 ).get(name=create_request['name']).execute() 63 while not create_status['done'] and count <= 15: 64 time.sleep(4) 65 create_status = networkmanagement.projects().locations().global_( 66 ).operations().get(name=create_request['name']).execute() 67 count += 1 68 69 if create_status['done']: 70 # get the result of the connectivity test 71 res = (networkmanagement.projects().locations().global_().connectivityTests( 72 ).get( 73 name= 74 f'projects/{project_id}/locations/global/connectivityTests/{test_id}')) 75 result = res.execute() 76 77 return result 78 else: 79 logging.warning('Timeout running the connectivity test...') 80 return None
Method to create/run an idempotent connectivity test