feat: propagate trigger metadata for plugin icons across UI

This commit is contained in:
lyzno1
2025-10-25 12:15:21 +08:00
parent e3484c8dc3
commit 3bd62f3fdf
9 changed files with 258 additions and 29 deletions

View File

@ -1,12 +1,16 @@
import json
import uuid
from datetime import datetime
from sqlalchemy import and_, func, or_, select
from sqlalchemy.orm import Session
from core.trigger.trigger_manager import TriggerManager
from core.workflow.enums import WorkflowExecutionStatus
from models import Account, App, EndUser, WorkflowAppLog, WorkflowRun
from models.enums import CreatorUserRole
from models.enums import AppTriggerType, CreatorUserRole
from models.provider_ids import TriggerProviderID
from models.trigger import WorkflowPluginTrigger, WorkflowTriggerLog
class WorkflowAppService:
@ -111,6 +115,10 @@ class WorkflowAppService:
# Execute query and get items
items = list(session.scalars(offset_stmt).all())
trigger_info_map = self._build_trigger_info_map(session, app_model, items)
for log in items:
log.trigger_info = trigger_info_map.get(log.workflow_run_id)
return {
"page": page,
"limit": limit,
@ -129,3 +137,101 @@ class WorkflowAppService:
return uuid.UUID(value)
except ValueError:
return None
def _build_trigger_info_map(self, session: Session, app_model: App, logs: list[WorkflowAppLog]) -> dict[str, dict]:
run_ids = [log.workflow_run_id for log in logs if log.workflow_run_id]
if not run_ids:
return {}
trigger_logs = (
session.execute(select(WorkflowTriggerLog).where(WorkflowTriggerLog.workflow_run_id.in_(run_ids)))
.scalars()
.all()
)
if not trigger_logs:
return {}
trigger_data_map: dict[str, dict] = {}
node_ids: set[str] = set()
for trigger_log in trigger_logs:
if not trigger_log.workflow_run_id:
continue
try:
trigger_data = json.loads(trigger_log.trigger_data)
except json.JSONDecodeError:
trigger_data = {}
node_id = trigger_data.get("root_node_id")
if node_id:
node_ids.add(node_id)
trigger_data_map[trigger_log.workflow_run_id] = {
"log": trigger_log,
"node_id": node_id,
}
plugin_trigger_map: dict[str, WorkflowPluginTrigger] = {}
if node_ids:
plugin_triggers = (
session.execute(
select(WorkflowPluginTrigger).where(
WorkflowPluginTrigger.app_id == app_model.id,
WorkflowPluginTrigger.node_id.in_(node_ids),
)
)
.scalars()
.all()
)
plugin_trigger_map = {plugin.node_id: plugin for plugin in plugin_triggers}
provider_cache: dict[str, dict[str, str]] = {}
def resolve_provider(provider_id: str) -> dict[str, str]:
if provider_id in provider_cache:
return provider_cache[provider_id]
metadata: dict[str, str] = {}
try:
controller = TriggerManager.get_trigger_provider(app_model.tenant_id, TriggerProviderID(provider_id))
api_entity = controller.to_api_entity()
metadata = {
"provider_name": api_entity.name,
"icon": api_entity.icon or "",
"plugin_id": controller.plugin_id,
"plugin_unique_identifier": controller.plugin_unique_identifier,
}
except Exception:
metadata = {}
provider_cache[provider_id] = metadata
return metadata
trigger_info_map: dict[str, dict] = {}
for run_id, context in trigger_data_map.items():
trigger_log = context["log"]
if isinstance(trigger_log.trigger_type, AppTriggerType):
trigger_type_value = trigger_log.trigger_type.value
else:
trigger_type_value = trigger_log.trigger_type
info = {
"type": trigger_type_value,
"node_id": context["node_id"],
"workflow_trigger_log_id": trigger_log.id,
}
if (
trigger_log.trigger_type == AppTriggerType.TRIGGER_PLUGIN # type: ignore[comparison-overlap]
and context["node_id"]
):
plugin_trigger = plugin_trigger_map.get(context["node_id"])
if plugin_trigger:
info.update(
{
"provider_id": plugin_trigger.provider_id,
"subscription_id": plugin_trigger.subscription_id,
"event_name": plugin_trigger.event_name,
}
)
provider_metadata = resolve_provider(plugin_trigger.provider_id)
if provider_metadata:
info.update(provider_metadata)
trigger_info_map[run_id] = info
return trigger_info_map