diff --git a/.github/workflows/ci-npu-mindspeed.yml b/.github/workflows/ci-npu-mindspeed.yml new file mode 100644 index 000000000..8405c86cb --- /dev/null +++ b/.github/workflows/ci-npu-mindspeed.yml @@ -0,0 +1,187 @@ +name: MindSpeed NPU Tests + +on: + workflow_dispatch: + inputs: + mindspeed_repo: + description: "MindSpeed git repository" + required: false + default: "https://github.com/ascend/MindSpeed.git" + mindspeed_ref: + description: "MindSpeed branch, tag, or ref to install" + required: false + default: "core_r0.16.0" + push: + branches: [main, npu_ci_all] + paths: + - ".github/workflows/ci-npu-mindspeed.yml" + - "mcore_adapter/**" + - "roll/third_party/megatron/**" + - "tests/third_party/megatron/**" + - "requirements_common.txt" + - "requirements_vision.txt" + - "setup.py" + - "pyproject.toml" + pull_request: + branches: [main, npu_ci_all] + paths: + - ".github/workflows/ci-npu-mindspeed.yml" + - "mcore_adapter/**" + - "roll/third_party/megatron/**" + - "tests/third_party/megatron/**" + - "requirements_common.txt" + - "requirements_vision.txt" + - "setup.py" + - "pyproject.toml" + +permissions: + contents: read + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + mindspeed-npu-test: + name: MindSpeed 0.16 Core NPU Tests + if: github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository + runs-on: linux-aarch64-a3-8 + timeout-minutes: 90 + container: + image: quay.io/ascend/vllm-ascend:v0.18.0-a3 + env: + PIP_CACHE_DIR: ${{ github.workspace }}/.pip-cache + PIP_INDEX_URL: https://repo.huaweicloud.com/repository/pypi/simple + PIP_TRUSTED_HOST: repo.huaweicloud.com + HF_ENDPOINT: https://hf-mirror.com + PYTORCH_NPU_ALLOC_CONF: "expandable_segments:True" + TASK_QUEUE_ENABLE: "2" + VLLM_USE_V1: "1" + VLLM_ASCEND_ENABLE_FLASHCOMM: "0" + VLLM_ASCEND_ENABLE_NZ: "0" + MINDSPEED_REPO: ${{ github.event.inputs.mindspeed_repo || 'https://github.com/ascend/MindSpeed.git' }} 
+ MINDSPEED_REF: ${{ github.event.inputs.mindspeed_ref || 'core_r0.16.0' }} + MINDSPEED_CACHE_KEY: "core-r0.16.0" + + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + submodules: recursive + + - name: Cache NPU pip packages + uses: actions/cache@v4 + with: + path: .pip-cache + key: ${{ runner.os }}-npu-mindspeed-${{ env.MINDSPEED_CACHE_KEY }}-${{ hashFiles('requirements_common.txt', 'requirements_vision.txt', 'mcore_adapter/pyproject.toml', 'mcore_adapter/requirements.txt', 'setup.py', 'pyproject.toml', '.github/workflows/ci-npu-mindspeed.yml') }} + restore-keys: | + ${{ runner.os }}-npu-mindspeed-${{ env.MINDSPEED_CACHE_KEY }}- + ${{ runner.os }}-npu-mindspeed- + ${{ runner.os }}-npu-pip- + + - name: Configure pip cache + run: | + mkdir -p "$PIP_CACHE_DIR" + + - name: Configure Ascend runtime + shell: bash + run: | + for env_file in \ + /usr/local/Ascend/ascend-toolkit/set_env.sh \ + /usr/local/Ascend/nnal/atb/set_env.sh; do + [ -f "${env_file}" ] && source "${env_file}" + done + + ASCEND_HOME_PATH="${ASCEND_HOME_PATH:-/usr/local/Ascend/ascend-toolkit/latest}" + ASCEND_TOOLKIT_HOME="${ASCEND_TOOLKIT_HOME:-${ASCEND_HOME_PATH}}" + ASCEND_OPP_PATH="${ASCEND_OPP_PATH:-${ASCEND_HOME_PATH}/opp}" + ASCEND_AICPU_PATH="${ASCEND_AICPU_PATH:-${ASCEND_HOME_PATH}}" + LD_LIBRARY_PATH="${ASCEND_HOME_PATH}/lib64:${ASCEND_HOME_PATH}/runtime/lib64:${ASCEND_HOME_PATH}/runtime/lib64/stub:${ASCEND_HOME_PATH}/tools/hccl/lib64:${ASCEND_HOME_PATH}/hccl/lib64:${LD_LIBRARY_PATH:-}" + + for path in \ + "${ASCEND_OPP_PATH}/built-in/op_impl/ai_core/tbe" \ + "${ASCEND_HOME_PATH}/python/site-packages"; do + [ -d "${path}" ] && PYTHONPATH="${path}:${PYTHONPATH:-}" + done + + { + echo "ASCEND_HOME_PATH=${ASCEND_HOME_PATH}" + echo "ASCEND_TOOLKIT_HOME=${ASCEND_TOOLKIT_HOME}" + echo "ASCEND_OPP_PATH=${ASCEND_OPP_PATH}" + echo "ASCEND_AICPU_PATH=${ASCEND_AICPU_PATH}" + echo "LD_LIBRARY_PATH=${LD_LIBRARY_PATH}" + echo "PYTHONPATH=${PYTHONPATH:-}" + } >> "${GITHUB_ENV}" + { + echo 
"${ASCEND_HOME_PATH}/bin" + echo "${ASCEND_HOME_PATH}/compiler/ccec_compiler/bin" + } >> "${GITHUB_PATH}" + + - name: Check NPU environment + run: | + python3 - <<'PY' + import importlib.util + + import torch + import torch_npu + + if importlib.util.find_spec("tbe") is None: + raise RuntimeError("CANN tbe Python module is not visible in PYTHONPATH") + if not torch.npu.is_available(): + raise RuntimeError("torch.npu.is_available() is False") + print(f"npu_device_count={torch.npu.device_count()}") + PY + + - name: Install ROLL requirements + shell: bash + run: | + python3 -m pip install --upgrade pip wheel + # torchair still imports pkg_resources; setuptools 82 removed it. + python3 -m pip install "setuptools<82" + python3 -m pip install --retries 10 --timeout 120 pytest-timeout + python3 -m pip install --retries 10 --timeout 120 -r requirements_common.txt + python3 -m pip install --retries 10 --timeout 120 deepspeed==0.16.4 tensorboard + python3 -m pip install "setuptools<82" + python3 -c "import pkg_resources" + + - name: Install MindSpeed core_r0.16.0 + shell: bash + run: | + set -eo pipefail + export MINDSPEED_SRC="/tmp/MindSpeed" + rm -rf "${MINDSPEED_SRC}" + git clone --depth 1 --branch "${MINDSPEED_REF}" "${MINDSPEED_REPO}" "${MINDSPEED_SRC}" + cd "${MINDSPEED_SRC}" + python3 -m pip install --no-build-isolation --no-deps -e . + + - name: Install ROLL + run: | + python3 -m pip install -e . 
+ + - name: Prepare Megatron test model + shell: bash + run: | + set -eo pipefail + local_model="/data/cpfs_0/common/models/Qwen2.5-0.5B-Instruct" + if [ -d "${local_model}" ]; then + echo "ROLL_MEGATRON_TEST_MODEL=${local_model}" >> "${GITHUB_ENV}" + exit 0 + fi + + python3 - <<'PY' + import os + from huggingface_hub import snapshot_download + + model_path = snapshot_download("Qwen/Qwen2.5-0.5B-Instruct") + with open(os.environ["GITHUB_ENV"], "a", encoding="utf-8") as env_file: + env_file.write(f"ROLL_MEGATRON_TEST_MODEL={model_path}\n") + PY + + - name: Run MindSpeed Megatron offload tests + shell: bash + run: | + export PYTHONPATH="${GITHUB_WORKSPACE}/mcore_adapter/src:${GITHUB_WORKSPACE}:${PYTHONPATH:-}" + torchrun --standalone --nnodes=1 --nproc-per-node=2 \ + -m pytest -q --tb=short --timeout=1800 --timeout-method=thread tests/third_party/megatron/test_offload_states.py + env: + ROLL_NPU_CI: "1" diff --git a/examples/ascend_examples/qwen3_4B_dpo_megatron.yaml b/examples/ascend_examples/qwen3_4B_dpo_megatron.yaml new file mode 100644 index 000000000..58f810b17 --- /dev/null +++ b/examples/ascend_examples/qwen3_4B_dpo_megatron.yaml @@ -0,0 +1,100 @@ +defaults: + - ../config/deepspeed_zero@_here_ + - ../config/deepspeed_zero2@_here_ + - ../config/deepspeed_zero3@_here_ + - ../config/deepspeed_zero3_cpuoffload@_here_ + +hydra: + run: + dir: . 
+ output_subdir: null + +exp_name: "qwen3-4B-dpo-config" +seed: 42 +logging_dir: ./output/logs +output_dir: ./output +system_envs: + USE_MODELSCOPE: '1' + +checkpoint_config: + type: file_system + output_dir: ./ckpt + + +track_name: None + + +max_steps: 500 +save_steps: 500 +logging_steps: 1 +eval_steps: 100 +resume_from_checkpoint: false + +sequence_length: 512 +train_batch_size: 64 +val_batch_size: 64 + +# local_rank: -1 +num_nodes: 1 +num_gpus_per_node: 4 + +pretrain: Qwen/Qwen3-4B + +ipo: false +beta: 0.1 +label_smoothing: 0.0 + +chosen_key: chosen +rejected_key: rejected + +validation: + data_args: + template: qwen3 + file_name: data/comparison_gpt4_data_zh.json + +actor_train: + model_args: + disable_gradient_checkpointing: false + dtype: bf16 + model_type: ~ + training_args: + lr_scheduler_type: constant + learning_rate: 1.0e-6 + weight_decay: 0 + per_device_train_batch_size: 16 + gradient_accumulation_steps: 1 + warmup_steps: 20 + num_train_epochs: 10 + data_args: + template: qwen3 + file_name: + - data/comparison_gpt4_data_zh.json + dataset_dir: data + preprocessing_num_workers: 1 + strategy_args: + strategy_name: megatron_train + strategy_config: + tensor_model_parallel_size: 1 + pipeline_model_parallel_size: 1 + expert_model_parallel_size: 1 + use_distributed_optimizer: true + recompute_granularity: full + device_mapping: list(range(0,2)) + infer_batch_size: 16 + + +reference: + model_args: + disable_gradient_checkpointing: true + dtype: bf16 + model_type: ~ + data_args: + template: qwen3 + strategy_args: + strategy_name: megatron_infer + strategy_config: + tensor_model_parallel_size: 1 + pipeline_model_parallel_size: 1 + expert_model_parallel_size: 1 + device_mapping: list(range(2,4)) + infer_batch_size: 16 \ No newline at end of file diff --git a/mcore_adapter/src/mcore_adapter/initialize.py b/mcore_adapter/src/mcore_adapter/initialize.py index 7357bfa4c..2fb0959d5 100644 --- a/mcore_adapter/src/mcore_adapter/initialize.py +++ 
b/mcore_adapter/src/mcore_adapter/initialize.py @@ -1,18 +1,80 @@ import os import random +import sys +from typing import TYPE_CHECKING import numpy as np import torch from megatron.core import mpu, tensor_parallel from .platforms import current_platform -from .training_args import TrainingArguments from .utils import get_logger +if TYPE_CHECKING: + from .training_args import TrainingArguments + logger = get_logger(__name__) +_NPU_RUNTIME_BOOTSTRAPPED = False + + +def bootstrap_npu_runtime(): + global _NPU_RUNTIME_BOOTSTRAPPED + + if _NPU_RUNTIME_BOOTSTRAPPED or not current_platform.is_npu(): + return + + import torch_npu # noqa: F401 + + try: + import mindspeed.megatron_adaptor # noqa: F401 + except ImportError: + logger.warning("mindspeed.megatron_adaptor could not be imported; continuing on NPU without MindSpeed patches.") + + import megatron.core.tensor_parallel.random as meg_random + + if not hasattr(meg_random, "_npu_patched"): + meg_random.initialize_rng_tracker() + + def patched_set(new_state, device=-1, graph_safe=False): + torch.npu.set_rng_state(new_state) + return + + def patched_get(device="npu", clone=False, graph_safe=False): + return torch.npu.get_rng_state() + + meg_random._set_cuda_rng_state = patched_set + meg_random._get_cuda_rng_state = patched_get + + rng_state = torch.npu.get_rng_state() + meg_random._CUDA_RNG_STATE_TRACKER.states_["model-parallel-rng"] = rng_state + meg_random._CUDA_RNG_STATE_TRACKER.states_["data-parallel-rng"] = rng_state + + meg_random._npu_patched = True + + if not hasattr(torch.cuda, "_npu_patched"): + torch.cuda.current_device = lambda: torch.npu.current_device() + torch.cuda._npu_patched = True + + _NPU_RUNTIME_BOOTSTRAPPED = True + + +def apply_mindspeed_feature_defaults(config): + if "mindspeed.megatron_adaptor" not in sys.modules: + return + + try: + from mindspeed.args_utils import get_mindspeed_args + except ImportError: + return + + for name, value in vars(get_mindspeed_args(get_defaults=True)).items(): + if not hasattr(config, name): + setattr(config, name, value) + + def is_distribute_initialized(): return 
mpu.model_parallel_is_initialized() @@ -29,13 +91,14 @@ def _set_random_seed(seed_): random.seed(seed) np.random.seed(seed) torch.manual_seed(seed) - if current_platform.device_count() > 0: + if not current_platform.is_npu() and current_platform.device_count() > 0: tensor_parallel.model_parallel_cuda_manual_seed(seed) else: raise ValueError("Seed ({}) should be a positive integer.".format(seed)) def initialize_megatron(args: "TrainingArguments"): + bootstrap_npu_runtime() if not is_distribute_initialized(): _initialize_distributed(args) _set_random_seed(args.seed) diff --git a/mcore_adapter/src/mcore_adapter/models/model_config.py b/mcore_adapter/src/mcore_adapter/models/model_config.py index d7eecbf87..8a0c91434 100644 --- a/mcore_adapter/src/mcore_adapter/models/model_config.py +++ b/mcore_adapter/src/mcore_adapter/models/model_config.py @@ -16,7 +16,8 @@ from transformers.configuration_utils import CONFIG_NAME as HF_CONFIG_NAME from ..constants import HUGGINGFACE_AUTOMAP_CACHE, MCA_CONFIG_NAME -from ..initialize import initialize_megatron +from ..initialize import apply_mindspeed_feature_defaults, initialize_megatron +from ..platforms import current_platform from ..training_args import DistributingParallelArguments, TrainingArguments from ..utils import get_logger from .converter.template import get_template @@ -308,6 +309,9 @@ class McaModelConfig(TransformerConfig, PretrainedConfig): ) def __post_init__(self): + if current_platform.is_npu() and self.transformer_impl == "transformer_engine": + self.transformer_impl = "local" + if self.virtual_pipeline_model_parallel_size is None and self.overlap_p2p_comm: self.overlap_p2p_comm = False logger.warning("Non-interleaved pipeline parallelism does not support overlapping p2p communication!") @@ -385,6 +389,7 @@ def squared_relu(x): pipeline_model_parallel_size=self.pipeline_model_parallel_size, ) + apply_mindspeed_feature_defaults(self) super().__post_init__() pipeline_size = self.pipeline_model_parallel_size if 
self.virtual_pipeline_model_parallel_size is not None: diff --git a/mcore_adapter/src/mcore_adapter/models/model_factory.py b/mcore_adapter/src/mcore_adapter/models/model_factory.py index 36ea6b6a8..87089227f 100644 --- a/mcore_adapter/src/mcore_adapter/models/model_factory.py +++ b/mcore_adapter/src/mcore_adapter/models/model_factory.py @@ -44,6 +44,28 @@ logger = get_logger(__name__) +def _replace_with_rmsnorm(submodules, attr_name): + if not hasattr(submodules, attr_name): + return + norm = getattr(submodules, attr_name) + norm_name = norm.__name__ if isinstance(norm, type) else norm.__class__.__name__ + if not norm_name.endswith("RMSNorm"): + setattr(submodules, attr_name, RMSNorm) + + +def _apply_npu_rmsnorm_overrides(module_spec, qk_layernorm: bool = False, set_layer_norm: bool = False): + if set_layer_norm: + module_spec.layer_norm = RMSNorm + + _replace_with_rmsnorm(module_spec.submodules, "input_layernorm") + _replace_with_rmsnorm(module_spec.submodules, "pre_mlp_layernorm") + + self_attn = getattr(module_spec.submodules, "self_attention", None) + if qk_layernorm and hasattr(self_attn, "submodules"): + _replace_with_rmsnorm(self_attn.submodules, "q_layernorm") + _replace_with_rmsnorm(self_attn.submodules, "k_layernorm") + + class VirtualModels: # a wrapper for model list to support virtual pipeline model parallel def __init__(self, cls, config: "McaModelConfig", *args, **kwargs): @@ -385,14 +407,32 @@ def _get_transformer_layer_spec(self, config: Optional["McaModelConfig"] = None) transformer_block_spec.layer_norm = RMSNorm for transformer_layer_spec in transformer_block_spec.layer_specs: if not use_te and config.normalization == "RMSNorm": - transformer_layer_spec.submodules.input_layernorm = RMSNorm - transformer_layer_spec.submodules.pre_mlp_layernorm = RMSNorm + if current_platform.is_npu(): + _apply_npu_rmsnorm_overrides(transformer_layer_spec) + else: + transformer_layer_spec.submodules.input_layernorm = RMSNorm + 
transformer_layer_spec.submodules.pre_mlp_layernorm = RMSNorm return transformer_block_spec if use_te: return get_gpt_layer_with_transformer_engine_spec( config.num_moe_experts, config.moe_grouped_gemm, qk_layernorm=config.qk_layernorm ) else: + if current_platform.is_npu(): + module_spec = get_gpt_layer_local_spec( + config.num_moe_experts, + config.moe_grouped_gemm, + qk_layernorm=config.qk_layernorm, + normalization=config.normalization, + ) + if config.normalization == "RMSNorm": + _apply_npu_rmsnorm_overrides( + module_spec, + qk_layernorm=config.qk_layernorm, + set_layer_norm=True, + ) + return module_spec + module_spec = get_gpt_layer_local_spec( config.num_moe_experts, config.moe_grouped_gemm, qk_layernorm=config.qk_layernorm ) diff --git a/roll/distributed/strategy/megatron_strategy.py b/roll/distributed/strategy/megatron_strategy.py index a50d29b11..08d082f7d 100644 --- a/roll/distributed/strategy/megatron_strategy.py +++ b/roll/distributed/strategy/megatron_strategy.py @@ -529,6 +529,13 @@ def inner_forward_step(self, loss_func, data_iterator: Iterator[DataProto], mode else: input_ids = self._get_feature_on_this_cp_rank(input_ids, "input_ids") attention_mask = self._get_feature_on_this_cp_rank(attention_mask, "attention_mask") + + if hasattr(torch, "npu") and torch.npu.is_available() and attention_mask is not None and attention_mask.dim() == 2: + attention_mask = attention_mask.bool() # NOTE(review): True marks attendable tokens here, but prepare_model_batch in tests/third_party/megatron/test_offload_states.py passes True for masked positions; confirm the polarity this path expects + B, S = attention_mask.shape + attention_mask = attention_mask[:, None, None, :] # [B,1,1,S] + attention_mask = attention_mask.expand(B, 1, S, S) # [B,1,S,S] + + if labels is not None: labels = self._get_feature_on_this_cp_rank(labels, "labels") loss_mask = self._get_feature_on_this_cp_rank(loss_mask, "loss_mask") diff --git a/roll/pipeline/base_worker.py b/roll/pipeline/base_worker.py index 3e522bea7..e7daf3c67 100644 --- a/roll/pipeline/base_worker.py +++ b/roll/pipeline/base_worker.py @@ -457,8 +457,7 @@ async def offload_states_partial(self, target_dp_ranks: List[int]): # Verify offloaded workers 
have near-zero GPU memory usage if self.rank_info.dp_rank in target_dp_ranks: - import torch - gpu_memory_gb = torch.cuda.memory_allocated() / 1024**3 + gpu_memory_gb = current_platform.memory_allocated() / 1024**3 if gpu_memory_gb > 1.0: raise RuntimeError( f"GPU memory not properly offloaded for Worker {self.rank} (DP {self.rank_info.dp_rank}): " diff --git a/roll/platforms/__init__.py b/roll/platforms/__init__.py index 6869621f4..c9dff3f15 100644 --- a/roll/platforms/__init__.py +++ b/roll/platforms/__init__.py @@ -25,26 +25,31 @@ def _init_platform() -> Platform: Returns: An instance of a subclass of Platform corresponding to the detected hardware. """ + try: + import torch_npu # noqa: F401 + + if hasattr(torch, "npu") and torch.npu.is_available(): + logger.debug("Detected NPU (torch_npu). Initializing NPU platform.") + return NpuPlatform() + except ImportError: + pass + if torch.cuda.is_available(): device_name = torch.cuda.get_device_name().upper() logger.debug(f"Detected CUDA device: {device_name}") + if "NVIDIA" in device_name: logger.debug("Initializing CUDA platform (NVIDIA).") return CudaPlatform() elif "AMD" in device_name: logger.debug("Initializing ROCm platform (AMD).") return RocmPlatform() + logger.warning("Unrecognized CUDA device. Falling back to UnknownPlatform.") return UnknownPlatform() - else: - try: - import torch_npu # noqa: F401 - logger.debug("Detected torch_npu. Initializing NPU platform.") - return NpuPlatform() - except ImportError: - logger.debug("No supported accelerator detected. Initializing CPU platform.") - return CpuPlatform() + logger.debug("No supported accelerator detected. Initializing CPU platform.") + return CpuPlatform() # Global singleton representing the current platform in use. 
diff --git a/roll/third_party/megatron/offload_states_patch.py b/roll/third_party/megatron/offload_states_patch.py index 9009fae09..9804177e3 100644 --- a/roll/third_party/megatron/offload_states_patch.py +++ b/roll/third_party/megatron/offload_states_patch.py @@ -219,9 +219,7 @@ def move_ddp_model_params_tensor_to_device(optimizer: DistributedOptimizer, param_range = gbuf_range["param_map"][model_param]["param"] # fp16, bf16 params. - if model_param.type() in [f'torch.{current_platform.device_type}.HalfTensor', f'torch.{current_platform.device_type}.BFloat16Tensor', - 'torch.BFloat16Tensor', 'torch.HalfTensor'] or \ - (current_platform.device_type == "cuda" and model_param.type() in ['torch.HalfTensor', 'torch.BFloat16Tensor']): + if model_param.dtype in (torch.float16, torch.bfloat16): # Clone model -> main. shard_model_param = model_param.detach().view(-1)[param_range.start: param_range.end] @@ -233,7 +231,7 @@ def move_ddp_model_params_tensor_to_device(optimizer: DistributedOptimizer, len(shard_float16_params_this_group)] = shard_model_param shard_float16_params_this_group.append(shard_model_param) # fp32 params. 
- elif model_param.type() in [f'torch.{current_platform.device_type}.FloatTensor', 'torch.FloatTensor']: + elif model_param.dtype == torch.float32: shard_model_param = model_param.view(-1)[param_range.start: param_range.end] optimizer.shard_fp32_groups[group_index][ len(shard_fp32_params_this_group)].data = shard_model_param.data diff --git a/roll/third_party/megatron/optimizer.py b/roll/third_party/megatron/optimizer.py index 888dc7a87..4f860ac2e 100644 --- a/roll/third_party/megatron/optimizer.py +++ b/roll/third_party/megatron/optimizer.py @@ -69,9 +69,15 @@ def get_megatron_optimizer( optimizers = [] model_chunk_offset = 0 kwargs = {} - if "config_overrides" in inspect.signature(_get_param_groups_and_buffers).parameters: + _param_groups_sig = inspect.signature(_get_param_groups_and_buffers).parameters + if "config_overrides" in _param_groups_sig: # config_overrides is required in mcore-core>=0.16 - kwargs = {"config_overrides": None} + kwargs["config_overrides"] = None + if "no_weight_decay_cond" in _param_groups_sig: + # no_weight_decay_cond, scale_lr_cond, lr_mult are required in newer mcore versions + kwargs["no_weight_decay_cond"] = no_weight_decay_cond + kwargs["scale_lr_cond"] = scale_lr_cond + kwargs["lr_mult"] = lr_mult for dense_model_chunks, overlap_param_gather_with_optimizer_step in zip( all_dense_model_chunks, overlap_param_gather_with_optimizer_step_flags ): diff --git a/tests/distributed/strategy/test_vllm_strategy_beam_search.py b/tests/distributed/strategy/test_vllm_strategy_beam_search.py index d172f31bb..3e968de29 100644 --- a/tests/distributed/strategy/test_vllm_strategy_beam_search.py +++ b/tests/distributed/strategy/test_vllm_strategy_beam_search.py @@ -59,6 +59,7 @@ def _install_mock_vllm_modules(monkeypatch): mock_vllm.__version__ = "0.8.4" mock_vllm.RequestOutput = MockRequestOutput mock_vllm.SamplingParams = MockSamplingParams + mock_vllm.TokensPrompt = MockTokensPrompt sampling_params = ModuleType("vllm.sampling_params") 
sampling_params.RequestOutputKind = Mock() @@ -75,6 +76,7 @@ def _install_mock_vllm_modules(monkeypatch): inputs = ModuleType("vllm.inputs") inputs.__path__ = [] + inputs.TokensPrompt = MockTokensPrompt inputs_data = ModuleType("vllm.inputs.data") inputs_data.TokensPrompt = MockTokensPrompt diff --git a/tests/third_party/megatron/test_offload_states.py b/tests/third_party/megatron/test_offload_states.py index cb6416ed9..f3e1699f8 100644 --- a/tests/third_party/megatron/test_offload_states.py +++ b/tests/third_party/megatron/test_offload_states.py @@ -3,7 +3,6 @@ import pytest import torch -import torch.distributed as dist from roll.platforms import current_platform from megatron.core import DistributedDataParallel @@ -33,11 +32,25 @@ ) from roll.third_party.megatron.optimizer import get_megatron_optimizer +try: + # NPU patch + import mindspeed.megatron_adaptor +except ImportError: + pass + + +def _default_model_name(): + local_model = "/data/cpfs_0/common/models/Qwen2.5-0.5B-Instruct" + return os.environ.get( + "ROLL_MEGATRON_TEST_MODEL", + local_model if os.path.isdir(local_model) else "Qwen/Qwen2.5-0.5B-Instruct", + ) + class McaModelCreator: - def __init__(self, optimizer_type, model_name="/data/cpfs_0/common/models/Qwen2.5-0.5B-Instruct"): - self.model_name = model_name + def __init__(self, optimizer_type, model_name=None): + self.model_name = model_name or _default_model_name() if optimizer_type is None: self.megatron_train_args = TrainingArguments( output_dir="./output", @@ -128,7 +141,9 @@ def create_mca_infer_only(self): self.tokenizer = default_tokenizer_provider(model_args=self.model_args) self.model = default_actor_model_provider( - tokenizer=self.tokenizer, training_args=self.megatron_train_args, model_args=self.model_args + tokenizer=self.tokenizer, + training_args=self.megatron_train_args, + model_args=self.model_args, ) for module in self.model.get_models(): module.requires_grad_(False) @@ -141,8 +156,14 @@ def create_mca_model(self): self.tokenizer 
= default_tokenizer_provider(model_args=self.model_args) self.model = default_actor_model_provider( - tokenizer=self.tokenizer, training_args=self.megatron_train_args, model_args=self.model_args + tokenizer=self.tokenizer, + training_args=self.megatron_train_args, + model_args=self.model_args, + is_trainable=True, ) + for module in self.model.get_models(): + module.train() + module.requires_grad_(True) ddp_config = DistributedDataParallelConfig( grad_reduce_in_fp32=self.megatron_train_args.accumulate_allreduce_grads_in_fp32, @@ -188,26 +209,33 @@ def create_mca_model(self): bind_megatron_offload_states_func(optimizer=self.optimizer) if not isinstance(self.optimizer, ChainedOptimizer): - self.scheduler = get_scheduler( - "cosine", - optimizer=self.optimizer if self.optimizer is None else self.optimizer.optimizer, - num_warmup_steps=self.megatron_train_args.get_warmup_steps(self.megatron_train_args.max_steps), - num_training_steps=self.megatron_train_args.max_steps, - scheduler_specific_kwargs=self.megatron_train_args.lr_scheduler_kwargs, - ) - else: - lr_schedulers = [] - for opt in self.optimizer.chained_optimizers: - sch = get_scheduler( + base_optimizer = self.optimizer if self.optimizer is None else self.optimizer.optimizer + if base_optimizer is not None: + self.scheduler = get_scheduler( "cosine", - optimizer=opt if opt is None else opt.optimizer, + optimizer=base_optimizer, num_warmup_steps=self.megatron_train_args.get_warmup_steps(self.megatron_train_args.max_steps), num_training_steps=self.megatron_train_args.max_steps, scheduler_specific_kwargs=self.megatron_train_args.lr_scheduler_kwargs, ) - lr_schedulers.append(sch) + else: + lr_schedulers = [] + for opt in self.optimizer.chained_optimizers: + base_optimizer = opt if opt is None else opt.optimizer + if base_optimizer is None: + continue + lr_schedulers.append( + get_scheduler( + "cosine", + optimizer=base_optimizer, + 
num_warmup_steps=self.megatron_train_args.get_warmup_steps(self.megatron_train_args.max_steps), + num_training_steps=self.megatron_train_args.max_steps, + scheduler_specific_kwargs=self.megatron_train_args.lr_scheduler_kwargs, + ) + ) - self.scheduler = ChainedScheduler(lr_schedulers) + if lr_schedulers: + self.scheduler = ChainedScheduler(lr_schedulers) """ @@ -216,64 +244,24 @@ def create_mca_model(self): """ -def test_megatron_init_memory(): - MAX_NUM_OF_MEM_EVENTS_PER_SNAPSHOT: int = 100000 - torch.cuda.memory._record_memory_history( - max_entries=MAX_NUM_OF_MEM_EVENTS_PER_SNAPSHOT, - ) - +def test_megatron_dist_optimizer_offload_reload_smoke(): mca_model = McaModelCreator(optimizer_type="dist_optimizer") - - # buffer_data = [] - # for buffer in mca_model.optimizer.buffers: - # buffer_data.append(buffer.param_data.data.storage().data_ptr()) - - mca_model.optimizer.offload_states(include=[MegatronOffloadStateType.other_params], pin_memory=True) - - t0 = torch.randint(0, 100, (1024, 1024, 1024), device="cuda") - del t0 - - mca_model.optimizer.reload_states(include=[MegatronOffloadStateType.model_params]) - if dist.get_rank() == 0: - t0 = torch.randint(0, 100, (1024, 1024, 1024), device="cuda") - dump_file = f"./memory_dump/snapshot_megatron_init_offload_{os.environ['RANK']}.pickle" - os.makedirs(os.path.dirname(dump_file), exist_ok=True) - torch.cuda.memory._dump_snapshot(dump_file) - torch.cuda.memory._record_memory_history(enabled=None) - - # tensors_group_by_data_ptr = defaultdict(list) - # tensors = objgraph.by_type('Tensor') - # print(f"len(tensor)={len(tensors)}") - # for tensor in tensors: - # tensors_group_by_data_ptr[tensor.storage().data_ptr()].append(tensor) - # - # for buffer in buffer_data: - # objgraph.show_backrefs(tensors_group_by_data_ptr[buffer], max_depth=10, - # extra_ignore=[id(locals())], - # filename=f'/checkpoint/binary/ScaleAligner/memory_dump/buffer_data_group_tensors.param_data_{datetime.now().strftime("%Y%m%d-%H%M%S")}.png') - - -def 
test_megatron_init_ddp_memory(): - MAX_NUM_OF_MEM_EVENTS_PER_SNAPSHOT: int = 100000 - torch.cuda.memory._record_memory_history( - max_entries=MAX_NUM_OF_MEM_EVENTS_PER_SNAPSHOT, + run_model_dist_optimizer( + mca_model, + included_state=[MegatronOffloadStateType.other_params], + pin_memory=True, + non_blocking=True, ) - mca_model = McaModelCreator(optimizer_type=None) - - offload_megatron_no_grad_module(model_chunks=mca_model.model.get_models()) - t0 = torch.randint(0, 100, (1024, 1024, 1024), device="cuda") - del t0 - - reload_megatron_no_grad_module(model_chunks=mca_model.model.get_models()) - - if dist.get_rank() == 0: - t0 = torch.randint(0, 100, (1024, 1024, 1024), device="cuda") - dump_file = f"./memory_dump/snapshot_megatron_init_ddp_offload_{os.environ['RANK']}.pickle" - os.makedirs(os.path.dirname(dump_file), exist_ok=True) - torch.cuda.memory._dump_snapshot(dump_file) - torch.cuda.memory._record_memory_history(enabled=None) +def test_megatron_no_grad_module_offload_reload_smoke(): + mca_model = McaModelCreator(optimizer_type=None) + run_model_infer( + mca_model, + included_state=[MegatronOffloadStateType.model_params], + pin_memory=True, + non_blocking=True, + ) def check_devices(tensors: List[torch.Tensor], target_device) -> None: @@ -284,16 +272,32 @@ def check_devices(tensors: List[torch.Tensor], target_device) -> None: def check_tensors(expected_tensors: List[torch.Tensor], tensors: List[torch.Tensor]) -> None: for tensor_expected, tensor_restored in zip(expected_tensors, tensors): - assert torch.equal(tensor_expected, tensor_restored) + assert torch.equal(tensor_expected.to(tensor_restored.device), tensor_restored) + + +def current_device(): + return f"{current_platform.device_type}:{current_platform.current_device()}" + + +def prepare_model_batch(batch): + input_ids, token_attention_mask = batch + input_ids = input_ids.to(current_device()) + token_attention_mask = token_attention_mask.to(current_device()) + position_ids = 
torch.clip(torch.cumsum(token_attention_mask, dim=-1) - 1, min=0, max=None) + seq_length = input_ids.size(1) + causal_mask = torch.triu( + torch.ones((1, 1, seq_length, seq_length), dtype=torch.bool, device=input_ids.device), + diagonal=1, + ) + padding_mask = token_attention_mask[:, None, None, :].eq(0) + attention_mask = causal_mask | padding_mask + return input_ids, attention_mask, position_ids def run_model_infer(mca_model: McaModelCreator, included_state, pin_memory, non_blocking): with torch.no_grad(): for batch in mca_model.data_loader: - input_ids, attention_mask = batch - input_ids = input_ids.to("cuda") - attention_mask = attention_mask.to("cuda") - position_ids = torch.clip(torch.cumsum(attention_mask, dim=-1) - 1, min=0, max=None) + input_ids, attention_mask, position_ids = prepare_model_batch(batch) models = mca_model.model.get_models() for model in models: @@ -329,10 +333,7 @@ def run_model_dist_optimizer(mca_model: McaModelCreator, included_state, pin_mem assert isinstance(mca_model.optimizer, DistributedOptimizer) for batch in mca_model.data_loader: - input_ids, attention_mask = batch - input_ids = input_ids.to("cuda") - attention_mask = attention_mask.to("cuda") - position_ids = torch.clip(torch.cumsum(attention_mask, dim=-1) - 1, min=0, max=None) + input_ids, attention_mask, position_ids = prepare_model_batch(batch) models = mca_model.model.get_models() for model in models: @@ -534,10 +535,7 @@ def run_model_fp16_optimizer(mca_model: McaModelCreator, included_state, pin_mem assert isinstance(mca_model.optimizer, Float16OptimizerWithFloat16Params) for batch in mca_model.data_loader: - input_ids, attention_mask = batch - input_ids = input_ids.to("cuda") - attention_mask = attention_mask.to("cuda") - position_ids = torch.clip(torch.cumsum(attention_mask, dim=-1) - 1, min=0, max=None) + input_ids, attention_mask, position_ids = prepare_model_batch(batch) models = mca_model.model.get_models() for model in models: @@ -710,10 +708,7 @@ def 
run_model_fp32_optimizer(mca_model: McaModelCreator, included_state, pin_mem assert isinstance(mca_model.optimizer, FP32Optimizer) for batch in mca_model.data_loader: - input_ids, attention_mask = batch - input_ids = input_ids.to("cuda") - attention_mask = attention_mask.to("cuda") - position_ids = torch.clip(torch.cumsum(attention_mask, dim=-1) - 1, min=0, max=None) + input_ids, attention_mask, position_ids = prepare_model_batch(batch) models = mca_model.model.get_models() for model in models: