gcpdiag.queries.networkmanagement

Queries related to the networkmanagement API.
# A single IP address of either family (IPv4 or IPv6).
IPv4AddrOrIPv6Addr = typing.Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
# An IP network of either family (IPv4 or IPv6).
IPv4NetOrIPv6Net = typing.Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
# Any address or network; typing.Union flattens the nested unions, so this is
# the same four-member union as spelling every member out.
IPAddrOrNet = typing.Union[IPv4AddrOrIPv6Addr, IPv4NetOrIPv6Net]
@caching.cached_api_call(in_memory=False)
def run_connectivity_test( project_id: str, src_ip: str, dest_ip: str, dest_port: int, protocol: str):
@caching.cached_api_call(in_memory=False)
def run_connectivity_test(project_id: str, src_ip: str, dest_ip: str,
                          dest_port: int, protocol: str):
  """Create a connectivity test, wait for it to complete and return its result.

  Every execution of this body creates a new test whose id is
  'gcpdiag-connectivity-test-' followed by a random UUID4. The test is not
  deleted afterwards.

  Args:
    project_id: project in which the test is created (under locations/global).
    src_ip: source IP address; the source is declared as a 'GCP_NETWORK'
      endpoint.
    dest_ip: destination IP address.
    dest_port: destination port.
    protocol: value passed through unchanged as the test's 'protocol' field.

  Returns:
    The response of connectivityTests().get() for the created test once the
    create operation reports done, or None if the operation did not complete
    within roughly 60 seconds of polling.
  """
  # 15 polls spaced 4 seconds apart give the intended 60 second upper bound.
  poll_interval_secs = 4
  max_polls = 15

  test_id = f'gcpdiag-connectivity-test-{uuid.uuid4()}'
  # initialize the networkmanagement api
  networkmanagement = apis.get_api('networkmanagement', 'v1', project_id)

  # test input
  test_input = {
      'source': {
          'ipAddress': src_ip,
          'networkType': 'GCP_NETWORK'
      },
      'destination': {
          'ipAddress': dest_ip,
          'port': dest_port
      },
      'protocol': protocol
  }

  create_request = (networkmanagement.projects().locations().global_(
  ).connectivityTests().create(parent=f'projects/{project_id}/locations/global',
                               testId=test_id,
                               body=test_input)).execute()
  logging.info('Running a new connectivity test..')

  def fetch_create_status():
    # Re-read the long-running operation returned by the create call.
    return networkmanagement.projects().locations().global_().operations(
    ).get(name=create_request['name']).execute()

  # Poll until the operation is done or the wait budget is used up. A missing
  # 'done' key is treated as "not done yet" instead of raising KeyError.
  create_status = fetch_create_status()
  polls = 0
  while not create_status.get('done') and polls < max_polls:
    time.sleep(poll_interval_secs)
    create_status = fetch_create_status()
    polls += 1

  if not create_status.get('done'):
    logging.warning('Timeout running the connectivity test...')
    return None

  # get the result of the connectivity test
  return networkmanagement.projects().locations().global_().connectivityTests(
  ).get(
      name=f'projects/{project_id}/locations/global/connectivityTests/{test_id}'
  ).execute()

Method to create/run an idempotent connectivity test