[Performance] V1 Classify Models E2E Performance Optimization (#23541)
Signed-off-by: wang.yuqi <noooop@126.com>
@@ -62,3 +62,9 @@ def test_encode_api(llm: LLM):
    err_msg = "pooling_task must be one of.+"
    with pytest.raises(ValueError, match=err_msg):
        llm.encode(prompts, use_tqdm=False)


def test_score_api(llm: LLM):
    err_msg = "Score API is only enabled for num_labels == 1."
    with pytest.raises(ValueError, match=err_msg):
        llm.score("ping", "pong", use_tqdm=False)
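For context, a minimal sketch of how this new guard surfaces to offline users; the model name is hypothetical and stands in for any classifier checkpoint with `num_labels != 1`:

```python
from vllm import LLM

# Hypothetical multi-label classifier checkpoint (num_labels > 1).
llm = LLM(model="my-org/multi-label-classifier")

try:
    llm.score("ping", "pong", use_tqdm=False)
except ValueError as e:
    # Raised by the new guard exercised in the test above.
    print(e)  # Score API is only enabled for num_labels == 1.
```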
@@ -226,3 +226,33 @@ def test_pooling(server: RemoteOpenAIServer, model_name: str):
        },
    )
    assert response.json()["error"]["type"] == "BadRequestError"


@pytest.mark.asyncio
@pytest.mark.parametrize("model_name", [MODEL_NAME])
def test_score(server: RemoteOpenAIServer, model_name: str):
    # score api is only enabled for num_labels == 1.
    response = requests.post(
        server.url_for("score"),
        json={
            "model": model_name,
            "text_1": "ping",
            "text_2": "pong",
        },
    )
    assert response.json()["error"]["type"] == "BadRequestError"


@pytest.mark.asyncio
@pytest.mark.parametrize("model_name", [MODEL_NAME])
def test_rerank(server: RemoteOpenAIServer, model_name: str):
    # rerank api is only enabled for num_labels == 1.
    response = requests.post(
        server.url_for("rerank"),
        json={
            "model": model_name,
            "query": "ping",
            "documents": ["pong"],
        },
    )
    assert response.json()["error"]["type"] == "BadRequestError"
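The online equivalent of the guard, as a sketch; the URL and model name are illustrative, and the expected response mirrors what the tests assert:

```python
import requests

# The /score and /rerank routes now reject non-single-label classifiers
# with an HTTP 400 whose error type is "BadRequestError".
resp = requests.post(
    "http://localhost:8000/score",
    json={
        "model": "my-org/multi-label-classifier",
        "text_1": "ping",
        "text_2": "pong",
    },
)
print(resp.status_code)              # 400
print(resp.json()["error"]["type"])  # BadRequestError
```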
@@ -1805,17 +1805,13 @@ async def init_app_state(
        request_logger=request_logger,
        log_error_stack=args.log_error_stack,
    ) if "classify" in supported_tasks else None

    enable_serving_reranking = ("classify" in supported_tasks and getattr(
        model_config.hf_config, "num_labels", 0) == 1)
    state.openai_serving_scores = ServingScores(
        engine_client,
        model_config,
        state.openai_serving_models,
        request_logger=request_logger,
        log_error_stack=args.log_error_stack,
    ) if ("embed" in supported_tasks or enable_serving_reranking) else None

    ) if ("embed" in supported_tasks or "score" in supported_tasks) else None
    state.openai_serving_tokenization = OpenAIServingTokenization(
        engine_client,
        model_config,
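The gating expression above reduces to a small predicate. A self-contained sketch, where `SimpleNamespace` stands in for the real `model_config.hf_config` and the task set is illustrative:

```python
from types import SimpleNamespace

# Stand-ins for the real config objects; values are illustrative.
hf_config = SimpleNamespace(num_labels=1)
supported_tasks = {"embed", "classify"}

enable_serving_reranking = ("classify" in supported_tasks
                            and getattr(hf_config, "num_labels", 0) == 1)
serve_scores = "embed" in supported_tasks or enable_serving_reranking
print(enable_serving_reranking, serve_scores)  # True True
```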
@@ -13,12 +13,15 @@ import torch.nn.functional as F
from transformers import PretrainedConfig

from vllm.config import ModelConfig, PoolerConfig
from vllm.logger import init_logger
from vllm.pooling_params import PoolingParams
from vllm.sequence import PoolerOutput, PoolingSequenceGroupOutput
from vllm.tasks import PoolingTask
from vllm.utils import current_stream, resolve_obj_by_qualname
from vllm.v1.pool.metadata import PoolingCursor, PoolingMetadata

logger = init_logger(__name__)

PoolingFn = Callable[
    [Union[torch.Tensor, list[torch.Tensor]], PoolingMetadata],
    Union[torch.Tensor, list[torch.Tensor]]]
@@ -183,7 +186,7 @@ def get_cross_encoder_activation_function(config: PretrainedConfig):
        fn = resolve_obj_by_qualname(function_name)()
        return PoolerActivation.wraps(fn)

    return PoolerScore()
    return PoolerClassify()


def build_output(
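`resolve_obj_by_qualname` turns a dotted path from the HF config into a callable. A minimal stand-in showing the mechanism (not vLLM's exact implementation):

```python
import importlib

import torch

# Minimal stand-in for vllm.utils.resolve_obj_by_qualname: split the
# dotted path into module and attribute, import, and return the attribute.
def resolve_obj_by_qualname(qualname: str):
    module_name, _, obj_name = qualname.rpartition(".")
    return getattr(importlib.import_module(module_name), obj_name)

# e.g. a sentence-transformers style activation name from the checkpoint:
fn = resolve_obj_by_qualname("torch.nn.modules.activation.Sigmoid")()
print(fn(torch.tensor([0.0])))  # tensor([0.5000])
```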
@@ -371,24 +374,31 @@ class PoolerMultiLabelClassify(PoolerActivation):

class PoolerClassify(PoolerActivation):

    def __init__(self, *, static_num_labels: bool = True) -> None:
        super().__init__()

        if static_num_labels:
            from vllm.config import get_current_vllm_config
            vllm_config = get_current_vllm_config()
            self.num_labels = getattr(vllm_config.model_config.hf_config,
                                      "num_labels", 0)
            if self.num_labels == 0:
                logger.warning("num_labels should be > 0 for classification "
                               "models, falling back to softmax. "
                               "Please check if the configuration is correct.")
        else:
            self.num_labels = None

    def forward_chunk(self, pooled_data: torch.Tensor) -> torch.Tensor:
        num_labels = pooled_data.shape[-1]
        num_labels = (self.num_labels if self.num_labels is not None else
                      pooled_data.shape[-1])

        if num_labels < 2:
            return F.sigmoid(pooled_data.float()).to(pooled_data.dtype)

        return F.softmax(pooled_data.float(), dim=-1).to(pooled_data.dtype)


class PoolerScore(PoolerActivation):

    def forward_chunk(self, pooled_data: torch.Tensor) -> torch.Tensor:
        num_labels = pooled_data.shape[-1]
        if num_labels < 2:
            return F.sigmoid(pooled_data.float()).to(pooled_data.dtype)

        return pooled_data


class LambdaPoolerActivation(PoolerActivation):

    def __init__(self, fn: Callable[[torch.Tensor], torch.Tensor]):
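The activation rule in this hunk, restated standalone: single-label heads get a sigmoid, multi-label heads a softmax over the label dimension. A sketch, with illustrative logits:

```python
import torch
import torch.nn.functional as F

# Standalone restatement of the rule above; not vLLM's class, just the math.
def classify_activation(pooled: torch.Tensor, num_labels: int) -> torch.Tensor:
    if num_labels < 2:
        return F.sigmoid(pooled.float()).to(pooled.dtype)
    return F.softmax(pooled.float(), dim=-1).to(pooled.dtype)

logits = torch.tensor([[1.5, -0.5, 0.2]])
print(classify_activation(logits, num_labels=3).sum(dim=-1))  # ~tensor([1.])
```

Caching `num_labels` at construction (the `static_num_labels=True` path) means the sigmoid-vs-softmax choice no longer depends on the shape of each chunk's tensor, which keeps it stable under chunked pooling and avoids per-chunk config reads.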
@@ -428,6 +438,10 @@ class EmbeddingPoolerHead(PoolerHead):
    def forward(self, pooled_data: Union[list[torch.Tensor], torch.Tensor],
                pooling_metadata: PoolingMetadata):

        if isinstance(pooled_data, list):
            pooled_data = torch.stack(pooled_data)
        # pooled_data shape: [batchsize, hidden_dimension]

        # Apply ST projector
        if self.projector is not None:
            projector = cast(nn.Module, self.projector)
@@ -437,17 +451,11 @@ class EmbeddingPoolerHead(PoolerHead):
                y = projector(x.to(torch.float32))
                return y.to(orig_dtype)

            if isinstance(pooled_data, torch.Tensor):
                pooled_data = _proj(pooled_data)
            else:
                pooled_data = [_proj(t) for t in pooled_data]
            pooled_data = _proj(pooled_data)
        # pooled_data shape: [batchsize, embedding_dimension]

        pooling_params = get_pooling_params(pooling_metadata)

        if isinstance(pooled_data, list):
            pooled_data = torch.stack(pooled_data)
        # pooled_data shape: [batchsize, embedding_dimension]

        # for matryoshka representation
        dimensions_list = [
            pooling_param.dimensions for pooling_param in pooling_params
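The `dimensions_list` collected above drives matryoshka-style truncation. A sketch of the idea under the usual convention of slicing then re-normalizing; the helper name is illustrative, not vLLM's API:

```python
import torch
import torch.nn.functional as F

# Illustrative helper: slice an embedding to the requested matryoshka
# dimension, then L2-normalize the truncated vector.
def truncate_embedding(vec: torch.Tensor, dimensions: int) -> torch.Tensor:
    return F.normalize(vec[..., :dimensions], p=2, dim=-1)

emb = torch.randn(8, 1024)
print(truncate_embedding(emb, 256).shape)  # torch.Size([8, 256])
```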
@@ -477,13 +485,14 @@ class EmbeddingPoolerHead(PoolerHead):
            for vecs, f in zip(pooled_data, flags)
        ]

        # pooled_data shape: [batchsize, embedding_dimension]
        return pooled_data


class RewardPoolerHead(PoolerHead):

    def __init__(self) -> None:
        super().__init__(activation=PoolerClassify())
        super().__init__(activation=PoolerClassify(static_num_labels=False))

    def forward(self, pooled_data: Union[list[torch.Tensor], torch.Tensor],
                pooling_metadata: PoolingMetadata):
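Why `static_num_labels=False` here: a reward head has no fixed `num_labels` in `hf_config`, so the label dimension must be read from the runtime tensor rather than cached at init. In sketch form, with illustrative sizes:

```python
import torch

# Reward outputs vary in width per model, so the head resolves the label
# dimension from the tensor itself (the static_num_labels=False path).
static_num_labels = None           # not cached at construction
pooled = torch.randn(4, 7)         # e.g. 7 reward values per sequence
num_labels = (static_num_labels if static_num_labels is not None
              else pooled.shape[-1])
print(num_labels)  # 7
```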
@@ -637,19 +646,13 @@ class ClassifierPooler(Pooler):
        pooling_metadata: PoolingMetadata,
    ) -> PoolerOutput:
        pooled_data = self.pooling(hidden_states, pooling_metadata)

        if isinstance(pooled_data, list):
            pooled_data = torch.stack(pooled_data)
        # pooled_data shape: [batchsize, hidden_size]

        if self.classifier is not None:
            # apply classifier once on the full batch if possible
            if isinstance(pooled_data, torch.Tensor):
                pooled_data = self.classifier(pooled_data)
            elif len({data.shape for data in pooled_data}) <= 1:
                pooled_data = self.classifier(torch.stack(pooled_data))
            else:
                pooled_data = [self.classifier(data) for data in pooled_data]
            pooled_data = self.classifier(pooled_data)
        # pooled_data shape: [batchsize, num_labels]

        pooling_params = get_pooling_params(pooling_metadata)
        flags = [p.activation for p in pooling_params]
@@ -662,6 +665,7 @@ class ClassifierPooler(Pooler):
            for vecs, f in zip(pooled_data, flags)
        ]

        # scores shape: [batchsize, num_labels]
        return build_output(scores)
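The performance point of this hunk: once `pooled_data` is stacked up front, the classifier runs as one batched matmul instead of one small matmul per sequence. A sketch of the equivalence, with illustrative sizes:

```python
import torch
import torch.nn as nn

# Illustrative sizes: 32 pooled vectors of hidden size 1024, 2 labels.
classifier = nn.Linear(1024, 2)
pooled = [torch.randn(1024) for _ in range(32)]

per_item = [classifier(t) for t in pooled]   # old path: 32 small matmuls
batched = classifier(torch.stack(pooled))    # new path: 1 batched matmul
print(torch.allclose(torch.stack(per_item), batched, atol=1e-6))  # True
```

The old code already batched when all shapes matched; stacking unconditionally at the top of `forward` makes the fast path the only path.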
@@ -1248,10 +1248,17 @@ class GPUModelRunner(LoRAModelRunnerMixin, KVConnectorModelRunnerMixin):
                and "encode" in supported_tasks):
            supported_tasks.remove("encode")

            logger.info_once("Chunked prefill is not supported with "
                             "the encode task, which uses ALL pooling. "
                             "Please turn off chunked prefill via "
                             "`--no-enable-chunked-prefill` before using it.")
            logger.debug_once("Chunked prefill is not supported with "
                              "the encode task, which uses ALL pooling. "
                              "Please turn off chunked prefill via "
                              "`--no-enable-chunked-prefill` before using it.")

        if "score" in supported_tasks:
            num_labels = getattr(self.model_config.hf_config, "num_labels", 0)
            if num_labels != 1:
                supported_tasks.remove("score")
                logger.debug_once(
                    "Score API is only enabled for num_labels == 1.")

        return supported_tasks
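The runner-side filter in sketch form: "score" is dropped from the advertised tasks unless the checkpoint is a single-label classifier. Values are illustrative; `num_labels` stands in for `getattr(self.model_config.hf_config, "num_labels", 0)`:

```python
# Sketch of the task filtering above, with illustrative values.
supported_tasks = ["encode", "classify", "score"]
num_labels = 2  # a multi-label checkpoint

if "score" in supported_tasks and num_labels != 1:
    supported_tasks.remove("score")
print(supported_tasks)  # ['encode', 'classify']
```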