refactor(api): type workflow service dicts with TypedDict (#33829)

This commit is contained in:
BitToby
2026-03-20 15:36:31 +02:00
committed by GitHub
parent a1af085736
commit 3d5a29462e
2 changed files with 25 additions and 7 deletions

View File

@ -5,6 +5,7 @@ from typing import Any
from sqlalchemy import and_, func, or_, select
from sqlalchemy.orm import Session
from typing_extensions import TypedDict
from dify_graph.enums import WorkflowExecutionStatus
from models import Account, App, EndUser, TenantAccountJoin, WorkflowAppLog, WorkflowArchiveLog, WorkflowRun
@ -14,6 +15,10 @@ from services.plugin.plugin_service import PluginService
from services.workflow.entities import TriggerMetadata
class LogViewDetails(TypedDict):
trigger_metadata: dict[str, Any] | None
# Since the workflow_app_log table has exceeded 100 million records, we use an additional details field to extend it
class LogView:
"""Lightweight wrapper for WorkflowAppLog with computed details.
@ -22,12 +27,12 @@ class LogView:
- Proxies all other attributes to the underlying `WorkflowAppLog`
"""
def __init__(self, log: WorkflowAppLog, details: dict | None):
def __init__(self, log: WorkflowAppLog, details: LogViewDetails | None):
self.log = log
self.details_ = details
@property
def details(self) -> dict | None:
def details(self) -> LogViewDetails | None:
return self.details_
def __getattr__(self, name):