diff --git a/tests/entrypoints/openai/test_chat.py b/tests/entrypoints/openai/test_chat.py
index d110234d60..14181c6b8b 100644
--- a/tests/entrypoints/openai/test_chat.py
+++ b/tests/entrypoints/openai/test_chat.py
@@ -369,7 +369,7 @@ async def test_chat_completion_stream_options(
             assert chunk.usage is None
         else:
             assert chunk.usage is None
-    final_chunk = await stream.__anext__()
+    final_chunk = await anext(stream)
     assert final_chunk.usage is not None
     assert final_chunk.usage.prompt_tokens > 0
     assert final_chunk.usage.completion_tokens > 0
diff --git a/tests/entrypoints/test_context.py b/tests/entrypoints/test_context.py
index 6ad18fa08b..b0faa870a9 100644
--- a/tests/entrypoints/test_context.py
+++ b/tests/entrypoints/test_context.py
@@ -10,12 +10,6 @@ from vllm.entrypoints.context import HarmonyContext, StreamingHarmonyContext
 from vllm.outputs import CompletionOutput, RequestOutput
 
 
-# Helper function for Python < 3.10 compatibility
-async def async_next(async_iterator):
-    """Compatibility function equivalent to Python 3.10's anext()."""
-    return await async_iterator.__anext__()
-
-
 def create_mock_request_output(
     prompt_token_ids=None,
     output_token_ids=None,
@@ -129,7 +123,7 @@ async def test_multi_turn_token_counting():
     )
 
     # First turn - initial prompt and response
-    mock_output1 = await async_next(mock_generator)
+    mock_output1 = await anext(mock_generator)
     context.append_output(mock_output1)
 
     # At this point, we should have 5 prompt tokens and 3 output tokens
@@ -138,7 +132,7 @@ async def test_multi_turn_token_counting():
     assert context.num_tool_output_tokens == 0
 
     # Second turn - after tool output
-    mock_output2 = await async_next(mock_generator)
+    mock_output2 = await anext(mock_generator)
     context.append_output(mock_output2)
     # Current prompt tokens (15) - last_turn_input_tokens (5) -
     # last_turn_output_tokens (3) = 7
@@ -150,7 +144,7 @@ async def test_multi_turn_token_counting():
     assert context.num_cached_tokens == 5
 
     # Third turn - final response
-    mock_output3 = await async_next(mock_generator)
+    mock_output3 = await anext(mock_generator)
     context.append_output(mock_output3)
     # Additional tool output tokens from third turn:
     # Current prompt (20) - last_turn_input_tokens (15) -
diff --git a/tests/utils_/test_utils.py b/tests/utils_/test_utils.py
index cd5fa55049..308629ab05 100644
--- a/tests/utils_/test_utils.py
+++ b/tests/utils_/test_utils.py
@@ -75,8 +75,7 @@ async def test_merge_async_iterators():
 
     for iterator in iterators:
         try:
-            # Can use anext() in python >= 3.10
-            await asyncio.wait_for(iterator.__anext__(), 1)
+            await asyncio.wait_for(anext(iterator), 1)
         except StopAsyncIteration:
            # All iterators should be cancelled and print this message.
            print("Iterator was cancelled normally")
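Note: the test changes above all lean on the `anext()` builtin, which Python 3.10 added as the async counterpart to `next()`; `await anext(it)` is equivalent to `await it.__anext__()`. A minimal, self-contained sketch (the `ticker` generator is illustrative, not from this patch):

```python
import asyncio


async def ticker():
    # Stands in for a streaming response iterator.
    for i in range(3):
        yield i


async def main():
    stream = ticker()
    # anext(it) awaits the iterator's next item, like it.__anext__().
    assert await anext(stream) == 0
    # The two-argument form returns a default once the iterator is
    # exhausted, instead of raising StopAsyncIteration.
    assert [await anext(stream, None) for _ in range(3)] == [1, 2, None]


asyncio.run(main())
```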
print("Iterator was cancelled normally") diff --git a/tests/v1/entrypoints/openai/test_completion.py b/tests/v1/entrypoints/openai/test_completion.py index 35287f5b97..66dbed2b9f 100644 --- a/tests/v1/entrypoints/openai/test_completion.py +++ b/tests/v1/entrypoints/openai/test_completion.py @@ -420,7 +420,7 @@ async def test_completion_stream_options(client: openai.AsyncOpenAI, model_name: assert chunk.usage is None else: assert chunk.usage is None - final_chunk = await stream.__anext__() + final_chunk = await anext(stream) assert final_chunk.usage is not None assert final_chunk.usage.prompt_tokens > 0 assert final_chunk.usage.completion_tokens > 0 @@ -450,7 +450,7 @@ async def test_completion_stream_options(client: openai.AsyncOpenAI, model_name: chunk.usage.prompt_tokens + chunk.usage.completion_tokens ) if chunk.choices[0].finish_reason is not None: - final_chunk = await stream.__anext__() + final_chunk = await anext(stream) assert final_chunk.usage is not None assert final_chunk.usage.prompt_tokens > 0 assert final_chunk.usage.completion_tokens > 0 diff --git a/vllm/benchmarks/serve.py b/vllm/benchmarks/serve.py index f061c14799..c3c45f05f8 100644 --- a/vllm/benchmarks/serve.py +++ b/vllm/benchmarks/serve.py @@ -18,6 +18,7 @@ On the client side, run: import argparse import asyncio +import contextlib import gc import importlib.util import json @@ -605,17 +606,13 @@ async def benchmark( pbar = None if disable_tqdm else tqdm(total=len(input_requests)) - # This can be used once the minimum Python version is 3.10 or higher, - # and it will simplify the code in limited_request_func. - # semaphore = (asyncio.Semaphore(max_concurrency) - # if max_concurrency else contextlib.nullcontext()) - semaphore = asyncio.Semaphore(max_concurrency) if max_concurrency else None + semaphore = ( + asyncio.Semaphore(max_concurrency) + if max_concurrency + else contextlib.nullcontext() + ) async def limited_request_func(request_func_input, session, pbar): - if semaphore is None: - return await request_func( - request_func_input=request_func_input, session=session, pbar=pbar - ) async with semaphore: return await request_func( request_func_input=request_func_input, session=session, pbar=pbar diff --git a/vllm/utils/__init__.py b/vllm/utils/__init__.py index 4a6a79ad06..22c2a4b536 100644 --- a/vllm/utils/__init__.py +++ b/vllm/utils/__init__.py @@ -469,11 +469,6 @@ def make_async( return _async_wrapper -def _next_task(iterator: AsyncGenerator[T, None], loop: AbstractEventLoop) -> Task: - # Can use anext() in python >= 3.10 - return loop.create_task(iterator.__anext__()) # type: ignore[arg-type] - - async def merge_async_iterators( *iterators: AsyncGenerator[T, None], ) -> AsyncGenerator[tuple[int, T], None]: @@ -491,7 +486,7 @@ async def merge_async_iterators( loop = asyncio.get_running_loop() - awaits = {_next_task(pair[1], loop): pair for pair in enumerate(iterators)} + awaits = {loop.create_task(anext(it)): (i, it) for i, it in enumerate(iterators)} try: while awaits: done, _ = await asyncio.wait(awaits.keys(), return_when=FIRST_COMPLETED) @@ -500,7 +495,7 @@ async def merge_async_iterators( try: item = await d i, it = pair - awaits[_next_task(it, loop)] = pair + awaits[loop.create_task(anext(it))] = pair yield i, item except StopAsyncIteration: pass diff --git a/vllm/v1/serial_utils.py b/vllm/v1/serial_utils.py index 747d08dcd3..f4e1cbd2e0 100644 --- a/vllm/v1/serial_utils.py +++ b/vllm/v1/serial_utils.py @@ -290,9 +290,7 @@ class MsgpackDecoder: _log_insecure_serialization_warning() def decode(self, bufs: 
diff --git a/vllm/v1/serial_utils.py b/vllm/v1/serial_utils.py
index 747d08dcd3..f4e1cbd2e0 100644
--- a/vllm/v1/serial_utils.py
+++ b/vllm/v1/serial_utils.py
@@ -290,9 +290,7 @@ class MsgpackDecoder:
             _log_insecure_serialization_warning()
 
     def decode(self, bufs: Union[bytestr, Sequence[bytestr]]) -> Any:
-        if isinstance(bufs, (bytes, bytearray, memoryview, zmq.Frame)):
-            # TODO - This check can become `isinstance(bufs, bytestr)`
-            # as of Python 3.10.
+        if isinstance(bufs, bytestr):  # type: ignore
             return self.decoder.decode(bufs)
         self.aux_buffers = bufs
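Note: the final hunk relies on `isinstance()` accepting union objects directly, added in Python 3.10 with PEP 604. A sketch assuming `bytestr` is such a union alias (the real alias in `vllm/v1/serial_utils.py` also covers `zmq.Frame`, omitted here to stay stdlib-only):

```python
# Illustrative stand-in for the module's `bytestr` alias.
bytestr = bytes | bytearray | memoryview

# Since Python 3.10, isinstance() accepts X | Y union objects, so the
# old tuple of concrete types is redundant.
assert isinstance(b"raw", bytestr)
assert isinstance(memoryview(b"raw"), bytestr)
assert not isinstance("text", bytestr)
```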