gcpdiag.queries.networkmanagement

Queries related networkmanagement API.
IPv4AddrOrIPv6Addr = typing.Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
IPv4NetOrIPv6Net = typing.Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
IPAddrOrNet = typing.Union[ipaddress.IPv4Address, ipaddress.IPv6Address, ipaddress.IPv4Network, ipaddress.IPv6Network]
@caching.cached_api_call(in_memory=False)
def run_connectivity_test( project_id: str, src_ip: str, dest_ip: str, dest_port: int, protocol: str):
32@caching.cached_api_call(in_memory=False)
33def run_connectivity_test(project_id: str, src_ip: str, dest_ip: str,
34                          dest_port: int, protocol: str):
35  """Method to create/run an idempotent connectivity test"""
36  test_id = f'gcpdiag-connectivity-test-{uuid.uuid4()}'
37  # initialize the networkmanagement api
38  networkmanagement = apis.get_api('networkmanagement', 'v1', project_id)
39
40  # test input
41  test_input = {
42      'source': {
43          'ipAddress': src_ip,
44          'networkType': 'GCP_NETWORK'
45      },
46      'destination': {
47          'ipAddress': dest_ip,
48          'port': dest_port
49      },
50      'protocol': protocol
51  }
52
53  create_request = (networkmanagement.projects().locations().global_(
54  ).connectivityTests().create(parent=f'projects/{project_id}/locations/global',
55                               testId=test_id,
56                               body=test_input)).execute()
57  logging.info('Running a new connectivity test..')
58
59  # Wait a max of 60 seconds to fetch the request_status.
60  count = 0
61  create_status = networkmanagement.projects().locations().global_().operations(
62  ).get(name=create_request['name']).execute()
63  while not create_status['done'] and count <= 15:
64    time.sleep(4)
65    create_status = networkmanagement.projects().locations().global_(
66    ).operations().get(name=create_request['name']).execute()
67    count += 1
68
69  if create_status['done']:
70    # get the result of the connectivity test
71    res = (networkmanagement.projects().locations().global_().connectivityTests(
72    ).get(
73        name=
74        f'projects/{project_id}/locations/global/connectivityTests/{test_id}'))
75    result = res.execute()
76
77    return result
78  else:
79    logging.warning('Timeout running the connectivity test...')
80  return None

Method to create/run an idempotent connectivity test