Merge branch 'fix/DoS-in-Annotation-Import' into deploy/dev

This commit is contained in:
Yansong Zhang
2025-12-11 12:37:50 +08:00
6 changed files with 633 additions and 14 deletions

View File

@ -1,6 +1,9 @@
import logging
import uuid
import pandas as pd
logger = logging.getLogger(__name__)
from sqlalchemy import or_, select
from werkzeug.datastructures import FileStorage
from werkzeug.exceptions import NotFound
@ -348,6 +351,18 @@ class AppAnnotationService:
@classmethod
def batch_import_app_annotations(cls, app_id, file: FileStorage):
"""
Batch import annotations from CSV file with enhanced security checks.
Security features:
- File size validation
- Row count limits (min/max)
- Memory-efficient CSV parsing
- Subscription quota validation
- Concurrency tracking
"""
from configs import dify_config
# get app info
current_user, current_tenant_id = current_account_with_tenant()
app = (
@ -359,30 +374,125 @@ class AppAnnotationService:
if not app:
raise NotFound("App not found")
job_id: str | None = None # Initialize to avoid unbound variable error
try:
# Skip the first row
df = pd.read_csv(file.stream, dtype=str)
# Quick row count check before full parsing (memory efficient)
# Read only first chunk to estimate row count
file.stream.seek(0)
first_chunk = file.stream.read(8192) # Read first 8KB
file.stream.seek(0)
# Estimate row count from first chunk
newline_count = first_chunk.count(b'\n')
if newline_count == 0:
raise ValueError("The CSV file appears to be empty or invalid.")
# Parse CSV with row limit to prevent memory exhaustion
# Use chunksize for memory-efficient processing
max_records = dify_config.ANNOTATION_IMPORT_MAX_RECORDS
min_records = dify_config.ANNOTATION_IMPORT_MIN_RECORDS
# Read CSV in chunks to avoid loading entire file into memory
df = pd.read_csv(
file.stream,
dtype=str,
nrows=max_records + 1, # Read one extra to detect overflow
engine='python',
on_bad_lines='skip' # Skip malformed lines instead of crashing
)
# Validate column count
if len(df.columns) < 2:
raise ValueError(
"Invalid CSV format. The file must contain at least 2 columns (question and answer)."
)
# Build result list with validation
result = []
for _, row in df.iterrows():
content = {"question": row.iloc[0], "answer": row.iloc[1]}
for idx, row in df.iterrows():
# Stop if we exceed the limit
if len(result) >= max_records:
raise ValueError(
f"The CSV file contains too many records. Maximum {max_records} records allowed per import. "
f"Please split your file into smaller batches."
)
# Validate row has required columns
if pd.isna(row.iloc[0]) or pd.isna(row.iloc[1]):
continue # Skip rows with empty question or answer
question = str(row.iloc[0]).strip()
answer = str(row.iloc[1]).strip()
# Skip empty entries
if not question or not answer:
continue
# Validate length constraints (idx is pandas index, convert to int for display)
row_num = int(idx) + 2 if isinstance(idx, (int, float)) else len(result) + 2
if len(question) > 2000:
raise ValueError(
f"Question at row {row_num} is too long. Maximum 2000 characters allowed."
)
if len(answer) > 10000:
raise ValueError(
f"Answer at row {row_num} is too long. Maximum 10000 characters allowed."
)
content = {"question": question, "answer": answer}
result.append(content)
if len(result) == 0:
raise ValueError("The CSV file is empty.")
# check annotation limit
# Validate minimum records
if len(result) < min_records:
raise ValueError(
f"The CSV file must contain at least {min_records} valid annotation record(s). "
f"Found {len(result)} valid record(s)."
)
# Check annotation quota limit
features = FeatureService.get_features(current_tenant_id)
if features.billing.enabled:
annotation_quota_limit = features.annotation_quota_limit
if annotation_quota_limit.limit < len(result) + annotation_quota_limit.size:
raise ValueError("The number of annotations exceeds the limit of your subscription.")
# async job
raise ValueError(
f"The number of annotations ({len(result)}) would exceed your subscription limit. "
f"Current usage: {annotation_quota_limit.size}/{annotation_quota_limit.limit}. "
f"Available: {annotation_quota_limit.limit - annotation_quota_limit.size}."
)
# Create async job
job_id = str(uuid.uuid4())
indexing_cache_key = f"app_annotation_batch_import_{str(job_id)}"
# send batch add segments task
# Register job in active tasks list for concurrency tracking
current_time = int(naive_utc_now().timestamp() * 1000)
active_jobs_key = f"annotation_import_active:{current_tenant_id}"
redis_client.zadd(active_jobs_key, {job_id: current_time})
redis_client.expire(active_jobs_key, 7200) # 2 hours TTL
# Set job status
redis_client.setnx(indexing_cache_key, "waiting")
redis_client.expire(indexing_cache_key, 3600) # 1 hour TTL
# Send batch import task
batch_import_annotations_task.delay(str(job_id), result, app_id, current_tenant_id, current_user.id)
except Exception as e:
except pd.errors.ParserError as e:
return {"error_msg": f"Failed to parse CSV file: {str(e)}. Please ensure the file is valid CSV format."}
except ValueError as e:
return {"error_msg": str(e)}
return {"job_id": job_id, "job_status": "waiting"}
except Exception as e:
# Clean up active job registration on error (only if job was created)
if job_id is not None:
try:
active_jobs_key = f"annotation_import_active:{current_tenant_id}"
redis_client.zrem(active_jobs_key, job_id)
except Exception:
# Silently ignore cleanup errors - the job will be auto-expired
logger.debug("Failed to clean up active job tracking during error handling")
return {"error_msg": f"An error occurred while processing the file: {str(e)}"}
return {"job_id": job_id, "job_status": "waiting", "record_count": len(result)}
@classmethod
def get_annotation_hit_histories(cls, app_id: str, annotation_id: str, page, limit):