Skip to content

Commit 679c243

Browse files
authored
Merge pull request #114 from seung-lab/heartbeat
Shutdown workers if manager node died Extend memory monitor script to check manager node heartbeat regularly, if the check fails for half an hour, shutdown the google cloud workers.
2 parents 9f447e8 + 065d7b6 commit 679c243

9 files changed

Lines changed: 284 additions & 168 deletions

File tree

cloud/google/common.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ def GenerateAirflowVar(context, hostname_manager):
7979

8080
sqlalchemy_conn = f'''postgresql+psycopg2://{postgres_user}:{postgres_password}@{hostname_manager}/{postgres_db}'''
8181
airflow_variable = {
82-
'AIRFLOW__CORE__HOSTNAME_CALLABLE': 'google_metadata.gce_hostname',
82+
'AIRFLOW__CORE__HOSTNAME_CALLABLE': 'common.google_api.gce_hostname',
8383
'AIRFLOW__DATABASE__SQL_ALCHEMY_CONN': sqlalchemy_conn,
8484
'AIRFLOW__CORE__FERNET_KEY': context.properties['airflow'].get('fernetKey', fernet_key),
8585
'AIRFLOW__CELERY__BROKER_URL': f'amqp://{hostname_manager}',

common/google_api.py

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
from googleapiclient import discovery
2+
import requests
3+
import json
4+
5+
def get_project_data(key):
6+
apiurl = f"http://metadata/computeMetadata/v1/project/{key}"
7+
response = requests.get(apiurl, headers={"Metadata-Flavor": "Google"})
8+
response.raise_for_status()
9+
return response.text
10+
11+
def get_instance_data(key):
12+
apiurl = f"http://metadata/computeMetadata/v1/instance/{key}"
13+
response = requests.get(apiurl, headers={"Metadata-Flavor": "Google"})
14+
response.raise_for_status()
15+
return response.text
16+
17+
def get_instance_metadata(project, zone, instance):
18+
service = discovery.build('compute', 'v1')
19+
request = service.instances().get(project=project, zone=zone, instance=instance)
20+
info = request.execute()
21+
return info['metadata']
22+
23+
def set_instance_metadata(project, zone, instance, data):
24+
service = discovery.build('compute', 'v1')
25+
request = service.instances().setMetadata(body=data, project=project, zone=zone, instance=instance)
26+
return request.execute()
27+
28+
def gce_external_ip():
29+
return get_instance_data("network-interfaces/0/access-configs/0/external-ip")
30+
31+
def gce_internal_ip():
32+
return get_instance_data("network-interfaces/0/ip")
33+
34+
def gce_hostname():
35+
return get_instance_data("hostname")
36+
37+
def get_project_id():
38+
apiurl = "http://metadata.google.internal/computeMetadata/v1/project/project-id"
39+
response = requests.get(apiurl, headers={"Metadata-Flavor": "Google"})
40+
response.raise_for_status()
41+
return response.text
42+
43+
def get_zone():
44+
apiurl = "http://metadata.google.internal/computeMetadata/v1/instance/zone"
45+
response = requests.get(apiurl, headers={"Metadata-Flavor": "Google"})
46+
response.raise_for_status()
47+
return response.text.split('/')[-1]
48+
49+
def delete_instances(ig, instances):
50+
project_id = get_project_id()
51+
request_body = {
52+
"instances": instances,
53+
"skipInstancesOnValidationError": True,
54+
}
55+
service = discovery.build('compute', 'v1')
56+
request = service.instanceGroupManagers().deleteInstances(project=project_id, zone=ig["zone"], instanceGroupManager=ig["name"], body=request_body)
57+
return request.execute()
58+
59+
def get_created_by():
60+
apiurl = "http://metadata.google.internal/computeMetadata/v1/instance/attributes/created-by"
61+
response = requests.get(apiurl, headers={"Metadata-Flavor": "Google"})
62+
response.raise_for_status()
63+
return response.text
64+
65+
def instance_group_manager_info(project_id, instance_group):
66+
service = discovery.build('compute', 'v1')
67+
request = service.instanceGroupManagers().get(project=project_id, zone=instance_group['zone'], instanceGroupManager=instance_group['name'])
68+
return request.execute()
69+
70+
def instance_group_manager_error(project_id, instance_group):
71+
service = discovery.build('compute', 'v1')
72+
request = service.instanceGroupManagers().listErrors(project=project_id, zone=instance_group['zone'], instanceGroupManager=instance_group['name'], orderBy="creationTimestamp desc")
73+
return request.execute()
74+
75+
def instance_group_info(project_id, instance_group):
76+
service = discovery.build('compute', 'v1')
77+
request = service.instanceGroups().get(project=project_id, zone=instance_group['zone'], instanceGroup=instance_group['name'])
78+
return request.execute()
79+
80+
def list_managed_instances(instance_group):
81+
project_id = get_project_id()
82+
service = discovery.build("compute", "v1")
83+
page_token = None
84+
instances = []
85+
while True:
86+
request = service.instanceGroupManagers().listManagedInstances(project=project_id, zone=instance_group["zone"], instanceGroupManager=instance_group["name"], pageToken=page_token, maxResults=20)
87+
ret = request.execute()
88+
if not ret:
89+
return instances
90+
instances += [r["instance"] for r in ret['managedInstances']]
91+
page_token = ret.get("nextPageToken", None)
92+
if not page_token:
93+
break
94+
95+
return instances
96+
97+
def get_instance_property(instance_zone, instance, key):
98+
project_id = get_project_id()
99+
service = discovery.build("compute", "v1")
100+
request = service.instances().get(project=project_id, zone=instance_zone, instance=instance)
101+
ret = request.execute()
102+
return ret[key]
103+
104+
def resize_instance_group(ig, size):
105+
project_id = get_project_id()
106+
service = discovery.build('compute', 'v1')
107+
request = service.instanceGroupManagers().resize(project=project_id, zone=ig['zone'], instanceGroupManager=ig['name'], size=size)
108+
return request.execute()
109+
110+
def start_instance(instance_name, zone):
111+
service = discovery.build('compute', 'v1')
112+
request = service.instances().start(
113+
project=get_project_id(),
114+
zone=zone,
115+
instance=instance_name
116+
)
117+
response = request.execute()
118+
return response
119+
120+
121+
def stop_instance(instance_name, zone):
122+
service = discovery.build('compute', 'v1')
123+
request = service.instances().stop(
124+
discardLocalSsd=True,
125+
project=get_project_id(),
126+
zone=zone,
127+
instance=instance_name
128+
)
129+
response = request.execute()
130+
return response

common/google_metadata.py

Lines changed: 0 additions & 34 deletions
This file was deleted.

0 commit comments

Comments
 (0)