fix: fix session problem.

This commit is contained in:
FFXN
2026-01-28 20:30:16 +08:00
parent 91f730ce40
commit cd1c7e6c17

View File

@ -6,6 +6,8 @@ import uuid
from datetime import UTC, datetime
from typing import Any
from sqlalchemy.orm import Session
from core.db.session_factory import session_factory
from core.model_manager import ModelManager
from core.model_runtime.entities.llm_entities import LLMUsage
@ -117,6 +119,7 @@ class SummaryIndexService:
summary_record: DocumentSegmentSummary,
segment: DocumentSegment,
dataset: Dataset,
session: Session | None = None,
) -> None:
"""
Vectorize summary and store in vector database.
@ -125,6 +128,8 @@ class SummaryIndexService:
summary_record: DocumentSegmentSummary record
segment: Original DocumentSegment
dataset: Dataset containing the segment
session: Optional SQLAlchemy session. If provided, uses this session instead of creating a new one.
If not provided, creates a new session and commits automatically.
"""
if dataset.indexing_technique != "high_quality":
logger.warning(
@ -135,15 +140,25 @@ class SummaryIndexService:
# Get summary_record_id for later session queries
summary_record_id = summary_record.id
# Save the original session parameter for use in error handling
original_session = session
logger.debug(
"Starting vectorization for segment %s, summary_record_id=%s, using_provided_session=%s",
segment.id,
summary_record_id,
original_session is not None,
)
# Reuse existing index_node_id if available (like segment does), otherwise generate new one
old_summary_node_id = summary_record.summary_index_node_id
if old_summary_node_id:
# Reuse existing index_node_id (like segment behavior)
summary_index_node_id = old_summary_node_id
logger.debug("Reusing existing index_node_id %s for segment %s", summary_index_node_id, segment.id)
else:
# Generate new index node ID only for new summaries
summary_index_node_id = str(uuid.uuid4())
logger.debug("Generated new index_node_id %s for segment %s", summary_index_node_id, segment.id)
# Always regenerate hash (in case summary content changed)
summary_content = summary_record.summary_content
@ -200,11 +215,23 @@ class SummaryIndexService:
for attempt in range(max_retries):
try:
logger.debug(
"Attempting to vectorize summary for segment %s (attempt %s/%s)",
segment.id,
attempt + 1,
max_retries,
)
vector = Vector(dataset)
# Use duplicate_check=False to ensure re-vectorization even if old vector still exists
# The old vector should have been deleted above, but if deletion failed,
# we still want to re-vectorize (upsert will overwrite)
vector.add_texts([summary_document], duplicate_check=False)
logger.debug(
"Successfully added summary vector to database for segment %s (attempt %s/%s)",
segment.id,
attempt + 1,
max_retries,
)
# Log embedding token usage
if embedding_tokens > 0:
@ -215,46 +242,148 @@ class SummaryIndexService:
)
# Success - update summary record with index node info
with session_factory.create_session() as session:
# Refresh the summary record in the new session
summary_record_in_session = (
session.query(DocumentSegmentSummary).filter_by(id=summary_record_id).first()
)
if summary_record_in_session:
# Update all fields including summary_content
# Always use the summary_content from the parameter (which is the latest from outer session)
# rather than relying on what's in the database, in case outer session hasn't committed yet
summary_record_in_session.summary_index_node_id = summary_index_node_id
summary_record_in_session.summary_index_node_hash = summary_hash
summary_record_in_session.tokens = embedding_tokens # Save embedding tokens
summary_record_in_session.status = "completed"
# Ensure summary_content is preserved (use the latest from summary_record parameter)
# This is critical: use the parameter value, not the database value
summary_record_in_session.summary_content = summary_content
# Explicitly update updated_at to ensure it's refreshed even if other fields haven't changed
summary_record_in_session.updated_at = datetime.now(UTC).replace(tzinfo=None)
session.add(summary_record_in_session)
session.commit()
logger.info(
"Successfully vectorized summary for segment %s, index_node_id=%s, tokens=%s",
segment.id,
summary_index_node_id,
embedding_tokens,
)
# Update the original object for consistency
summary_record.summary_index_node_id = summary_index_node_id
summary_record.summary_index_node_hash = summary_hash
summary_record.tokens = embedding_tokens
summary_record.status = "completed"
summary_record.summary_content = summary_content
summary_record.updated_at = summary_record_in_session.updated_at
else:
logger.error(
"Summary record not found in database for segment %s (id=%s) after vectorization",
segment.id,
# Use provided session if available, otherwise create a new one
use_provided_session = session is not None
if not use_provided_session:
logger.debug("Creating new session for vectorization of segment %s", segment.id)
session_context = session_factory.create_session()
session = session_context.__enter__()
else:
logger.debug("Using provided session for vectorization of segment %s", segment.id)
session_context = None # Don't use context manager for provided session
try:
# If using provided session, merge the summary_record into it
if use_provided_session:
# Merge the summary_record into the provided session
logger.debug(
"Merging summary_record (id=%s) into provided session for segment %s",
summary_record_id,
segment.id,
)
raise ValueError(f"Summary record not found for segment {segment.id} after vectorization")
summary_record_in_session = session.merge(summary_record)
logger.debug(
"Successfully merged summary_record for segment %s, merged_id=%s",
segment.id,
summary_record_in_session.id,
)
else:
# Query the summary record in the new session
logger.debug(
"Querying summary_record by id=%s for segment %s in new session",
summary_record_id,
segment.id,
)
summary_record_in_session = (
session.query(DocumentSegmentSummary).filter_by(id=summary_record_id).first()
)
if not summary_record_in_session:
# Record not found - try to find by chunk_id and dataset_id instead
logger.debug(
"Summary record not found by id=%s, trying chunk_id=%s and dataset_id=%s "
"for segment %s",
summary_record_id,
segment.id,
dataset.id,
segment.id,
)
summary_record_in_session = (
session.query(DocumentSegmentSummary)
.filter_by(chunk_id=segment.id, dataset_id=dataset.id)
.first()
)
if not summary_record_in_session:
# Still not found - create a new one using the parameter data
logger.warning(
"Summary record not found in database for segment %s (id=%s), creating new one. "
"This may indicate a session isolation issue.",
segment.id,
summary_record_id,
)
summary_record_in_session = DocumentSegmentSummary(
id=summary_record_id, # Use the same ID if available
dataset_id=dataset.id,
document_id=segment.document_id,
chunk_id=segment.id,
summary_content=summary_content,
summary_index_node_id=summary_index_node_id,
summary_index_node_hash=summary_hash,
tokens=embedding_tokens,
status="completed",
enabled=True,
)
session.add(summary_record_in_session)
logger.info(
"Created new summary record (id=%s) for segment %s after vectorization",
summary_record_id,
segment.id,
)
else:
# Found by chunk_id - update it
logger.info(
"Found summary record for segment %s by chunk_id "
"(id mismatch: expected %s, found %s). "
"This may indicate the record was created in a different session.",
segment.id,
summary_record_id,
summary_record_in_session.id,
)
else:
logger.debug(
"Found summary_record (id=%s) for segment %s in new session",
summary_record_id,
segment.id,
)
# Update all fields including summary_content
# Always use the summary_content from the parameter (which is the latest from outer session)
# rather than relying on what's in the database, in case outer session hasn't committed yet
summary_record_in_session.summary_index_node_id = summary_index_node_id
summary_record_in_session.summary_index_node_hash = summary_hash
summary_record_in_session.tokens = embedding_tokens # Save embedding tokens
summary_record_in_session.status = "completed"
# Ensure summary_content is preserved (use the latest from summary_record parameter)
# This is critical: use the parameter value, not the database value
summary_record_in_session.summary_content = summary_content
# Explicitly update updated_at to ensure it's refreshed even if other fields haven't changed
summary_record_in_session.updated_at = datetime.now(UTC).replace(tzinfo=None)
session.add(summary_record_in_session)
# Only commit if we created the session ourselves
if not use_provided_session:
logger.debug("Committing session for segment %s (self-created session)", segment.id)
session.commit()
logger.debug("Successfully committed session for segment %s", segment.id)
else:
logger.debug(
"Skipping commit for segment %s (using provided session, caller will commit)",
segment.id,
)
# If using provided session, let the caller handle commit
logger.info(
"Successfully vectorized summary for segment %s, index_node_id=%s, tokens=%s, "
"summary_record_id=%s, use_provided_session=%s",
segment.id,
summary_index_node_id,
embedding_tokens,
summary_record_in_session.id,
use_provided_session,
)
# Update the original object for consistency
summary_record.summary_index_node_id = summary_index_node_id
summary_record.summary_index_node_hash = summary_hash
summary_record.tokens = embedding_tokens
summary_record.status = "completed"
summary_record.summary_content = summary_content
if summary_record_in_session.updated_at:
summary_record.updated_at = summary_record_in_session.updated_at
finally:
# Only close session if we created it ourselves
if not use_provided_session and session_context:
session_context.__exit__(None, None, None)
# Success, exit function
return
@ -278,7 +407,8 @@ class SummaryIndexService:
# Retry for connection errors
wait_time = retry_delay * (2**attempt) # Exponential backoff
logger.warning(
"Vectorization attempt %s/%s failed for segment %s: %s. Retrying in %.1f seconds...",
"Vectorization attempt %s/%s failed for segment %s (connection error): %s. "
"Retrying in %.1f seconds...",
attempt + 1,
max_retries,
segment.id,
@ -290,27 +420,69 @@ class SummaryIndexService:
else:
# Final attempt failed or non-connection error - log and update status
logger.error(
"Failed to vectorize summary for segment %s after %s attempts: %s",
"Failed to vectorize summary for segment %s after %s attempts: %s. "
"summary_record_id=%s, index_node_id=%s, use_provided_session=%s",
segment.id,
attempt + 1,
str(e),
summary_record_id,
summary_index_node_id,
session is not None,
exc_info=True,
)
# Update error status in session
with session_factory.create_session() as session:
# Use the original_session saved at function start (the function parameter)
logger.debug(
"Updating error status for segment %s, summary_record_id=%s, has_original_session=%s",
segment.id,
summary_record_id,
original_session is not None,
)
# Always create a new session for error handling to avoid issues with closed sessions
# Even if original_session was provided, we create a new one for safety
with session_factory.create_session() as error_session:
# Try to find the record by id first
summary_record_in_session = (
session.query(DocumentSegmentSummary).filter_by(id=summary_record_id).first()
error_session.query(DocumentSegmentSummary).filter_by(id=summary_record_id).first()
)
if not summary_record_in_session:
# Try to find by chunk_id and dataset_id
logger.debug(
"Summary record not found by id=%s, trying chunk_id=%s and dataset_id=%s "
"for segment %s",
summary_record_id,
segment.id,
dataset.id,
segment.id,
)
summary_record_in_session = (
error_session.query(DocumentSegmentSummary)
.filter_by(chunk_id=segment.id, dataset_id=dataset.id)
.first()
)
if summary_record_in_session:
summary_record_in_session.status = "error"
summary_record_in_session.error = f"Vectorization failed: {str(e)}"
summary_record_in_session.updated_at = datetime.now(UTC).replace(tzinfo=None)
session.add(summary_record_in_session)
session.commit()
error_session.add(summary_record_in_session)
error_session.commit()
logger.info(
"Updated error status in new session for segment %s, record_id=%s",
segment.id,
summary_record_in_session.id,
)
# Update the original object for consistency
summary_record.status = "error"
summary_record.error = summary_record_in_session.error
summary_record.updated_at = summary_record_in_session.updated_at
else:
logger.warning(
"Could not update error status: summary record not found for segment %s (id=%s). "
"This may indicate a session isolation issue.",
segment.id,
summary_record_id,
)
raise
@staticmethod
@ -469,7 +641,8 @@ class SummaryIndexService:
# vectorize_summary will update status to "completed" and tokens in its own session
# vectorize_summary will also ensure summary_content is preserved
try:
SummaryIndexService.vectorize_summary(summary_record_in_session, segment, dataset)
# Pass the session to vectorize_summary to avoid session isolation issues
SummaryIndexService.vectorize_summary(summary_record_in_session, segment, dataset, session=session)
# Refresh the object from database to get the updated status and tokens from vectorize_summary
session.refresh(summary_record_in_session)
# Commit the session
@ -734,7 +907,8 @@ class SummaryIndexService:
try:
# Re-vectorize summary (this will update status and tokens in its own session)
SummaryIndexService.vectorize_summary(summary, segment, dataset)
# Pass the session to vectorize_summary to avoid session isolation issues
SummaryIndexService.vectorize_summary(summary, segment, dataset, session=session)
# Refresh the object from database to get the updated status and tokens from vectorize_summary
session.refresh(summary)
@ -889,7 +1063,8 @@ class SummaryIndexService:
# Note: vectorize_summary may take time due to embedding API calls, but we need to complete it
# to ensure the summary is properly indexed
try:
SummaryIndexService.vectorize_summary(summary_record, segment, dataset)
# Pass the session to vectorize_summary to avoid session isolation issues
SummaryIndexService.vectorize_summary(summary_record, segment, dataset, session=session)
# Refresh the object from database to get the updated status and tokens from vectorize_summary
session.refresh(summary_record)
# Now commit the session (summary_record should have status="completed" and tokens from refresh)
@ -913,34 +1088,25 @@ class SummaryIndexService:
segment, dataset, summary_content, status="generating"
)
# Re-vectorize summary (this will update status to "completed" and tokens in its own session)
# Note: summary_record was created in a different session, but vectorize_summary will merge it
try:
SummaryIndexService.vectorize_summary(summary_record, segment, dataset)
# Pass the session to vectorize_summary - it will use merge to handle
# the record from different session
SummaryIndexService.vectorize_summary(summary_record, segment, dataset, session=session)
# Refresh to get updated status and tokens from database
with session_factory.create_session() as refresh_session:
refreshed_record = (
refresh_session.query(DocumentSegmentSummary).filter_by(id=summary_record.id).first()
)
if refreshed_record:
summary_record = refreshed_record
session.refresh(summary_record)
logger.info("Successfully created and vectorized summary for segment %s", segment.id)
return summary_record
except Exception as e:
# If vectorization fails, update status to error
with session_factory.create_session() as error_session:
error_record = (
error_session.query(DocumentSegmentSummary).filter_by(id=summary_record.id).first()
)
if error_record:
error_record.status = "error"
error_record.error = f"Vectorization failed: {str(e)}"
error_session.commit()
# Update the returned record
summary_record.status = "error"
summary_record.error = error_record.error
# If vectorization fails, update status to error in current session
# Merge the record into current session first
error_record = session.merge(summary_record)
error_record.status = "error"
error_record.error = f"Vectorization failed: {str(e)}"
session.commit()
logger.exception("Failed to vectorize summary for segment %s", segment.id)
# Return the record with error status instead of raising
# This allows the segment update to complete even if vectorization fails
return summary_record
return error_record
except Exception as e:
logger.exception("Failed to update summary for segment %s", segment.id)