gcpdiag.queries.orgpolicy

Queries related to organization policy constraints.
RESOURCE_TYPE_PROJECT = 'projects'
RESOURCE_TYPE_ORGANIZATION = 'organizations'
class PolicyConstraint:
class PolicyConstraint:
  """Base representation of a single organization policy constraint.

  Attributes:
    name: Identifier of the constraint, as passed to the constructor.
  """

  def __init__(self, name, resource_data):
    """Store the constraint name together with its raw policy data.

    Args:
      name: Identifier of the constraint.
      resource_data: Raw policy data backing this constraint; kept privately
        for subclasses to read.
    """
    self.name = name
    self._resource_data = resource_data

  def __str__(self):
    """Return '<name>: <resource_data>' as a human-readable summary."""
    # Formatting instead of '+' concatenation keeps str() from raising
    # TypeError when name is not a str (for example None).
    return f'{self.name}: {self._resource_data}'
PolicyConstraint(name, resource_data)
31  def __init__(self, name, resource_data):
32    self.name = name
33    self._resource_data = resource_data
name
class BooleanPolicyConstraint(PolicyConstraint):
class BooleanPolicyConstraint(PolicyConstraint):
  """Constraint whose policy data may carry an 'enforced' entry."""

  def is_enforced(self) -> bool:
    """Return the 'enforced' entry of the policy data, or False if absent."""
    policy_data = self._resource_data
    return policy_data.get('enforced', False)
def is_enforced(self) -> bool:
43  def is_enforced(self) -> bool:
44    return self._resource_data.get('enforced', False)
class ListPolicyConstraint(PolicyConstraint):
class ListPolicyConstraint(PolicyConstraint):
  """Constraint whose policy data may list allowed and denied values."""

  def allowed_values(self) -> List[str]:
    """Return the 'allowedValues' entry, or an empty list if absent."""
    policy_data = self._resource_data
    return policy_data.get('allowedValues', [])

  def denied_values(self) -> List[str]:
    """Return the 'deniedValues' entry, or an empty list if absent."""
    policy_data = self._resource_data
    return policy_data.get('deniedValues', [])
def allowed_values(self) -> List[str]:
49  def allowed_values(self) -> List[str]:
50    return self._resource_data.get('allowedValues', [])
def denied_values(self) -> List[str]:
52  def denied_values(self) -> List[str]:
53    return self._resource_data.get('deniedValues', [])
class RestoreDefaultPolicyConstraint(PolicyConstraint):
class RestoreDefaultPolicyConstraint(PolicyConstraint):
  """Constraint whose policy restores the constraint's default behavior."""

  def is_default_restored(self) -> bool:
    """Report that the constraintDefault enforcement behavior is restored.

    Returns:
      Always True.
    """
    return True
def is_default_restored(self) -> bool:
58  def is_default_restored(self) -> bool:
59    """Indicates that the constraintDefault enforcement behavior is restored."""
60    return True

Indicates that the constraintDefault enforcement behavior is restored.

def get_effective_org_policy(project_id: str, constraint: str):
 79def get_effective_org_policy(project_id: str, constraint: str):
 80  """Get the effective org policy for a project and a given constraint.
 81
 82  This function will first try to get the policy from a cached list of all
 83  policies that are set on the project. If the policy is not found, it will
 84  make a direct API call to get the effective policy for the given constraint.
 85  """
 86  all_constraints = _get_effective_org_policy_all_constraints(project_id)
 87  if constraint in all_constraints:
 88    return all_constraints[constraint]
 89
 90  # If the constraint is not in the list of all policies, it means that
 91  # the policy is not set on the project. In this case, we need to get the
 92  # effective policy directly.
 93  crm_api = apis.get_api('cloudresourcemanager', 'v1', project_id)
 94  try:
 95    req = crm_api.projects().getEffectiveOrgPolicy(
 96        resource=f'projects/{project_id}', body={'constraint': constraint})
 97    result = req.execute(num_retries=config.API_RETRIES)
 98  except googleapiclient.errors.HttpError as err:
 99    raise utils.GcpApiError(err) from err
100
101  if 'booleanPolicy' in result:
102    return BooleanPolicyConstraint(result['constraint'],
103                                   result['booleanPolicy'])
104  elif 'listPolicy' in result:
105    return ListPolicyConstraint(result['constraint'], result['listPolicy'])
106  else:
107    raise ValueError(f'unknown constraint type: {result}')

Get the effective org policy for a project and a given constraint.

This function will first try to get the policy from a cached list of all policies that are set on the project. If the policy is not found, it will make a direct API call to get the effective policy for the given constraint.

@caching.cached_api_call
def get_all_project_org_policies(project_id: str):
@caching.cached_api_call
def get_all_project_org_policies(project_id: str):
  """List all the org policies set on a project.

  Every page of the listOrgPolicies response is walked. Policies that carry
  none of the known policy keys are logged with a warning and skipped.

  Args:
      project_id: The project ID.

  Returns:
      A dictionary of PolicyConstraint objects, keyed by constraint name.

  Raises:
      utils.GcpApiError: on API errors.
  """
  # Checked in this order; the first key found in a policy decides its class.
  constraint_classes = (
      ('booleanPolicy', BooleanPolicyConstraint),
      ('listPolicy', ListPolicyConstraint),
      ('restoreDefault', RestoreDefaultPolicyConstraint),
  )

  crm_api = apis.get_api('cloudresourcemanager', 'v1', project_id)
  found: Dict[str, PolicyConstraint] = {}
  logging.debug('listing org policies of %s', project_id)

  request = crm_api.projects().listOrgPolicies(
      resource=f'projects/{project_id}')

  while request:
    try:
      response = request.execute(num_retries=config.API_RETRIES)
    except googleapiclient.errors.HttpError as err:
      raise utils.GcpApiError(err) from err

    for policy in response.get('policies', []):
      name = policy.get('constraint')
      for key, constraint_cls in constraint_classes:
        if key in policy:
          found[name] = constraint_cls(name, policy[key])
          break
      else:
        logging.warning('unknown constraint type: %s', policy)

    # Yields the request for the following page, ending the loop once the
    # result is falsy.
    request = crm_api.projects().listOrgPolicies_next(request, response)

  return found

List all the org policies set for a particular project.

Arguments:
  • project_id: The project ID.
Returns:

A dictionary of PolicyConstraint objects, keyed by constraint name.

Raises:
  • utils.GcpApiError: on API errors.