gcpdiag.queries.gcs
Queries related to GCP Cloud Storage
@dataclasses.dataclass(frozen=True)
class
RetentionPolicy:
@dataclasses.dataclass(frozen=True)
class RetentionPolicy:
    """Immutable description of a bucket's retention policy.

    Instances are frozen: attributes cannot be reassigned after construction,
    and the dataclass-generated equality and hash are value based.
    """

    # Length of the retention period as an integer. The unit is not visible
    # in this module -- presumably seconds; TODO confirm against the API.
    retention_period: int
Bucket's retention policy.
class
RetentionPolicyBuilder:
class RetentionPolicyBuilder:
    """Builds Bucket's retention policy from dict representation.

    The builder is tolerant of incomplete input: whenever the retention
    period cannot be read or converted to an integer, the resulting policy
    carries a retention period of 0.
    """

    def __init__(self, retention_policy):
        """Store the raw retention policy mapping.

        Args:
          retention_policy: dict-like object that may contain a
            'retentionPeriod' entry. None is tolerated and is treated like
            an empty mapping when the policy is built.
        """
        self._retention_policy = retention_policy

    def build(self) -> RetentionPolicy:
        """Return a RetentionPolicy populated from the stored mapping."""
        return RetentionPolicy(retention_period=self._get_retention_period())

    def _get_retention_period(self) -> int:
        """Return 'retentionPeriod' as an int, or 0 if missing or malformed."""
        try:
            return int(self._retention_policy['retentionPeriod'])
        except (KeyError, TypeError, ValueError):
            # KeyError: the entry is absent.
            # TypeError: the policy itself, or the stored value, is None (or
            #   another object that cannot be subscripted / converted).
            # ValueError: the value is a string that is not an integer.
            return 0
Builds Bucket's retention policy from dict representation.
class
Bucket(gcpdiag.models.Resource):
class Bucket(models.Resource):
    """Represents a GCS Bucket.

    Read-only view over the resource dictionary describing one bucket. All
    accessors read straight from that dictionary.
    """
    _resource_data: dict

    def __init__(self, project_id, resource_data):
        """Wrap a bucket resource dictionary.

        Args:
          project_id: project id forwarded to the base Resource class.
          resource_data: dict describing the bucket; the accessors below read
            the keys 'id', 'name', 'selfLink', 'labels', 'iamConfiguration'
            and 'retentionPolicy' from it.
        """
        super().__init__(project_id=project_id)
        self._resource_data = resource_data
        self._metadata_dict = None

    @property
    def id(self) -> str:
        """The 'id' entry of the resource data."""
        return self._resource_data['id']

    @property
    def name(self) -> str:
        """The 'name' entry of the resource data."""
        return self._resource_data['name']

    def is_uniform_access(self) -> bool:
        """Return iamConfiguration.uniformBucketLevelAccess.enabled.

        Falls back to False when that path is absent from the resource data.
        """
        return get_path(
            self._resource_data,
            ('iamConfiguration', 'uniformBucketLevelAccess', 'enabled'),
            default=False)

    @property
    def full_path(self) -> str:
        """Return the bucket's selfLink relative to the storage v1 API root.

        For a selfLink of 'https://www.googleapis.com/storage/v1/b/my-bucket'
        this is 'b/my-bucket'. A selfLink that does not start with that API
        root is returned unchanged, prefixed with '>> '.
        """
        self_link = self._resource_data['selfLink']
        # Dots are escaped so that only the literal host name matches.
        match = re.match(r'https://www\.googleapis\.com/storage/v1/(.*)',
                         self_link)
        if match:
            return match.group(1)
        return '>> ' + self_link

    @property
    def short_path(self) -> str:
        """Short identifier of the bucket: its name."""
        return self.name

    @property
    def labels(self) -> dict:
        """The bucket's labels, or an empty dict when none are present."""
        return self._resource_data.get('labels', {})

    @property
    def retention_policy(self) -> RetentionPolicy:
        """Retention policy built from the 'retentionPolicy' entry.

        A missing entry is handled like an empty policy dictionary.
        """
        raw_policy = self._resource_data.get('retentionPolicy', {})
        return RetentionPolicyBuilder(raw_policy).build()
Represents a GCS Bucket.
full_path: str
@property
def full_path(self) -> str:
    """Return the bucket's selfLink relative to the storage v1 API root.

    For a selfLink of 'https://www.googleapis.com/storage/v1/b/my-bucket'
    this is 'b/my-bucket'. A selfLink that does not start with that API root
    is returned unchanged, prefixed with '>> '.
    """
    self_link = self._resource_data['selfLink']
    # Dots are escaped so that only the literal host name matches; unescaped
    # they would act as regex wildcards and accept look-alike hosts.
    match = re.match(r'https://www\.googleapis\.com/storage/v1/(.*)',
                     self_link)
    if match:
        return match.group(1)
    return '>> ' + self_link
Returns the full path of this resource.
Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'
short_path: str
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'gke1'
retention_policy: RetentionPolicy
class BucketIAMPolicy(iam.BaseIAMPolicy):
    """IAM policy attached to a GCS bucket."""

    def _is_resource_permission(self, permission):
        # Every permission is accepted; the argument is never inspected.
        return True
Common class for IAM policies
@caching.cached_api_call(in_memory=True)
def
get_bucket_iam_policy(project_id: str, bucket: str) -> BucketIAMPolicy:
@caching.cached_api_call(in_memory=True)
def get_bucket_iam_policy(project_id: str, bucket: str) -> BucketIAMPolicy:
    """Fetch the IAM policy of a GCS bucket.

    Args:
      project_id: project used to obtain the storage v1 API client; also
        forwarded to iam.fetch_iam_policy.
      bucket: name of the bucket whose policy is requested.

    Returns:
      Whatever iam.fetch_iam_policy produces for the BucketIAMPolicy class.
    """
    storage = apis.get_api('storage', 'v1', project_id)
    return iam.fetch_iam_policy(
        storage.buckets().getIamPolicy(bucket=bucket),
        BucketIAMPolicy,
        project_id,
        bucket,
    )
@caching.cached_api_call(in_memory=True)
def
get_bucket(context: gcpdiag.models.Context, bucket: str) -> Bucket:
@caching.cached_api_call(in_memory=True)
def get_bucket(context: models.Context, bucket: str) -> Bucket:
    """Fetch a single GCS bucket by name.

    Args:
      context: execution context; only its project_id is read here, to
        obtain the storage v1 API client.
      bucket: name of the bucket to fetch.

    Returns:
      A Bucket wrapping the API response. Its project_id is None (see the
      comment before the return statement).

    Raises:
      utils.GcpApiError: if the API call fails with an HttpError.
    """
    gcs_api = apis.get_api('storage', 'v1', context.project_id)
    logging.info('fetching GCS bucket %s', bucket)
    query = gcs_api.buckets().get(bucket=bucket)
    try:
        response = query.execute(num_retries=config.API_RETRIES)
    except googleapiclient.errors.HttpError as err:
        # The error is surfaced through the raised exception; nothing is
        # written to stdout from this library function.
        raise utils.GcpApiError(err) from err
    # Resource data only provides us project number.
    # We don't know project id at this point.
    return Bucket(project_id=None, resource_data=response)
@caching.cached_api_call(in_memory=True)
def
get_buckets(context: gcpdiag.models.Context) -> Mapping[str, Bucket]:
@caching.cached_api_call(in_memory=True)
def get_buckets(context: models.Context) -> Mapping[str, Bucket]:
    """List the GCS buckets of the context's project, keyed by bucket name.

    Args:
      context: execution context; supplies the project id and the
        name/label filter (match_project_resource).

    Returns:
      Mapping from bucket name to Bucket for every listed bucket accepted by
      the context filter. Empty when the storage API is not enabled or the
      project has no buckets.

    Raises:
      utils.GcpApiError: if an API call fails with an HttpError.
      RuntimeError: if a listed bucket lacks its 'id' or 'name' entry.
    """
    buckets: Dict[str, Bucket] = {}
    if not apis.is_enabled(context.project_id, 'storage'):
        return buckets
    gcs_api = apis.get_api('storage', 'v1', context.project_id)
    logging.info('fetching list of GCS buckets in project %s',
                 context.project_id)
    query = gcs_api.buckets().list(project=context.project_id)
    try:
        # Results are paginated: keep requesting pages until list_next
        # reports that there is no further page (returns None).
        while query is not None:
            resp = query.execute(num_retries=config.API_RETRIES)
            for b in resp.get('items', []):
                # verify that we have some minimal data that we expect
                if 'id' not in b or 'name' not in b:
                    raise RuntimeError('missing data in bucket response')
                # Does not support matching for location for buckets
                # names are globally unique and should suffice
                if not context.match_project_resource(
                        resource=b['name'],
                        labels=b.get('labels', {}),
                ):
                    continue

                buckets[b['name']] = Bucket(project_id=context.project_id,
                                            resource_data=b)
            query = gcs_api.buckets().list_next(previous_request=query,
                                                previous_response=resp)
    except googleapiclient.errors.HttpError as err:
        raise utils.GcpApiError(err) from err
    return buckets