feat(api): support variable reference and substitution in Email delivery

The EmailDeliveryConfig.body now support referencing variables generated
by precedent nodes.
This commit is contained in:
QuantumGhost
2026-01-21 15:19:17 +08:00
parent 32dab780ba
commit 20be1dd819
7 changed files with 189 additions and 18 deletions

View File

@ -11,6 +11,8 @@ from sqlalchemy.orm import Session, sessionmaker
from configs import dify_config
from core.workflow.nodes.human_input.entities import EmailDeliveryConfig, EmailDeliveryMethod
from core.workflow.runtime import GraphRuntimeState, VariablePool
from core.app.layers.pause_state_persist_layer import WorkflowResumptionContext
from extensions.ext_database import db
from extensions.ext_mail import mail
from libs.email_template_renderer import render_email_template
@ -21,6 +23,7 @@ from models.human_input import (
HumanInputFormRecipient,
RecipientType,
)
from repositories.factory import DifyAPIRepositoryFactory
from services.feature_service import FeatureService
logger = logging.getLogger(__name__)
@ -108,11 +111,41 @@ def _render_subject(subject_template: str, substitutions: dict[str, str]) -> str
return render_email_template(subject_template, substitutions)
def _render_body(body_template: str, substitutions: dict[str, str]) -> str:
templated_body = EmailDeliveryConfig.replace_url_placeholder(body_template, substitutions.get("form_link"))
def _render_body(
body_template: str,
substitutions: dict[str, str],
*,
variable_pool: VariablePool | None,
) -> str:
templated_body = EmailDeliveryConfig.render_body_template(
body=body_template,
url=substitutions.get("form_link"),
variable_pool=variable_pool,
)
return render_email_template(templated_body, substitutions)
def _load_variable_pool(workflow_run_id: str | None) -> VariablePool | None:
if not workflow_run_id:
return None
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_factory)
pause_entity = workflow_run_repo.get_workflow_pause(workflow_run_id)
if pause_entity is None:
logger.info("No pause state found for workflow run %s", workflow_run_id)
return None
try:
resumption_context = WorkflowResumptionContext.loads(pause_entity.get_state().decode())
except Exception:
logger.exception("Failed to load resumption context for workflow run %s", workflow_run_id)
return None
graph_runtime_state = GraphRuntimeState.from_snapshot(resumption_context.serialized_graph_runtime_state)
return graph_runtime_state.variable_pool
def _build_substitutions(
*,
job: _EmailDeliveryJob,
@ -163,11 +196,13 @@ def dispatch_human_input_email_task(form_id: str, node_title: str | None = None,
return
jobs = _load_email_jobs(session, form)
variable_pool = _load_variable_pool(form.workflow_run_id)
for job in jobs:
for recipient in job.recipients:
substitutions = _build_substitutions(job=job, recipient=recipient, node_title=node_title)
subject = _render_subject(job.subject, substitutions)
body = _render_body(job.body, substitutions)
body = _render_body(job.body, substitutions, variable_pool=variable_pool)
mail.send(
to=recipient.email,