[Train] Simplify llama 2 workspace template (#38444)
* Remove the need for the separate prepare_nodes step by downloading the model weights inside the training function (see the sketch below the changed-files summary)
* Added a script to create job submission YAMLs
* Simplified Ray dataset creation by reading the JSONL file directly into a Ray dataset with ray.data.read_json
Signed-off-by: Kourosh Hakhamaneshi <[email protected]>
kouroshHakha committed Aug 15, 2023
1 parent 0634a5c commit ab06452
Showing 10 changed files with 126 additions and 156 deletions.
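
Per the first commit-message bullet, the model download now happens inside the training function rather than in a separate prepare_nodes.py step. A minimal sketch of the pattern, using the download_model, get_mirror_link, and get_download_path helpers from the template's utils.py plus the filelock package, wrapped in a hypothetical ensure_weights_on_node function:

```
from pathlib import Path

from filelock import FileLock

from utils import download_model, get_download_path, get_mirror_link


def ensure_weights_on_node(model_id: str) -> str:
    """Fetch the model weights once per node; other workers wait on the lock."""
    bucket_uri = get_mirror_link(model_id)       # S3 mirror of the HF model repo
    download_path = get_download_path(model_id)  # local Hugging Face cache location
    base_path = Path(download_path).parent
    base_path.mkdir(parents=True, exist_ok=True)

    # Only one process per node performs the S3 sync; the rest block on the
    # lock and then reuse the already-populated cache.
    lock_file = str(base_path / f'{model_id.replace("/", "--")}.lock')
    with FileLock(lock_file):
        download_model(
            model_id=model_id,
            bucket_uri=bucket_uri,
            s3_sync_args=["--no-sign-request"],
        )
    return download_path
```

This is essentially the block added to training_function in the training-script diff below.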
11 changes: 11 additions & 0 deletions doc/source/templates/04_finetuning_llms_with_deepspeed/README.md
@@ -191,4 +191,15 @@ scaling_config=air.ScalingConfig(
```


### Submitting a production job
You can generate a production job configuration with the following command:

```
python create_job_yaml.py --size=7b --output-path=./job.yaml
```

This will create a job YAML file that you can use to submit a production job on the Anyscale platform.

```
anyscale job submit job.yaml
```
@@ -1,3 +1,5 @@
region: us-west1
allowed_azs: [any]
head_node_type:
name: head_node_type
instance_type: g5.48xlarge
@@ -1,3 +1,5 @@
region: us-west1
allowed_azs: [any]
head_node_type:
name: head_node_type
instance_type: g5.48xlarge
@@ -1,4 +1,6 @@
# 1 g5.16xlarge + 15 g5.4xlarge --> 16 GPUs, 256G RAM on trainer and 64G RAM on workers
region: us-west1
allowed_azs: [any]
head_node_type:
name: head_node
instance_type: g5.16xlarge
@@ -0,0 +1,79 @@
from argparse import ArgumentParser

import yaml
import os
import pathlib


def _parse_args():
    parser = ArgumentParser()
    parser.add_argument(
        "--size",
        type=str,
        default="7b",
        choices=["7b", "13b", "70b"],
        help="Size of the model to train",
    )
    parser.add_argument(
        "--as-test", action="store_true", help="Whether to run in test mode"
    )
    parser.add_argument(
        "--max-retries",
        type=int,
        default=0,
        help="Number of times to retry the job if it fails",
    )
    parser.add_argument(
        "--output-path",
        type=str,
        default="./job.yaml",
        help="The path where the job YAML should be stored.",
    )
    parser.add_argument("--compute-config", type=str, help="Path to the compute config")
    parser.add_argument(
        "--cluster-env-build-id",
        type=str,
        help="The build-id of the cluster env to use",
    )
    return parser.parse_args()


def main():
    pargs = _parse_args()

    # Resolve compute config
    compute_config_kwargs = {}
    if pargs.compute_config:
        with open(pargs.compute_config, "r") as f:
            compute_config = yaml.safe_load(f)
        compute_config.update(
            {
                "cloud_id": os.environ["ANYSCALE_CLOUD_ID"],
            }
        )
        compute_config_kwargs.update(compute_config=compute_config)

    # Resolve cluster env config
    cluster_env_config_kwargs = {}
    if pargs.cluster_env_build_id:
        cluster_env_config_kwargs.update(build_id=pargs.cluster_env_build_id)

    base_cmd = f"chmod +x ./run_llama_ft.sh && ./run_llama_ft.sh --size={pargs.size}"
    job_config = {
        "name": f"llama-2-{pargs.size}",
        "entrypoint": base_cmd + (" --as-test" if pargs.as_test else ""),
        "max_retries": pargs.max_retries,
        **compute_config_kwargs,
        **cluster_env_config_kwargs,
    }

    pathlib.Path(os.path.dirname(pargs.output_path)).mkdir(parents=True, exist_ok=True)
    with open(pargs.output_path, "w") as f:
        yaml.safe_dump(job_config, f)
    print("Job config written to ", pargs.output_path)
    print("To submit the job, run:")
    print(f"anyscale job submit {pargs.output_path}")


if __name__ == "__main__":
    main()
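
For reference, the job config the script writes is just the dictionary above serialized with yaml.safe_dump. A quick illustration of the output's shape (the values are placeholders, not taken from a real compute config or cluster env build):

```
import yaml

# Placeholder values: in the real script, compute_config and build_id are
# merged in from the --compute-config file and the --cluster-env-build-id flag.
job_config = {
    "name": "llama-2-7b",
    "entrypoint": "chmod +x ./run_llama_ft.sh && ./run_llama_ft.sh --size=7b",
    "max_retries": 0,
}
print(yaml.safe_dump(job_config))
```

The resulting YAML file is what anyscale job submit consumes.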
@@ -9,12 +9,12 @@
import functools
import time
import tree
import pandas as pd
from pathlib import Path
import torch.nn as nn
from ray import tune # noqa: F401
import tqdm
import tempfile
from filelock import FileLock

from accelerate import Accelerator, DeepSpeedPlugin
from accelerate.utils import DummyOptim, DummyScheduler, set_seed
@@ -30,7 +30,12 @@
from ray.train.torch import TorchTrainer
import ray.util.scheduling_strategies

from utils import get_checkpoint_and_refs_dir, get_mirror_link
from utils import (
    get_checkpoint_and_refs_dir,
    get_mirror_link,
    download_model,
    get_download_path,
)

OPTIM_BETAS = (0.9, 0.999)
OPTIM_EPS = 1e-8
@@ -76,20 +81,6 @@ def get_tokenizer(model_name, special_tokens):
    return tokenizer


def create_ray_dataset(path):
    # jsonl file
    with open(path, "r") as json_file:
        items = [json.loads(x) for x in json_file]

    dataset = {"input": []}
    for item in items:
        assert set(item.keys()) == {"input"}
        dataset["input"].append(item["input"])

    df = pd.DataFrame.from_dict(dataset)
    return ray.data.from_pandas(df)


def evaluate(
    *, model, eval_ds, accelerator, bsize, ds_kwargs, as_test: bool = False
) -> Tuple[float, float]:
@@ -173,6 +164,18 @@ def training_function(kwargs: dict):
    special_tokens = kwargs.get("special_tokens", [])
    model_id = config["model_name"]

    # We need to download the model weights on this machine if they don't exist.
    # We need to acquire a lock to ensure that only one process downloads the model.
    bucket_uri = get_mirror_link(model_id)
    download_path = get_download_path(model_id)
    base_path = Path(download_path).parent
    base_path.mkdir(parents=True, exist_ok=True)
    lock_file = str(base_path / f'{model_id.replace("/", "--")}.lock')
    with FileLock(lock_file):
        download_model(
            model_id=model_id, bucket_uri=bucket_uri, s3_sync_args=["--no-sign-request"]
        )

    # Sample hyper-parameters for learning rate, batch size, seed and a few other HPs
    lr = config["lr"]
    num_epochs = int(config["num_epochs"])
@@ -559,9 +562,10 @@ def main():
}
)

    train_ds = create_ray_dataset(args.train_path)
    # Read data
    train_ds = ray.data.read_json(args.train_path)
    if args.test_path is not None:
        valid_ds = create_ray_dataset(args.test_path)
        valid_ds = ray.data.read_json(args.test_path)
    else:
        valid_ds = None

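
The removed create_ray_dataset helper built a pandas DataFrame by hand; ray.data.read_json reads newline-delimited JSON directly into a Dataset with one row per line, which is why the helper could be dropped. A small self-contained sketch (the file contents are made up to match the template's {"input": ...} record format):

```
import json
import tempfile

import ray

# Write a tiny JSONL file in the shape the template expects:
# one {"input": ...} record per line.
with tempfile.NamedTemporaryFile("w", suffix=".jsonl", delete=False) as f:
    for text in ["example prompt 1", "example prompt 2"]:
        f.write(json.dumps({"input": text}) + "\n")
    path = f.name

ds = ray.data.read_json(path)  # one row per line, a single "input" column
print(ds.schema())
print(ds.take(2))
```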

This file was deleted.

@@ -1,16 +1,6 @@
#!/bin/bash


# Function to prepare nodes
prepare_nodes() {
    local model_id=$1
    echo "Preparing nodes..."
    if ! python prepare_nodes.py --hf-model-id "${model_id}"; then
        echo "Failed to prepare nodes. Exiting..."
        exit 1
    fi
}

# Function to check if data directory exists, if not, run create_dataset.py
check_and_create_dataset() {
    local data_dir=$1
@@ -96,7 +86,6 @@ esac
MODEL_ID="meta-llama/Llama-2-${SIZE}-hf"
CONFIG_DIR="./deepspeed_configs/zero_3_llama_2_${SIZE}.json"

prepare_nodes "${MODEL_ID}"
check_and_create_dataset "${DATA_DIR}"
fine_tune "$BS" "$ND" "$MODEL_ID" "$BASE_DIR" "$CONFIG_DIR" "$TRAIN_PATH" "$TEST_PATH" "$TOKEN_PATH" "${params[@]}"


This file was deleted.

13 changes: 8 additions & 5 deletions doc/source/templates/04_finetuning_llms_with_deepspeed/utils.py
@@ -46,6 +46,13 @@ def get_checkpoint_and_refs_dir(
    return checkpoint_dir, refs_dir


def get_download_path(model_id: str):
    from transformers.utils.hub import TRANSFORMERS_CACHE

    path = os.path.join(TRANSFORMERS_CACHE, f"models--{model_id.replace('/', '--')}")
    return path


def download_model(
    model_id: str,
    bucket_uri: str,
@@ -59,12 +66,8 @@ def download_model(
    The downloaded model may have a 'hash' file containing the commit hash corresponding
    to the commit on Hugging Face Hub.
    """

    from transformers.utils.hub import TRANSFORMERS_CACHE

    s3_sync_args = s3_sync_args or []

    path = os.path.join(TRANSFORMERS_CACHE, f"models--{model_id.replace('/', '--')}")
    path = get_download_path(model_id)

    cmd = (
        ["awsv2", "s3", "sync"]
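
The new get_download_path helper mirrors the Hugging Face cache layout, so the S3 sync in download_model lands exactly where transformers will later look for the weights. An illustrative check (the printed path depends on the TRANSFORMERS_CACHE / HF_HOME settings; ~/.cache/huggingface/hub is the usual default):

```
import os

from transformers.utils.hub import TRANSFORMERS_CACHE

model_id = "meta-llama/Llama-2-7b-hf"
print(os.path.join(TRANSFORMERS_CACHE, f"models--{model_id.replace('/', '--')}"))
# Typically something like ~/.cache/huggingface/hub/models--meta-llama--Llama-2-7b-hf
```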
