evaluation runtime

This commit is contained in:
jyong
2026-03-10 17:37:28 +08:00
parent 7a065b3f42
commit 2bd48e62a3
7 changed files with 33 additions and 215 deletions

View File

@ -21,7 +21,7 @@ class BaseEvaluationInstance(ABC):
def evaluate_llm(
self,
items: list[EvaluationItemInput],
default_metrics: list[dict[str, Any]],
default_metrics: str,
model_provider: str,
model_name: str,
tenant_id: str,
@ -33,7 +33,7 @@ class BaseEvaluationInstance(ABC):
def evaluate_retrieval(
self,
items: list[EvaluationItemInput],
default_metrics: list[dict[str, Any]],
default_metrics: str,
model_provider: str,
model_name: str,
tenant_id: str,
@ -45,7 +45,7 @@ class BaseEvaluationInstance(ABC):
def evaluate_agent(
self,
items: list[EvaluationItemInput],
default_metrics: list[dict[str, Any]],
default_metrics: str,
model_provider: str,
model_name: str,
tenant_id: str,
@ -53,188 +53,8 @@ class BaseEvaluationInstance(ABC):
"""Evaluate agent outputs using the configured framework."""
...
@abstractmethod
def evaluate_workflow(
    self,
    items: list[EvaluationItemInput],
    default_metrics: list[dict[str, Any]],
    model_provider: str,
    model_name: str,
    tenant_id: str,
) -> list[EvaluationItemResult]:
    """Evaluate workflow outputs using the configured framework.

    Args:
        items: Evaluation items to score.
        default_metrics: Metric configuration dicts; each entry is
            presumably keyed by "metric" naming the metric to run —
            TODO confirm against concrete implementations.
        model_provider: Provider of the judge/evaluation model.
        model_name: Name of the judge/evaluation model.
        tenant_id: Tenant scope for model and resource lookup.

    Returns:
        Evaluation results for the given items.
    """
    ...
@abstractmethod
def get_supported_metrics(self, category: EvaluationCategory) -> list[str]:
    """Return the list of supported metric names for a given evaluation category.

    Args:
        category: The evaluation category to query (e.g. LLM, retrieval,
            agent, workflow).

    Returns:
        The metric names this framework can compute for *category*.
    """
    ...
def evaluate_with_customized_workflow(
    self,
    items: list[EvaluationItemInput],
    results: list[EvaluationItemResult],
    customized_metrics: dict[str, Any],
    tenant_id: str,
) -> list[EvaluationItemResult]:
    """Evaluate using a published workflow as the evaluator.

    The evaluator workflow's output variables are treated as metrics:
    each output variable name becomes a metric name, and its value
    becomes the score.

    Args:
        items: Evaluation items with inputs, expected_output, context.
        results: Results from Phase 1 (with actual_output populated).
        customized_metrics: Must contain ``evaluation_workflow_id``
            pointing to a published WORKFLOW-type App.
        tenant_id: Tenant scope.

    Returns:
        A list of ``EvaluationItemResult`` with metrics extracted from
        the workflow outputs. Items whose evaluation fails yield a
        result with no metrics instead of aborting the run.

    Raises:
        ValueError: If ``evaluation_workflow_id`` is missing, the app is
            not found in the tenant, or the app has no published workflow.
    """
    # Local imports keep heavy app/DB dependencies off module import time.
    from sqlalchemy.orm import Session
    from core.app.apps.workflow.app_generator import WorkflowAppGenerator
    from core.app.entities.app_invoke_entities import InvokeFrom
    from core.evaluation.runners import get_service_account_for_app
    from models.engine import db
    from models.model import App
    from services.workflow_service import WorkflowService

    workflow_id = customized_metrics.get("evaluation_workflow_id")
    if not workflow_id:
        raise ValueError(
            "customized_metrics must contain 'evaluation_workflow_id' for customized evaluator"
        )

    # Load the evaluator workflow resources using a dedicated session.
    # expire_on_commit=False keeps the loaded ORM objects readable after
    # the transaction ends.
    # NOTE(review): indentation (session-block extent) reconstructed from a
    # flattened paste — confirm against the original file.
    with Session(db.engine, expire_on_commit=False) as session, session.begin():
        app = session.query(App).filter_by(
            id=workflow_id, tenant_id=tenant_id
        ).first()
        if not app:
            raise ValueError(
                f"Evaluation workflow app {workflow_id} not found in tenant {tenant_id}"
            )
        service_account = get_service_account_for_app(session, workflow_id)

    workflow_service = WorkflowService()
    published_workflow = workflow_service.get_published_workflow(app_model=app)
    if not published_workflow:
        raise ValueError(
            f"No published workflow found for evaluation app {workflow_id}"
        )

    # Pair phase-1 results with their originating items by index.
    result_by_index = {r.index: r for r in results}
    eval_results: list[EvaluationItemResult] = []
    for item in items:
        result = result_by_index.get(item.index)
        try:
            workflow_inputs = self._build_workflow_inputs(item, result)
            generator = WorkflowAppGenerator()
            response: Mapping[str, Any] = generator.generate(
                app_model=app,
                workflow=published_workflow,
                user=service_account,
                args={"inputs": workflow_inputs},
                invoke_from=InvokeFrom.SERVICE_API,
                streaming=False,
            )
            metrics = self._extract_workflow_metrics(response)
            eval_results.append(
                EvaluationItemResult(
                    index=item.index,
                    metrics=metrics,
                    metadata={
                        "workflow_response": _safe_serialize(response),
                    },
                )
            )
        except Exception:
            # Best-effort: one failing item must not abort the whole run;
            # record an empty result and continue with the next item.
            logger.exception(
                "Customized evaluator failed for item %d with workflow %s",
                item.index,
                workflow_id,
            )
            eval_results.append(EvaluationItemResult(index=item.index))
    return eval_results
@staticmethod
def _build_workflow_inputs(
    item: EvaluationItemInput,
    result: EvaluationItemResult | None,
) -> dict[str, Any]:
    """Map evaluation data onto conventional workflow input variable names.

    Only non-empty values are included:
    - ``actual_output``: the target's actual output (from *result*).
    - ``expected_output``: the expected/reference output.
    - ``inputs``: the original evaluation inputs, JSON-encoded.
    - ``context``: all context strings joined by blank lines.
    """
    payload: dict[str, Any] = {}

    actual = result.actual_output if result else None
    if actual:
        payload["actual_output"] = actual
    if item.expected_output:
        payload["expected_output"] = item.expected_output
    if item.inputs:
        payload["inputs"] = json.dumps(item.inputs, ensure_ascii=False)
    if item.context:
        payload["context"] = "\n\n".join(item.context)

    return payload
@staticmethod
def _extract_workflow_metrics(
    response: Mapping[str, Any],
) -> list[EvaluationMetric]:
    """Turn workflow output variables into evaluation metrics.

    Each output variable is one metric: the variable name becomes the
    metric name and its value the score. Values that cannot be coerced
    to ``float`` are recorded with ``score=0.0`` and the raw value kept
    in ``details``.
    """
    extracted: list[EvaluationMetric] = []

    data = response.get("data", {})
    if not isinstance(data, Mapping):
        logger.warning("Unexpected workflow response format: missing 'data' dict")
        return extracted

    outputs = data.get("outputs", {})
    if not isinstance(outputs, Mapping):
        logger.warning(
            "Unexpected workflow response format: 'outputs' is not a dict"
        )
        return extracted

    for name, raw in outputs.items():
        try:
            extracted.append(EvaluationMetric(name=name, score=float(raw)))
        except (TypeError, ValueError):
            # Non-numeric output: keep the raw value for inspection.
            extracted.append(
                EvaluationMetric(name=name, score=0.0, details={"raw_value": raw})
            )

    return extracted
def _safe_serialize(response: Mapping[str, Any]) -> dict[str, Any]:
"""Safely serialize workflow response for metadata storage."""
try:
return dict(response)
except Exception:
return {"raw": str(response)}

View File

@ -42,7 +42,7 @@ class RagasEvaluator(BaseEvaluationInstance):
def evaluate_llm(
self,
items: list[EvaluationItemInput],
default_metrics: list[dict[str, Any]],
default_metrics: str,
model_provider: str,
model_name: str,
tenant_id: str,
@ -52,7 +52,7 @@ class RagasEvaluator(BaseEvaluationInstance):
def evaluate_retrieval(
self,
items: list[EvaluationItemInput],
default_metrics: list[dict[str, Any]],
default_metrics: str,
model_provider: str,
model_name: str,
tenant_id: str,
@ -64,7 +64,7 @@ class RagasEvaluator(BaseEvaluationInstance):
def evaluate_agent(
self,
items: list[EvaluationItemInput],
default_metrics: list[dict[str, Any]],
default_metrics: str,
model_provider: str,
model_name: str,
tenant_id: str,
@ -74,7 +74,7 @@ class RagasEvaluator(BaseEvaluationInstance):
def evaluate_workflow(
self,
items: list[EvaluationItemInput],
default_metrics: list[dict[str, Any]],
default_metrics: str,
model_provider: str,
model_name: str,
tenant_id: str,
@ -86,7 +86,7 @@ class RagasEvaluator(BaseEvaluationInstance):
def _evaluate(
self,
items: list[EvaluationItemInput],
default_metrics: list[dict[str, Any]],
default_metrics: str,
model_provider: str,
model_name: str,
tenant_id: str,
@ -98,9 +98,9 @@ class RagasEvaluator(BaseEvaluationInstance):
string similarity if RAGAS import fails.
"""
model_wrapper = DifyModelWrapper(model_provider, model_name, tenant_id)
# Extract metric names from default_metrics list; each item has a "metric" key.
# Extract metric names from default_metrics string.
requested_metrics = (
[m["metric"] for m in default_metrics if "metric" in m]
[default_metrics]
if default_metrics
else self.get_supported_metrics(category)
)

View File

@ -69,8 +69,8 @@ class AgentEvaluationRunner(BaseEvaluationRunner):
def evaluate_metrics(
self,
node_run_result_mapping: dict[str, NodeRunResult] | None,
node_run_result: NodeRunResult | None,
node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None,
node_run_result_list: list[NodeRunResult] | None,
default_metric: DefaultMetric | None,
customized_metrics: CustomizedMetrics | None,
model_provider: str,

View File

@ -26,8 +26,8 @@ class LLMEvaluationRunner(BaseEvaluationRunner):
def evaluate_metrics(
self,
node_run_result_mapping: dict[str, NodeRunResult] | None,
node_run_result: NodeRunResult | None,
node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None,
node_run_result_list: list[NodeRunResult] | None,
default_metric: DefaultMetric | None,
customized_metrics: CustomizedMetrics | None,
model_provider: str,
@ -36,7 +36,9 @@ class LLMEvaluationRunner(BaseEvaluationRunner):
) -> list[EvaluationItemResult]:
"""Use the evaluation instance to compute LLM metrics."""
# Merge actual_output into items for evaluation
merged_items = self._merge_results_into_items(items, results)
if not node_run_result_list:
return []
merged_items = self._merge_results_into_items(node_run_result_list)
return self.evaluation_instance.evaluate_llm(
merged_items, default_metrics, model_provider, model_name, tenant_id
)
@ -70,23 +72,19 @@ class LLMEvaluationRunner(BaseEvaluationRunner):
@staticmethod
def _merge_results_into_items(
items: list[EvaluationItemInput],
results: list[EvaluationItemResult],
items: list[NodeRunResult],
) -> list[EvaluationItemInput]:
"""Create new items with actual_output set as expected_output context for metrics."""
result_by_index = {r.index: r for r in results}
merged = []
for item in items:
result = result_by_index.get(item.index)
if result and result.actual_output:
merged.append(
EvaluationItemInput(
index=item.index,
inputs=item.inputs,
expected_output=item.expected_output,
context=[result.actual_output] + (item.context or []),
)
merged.append(
EvaluationItemInput(
index=item.index,
inputs={
"prompt": item.prompt,
},
output=item.output,
expected_output=item.expected_output,
)
else:
merged.append(item)
)
return merged

View File

@ -24,8 +24,8 @@ class RetrievalEvaluationRunner(BaseEvaluationRunner):
def evaluate_metrics(
self,
node_run_result_mapping: dict[str, NodeRunResult] | None,
node_run_result: NodeRunResult | None,
node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None,
node_run_result_list: list[NodeRunResult] | None,
default_metric: DefaultMetric | None,
customized_metrics: CustomizedMetrics | None,
model_provider: str,

View File

@ -92,8 +92,8 @@ class SnippetEvaluationRunner(BaseEvaluationRunner):
def evaluate_metrics(
self,
node_run_result_mapping: dict[str, NodeRunResult] | None,
node_run_result: NodeRunResult | None,
node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None,
node_run_result_list: list[NodeRunResult] | None,
default_metric: DefaultMetric | None,
customized_metrics: CustomizedMetrics | None,
model_provider: str,

View File

@ -26,8 +26,8 @@ class WorkflowEvaluationRunner(BaseEvaluationRunner):
def evaluate_metrics(
self,
node_run_result_mapping: dict[str, NodeRunResult] | None,
node_run_result: NodeRunResult | None,
node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None,
node_run_result_list: list[NodeRunResult] | None,
default_metric: DefaultMetric | None,
customized_metrics: CustomizedMetrics | None,
model_provider: str,