gcpdiag.queries.dataproc

Queries related to Dataproc.
class Cluster(gcpdiag.models.Resource):
 25class Cluster(models.Resource):
 26  """Represents Dataproc Cluster"""
 27
 28  name: str
 29  _resource_data: Mapping
 30
 31  def __init__(self, name: str, project_id: str, resource_data: Mapping):
 32    super().__init__(project_id)
 33    self.name = name
 34    self._resource_data = resource_data
 35
 36  def is_running(self) -> bool:
 37    return self.status == 'RUNNING'
 38
 39  def get_software_property(self, property_name) -> str:
 40    return self._resource_data['config']['softwareConfig']['properties'].get(
 41        property_name)
 42
 43  def is_stackdriver_logging_enabled(self) -> bool:
 44    # Unless overridden during create, properties with default values are not returned,
 45    # therefore get_software_property should only return when its false
 46    return (not self.get_software_property(
 47        'dataproc:dataproc.logging.stackdriver.enable') == 'false')
 48
 49  def is_stackdriver_monitoring_enabled(self) -> bool:
 50    return (self.get_software_property(
 51        'dataproc:dataproc.monitoring.stackdriver.enable') == 'true')
 52
 53  @property
 54  def region(self) -> str:
 55    """biggest regions have a trailing '-d' at most in its zoneUri
 56
 57    https://www.googleapis.com/compute/v1/projects/dataproc1/zones/us-central1-d
 58    """
 59    return self._resource_data['config']['gceClusterConfig']['zoneUri'].split(
 60        '/')[-1][0:-2]
 61
 62  @property
 63  def zone(self) -> Optional[str]:
 64    zone = (self._resource_data.get('config', {}).get('gceClusterConfig',
 65                                                      {}).get('zoneUri'))
 66    if zone:
 67      m = re.search(r'/zones/([^/]+)$', zone)
 68      if m:
 69        return m.group(1)
 70    raise RuntimeError(f"can't determine zone for cluster {self.name}")
 71
 72  @property
 73  def full_path(self) -> str:
 74    return (
 75        f'projects/{self.project_id}/regions/{self.region}/clusters/{self.name}'
 76    )
 77
 78  @property
 79  def short_path(self) -> str:
 80    return f'{self.project_id}/{self.region}/{self.name}'
 81
 82  @property
 83  def status(self) -> str:
 84    return self._resource_data['status']['state']
 85
 86  def __str__(self) -> str:
 87    return self.short_path
 88
 89  @property
 90  def image_version(self):
 91    return self._resource_data['config']['softwareConfig']['imageVersion']
 92
 93  @property
 94  def vm_service_account_email(self):
 95    sa = self._resource_data['config']['gceClusterConfig'].get('serviceAccount')
 96    if sa is None:
 97      sa = crm.get_project(self.project_id).default_compute_service_account
 98    return sa
 99
100  @property
101  def is_gce_cluster(self) -> bool:
102    return bool(self._resource_data.get('config', {}).get('gceClusterConfig'))
103
104  @property
105  def gce_network_uri(self) -> Optional[str]:
106    """Get network uri from cluster network or subnetwork"""
107    if not self.is_gce_cluster:
108      raise RuntimeError(
109          'Can not return network URI for a Dataproc on GKE cluster')
110    network_uri = (self._resource_data.get('config',
111                                           {}).get('gceClusterConfig',
112                                                   {}).get('networkUri'))
113    if not network_uri:
114      subnetwork_uri = (self._resource_data.get('config', {}).get(
115          'gceClusterConfig', {}).get('subnetworkUri'))
116      network_uri = network.get_subnetwork_from_url(subnetwork_uri).network
117    return network_uri
118
119  @property
120  def is_single_node_cluster(self) -> bool:
121    workers = (self._resource_data.get('config',
122                                       {}).get('workerConfig',
123                                               {}).get('numInstances', 0))
124    return workers == 0
125
126  @property
127  def is_ha_cluster(self) -> bool:
128    masters = (self._resource_data.get('config',
129                                       {}).get('masterConfig',
130                                               {}).get('numInstances', 1))
131    return masters != 1

Represents a Dataproc cluster.

Cluster(name: str, project_id: str, resource_data: Mapping)
31  def __init__(self, name: str, project_id: str, resource_data: Mapping):
32    super().__init__(project_id)
33    self.name = name
34    self._resource_data = resource_data
name: str
def is_running(self) -> bool:
36  def is_running(self) -> bool:
37    return self.status == 'RUNNING'
def get_software_property(self, property_name) -> str:
39  def get_software_property(self, property_name) -> str:
40    return self._resource_data['config']['softwareConfig']['properties'].get(
41        property_name)
def is_stackdriver_logging_enabled(self) -> bool:
43  def is_stackdriver_logging_enabled(self) -> bool:
44    # Unless overridden during create, properties with default values are not returned,
45    # therefore get_software_property should only return when its false
46    return (not self.get_software_property(
47        'dataproc:dataproc.logging.stackdriver.enable') == 'false')
def is_stackdriver_monitoring_enabled(self) -> bool:
49  def is_stackdriver_monitoring_enabled(self) -> bool:
50    return (self.get_software_property(
51        'dataproc:dataproc.monitoring.stackdriver.enable') == 'true')
region: str
53  @property
54  def region(self) -> str:
55    """biggest regions have a trailing '-d' at most in its zoneUri
56
57    https://www.googleapis.com/compute/v1/projects/dataproc1/zones/us-central1-d
58    """
59    return self._resource_data['config']['gceClusterConfig']['zoneUri'].split(
60        '/')[-1][0:-2]

Region of the cluster, derived from its zoneUri by dropping the trailing zone suffix (the last two characters, e.g. '-d'), as in:

https://www.googleapis.com/compute/v1/projects/dataproc1/zones/us-central1-d

zone: Optional[str]
62  @property
63  def zone(self) -> Optional[str]:
64    zone = (self._resource_data.get('config', {}).get('gceClusterConfig',
65                                                      {}).get('zoneUri'))
66    if zone:
67      m = re.search(r'/zones/([^/]+)$', zone)
68      if m:
69        return m.group(1)
70    raise RuntimeError(f"can't determine zone for cluster {self.name}")
full_path: str
72  @property
73  def full_path(self) -> str:
74    return (
75        f'projects/{self.project_id}/regions/{self.region}/clusters/{self.name}'
76    )

Returns the full path of this resource.

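Example: 'projects/gcpdiag-gke-1-9b90/zones/europe-west4-a/clusters/gke1'. Note that for a Dataproc cluster the path is regional rather than zonal: 'projects/{project_id}/regions/{region}/clusters/{name}'.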

short_path: str
78  @property
79  def short_path(self) -> str:
80    return f'{self.project_id}/{self.region}/{self.name}'

Returns the short name for this resource.

Note that it isn't clear from this name what kind of resource it is.

Example: 'gke1'. For a Dataproc cluster the short path is '{project_id}/{region}/{name}'.

status: str
82  @property
83  def status(self) -> str:
84    return self._resource_data['status']['state']
image_version
89  @property
90  def image_version(self):
91    return self._resource_data['config']['softwareConfig']['imageVersion']
vm_service_account_email
93  @property
94  def vm_service_account_email(self):
95    sa = self._resource_data['config']['gceClusterConfig'].get('serviceAccount')
96    if sa is None:
97      sa = crm.get_project(self.project_id).default_compute_service_account
98    return sa
is_gce_cluster: bool
100  @property
101  def is_gce_cluster(self) -> bool:
102    return bool(self._resource_data.get('config', {}).get('gceClusterConfig'))
gce_network_uri: Optional[str]
104  @property
105  def gce_network_uri(self) -> Optional[str]:
106    """Get network uri from cluster network or subnetwork"""
107    if not self.is_gce_cluster:
108      raise RuntimeError(
109          'Can not return network URI for a Dataproc on GKE cluster')
110    network_uri = (self._resource_data.get('config',
111                                           {}).get('gceClusterConfig',
112                                                   {}).get('networkUri'))
113    if not network_uri:
114      subnetwork_uri = (self._resource_data.get('config', {}).get(
115          'gceClusterConfig', {}).get('subnetworkUri'))
116      network_uri = network.get_subnetwork_from_url(subnetwork_uri).network
117    return network_uri

Get the network URI from the cluster's network, or from its subnetwork if no network is set. Raises RuntimeError for a Dataproc on GKE cluster.

is_single_node_cluster: bool
119  @property
120  def is_single_node_cluster(self) -> bool:
121    workers = (self._resource_data.get('config',
122                                       {}).get('workerConfig',
123                                               {}).get('numInstances', 0))
124    return workers == 0
is_ha_cluster: bool
126  @property
127  def is_ha_cluster(self) -> bool:
128    masters = (self._resource_data.get('config',
129                                       {}).get('masterConfig',
130                                               {}).get('numInstances', 1))
131    return masters != 1
Inherited Members
gcpdiag.models.Resource
project_id
class Region:
134class Region:
135  """Represents Dataproc region"""
136
137  project_id: str
138  region: str
139
140  def __init__(self, project_id: str, region: str):
141    self.project_id = project_id
142    self.region = region
143
144  def get_clusters(self, context: models.Context) -> Iterable[Cluster]:
145    clusters = []
146    for cluster in self.query_api():
147      if not context.match_project_resource(resource=cluster.get('clusterName'),
148                                            labels=cluster.get('labels', {})):
149        continue
150      c = Cluster(
151          name=cluster['clusterName'],
152          project_id=self.project_id,
153          resource_data=cluster,
154      )
155      clusters.append(c)
156    return clusters
157
158  def query_api(self) -> Iterable[dict]:
159    api = apis.get_api('dataproc', 'v1', self.project_id)
160    query = (api.projects().regions().clusters().list(projectId=self.project_id,
161                                                      region=self.region))
162    # be careful not to retr too many times because querying all regions
163    # sometimes causes requests to fail permanently
164    resp = query.execute(num_retries=1)
165    return resp.get('clusters', [])

Represents a Dataproc region.

Region(project_id: str, region: str)
140  def __init__(self, project_id: str, region: str):
141    self.project_id = project_id
142    self.region = region
project_id: str
region: str
def get_clusters( self, context: gcpdiag.models.Context) -> Iterable[Cluster]:
144  def get_clusters(self, context: models.Context) -> Iterable[Cluster]:
145    clusters = []
146    for cluster in self.query_api():
147      if not context.match_project_resource(resource=cluster.get('clusterName'),
148                                            labels=cluster.get('labels', {})):
149        continue
150      c = Cluster(
151          name=cluster['clusterName'],
152          project_id=self.project_id,
153          resource_data=cluster,
154      )
155      clusters.append(c)
156    return clusters
def query_api(self) -> Iterable[dict]:
158  def query_api(self) -> Iterable[dict]:
159    api = apis.get_api('dataproc', 'v1', self.project_id)
160    query = (api.projects().regions().clusters().list(projectId=self.project_id,
161                                                      region=self.region))
162    # be careful not to retr too many times because querying all regions
163    # sometimes causes requests to fail permanently
164    resp = query.execute(num_retries=1)
165    return resp.get('clusters', [])
class Dataproc:
168class Dataproc:
169  """Represents Dataproc product"""
170
171  project_id: str
172
173  def __init__(self, project_id: str):
174    self.project_id = project_id
175
176  def get_regions(self) -> Iterable[Region]:
177    return [
178        Region(self.project_id, r.name)
179        for r in gce.get_all_regions(self.project_id)
180    ]
181
182  def is_api_enabled(self) -> bool:
183    return apis.is_enabled(self.project_id, 'dataproc')

Represents the Dataproc product.

Dataproc(project_id: str)
173  def __init__(self, project_id: str):
174    self.project_id = project_id
project_id: str
def get_regions(self) -> Iterable[Region]:
176  def get_regions(self) -> Iterable[Region]:
177    return [
178        Region(self.project_id, r.name)
179        for r in gce.get_all_regions(self.project_id)
180    ]
def is_api_enabled(self) -> bool:
182  def is_api_enabled(self) -> bool:
183    return apis.is_enabled(self.project_id, 'dataproc')
@caching.cached_api_call
def get_clusters( context: gcpdiag.models.Context) -> Iterable[Cluster]:
186@caching.cached_api_call
187def get_clusters(context: models.Context) -> Iterable[Cluster]:
188  r: List[Cluster] = []
189  dataproc = Dataproc(context.project_id)
190  if not dataproc.is_api_enabled():
191    return r
192  executor = get_executor()
193  for clusters in executor.map(lambda r: r.get_clusters(context),
194                               dataproc.get_regions()):
195    r += clusters
196  return r