gcpdiag.queries.dataproc
Queries related to Dataproc.
class
Cluster(gcpdiag.models.Resource):
class Cluster(models.Resource):
  """Represents a Dataproc cluster.

  Wraps the raw cluster resource dictionary and exposes commonly needed
  fields as methods and properties.
  """

  name: str
  _resource_data: Mapping

  def __init__(self, name: str, project_id: str, resource_data: Mapping):
    """Store the cluster name and its raw resource dictionary."""
    super().__init__(project_id)
    self.name = name
    self._resource_data = resource_data

  def is_running(self) -> bool:
    """Return True when the cluster state is 'RUNNING'."""
    return self.status == 'RUNNING'

  def get_software_property(self, property_name) -> Optional[str]:
    """Return the value of a software property, or None if it is not set.

    A missing `config`, `softwareConfig` or `properties` section is treated
    as "property not set" instead of raising KeyError, in line with the other
    accessors of this class that tolerate a missing `config`.
    """
    software_config = self._resource_data.get('config',
                                              {}).get('softwareConfig', {})
    return software_config.get('properties', {}).get(property_name)

  def is_stackdriver_logging_enabled(self) -> bool:
    """Return False only when logging was explicitly set to 'false'."""
    # Unless overridden during create, properties with default values are not
    # returned, so anything other than an explicit 'false' means enabled.
    return self.get_software_property(
        'dataproc:dataproc.logging.stackdriver.enable') != 'false'

  def is_stackdriver_monitoring_enabled(self) -> bool:
    """Return True only when monitoring was explicitly set to 'true'."""
    return self.get_software_property(
        'dataproc:dataproc.monitoring.stackdriver.enable') == 'true'

  @property
  def region(self) -> str:
    """Region of the cluster, derived from the zone named in its zoneUri.

    The last path component of zoneUri is the zone name, e.g. 'us-central1-d'
    in
    https://www.googleapis.com/compute/v1/projects/dataproc1/zones/us-central1-d
    and the region is that name without its final '-<suffix>' part.
    """
    zone_name = self._resource_data['config']['gceClusterConfig'][
        'zoneUri'].split('/')[-1]
    # Split on the last '-' rather than assuming that the zone suffix is
    # exactly one character long.
    return zone_name.rsplit('-', 1)[0]

  @property
  def zone(self) -> Optional[str]:
    """Zone name taken from the cluster's zoneUri.

    Raises:
      RuntimeError: if zoneUri is missing or does not end in '/zones/<name>'.
    """
    zone_uri = (self._resource_data.get('config',
                                        {}).get('gceClusterConfig',
                                                {}).get('zoneUri'))
    if zone_uri:
      m = re.search(r'/zones/([^/]+)$', zone_uri)
      if m:
        return m.group(1)
    raise RuntimeError(f"can't determine zone for cluster {self.name}")

  @property
  def full_path(self) -> str:
    """Path of the form projects/<project>/regions/<region>/clusters/<name>."""
    return (
        f'projects/{self.project_id}/regions/{self.region}/clusters/{self.name}'
    )

  @property
  def short_path(self) -> str:
    """Compact identifier of the form <project>/<region>/<name>."""
    return f'{self.project_id}/{self.region}/{self.name}'

  @property
  def status(self) -> str:
    """Cluster state string as stored under status.state."""
    return self._resource_data['status']['state']

  def __str__(self) -> str:
    return self.short_path

  @property
  def image_version(self):
    """Value of config.softwareConfig.imageVersion."""
    return self._resource_data['config']['softwareConfig']['imageVersion']

  @property
  def vm_service_account_email(self):
    """Service account configured for the cluster VMs.

    Falls back to the project's default compute service account when the
    cluster configuration does not name one.
    """
    sa = self._resource_data['config']['gceClusterConfig'].get('serviceAccount')
    if sa is None:
      sa = crm.get_project(self.project_id).default_compute_service_account
    return sa

  @property
  def is_gce_cluster(self) -> bool:
    """True when the cluster has a non-empty gceClusterConfig."""
    return bool(self._resource_data.get('config', {}).get('gceClusterConfig'))

  @property
  def gce_network_uri(self) -> Optional[str]:
    """Get network uri from cluster network or subnetwork.

    Raises:
      RuntimeError: if the cluster is not a GCE cluster.
    """
    if not self.is_gce_cluster:
      raise RuntimeError(
          'Can not return network URI for a Dataproc on GKE cluster')
    gce_config = self._resource_data.get('config',
                                         {}).get('gceClusterConfig', {})
    network_uri = gce_config.get('networkUri')
    if not network_uri:
      # No explicit network configured: resolve it through the subnetwork.
      subnetwork_uri = gce_config.get('subnetworkUri')
      network_uri = network.get_subnetwork_from_url(subnetwork_uri).network
    return network_uri

  @property
  def is_single_node_cluster(self) -> bool:
    """True when workerConfig.numInstances is absent or 0."""
    workers = (self._resource_data.get('config',
                                       {}).get('workerConfig',
                                               {}).get('numInstances', 0))
    return workers == 0

  @property
  def is_ha_cluster(self) -> bool:
    """True when masterConfig.numInstances is present and not 1."""
    masters = (self._resource_data.get('config',
                                       {}).get('masterConfig',
                                               {}).get('numInstances', 1))
    return masters != 1
Represents Dataproc Cluster
def
is_stackdriver_logging_enabled(self) -> bool:
43 def is_stackdriver_logging_enabled(self) -> bool: 44 # Unless overridden during create, properties with default values are not returned, 45 # therefore get_software_property should only return when its false 46 return (not self.get_software_property( 47 'dataproc:dataproc.logging.stackdriver.enable') == 'false')
region: str
53 @property 54 def region(self) -> str: 55 """biggest regions have a trailing '-d' at most in its zoneUri 56 57 https://www.googleapis.com/compute/v1/projects/dataproc1/zones/us-central1-d 58 """ 59 return self._resource_data['config']['gceClusterConfig']['zoneUri'].split( 60 '/')[-1][0:-2]
Zone names in zoneUri end in a short suffix ('-d' at most, in the biggest regions), which is removed to obtain the region.
https://www.googleapis.com/compute/v1/projects/dataproc1/zones/us-central1-d
zone: Optional[str]
62 @property 63 def zone(self) -> Optional[str]: 64 zone = (self._resource_data.get('config', {}).get('gceClusterConfig', 65 {}).get('zoneUri')) 66 if zone: 67 m = re.search(r'/zones/([^/]+)$', zone) 68 if m: 69 return m.group(1) 70 raise RuntimeError(f"can't determine zone for cluster {self.name}")
full_path: str
@property
def full_path(self) -> str:
  """Path of the form projects/<project>/regions/<region>/clusters/<name>."""
  path = (
      f'projects/{self.project_id}/regions/{self.region}/clusters/{self.name}')
  return path
Returns the full path of this resource.
Example: 'projects/my-project/regions/us-central1/clusters/my-cluster'
short_path: str
@property
def short_path(self) -> str:
  """Compact identifier of the form <project>/<region>/<name>."""
  compact = f'{self.project_id}/{self.region}/{self.name}'
  return compact
Returns the short name for this resource.
Note that it isn't clear from this name what kind of resource it is.
Example: 'my-project/us-central1/my-cluster'
gce_network_uri: Optional[str]
104 @property 105 def gce_network_uri(self) -> Optional[str]: 106 """Get network uri from cluster network or subnetwork""" 107 if not self.is_gce_cluster: 108 raise RuntimeError( 109 'Can not return network URI for a Dataproc on GKE cluster') 110 network_uri = (self._resource_data.get('config', 111 {}).get('gceClusterConfig', 112 {}).get('networkUri')) 113 if not network_uri: 114 subnetwork_uri = (self._resource_data.get('config', {}).get( 115 'gceClusterConfig', {}).get('subnetworkUri')) 116 network_uri = network.get_subnetwork_from_url(subnetwork_uri).network 117 return network_uri
Get network uri from cluster network or subnetwork
Inherited Members
- gcpdiag.models.Resource
- project_id
class
Region:
class Region:
  """Represents Dataproc region"""

  project_id: str
  region: str

  def __init__(self, project_id: str, region: str):
    """Remember which project and region this object queries."""
    self.project_id = project_id
    self.region = region

  def get_clusters(self, context: models.Context) -> Iterable[Cluster]:
    """Return the clusters of this region that match the given context.

    Clusters whose name and labels are rejected by
    `context.match_project_resource` are left out.
    """
    return [
        Cluster(
            name=cluster['clusterName'],
            project_id=self.project_id,
            resource_data=cluster,
        )
        for cluster in self.query_api()
        if context.match_project_resource(resource=cluster.get('clusterName'),
                                          labels=cluster.get('labels', {}))
    ]

  def query_api(self) -> Iterable[dict]:
    """Fetch the raw cluster dictionaries of this region from the API.

    Result pages are followed with `list_next`, so clusters beyond the first
    page of the list response are included as well.
    """
    api = apis.get_api('dataproc', 'v1', self.project_id)
    clusters_api = api.projects().regions().clusters()
    request = clusters_api.list(projectId=self.project_id, region=self.region)
    clusters: List[dict] = []
    while request is not None:
      # be careful not to retry too many times because querying all regions
      # sometimes causes requests to fail permanently
      response = request.execute(num_retries=1)
      clusters.extend(response.get('clusters', []))
      # list_next returns None when the response carries no next page token.
      request = clusters_api.list_next(previous_request=request,
                                       previous_response=response)
    return clusters
Represents Dataproc region
def get_clusters(self, context: models.Context) -> Iterable[Cluster]:
  """Return the clusters of this region that match the given context.

  Entries from `query_api` whose name and labels are rejected by
  `context.match_project_resource` are left out.
  """
  return [
      Cluster(
          name=entry['clusterName'],
          project_id=self.project_id,
          resource_data=entry,
      )
      for entry in self.query_api()
      if context.match_project_resource(resource=entry.get('clusterName'),
                                        labels=entry.get('labels', {}))
  ]
def
query_api(self) -> Iterable[dict]:
def query_api(self) -> Iterable[dict]:
  """Fetch the raw cluster dictionaries of this region from the API.

  Result pages are followed with `list_next`, so clusters beyond the first
  page of the list response are included as well.
  """
  api = apis.get_api('dataproc', 'v1', self.project_id)
  clusters_api = api.projects().regions().clusters()
  request = clusters_api.list(projectId=self.project_id, region=self.region)
  clusters = []
  while request is not None:
    # be careful not to retry too many times because querying all regions
    # sometimes causes requests to fail permanently
    response = request.execute(num_retries=1)
    clusters.extend(response.get('clusters', []))
    # list_next returns None when the response carries no next page token.
    request = clusters_api.list_next(previous_request=request,
                                     previous_response=response)
  return clusters
class
Dataproc:
class Dataproc:
  """Entry point for Dataproc queries within a single project."""

  project_id: str

  def __init__(self, project_id: str):
    """Remember the project that all queries are made against."""
    self.project_id = project_id

  def get_regions(self) -> Iterable[Region]:
    """Return a Region for every region reported by gce.get_all_regions."""
    gce_regions = gce.get_all_regions(self.project_id)
    return [Region(self.project_id, gce_region.name) for gce_region in gce_regions]

  def is_api_enabled(self) -> bool:
    """Return what apis.is_enabled reports for 'dataproc' in this project."""
    enabled = apis.is_enabled(self.project_id, 'dataproc')
    return enabled
Represents Dataproc product
@caching.cached_api_call
def get_clusters(context: models.Context) -> Iterable[Cluster]:
  """Collect the Dataproc clusters of the context's project in all regions.

  Returns an empty list when the Dataproc API is not enabled. Otherwise each
  region's `get_clusters` is run through the executor and the per-region
  results are concatenated in the order the executor yields them.
  """
  all_clusters: List[Cluster] = []
  dataproc = Dataproc(context.project_id)
  if not dataproc.is_api_enabled():
    return all_clusters
  executor = get_executor()
  per_region = executor.map(lambda region: region.get_clusters(context),
                            dataproc.get_regions())
  for region_clusters in per_region:
    all_clusters.extend(region_clusters)
  return all_clusters