gcpdiag.queries.networkmanagement
Queries related to the Network Management API.
IPv4AddrOrIPv6Addr = typing.Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
IPv4NetOrIPv6Net = typing.Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
IPAddrOrNet = typing.Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network]
@caching.cached_api_call(in_memory=False)
def run_connectivity_test(project_id: str, src_ip: str, dest_ip: str, dest_port: int, protocol: str):
@caching.cached_api_call(in_memory=False)
def run_connectivity_test(project_id: str, src_ip: str, dest_ip: str,
                          dest_port: int, protocol: str):
  """Create and run a connectivity test and return its result.

  A new connectivity test named 'gcpdiag-connectivity-test-<uuid4>' is created
  under 'projects/<project_id>/locations/global'. The create operation is then
  polled every 4 seconds, for at most 60 seconds, until it reports completion.

  Args:
    project_id: Project in which the connectivity test is created.
    src_ip: Source IP address; the source network type is sent as
      'GCP_NETWORK'.
    dest_ip: Destination IP address.
    dest_port: Destination port.
    protocol: Protocol value passed through unchanged in the request body.

  Returns:
    The connectivity test resource returned by connectivityTests().get() once
    the create operation is done, or None if the operation did not complete
    within the polling budget (a warning is logged in that case).
  """
  poll_interval_secs = 4
  # 15 polls x 4 seconds keeps the wait within the intended 60-second budget.
  max_polls = 15

  test_id = f'gcpdiag-connectivity-test-{uuid.uuid4()}'
  parent = f'projects/{project_id}/locations/global'

  # initialize the networkmanagement api
  networkmanagement = apis.get_api('networkmanagement', 'v1', project_id)

  # test input
  test_input = {
      'source': {
          'ipAddress': src_ip,
          'networkType': 'GCP_NETWORK'
      },
      'destination': {
          'ipAddress': dest_ip,
          'port': dest_port
      },
      'protocol': protocol
  }

  create_request = (networkmanagement.projects().locations().global_(
  ).connectivityTests().create(parent=parent, testId=test_id,
                               body=test_input)).execute()
  logging.info('Running a new connectivity test..')

  def _fetch_operation():
    """Fetch the current state of the create operation."""
    return networkmanagement.projects().locations().global_().operations(
    ).get(name=create_request['name']).execute()

  # The 'done' key may be absent while the operation is still in progress, so
  # read it with .get() and treat a missing key as "not done" instead of
  # raising KeyError.
  create_status = _fetch_operation()
  polls = 0
  while not create_status.get('done', False) and polls < max_polls:
    time.sleep(poll_interval_secs)
    create_status = _fetch_operation()
    polls += 1

  if not create_status.get('done', False):
    logging.warning('Timeout running the connectivity test...')
    return None

  # get the result of the connectivity test
  res = networkmanagement.projects().locations().global_().connectivityTests(
  ).get(name=f'{parent}/connectivityTests/{test_id}')
  return res.execute()
Method to create/run an idempotent connectivity test