From 757320b154767bd038de2d97658950034344081d Mon Sep 17 00:00:00 2001 From: haileyschoelkopf Date: Wed, 9 Aug 2023 23:36:58 +0000 Subject: [PATCH 1/8] add s3 checkpoint syncing --- megatron/checkpointing.py | 122 ++++++++++++++++++++++++++- megatron/neox_arguments/neox_args.py | 10 +++ megatron/training.py | 1 + requirements/requirements.txt | 2 + 4 files changed, 133 insertions(+), 2 deletions(-) diff --git a/megatron/checkpointing.py b/megatron/checkpointing.py index 8ad2a88c7..2c08f46c2 100644 --- a/megatron/checkpointing.py +++ b/megatron/checkpointing.py @@ -18,13 +18,17 @@ """Input/output checkpointing.""" import json +import math import os import re import shutil +import time import random import sys import numpy as np +import boto3 +import hf_transfer import torch from glob import glob @@ -137,6 +141,10 @@ def get_checkpoint_name(checkpoints_path, iteration, release=False, mp_rank=None ) +def get_checkpoint_tag(iteration: int) -> str: + return f"global_step{iteration}" + + def delete_old_checkpoints(save_dir, n_to_keep): if torch.distributed.get_rank() == 0: ckpt_dir_regex = r"global_step[\d]*" @@ -188,7 +196,7 @@ def save_ds_checkpoint(iteration, model, neox_args): sd["checkpoint_validation_logits"] = logits # checkpoint folder name - tag = f"global_step{iteration}" + tag = get_checkpoint_tag(iteration) # save checkpoint model.save_checkpoint(neox_args.save, tag=tag, client_state=sd) @@ -203,6 +211,111 @@ def save_ds_checkpoint(iteration, model, neox_args): f.write(config_data) else: json.dump(config_data, f) +def multiprocessing_starmap(func, args, num_processes=None): + """Wrapper to allow for re-usable multiprocessing pools with `spawn` context handling + Args: + func (Callable): Function to call + args (Iterable): Iterable of arguments to pass to `func` + num_processes (int, optional): Number of processes to spawn. Defaults to `multiprocessing.cpu_count() - 1` + """ + import multiprocessing + num_processes = num_processes or (multiprocessing.cpu_count() - 1) + with multiprocessing.get_context("spawn").Pool(processes=num_processes) as process_pool: + process_pool.starmap(func, args) + process_pool.terminate() + process_pool.join() + del process_pool + + +def _upload( + file_path: str, + s3_key: str, + chunk_size: int = 104_857_600, + max_files: int = 64, + parallel_failures: int = 63, + max_retries: int = 5, +): + """Upload local file to S3 using `hf_transfer` library + Args: + file_path (str): Local filename to upload + s3_key (str): S3 key to upload to. E.g. `s3://bucket-name/path/to/file` + chunk_size (int, optional): Chunk size to use for multipart upload. + Defaults to 100MiB = 104_857_600 + max_files (int, optional): Number of open file handles, which determines + the maximum number of parallel downloads. Defaults to 64 + parallel_failures (int, optional): Number of maximum failures of different + chunks in parallel (cannot exceed max_files). Defaults to 63 + max_retries (int, optional): Number of retries for each chunk. Defaults to 5 + """ + s3 = boto3.client('s3') + bucket = s3_key.split("s3://")[1].split("/")[0] + key = s3_key.split(bucket)[1].lstrip("/") + + # 1. Init multipart upload and obtain unique upload identifier + upload = s3.create_multipart_upload( + ACL="bucket-owner-full-control", + Bucket=bucket, + Key=key, + ) + upload_id = upload["UploadId"] + + # 2. 
Generate presigned URLs for each part + file_size = os.stat(file_path).st_size + urls = [] + nb_parts = math.ceil(file_size / chunk_size) + for part_number in range(1, nb_parts + 1): + params = { + "Bucket": bucket, + "Key": key, + "PartNumber": part_number, + "UploadId": upload_id, + } + urls.append( + s3.generate_presigned_url( + ClientMethod="upload_part", Params=params, ExpiresIn=86400 + ) + ) + + # 3. Upload parts in parallel + responses = hf_transfer.multipart_upload( + file_path=file_path, + parts_urls=urls, + chunk_size=chunk_size, + max_files=max_files, + parallel_failures=parallel_failures, + max_retries=max_retries, + ) + + # 4. Complete multipart upload request with ETag values + etag_with_parts = [] + for part_number, header in enumerate(responses): + etag = header.get("etag") + etag_with_parts.append({"ETag": etag, "PartNumber": part_number + 1}) + parts = {"Parts": etag_with_parts} + s3.complete_multipart_upload( + Bucket=bucket, Key=key, MultipartUpload=parts, UploadId=upload_id + ) + + +def upload_checkpoint(iteration, neox_args): + local_checkpoint_path = os.path.join(os.path.abspath(neox_args.save), get_checkpoint_tag(iteration)) + local_checkpoint_list = sorted(filter( + lambda x: os.path.isfile(x), + [str(p) for p in Path(local_checkpoint_path).rglob("*")], + )) + remote_checkpoint_path = os.path.join( + neox_args.s3_path, os.path.basename(neox_args.save), get_checkpoint_tag(iteration)) + remote_checkpoint_list = [ + os.path.join(remote_checkpoint_path, os.path.relpath(local_checkpoint, local_checkpoint_path)) + for local_checkpoint in local_checkpoint_list + ] + inputs = zip(local_checkpoint_list, remote_checkpoint_list, [neox_args.s3_chunk_size] * len(local_checkpoint_list)) + + print_rank_0(f"[RANK {torch.distributed.get_rank()}] Uploading checkpoint `{local_checkpoint_path}` to `{remote_checkpoint_path}`...") + start = time.time() + multiprocessing_starmap(_upload, inputs) + total_time = time.time() - start + print_rank_0(f"[RANK {torch.distributed.get_rank()}] Uploaded checkpoint `{local_checkpoint_path}` to `{remote_checkpoint_path}` in {total_time:.2f}s") def save_checkpoint(neox_args, iteration, model, optimizer, lr_scheduler): @@ -213,6 +326,11 @@ def save_checkpoint(neox_args, iteration, model, optimizer, lr_scheduler): else: raise ValueError("Must be using deepspeed to use neox") + torch.distributed.barrier() + upload_to_s3 = torch.distributed.get_rank() == 0 and neox_args.s3_path is not None + if upload_to_s3: + upload_checkpoint(iteration, neox_args) + # Wait so everyone is done (necessary) torch.distributed.barrier() if neox_args.keep_last_n_checkpoints is not None: @@ -233,7 +351,7 @@ def load_checkpoint( if neox_args.finetune: load_optim_and_scheduler = False if iteration is not None: - tag = f"global_step{iteration}" + tag = get_checkpoint_tag(iteration) else: tag = None checkpoint_name, state_dict = model.load_checkpoint( diff --git a/megatron/neox_arguments/neox_args.py b/megatron/neox_arguments/neox_args.py index e1ea16a16..a3173246e 100644 --- a/megatron/neox_arguments/neox_args.py +++ b/megatron/neox_arguments/neox_args.py @@ -793,6 +793,16 @@ class NeoXArgsTraining(NeoXArgsTemplate): Output directory to save checkpoints to. """ + s3_path: str = None + """ + Path to s3 bucket for saving checkpoints. + """ + + s3_chunk_size: int = 104_857_600 + """ + The number of bytes in each file chunk when uploading to s3. Defaults to 100MiB. 
+ """ + config_files: dict = None """ Store of original config files mapping config filename to file contents diff --git a/megatron/training.py b/megatron/training.py index 96a94a1d0..9e03262f3 100644 --- a/megatron/training.py +++ b/megatron/training.py @@ -30,6 +30,7 @@ from deepspeed.runtime.data_pipeline.curriculum_scheduler import CurriculumScheduler import numpy as np +from CPCargo import Heartbeat from megatron.utils import ( Timers, init_wandb, diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 3f3a70882..443f162e6 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -3,6 +3,7 @@ git+https://github.com/EleutherAI/DeeperSpeed.git#egg=deepspeed ftfy>=6.0.1 git+https://github.com/EleutherAI/lm_dataformat.git@4eec05349977071bf67fc072290b95e31c8dd836 huggingface_hub>=0.11.0 +hf-transfer>=0.1.3 lm_eval>=0.3.0 mpi4py>=3.0.3 numpy>=1.22.0 @@ -13,3 +14,4 @@ six tiktoken>=0.1.2 tokenizers>=0.12.1 transformers>=4.24.0 +git+https://github.com/samikama/CPCargo@main From fa74642d8977e9bc38ff4c0962b79cc14645f194 Mon Sep 17 00:00:00 2001 From: github-actions Date: Wed, 9 Aug 2023 23:37:49 +0000 Subject: [PATCH 2/8] Update NeoXArgs docs automatically --- configs/neox_arguments.md | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/configs/neox_arguments.md b/configs/neox_arguments.md index c50e7ff01..e0cfe7f62 100644 --- a/configs/neox_arguments.md +++ b/configs/neox_arguments.md @@ -111,7 +111,7 @@ Logging Arguments - **git_hash**: str - Default = d3e481c + Default = 757320b current git hash of repository @@ -1169,6 +1169,22 @@ Training Arguments +- **s3_path**: str + + Default = None + + Path to s3 bucket for saving checkpoints. + + + +- **s3_chunk_size**: int + + Default = 104857600 + + The number of bytes in each file chunk when uploading to s3. Defaults to 100MiB. 
+ + + - **config_files**: dict Default = None From f6092439b1fdf819e3929461c348cc3d3762ffc9 Mon Sep 17 00:00:00 2001 From: haileyschoelkopf Date: Wed, 20 Sep 2023 15:01:31 +0000 Subject: [PATCH 3/8] remove CPCargo requirement --- megatron/training.py | 1 - requirements/requirements.txt | 1 - 2 files changed, 2 deletions(-) diff --git a/megatron/training.py b/megatron/training.py index 9e03262f3..96a94a1d0 100644 --- a/megatron/training.py +++ b/megatron/training.py @@ -30,7 +30,6 @@ from deepspeed.runtime.data_pipeline.curriculum_scheduler import CurriculumScheduler import numpy as np -from CPCargo import Heartbeat from megatron.utils import ( Timers, init_wandb, diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 443f162e6..9da852b31 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -14,4 +14,3 @@ six tiktoken>=0.1.2 tokenizers>=0.12.1 transformers>=4.24.0 -git+https://github.com/samikama/CPCargo@main From 4e548a47b0814a3d6759c34091323b20a147a8f4 Mon Sep 17 00:00:00 2001 From: github-actions Date: Wed, 20 Sep 2023 15:03:00 +0000 Subject: [PATCH 4/8] Update NeoXArgs docs automatically --- configs/neox_arguments.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/configs/neox_arguments.md b/configs/neox_arguments.md index b73c78e15..0d5aaaaaf 100644 --- a/configs/neox_arguments.md +++ b/configs/neox_arguments.md @@ -111,7 +111,7 @@ Logging Arguments - **git_hash**: str - Default = 757320b + Default = 534599a current git hash of repository From 3d76d4f311fe13fb9a49e3cb83ae7c1c49d904d7 Mon Sep 17 00:00:00 2001 From: Quentin Anthony Date: Sat, 23 Sep 2023 12:51:17 -0400 Subject: [PATCH 5/8] Make s3 imports try-except and separate requirements to s3 file --- megatron/checkpointing.py | 10 ++++++++-- requirements/requirements-s3.txt | 2 ++ requirements/requirements.txt | 1 - 3 files changed, 10 insertions(+), 3 deletions(-) create mode 100644 requirements/requirements-s3.txt diff --git a/megatron/checkpointing.py b/megatron/checkpointing.py index 2c08f46c2..8bcc01f3b 100644 --- a/megatron/checkpointing.py +++ b/megatron/checkpointing.py @@ -27,8 +27,14 @@ import sys import numpy as np -import boto3 -import hf_transfer +try: + import boto3 +except ModuleNotFoundError: + print("For s3 checkpointing, please install boto3 either using requirements/requirements-s3.txt or https://github.com/boto/boto3") +try: + import hf_transfer +except ModuleNotFoundError: + print("For s3 checkpointing, please install hf_transfer either using requirements/requirements-s3.txt or https://github.com/huggingface/hf_transfer") import torch from glob import glob diff --git a/requirements/requirements-s3.txt b/requirements/requirements-s3.txt new file mode 100644 index 000000000..7a2924ccd --- /dev/null +++ b/requirements/requirements-s3.txt @@ -0,0 +1,2 @@ +hf-transfer>=0.1.3 +boto3 \ No newline at end of file diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 03bf47a45..88e49f073 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -3,7 +3,6 @@ git+https://github.com/EleutherAI/DeeperSpeed.git#egg=deepspeed ftfy>=6.0.1 git+https://github.com/EleutherAI/lm_dataformat.git@4eec05349977071bf67fc072290b95e31c8dd836 huggingface_hub>=0.11.0 -hf-transfer>=0.1.3 lm_eval>=0.3.0 mpi4py>=3.0.3 numpy>=1.22.0 From b75b7b30d729050ee3c496b1c442b45450d9cc1c Mon Sep 17 00:00:00 2001 From: github-actions Date: Sat, 23 Sep 2023 16:53:26 +0000 Subject: [PATCH 6/8] Update NeoXArgs docs automatically --- 
configs/neox_arguments.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/configs/neox_arguments.md b/configs/neox_arguments.md index 7c36209c2..22745ca75 100644 --- a/configs/neox_arguments.md +++ b/configs/neox_arguments.md @@ -111,7 +111,7 @@ Logging Arguments - **git_hash**: str - Default = 1d20559 + Default = 3762440 current git hash of repository From aa6c176d80e7242451bb9b02d5cacb51e56a4bb1 Mon Sep 17 00:00:00 2001 From: Quentin Anthony Date: Sat, 23 Sep 2023 12:57:57 -0400 Subject: [PATCH 7/8] Announce feature --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index bd4af5c0b..c211f4fc3 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,8 @@ GPT-NeoX leverages many of the same features and technologies as the popular Meg * Easy connections with the open source ecosystem, including Hugging Face's [tokenizers](https://github.com/huggingface/tokenizers) and [transformers](https://github.com/huggingface/transformers/) libraries, logging via [WandB](https://wandb.ai/site), and evaluation via our [Language Model Evaluation Harness](https://github.com/EleutherAI/lm-evaluation-harness). ## News +**[8/10/2023]** We now support checkpointing with AWS S3! Activate with the `s3_path` config option (for more detail, see [the PR](https://github.com/EleutherAI/gpt-neox/pull/1010)) + **[9/20/2023]** As of https://github.com/EleutherAI/gpt-neox/pull/1035, we have deprecated Flash Attention 0.x and 1.x, and migrated support to Flash Attention 2.x. We don't believe this will cause problems, but if you have a specific use-case that requires old flash support using the latest GPT-NeoX, please raise an issue. **[8/10/2023]** We have experimental support for LLaMA 2 and Flash Attention v2 supported in our [math-lm](https://github.com/EleutherAI/math-lm) project that will be upstreamed later this month. From a66bd5c30edc692d925cc5ce4c35169e03c479ff Mon Sep 17 00:00:00 2001 From: github-actions Date: Sat, 23 Sep 2023 16:58:24 +0000 Subject: [PATCH 8/8] Update NeoXArgs docs automatically --- configs/neox_arguments.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/configs/neox_arguments.md b/configs/neox_arguments.md index 22745ca75..f0c146ca6 100644 --- a/configs/neox_arguments.md +++ b/configs/neox_arguments.md @@ -111,7 +111,7 @@ Logging Arguments - **git_hash**: str - Default = 3762440 + Default = aa6c176 current git hash of repository
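
---

For reference, enabling the feature added in this series only requires the two new config keys documented above (`s3_path` and `s3_chunk_size`). Below is a minimal sketch of a NeoX YAML config fragment; the bucket name, save directory, and the surrounding keys are made-up placeholders shown for context, and only the two `s3_*` keys come from this patch set.

```yaml
# Hypothetical config fragment (placeholder paths/bucket).
# Local checkpoints are written under `save`; after each save, rank 0 mirrors the
# new global_step<N> directory to `s3_path` (see upload_checkpoint in
# megatron/checkpointing.py).
"save": "checkpoints/my-run",                  # placeholder local checkpoint dir
"s3_path": "s3://my-bucket/neox-checkpoints",  # placeholder bucket/prefix; enables S3 syncing
"s3_chunk_size": 104857600,                    # optional; multipart chunk size in bytes (default 100 MiB)
"keep_last_n_checkpoints": 4,                  # unrelated existing option, shown for context
```

With these set, `save_checkpoint` barriers, then rank 0 calls `upload_checkpoint`, which mirrors every file under `<save>/global_step<N>/` to `<s3_path>/<basename(save)>/global_step<N>/` via presigned multipart uploads through `hf_transfer`. The extra dependencies live in `requirements/requirements-s3.txt` (boto3 and hf-transfer), and boto3 picks up AWS credentials from the usual environment/credential chain.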