# Llama2 Pretrain/Finetune
This document presents examples of using ATorch to pretrain or finetune the HuggingFace Llama2 model, including [FSDP (ZeRO3)](#FSDP), 3D hybrid parallelism for [semi-automatic optimization](#DS-3D-Parallel), and [fully automatic optimization](#Automatic-Training-Optimization).
- Note:
- The Llama2 model and the wikitext dataset are used in these examples. If you have already downloaded the Llama2 model, set the environment variable `MODEL_NAME_OR_PATH=llama2_model_directory`. If wikitext is already on your system, set the environment variable `DATASET_PATH=wikitext_data_directory`.
- If the Llama2 model or the wikitext dataset is not present, the training script will download it automatically. Note that downloading may take quite some time.
## FSDP
Fully Sharded Data Parallel (FSDP) is PyTorch's implementation of ZeRO3. This example uses FSDP for distributed training and combines it with other training optimizations such as mixed precision (fp16/bf16/fp8) and gradient checkpointing. The example calls the auto_accelerate API with the load_strategy argument, which specifies the combination of training optimization methods, as sketched below.
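A minimal sketch of such a load_strategy call, assuming a HuggingFace Llama2 model. This is not the exact code in [fsdp_llama2.py](fsdp_llama2.py): the optimization-method names match those used elsewhere in this document, but the per-method config dicts and the `optim_func`/`loss_func` signature are assumptions that may differ from the installed ATorch version.
```python
import torch
import atorch
from atorch.auto import auto_accelerate
from transformers import LlamaConfig, LlamaForCausalLM
from transformers.models.llama.modeling_llama import LlamaDecoderLayer

atorch.init_distributed("nccl")
model = LlamaForCausalLM(LlamaConfig())  # or LlamaForCausalLM.from_pretrained(MODEL_NAME_OR_PATH)

def my_loss_func(_, outputs):
    # HF causal-LM outputs already carry the loss when labels are provided
    return outputs.loss

strategy = [
    ("parallel_mode", ([("data", torch.distributed.get_world_size())], None)),
    "module_replace",                                   # fused-kernel module replacement
    ("fsdp", {"atomic_units": {LlamaDecoderLayer}}),    # shard at decoder-layer granularity (config key is an assumption)
    ("amp_native", {"dtype": torch.bfloat16}),          # bf16 mixed precision (config key is an assumption)
    ("checkpoint", {"wrap_class": LlamaDecoderLayer}),  # gradient checkpointing (config key is an assumption)
]

status, result, best_strategy = auto_accelerate(
    model,
    optim_func=torch.optim.AdamW,
    loss_func=my_loss_func,
    load_strategy=strategy,
    ignore_dryrun_on_load_strategy=True,
)
model = result.model  # FSDP-wrapped model ready for the training loop
```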
### Scripts
- training file [fsdp_llama2.py](fsdp_llama2.py)
- launch script [fsdp_llama2_entry.sh](fsdp_llama2_entry.sh)
```bash
cd dlrover/atorch/examples/llama2
pip install -r requirements.txt
# Configurable environment variable: DATASET_PATH, MODEL_NAME_OR_PATH, PER_DEVICE_TRAIN_BATCH_SIZE, etc.
bash fsdp_llama2_entry.sh
# use fp8
USE_FP8=1 sh fsdp_llama2_entry.sh
# use lora
USE_LORA=1 bash fsdp_llama2_entry.sh
```
Note that transformer_engine is required for fp8. You can use the docker image registry.cn-hangzhou.aliyuncs.com/atorch/atorch:pt210_te, which has transformer_engine pre-installed.
## DS 3D Parallel
### Intro
- For large-scale model training (100B+ parameters), besides FSDP/ZeRO3, 3D parallelism is widely used in the deep learning community. 3D parallelism combines tensor parallelism, pipeline parallelism, and data parallelism. Megatron-LM and DeepSpeed provide excellent 3D parallelism implementations that are popular among users.
- Megatron-LM offers Column/Row parallel layers that users incorporate into the model definition to achieve tensor parallelism. DeepSpeed's pipeline parallelism requires converting the model into a sequential list of LayerSpec, which complicates usage, especially for non-sequential models.
Megatron-LM embeds ParallelLinear layers:
```python
class ParallelAttention(MegatronModule):
    def __init__(self, ...):
        ...
        self.query_key_value = mpu.ColumnParallelLinear(
            args.hidden_size,
            3 * projection_size,
            gather_output=False,
            init_method=init_method)
        ...
        self.dense = mpu.RowParallelLinear(
            projection_size,
            args.hidden_size,
            input_is_parallel=True,
            init_method=output_layer_init_method,
            skip_bias_add=True)
        ...
```
DeepSpeed pipeline retrofitting:
```python
def model_provider(pre_process=True, post_process=True):
    ...
    if args.deepspeed and not args.no_pipeline_parallel:
        model = GPTModelPipe(
            num_tokentypes=0,
            parallel_output=True
        )
    else:
        model = GPTModel(
            num_tokentypes=0,
            parallel_output=True,
            pre_process=pre_process,
            post_process=post_process
        )


class GPTModelPipe(PipelineModule, MegatronModule):
    def __init__(self, ...):
        ...
        # Embedding layer
        self.specs.append(TiedLayerSpec('embed',
                                        EmbeddingPipe,
                                        args.hidden_size,
                                        args.padded_vocab_size,
                                        args.max_position_embeddings,
                                        args.hidden_dropout,
                                        init_method=init_method,
                                        num_tokentypes=num_tokentypes,
                                        tied_weight_attr='word_embeddings_weight'))

        for layer_idx in range(args.num_layers):
            self.specs.append(
                LayerSpec(ParallelTransformerLayerPipe,
                          init_method=init_method,
                          output_layer_init_method=scaled_init_method_normal(args.init_method_std,
                                                                             args.num_layers),
                          layer_number=layer_idx,
                          self_attn_mask_type=AttnMaskType.causal))

        def _logits_helper(embedding, lm_output):
            """A wrapper to massage inputs/outputs from pipeline."""
            return parallel_lm_logits(
                lm_output,
                embedding.word_embeddings_weight,
                self.parallel_output)

        self.specs.append(
            TiedLayerSpec('embed',
                          EmbeddingPipe,
                          args.hidden_size,
                          args.padded_vocab_size,
                          args.max_position_embeddings,
                          args.hidden_dropout,
                          init_method=init_method,
                          num_tokentypes=num_tokentypes,
                          forward_fn=_logits_helper,
                          tied_weight_attr='word_embeddings_weight')
        )
```
- ATorch supports 3D parallel training based on DeepSpeed/Megatron, and makes it easy to use through auto_accelerate with the ds_3d_parallel optimization method.
### User Interface
- A simple example of using the interface for 3D parallelization of a Transformers model:
example
```python
import atorch
from atorch.auto import auto_accelerate
from atorch.auto.opt_lib.ds_3d_parallel_optimization import DeepSpeed3DParallelConfig
from atorch.utils.meta_model_utils import record_module_init
from transformers.xxx import XXXConfig, XXXModel

# init distributed environment and create 3d parallel groups
atorch.init_distributed("nccl")

# meta model for ds 3d parallel
with record_module_init():
    meta_model = XXXModel(XXXConfig(...))

# tensor parallel info and pipeline forward patcher
ds_3d_parallel_cfg = DeepSpeed3DParallelConfig(
    tpinfo=get_xxx_tpinfo(),
    custom_patcher=get_xxx_custom_patcher(),
)

strategy = [
    ("parallel_mode", ([("tensor", tensor_size), ("data", data_size), ("pipeline", pipeline_size)], None)),
    ("deepspeed_3d_parallel", ds_3d_parallel_cfg),
]

# auto_accelerate
status, result, best_strategy = auto_accelerate(
    meta_model,
    loss_func=my_loss_func,
    load_strategy=strategy,
    ignore_dryrun_on_load_strategy=True,
)

# DeepSpeed PipelineEngine model
model = result.model
```
- The example omits the optimizer and the dataset-related batch_fn.
- The user specifies the module name information for tensor parallel sharding, plus customized forward patchers for any pipeline modules that need them; the training loop that follows auto_accelerate is sketched below.
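After `auto_accelerate` returns, `result.model` is a DeepSpeed `PipelineEngine`, so a training step is driven with `train_batch` rather than a manual forward/backward. A minimal sketch (the dataloader and step count are placeholders):
```python
model = result.model                  # DeepSpeed PipelineEngine
data_iter = iter(train_dataloader)    # placeholder dataloader; batch_fn adapts its batches

for _ in range(num_train_steps):      # placeholder step count
    # train_batch pulls gradient_accumulation_steps micro-batches from data_iter,
    # runs forward/backward across pipeline stages, and steps the optimizer
    loss = model.train_batch(data_iter=data_iter)
```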
### Related API
- `record_module_init`: a contextmanager for meta initialization that records each module's init args/kwargs; its source is shown below, followed by a tiny usage illustration.
record_module_init
```python
# atorch/utils/meta_model_utils.py
@contextmanager
def record_module_init():
    """
    Record modules' init args and kwargs while meta-constructing the model. Since we don't
    save or offload the initial weights, we should call reset_parameters or (hf)_init_weights
    after building the real modules with the recorded args/kwargs.
    This contextmanager was originally designed for building a DeepSpeed PipelineModule from
    a native torch model implementation.
    """

    def init_record_helper(f):
        @functools.wraps(f)
        def wrapper(module: torch.nn.Module, *args, **kwargs):
            f(module, *args, **kwargs)
            # record args/kwargs after the original init, in case a parent class init
            # overrides them by mistake; args/kwargs must not be changed inside init
            module._init_args = args
            module._init_kwargs = kwargs
            # torch.device('meta') contextmanager may not handle nn.Parameter(...),
            # call .to('meta') manually to force everything onto meta
            module.to("meta")

        return wrapper

    def _enable_class(cls):
        cls._old_init = cls.__init__
        cls.__init__ = init_record_helper(cls.__init__)

    def _disable_class(cls):
        cls.__init__ = cls._old_init
        delattr(cls, "_old_init")

    def _init_subclass(cls, **kwargs):
        cls.__init__ = init_record_helper(cls.__init__)

    def substitute_init_recursively(cls, func, visited):
        for subcls in cls.__subclasses__():
            substitute_init_recursively(subcls, func, visited)
            if subcls not in visited:
                func(subcls)
                visited.add(subcls)

    try:
        substitute_init_recursively(torch.nn.modules.module.Module, _enable_class, set())
        torch.nn.modules.module.Module._old_init_subclass = torch.nn.modules.module.Module.__init_subclass__
        torch.nn.modules.module.Module.__init_subclass__ = classmethod(_init_subclass)
        # torch meta init
        torch.device("meta").__enter__()
        yield
    finally:
        substitute_init_recursively(torch.nn.modules.module.Module, _disable_class, set())
        torch.nn.modules.module.Module.__init_subclass__ = torch.nn.modules.module.Module._old_init_subclass
        delattr(torch.nn.modules.module.Module, "_old_init_subclass")
        torch.device("meta").__exit__()
```
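A tiny usage illustration (not ATorch code) of what the recorded args/kwargs enable: rebuilding a real module from its meta twin and re-initializing its parameters.
```python
import torch
from atorch.utils.meta_model_utils import record_module_init

with record_module_init():
    meta_linear = torch.nn.Linear(16, 32)  # constructed on the meta device, init args/kwargs recorded

# rebuild a real module from the recorded init args, then re-initialize its parameters
real_linear = type(meta_linear)(*meta_linear._init_args, **meta_linear._init_kwargs)
real_linear.reset_parameters()
```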
- `TPInfo`: the module name information for tensor parallel sharding.
TPInfo
```python
# atorch/utils/manual_tp_utils.py
class TPInfo:
    """
    Manual tensor parallel information class.

    Example:
        >>> gpt2_tpinfo = TPInfo()
        >>> gpt2_tpinfo.shard_col({"attn.c_attn": {"stride": 3}}, "mlp.c_fc")
        >>> gpt2_tpinfo.shard_row("attn.c_proj", "mlp.c_proj")
        >>> gpt2_tpinfo.shard_vocab("wte")
        >>> gpt2_tpinfo.replic_drop("resid_dropout", "mlp.dropout", "drop")
        >>> gpt2_tpinfo.parallel_drop("attn_dropout")
        >>> gpt2_tpinfo.shrink({".attn": {"embed_dim", "split_size", "num_heads"}})
        >>> tp_manual_shard_custom_fn(meta_gpt2, gpt2_tpinfo)
    """

    ...
```
- Note: `shard_col/row/vocab` accepts arguments of type `Union[Dict[str, Dict], str]`, where the `str` is the suffix that uniquely identifies a module, and the inner `Dict` holds the initialization arguments of the corresponding ATorchTPLayer module (RowParallelLinear/ColumnParallelLinear/VocabParallelEmbedding). A hypothetical Llama2 counterpart is sketched below.
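By analogy with the GPT2 example above, a hypothetical `TPInfo` for the HuggingFace Llama2 decoder might look as follows. The module-name suffixes come from `modeling_llama`, but the sharding and `shrink` choices are assumptions, not a shipped ATorch config.
```python
from atorch.utils.manual_tp_utils import TPInfo

llama2_tpinfo = TPInfo()
llama2_tpinfo.shard_col("self_attn.q_proj", "self_attn.k_proj", "self_attn.v_proj",
                        "mlp.gate_proj", "mlp.up_proj")
llama2_tpinfo.shard_row("self_attn.o_proj", "mlp.down_proj")
llama2_tpinfo.shard_vocab("embed_tokens")
# attributes that must be divided by the tensor-parallel degree (attribute names are assumptions)
llama2_tpinfo.shrink({"self_attn": {"num_heads", "num_key_value_heads", "hidden_size"}})
```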
- `DeepSpeed3DParallelConfig` configuration class
DeepSpeed3DParallelConfig
```python
# atorch/auto/opt_lib/ds_3d_parallel_optimization.py
class DeepSpeed3DParallelConfig:
    def __init__(
        self,
        base_seed=1234,
        tpinfo=None,
        custom_patcher=None,
        tie_first=True,
        logit_helper=None,
        ds_config=None,
        batch_fn=None,
    ):
        self.base_seed = base_seed
        # TPInfo
        self.tpinfo = tpinfo if tpinfo is not None else TPInfo()
        # PipeModuleFromRecordedMeta
        self.custom_patcher = custom_patcher if custom_patcher is not None else dict()
        self.tie_first = tie_first
        # logit helper
        if self.tpinfo.is_vocab_parallelled:
            if logit_helper is None:
                self.logit_helper = vocab_parallel_logit_helper
            else:
                logger.warning("Tensor parallel is using VocabParallelEmb, make sure lm_output copied to group")
        else:
            self.logit_helper = logit_helper
        # DeepSpeed config
        self.ds_config = ds_config  # dict() or path
        self.batch_fn = batch_fn if batch_fn is not None else lambda x: x
```
- Note:
- `base_seed`: for initializing `MultiDimParallelRandomizer`.
- `tpinfo`: an instance of `TPInfo`.
- `custom_patcher`: `Optional[Dict[str, patch_fn]]`. `str` is the path_name of the module to be patched, taken from the module_list that `PipeModuleFromRecordedMeta` traverses. `patch_fn` patches the forward function when building the corresponding LayerSpec. GPT2 example:
gpt2_custom_patcher
```python
def gpt2_custom_patcher(cfg):
    def wpe_patcher(fw, self):
        @functools.wraps(fw)
        def fw_wrapper(input):
            assert (
                isinstance(input, tuple) and len(input) == 3
            ), "input should be (hidden_states, position_ids, attention_mask)"
            hidden_states, position_ids, attention_mask = input
            position_embeddings = fw(position_ids)
            hidden_states = hidden_states + position_embeddings
            return hidden_states, attention_mask

        return fw_wrapper

    def h_patcher(fw, self):
        @functools.wraps(fw)
        def fw_wrapper(input):
            assert isinstance(input, tuple) and len(input) == 2, "input should be (hidden_states, attention_mask)"
            hidden_states, attention_mask = input
            ori_attn_mask = attention_mask
            attention_mask = attention_mask[:, None, None, :]
            attention_mask = attention_mask.to(hidden_states.dtype)  # fp16 compatibility
            attention_mask = (1.0 - attention_mask) * torch.finfo(hidden_states.dtype).min
            outputs = fw(hidden_states, attention_mask=attention_mask)
            hidden_states = outputs[0]
            return hidden_states, ori_attn_mask

        return fw_wrapper

    gpt2_custom_forward_patchers = {"wpe": wpe_patcher}
    gpt2_custom_forward_patchers.update({f"h.{i}": h_patcher for i in range(cfg.n_layer)})
    return gpt2_custom_forward_patchers


# Note: DeepSpeed requires float tensors passed between pipeline stages to have requires_grad,
# so the GPT2 h patcher converts the int mask into a float additive mask for attention and then
# returns the original int mask in its output.
```
- `tie_first`: whether to tie the first module in deepspeed pipeline (e.g. vocab embedding in transformer).
- `logit_helper`: takes effect when `tie_first` is True. The helper function that computes logits on the tied embedding; `_default_logit_helper` is used if None.
- `ds_config`: `Dict` for deepspeed config or `str` for json file path.
- `batch_fn`: input adaptation function that converts each batch into the `((inputs, ...), (labels, ...))` format expected by the DeepSpeed pipeline; see the sketch below.
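A minimal sketch of filling in `ds_config` and `batch_fn`. The DeepSpeed keys are standard DeepSpeed config options; the batch layout assumes a HuggingFace-style batch dict with `input_ids`/`attention_mask`/`labels`, and `llama2_tpinfo` refers to the hypothetical sketch above.
```python
ds_config = {
    "train_micro_batch_size_per_gpu": 8,
    "gradient_accumulation_steps": 64,
    "bf16": {"enabled": True},
}

def llama2_batch_fn(batch):
    # the DeepSpeed pipeline consumes ((inputs, ...), (labels, ...)) tuples
    return (batch["input_ids"], batch["attention_mask"]), (batch["labels"],)

ds_3d_parallel_cfg = DeepSpeed3DParallelConfig(
    tpinfo=llama2_tpinfo,
    ds_config=ds_config,
    batch_fn=llama2_batch_fn,
)
```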
### Scripts
- training file [ds_3d_llama2.py](ds_3d_llama2.py)
- launch script [ds_3d_llama2_entry.sh](ds_3d_llama2_entry.sh)
```bash
cd dlrover/atorch/examples/llama2
pip install -r requirements.txt
# Configurable environment variable:
# DATASET_PATH, MODEL_NAME_OR_PATH, PIPELINE_PARALLEL_SIZE, MODEL_PARALLEL_SIZE, etc.
# e.g. on an 8-GPU system, to run mp=2 dp=2 pp=2,
# use `PIPELINE_PARALLEL_SIZE=2 MODEL_PARALLEL_SIZE=2 bash ds_3d_llama2_entry.sh`
bash ds_3d_llama2_entry.sh
```
### Performance
- Model: Llama2-7b
- Cluster: 8 A100-80GB GPUs in each node, with NVLink and 800 Gb/s RDMA
| Method | Use LoRA | Num of GPUs | TP | PP | DP | Batch Per GPU | Grad Accum Steps | Global Batch Size | TFLOPs | HFU (%) | Max Mem Allocated (MB) |
|-------|----------|------------:|---:|---:|---:|--------------:|----------------:|------------------:|-------:|------|-----------------------|
| FSDP | Yes | 8 | / | / | 8 | 4 | 1 | 32 | 177.90 | 57.0 | 33354.1 |
| FSDP | No | 8 | / | / | 8 | 4 | 1 | 32 | 204.67 | 65.6 | 39596.3 |
| FSDP | Yes | 16 | / | / | 16 | 4 | 1 | 64 | 165.03 | 52.9 | 31735.5 |
| FSDP | No | 16 | / | / | 16 | 4 | 1 | 64 | 197.22 | 63.2 | 34776.7 |
| FSDP | Yes | 32 | / | / | 32 | 4 | 1 | 128 | 164.04 | 52.6 | 30926.2 |
| FSDP | No | 32 | / | / | 32 | 4 | 1 | 128 | 196.04 | 62.8 | 32366.8 |
| FSDP | Yes | 64 | / | / | 64 | 4 | 1 | 256 | 160.69 | 51.5 | 30521.6 |
| FSDP | No | 64 | / | / | 64 | 4 | 1 | 256 | 195.14 | 62.5 | 31161.9 |
| DS 3D | / | 8 | 1 | 2 | 4 | 8 | 64 | 2048 | 190.27 | 61.0 | 40006.9 |
| DS 3D | / | 8 | 2 | 1 | 4 | 8 | 64 | 2048 | 159.19 | 51.0 | 38635.6 |
| DS 3D | / | 8 | 2 | 2 | 2 | 8 | 64 | 1024 | 155.73 | 49.9 | 31132.3 |
| DS 3D | / | 16 | 2 | 2 | 4 | 8 | 64 | 2048 | 154.84 | 49.6 | 26309.4 |
| DS 3D | / | 32 | 2 | 2 | 8 | 8 | 64 | 4096 | 154.44 | 49.5 | 23901.8 |
| DS 3D | / | 64 | 2 | 2 | 16 | 8 | 64 | 8192 | 153.93 | 49.3 | 22695.7 |
## Automatic Training Optimization
If users are not sure which strategy achieves the largest throughput, auto_accelerate can automatically search for the best strategy given the model and hardware conditions. The search uses Bayesian Optimization (BO) implemented by [HEBO](https://github.com/huawei-noah/HEBO) to efficiently find the strategy with the largest training throughput. BO is a machine learning technique for optimizing black-box functions that are expensive to evaluate. Specifically, BO learns the mapping from strategies to throughput using data from dryruns, each of which executes a few training steps. It then recommends potentially high-throughput strategies, measures their dryrun throughput, and updates the mapping iteratively. This process continues until the desired optimization criteria are met or a predefined budget is exhausted.
This fully automatic training example calls the auto_accelerate API without the load_strategy argument, so the BO algorithm searches for the optimization method combination that achieves the largest throughput, as sketched below.
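A minimal sketch of the fully automatic mode, reusing the placeholders (`model`, `my_loss_func`) from the FSDP sketch above; the `dataset` and `optim_func` keyword arguments are assumptions about the auto_accelerate signature and may differ from the installed version.
```python
import torch
from atorch.auto import auto_accelerate

status, result, best_strategy = auto_accelerate(
    model,
    optim_func=torch.optim.AdamW,
    dataset=train_dataset,     # placeholder dataset, used to dryrun candidate strategies
    loss_func=my_loss_func,
    # no load_strategy: the BO search selects the optimization-method combination
)
print(best_strategy)           # e.g. module_replace + amp_native + zero2, as in the results below
model = result.model
```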
### Fine-tuning or Pre-training
1. Fine-tuning:
Specify `--model_name_or_path`, and the script will load the `.bin` files within that directory.
```shell
python -m atorch.distributed.run --fault_tolerant --max_restarts=0 \
--nproc_per_node="$NUM_GPUS_PER_NODE" \
bayes_opt_sg_llama2.py \
--model_name_or_path $PRETRAINED_MODEL_DIR
```
2. Pre-training:
Specify `--config_name` and `--tokenizer_name`. The script will not load the `.bin` files within that directory; instead, it randomly initializes a model using `config.json`.
```shell
python -m atorch.distributed.run --fault_tolerant --max_restarts=0 \
--nproc_per_node="$NUM_GPUS_PER_NODE" \
bayes_opt_sg_llama2.py \
--config_name $PRETRAINED_MODEL_DIR \
--tokenizer_name $PRETRAINED_MODEL_DIR \
```
### Scripts
- training file [bayes_opt_sg_llama2.py](bayes_opt_sg_llama2.py)
- launch script [bayes_opt_sg_llama2_entry.sh](bayes_opt_sg_llama2_entry.sh)
```bash
cd dlrover/atorch/examples/llama2
# Configurable environment variable: DATASET_PATH, MODEL_NAME_OR_PATH, PER_DEVICE_TRAIN_BATCH_SIZE, etc.
# Change BO_SG_MAX_IETR (the maximum number of BO search rounds) and RANDOM_SAMPLE (the number of initial BO sampling steps) if needed.
bash bayes_opt_sg_llama2_entry.sh
```
### Results
- Model: Llama2-7b
- Cluster: 8 A100-80GB (NVLink) GPUs in one node.
| Batch size per GPU | Strategy | Dryrun Throughput (samples/s) |
|:-------------------|:------------------------------------------:|:-------:|
| 4 | module_replace + amp_native + zero2 + ddp | 11.28 |
Strategies evaluated during the search:
```
amp zero parallel_mode module_replace checkpoint
0 NotChosen NotChosen {"data": 8, "tensor": 1} NotChosen NotChosen
1 amp_native zero2_fsdp {"data": 8, "tensor": 1} module_replace checkpoint
2 amp_native fsdp {"data": 8, "tensor": 1} module_replace checkpoint
3 amp_native zero2_fsdp {"data": 8, "tensor": 1} module_replace checkpoint
4 amp_native fsdp {"data": 8, "tensor": 1} module_replace checkpoint
5 amp_native NotChosen {"data": 8, "tensor": 1} module_replace NotChosen
6 amp_native zero1 {"data": 8, "tensor": 1} module_replace checkpoint
7 NotChosen zero2 {"data": 8, "tensor": 1} NotChosen NotChosen
```
Corresponding dryrun throughput (samples/s):
```
[[ 0. ]
[11.28738554]
[11.15851216]
[11.27479499]
[11.16217471]
[ 0. ]
[ 0. ]
[ 0. ]
]
```
A throughput of zero above means that the corresponding strategy combination caused an out-of-memory (OOM) error.