chore(api): fix lint and style issues

This commit is contained in:
QuantumGhost
2026-01-19 15:36:30 +08:00
parent 578cac35e3
commit 33dd9c8ad7
5 changed files with 85 additions and 93 deletions

View File

@ -1,9 +1,7 @@
import json
import logging
import time
import uuid
from collections.abc import Callable, Generator, Mapping
from typing import Any, Union, cast
from typing import Union, cast
from sqlalchemy import select
from sqlalchemy.orm import Session
@ -12,6 +10,7 @@ from core.app.app_config.entities import EasyUIBasedAppConfig, EasyUIBasedAppMod
from core.app.apps.base_app_generator import BaseAppGenerator
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.apps.exc import GenerateTaskStoppedError
from core.app.apps.streaming_utils import stream_topic_events
from core.app.entities.app_invoke_entities import (
AdvancedChatAppGenerateEntity,
AgentChatAppGenerateEntity,
@ -25,14 +24,12 @@ from core.app.entities.task_entities import (
ChatbotAppStreamResponse,
CompletionAppBlockingResponse,
CompletionAppStreamResponse,
StreamEvent,
)
from core.app.task_pipeline.easy_ui_based_generate_task_pipeline import EasyUIBasedGenerateTaskPipeline
from core.prompt.utils.prompt_template_parser import PromptTemplateParser
from extensions.ext_database import db
from extensions.ext_redis import get_pubsub_broadcast_channel
from libs.broadcast_channel.channel import Topic
from libs.broadcast_channel.exc import SubscriptionClosedError
from libs.datetime_utils import naive_utc_now
from models import Account
from models.enums import CreatorUserRole
@ -315,38 +312,8 @@ class MessageBasedAppGenerator(BaseAppGenerator):
on_subscribe: Callable[[], None] | None = None,
) -> Generator[Mapping | str, None, None]:
topic = cls.get_response_topic(app_mode, workflow_run_id)
return _topic_msg_generator(topic, idle_timeout, on_subscribe)
def _topic_msg_generator(
topic: Topic,
idle_timeout: float,
on_subscribe: Callable[[], None] | None = None,
) -> Generator[Mapping[str, Any], None, None]:
last_msg_time = time.time()
with topic.subscribe() as sub:
# on_subscribe fires only after the Redis subscription is active.
# This is used to gate task start and reduce pub/sub race for the first event.
if on_subscribe is not None:
on_subscribe()
while True:
try:
msg = sub.receive()
except SubscriptionClosedError:
return
if msg is None:
current_time = time.time()
if current_time - last_msg_time > idle_timeout:
return
# skip the `None` message
continue
last_msg_time = time.time()
event = json.loads(msg)
yield event
if not isinstance(event, dict):
continue
event_type = event.get("event")
if event_type in (StreamEvent.WORKFLOW_FINISHED, StreamEvent.WORKFLOW_PAUSED):
return
return stream_topic_events(
topic=topic,
idle_timeout=idle_timeout,
on_subscribe=on_subscribe,
)

View File

@ -1,14 +1,8 @@
import json
import time
from collections.abc import Callable, Generator, Mapping
from typing import Any
from core.app.entities.task_entities import (
StreamEvent,
)
from core.app.apps.streaming_utils import stream_topic_events
from extensions.ext_redis import get_pubsub_broadcast_channel
from libs.broadcast_channel.channel import Topic
from libs.broadcast_channel.exc import SubscriptionClosedError
from models.model import AppMode
@ -34,44 +28,9 @@ class MessageGenerator:
on_subscribe: Callable[[], None] | None = None,
) -> Generator[Mapping | str, None, None]:
topic = cls.get_response_topic(app_mode, workflow_run_id)
return _topic_msg_generator(topic, idle_timeout, ping_interval, on_subscribe)
def _topic_msg_generator(
topic: Topic,
idle_timeout: float,
ping_interval: float,
on_subscribe: Callable[[], None] | None = None,
) -> Generator[Mapping[str, Any], None, None]:
last_msg_time = time.time()
last_ping_time = last_msg_time
with topic.subscribe() as sub:
# on_subscribe fires only after the Redis subscription is active.
# This is used to gate task start and reduce pub/sub race for the first event.
if on_subscribe is not None:
on_subscribe()
while True:
try:
msg = sub.receive(timeout=0.1)
except SubscriptionClosedError:
return
if msg is None:
current_time = time.time()
if current_time - last_msg_time > idle_timeout:
return
if current_time - last_ping_time >= ping_interval:
yield "ping"
last_ping_time = current_time
# skip the `None` message
continue
last_msg_time = time.time()
last_ping_time = last_msg_time
event = json.loads(msg)
yield event
if not isinstance(event, dict):
continue
event_type = event.get("event")
if event_type in (StreamEvent.WORKFLOW_FINISHED, StreamEvent.WORKFLOW_PAUSED):
return
return stream_topic_events(
topic=topic,
idle_timeout=idle_timeout,
ping_interval=ping_interval,
on_subscribe=on_subscribe,
)

View File

@ -0,0 +1,64 @@
from __future__ import annotations
import json
import time
from collections.abc import Callable, Generator, Iterable, Mapping
from typing import Any
from core.app.entities.task_entities import StreamEvent
from libs.broadcast_channel.channel import Topic
from libs.broadcast_channel.exc import SubscriptionClosedError
def stream_topic_events(
    *,
    topic: Topic,
    idle_timeout: float,
    ping_interval: float | None = None,
    on_subscribe: Callable[[], None] | None = None,
    terminal_events: Iterable[str | StreamEvent] | None = None,
) -> Generator[Mapping[str, Any] | str, None, None]:
    """Stream JSON-decoded events published to *topic* until completion.

    Subscribes to the topic and yields each decoded message. The stream ends
    when a terminal event is observed, when no message arrives for
    *idle_timeout* seconds, or when the subscription is closed. If
    *ping_interval* is set, ``StreamEvent.PING.value`` is yielded during quiet
    periods to keep the downstream connection alive.

    :param topic: broadcast-channel topic to subscribe to.
    :param idle_timeout: seconds of silence after which the stream stops.
    :param ping_interval: seconds between keep-alive pings; ``None`` disables pings.
    :param on_subscribe: invoked once the subscription is active; used to gate
        task start and reduce the pub/sub race for the first event.
    :param terminal_events: event names that end the stream; when omitted,
        defaults are supplied by ``_normalize_terminal_events``.
    """
    stop_on = _normalize_terminal_events(terminal_events)
    last_message_at = time.time()
    last_ping_at = last_message_at
    with topic.subscribe() as subscription:
        # Fire the callback only after the Redis subscription is active, so the
        # producer can be started without racing the first published event.
        if on_subscribe is not None:
            on_subscribe()
        while True:
            try:
                raw = subscription.receive(timeout=0.1)
            except SubscriptionClosedError:
                return
            if raw is None:
                # Poll timed out: enforce the idle deadline, then maybe ping.
                now = time.time()
                if now - last_message_at > idle_timeout:
                    return
                if ping_interval is not None and now - last_ping_at >= ping_interval:
                    yield StreamEvent.PING.value
                    last_ping_at = now
                continue
            last_message_at = time.time()
            last_ping_at = last_message_at
            decoded = json.loads(raw)
            # Yield before the terminal check so consumers also receive the
            # terminal event itself.
            yield decoded
            if not isinstance(decoded, dict):
                continue
            if decoded.get("event") in stop_on:
                return
def _normalize_terminal_events(terminal_events: Iterable[str | StreamEvent] | None) -> set[str]:
    """Normalize *terminal_events* into a set of plain event-name strings.

    ``StreamEvent`` members are reduced to their ``.value``; any other item is
    coerced with ``str``. When no terminal events are provided (``None`` or an
    empty iterable), the set defaults to ``WORKFLOW_FINISHED`` so the stream
    always has a stop condition.
    """
    if terminal_events is None:
        return {StreamEvent.WORKFLOW_FINISHED.value}
    # Materialize before checking emptiness: a truthiness test on the raw
    # argument misbehaves for generators, which are truthy even when empty and
    # would otherwise yield an empty terminal set (a stream that never stops).
    values = {item.value if isinstance(item, StreamEvent) else str(item) for item in terminal_events}
    return values or {StreamEvent.WORKFLOW_FINISHED.value}

View File

@ -119,7 +119,11 @@ def build_workflow_event_stream(
except queue.Empty:
current_time = time.time()
if current_time - last_msg_time > idle_timeout:
return
logger.debug(
"No workflow events received for %s seconds, keeping stream open",
idle_timeout,
)
last_msg_time = current_time
if current_time - last_ping_time >= ping_interval:
yield StreamEvent.PING.value
last_ping_time = current_time
@ -416,7 +420,7 @@ def _is_terminal_event(event: Mapping[str, Any] | str) -> bool:
if not isinstance(event, Mapping):
return False
event_type = event.get("event")
return event_type in (StreamEvent.WORKFLOW_FINISHED.value, StreamEvent.WORKFLOW_PAUSED.value)
return event_type == StreamEvent.WORKFLOW_FINISHED.value
def _collect_snapshot_keys(events: Iterable[Mapping[str, Any]]) -> set[tuple[str, str]]:

View File

@ -1025,9 +1025,7 @@ class WorkflowService:
continue
email = payload.get("email")
if email:
recipients_data.append(
DeliveryTestEmailRecipient(email=email, form_token=recipient.access_token)
)
recipients_data.append(DeliveryTestEmailRecipient(email=email, form_token=recipient.access_token))
return recipients_data
def _build_human_input_node(