Compare commits


7 Commits

Author SHA1 Message Date
f4331d1b8b updated
Signed-off-by: Robert Shaw <robshaw@redhat.com>
2025-09-09 03:02:08 +00:00
7742eb6c59 updated
Signed-off-by: Robert Shaw <robshaw@redhat.com>
2025-09-09 02:59:39 +00:00
22a0070530 Bump actions/setup-python from 5.4.0 to 6.0.0 (#24414)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Russell Bryant <rbryant@redhat.com>
2025-09-09 02:54:58 +00:00
170129eb28 [gpt-oss] Harmony changes with container tool support (#23386)
Signed-off-by: zhiweiz <zhiweiz@fb.com>
Signed-off-by: Aaron Pham <contact@aarnphm.xyz>
Signed-off-by: Lu Fang <30275821+houseroad@users.noreply.github.com>
Co-authored-by: zhiweiz <zhiweiz@fb.com>
Co-authored-by: Aaron Pham <contact@aarnphm.xyz>
Co-authored-by: Simon Mo <simon.mo@hey.com>
Co-authored-by: Lu Fang <30275821+houseroad@users.noreply.github.com>
2025-09-08 19:03:50 -07:00
955c624915 [Bugfix][Wide EP] Fix redundant work when using DeepEP, TP Attn, and EP MoE (#24134)
Signed-off-by: Tyler Michael Smith <tlrmchlsmth@gmail.com>
2025-09-08 19:01:51 -07:00
4f87abdcc6 Update reviewers for modelopt related files (#24468) 2025-09-09 01:53:13 +00:00
6910b56da2 [CI] Add nightly multiarch manifests to dockerhub (#24102)
Signed-off-by: Sahithi Chigurupati <chigurupati.sahithi@gmail.com>
Signed-off-by: Simon Mo <simon.mo@hey.com>
Signed-off-by: simon-mo <simon.mo@hey.com>
Co-authored-by: Simon Mo <simon.mo@hey.com>
2025-09-09 01:18:09 +00:00
15 changed files with 443 additions and 88 deletions

View File

@ -149,3 +149,25 @@ steps:
- "docker push public.ecr.aws/q9t5s3a7/vllm-cpu-release-repo:$(buildkite-agent meta-data get release-version)"
env:
DOCKER_BUILDKIT: "1"
- label: "Build and publish nightly multi-arch image to DockerHub"
depends_on:
- create-multi-arch-manifest
if: build.env("NIGHTLY") == "1"
agents:
queue: cpu_queue_postmerge
commands:
- "aws ecr-public get-login-password --region us-east-1 | docker login --username AWS --password-stdin public.ecr.aws/q9t5s3a7"
- "docker pull public.ecr.aws/q9t5s3a7/vllm-release-repo:$BUILDKITE_COMMIT"
- "docker tag public.ecr.aws/q9t5s3a7/vllm-release-repo:$BUILDKITE_COMMIT vllm/vllm-openai:nightly"
- "docker tag public.ecr.aws/q9t5s3a7/vllm-release-repo:$BUILDKITE_COMMIT vllm/vllm-openai:nightly-$BUILDKITE_COMMIT"
- "docker push vllm/vllm-openai:nightly"
- "docker push vllm/vllm-openai:nightly-$BUILDKITE_COMMIT"
# Clean up old nightly builds (keep only last 14)
- "bash .buildkite/scripts/cleanup-nightly-builds.sh"
plugins:
- docker-login#v3.0.0:
username: vllmbot
password-env: DOCKERHUB_TOKEN
env:
DOCKER_BUILDKIT: "1"

View File

@ -0,0 +1,97 @@
#!/bin/bash
set -ex
# Clean up old nightly builds from DockerHub, keeping only the last 14 builds.
# This script uses the DockerHub API to list and delete old tags with the "nightly-" prefix.
# DockerHub API endpoint for vllm/vllm-openai repository
REPO_API_URL="https://hub.docker.com/v2/repositories/vllm/vllm-openai/tags"
# Get DockerHub token from environment
if [ -z "$DOCKERHUB_TOKEN" ]; then
echo "Error: DOCKERHUB_TOKEN environment variable is not set"
exit 1
fi
# Function to get all tags from DockerHub
get_all_tags() {
local page=1
local all_tags=""
while true; do
local response=$(curl -s -H "Authorization: Bearer $DOCKERHUB_TOKEN" \
"$REPO_API_URL?page=$page&page_size=100")
# Get both last_updated timestamp and tag name, separated by |
local tags=$(echo "$response" | jq -r '.results[] | select(.name | startswith("nightly-")) | "\(.last_updated)|\(.name)"')
if [ -z "$tags" ]; then
break
fi
all_tags="$all_tags$tags"$'\n'
page=$((page + 1))
done
# Sort by timestamp (newest first) and extract just the tag names
echo "$all_tags" | sort -r | cut -d'|' -f2
}
delete_tag() {
local tag_name="$1"
echo "Deleting tag: $tag_name"
local delete_url="https://hub.docker.com/v2/repositories/vllm/vllm-openai/tags/$tag_name"
local response=$(curl -s -X DELETE -H "Authorization: Bearer $DOCKERHUB_TOKEN" "$delete_url")
if echo "$response" | jq -e '.detail' > /dev/null 2>&1; then
echo "Warning: Failed to delete tag $tag_name: $(echo "$response" | jq -r '.detail')"
else
echo "Successfully deleted tag: $tag_name"
fi
}
# Get all nightly- prefixed tags, sorted by last_updated timestamp (newest first)
echo "Fetching all tags from DockerHub..."
all_tags=$(get_all_tags)
if [ -z "$all_tags" ]; then
echo "No tags found to clean up"
exit 0
fi
# Count total tags
total_tags=$(echo "$all_tags" | wc -l)
echo "Found $total_tags tags"
# Keep only the last 14 builds (including the current one)
tags_to_keep=14
tags_to_delete=$((total_tags - tags_to_keep))
if [ $tags_to_delete -le 0 ]; then
echo "No tags need to be deleted (only $total_tags tags found, keeping $tags_to_keep)"
exit 0
fi
echo "Will delete $tags_to_delete old tags, keeping the newest $tags_to_keep"
# Get tags to delete (skip the first $tags_to_keep tags)
tags_to_delete_list=$(echo "$all_tags" | tail -n +$((tags_to_keep + 1)))
if [ -z "$tags_to_delete_list" ]; then
echo "No tags to delete"
exit 0
fi
# Delete old tags
echo "Deleting old tags..."
while IFS= read -r tag; do
if [ -n "$tag" ]; then
delete_tag "$tag"
# Add a small delay to avoid rate limiting
sleep 1
fi
done <<< "$tags_to_delete_list"
echo "Cleanup completed successfully"

.github/mergify.yml
View File

@ -273,6 +273,20 @@ pull_request_rules:
users:
- "sangstar"
- name: assign reviewer for modelopt changes
conditions:
- or:
- files~=^vllm/model_executor/layers/quantization/modelopt\.py$
- files~=^vllm/model_executor/layers/quantization/__init__\.py$
- files~=^tests/models/quantization/test_modelopt\.py$
- files~=^tests/quantization/test_modelopt\.py$
- files~=^tests/models/quantization/test_nvfp4\.py$
- files~=^docs/features/quantization/modelopt\.md$
actions:
assign:
users:
- "Edwardf0t1"
- name: remove 'needs-rebase' label when conflict is resolved
conditions:
- -conflict

View File

@ -16,7 +16,7 @@ jobs:
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Set up Python
uses: actions/setup-python@42375524e23c412d93fb67b49958b491fce71c38 # v5.4.0
uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
with:
python-version: '3.12'

View File

@ -17,7 +17,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/setup-python@42375524e23c412d93fb67b49958b491fce71c38 # v5.4.0
- uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
with:
python-version: "3.12"
- run: echo "::add-matcher::.github/workflows/matchers/actionlint.json"

View File

@ -1,5 +1,7 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import asyncio
import contextlib
import json
import logging
from abc import ABC, abstractmethod
@ -57,9 +59,14 @@ class ConversationContext(ABC):
@abstractmethod
async def init_tool_sessions(self, tool_server: Optional[ToolServer],
exit_stack: AsyncExitStack) -> None:
exit_stack: AsyncExitStack,
request_id: str) -> None:
pass
@abstractmethod
async def cleanup_session(self) -> None:
raise NotImplementedError("Should not be called.")
class SimpleContext(ConversationContext):
@ -89,9 +96,13 @@ class SimpleContext(ConversationContext):
raise NotImplementedError("Should not be called.")
async def init_tool_sessions(self, tool_server: Optional[ToolServer],
exit_stack: AsyncExitStack) -> None:
exit_stack: AsyncExitStack,
request_id: str) -> None:
pass
async def cleanup_session(self) -> None:
raise NotImplementedError("Should not be called.")
class HarmonyContext(ConversationContext):
@ -103,6 +114,7 @@ class HarmonyContext(ConversationContext):
self._messages = messages
self.available_tools = available_tools
self._tool_sessions: dict[str, Union[ClientSession, Tool]] = {}
self.called_tools: set[str] = set()
self.parser = get_streamable_parser_for_assistant()
self.num_init_messages = len(messages)
@ -234,7 +246,8 @@ class HarmonyContext(ConversationContext):
last_msg = self.messages[-1]
recipient = last_msg.recipient
return recipient is not None and (recipient.startswith("browser.")
or recipient.startswith("python"))
or recipient.startswith("python") or
recipient.startswith("container."))
async def call_tool(self) -> list[Message]:
if not self.messages:
@ -248,6 +261,9 @@ class HarmonyContext(ConversationContext):
elif recipient.startswith("python"):
return await self.call_python_tool(
self._tool_sessions["python"], last_msg)
elif recipient.startswith("container."):
return await self.call_container_tool(
self._tool_sessions["container"], last_msg)
raise ValueError("No tool call found")
def render_for_completion(self) -> list[int]:
@ -256,6 +272,7 @@ class HarmonyContext(ConversationContext):
async def call_search_tool(self, tool_session: Union["ClientSession",
Tool],
last_msg: Message) -> list[Message]:
self.called_tools.add("browser")
if isinstance(tool_session, Tool):
return await tool_session.get_result(self)
tool_name = last_msg.recipient.split(".")[1]
@ -265,12 +282,16 @@ class HarmonyContext(ConversationContext):
content = TextContent(text=result_str)
author = Author(role=Role.TOOL, name=last_msg.recipient)
return [
Message(author=author, content=[content], recipient=Role.ASSISTANT)
Message(author=author,
content=[content],
recipient=Role.ASSISTANT,
channel=last_msg.channel)
]
async def call_python_tool(self, tool_session: Union["ClientSession",
Tool],
last_msg: Message) -> list[Message]:
self.called_tools.add("python")
if isinstance(tool_session, Tool):
return await tool_session.get_result(self)
param = {
@ -290,13 +311,63 @@ class HarmonyContext(ConversationContext):
]
async def init_tool_sessions(self, tool_server: Optional[ToolServer],
exit_stack: AsyncExitStack) -> None:
exit_stack: AsyncExitStack,
request_id: str) -> None:
if tool_server:
for tool_name in self.available_tools:
if tool_name not in self._tool_sessions:
self._tool_sessions[
tool_name] = await exit_stack.enter_async_context(
tool_server.new_session(tool_name))
tool_session = await exit_stack.enter_async_context(
tool_server.new_session(tool_name, request_id))
self._tool_sessions[tool_name] = tool_session
exit_stack.push_async_exit(self.cleanup_session)
async def call_container_tool(self, tool_session: Union["ClientSession",
Tool],
last_msg: Message) -> list[Message]:
"""
Call container tool. Expect this to be run in a stateful docker
with command line terminal.
The official container tool would at least
expect the following format:
- for tool name: exec
- args:
{
"cmd":List[str] "command to execute",
"workdir":optional[str] "current working directory",
"env":optional[object/dict] "environment variables",
"session_name":optional[str] "session name",
"timeout":optional[int] "timeout in seconds",
"user":optional[str] "user name",
}
"""
self.called_tools.add("container")
if isinstance(tool_session, Tool):
return await tool_session.get_result(self)
tool_name = last_msg.recipient.split(".")[1].split(" ")[0]
args = json.loads(last_msg.content[0].text)
result = await tool_session.call_tool(tool_name, args)
result_str = result.content[0].text
content = TextContent(text=result_str)
author = Author(role=Role.TOOL, name=last_msg.recipient)
return [
Message(author=author,
content=[content],
recipient=Role.ASSISTANT,
channel=last_msg.channel)
]
async def cleanup_session(self, *args, **kwargs) -> None:
"""Can be used as coro to used in __aexit__"""
async def cleanup_tool_session(tool_session):
if not isinstance(tool_session, Tool):
logger.info("Cleaning up tool session for %s",
tool_session._client_info)
with contextlib.suppress(Exception):
await tool_session.call_tool("cleanup_session", {})
await asyncio.gather(*(cleanup_tool_session(self._tool_sessions[tool])
for tool in self.called_tools))
class StreamingHarmonyContext(HarmonyContext):
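As a side note on the container tool added above, here is a hedged sketch of the args payload a "container.exec" message is expected to carry, matching the fields listed in the call_container_tool docstring; the concrete values are illustrative.

import json

# Field names follow the docstring above; the values are made-up examples.
example_args = {
    "cmd": ["bash", "-lc", "ls /workspace"],  # command to execute
    "workdir": "/workspace",                  # optional working directory
    "env": {"PYTHONUNBUFFERED": "1"},         # optional environment variables
    "session_name": "default",                # optional session name
    "timeout": 30,                            # optional timeout in seconds
    "user": "root",                           # optional user name
}
# For recipient "container.exec", last_msg.content[0].text would carry this
# JSON string, and call_container_tool parses the tool name "exec" from the
# recipient before forwarding the args to the tool session.
print(json.dumps(example_args))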

View File

@ -16,11 +16,13 @@ from openai.types.responses.response_function_web_search import (
from openai.types.responses.response_reasoning_item import (
Content as ResponseReasoningTextContent)
from openai.types.responses.tool import Tool
from openai_harmony import (Author, Conversation, DeveloperContent,
HarmonyEncodingName, Message, ReasoningEffort,
Role, StreamableParser, SystemContent, TextContent,
ToolDescription, load_harmony_encoding)
from openai_harmony import (Author, ChannelConfig, Conversation,
DeveloperContent, HarmonyEncodingName, Message,
ReasoningEffort, Role, StreamableParser,
SystemContent, TextContent, ToolDescription,
load_harmony_encoding)
from vllm import envs
from vllm.entrypoints.openai.protocol import (ChatCompletionToolsParam,
ResponseInputOutputItem)
from vllm.utils import random_uuid
@ -33,6 +35,20 @@ REASONING_EFFORT = {
_harmony_encoding = None
# Builtin tools that should be included in the system message when
# they are available and requested by the user.
# Tool args are provided by MCP tool descriptions. Tool outputs
# are stringified.
BUILTIN_TOOLS = {
"web_search_preview",
"code_interpreter",
"container",
}
def has_custom_tools(tool_types: list[str]) -> bool:
return not set(tool_types).issubset(BUILTIN_TOOLS)
def get_encoding():
global _harmony_encoding
@ -48,10 +64,19 @@ def get_system_message(
start_date: Optional[str] = None,
browser_description: Optional[str] = None,
python_description: Optional[str] = None,
container_description: Optional[str] = None,
instructions: Optional[str] = None,
with_custom_tools: bool = False,
) -> Message:
sys_msg_content = SystemContent.new()
if model_identity is not None:
sys_msg_content = sys_msg_content.with_model_identity(model_identity)
if (instructions is not None
and envs.VLLM_GPT_OSS_HARMONY_SYSTEM_INSTRUCTIONS):
current_identity = sys_msg_content.model_identity
new_identity = (f'{current_identity}\n{instructions}'
if current_identity else instructions)
sys_msg_content = sys_msg_content.with_model_identity(new_identity)
if reasoning_effort is not None:
sys_msg_content = sys_msg_content.with_reasoning_effort(
REASONING_EFFORT[reasoning_effort])
@ -63,6 +88,14 @@ def get_system_message(
sys_msg_content = sys_msg_content.with_tools(browser_description)
if python_description is not None:
sys_msg_content = sys_msg_content.with_tools(python_description)
if container_description is not None:
sys_msg_content = sys_msg_content.with_tools(container_description)
if not with_custom_tools:
channel_config = sys_msg_content.channel_config
invalid_channel = "commentary"
new_config = ChannelConfig.require_channels(
[c for c in channel_config.valid_channels if c != invalid_channel])
sys_msg_content = sys_msg_content.with_channel_config(new_config)
sys_msg = Message.from_role_and_content(Role.SYSTEM, sys_msg_content)
return sys_msg
@ -86,14 +119,17 @@ def get_developer_message(
tools: Optional[list[Union[Tool, ChatCompletionToolsParam]]] = None,
) -> Message:
dev_msg_content = DeveloperContent.new()
if instructions is not None:
if (instructions is not None
and not envs.VLLM_GPT_OSS_HARMONY_SYSTEM_INSTRUCTIONS):
dev_msg_content = dev_msg_content.with_instructions(instructions)
if tools is not None:
function_tools: list[Union[Tool, ChatCompletionToolsParam]] = []
for tool in tools:
if tool.type in ("web_search_preview", "code_interpreter"):
if tool.type in ("web_search_preview", "code_interpreter",
"container"):
# These are built-in tools that are added to the system message.
pass
elif tool.type == "function":
function_tools.append(tool)
else:
@ -136,6 +172,8 @@ def parse_response_input(
TextContent(text=text_prefix + c["text"]) for c in content
]
msg = Message.from_role_and_contents(role, contents)
if role == "assistant":
msg = msg.with_channel("final")
elif response_msg["type"] == "function_call_output":
call_id = response_msg["call_id"]
call_response: Optional[ResponseFunctionToolCall] = None
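For clarity, a small self-contained sketch of the has_custom_tools check introduced above; the helper is restated so the example runs standalone, and the tool-type lists are illustrative.

# Restated from the diff above so the example runs standalone.
BUILTIN_TOOLS = {"web_search_preview", "code_interpreter", "container"}

def has_custom_tools(tool_types: list[str]) -> bool:
    return not set(tool_types).issubset(BUILTIN_TOOLS)

# Only builtin tools requested: the commentary channel can be dropped from
# the system message.
print(has_custom_tools(["web_search_preview", "container"]))  # False
# Any non-builtin type (e.g. a "function" tool) counts as custom.
print(has_custom_tools(["code_interpreter", "function"]))     # True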

View File

@ -44,8 +44,9 @@ from vllm.entrypoints.context import (ConversationContext, HarmonyContext,
SimpleContext, StreamingHarmonyContext)
from vllm.entrypoints.harmony_utils import (
get_developer_message, get_stop_tokens_for_assistant_actions,
get_system_message, get_user_message, parse_output_message,
parse_remaining_state, parse_response_input, render_for_completion)
get_system_message, get_user_message, has_custom_tools,
parse_output_message, parse_remaining_state, parse_response_input,
render_for_completion)
from vllm.entrypoints.logger import RequestLogger
# yapf conflicts with isort for this block
# yapf: disable
@ -266,6 +267,8 @@ class OpenAIServingResponses(OpenAIServing):
builtin_tool_list.append("browser")
if self.tool_server.has_tool("python"):
builtin_tool_list.append("python")
if self.tool_server.has_tool("container"):
builtin_tool_list.append("container")
if self.tool_server is not None:
available_tools = builtin_tool_list
@ -448,7 +451,8 @@ class OpenAIServingResponses(OpenAIServing):
async with AsyncExitStack() as exit_stack:
try:
await context.init_tool_sessions(self.tool_server, exit_stack)
await context.init_tool_sessions(self.tool_server, exit_stack,
request.request_id)
async for _ in result_generator:
pass
except asyncio.CancelledError:
@ -710,13 +714,21 @@ class OpenAIServingResponses(OpenAIServing):
# New conversation.
reasoning_effort = (request.reasoning.effort
if request.reasoning else None)
# Temporary: the OpenAI types don't include a container tool,
# so we use MCP to cover it; subject to change.
tool_types = [tool.type for tool in request.tools]
if envs.VLLM_GPT_OSS_USE_CONTAINER_TOOL:
tool_types.append("container")
enable_browser = ("web_search_preview" in tool_types
and self.tool_server is not None
and self.tool_server.has_tool("browser"))
enable_code_interpreter = ("code_interpreter" in tool_types
and self.tool_server is not None
and self.tool_server.has_tool("python"))
enable_container = ("container" in tool_types
and self.tool_server is not None
and self.tool_server.has_tool("container"))
with_custom_tools = has_custom_tools(tool_types)
sys_msg = get_system_message(
reasoning_effort=reasoning_effort,
browser_description=self.tool_server.get_tool_description(
@ -725,11 +737,17 @@ class OpenAIServingResponses(OpenAIServing):
python_description=self.tool_server.get_tool_description(
"python") if enable_code_interpreter
and self.tool_server is not None else None,
container_description=self.tool_server.get_tool_description(
"container")
if enable_container and self.tool_server is not None else None,
instructions=request.instructions,
with_custom_tools=with_custom_tools,
)
messages.append(sys_msg)
dev_msg = get_developer_message(request.instructions,
request.tools)
messages.append(dev_msg)
if with_custom_tools:
dev_msg = get_developer_message(
instructions=request.instructions, tools=request.tools)
messages.append(dev_msg)
else:
# Continue the previous conversation.
# FIXME(woosuk): Currently, request params like reasoning and
@ -1613,7 +1631,8 @@ class OpenAIServingResponses(OpenAIServing):
async with AsyncExitStack() as exit_stack:
processer = None
if self.use_harmony:
await context.init_tool_sessions(self.tool_server, exit_stack)
await context.init_tool_sessions(self.tool_server, exit_stack,
request.request_id)
processer = self._process_harmony_streaming_events
else:
processer = self._process_simple_streaming_events

View File

@ -86,7 +86,8 @@ class ToolServer(ABC):
pass
@abstractmethod
def new_session(self, tool_name: str) -> AbstractAsyncContextManager[Any]:
def new_session(self, tool_name: str,
session_id: str) -> AbstractAsyncContextManager[Any]:
"""
Create a session for the tool.
"""
@ -124,7 +125,8 @@ class MCPToolServer(ToolServer):
description=tool.description,
parameters=tool.inputSchema)
for tool in list_tools_response.tools
])
],
)
self.harmony_tool_descriptions[tool_from_mcp.name] = tool_from_mcp
if tool_from_mcp.name not in self.urls:
self.urls[tool_from_mcp.name] = url
@ -142,14 +144,16 @@ class MCPToolServer(ToolServer):
return self.harmony_tool_descriptions.get(tool_name)
@asynccontextmanager
async def new_session(self, tool_name: str):
async def new_session(self, tool_name: str, session_id: str):
from mcp import ClientSession
from mcp.client.sse import sse_client
url = self.urls.get(tool_name)
headers = {"x-session-id": session_id}
if not url:
raise KeyError(f"Tool '{tool_name}' is not supported")
async with sse_client(url=url) as streams, ClientSession(
*streams) as session:
async with sse_client(url=url,
headers=headers) as streams, ClientSession(
*streams) as session:
await session.initialize()
yield session
@ -182,7 +186,7 @@ class DemoToolServer(ToolServer):
raise ValueError(f"Unknown tool {tool_name}")
@asynccontextmanager
async def new_session(self, tool_name: str):
async def new_session(self, tool_name: str, session_id: str):
if tool_name not in self.tools:
raise KeyError(f"Tool '{tool_name}' is not supported")
yield self.tools[tool_name]
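A hedged usage sketch of the new two-argument new_session signature; tool_server stands for either ToolServer implementation above, and the tool and request names are illustrative.

import asyncio
from contextlib import AsyncExitStack

async def demo(tool_server):
    # MCPToolServer forwards the second argument as the "x-session-id" header;
    # DemoToolServer simply yields its local Tool object.
    async with AsyncExitStack() as stack:
        session = await stack.enter_async_context(
            tool_server.new_session("container", "request-123"))
        # ... use `session` here; it is closed when the stack exits.
        print(type(session).__name__)

# asyncio.run(demo(my_tool_server))  # `my_tool_server` is assumed to exist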

View File

@ -168,6 +168,8 @@ if TYPE_CHECKING:
VLLM_ALLREDUCE_USE_SYMM_MEM: bool = False
VLLM_TUNED_CONFIG_FOLDER: Optional[str] = None
VLLM_DISABLE_PAD_FOR_CUDAGRAPH: bool = False
VLLM_GPT_OSS_USE_CONTAINER_TOOL: bool = False
VLLM_GPT_OSS_HARMONY_SYSTEM_INSTRUCTIONS: bool = False
VLLM_CUSTOM_SCOPES_FOR_PROFILING: bool = False
@ -1201,6 +1203,15 @@ environment_variables: dict[str, Callable[[], Any]] = {
"VLLM_TUNED_CONFIG_FOLDER":
lambda: os.getenv("VLLM_TUNED_CONFIG_FOLDER", None),
# Allows vLLM to use the container tool
"VLLM_GPT_OSS_USE_CONTAINER_TOOL":
lambda: bool(int(os.getenv("VLLM_GPT_OSS_USE_CONTAINER_TOOL", "0"))),
# Allows Harmony instructions to be injected into system messages
"VLLM_GPT_OSS_HARMONY_SYSTEM_INSTRUCTIONS":
lambda: bool(
int(os.getenv("VLLM_GPT_OSS_HARMONY_SYSTEM_INSTRUCTIONS", "0"))),
# Add optional custom scopes for profiling, disable to avoid overheads
"VLLM_CUSTOM_SCOPES_FOR_PROFILING":
lambda: bool(int(os.getenv("VLLM_CUSTOM_SCOPES_FOR_PROFILING", "0"))),
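Both new flags follow the usual vLLM convention of parsing "0"/"1" strings into booleans; a tiny sketch of how that parsing behaves (the environment values set here are illustrative).

import os

# Same parsing shape as the entries above: unset or "0" -> False, "1" -> True.
os.environ["VLLM_GPT_OSS_USE_CONTAINER_TOOL"] = "1"
use_container = bool(int(os.getenv("VLLM_GPT_OSS_USE_CONTAINER_TOOL", "0")))
inject_sys_instructions = bool(
    int(os.getenv("VLLM_GPT_OSS_HARMONY_SYSTEM_INSTRUCTIONS", "0")))
print(use_container, inject_sys_instructions)  # True False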

View File

@ -35,7 +35,7 @@ from vllm.model_executor.layers.quantization.base_config import (
from vllm.model_executor.utils import set_weight_attrs
from vllm.platforms import current_platform
from vllm.platforms.interface import CpuArchEnum
from vllm.utils import (direct_register_custom_op, has_deep_ep, has_pplx,
from vllm.utils import (cdiv, direct_register_custom_op, has_deep_ep, has_pplx,
round_up)
if current_platform.is_cuda_alike():
@ -786,6 +786,7 @@ class FusedMoE(CustomOp):
enable_eplb: bool = False,
num_redundant_experts: int = 0,
has_bias: bool = False,
is_sequence_parallel=False,
):
super().__init__()
if params_dtype is None:
@ -797,6 +798,10 @@ class FusedMoE(CustomOp):
dp_size_ = (dp_size
if dp_size is not None else get_dp_group().world_size)
self.is_sequence_parallel = is_sequence_parallel
if self.is_sequence_parallel:
self.sp_size = tp_size_
vllm_config = get_current_vllm_config()
self.moe_parallel_config: FusedMoEParallelConfig = (
FusedMoEParallelConfig.make(
@ -1699,14 +1704,22 @@ class FusedMoE(CustomOp):
ctx = get_forward_context()
# flashinfer_cutlass_kernels can handle: optional DP + TP/EP
max_tokens_across_dp = ctx.dp_metadata.max_tokens_across_dp_cpu
max_tokens_across_dispatchers = ctx.dp_metadata.max_tokens_across_dp_cpu
moe_dp_chunk_size_per_rank = self.moe_config.max_num_tokens
# If the input to the MoE is sequence parallel then divide by sp_size
# to find the maximum number of tokens for any individual dispatcher.
if self.is_sequence_parallel:
max_tokens_across_dispatchers = cdiv(max_tokens_across_dispatchers,
self.sp_size)
num_tokens = full_hidden_states.size(0)
for chunk_idx, chunk_start_ in enumerate(
range(0, max_tokens_across_dp, moe_dp_chunk_size_per_rank)):
range(0, max_tokens_across_dispatchers,
moe_dp_chunk_size_per_rank)):
chunk_start = chunk_start_
chunk_end = min(chunk_start + moe_dp_chunk_size_per_rank,
max_tokens_across_dp)
max_tokens_across_dispatchers)
# clamp start and end
chunk_start = min(chunk_start, num_tokens - 1)
chunk_end = min(chunk_end, num_tokens)
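A small worked example of the dispatcher-token arithmetic above, under assumed illustrative sizes (sp_size = 4, 1000 max tokens across DP, chunk size 256).

def cdiv(a: int, b: int) -> int:
    # Ceiling division, as used for max_tokens_across_dispatchers above.
    return -(-a // b)

max_tokens_across_dp = 1000       # illustrative
sp_size = 4                       # illustrative sequence-parallel world size
moe_dp_chunk_size_per_rank = 256  # illustrative chunk size

# Each dispatcher only sees roughly 1/sp_size of the tokens.
max_tokens_across_dispatchers = cdiv(max_tokens_across_dp, sp_size)  # 250

for chunk_start in range(0, max_tokens_across_dispatchers,
                         moe_dp_chunk_size_per_rank):
    chunk_end = min(chunk_start + moe_dp_chunk_size_per_rank,
                    max_tokens_across_dispatchers)
    print(chunk_start, chunk_end)  # -> 0 250 (a single chunk in this example)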

View File

@ -37,8 +37,6 @@ class DeepseekV2Model(nn.Module):
super().__init__()
self.config = vllm_config. \
speculative_config.draft_model_config.hf_config
model_config = vllm_config.model_config
cache_config = vllm_config.cache_config
quant_config = vllm_config.quant_config
self.vocab_size = self.config.vocab_size
@ -51,11 +49,8 @@ class DeepseekV2Model(nn.Module):
self.layers = nn.ModuleList([
DeepseekV2DecoderLayer(
self.config,
vllm_config,
prefix=maybe_prefix(prefix, f"layers.{i + start_layer_id}"),
model_config=model_config,
cache_config=cache_config,
quant_config=quant_config,
) for i in range(self.config.num_hidden_layers)
])

View File

@ -7,7 +7,7 @@ import torch
import torch.nn as nn
from transformers import PretrainedConfig
from vllm.config import CacheConfig, ModelConfig, VllmConfig
from vllm.config import VllmConfig
from vllm.model_executor.layers.fused_moe import FusedMoE
from vllm.model_executor.layers.layernorm import RMSNorm
from vllm.model_executor.layers.logits_processor import LogitsProcessor
@ -43,23 +43,19 @@ class SharedHead(nn.Module):
class DeepSeekMultiTokenPredictorLayer(nn.Module):
def __init__(
self,
config: PretrainedConfig,
prefix: str,
model_config: ModelConfig,
cache_config: Optional[CacheConfig] = None,
quant_config: Optional[QuantizationConfig] = None,
) -> None:
def __init__(self, vllm_config: VllmConfig, prefix: str) -> None:
super().__init__()
config = vllm_config.model_config.hf_config
quant_config = vllm_config.quant_config
self.enorm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
self.hnorm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
self.eh_proj = nn.Linear(config.hidden_size * 2,
config.hidden_size,
bias=False)
self.shared_head = SharedHead(config=config, quant_config=quant_config)
self.mtp_block = DeepseekV2DecoderLayer(config, prefix, model_config,
cache_config, quant_config)
self.mtp_block = DeepseekV2DecoderLayer(vllm_config, prefix)
def forward(
self,
@ -95,13 +91,8 @@ class DeepSeekMultiTokenPredictor(nn.Module):
# to map the exact layer index from weights
self.layers = torch.nn.ModuleDict({
str(idx):
DeepSeekMultiTokenPredictorLayer(
config,
f"{prefix}.layers.{idx}",
model_config=vllm_config.model_config,
cache_config=vllm_config.cache_config,
quant_config=vllm_config.quant_config,
)
DeepSeekMultiTokenPredictorLayer(vllm_config,
f"{prefix}.layers.{idx}")
for idx in range(self.mtp_start_layer_idx,
self.mtp_start_layer_idx + self.num_mtp_layers)
})

View File

@ -32,12 +32,14 @@ import torch
from torch import nn
from transformers import DeepseekV2Config, DeepseekV3Config
import vllm.envs as envs
from vllm.attention import Attention
from vllm.compilation.decorators import support_torch_compile
from vllm.config import (CacheConfig, ModelConfig, VllmConfig,
get_current_vllm_config)
from vllm.config import CacheConfig, ParallelConfig, VllmConfig
from vllm.distributed import (get_ep_group, get_pp_group,
get_tensor_model_parallel_world_size)
get_tensor_model_parallel_rank,
get_tensor_model_parallel_world_size,
tensor_model_parallel_all_gather)
from vllm.model_executor.layers.activation import SiluAndMul
from vllm.model_executor.layers.fused_moe import FusedMoE
from vllm.model_executor.layers.layernorm import RMSNorm
@ -55,7 +57,9 @@ from vllm.model_executor.layers.vocab_parallel_embedding import (
from vllm.model_executor.model_loader.weight_utils import (
default_weight_loader, maybe_remap_kv_scale_name)
from vllm.model_executor.sampling_metadata import SamplingMetadata
from vllm.platforms import current_platform
from vllm.sequence import IntermediateTensors
from vllm.utils import cdiv, direct_register_custom_op
from .interfaces import MixtureOfExperts, SupportsLoRA, SupportsPP
from .utils import (PPMissingLayer, is_pp_missing_parameter,
@ -72,19 +76,27 @@ class DeepseekV2MLP(nn.Module):
hidden_act: str,
quant_config: Optional[QuantizationConfig] = None,
reduce_results: bool = True,
is_sequence_parallel=False,
prefix: str = "",
) -> None:
super().__init__()
# If is_sequence_parallel, the input and output tensors are sharded
# across the ranks within the tp_group. In this case the weights are
# replicated and no collective ops are needed.
# Otherwise we use standard TP with an allreduce at the end.
self.gate_up_proj = MergedColumnParallelLinear(
hidden_size, [intermediate_size] * 2,
bias=False,
quant_config=quant_config,
disable_tp=is_sequence_parallel,
prefix=f"{prefix}.gate_up_proj")
self.down_proj = RowParallelLinear(intermediate_size,
hidden_size,
bias=False,
quant_config=quant_config,
reduce_results=reduce_results,
disable_tp=is_sequence_parallel,
prefix=f"{prefix}.down_proj")
if hidden_act != "silu":
raise ValueError(f"Unsupported activation: {hidden_act}. "
@ -98,17 +110,58 @@ class DeepseekV2MLP(nn.Module):
return x
# Chunk x along the num_tokens axis for sequence parallelism
# NOTE: This is wrapped in a torch custom op to work around the following issue:
# The output tensor can have a sequence length 0 at small input sequence lengths
# even though we explicitly pad to avoid this.
def sequence_parallel_chunk(x: torch.Tensor) -> torch.Tensor:
tp_size = get_tensor_model_parallel_world_size()
tp_rank = get_tensor_model_parallel_rank()
# all_gather needs the sequence length to be divisible by tp_size
seq_len = x.size(0)
remainder = seq_len % tp_size
if remainder != 0:
pad_len = tp_size - remainder
x = nn.functional.pad(x, (0, 0, 0, pad_len))
chunk = x.shape[0] // tp_size
start = tp_rank * chunk
return torch.narrow(x, 0, start, chunk)
def sequence_parallel_chunk_fake(x: torch.Tensor) -> torch.Tensor:
tp_size = get_tensor_model_parallel_world_size()
seq_len = cdiv(x.size(0), tp_size)
shape = list(x.shape)
shape[0] = seq_len
out = torch.empty(shape, dtype=x.dtype, device=x.device)
return out
direct_register_custom_op(
op_name="sequence_parallel_chunk",
op_func=sequence_parallel_chunk,
mutates_args=[],
fake_impl=sequence_parallel_chunk_fake,
dispatch_key=current_platform.dispatch_key,
tags=(torch.Tag.needs_fixed_stride_order, ),
)
class DeepseekV2MoE(nn.Module):
def __init__(
self,
config: Union[DeepseekV2Config, DeepseekV3Config],
parallel_config: ParallelConfig,
quant_config: Optional[QuantizationConfig] = None,
prefix: str = "",
enable_eplb: bool = False,
):
super().__init__()
self.tp_size = get_tensor_model_parallel_world_size()
self.tp_rank = get_tensor_model_parallel_rank()
self.routed_scaling_factor = config.routed_scaling_factor
self.ep_group = get_ep_group().device_group
@ -117,6 +170,21 @@ class DeepseekV2MoE(nn.Module):
self.n_routed_experts: int = config.n_routed_experts
self.n_shared_experts: int = config.n_shared_experts
# The all_reduce at the end of attention (during o_proj) means that
# inputs are replicated across each rank of the tensor parallel group.
# If using expert-parallelism with DeepEP All2All ops, replicated
# tokens result in useless duplicate computation and communication.
#
# In this case, ensure the input to the experts is sequence parallel
# to avoid the excess work.
#
# Not needed for pplx-kernels as it can handle duplicate input tokens.
self.is_sequence_parallel = (envs.VLLM_ALL2ALL_BACKEND
in ("deepep_high_throughput",
"deepep_low_latency")
and parallel_config.enable_expert_parallel
and self.tp_size > 1)
if config.hidden_act != "silu":
raise ValueError(f"Unsupported activation: {config.hidden_act}. "
"Only silu is supported for now.")
@ -133,9 +201,8 @@ class DeepseekV2MoE(nn.Module):
self.gate.e_score_correction_bias = None
# Load balancing settings.
vllm_config = get_current_vllm_config()
eplb_config = vllm_config.parallel_config.eplb_config
self.enable_eplb = enable_eplb
eplb_config = parallel_config.eplb_config
self.enable_eplb = parallel_config.enable_eplb
self.n_redundant_experts = eplb_config.num_redundant_experts
self.n_logical_experts = self.n_routed_experts
@ -166,7 +233,9 @@ class DeepseekV2MoE(nn.Module):
routed_scaling_factor=1.0,
e_score_correction_bias=self.gate.e_score_correction_bias,
enable_eplb=self.enable_eplb,
num_redundant_experts=self.n_redundant_experts)
num_redundant_experts=self.n_redundant_experts,
is_sequence_parallel=self.is_sequence_parallel,
)
self.shared_experts = None
else:
intermediate_size = (config.moe_intermediate_size *
@ -177,6 +246,7 @@ class DeepseekV2MoE(nn.Module):
intermediate_size=intermediate_size,
hidden_act=config.hidden_act,
quant_config=quant_config,
is_sequence_parallel=self.is_sequence_parallel,
reduce_results=False,
prefix=f"{prefix}.shared_experts",
)
@ -199,11 +269,22 @@ class DeepseekV2MoE(nn.Module):
routed_scaling_factor=1.0,
e_score_correction_bias=self.gate.e_score_correction_bias,
enable_eplb=self.enable_eplb,
num_redundant_experts=self.n_redundant_experts)
num_redundant_experts=self.n_redundant_experts,
is_sequence_parallel=self.is_sequence_parallel,
)
def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
num_tokens, hidden_dim = hidden_states.shape
hidden_states = hidden_states.view(-1, hidden_dim)
# Chunk the hidden states so they aren't replicated across TP ranks.
# This avoids duplicate computation in self.experts.
# TODO: We can replace the all_reduce at the end of attn with a
# reduce_scatter instead of chunking here.
if self.is_sequence_parallel:
hidden_states = torch.ops.vllm.sequence_parallel_chunk(
hidden_states)
# router_logits: (num_tokens, n_experts)
router_logits, _ = self.gate(hidden_states)
@ -228,7 +309,11 @@ class DeepseekV2MoE(nn.Module):
assert shared_output is not None
final_hidden_states += shared_output
if self.tp_size > 1:
if self.is_sequence_parallel:
final_hidden_states = tensor_model_parallel_all_gather(
final_hidden_states, 0)
final_hidden_states = final_hidden_states[:num_tokens]
elif self.tp_size > 1:
final_hidden_states = (
self.experts.maybe_all_reduce_tensor_model_parallel(
final_hidden_states))
@ -532,16 +617,15 @@ class DeepseekV2MLAAttention(nn.Module):
class DeepseekV2DecoderLayer(nn.Module):
def __init__(
self,
config: Union[DeepseekV2Config, DeepseekV3Config],
prefix: str,
model_config: ModelConfig,
cache_config: Optional[CacheConfig] = None,
quant_config: Optional[QuantizationConfig] = None,
enable_eplb: bool = False,
) -> None:
def __init__(self, vllm_config: VllmConfig, prefix: str) -> None:
super().__init__()
config = vllm_config.model_config.hf_config
model_config = vllm_config.model_config
cache_config = vllm_config.cache_config
quant_config = vllm_config.quant_config
parallel_config = vllm_config.parallel_config
self.hidden_size = config.hidden_size
rope_theta = getattr(config, "rope_theta", 10000)
rope_scaling = getattr(config, "rope_scaling", None)
@ -578,9 +662,9 @@ class DeepseekV2DecoderLayer(nn.Module):
and layer_idx % config.moe_layer_freq == 0):
self.mlp = DeepseekV2MoE(
config=config,
parallel_config=parallel_config,
quant_config=quant_config,
prefix=f"{prefix}.mlp",
enable_eplb=enable_eplb,
)
else:
self.mlp = DeepseekV2MLP(
@ -650,10 +734,7 @@ class DeepseekV2Model(nn.Module):
super().__init__()
config = vllm_config.model_config.hf_config
model_config = vllm_config.model_config
cache_config = vllm_config.cache_config
quant_config = vllm_config.quant_config
enable_eplb = vllm_config.parallel_config.enable_eplb
self.config = config
self.vocab_size = config.vocab_size
@ -669,14 +750,7 @@ class DeepseekV2Model(nn.Module):
self.start_layer, self.end_layer, self.layers = make_layers(
config.num_hidden_layers,
lambda prefix: DeepseekV2DecoderLayer(
config,
prefix,
model_config=model_config,
cache_config=cache_config,
quant_config=quant_config,
enable_eplb=enable_eplb,
),
lambda prefix: DeepseekV2DecoderLayer(vllm_config, prefix),
prefix=f"{prefix}.layers")
if get_pp_group().is_last_rank:
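To make the sequence-parallel round trip above concrete, a standalone sketch (plain torch, simulating the tp ranks in one process, no distributed setup) showing that pad + per-rank narrow, followed by gather and truncation, recovers the original tokens; sizes are illustrative.

import torch
import torch.nn.functional as F

def chunk_for_rank(x: torch.Tensor, tp_rank: int, tp_size: int) -> torch.Tensor:
    # Pad num_tokens up to a multiple of tp_size, then take this rank's slice,
    # mirroring sequence_parallel_chunk above.
    seq_len = x.size(0)
    remainder = seq_len % tp_size
    if remainder != 0:
        x = F.pad(x, (0, 0, 0, tp_size - remainder))
    chunk = x.size(0) // tp_size
    return x.narrow(0, tp_rank * chunk, chunk)

tp_size = 2
x = torch.randn(5, 4)  # 5 tokens, hidden size 4 (illustrative)
# Concatenating the ranks' chunks stands in for tensor_model_parallel_all_gather,
# and slicing back to num_tokens drops the padding, as in DeepseekV2MoE.forward.
gathered = torch.cat([chunk_for_rank(x, r, tp_size) for r in range(tp_size)], dim=0)
assert torch.equal(gathered[:x.size(0)], x)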

View File

@ -641,7 +641,13 @@ class WorkerProc:
def worker_busy_loop(self, cancel: Optional[threading.Event] = None):
"""Main busy loop for Multiprocessing Workers"""
# Debug instrumentation: log this worker's RSS every 100 loop iterations.
import os
import psutil
p = psutil.Process(os.getpid())
i = 0
while True:
if i % 100 == 0:
logger.info("WorkerProc RSS MB: %d", p.memory_info().rss / 1024 / 1024)
i += 1
method, args, kwargs, output_rank = self.rpc_broadcast_mq.dequeue(
cancel=cancel)
try: