evaluation runtime

jyong
2026-03-12 14:32:36 +08:00
parent 8ea3729fe9
commit 1d248053e6
2 changed files with 84 additions and 60 deletions

View File

@@ -92,13 +92,8 @@ class RagasEvaluator(BaseEvaluationInstance):
tenant_id: str,
category: EvaluationCategory,
) -> list[EvaluationItemResult]:
"""Core evaluation logic using RAGAS.
Uses the Dify model wrapper as judge LLM. Falls back to simple
string similarity if RAGAS import fails.
"""
"""Core evaluation logic using RAGAS."""
model_wrapper = DifyModelWrapper(model_provider, model_name, tenant_id)
# Extract metric names from metric_name string.
requested_metrics = (
[metric_name]
if metric_name
@@ -118,54 +113,88 @@ class RagasEvaluator(BaseEvaluationInstance):
model_wrapper: DifyModelWrapper,
category: EvaluationCategory,
) -> list[EvaluationItemResult]:
"""Evaluate using RAGAS library."""
from ragas import evaluate as ragas_evaluate
from ragas.dataset_schema import EvaluationDataset, SingleTurnSample
"""Evaluate using RAGAS library.
# Build RAGAS dataset
samples = []
Builds SingleTurnSample differently per category to match ragas requirements:
- LLM/Workflow: user_input=prompt, response=output, reference=expected_output
- Retrieval: user_input=query, reference=expected_output, retrieved_contexts=context
- Agent: Not supported via EvaluationDataset (requires message-based API)
"""
from ragas import evaluate as ragas_evaluate
from ragas.dataset_schema import EvaluationDataset
samples: list[Any] = []
for item in items:
sample = SingleTurnSample(
user_input=self._inputs_format(item.inputs, category),
response=item.expected_output or "",
retrieved_contexts=item.context or [],
)
if item.expected_output:
sample.reference = item.expected_output
sample = self._build_sample(item, category)
samples.append(sample)
dataset = EvaluationDataset(samples=samples)
# Build metric instances
ragas_metrics = self._build_ragas_metrics(requested_metrics)
if not ragas_metrics:
logger.warning("No valid RAGAS metrics found for: %s", requested_metrics)
return [EvaluationItemResult(index=item.index) for item in items]
# Run RAGAS evaluation
try:
result = ragas_evaluate(
dataset=dataset,
metrics=ragas_metrics,
)
# Convert RAGAS results to our format
results = []
results: list[EvaluationItemResult] = []
result_df = result.to_pandas()
for i, item in enumerate(items):
metrics = []
for metric_name in requested_metrics:
if metric_name in result_df.columns:
score = result_df.iloc[i][metric_name]
if score is not None and not (isinstance(score, float) and score != score): # NaN check
metrics.append(EvaluationMetric(name=metric_name, value=float(score)))
metrics: list[EvaluationMetric] = []
for m_name in requested_metrics:
if m_name in result_df.columns:
score = result_df.iloc[i][m_name]
if score is not None and not (isinstance(score, float) and score != score):
metrics.append(EvaluationMetric(name=m_name, value=float(score)))
results.append(EvaluationItemResult(index=item.index, metrics=metrics))
return results
except Exception:
logger.exception("RAGAS evaluation failed, falling back to simple evaluation")
return self._evaluate_simple(items, requested_metrics, model_wrapper)
@staticmethod
def _build_sample(item: EvaluationItemInput, category: EvaluationCategory) -> Any:
"""Build a ragas SingleTurnSample with the correct fields per category.
ragas metric field requirements:
- faithfulness: user_input, response, retrieved_contexts
- answer_relevancy: user_input, response
- answer_correctness: user_input, response, reference
- semantic_similarity: user_input, response, reference
- context_precision: user_input, reference, retrieved_contexts
- context_recall: user_input, reference, retrieved_contexts
- context_relevance: user_input, retrieved_contexts
"""
from ragas.dataset_schema import SingleTurnSample
user_input = _format_input(item.inputs, category)
match category:
case EvaluationCategory.LLM:
# response = actual LLM output, reference = expected output
return SingleTurnSample(
user_input=user_input,
response=item.output,
reference=item.expected_output or "",
retrieved_contexts=item.context or [],
)
case EvaluationCategory.RETRIEVAL:
# context_precision/recall only need reference + retrieved_contexts
return SingleTurnSample(
user_input=user_input,
reference=item.expected_output or "",
retrieved_contexts=item.context or [],
)
case _:
return SingleTurnSample(
user_input=user_input,
response=item.output,
)
def _evaluate_simple(
self,
items: list[EvaluationItemInput],
@@ -173,18 +202,15 @@ class RagasEvaluator(BaseEvaluationInstance):
model_wrapper: DifyModelWrapper,
) -> list[EvaluationItemResult]:
"""Simple LLM-as-judge fallback when RAGAS is not available."""
results = []
results: list[EvaluationItemResult] = []
for item in items:
metrics = []
query = self._inputs_to_query(item.inputs)
for metric_name in requested_metrics:
metrics: list[EvaluationMetric] = []
for m_name in requested_metrics:
try:
score = self._judge_with_llm(model_wrapper, metric_name, query, item)
metrics.append(EvaluationMetric(name=metric_name, value=score))
score = self._judge_with_llm(model_wrapper, m_name, item)
metrics.append(EvaluationMetric(name=m_name, value=score))
except Exception:
logger.exception("Failed to compute metric %s for item %d", metric_name, item.index)
logger.exception("Failed to compute metric %s for item %d", m_name, item.index)
results.append(EvaluationItemResult(index=item.index, metrics=metrics))
return results
@@ -192,19 +218,20 @@ class RagasEvaluator(BaseEvaluationInstance):
self,
model_wrapper: DifyModelWrapper,
metric_name: str,
query: str,
item: EvaluationItemInput,
) -> float:
"""Use the LLM to judge a single metric for a single item."""
prompt = self._build_judge_prompt(metric_name, query, item)
prompt = self._build_judge_prompt(metric_name, item)
response = model_wrapper.invoke(prompt)
return self._parse_score(response)
def _build_judge_prompt(self, metric_name: str, query: str, item: EvaluationItemInput) -> str:
@staticmethod
def _build_judge_prompt(metric_name: str, item: EvaluationItemInput) -> str:
"""Build a scoring prompt for the LLM judge."""
parts = [
f"Evaluate the following on the metric '{metric_name}' using a scale of 0.0 to 1.0.",
f"\nQuery: {query}",
f"\nInput: {item.inputs}",
f"\nOutput: {item.output}",
]
if item.expected_output:
parts.append(f"\nExpected Output: {item.expected_output}")
@@ -218,31 +245,19 @@ class RagasEvaluator(BaseEvaluationInstance):
@staticmethod
def _parse_score(response: str) -> float:
"""Parse a float score from LLM response."""
import re
cleaned = response.strip()
try:
score = float(cleaned)
return max(0.0, min(1.0, score))
except ValueError:
# Try to extract first number from response
import re
match = re.search(r"(\d+\.?\d*)", cleaned)
if match:
score = float(match.group(1))
return max(0.0, min(1.0, score))
return 0.0
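The parser accepts a bare float, otherwise falls back to the first number it can find, and clamps the result to [0.0, 1.0]; a few hypothetical responses and what they parse to:

```python
# Hypothetical judge responses, annotated with the value _parse_score returns.
RagasEvaluator._parse_score("0.85")            # 0.85 -- direct float parse
RagasEvaluator._parse_score(" Score: 0.4 ")    # 0.4  -- first number extracted via regex
RagasEvaluator._parse_score("I'd rate it 7")   # 1.0  -- 7 clamped into the 0.0-1.0 range
RagasEvaluator._parse_score("no score given")  # 0.0  -- fallback when nothing parses
```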
@staticmethod
def _inputs_format(inputs: dict[str, Any], category: EvaluationCategory) -> str:
"""Convert input dict to a prompt string."""
match category:
case EvaluationCategory.LLM:
return str(inputs["prompt"])
case EvaluationCategory.RETRIEVAL:
return str(inputs["query"])
case _:
return ""
@staticmethod
def _build_ragas_metrics(requested_metrics: list[str]) -> list[Any]:
"""Build RAGAS metric instances from metric names."""
@@ -280,3 +295,14 @@ class RagasEvaluator(BaseEvaluationInstance):
except ImportError:
logger.warning("RAGAS metrics not available")
return []
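The body of _build_ragas_metrics is elided above; as a purely hypothetical sketch (not the commit's actual table), a name-to-instance lookup of this shape is one common way to implement it:

```python
from ragas.metrics import (
    answer_correctness,
    answer_relevancy,
    context_precision,
    context_recall,
    faithfulness,
)

# Hypothetical registry; names not present here simply yield no metric instance,
# so the caller's "No valid RAGAS metrics found" warning would fire on an empty result.
METRIC_REGISTRY = {
    "faithfulness": faithfulness,
    "answer_relevancy": answer_relevancy,
    "answer_correctness": answer_correctness,
    "context_precision": context_precision,
    "context_recall": context_recall,
}

def build_ragas_metrics(requested: list[str]) -> list:
    return [METRIC_REGISTRY[name] for name in requested if name in METRIC_REGISTRY]
```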
def _format_input(inputs: dict[str, Any], category: EvaluationCategory) -> str:
"""Extract the user-facing input string from the inputs dict."""
match category:
case EvaluationCategory.LLM | EvaluationCategory.WORKFLOW:
return str(inputs.get("prompt", ""))
case EvaluationCategory.RETRIEVAL:
return str(inputs.get("query", ""))
case _:
return str(next(iter(inputs.values()), "")) if inputs else ""
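Hypothetical calls against the new module-level helper, showing the per-category lookups and the .get() defaults (EvaluationCategory.AGENT is an assumed member name based on the "Agent" category mentioned in the docstring above, standing in for any value that reaches the wildcard case):

```python
_format_input({"prompt": "Summarize this article"}, EvaluationCategory.LLM)    # "Summarize this article"
_format_input({"query": "vector databases"}, EvaluationCategory.RETRIEVAL)     # "vector databases"
_format_input({"question": "What changed?"}, EvaluationCategory.AGENT)         # "What changed?" (first value)
_format_input({}, EvaluationCategory.RETRIEVAL)                                # "" (missing key -> empty string)
```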

View File

@@ -42,13 +42,11 @@ class RetrievalEvaluationRunner(BaseEvaluationRunner):
for i, node_result in enumerate(node_run_result_list):
# Extract retrieved contexts from outputs
outputs = node_result.outputs
contexts = list(outputs.get("retrieved_contexts", []))
query = self._extract_query(dict(node_result.inputs))
# Extract retrieved content from result list
result_list = outputs.get("result", [])
output = "\n---\n".join(
str(item.get("content", "")) for item in result_list if item.get("content")
)
contexts = [item.get("content", "") for item in result_list if item.get("content")]
output = "\n---\n".join(contexts)
merged_items.append(
EvaluationItemInput(
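To make the context-extraction change concrete: the retrieved contexts are now derived from the node's result list rather than a separate retrieved_contexts key. A hypothetical node output and what the new code produces from it (the title field and sample texts are illustrative only):

```python
# Hypothetical outputs of a knowledge-retrieval node run.
outputs = {
    "result": [
        {"content": "Paris is the capital of France.", "title": "France"},
        {"content": "France is a country in Western Europe.", "title": "France"},
        {"title": "empty chunk"},  # no "content" key -> skipped
    ]
}

result_list = outputs.get("result", [])
contexts = [item.get("content", "") for item in result_list if item.get("content")]
# contexts == ["Paris is the capital of France.",
#              "France is a country in Western Europe."]
output = "\n---\n".join(contexts)
# output == "Paris is the capital of France.\n---\nFrance is a country in Western Europe."
```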