Merge pull request EleutherAI#550 from Mistobaan/Mistobaan/issue549
Update GitHub framework to run on GPUs
StellaAthena committed Mar 18, 2022
2 parents f05a95e + 166f72d commit 0bf472e
Showing 16 changed files with 166 additions and 69 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/pull_request.yml
@@ -11,3 +11,12 @@ jobs:
with:
python-version: 3.8
- uses: pre-commit/[email protected]

run-tests:
runs-on: self-hosted
steps:
- uses: actions/checkout@v2
- name: prepare data
run: python prepare_data.py
- name: Run Tests
run: pytest --forked tests
2 changes: 1 addition & 1 deletion megatron/checkpointing.py
@@ -37,7 +37,7 @@

def check_checkpoint_args(neox_args, checkpoint_args):
"""Ensure fixed arguments for a model are the same for the input
arguments and the one retrieved frm checkpoint."""
arguments and the one retrieved from checkpoint."""

assert isinstance(checkpoint_args, dict), "args stored in checkpoint is a dict"
for checkpoint_arg_name, checkpoint_arg_value in checkpoint_args.items():
21 changes: 17 additions & 4 deletions megatron/logging.py
@@ -218,17 +218,30 @@ def add_to_logging(name):
)

# (optional) Log grad/param norms to wandb / tb every step
if neox_args.log_grad_pct_zeros or neox_args.log_grad_norm or neox_args.log_param_norm:
if (
neox_args.log_grad_pct_zeros
or neox_args.log_grad_norm
or neox_args.log_param_norm
):
if neox_args.log_grad_pct_zeros or neox_args.log_grad_norm:
model.store_gradients = True # start storing gradients

for i, (name, param) in enumerate(model.module.named_parameters()):
if neox_args.log_grad_pct_zeros:
if hasattr(model, 'stored_gradients') and model.stored_gradients is not None:
if (
hasattr(model, "stored_gradients")
and model.stored_gradients is not None
):
grad = model.stored_gradients[i]
if grad is not None:
tb_wandb_log(f'pct_grad_zeros/{name}', (grad == 0).float().mean().item()*100, iteration,
use_wandb=neox_args.use_wandb, tensorboard_writer=neox_args.tensorboard_writer, all_ranks=True)
tb_wandb_log(
f"pct_grad_zeros/{name}",
(grad == 0).float().mean().item() * 100,
iteration,
use_wandb=neox_args.use_wandb,
tensorboard_writer=neox_args.tensorboard_writer,
all_ranks=True,
)
if neox_args.log_grad_norm:
if (
hasattr(model, "stored_gradients")
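As a quick aside on the `pct_grad_zeros/*` metric whose logging call is reformatted above: `(grad == 0).float().mean().item() * 100` is simply the percentage of exactly-zero entries in a gradient tensor. A toy check (not from the repository):

```python
import torch

# Two of the four gradient entries are exactly zero -> 50.0 percent.
grad = torch.tensor([0.0, 0.5, 0.0, 2.0])
pct_zeros = (grad == 0).float().mean().item() * 100
print(pct_zeros)  # 50.0
```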
2 changes: 1 addition & 1 deletion megatron/training.py
@@ -706,7 +706,7 @@ def evaluate(

if neox_args.char_level_ppl:
# calculate character level perplexity, if specified
# if neox_args.char_level_perplexity:
# if neox_args.char_level_ppl:
# unwrap the data_iterator
tokens_per_char = data_iterator.tokens_per_char()
print_rank_0(f"Counting chars took {data_iterator.total_time} seconds")
4 changes: 3 additions & 1 deletion megatron/utils.py
@@ -147,7 +147,9 @@ def init_wandb(neox_args):
return

if not neox_args.wandb_init_all_ranks:
use_wandb = is_local_main() and (get_wandb_api_key(neox_args=neox_args) is not None)
use_wandb = is_local_main() and (
get_wandb_api_key(neox_args=neox_args) is not None
)
neox_args.update_value("use_wandb", use_wandb)
if neox_args.use_wandb:
group_name = neox_args.wandb_group
3 changes: 3 additions & 0 deletions requirements/requirements-dev.txt
@@ -1,4 +1,7 @@
autopep8==1.5.6
pre-commit~=2.17.0
pytest==6.2.3
pytest-cov==2.11.1
pytest-forked==1.3.0
pytest-xdist
transformers~=4.16.2
2 changes: 1 addition & 1 deletion requirements/requirements.txt
@@ -10,5 +10,5 @@ regex
sentencepiece
six
tokenizers==0.10.2
transformers==4.5.0
transformers~=4.16.0
wandb==0.10.28
13 changes: 13 additions & 0 deletions tests/Readme.md → tests/README.md
@@ -6,6 +6,11 @@ Tests use pytest with the coverage and forked plugins. Install with:
pip install -r requirements/requirements-dev.txt
```

Download the required test data
```bash
python prepare_data.py
```

# Run

Tests can be run using pytest.
@@ -36,3 +41,11 @@ If an HTML coverage report has been created, a simple HTTP server can be run to serve it:
```bash
python -m http.server --directory htmlcov 8000
```
## Tips and Tricks
If you see this kind of error:
```
RuntimeError: Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the 'spawn' start method
```
It means that some `torch.cuda` function was called before the test created its worker processes.
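A minimal sketch of the usual fix, assuming only that `torch` is installed (this snippet is an illustration, not code from the repository): keep every CUDA call inside the worker processes, so the parent pytest process never initializes CUDA before forking. This is presumably also why `tests/common.py` counts GPUs by shelling out to `nvidia-smi` instead of calling `torch.cuda.device_count()`.

```python
# Hypothetical example: the parent process never touches CUDA, so the forked
# children can initialize it themselves without the re-initialization error.
import torch
from torch.multiprocessing import Process


def _worker(local_rank):
    # First CUDA call happens here, inside the child process.
    if torch.cuda.is_available():
        torch.cuda.set_device(local_rank)


if __name__ == "__main__":
    procs = [Process(target=_worker, args=(rank,)) for rank in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```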
73 changes: 71 additions & 2 deletions tests/common.py
@@ -10,6 +10,7 @@
import torch
import torch.distributed as dist
from torch.multiprocessing import Process
import multiprocessing as mp
from yaml import load

try:
@@ -27,6 +28,58 @@
DEEPSPEED_UNIT_WORKER_TIMEOUT = 120


def get_xdist_worker_id():
xdist_worker = os.environ.get("PYTEST_XDIST_WORKER", None)
if xdist_worker is not None:
xdist_worker_id = xdist_worker.replace("gw", "")
return int(xdist_worker_id)
return None


def get_master_port():
master_port = os.environ.get("DS_TEST_PORT", "29503")
xdist_worker_id = get_xdist_worker_id()
if xdist_worker_id is not None:
master_port = str(int(master_port) + xdist_worker_id)
return master_port


_num_gpus = None


def count_gpus():
global _num_gpus
if _num_gpus is None:
import subprocess

nvidia_smi = subprocess.check_output(["nvidia-smi", "--list-gpus"])
_num_gpus = len(nvidia_smi.decode("utf-8").strip().split("\n"))
return _num_gpus


def set_cuda_visibile():
cuda_visible = os.environ.get("CUDA_VISIBLE_DEVICES", None)
xdist_worker_id = get_xdist_worker_id()
if xdist_worker_id is None:
xdist_worker_id = 0
if cuda_visible is None:
# CUDA_VISIBLE_DEVICES is not set, discover it from nvidia-smi instead
import subprocess

nvidia_smi = subprocess.check_output(["nvidia-smi", "--list-gpus"])
num_gpus = len(nvidia_smi.decode("utf-8").strip().split("\n"))
cuda_visible = ",".join(map(str, range(num_gpus)))

# rotate list based on xdist worker id, example below
# wid=0 -> ['0', '1', '2', '3']
# wid=1 -> ['1', '2', '3', '0']
# wid=2 -> ['2', '3', '0', '1']
# wid=3 -> ['3', '0', '1', '2']
dev_id_list = cuda_visible.split(",")
dev_id_list = dev_id_list[xdist_worker_id:] + dev_id_list[:xdist_worker_id]
os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(dev_id_list)


def get_root_directory():
return Path(__file__).parents[1]

@@ -83,19 +136,27 @@ def dist_wrap(run_func):
def dist_init(local_rank, num_procs, *func_args, **func_kwargs):
"""Initialize torch.distributed and execute the user function."""
os.environ["MASTER_ADDR"] = "127.0.0.1"
os.environ["MASTER_PORT"] = "29503"
os.environ["MASTER_PORT"] = get_master_port()
os.environ["LOCAL_RANK"] = str(local_rank)
# NOTE: unit tests don't support multi-node so local_rank == global rank
os.environ["RANK"] = str(local_rank)
os.environ["WORLD_SIZE"] = str(num_procs)

# turn off NCCL logging if set
os.environ.pop("NCCL_DEBUG", None)

deepspeed.init_distributed(dist_backend=backend)

if torch.cuda.is_available():
torch.cuda.set_device(local_rank)

run_func(*func_args, **func_kwargs)

# make sure all ranks finish at the same time
torch.distributed.barrier()
# tear down after test completes
torch.distributed.destroy_process_group()

def dist_launcher(num_procs, *func_args, **func_kwargs):
"""Launch processes and gracefully handle failures."""

@@ -141,7 +202,15 @@ def dist_launcher(num_procs, *func_args, **func_kwargs):
def run_func_decorator(*func_args, **func_kwargs):
"""Entry point for @distributed_test()."""

gpus = count_gpus()

if isinstance(world_size, int):
if gpus < world_size:
pytest.mark.skip(
reason=f"at least {world_size} GPUs are required to run this test"
)
return

dist_launcher(world_size, *func_args, **func_kwargs)
elif isinstance(world_size, list):
for procs in world_size:
@@ -170,7 +239,7 @@ def model_setup(yaml_list=None, param_dict=None, clear_data=True):
clear_test_dirs()

overwrite_values = {
"user_script": str(get_root_directory() / "pretrain_gpt2.py"),
"user_script": str(get_root_directory() / "train.py"),
"save": TEST_CHECKPOINT_DIR,
"load": TEST_CHECKPOINT_DIR,
"log_dir": TEST_LOG_DIR,
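For context on the helpers added above: each pytest-xdist worker gets its own rendezvous port and its own GPU ordering, so tests launched in parallel do not collide. The standalone sketch below mirrors that scheme under stated assumptions (a 4-GPU machine and the `PYTEST_XDIST_WORKER` variable that pytest-xdist sets); it is an illustration, not the repository's code.

```python
# Illustrative sketch of the per-worker port / GPU-rotation scheme.
import os


def worker_id() -> int:
    # pytest-xdist names its workers "gw0", "gw1", ...
    return int(os.environ.get("PYTEST_XDIST_WORKER", "gw0").replace("gw", ""))


def master_port(base: int = 29503) -> str:
    # Offset the base port so concurrent workers never collide.
    return str(base + worker_id())


def rotated_devices(num_gpus: int = 4) -> str:
    # Worker 1 on a 4-GPU box sees "1,2,3,0", so workers start on different GPUs.
    devices = [str(i) for i in range(num_gpus)]
    wid = worker_id() % num_gpus
    return ",".join(devices[wid:] + devices[:wid])


if __name__ == "__main__":
    os.environ.setdefault("PYTEST_XDIST_WORKER", "gw1")  # simulate worker gw1
    print(master_port())      # -> 29504
    print(rotated_devices())  # -> 1,2,3,0
```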
50 changes: 20 additions & 30 deletions tests/model/test_fused_kernels.py
@@ -1,18 +1,18 @@
import os

if __name__ == "__main__":
import sys

sys.path.append(os.path.abspath(""))

import math

import torch
from torch.nn import LayerNorm

from megatron.model.fused_softmax import FusedScaleMaskSoftmax, SoftmaxFusionTypes
from transformers import BertTokenizer
from transformers.models.bert.modeling_bert import BertModel

from transformers import BertTokenizer, GPT2Tokenizer
from transformers.models.bert.modeling_bert import BertModel
from transformers.models.gpt2.modeling_gpt2 import GPT2Model
import transformers

from megatron.model.gpt2_model import gpt2_attention_mask_func as attention_mask_func
transformers.logging.set_verbosity(
transformers.logging.FATAL,
)


def test_load_fused_kernels():
@@ -28,6 +28,11 @@ def test_load_fused_kernels():


def test_fused_softmax():
from megatron.model.fused_softmax import FusedScaleMaskSoftmax, SoftmaxFusionTypes
from megatron.model.gpt2_model import (
gpt2_attention_mask_func as attention_mask_func,
)

bert = BertModel.from_pretrained("bert-base-cased").cuda().half()
tokenizer = BertTokenizer.from_pretrained("bert-base-cased")
test_text = (
@@ -124,6 +129,11 @@ def test_fused_softmax():


def test_fused_upper_triangle_mask_softmax():
from megatron.model.gpt2_model import (
gpt2_attention_mask_func as attention_mask_func,
)
from megatron.model.fused_softmax import FusedScaleMaskSoftmax, SoftmaxFusionTypes

gpt = GPT2Model.from_pretrained("gpt2").cuda().half()
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
test_text = (
@@ -219,23 +229,3 @@ def test_fused_upper_triangle_mask_softmax():
f"\n > fused_values={fused_softmax_output[-1][-1][-1][:5].tolist()}, "
f"\n > torch_values={torch_softmax_output[-1][-1][-1][:5].tolist()}"
)


if __name__ == "__main__":
try:
from transformers import BertTokenizer, GPT2Tokenizer
from transformers.models.bert.modeling_bert import BertModel
from transformers.models.gpt2.modeling_gpt2 import GPT2Model
import transformers

transformers.logging.set_verbosity(
transformers.logging.FATAL,
)

except:
print("\n[Fail] Please install `transformers` package to test fused kernels\n")
exit(-1)

test_load_fused_kernels()
test_fused_softmax()
test_fused_upper_triangle_mask_softmax()
16 changes: 8 additions & 8 deletions tests/model/test_model_checkpoint.py
@@ -5,11 +5,8 @@
to run in order to perform follow up tests. Joining in one test reduces runtime at the expense of decreased transparency of test results in case of failures.
"""
import os

if __name__ == "__main__":
import sys

sys.path.append(os.path.abspath(""))
import shutil
import torch

import pytest
from tests.common import (
@@ -54,8 +51,14 @@
)


@pytest.mark.skip
@pytest.mark.parametrize("param_dict", parameters, ids=names)
def test_train(param_dict):
import tempfile

d = tempfile.mkdtemp()
param_dict["save"] = d

@distributed_test(world_size=2)
def wrapper():
run_checkpoint_test(param_dict=param_dict)
@@ -111,9 +114,6 @@ def run_checkpoint_test(yaml_list=None, param_dict=None):
params_equal = (p1 == p2).all().item()
assert params_equal, "run_checkpoint_test() params equal: " + str(n1)

if torch.distributed.get_world_size() == 1 or torch.distributed.get_rank() == 0:
clear_test_dirs()


if __name__ == "__main__":
params = list(
14 changes: 3 additions & 11 deletions tests/model/test_model_generation.py
@@ -7,15 +7,8 @@


import os

if __name__ == "__main__":
import sys

sys.path.append(os.path.abspath(""))

import pytest
from tests.common import distributed_test, model_setup, parametrize, dict_repr
import torch
from tests.common import distributed_test, model_setup, parametrize

PARAMS_TO_TEST = {
"pipe_parallel_size,model_parallel_size,world_size": [
@@ -57,6 +50,7 @@
)


@pytest.mark.skip
@pytest.mark.parametrize("param_dict", parameters, ids=names)
def test_train(param_dict):
@distributed_test(world_size=param_dict.pop("world_size", 2))
@@ -82,9 +76,7 @@ def run_generate_test(param_dict, prompt):

param_dict.update(fixed_params)
# TODO: we don't need to reinstantiate the model every time if we're only changing sampling settings - should be a workaround for this
model, _, _, args_loaded = model_setup(
None, param_dict, clear_data=True, inference=True
)
model, _, _, args_loaded = model_setup(None, param_dict, clear_data=True)
model.eval()

prompts = [prompt for _ in range(args_loaded.num_samples)]