Compare commits
8 Commits
remove-asy
...
add-nixl-t
| Author | SHA1 | Date | |
|---|---|---|---|
| c130e8edea | |||
| 88b009b540 | |||
| ca3156dffc | |||
| b7ac93cb08 | |||
| e0dd33aeff | |||
| edf35b63ce | |||
| 4127593c30 | |||
| 14f13ed690 |
@ -874,13 +874,17 @@ class NixlConnectorWorker:
|
|||||||
Returns:
|
Returns:
|
||||||
set of req_ids that have all done xfers
|
set of req_ids that have all done xfers
|
||||||
"""
|
"""
|
||||||
|
current_time = time.perf_counter()
|
||||||
|
|
||||||
done_req_ids: set[str] = set()
|
done_req_ids: set[str] = set()
|
||||||
for req_id, handles in list(transfers.items()):
|
for req_id, handles in list(transfers.items()):
|
||||||
in_progress = False
|
in_progress = False
|
||||||
for handle, _xfer_stime in handles:
|
for handle, _xfer_stime, remote_id in handles:
|
||||||
xfer_state = self.nixl_wrapper.check_xfer_state(handle)
|
xfer_state = self.nixl_wrapper.check_xfer_state(handle)
|
||||||
if xfer_state == "DONE":
|
if xfer_state == "DONE":
|
||||||
self.nixl_wrapper.release_xfer_handle(handle)
|
self.nixl_wrapper.release_xfer_handle(handle)
|
||||||
|
logger.info("========= TRANSFER: req_id %s remote_engine_id %s transfer time: %s",
|
||||||
|
req_id, remote_id, current_time - _xfer_stime)
|
||||||
elif xfer_state == "PROC":
|
elif xfer_state == "PROC":
|
||||||
in_progress = True
|
in_progress = True
|
||||||
continue
|
continue
|
||||||
@ -1012,6 +1016,7 @@ class NixlConnectorWorker:
|
|||||||
assert len(local_block_descs_ids) == len(remote_block_descs_ids)
|
assert len(local_block_descs_ids) == len(remote_block_descs_ids)
|
||||||
|
|
||||||
# Prepare transfer with Nixl.
|
# Prepare transfer with Nixl.
|
||||||
|
start = time.perf_counter()
|
||||||
handle = self.nixl_wrapper.make_prepped_xfer(
|
handle = self.nixl_wrapper.make_prepped_xfer(
|
||||||
"READ",
|
"READ",
|
||||||
local_xfer_side_handle,
|
local_xfer_side_handle,
|
||||||
@ -1019,15 +1024,20 @@ class NixlConnectorWorker:
|
|||||||
remote_xfer_side_handle,
|
remote_xfer_side_handle,
|
||||||
remote_block_descs_ids,
|
remote_block_descs_ids,
|
||||||
notif_msg=notif_id,
|
notif_msg=notif_id,
|
||||||
|
skip_desc_merge=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Begin async xfer.
|
# Begin async xfer.
|
||||||
self.nixl_wrapper.transfer(handle)
|
self.nixl_wrapper.transfer(handle)
|
||||||
|
end = time.perf_counter()
|
||||||
|
# logger.debug(
|
||||||
|
# "[nixl connector]: req_id %s transfer launch time %s",
|
||||||
|
# request_id, end - start)
|
||||||
|
|
||||||
# Use handle to check completion in future step().
|
# Use handle to check completion in future step().
|
||||||
# TODO (NickLucche) surface xfer elapsed time
|
# TODO (NickLucche) surface xfer elapsed time
|
||||||
self._recving_transfers[request_id].append(
|
self._recving_transfers[request_id].append(
|
||||||
(handle, time.perf_counter()))
|
(handle, time.perf_counter(), dst_engine_id))
|
||||||
|
|
||||||
def _get_block_descs_ids(self,
|
def _get_block_descs_ids(self,
|
||||||
engine_id: str,
|
engine_id: str,
|
||||||
|
|||||||
@ -60,6 +60,7 @@ class AsyncLLM(EngineClient):
|
|||||||
client_addresses: Optional[dict[str, str]] = None,
|
client_addresses: Optional[dict[str, str]] = None,
|
||||||
client_index: int = 0,
|
client_index: int = 0,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
print("====== HELLO ======")
|
||||||
"""
|
"""
|
||||||
Create an AsyncLLM.
|
Create an AsyncLLM.
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user