Skip to content

Commit

Permalink
Fixed several bugs with overrunner and changed behavior on preemption…
Browse files Browse the repository at this point in the history
…: The system will now wait until the previous node is deleted before trying to create a new one
  • Loading branch information
ConnorJL committed May 30, 2019
1 parent 6159697 commit f205736
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 45 deletions.
30 changes: 16 additions & 14 deletions experimental/overrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,25 @@

project = ''
location = 'us-central1-f'
max_attempts = 10
logging.basicConfig(filename='logs/overrunner.log', level=logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())

logger = logging.getLogger()
logger.setLevel(logging.INFO)
fh = logging.FileHandler('logs/overrunner.log')
fh.setLevel(logging.INFO)
sh = logging.StreamHandler()
sh.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s:%(name)s:%(levelname)s:%(message)s') # Add timestamp to file logging
fh.setFormatter(formatter)
logger.addHandler(fh)
logger.addHandler(sh)

backup_path = "gs://connors-models/backups"

if not os.path.exists("logs"):
os.mkdir("logs")

if not os.path.exists("logs/state.json"):
runners = [TPUSurvival(project=project, location=location, id=i+1, params=ex) for i, ex in enumerate(experiments)]
runners = [TPUSurvival(project=project, location=location, id=i, params=ex) for i, ex in enumerate(experiments)]
else:
with open("logs/state.json", "r") as f:
runners = [TPUSurvival(d=state) for state in json.load(f)]
Expand All @@ -44,7 +53,7 @@ def save(runners):
continue

ts.update_state()
logging.info("{} - State {}".format(ts.prefix, ts.state))
logging.info("{} - TPU State: {} - Process Running: {}".format(ts.prefix, ts.state, ts.current_process is not None))

if not ts.created:
logging.info("{} - Creating TPU".format(ts.prefix))
Expand Down Expand Up @@ -74,25 +83,18 @@ def save(runners):
ts.delete()
ts.kill_current_task()

# get ready for the next attempt
ts.increment_index()
if ts.current_index >= max_attempts:
logging.error("{} hit max attempts!".format(ts.prefix))
runners.remove(ts)
continue

if ts.running_time > 60*60*24: # Make a hard checkpoint save every day
logging.info("Backing up {}".format(ts.prefix))
subprocess.call(["gsutil", "cp", "-r", ts.params["model_dir"],
os.path.join(backup_path, ts.params["model_dir"].split("/")[-1] + "-" + str(ts.current_save))])
ts.current_save += 1

if time.time() - running > 60*10: # Save state every hour
if time.time() - running > 60*60: # Save state every hour
logging.info("Saving state")
save(runners)
running = time.time()

time.sleep(10)
time.sleep(30)

logging.info("All runners done.")

Expand Down
4 changes: 2 additions & 2 deletions experimental/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ export MODEL=$2
export SCRIPT_NAME=main.py

python3 $SCRIPT_NAME \
--tpu=$TPU_NAME
--model=$MODEL
--tpu $TPU_NAME \
--model $MODEL
48 changes: 19 additions & 29 deletions experimental/tpu_survival.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def __init__(self, project=None, location=None, id=None, params=None, d=None):
self.location = location
self.prefix = params["name"]
self.id = id
self.params = params["model_params"]
self.params = params
self.running_time = 0.
self.current_save = 0
self.done = False
Expand All @@ -41,25 +41,22 @@ def __init__(self, project=None, location=None, id=None, params=None, d=None):
self.current_save = d["current_save"]
self.done = d["done"]



# current running job
self.current_index = 0
self.current_process = None
self.state = None
self.created = False
self.task_running = False


def tpu_name(self, index=None):
def tpu_name(self):
"""Format tpu_name to be used in creation and deletion calls."""
index = index or self.current_index
return '{}-{}'.format(self.prefix, index)
return '{}'.format(self.prefix)

def tpu_cidr_block(self, index=None):
def tpu_cidr_block(self):
"""Format CIDR block to be used in creation calls."""
index = index or self.current_index
# Limited to 10 retries and a max of 24 experiments due to IPs
cidr = '10.0.{}.0/29'.format((self.id * 10) + index)
cidr = '10.0.{}.0/29'.format(self.id)
return cidr

def update_state(self):
Expand All @@ -75,16 +72,15 @@ def update_state(self):

# The node that is running the current task.
if tpu_name == self.tpu_name():
logging.info('{} - TPU health/state: {}: {}/{}'.format(self.prefix,
tpu_name, health, state))
# logging.info('{} - TPU health/state: {}: {}/{}'.format(self.prefix, tpu_name, health, state))
self.state = state

if self.state is None:
self.created = False

def kill_current_task(self):
"""Kill the current running task."""
logging.info('{} - killing current process: {}'.format(self.prefix, self.current_index))
logging.info('{} - killing current process: {}'.format(self.prefix, self.current_process.pid))

# The subprocess runs a shell command, which in turn calls python.
# This kills the whole process group with the shell command as the
Expand All @@ -94,11 +90,6 @@ def kill_current_task(self):
self.task_running = False
self.running_time += time.time() - self.started_time

def increment_index(self):
self.current_index += 1

logging.info('{} - current_index incremented to: {}'.format(self.prefix, self.current_index))

# run_task should be called at the beginning and
# then only after the call to kill current_process
def run_task(self):
Expand All @@ -111,7 +102,7 @@ def run_task(self):
with open(self.prefix + ".json", "w") as f:
json.dump(self.params["model_params"], f)

cmd = RUN_TASK_COMMAND.format(tpu_name=tpu_name, model=self.prefix)
cmd = RUN_TASK_COMMAND.format(tpu_name=tpu_name, model=self.prefix + ".json")
command = shlex.split(cmd)

# use popen so we can kill it when needed
Expand All @@ -122,11 +113,10 @@ def run_task(self):

self.current_process = p

def delete(self, index=None):
"""Delete the TPU node of the given index.
If the index is not provided the current node is deleted.
def delete(self):
"""Delete the TPU node.
"""
tpu_name = self.tpu_name(index)
tpu_name = self.tpu_name()

logging.info('{} - deleting: {}'.format(self.prefix, tpu_name))

Expand All @@ -136,9 +126,8 @@ def delete(self, index=None):

return p

def create(self, index=None):
"""Create a TPU node of the given index.
If the index is not provided self.current_index is used.
def create(self):
"""Create a TPU node.
"""
tpu_name = self.tpu_name()
tpu_cidr_block = self.tpu_cidr_block()
Expand Down Expand Up @@ -178,7 +167,8 @@ def list_tpus(project, location):
Returns
A Python dictionary with keys 'nodes' and 'nextPageToken'.
"""
service = discovery.build('tpu', 'v1', credentials=credentials)
logging.getLogger("googleapiclient.discovery").setLevel(logging.WARNING) # Silence URL spam
service = discovery.build('tpu', 'v1', credentials=credentials, cache_discovery=False)

parent = 'projects/{}/locations/{}'.format(project, location)

Expand All @@ -203,7 +193,7 @@ def create_tpu(project, location, tpu_name, accelerator_type='v2-8',
Returns
A TPU node creation operation object.
"""
service = discovery.build('tpu', 'v1', credentials=credentials)
service = discovery.build('tpu', 'v1', credentials=credentials, cache_discovery=False)

parent = 'projects/{}/locations/{}'.format(project, location)

Expand Down Expand Up @@ -233,7 +223,7 @@ def get_tpu(project, location, tpu_name):
Returns
A TPU node object.
"""
service = discovery.build('tpu', 'v1', credentials=credentials)
service = discovery.build('tpu', 'v1', credentials=credentials, cache_discovery=False)

name = 'projects/{}/locations/{}/nodes/{}'.format(
project, location, tpu_name)
Expand All @@ -252,7 +242,7 @@ def delete_tpu(project, location, tpu_name):
Returns
A TPU node deletion operation object.
"""
service = discovery.build('tpu', 'v1', credentials=credentials)
service = discovery.build('tpu', 'v1', credentials=credentials, cache_discovery=False)

name = 'projects/{}/locations/{}/nodes/{}'.format(
project, location, tpu_name)
Expand Down

0 comments on commit f205736

Please sign in to comment.