Mergin main into fix/chore-fix

This commit is contained in:
Yeuoly
2024-10-14 16:22:12 +08:00
433 changed files with 11823 additions and 2782 deletions

View File

@ -1,4 +1,5 @@
import json
from typing import Optional
from core.agent.cot_agent_runner import CotAgentRunner
from core.model_runtime.entities.message_entities import AssistantPromptMessage, PromptMessage, UserPromptMessage
@ -21,7 +22,7 @@ class CotCompletionAgentRunner(CotAgentRunner):
return system_prompt
def _organize_historic_prompt(self, current_session_messages: list[PromptMessage] = None) -> str:
def _organize_historic_prompt(self, current_session_messages: Optional[list[PromptMessage]] = None) -> str:
"""
Organize historic prompt
"""

View File

@ -14,7 +14,7 @@ class CotAgentOutputParser:
) -> Generator[Union[str, AgentScratchpadUnit.Action], None, None]:
def parse_action(json_str):
try:
action = json.loads(json_str)
action = json.loads(json_str, strict=False)
action_name = None
action_input = None

View File

@ -124,6 +124,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
# always enable retriever resource in debugger mode
app_config.additional_features.show_retrieve_source = True
workflow_run_id = str(uuid.uuid4())
# init application generate entity
application_generate_entity = AdvancedChatAppGenerateEntity(
task_id=str(uuid.uuid4()),
@ -138,6 +139,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
invoke_from=invoke_from,
extras=extras,
trace_manager=trace_manager,
workflow_run_id=workflow_run_id,
)
contexts.tenant_id.set(application_generate_entity.app_config.tenant_id)

View File

@ -149,6 +149,9 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
SystemVariableKey.CONVERSATION_ID: self.conversation.id,
SystemVariableKey.USER_ID: user_id,
SystemVariableKey.DIALOGUE_COUNT: conversation_dialogue_count,
SystemVariableKey.APP_ID: app_config.app_id,
SystemVariableKey.WORKFLOW_ID: app_config.workflow_id,
SystemVariableKey.WORKFLOW_RUN_ID: self.application_generate_entity.workflow_run_id,
}
# init variable pool

View File

@ -45,6 +45,7 @@ from core.app.entities.task_entities import (
from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTaskPipeline
from core.app.task_pipeline.message_cycle_manage import MessageCycleManage
from core.app.task_pipeline.workflow_cycle_manage import WorkflowCycleManage
from core.model_runtime.entities.llm_entities import LLMUsage
from core.model_runtime.utils.encoders import jsonable_encoder
from core.ops.ops_trace_manager import TraceQueueManager
from core.workflow.enums import SystemVariableKey
@ -55,6 +56,7 @@ from models.account import Account
from models.model import Conversation, EndUser, Message
from models.workflow import (
Workflow,
WorkflowNodeExecution,
WorkflowRunStatus,
)
@ -71,6 +73,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
_workflow: Workflow
_user: Union[Account, EndUser]
_workflow_system_variables: dict[SystemVariableKey, Any]
_wip_workflow_node_executions: dict[str, WorkflowNodeExecution]
def __init__(
self,
@ -107,9 +110,14 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
SystemVariableKey.FILES: application_generate_entity.files,
SystemVariableKey.CONVERSATION_ID: conversation.id,
SystemVariableKey.USER_ID: user_id,
SystemVariableKey.DIALOGUE_COUNT: conversation.dialogue_count,
SystemVariableKey.APP_ID: application_generate_entity.app_config.app_id,
SystemVariableKey.WORKFLOW_ID: workflow.id,
SystemVariableKey.WORKFLOW_RUN_ID: application_generate_entity.workflow_run_id,
}
self._task_state = WorkflowTaskState()
self._wip_workflow_node_executions = {}
self._conversation_name_generate_thread = None
@ -505,6 +513,10 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
self._message.total_price = usage.total_price
self._message.currency = usage.currency
self._task_state.metadata["usage"] = jsonable_encoder(usage)
else:
self._task_state.metadata["usage"] = jsonable_encoder(LLMUsage.empty_usage())
db.session.commit()
message_was_created.send(

View File

@ -111,6 +111,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
user_id = user.id if isinstance(user, Account) else user.session_id
trace_manager = TraceQueueManager(app_model.id, user_id)
workflow_run_id = str(uuid.uuid4())
# init application generate entity
application_generate_entity = WorkflowAppGenerateEntity(
task_id=str(uuid.uuid4()),
@ -122,6 +123,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
invoke_from=invoke_from,
call_depth=call_depth,
trace_manager=trace_manager,
workflow_run_id=workflow_run_id,
)
contexts.tenant_id.set(application_generate_entity.app_config.tenant_id)

View File

@ -90,6 +90,9 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
system_inputs = {
SystemVariableKey.FILES: files,
SystemVariableKey.USER_ID: user_id,
SystemVariableKey.APP_ID: app_config.app_id,
SystemVariableKey.WORKFLOW_ID: app_config.workflow_id,
SystemVariableKey.WORKFLOW_RUN_ID: self.application_generate_entity.workflow_run_id,
}
variable_pool = VariablePool(

View File

@ -52,6 +52,7 @@ from models.workflow import (
Workflow,
WorkflowAppLog,
WorkflowAppLogCreatedFrom,
WorkflowNodeExecution,
WorkflowRun,
WorkflowRunStatus,
)
@ -69,6 +70,7 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
_task_state: WorkflowTaskState
_application_generate_entity: WorkflowAppGenerateEntity
_workflow_system_variables: dict[SystemVariableKey, Any]
_wip_workflow_node_executions: dict[str, WorkflowNodeExecution]
def __init__(
self,
@ -97,9 +99,13 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
self._workflow_system_variables = {
SystemVariableKey.FILES: application_generate_entity.files,
SystemVariableKey.USER_ID: user_id,
SystemVariableKey.APP_ID: application_generate_entity.app_config.app_id,
SystemVariableKey.WORKFLOW_ID: workflow.id,
SystemVariableKey.WORKFLOW_RUN_ID: application_generate_entity.workflow_run_id,
}
self._task_state = WorkflowTaskState()
self._wip_workflow_node_executions = {}
def process(self) -> Union[WorkflowAppBlockingResponse, Generator[WorkflowAppStreamResponse, None, None]]:
"""

View File

@ -152,6 +152,7 @@ class AdvancedChatAppGenerateEntity(AppGenerateEntity):
conversation_id: Optional[str] = None
parent_message_id: Optional[str] = None
workflow_run_id: Optional[str] = None
query: str
class SingleIterationRunEntity(BaseModel):
@ -172,6 +173,7 @@ class WorkflowAppGenerateEntity(AppGenerateEntity):
# app config
app_config: WorkflowUIBasedAppConfig
workflow_run_id: Optional[str] = None
class SingleIterationRunEntity(BaseModel):
"""

View File

@ -1,2 +1,2 @@
class VariableError(Exception):
class VariableError(ValueError):
pass

View File

@ -1,8 +1,10 @@
import logging
from threading import Thread
from typing import Optional, Union
from flask import Flask, current_app
from configs import dify_config
from core.app.entities.app_invoke_entities import (
AdvancedChatAppGenerateEntity,
AgentChatAppGenerateEntity,
@ -82,7 +84,9 @@ class MessageCycleManage:
try:
name = LLMGenerator.generate_conversation_name(app_model.tenant_id, query)
conversation.name = name
except:
except Exception as e:
if dify_config.DEBUG:
logging.exception(f"generate conversation name failed: {e}")
pass
db.session.merge(conversation)

View File

@ -57,6 +57,7 @@ class WorkflowCycleManage:
_user: Union[Account, EndUser]
_task_state: WorkflowTaskState
_workflow_system_variables: dict[SystemVariableKey, Any]
_wip_workflow_node_executions: dict[str, WorkflowNodeExecution]
def _handle_workflow_run_start(self) -> WorkflowRun:
max_sequence = (
@ -85,6 +86,9 @@ class WorkflowCycleManage:
# init workflow run
workflow_run = WorkflowRun()
workflow_run_id = self._workflow_system_variables[SystemVariableKey.WORKFLOW_RUN_ID]
if workflow_run_id:
workflow_run.id = workflow_run_id
workflow_run.tenant_id = self._workflow.tenant_id
workflow_run.app_id = self._workflow.app_id
workflow_run.sequence_number = new_sequence_number
@ -248,6 +252,8 @@ class WorkflowCycleManage:
db.session.refresh(workflow_node_execution)
db.session.close()
self._wip_workflow_node_executions[workflow_node_execution.node_execution_id] = workflow_node_execution
return workflow_node_execution
def _handle_workflow_node_execution_success(self, event: QueueNodeSucceededEvent) -> WorkflowNodeExecution:
@ -260,20 +266,36 @@ class WorkflowCycleManage:
inputs = WorkflowEntry.handle_special_values(event.inputs)
outputs = WorkflowEntry.handle_special_values(event.outputs)
execution_metadata = (
json.dumps(jsonable_encoder(event.execution_metadata)) if event.execution_metadata else None
)
finished_at = datetime.now(timezone.utc).replace(tzinfo=None)
elapsed_time = (finished_at - event.start_at).total_seconds()
db.session.query(WorkflowNodeExecution).filter(WorkflowNodeExecution.id == workflow_node_execution.id).update(
{
WorkflowNodeExecution.status: WorkflowNodeExecutionStatus.SUCCEEDED.value,
WorkflowNodeExecution.inputs: json.dumps(inputs) if inputs else None,
WorkflowNodeExecution.process_data: json.dumps(event.process_data) if event.process_data else None,
WorkflowNodeExecution.outputs: json.dumps(outputs) if outputs else None,
WorkflowNodeExecution.execution_metadata: execution_metadata,
WorkflowNodeExecution.finished_at: finished_at,
WorkflowNodeExecution.elapsed_time: elapsed_time,
}
)
db.session.commit()
db.session.close()
workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value
workflow_node_execution.inputs = json.dumps(inputs) if inputs else None
workflow_node_execution.process_data = json.dumps(event.process_data) if event.process_data else None
workflow_node_execution.outputs = json.dumps(outputs) if outputs else None
workflow_node_execution.execution_metadata = (
json.dumps(jsonable_encoder(event.execution_metadata)) if event.execution_metadata else None
)
workflow_node_execution.finished_at = datetime.now(timezone.utc).replace(tzinfo=None)
workflow_node_execution.elapsed_time = (workflow_node_execution.finished_at - event.start_at).total_seconds()
workflow_node_execution.execution_metadata = execution_metadata
workflow_node_execution.finished_at = finished_at
workflow_node_execution.elapsed_time = elapsed_time
db.session.commit()
db.session.refresh(workflow_node_execution)
db.session.close()
self._wip_workflow_node_executions.pop(workflow_node_execution.node_execution_id)
return workflow_node_execution
@ -287,18 +309,33 @@ class WorkflowCycleManage:
inputs = WorkflowEntry.handle_special_values(event.inputs)
outputs = WorkflowEntry.handle_special_values(event.outputs)
finished_at = datetime.now(timezone.utc).replace(tzinfo=None)
elapsed_time = (finished_at - event.start_at).total_seconds()
db.session.query(WorkflowNodeExecution).filter(WorkflowNodeExecution.id == workflow_node_execution.id).update(
{
WorkflowNodeExecution.status: WorkflowNodeExecutionStatus.FAILED.value,
WorkflowNodeExecution.error: event.error,
WorkflowNodeExecution.inputs: json.dumps(inputs) if inputs else None,
WorkflowNodeExecution.process_data: json.dumps(event.process_data) if event.process_data else None,
WorkflowNodeExecution.outputs: json.dumps(outputs) if outputs else None,
WorkflowNodeExecution.finished_at: finished_at,
WorkflowNodeExecution.elapsed_time: elapsed_time,
}
)
db.session.commit()
db.session.close()
workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value
workflow_node_execution.error = event.error
workflow_node_execution.finished_at = datetime.now(timezone.utc).replace(tzinfo=None)
workflow_node_execution.inputs = json.dumps(inputs) if inputs else None
workflow_node_execution.process_data = json.dumps(event.process_data) if event.process_data else None
workflow_node_execution.outputs = json.dumps(outputs) if outputs else None
workflow_node_execution.elapsed_time = (workflow_node_execution.finished_at - event.start_at).total_seconds()
workflow_node_execution.finished_at = finished_at
workflow_node_execution.elapsed_time = elapsed_time
db.session.commit()
db.session.refresh(workflow_node_execution)
db.session.close()
self._wip_workflow_node_executions.pop(workflow_node_execution.node_execution_id)
return workflow_node_execution
@ -675,17 +712,7 @@ class WorkflowCycleManage:
:param node_execution_id: workflow node execution id
:return:
"""
workflow_node_execution = (
db.session.query(WorkflowNodeExecution)
.filter(
WorkflowNodeExecution.tenant_id == self._application_generate_entity.app_config.tenant_id,
WorkflowNodeExecution.app_id == self._application_generate_entity.app_config.app_id,
WorkflowNodeExecution.workflow_id == self._workflow.id,
WorkflowNodeExecution.triggered_from == WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value,
WorkflowNodeExecution.node_execution_id == node_execution_id,
)
.first()
)
workflow_node_execution = self._wip_workflow_node_executions.get(node_execution_id)
if not workflow_node_execution:
raise Exception(f"Workflow node execution not found: {node_execution_id}")

View File

@ -1,9 +1,9 @@
import os
from collections.abc import Iterable, Mapping
from typing import Any, Optional, TextIO, Union
from pydantic import BaseModel
from configs import dify_config
from core.ops.entities.trace_entity import TraceTaskName
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
from core.tools.entities.tool_entities import ToolInvokeMessage
@ -50,7 +50,8 @@ class DifyAgentCallbackHandler(BaseModel):
tool_inputs: Mapping[str, Any],
) -> None:
"""Do nothing."""
print_text("\n[on_tool_start] ToolCall:" + tool_name + "\n" + str(tool_inputs) + "\n", color=self.color)
if dify_config.DEBUG:
print_text("\n[on_tool_start] ToolCall:" + tool_name + "\n" + str(tool_inputs) + "\n", color=self.color)
def on_tool_end(
self,
@ -62,11 +63,12 @@ class DifyAgentCallbackHandler(BaseModel):
trace_manager: Optional[TraceQueueManager] = None,
) -> None:
"""If not the final action, print out observation."""
print_text("\n[on_tool_end]\n", color=self.color)
print_text("Tool: " + tool_name + "\n", color=self.color)
print_text("Inputs: " + str(tool_inputs) + "\n", color=self.color)
print_text("Outputs: " + str(tool_outputs)[:1000] + "\n", color=self.color)
print_text("\n")
if dify_config.DEBUG:
print_text("\n[on_tool_end]\n", color=self.color)
print_text("Tool: " + tool_name + "\n", color=self.color)
print_text("Inputs: " + str(tool_inputs) + "\n", color=self.color)
print_text("Outputs: " + str(tool_outputs)[:1000] + "\n", color=self.color)
print_text("\n")
if trace_manager:
trace_manager.add_trace_task(
@ -82,30 +84,33 @@ class DifyAgentCallbackHandler(BaseModel):
def on_tool_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any) -> None:
"""Do nothing."""
print_text("\n[on_tool_error] Error: " + str(error) + "\n", color="red")
if dify_config.DEBUG:
print_text("\n[on_tool_error] Error: " + str(error) + "\n", color="red")
def on_agent_start(self, thought: str) -> None:
"""Run on agent start."""
if thought:
print_text(
"\n[on_agent_start] \nCurrent Loop: " + str(self.current_loop) + "\nThought: " + thought + "\n",
color=self.color,
)
else:
print_text("\n[on_agent_start] \nCurrent Loop: " + str(self.current_loop) + "\n", color=self.color)
if dify_config.DEBUG:
if thought:
print_text(
"\n[on_agent_start] \nCurrent Loop: " + str(self.current_loop) + "\nThought: " + thought + "\n",
color=self.color,
)
else:
print_text("\n[on_agent_start] \nCurrent Loop: " + str(self.current_loop) + "\n", color=self.color)
def on_agent_finish(self, color: Optional[str] = None, **kwargs: Any) -> None:
"""Run on agent end."""
print_text("\n[on_agent_finish]\n Loop: " + str(self.current_loop) + "\n", color=self.color)
if dify_config.DEBUG:
print_text("\n[on_agent_finish]\n Loop: " + str(self.current_loop) + "\n", color=self.color)
self.current_loop += 1
@property
def ignore_agent(self) -> bool:
"""Whether to ignore agent callbacks."""
return not os.environ.get("DEBUG") or os.environ.get("DEBUG", "").lower() != "true"
return not dify_config.DEBUG
@property
def ignore_chat_model(self) -> bool:
"""Whether to ignore chat model callbacks."""
return not os.environ.get("DEBUG") or os.environ.get("DEBUG", "").lower() != "true"
return not dify_config.DEBUG

View File

@ -44,7 +44,6 @@ class DatasetIndexToolCallbackHandler:
DocumentSegment.index_node_id == document.metadata["doc_id"]
)
# if 'dataset_id' in document.metadata:
if "dataset_id" in document.metadata:
query = query.filter(DocumentSegment.dataset_id == document.metadata["dataset_id"])
@ -59,7 +58,7 @@ class DatasetIndexToolCallbackHandler:
for item in resource:
dataset_retriever_resource = DatasetRetrieverResource(
message_id=self._message_id,
position=item.get("position"),
position=item.get("position") or 0,
dataset_id=item.get("dataset_id"),
dataset_name=item.get("dataset_name"),
document_id=item.get("document_id"),

View File

@ -5,6 +5,7 @@ from typing import Optional, cast
import numpy as np
from sqlalchemy.exc import IntegrityError
from configs import dify_config
from core.embedding.embedding_constant import EmbeddingInputType
from core.model_manager import ModelInstance
from core.model_runtime.entities.model_entities import ModelPropertyKey
@ -110,6 +111,8 @@ class CacheEmbedding(Embeddings):
embedding_results = embedding_result.embeddings[0]
embedding_results = (embedding_results / np.linalg.norm(embedding_results)).tolist()
except Exception as ex:
if dify_config.DEBUG:
logging.exception(f"Failed to embed query text: {ex}")
raise ex
try:
@ -122,6 +125,8 @@ class CacheEmbedding(Embeddings):
encoded_str = encoded_vector.decode("utf-8")
redis_client.setex(embedding_cache_key, 600, encoded_str)
except Exception as ex:
logging.exception("Failed to add embedding to redis %s", ex)
if dify_config.DEBUG:
logging.exception("Failed to add embedding to redis %s", ex)
raise ex
return embedding_results

View File

@ -198,16 +198,34 @@ class MessageFileParser:
if "amazonaws.com" not in parsed_url.netloc:
return False
query_params = parse_qs(parsed_url.query)
required_params = ["Signature", "Expires"]
for param in required_params:
if param not in query_params:
def check_presign_v2(query_params):
required_params = ["Signature", "Expires"]
for param in required_params:
if param not in query_params:
return False
if not query_params["Expires"][0].isdigit():
return False
if not query_params["Expires"][0].isdigit():
return False
signature = query_params["Signature"][0]
if not re.match(r"^[A-Za-z0-9+/]+={0,2}$", signature):
return False
return True
signature = query_params["Signature"][0]
if not re.match(r"^[A-Za-z0-9+/]+={0,2}$", signature):
return False
return True
def check_presign_v4(query_params):
required_params = ["X-Amz-Signature", "X-Amz-Expires"]
for param in required_params:
if param not in query_params:
return False
if not query_params["X-Amz-Expires"][0].isdigit():
return False
signature = query_params["X-Amz-Signature"][0]
if not re.match(r"^[A-Za-z0-9+/]+={0,2}$", signature):
return False
return True
return check_presign_v4(query_params) or check_presign_v2(query_params)
except Exception:
return False

View File

@ -211,9 +211,9 @@ class IndexingRunner:
tenant_id: str,
extract_settings: list[ExtractSetting],
tmp_processing_rule: dict,
doc_form: str = None,
doc_form: Optional[str] = None,
doc_language: str = "English",
dataset_id: str = None,
dataset_id: Optional[str] = None,
indexing_technique: str = "economy",
) -> dict:
"""

View File

@ -58,7 +58,11 @@ class TokenBufferMemory:
# instead of all messages from the conversation, we only need to extract messages
# that belong to the thread of last message
thread_messages = extract_thread_messages(messages)
thread_messages.pop(0)
# for newly created message, its answer is temporarily empty, we don't need to add it to memory
if thread_messages and not thread_messages[0].answer:
thread_messages.pop(0)
messages = list(reversed(thread_messages))
message_file_parser = MessageFileParser(tenant_id=app_record.tenant_id, app_id=app_record.id)

View File

@ -85,6 +85,7 @@ class LargeLanguageModel(AIModel):
)
try:
<<<<<<< HEAD
plugin_model_manager = PluginModelManager()
result = plugin_model_manager.invoke_llm(
tenant_id=self.tenant_id,
@ -116,6 +117,10 @@ class LargeLanguageModel(AIModel):
break
result = LLMResult(
=======
if "response_format" in model_parameters and model_parameters["response_format"] in {"JSON", "XML"}:
result = self._code_block_mode_wrapper(
>>>>>>> main
model=model,
prompt_messages=prompt_messages,
message=AssistantPromptMessage(content=content or content_list),

View File

@ -1,5 +1,11 @@
import logging
<<<<<<< HEAD
from typing import Optional
=======
import re
from abc import abstractmethod
from typing import Any, Optional
>>>>>>> main
from pydantic import ConfigDict
@ -59,6 +65,7 @@ class TTSModel(AIModel):
:param credentials: model credentials
:return: voices lists
"""
<<<<<<< HEAD
plugin_model_manager = PluginModelManager()
return plugin_model_manager.get_tts_model_voices(
tenant_id=self.tenant_id,
@ -69,3 +76,85 @@ class TTSModel(AIModel):
credentials=credentials,
language=language,
)
=======
model_schema = self.get_model_schema(model, credentials)
if model_schema and ModelPropertyKey.VOICES in model_schema.model_properties:
voices = model_schema.model_properties[ModelPropertyKey.VOICES]
if language:
return [
{"name": d["name"], "value": d["mode"]}
for d in voices
if language and language in d.get("language")
]
else:
return [{"name": d["name"], "value": d["mode"]} for d in voices]
def _get_model_default_voice(self, model: str, credentials: dict) -> Any:
"""
Get voice for given tts model
:param model: model name
:param credentials: model credentials
:return: voice
"""
model_schema = self.get_model_schema(model, credentials)
if model_schema and ModelPropertyKey.DEFAULT_VOICE in model_schema.model_properties:
return model_schema.model_properties[ModelPropertyKey.DEFAULT_VOICE]
def _get_model_audio_type(self, model: str, credentials: dict) -> str:
"""
Get audio type for given tts model
:param model: model name
:param credentials: model credentials
:return: voice
"""
model_schema = self.get_model_schema(model, credentials)
if model_schema and ModelPropertyKey.AUDIO_TYPE in model_schema.model_properties:
return model_schema.model_properties[ModelPropertyKey.AUDIO_TYPE]
def _get_model_word_limit(self, model: str, credentials: dict) -> int:
"""
Get audio type for given tts model
:return: audio type
"""
model_schema = self.get_model_schema(model, credentials)
if model_schema and ModelPropertyKey.WORD_LIMIT in model_schema.model_properties:
return model_schema.model_properties[ModelPropertyKey.WORD_LIMIT]
def _get_model_workers_limit(self, model: str, credentials: dict) -> int:
"""
Get audio max workers for given tts model
:return: audio type
"""
model_schema = self.get_model_schema(model, credentials)
if model_schema and ModelPropertyKey.MAX_WORKERS in model_schema.model_properties:
return model_schema.model_properties[ModelPropertyKey.MAX_WORKERS]
@staticmethod
def _split_text_into_sentences(org_text, max_length=2000, pattern=r"[。.!?]"):
match = re.compile(pattern)
tx = match.finditer(org_text)
start = 0
result = []
one_sentence = ""
for i in tx:
end = i.regs[0][1]
tmp = org_text[start:end]
if len(one_sentence + tmp) > max_length:
result.append(one_sentence)
one_sentence = ""
one_sentence += tmp
start = end
last_sens = org_text[start:]
if last_sens:
one_sentence += last_sens
if one_sentence != "":
result.append(one_sentence)
return result
>>>>>>> main

View File

@ -1,168 +0,0 @@
from collections.abc import Mapping
from typing import Optional
import openai
from httpx import Timeout
from openai import OpenAI
from openai.types import ModerationCreateResponse
from core.model_runtime.entities.model_entities import ModelPropertyKey
from core.model_runtime.errors.invoke import (
InvokeAuthorizationError,
InvokeBadRequestError,
InvokeConnectionError,
InvokeError,
InvokeRateLimitError,
InvokeServerUnavailableError,
)
from core.model_runtime.errors.validate import CredentialsValidateFailedError
from core.model_runtime.model_providers.__base.moderation_model import ModerationModel
class OpenAIModerationModel(ModerationModel):
"""
Model class for OpenAI text moderation model.
"""
def _invoke(self, model: str, credentials: dict, text: str, user: Optional[str] = None) -> bool:
"""
Invoke moderation model
:param model: model name
:param credentials: model credentials
:param text: text to moderate
:param user: unique user id
:return: false if text is safe, true otherwise
"""
# transform credentials to kwargs for model instance
credentials_kwargs = self._to_credential_kwargs(credentials)
# init model client
client = OpenAI(**credentials_kwargs)
# chars per chunk
length = self._get_max_characters_per_chunk(model, credentials)
text_chunks = [text[i : i + length] for i in range(0, len(text), length)]
max_text_chunks = self._get_max_chunks(model, credentials)
chunks = [text_chunks[i : i + max_text_chunks] for i in range(0, len(text_chunks), max_text_chunks)]
for text_chunk in chunks:
moderation_result = self._moderation_invoke(model=model, client=client, texts=text_chunk)
for result in moderation_result.results:
if result.flagged is True:
return True
return False
def validate_credentials(self, model: str, credentials: dict) -> None:
"""
Validate model credentials
:param model: model name
:param credentials: model credentials
:return:
"""
try:
# transform credentials to kwargs for model instance
credentials_kwargs = self._to_credential_kwargs(credentials)
client = OpenAI(**credentials_kwargs)
# call moderation model
self._moderation_invoke(
model=model,
client=client,
texts=["ping"],
)
except Exception as ex:
raise CredentialsValidateFailedError(str(ex))
def _moderation_invoke(self, model: str, client: OpenAI, texts: list[str]) -> ModerationCreateResponse:
"""
Invoke moderation model
:param model: model name
:param client: model client
:param texts: texts to moderate
:return: false if text is safe, true otherwise
"""
# call moderation model
moderation_result = client.moderations.create(model=model, input=texts)
return moderation_result
def _get_max_characters_per_chunk(self, model: str, credentials: dict) -> int:
"""
Get max characters per chunk
:param model: model name
:param credentials: model credentials
:return: max characters per chunk
"""
model_schema = self.get_model_schema(model, credentials)
if model_schema and ModelPropertyKey.MAX_CHARACTERS_PER_CHUNK in model_schema.model_properties:
return model_schema.model_properties[ModelPropertyKey.MAX_CHARACTERS_PER_CHUNK]
return 2000
def _get_max_chunks(self, model: str, credentials: dict) -> int:
"""
Get max chunks for given embedding model
:param model: model name
:param credentials: model credentials
:return: max chunks
"""
model_schema = self.get_model_schema(model, credentials)
if model_schema and ModelPropertyKey.MAX_CHUNKS in model_schema.model_properties:
return model_schema.model_properties[ModelPropertyKey.MAX_CHUNKS]
return 1
def _to_credential_kwargs(self, credentials: Mapping) -> dict:
"""
Transform credentials to kwargs for model instance
:param credentials:
:return:
"""
credentials_kwargs = {
"api_key": credentials["openai_api_key"],
"timeout": Timeout(315.0, read=300.0, write=10.0, connect=5.0),
"max_retries": 1,
}
if credentials.get("openai_api_base"):
openai_api_base = credentials["openai_api_base"].rstrip("/")
credentials_kwargs["base_url"] = openai_api_base + "/v1"
if "openai_organization" in credentials:
credentials_kwargs["organization"] = credentials["openai_organization"]
return credentials_kwargs
@property
def _invoke_error_mapping(self) -> dict[type[InvokeError], list[type[Exception]]]:
"""
Map model invoke error to unified error
The key is the error type thrown to the caller
The value is the error type thrown by the model,
which needs to be converted into a unified error type for the caller.
:return: Invoke error mapping
"""
return {
InvokeConnectionError: [openai.APIConnectionError, openai.APITimeoutError],
InvokeServerUnavailableError: [openai.InternalServerError],
InvokeRateLimitError: [openai.RateLimitError],
InvokeAuthorizationError: [openai.AuthenticationError, openai.PermissionDeniedError],
InvokeBadRequestError: [
openai.BadRequestError,
openai.NotFoundError,
openai.UnprocessableEntityError,
openai.APIError,
],
}

View File

@ -1,21 +0,0 @@
<svg version="1.0" xmlns="http://www.w3.org/2000/svg" width="100.000000pt" height="19.000000pt" viewBox="0 0 300.000000 57.000000" preserveAspectRatio="xMidYMid meet"><g transform="translate(0.000000,57.000000) scale(0.100000,-0.100000)" fill="#000000" stroke="none"><path d="M2505 368 c-38 -84 -86 -188 -106 -230 l-38 -78 27 0 c24 0 30 7 55
75 l28 75 100 0 100 0 25 -55 c13 -31 24 -64 24 -75 0 -17 7 -20 44 -20 l43 0
-37 73 c-20 39 -68 143 -106 229 -38 87 -74 158 -80 158 -5 0 -41 -69 -79
-152z m110 -30 c22 -51 41 -95 42 -98 2 -3 -36 -6 -83 -7 -76 -1 -85 0 -81 15
12 40 72 182 77 182 3 0 24 -41 45 -92z"/><path d="M63 493 c19 -61 197 -438 209 -440 10 -2 147 282 216 449 2 4 -10 8
-27 8 -23 0 -31 -5 -31 -17 0 -16 -142 -365 -146 -360 -8 11 -144 329 -149
350 -6 23 -12 27 -42 27 -29 0 -34 -3 -30 -17z"/><path d="M2855 285 l0 -225 30 0 30 0 0 225 0 225 -30 0 -30 0 0 -225z"/><path d="M588 380 c-55 -30 -82 -74 -86 -145 -3 -50 0 -66 20 -95 39 -58 82
-80 153 -80 68 0 110 21 149 73 32 43 30 150 -3 196 -47 66 -158 90 -233 51z
m133 -16 c59 -30 89 -156 54 -224 -45 -87 -162 -78 -201 16 -18 44 -18 128 1
164 28 55 90 73 146 44z"/><path d="M935 303 l76 -98 -7 -72 -6 -73 33 0 34 0 -3 78 -4 77 71 93 c65 85
68 92 46 92 -15 0 -29 -9 -36 -22 -18 -33 -90 -128 -98 -128 -6 1 -67 85 -88
122 -8 15 -24 23 -53 25 l-41 4 76 -98z"/><path d="M1257 230 c-82 -169 -83 -170 -57 -170 17 0 27 6 27 15 0 8 7 31 17
52 l17 38 79 0 78 1 16 -34 c9 -18 16 -42 16 -52 0 -17 7 -20 41 -20 22 0 39
3 37 8 -2 4 -39 80 -83 170 -43 89 -84 162 -92 162 -7 0 -50 -76 -96 -170z
m90 -38 c-33 -2 -61 -1 -63 1 -2 2 10 34 26 71 l31 68 33 -68 33 -69 -60 -3z"/><path d="M1665 386 c-37 -16 -84 -63 -97 -96 -13 -35 -12 -104 2 -132 49 -94
182 -134 280 -83 24 12 29 22 32 64 3 49 3 49 -30 53 l-33 4 3 -45 c4 -61 -5
-71 -60 -71 -93 0 -142 57 -142 164 0 44 5 60 25 85 47 55 136 65 184 20 30
-28 35 -20 11 19 -19 31 -22 32 -82 32 -35 -1 -76 -7 -93 -14z"/><path d="M1955 230 l0 -170 91 0 c76 0 93 3 98 16 4 9 5 18 4 20 -2 1 -31 -1
-66 -5 -34 -4 -64 -5 -67 -3 -3 3 -5 36 -5 73 l0 68 55 -6 c49 -5 55 -4 55 13
0 17 -6 19 -55 16 l-55 -4 0 61 0 61 64 0 c48 0 65 4 70 15 4 13 -10 15 -92
15 l-97 0 0 -170z"/></g></svg>

Before

Width:  |  Height:  |  Size: 2.2 KiB

View File

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<svg width="64px" height="64px" viewBox="0 0 64 64" version="1.1" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
<title>voyage</title>
<g id="voyage" stroke="none" stroke-width="1" fill="none" fill-rule="evenodd">
<rect id="矩形" fill="#333333" x="0" y="0" width="64" height="64" rx="12"></rect>
<path d="M12.1128004,51.4376727 C13.8950799,45.8316747 30.5922254,11.1847688 31.7178757,11.0009656 C32.6559176,10.8171624 45.5070913,36.9172188 51.9795803,52.2647871 C52.1671887,52.6323936 51.0415384,53 49.4468672,53 C47.2893709,53 46.5389374,52.540492 46.5389374,51.4376727 C46.5389374,49.967247 33.2187427,17.8935861 32.8435259,18.3530942 C32.0930924,19.3640118 19.3357228,48.5887229 18.8667019,50.5186566 C18.3038768,52.6323936 17.7410516,53 14.926926,53 C12.2066045,53 11.7375836,52.7242952 12.1128004,51.4376727 Z" id="路径" fill="#FFFFFF" transform="translate(32, 32) scale(1, -1) translate(-32, -32)"></path>
</g>
</svg>

Before

Width:  |  Height:  |  Size: 1.0 KiB

View File

@ -1,4 +0,0 @@
model: rerank-1
model_type: rerank
model_properties:
context_size: 8000

View File

@ -1,4 +0,0 @@
model: rerank-lite-1
model_type: rerank
model_properties:
context_size: 4000

View File

@ -1,123 +0,0 @@
from typing import Optional
import httpx
from core.model_runtime.entities.common_entities import I18nObject
from core.model_runtime.entities.model_entities import AIModelEntity, FetchFrom, ModelPropertyKey, ModelType
from core.model_runtime.entities.rerank_entities import RerankDocument, RerankResult
from core.model_runtime.errors.invoke import (
InvokeAuthorizationError,
InvokeBadRequestError,
InvokeConnectionError,
InvokeError,
InvokeRateLimitError,
InvokeServerUnavailableError,
)
from core.model_runtime.errors.validate import CredentialsValidateFailedError
from core.model_runtime.model_providers.__base.rerank_model import RerankModel
class VoyageRerankModel(RerankModel):
"""
Model class for Voyage rerank model.
"""
def _invoke(
self,
model: str,
credentials: dict,
query: str,
docs: list[str],
score_threshold: Optional[float] = None,
top_n: Optional[int] = None,
user: Optional[str] = None,
) -> RerankResult:
"""
Invoke rerank model
:param model: model name
:param credentials: model credentials
:param query: search query
:param docs: docs for reranking
:param score_threshold: score threshold
:param top_n: top n documents to return
:param user: unique user id
:return: rerank result
"""
if len(docs) == 0:
return RerankResult(model=model, docs=[])
base_url = credentials.get("base_url", "https://api.voyageai.com/v1")
base_url = base_url.removesuffix("/")
try:
response = httpx.post(
base_url + "/rerank",
json={"model": model, "query": query, "documents": docs, "top_k": top_n, "return_documents": True},
headers={"Authorization": f"Bearer {credentials.get('api_key')}", "Content-Type": "application/json"},
)
response.raise_for_status()
results = response.json()
rerank_documents = []
for result in results["data"]:
rerank_document = RerankDocument(
index=result["index"],
text=result["document"],
score=result["relevance_score"],
)
if score_threshold is None or result["relevance_score"] >= score_threshold:
rerank_documents.append(rerank_document)
return RerankResult(model=model, docs=rerank_documents)
except httpx.HTTPStatusError as e:
raise InvokeServerUnavailableError(str(e))
def validate_credentials(self, model: str, credentials: dict) -> None:
"""
Validate model credentials
:param model: model name
:param credentials: model credentials
:return:
"""
try:
self._invoke(
model=model,
credentials=credentials,
query="What is the capital of the United States?",
docs=[
"Carson City is the capital city of the American state of Nevada. At the 2010 United States "
"Census, Carson City had a population of 55,274.",
"The Commonwealth of the Northern Mariana Islands is a group of islands in the Pacific Ocean that "
"are a political division controlled by the United States. Its capital is Saipan.",
],
score_threshold=0.8,
)
except Exception as ex:
raise CredentialsValidateFailedError(str(ex))
@property
def _invoke_error_mapping(self) -> dict[type[InvokeError], list[type[Exception]]]:
"""
Map model invoke error to unified error
"""
return {
InvokeConnectionError: [httpx.ConnectError],
InvokeServerUnavailableError: [httpx.RemoteProtocolError],
InvokeRateLimitError: [],
InvokeAuthorizationError: [httpx.HTTPStatusError],
InvokeBadRequestError: [httpx.RequestError],
}
def get_customizable_model_schema(self, model: str, credentials: dict) -> AIModelEntity:
"""
generate custom model entities from credentials
"""
entity = AIModelEntity(
model=model,
label=I18nObject(en_US=model),
model_type=ModelType.RERANK,
fetch_from=FetchFrom.CUSTOMIZABLE_MODEL,
model_properties={ModelPropertyKey.CONTEXT_SIZE: int(credentials.get("context_size", "8000"))},
)
return entity

View File

@ -1,172 +0,0 @@
import time
from json import JSONDecodeError, dumps
from typing import Optional
import requests
from core.embedding.embedding_constant import EmbeddingInputType
from core.model_runtime.entities.common_entities import I18nObject
from core.model_runtime.entities.model_entities import AIModelEntity, FetchFrom, ModelPropertyKey, ModelType, PriceType
from core.model_runtime.entities.text_embedding_entities import EmbeddingUsage, TextEmbeddingResult
from core.model_runtime.errors.invoke import (
InvokeAuthorizationError,
InvokeBadRequestError,
InvokeConnectionError,
InvokeError,
InvokeRateLimitError,
InvokeServerUnavailableError,
)
from core.model_runtime.errors.validate import CredentialsValidateFailedError
from core.model_runtime.model_providers.__base.text_embedding_model import TextEmbeddingModel
class VoyageTextEmbeddingModel(TextEmbeddingModel):
"""
Model class for Voyage text embedding model.
"""
api_base: str = "https://api.voyageai.com/v1"
def _invoke(
self,
model: str,
credentials: dict,
texts: list[str],
user: Optional[str] = None,
input_type: EmbeddingInputType = EmbeddingInputType.DOCUMENT,
) -> TextEmbeddingResult:
"""
Invoke text embedding model
:param model: model name
:param credentials: model credentials
:param texts: texts to embed
:param user: unique user id
:param input_type: input type
:return: embeddings result
"""
api_key = credentials["api_key"]
if not api_key:
raise CredentialsValidateFailedError("api_key is required")
base_url = credentials.get("base_url", self.api_base)
base_url = base_url.removesuffix("/")
url = base_url + "/embeddings"
headers = {"Authorization": "Bearer " + api_key, "Content-Type": "application/json"}
voyage_input_type = "null"
if input_type is not None:
voyage_input_type = input_type.value
data = {"model": model, "input": texts, "input_type": voyage_input_type}
try:
response = requests.post(url, headers=headers, data=dumps(data))
except Exception as e:
raise InvokeConnectionError(str(e))
if response.status_code != 200:
try:
resp = response.json()
msg = resp["detail"]
if response.status_code == 401:
raise InvokeAuthorizationError(msg)
elif response.status_code == 429:
raise InvokeRateLimitError(msg)
elif response.status_code == 500:
raise InvokeServerUnavailableError(msg)
else:
raise InvokeBadRequestError(msg)
except JSONDecodeError as e:
raise InvokeServerUnavailableError(
f"Failed to convert response to json: {e} with text: {response.text}"
)
try:
resp = response.json()
embeddings = resp["data"]
usage = resp["usage"]
except Exception as e:
raise InvokeServerUnavailableError(f"Failed to convert response to json: {e} with text: {response.text}")
usage = self._calc_response_usage(model=model, credentials=credentials, tokens=usage["total_tokens"])
result = TextEmbeddingResult(
model=model, embeddings=[[float(data) for data in x["embedding"]] for x in embeddings], usage=usage
)
return result
def get_num_tokens(self, model: str, credentials: dict, texts: list[str]) -> int:
"""
Get number of tokens for given prompt messages
:param model: model name
:param credentials: model credentials
:param texts: texts to embed
:return:
"""
return sum(self._get_num_tokens_by_gpt2(text) for text in texts)
def validate_credentials(self, model: str, credentials: dict) -> None:
"""
Validate model credentials
:param model: model name
:param credentials: model credentials
:return:
"""
try:
self._invoke(model=model, credentials=credentials, texts=["ping"])
except Exception as e:
raise CredentialsValidateFailedError(f"Credentials validation failed: {e}")
@property
def _invoke_error_mapping(self) -> dict[type[InvokeError], list[type[Exception]]]:
return {
InvokeConnectionError: [InvokeConnectionError],
InvokeServerUnavailableError: [InvokeServerUnavailableError],
InvokeRateLimitError: [InvokeRateLimitError],
InvokeAuthorizationError: [InvokeAuthorizationError],
InvokeBadRequestError: [KeyError, InvokeBadRequestError],
}
def _calc_response_usage(self, model: str, credentials: dict, tokens: int) -> EmbeddingUsage:
"""
Calculate response usage
:param model: model name
:param credentials: model credentials
:param tokens: input tokens
:return: usage
"""
# get input price info
input_price_info = self.get_price(
model=model, credentials=credentials, price_type=PriceType.INPUT, tokens=tokens
)
# transform usage
usage = EmbeddingUsage(
tokens=tokens,
total_tokens=tokens,
unit_price=input_price_info.unit_price,
price_unit=input_price_info.unit,
total_price=input_price_info.total_amount,
currency=input_price_info.currency,
latency=time.perf_counter() - self.started_at,
)
return usage
def get_customizable_model_schema(self, model: str, credentials: dict) -> AIModelEntity:
"""
generate custom model entities from credentials
"""
entity = AIModelEntity(
model=model,
label=I18nObject(en_US=model),
model_type=ModelType.TEXT_EMBEDDING,
fetch_from=FetchFrom.CUSTOMIZABLE_MODEL,
model_properties={ModelPropertyKey.CONTEXT_SIZE: int(credentials.get("context_size"))},
)
return entity

View File

@ -1,8 +0,0 @@
model: voyage-3-lite
model_type: text-embedding
model_properties:
context_size: 32000
pricing:
input: '0.00002'
unit: '0.001'
currency: USD

View File

@ -1,8 +0,0 @@
model: voyage-3
model_type: text-embedding
model_properties:
context_size: 32000
pricing:
input: '0.00006'
unit: '0.001'
currency: USD

View File

@ -1,28 +0,0 @@
import logging
from core.model_runtime.entities.model_entities import ModelType
from core.model_runtime.errors.validate import CredentialsValidateFailedError
from core.model_runtime.model_providers.__base.model_provider import ModelProvider
logger = logging.getLogger(__name__)
class VoyageProvider(ModelProvider):
def validate_provider_credentials(self, credentials: dict) -> None:
"""
Validate provider credentials
if validate failed, raise exception
:param credentials: provider credentials, credentials form defined in `provider_credential_schema`.
"""
try:
model_instance = self.get_model_instance(ModelType.TEXT_EMBEDDING)
# Use `voyage-3` model for validate,
# no matter what model you pass in, text completion model or chat model
model_instance.validate_credentials(model="voyage-3", credentials=credentials)
except CredentialsValidateFailedError as ex:
raise ex
except Exception as ex:
logger.exception(f"{self.get_provider_schema().provider} credentials validate failed")
raise ex

View File

@ -1,31 +0,0 @@
provider: voyage
label:
en_US: Voyage
description:
en_US: Embedding and Rerank Model Supported
icon_small:
en_US: icon_s_en.svg
icon_large:
en_US: icon_l_en.svg
background: "#EFFDFD"
help:
title:
en_US: Get your API key from Voyage AI
zh_Hans: 从 Voyage 获取 API Key
url:
en_US: https://dash.voyageai.com/
supported_model_types:
- text-embedding
- rerank
configurate_methods:
- predefined-model
provider_credential_schema:
credential_form_schemas:
- variable: api_key
label:
en_US: API Key
type: secret-input
required: true
placeholder:
zh_Hans: 在此输入您的 API Key
en_US: Enter your API Key

View File

@ -18,8 +18,12 @@ class KeywordsModeration(Moderation):
if not config.get("keywords"):
raise ValueError("keywords is required")
if len(config.get("keywords")) > 1000:
raise ValueError("keywords length must be less than 1000")
if len(config.get("keywords")) > 10000:
raise ValueError("keywords length must be less than 10000")
keywords_row_len = config["keywords"].split("\n")
if len(keywords_row_len) > 100:
raise ValueError("the number of rows for the keywords must be less than 100")
def moderation_for_inputs(self, inputs: dict, query: str = "") -> ModerationInputsResult:
flagged = False

View File

@ -110,26 +110,35 @@ class LangFuseDataTrace(BaseTraceInstance):
self.add_trace(langfuse_trace_data=trace_data)
# through workflow_run_id get all_nodes_execution
workflow_nodes_executions = (
db.session.query(
WorkflowNodeExecution.id,
WorkflowNodeExecution.tenant_id,
WorkflowNodeExecution.app_id,
WorkflowNodeExecution.title,
WorkflowNodeExecution.node_type,
WorkflowNodeExecution.status,
WorkflowNodeExecution.inputs,
WorkflowNodeExecution.outputs,
WorkflowNodeExecution.created_at,
WorkflowNodeExecution.elapsed_time,
WorkflowNodeExecution.process_data,
WorkflowNodeExecution.execution_metadata,
)
workflow_nodes_execution_id_records = (
db.session.query(WorkflowNodeExecution.id)
.filter(WorkflowNodeExecution.workflow_run_id == trace_info.workflow_run_id)
.all()
)
for node_execution in workflow_nodes_executions:
for node_execution_id_record in workflow_nodes_execution_id_records:
node_execution = (
db.session.query(
WorkflowNodeExecution.id,
WorkflowNodeExecution.tenant_id,
WorkflowNodeExecution.app_id,
WorkflowNodeExecution.title,
WorkflowNodeExecution.node_type,
WorkflowNodeExecution.status,
WorkflowNodeExecution.inputs,
WorkflowNodeExecution.outputs,
WorkflowNodeExecution.created_at,
WorkflowNodeExecution.elapsed_time,
WorkflowNodeExecution.process_data,
WorkflowNodeExecution.execution_metadata,
)
.filter(WorkflowNodeExecution.id == node_execution_id_record.id)
.first()
)
if not node_execution:
continue
node_execution_id = node_execution.id
tenant_id = node_execution.tenant_id
app_id = node_execution.app_id
@ -159,6 +168,16 @@ class LangFuseDataTrace(BaseTraceInstance):
"status": status,
}
)
process_data = json.loads(node_execution.process_data) if node_execution.process_data else {}
model_provider = process_data.get("model_provider", None)
model_name = process_data.get("model_name", None)
if model_provider is not None and model_name is not None:
metadata.update(
{
"model_provider": model_provider,
"model_name": model_name,
}
)
# add span
if trace_info.message_id:
@ -191,7 +210,6 @@ class LangFuseDataTrace(BaseTraceInstance):
self.add_span(langfuse_span_data=span_data)
process_data = json.loads(node_execution.process_data) if node_execution.process_data else {}
if process_data and process_data.get("model_mode") == "chat":
total_token = metadata.get("total_tokens", 0)
# add generation

View File

@ -100,26 +100,35 @@ class LangSmithDataTrace(BaseTraceInstance):
self.add_run(langsmith_run)
# through workflow_run_id get all_nodes_execution
workflow_nodes_executions = (
db.session.query(
WorkflowNodeExecution.id,
WorkflowNodeExecution.tenant_id,
WorkflowNodeExecution.app_id,
WorkflowNodeExecution.title,
WorkflowNodeExecution.node_type,
WorkflowNodeExecution.status,
WorkflowNodeExecution.inputs,
WorkflowNodeExecution.outputs,
WorkflowNodeExecution.created_at,
WorkflowNodeExecution.elapsed_time,
WorkflowNodeExecution.process_data,
WorkflowNodeExecution.execution_metadata,
)
workflow_nodes_execution_id_records = (
db.session.query(WorkflowNodeExecution.id)
.filter(WorkflowNodeExecution.workflow_run_id == trace_info.workflow_run_id)
.all()
)
for node_execution in workflow_nodes_executions:
for node_execution_id_record in workflow_nodes_execution_id_records:
node_execution = (
db.session.query(
WorkflowNodeExecution.id,
WorkflowNodeExecution.tenant_id,
WorkflowNodeExecution.app_id,
WorkflowNodeExecution.title,
WorkflowNodeExecution.node_type,
WorkflowNodeExecution.status,
WorkflowNodeExecution.inputs,
WorkflowNodeExecution.outputs,
WorkflowNodeExecution.created_at,
WorkflowNodeExecution.elapsed_time,
WorkflowNodeExecution.process_data,
WorkflowNodeExecution.execution_metadata,
)
.filter(WorkflowNodeExecution.id == node_execution_id_record.id)
.first()
)
if not node_execution:
continue
node_execution_id = node_execution.id
tenant_id = node_execution.tenant_id
app_id = node_execution.app_id

View File

@ -27,9 +27,11 @@ class BaseKeyword(ABC):
def delete_by_ids(self, ids: list[str]) -> None:
raise NotImplementedError
@abstractmethod
def delete(self) -> None:
raise NotImplementedError
@abstractmethod
def search(self, query: str, **kwargs: Any) -> list[Document]:
raise NotImplementedError

View File

@ -10,6 +10,7 @@ from core.rag.rerank.constants.rerank_mode import RerankMode
from core.rag.retrieval.retrieval_methods import RetrievalMethod
from extensions.ext_database import db
from models.dataset import Dataset
from services.external_knowledge_service import ExternalDatasetService
default_retrieval_model = {
"search_method": RetrievalMethod.SEMANTIC_SEARCH.value,
@ -34,6 +35,9 @@ class RetrievalService:
weights: Optional[dict] = None,
):
dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
if not dataset:
return []
if not dataset or dataset.available_document_count == 0 or dataset.available_segment_count == 0:
return []
all_documents = []
@ -108,6 +112,16 @@ class RetrievalService:
)
return all_documents
@classmethod
def external_retrieve(cls, dataset_id: str, query: str, external_retrieval_model: Optional[dict] = None):
dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
if not dataset:
return []
all_documents = ExternalDatasetService.fetch_external_knowledge_retrieval(
dataset.tenant_id, dataset_id, query, external_retrieval_model
)
return all_documents
@classmethod
def keyword_search(
cls, flask_app: Flask, dataset_id: str, query: str, top_k: int, all_documents: list, exceptions: list

View File

@ -0,0 +1,272 @@
import json
import time
import uuid
from typing import Any
from pydantic import BaseModel, model_validator
from pymochow import MochowClient
from pymochow.auth.bce_credentials import BceCredentials
from pymochow.configuration import Configuration
from pymochow.model.enum import FieldType, IndexState, IndexType, MetricType, TableState
from pymochow.model.schema import Field, HNSWParams, Schema, VectorIndex
from pymochow.model.table import AnnSearch, HNSWSearchParams, Partition, Row
from configs import dify_config
from core.rag.datasource.entity.embedding import Embeddings
from core.rag.datasource.vdb.vector_base import BaseVector
from core.rag.datasource.vdb.vector_factory import AbstractVectorFactory
from core.rag.datasource.vdb.vector_type import VectorType
from core.rag.models.document import Document
from extensions.ext_redis import redis_client
from models.dataset import Dataset
class BaiduConfig(BaseModel):
endpoint: str
connection_timeout_in_mills: int = 30 * 1000
account: str
api_key: str
database: str
index_type: str = "HNSW"
metric_type: str = "L2"
shard: int = 1
replicas: int = 3
@model_validator(mode="before")
@classmethod
def validate_config(cls, values: dict) -> dict:
if not values["endpoint"]:
raise ValueError("config BAIDU_VECTOR_DB_ENDPOINT is required")
if not values["account"]:
raise ValueError("config BAIDU_VECTOR_DB_ACCOUNT is required")
if not values["api_key"]:
raise ValueError("config BAIDU_VECTOR_DB_API_KEY is required")
if not values["database"]:
raise ValueError("config BAIDU_VECTOR_DB_DATABASE is required")
return values
class BaiduVector(BaseVector):
field_id: str = "id"
field_vector: str = "vector"
field_text: str = "text"
field_metadata: str = "metadata"
field_app_id: str = "app_id"
field_annotation_id: str = "annotation_id"
index_vector: str = "vector_idx"
def __init__(self, collection_name: str, config: BaiduConfig):
super().__init__(collection_name)
self._client_config = config
self._client = self._init_client(config)
self._db = self._init_database()
def get_type(self) -> str:
return VectorType.BAIDU
def to_index_struct(self) -> dict:
return {"type": self.get_type(), "vector_store": {"class_prefix": self._collection_name}}
def create(self, texts: list[Document], embeddings: list[list[float]], **kwargs):
self._create_table(len(embeddings[0]))
self.add_texts(texts, embeddings)
def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs):
texts = [doc.page_content for doc in documents]
metadatas = [doc.metadata for doc in documents]
total_count = len(documents)
batch_size = 1000
# upsert texts and embeddings batch by batch
table = self._db.table(self._collection_name)
for start in range(0, total_count, batch_size):
end = min(start + batch_size, total_count)
rows = []
for i in range(start, end, 1):
row = Row(
id=metadatas[i].get("doc_id", str(uuid.uuid4())),
vector=embeddings[i],
text=texts[i],
metadata=json.dumps(metadatas[i]),
app_id=metadatas[i].get("app_id", ""),
annotation_id=metadatas[i].get("annotation_id", ""),
)
rows.append(row)
table.upsert(rows=rows)
# rebuild vector index after upsert finished
table.rebuild_index(self.index_vector)
while True:
time.sleep(1)
index = table.describe_index(self.index_vector)
if index.state == IndexState.NORMAL:
break
def text_exists(self, id: str) -> bool:
res = self._db.table(self._collection_name).query(primary_key={self.field_id: id})
if res and res.code == 0:
return True
return False
def delete_by_ids(self, ids: list[str]) -> None:
quoted_ids = [f"'{id}'" for id in ids]
self._db.table(self._collection_name).delete(filter=f"id IN({', '.join(quoted_ids)})")
def delete_by_metadata_field(self, key: str, value: str) -> None:
self._db.table(self._collection_name).delete(filter=f"{key} = '{value}'")
def search_by_vector(self, query_vector: list[float], **kwargs: Any) -> list[Document]:
anns = AnnSearch(
vector_field=self.field_vector,
vector_floats=query_vector,
params=HNSWSearchParams(ef=kwargs.get("ef", 10), limit=kwargs.get("top_k", 4)),
)
res = self._db.table(self._collection_name).search(
anns=anns,
projections=[self.field_id, self.field_text, self.field_metadata],
retrieve_vector=True,
)
score_threshold = float(kwargs.get("score_threshold") or 0.0)
return self._get_search_res(res, score_threshold)
def search_by_full_text(self, query: str, **kwargs: Any) -> list[Document]:
# baidu vector database doesn't support bm25 search on current version
return []
def _get_search_res(self, res, score_threshold):
docs = []
for row in res.rows:
row_data = row.get("row", {})
meta = row_data.get(self.field_metadata)
if meta is not None:
meta = json.loads(meta)
score = row.get("score", 0.0)
if score > score_threshold:
meta["score"] = score
doc = Document(page_content=row_data.get(self.field_text), metadata=meta)
docs.append(doc)
return docs
def delete(self) -> None:
self._db.drop_table(table_name=self._collection_name)
def _init_client(self, config) -> MochowClient:
config = Configuration(credentials=BceCredentials(config.account, config.api_key), endpoint=config.endpoint)
client = MochowClient(config)
return client
def _init_database(self):
exists = False
for db in self._client.list_databases():
if db.database_name == self._client_config.database:
exists = True
break
# Create database if not existed
if exists:
return self._client.database(self._client_config.database)
else:
return self._client.create_database(database_name=self._client_config.database)
def _table_existed(self) -> bool:
tables = self._db.list_table()
return any(table.table_name == self._collection_name for table in tables)
def _create_table(self, dimension: int) -> None:
# Try to grab distributed lock and create table
lock_name = "vector_indexing_lock_{}".format(self._collection_name)
with redis_client.lock(lock_name, timeout=20):
table_exist_cache_key = "vector_indexing_{}".format(self._collection_name)
if redis_client.get(table_exist_cache_key):
return
if self._table_existed():
return
self.delete()
# check IndexType and MetricType
index_type = None
for k, v in IndexType.__members__.items():
if k == self._client_config.index_type:
index_type = v
if index_type is None:
raise ValueError("unsupported index_type")
metric_type = None
for k, v in MetricType.__members__.items():
if k == self._client_config.metric_type:
metric_type = v
if metric_type is None:
raise ValueError("unsupported metric_type")
# Construct field schema
fields = []
fields.append(
Field(
self.field_id,
FieldType.STRING,
primary_key=True,
partition_key=True,
auto_increment=False,
not_null=True,
)
)
fields.append(Field(self.field_metadata, FieldType.STRING, not_null=True))
fields.append(Field(self.field_app_id, FieldType.STRING))
fields.append(Field(self.field_annotation_id, FieldType.STRING))
fields.append(Field(self.field_text, FieldType.TEXT, not_null=True))
fields.append(Field(self.field_vector, FieldType.FLOAT_VECTOR, not_null=True, dimension=dimension))
# Construct vector index params
indexes = []
indexes.append(
VectorIndex(
index_name="vector_idx",
index_type=index_type,
field="vector",
metric_type=metric_type,
params=HNSWParams(m=16, efconstruction=200),
)
)
# Create table
self._db.create_table(
table_name=self._collection_name,
replication=self._client_config.replicas,
partition=Partition(partition_num=self._client_config.shard),
schema=Schema(fields=fields, indexes=indexes),
description="Table for Dify",
)
redis_client.set(table_exist_cache_key, 1, ex=3600)
# Wait for table created
while True:
time.sleep(1)
table = self._db.describe_table(self._collection_name)
if table.state == TableState.NORMAL:
break
class BaiduVectorFactory(AbstractVectorFactory):
def init_vector(self, dataset: Dataset, attributes: list, embeddings: Embeddings) -> BaiduVector:
if dataset.index_struct_dict:
class_prefix: str = dataset.index_struct_dict["vector_store"]["class_prefix"]
collection_name = class_prefix.lower()
else:
dataset_id = dataset.id
collection_name = Dataset.gen_collection_name_by_id(dataset_id).lower()
dataset.index_struct = json.dumps(self.gen_index_struct_dict(VectorType.BAIDU, collection_name))
return BaiduVector(
collection_name=collection_name,
config=BaiduConfig(
endpoint=dify_config.BAIDU_VECTOR_DB_ENDPOINT,
connection_timeout_in_mills=dify_config.BAIDU_VECTOR_DB_CONNECTION_TIMEOUT_MS,
account=dify_config.BAIDU_VECTOR_DB_ACCOUNT,
api_key=dify_config.BAIDU_VECTOR_DB_API_KEY,
database=dify_config.BAIDU_VECTOR_DB_DATABASE,
shard=dify_config.BAIDU_VECTOR_DB_SHARD,
replicas=dify_config.BAIDU_VECTOR_DB_REPLICAS,
),
)

View File

@ -1,5 +1,6 @@
import json
import logging
import math
from typing import Any, Optional
from urllib.parse import urlparse
@ -76,7 +77,7 @@ class ElasticSearchVector(BaseVector):
raise ValueError("Elasticsearch vector database version must be greater than 8.0.0")
def get_type(self) -> str:
return "elasticsearch"
return VectorType.ELASTICSEARCH
def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs):
uuids = self._get_uuids(documents)
@ -112,7 +113,8 @@ class ElasticSearchVector(BaseVector):
def search_by_vector(self, query_vector: list[float], **kwargs: Any) -> list[Document]:
top_k = kwargs.get("top_k", 10)
knn = {"field": Field.VECTOR.value, "query_vector": query_vector, "k": top_k}
num_candidates = math.ceil(top_k * 1.5)
knn = {"field": Field.VECTOR.value, "query_vector": query_vector, "k": top_k, "num_candidates": num_candidates}
results = self._client.search(index=self._collection_name, knn=knn, size=top_k)

View File

@ -166,7 +166,7 @@ class PGVector(BaseVector):
with self._get_cursor() as cur:
cur.execute(
f"""SELECT meta, text, ts_rank(to_tsvector(coalesce(text, '')), to_tsquery(%s)) AS score
f"""SELECT meta, text, ts_rank(to_tsvector(coalesce(text, '')), plainto_tsquery(%s)) AS score
FROM {self.table_name}
WHERE to_tsvector(text) @@ plainto_tsquery(%s)
ORDER BY score DESC

View File

@ -162,7 +162,7 @@ class RelytVector(BaseVector):
else:
return None
def delete_by_uuids(self, ids: list[str] = None):
def delete_by_uuids(self, ids: Optional[list[str]] = None):
"""Delete by vector IDs.
Args:

View File

@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Any
from typing import Any, Optional
from configs import dify_config
from core.embedding.cached_embedding import CacheEmbedding
@ -25,7 +25,7 @@ class AbstractVectorFactory(ABC):
class Vector:
def __init__(self, dataset: Dataset, attributes: list = None):
def __init__(self, dataset: Dataset, attributes: Optional[list] = None):
if attributes is None:
attributes = ["doc_id", "dataset_id", "document_id", "doc_hash"]
self._dataset = dataset
@ -103,10 +103,18 @@ class Vector:
from core.rag.datasource.vdb.analyticdb.analyticdb_vector import AnalyticdbVectorFactory
return AnalyticdbVectorFactory
case VectorType.BAIDU:
from core.rag.datasource.vdb.baidu.baidu_vector import BaiduVectorFactory
return BaiduVectorFactory
case VectorType.VIKINGDB:
from core.rag.datasource.vdb.vikingdb.vikingdb_vector import VikingDBVectorFactory
return VikingDBVectorFactory
case _:
raise ValueError(f"Vector store {vector_type} is not supported.")
def create(self, texts: list = None, **kwargs):
def create(self, texts: Optional[list] = None, **kwargs):
if texts:
embeddings = self._embeddings.embed_documents([document.page_content for document in texts])
self._vector_processor.create(texts=texts, embeddings=embeddings, **kwargs)

View File

@ -16,3 +16,5 @@ class VectorType(str, Enum):
TENCENT = "tencent"
ORACLE = "oracle"
ELASTICSEARCH = "elasticsearch"
BAIDU = "baidu"
VIKINGDB = "vikingdb"

View File

@ -0,0 +1,239 @@
import json
from typing import Any
from pydantic import BaseModel
from volcengine.viking_db import (
Data,
DistanceType,
Field,
FieldType,
IndexType,
QuantType,
VectorIndexParams,
VikingDBService,
)
from configs import dify_config
from core.rag.datasource.entity.embedding import Embeddings
from core.rag.datasource.vdb.field import Field as vdb_Field
from core.rag.datasource.vdb.vector_base import BaseVector
from core.rag.datasource.vdb.vector_factory import AbstractVectorFactory
from core.rag.datasource.vdb.vector_type import VectorType
from core.rag.models.document import Document
from extensions.ext_redis import redis_client
from models.dataset import Dataset
class VikingDBConfig(BaseModel):
access_key: str
secret_key: str
host: str
region: str
scheme: str
connection_timeout: int
socket_timeout: int
index_type: str = IndexType.HNSW
distance: str = DistanceType.L2
quant: str = QuantType.Float
class VikingDBVector(BaseVector):
def __init__(self, collection_name: str, group_id: str, config: VikingDBConfig):
super().__init__(collection_name)
self._group_id = group_id
self._client_config = config
self._index_name = f"{self._collection_name}_idx"
self._client = VikingDBService(
host=config.host,
region=config.region,
scheme=config.scheme,
connection_timeout=config.connection_timeout,
socket_timeout=config.socket_timeout,
ak=config.access_key,
sk=config.secret_key,
)
def _has_collection(self) -> bool:
try:
self._client.get_collection(self._collection_name)
except Exception:
return False
return True
def _has_index(self) -> bool:
try:
self._client.get_index(self._collection_name, self._index_name)
except Exception:
return False
return True
def _create_collection(self, dimension: int):
lock_name = f"vector_indexing_lock_{self._collection_name}"
with redis_client.lock(lock_name, timeout=20):
collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
if redis_client.get(collection_exist_cache_key):
return
if not self._has_collection():
fields = [
Field(field_name=vdb_Field.PRIMARY_KEY.value, field_type=FieldType.String, is_primary_key=True),
Field(field_name=vdb_Field.METADATA_KEY.value, field_type=FieldType.String),
Field(field_name=vdb_Field.GROUP_KEY.value, field_type=FieldType.String),
Field(field_name=vdb_Field.CONTENT_KEY.value, field_type=FieldType.Text),
Field(field_name=vdb_Field.VECTOR.value, field_type=FieldType.Vector, dim=dimension),
]
self._client.create_collection(
collection_name=self._collection_name,
fields=fields,
description="Collection For Dify",
)
if not self._has_index():
vector_index = VectorIndexParams(
distance=self._client_config.distance,
index_type=self._client_config.index_type,
quant=self._client_config.quant,
)
self._client.create_index(
collection_name=self._collection_name,
index_name=self._index_name,
vector_index=vector_index,
partition_by=vdb_Field.GROUP_KEY.value,
description="Index For Dify",
)
redis_client.set(collection_exist_cache_key, 1, ex=3600)
def get_type(self) -> str:
return VectorType.VIKINGDB
def create(self, texts: list[Document], embeddings: list[list[float]], **kwargs):
dimension = len(embeddings[0])
self._create_collection(dimension)
self.add_texts(texts, embeddings, **kwargs)
def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs):
page_contents = [doc.page_content for doc in documents]
metadatas = [doc.metadata for doc in documents]
docs = []
for i, page_content in enumerate(page_contents):
metadata = {}
if metadatas is not None:
for key, val in metadatas[i].items():
metadata[key] = val
doc = Data(
{
vdb_Field.PRIMARY_KEY.value: metadatas[i]["doc_id"],
vdb_Field.VECTOR.value: embeddings[i] if embeddings else None,
vdb_Field.CONTENT_KEY.value: page_content,
vdb_Field.METADATA_KEY.value: json.dumps(metadata),
vdb_Field.GROUP_KEY.value: self._group_id,
}
)
docs.append(doc)
self._client.get_collection(self._collection_name).upsert_data(docs)
def text_exists(self, id: str) -> bool:
docs = self._client.get_collection(self._collection_name).fetch_data(id)
not_exists_str = "data does not exist"
if docs is not None and not_exists_str not in docs.fields.get("message", ""):
return True
return False
def delete_by_ids(self, ids: list[str]) -> None:
self._client.get_collection(self._collection_name).delete_data(ids)
def get_ids_by_metadata_field(self, key: str, value: str):
# Note: Metadata field value is an dict, but vikingdb field
# not support json type
results = self._client.get_index(self._collection_name, self._index_name).search(
filter={"op": "must", "field": vdb_Field.GROUP_KEY.value, "conds": [self._group_id]},
# max value is 5000
limit=5000,
)
if not results:
return []
ids = []
for result in results:
metadata = result.fields.get(vdb_Field.METADATA_KEY.value)
if metadata is not None:
metadata = json.loads(metadata)
if metadata.get(key) == value:
ids.append(result.id)
return ids
def delete_by_metadata_field(self, key: str, value: str) -> None:
ids = self.get_ids_by_metadata_field(key, value)
self.delete_by_ids(ids)
def search_by_vector(self, query_vector: list[float], **kwargs: Any) -> list[Document]:
results = self._client.get_index(self._collection_name, self._index_name).search_by_vector(
query_vector, limit=kwargs.get("top_k", 50)
)
score_threshold = float(kwargs.get("score_threshold") or 0.0)
return self._get_search_res(results, score_threshold)
def _get_search_res(self, results, score_threshold):
if len(results) == 0:
return []
docs = []
for result in results:
metadata = result.fields.get(vdb_Field.METADATA_KEY.value)
if metadata is not None:
metadata = json.loads(metadata)
if result.score > score_threshold:
metadata["score"] = result.score
doc = Document(page_content=result.fields.get(vdb_Field.CONTENT_KEY.value), metadata=metadata)
docs.append(doc)
docs = sorted(docs, key=lambda x: x.metadata["score"], reverse=True)
return docs
def search_by_full_text(self, query: str, **kwargs: Any) -> list[Document]:
return []
def delete(self) -> None:
if self._has_index():
self._client.drop_index(self._collection_name, self._index_name)
if self._has_collection():
self._client.drop_collection(self._collection_name)
class VikingDBVectorFactory(AbstractVectorFactory):
def init_vector(self, dataset: Dataset, attributes: list, embeddings: Embeddings) -> VikingDBVector:
if dataset.index_struct_dict:
class_prefix: str = dataset.index_struct_dict["vector_store"]["class_prefix"]
collection_name = class_prefix.lower()
else:
dataset_id = dataset.id
collection_name = Dataset.gen_collection_name_by_id(dataset_id).lower()
dataset.index_struct = json.dumps(self.gen_index_struct_dict(VectorType.VIKINGDB, collection_name))
if dify_config.VIKINGDB_ACCESS_KEY is None:
raise ValueError("VIKINGDB_ACCESS_KEY should not be None")
if dify_config.VIKINGDB_SECRET_KEY is None:
raise ValueError("VIKINGDB_SECRET_KEY should not be None")
if dify_config.VIKINGDB_HOST is None:
raise ValueError("VIKINGDB_HOST should not be None")
if dify_config.VIKINGDB_REGION is None:
raise ValueError("VIKINGDB_REGION should not be None")
if dify_config.VIKINGDB_SCHEME is None:
raise ValueError("VIKINGDB_SCHEME should not be None")
return VikingDBVector(
collection_name=collection_name,
group_id=dataset.id,
config=VikingDBConfig(
access_key=dify_config.VIKINGDB_ACCESS_KEY,
secret_key=dify_config.VIKINGDB_SECRET_KEY,
host=dify_config.VIKINGDB_HOST,
region=dify_config.VIKINGDB_REGION,
scheme=dify_config.VIKINGDB_SCHEME,
connection_timeout=dify_config.VIKINGDB_CONNECTION_TIMEOUT,
socket_timeout=dify_config.VIKINGDB_SOCKET_TIMEOUT,
),
)

View File

@ -0,0 +1,12 @@
from typing import Optional
from pydantic import BaseModel
class DocumentContext(BaseModel):
"""
Model class for document context.
"""
content: str
score: Optional[float] = None

View File

@ -1,7 +1,7 @@
import re
import tempfile
from pathlib import Path
from typing import Union
from typing import Optional, Union
from urllib.parse import unquote
from configs import dify_config
@ -12,6 +12,7 @@ from core.rag.extractor.entity.extract_setting import ExtractSetting
from core.rag.extractor.excel_extractor import ExcelExtractor
from core.rag.extractor.firecrawl.firecrawl_web_extractor import FirecrawlWebExtractor
from core.rag.extractor.html_extractor import HtmlExtractor
from core.rag.extractor.jina_reader_extractor import JinaReaderWebExtractor
from core.rag.extractor.markdown_extractor import MarkdownExtractor
from core.rag.extractor.notion_extractor import NotionExtractor
from core.rag.extractor.pdf_extractor import PdfExtractor
@ -83,7 +84,7 @@ class ExtractProcessor:
@classmethod
def extract(
cls, extract_setting: ExtractSetting, is_automatic: bool = False, file_path: str = None
cls, extract_setting: ExtractSetting, is_automatic: bool = False, file_path: Optional[str] = None
) -> list[Document]:
if extract_setting.datasource_type == DatasourceType.FILE.value:
with tempfile.TemporaryDirectory() as temp_dir:
@ -171,6 +172,15 @@ class ExtractProcessor:
only_main_content=extract_setting.website_info.only_main_content,
)
return extractor.extract()
elif extract_setting.website_info.provider == "jinareader":
extractor = JinaReaderWebExtractor(
url=extract_setting.website_info.url,
job_id=extract_setting.website_info.job_id,
tenant_id=extract_setting.website_info.tenant_id,
mode=extract_setting.website_info.mode,
only_main_content=extract_setting.website_info.only_main_content,
)
return extractor.extract()
else:
raise ValueError(f"Unsupported website provider: {extract_setting.website_info.provider}")
else:

View File

@ -0,0 +1,35 @@
from core.rag.extractor.extractor_base import BaseExtractor
from core.rag.models.document import Document
from services.website_service import WebsiteService
class JinaReaderWebExtractor(BaseExtractor):
"""
Crawl and scrape websites and return content in clean llm-ready markdown.
"""
def __init__(self, url: str, job_id: str, tenant_id: str, mode: str = "crawl", only_main_content: bool = False):
"""Initialize with url, api_key, base_url and mode."""
self._url = url
self.job_id = job_id
self.tenant_id = tenant_id
self.mode = mode
self.only_main_content = only_main_content
def extract(self) -> list[Document]:
"""Extract content from the URL."""
documents = []
if self.mode == "crawl":
crawl_data = WebsiteService.get_crawl_url_data(self.job_id, "jinareader", self._url, self.tenant_id)
if crawl_data is None:
return []
document = Document(
page_content=crawl_data.get("content", ""),
metadata={
"source_url": crawl_data.get("url"),
"description": crawl_data.get("description"),
"title": crawl_data.get("title"),
},
)
documents.append(document)
return documents

View File

@ -1,4 +1,5 @@
import logging
from typing import Optional
from core.rag.extractor.extractor_base import BaseExtractor
from core.rag.models.document import Document
@ -17,7 +18,7 @@ class UnstructuredEpubExtractor(BaseExtractor):
def __init__(
self,
file_path: str,
api_url: str = None,
api_url: Optional[str] = None,
):
"""Initialize with file path."""
self._file_path = file_path

View File

@ -17,6 +17,8 @@ class Document(BaseModel):
"""
metadata: Optional[dict] = Field(default_factory=dict)
provider: Optional[str] = "dify"
class BaseDocumentTransformer(ABC):
"""Abstract base class for document transformation systems.

View File

@ -28,11 +28,16 @@ class RerankModelRunner:
docs = []
doc_id = []
unique_documents = []
for document in documents:
dify_documents = [item for item in documents if item.provider == "dify"]
external_documents = [item for item in documents if item.provider == "external"]
for document in dify_documents:
if document.metadata["doc_id"] not in doc_id:
doc_id.append(document.metadata["doc_id"])
docs.append(document.page_content)
unique_documents.append(document)
for document in external_documents:
docs.append(document.page_content)
unique_documents.append(document)
documents = unique_documents
@ -46,14 +51,10 @@ class RerankModelRunner:
# format document
rerank_document = Document(
page_content=result.text,
metadata={
"doc_id": documents[result.index].metadata["doc_id"],
"doc_hash": documents[result.index].metadata["doc_hash"],
"document_id": documents[result.index].metadata["document_id"],
"dataset_id": documents[result.index].metadata["dataset_id"],
"score": result.score,
},
metadata=documents[result.index].metadata,
provider=documents[result.index].provider,
)
rerank_document.metadata["score"] = result.score
rerank_documents.append(rerank_document)
return rerank_documents

View File

@ -20,6 +20,7 @@ from core.ops.utils import measure_time
from core.rag.data_post_processor.data_post_processor import DataPostProcessor
from core.rag.datasource.keyword.jieba.jieba_keyword_table_handler import JiebaKeywordTableHandler
from core.rag.datasource.retrieval_service import RetrievalService
from core.rag.entities.context_entities import DocumentContext
from core.rag.models.document import Document
from core.rag.retrieval.retrieval_methods import RetrievalMethod
from core.rag.retrieval.router.multi_dataset_function_call_router import FunctionCallMultiDatasetRouter
@ -28,6 +29,7 @@ from core.tools.utils.dataset_retriever.dataset_retriever_base_tool import Datas
from extensions.ext_database import db
from models.dataset import Dataset, DatasetQuery, DocumentSegment
from models.dataset import Document as DatasetDocument
from services.external_knowledge_service import ExternalDatasetService
default_retrieval_model = {
"search_method": RetrievalMethod.SEMANTIC_SEARCH.value,
@ -108,7 +110,7 @@ class DatasetRetrieval:
continue
# pass if dataset is not available
if dataset and dataset.available_document_count == 0:
if dataset and dataset.available_document_count == 0 and dataset.provider != "external":
continue
available_datasets.append(dataset)
@ -144,69 +146,96 @@ class DatasetRetrieval:
message_id,
)
document_score_list = {}
for item in all_documents:
if item.metadata.get("score"):
document_score_list[item.metadata["doc_id"]] = item.metadata["score"]
dify_documents = [item for item in all_documents if item.provider == "dify"]
external_documents = [item for item in all_documents if item.provider == "external"]
document_context_list = []
index_node_ids = [document.metadata["doc_id"] for document in all_documents]
segments = DocumentSegment.query.filter(
DocumentSegment.dataset_id.in_(dataset_ids),
DocumentSegment.completed_at.isnot(None),
DocumentSegment.status == "completed",
DocumentSegment.enabled == True,
DocumentSegment.index_node_id.in_(index_node_ids),
).all()
retrieval_resource_list = []
# deal with external documents
for item in external_documents:
document_context_list.append(DocumentContext(content=item.page_content, score=item.metadata.get("score")))
source = {
"dataset_id": item.metadata.get("dataset_id"),
"dataset_name": item.metadata.get("dataset_name"),
"document_name": item.metadata.get("title"),
"data_source_type": "external",
"retriever_from": invoke_from.to_source(),
"score": item.metadata.get("score"),
"content": item.page_content,
}
retrieval_resource_list.append(source)
document_score_list = {}
# deal with dify documents
if dify_documents:
for item in dify_documents:
if item.metadata.get("score"):
document_score_list[item.metadata["doc_id"]] = item.metadata["score"]
if segments:
index_node_id_to_position = {id: position for position, id in enumerate(index_node_ids)}
sorted_segments = sorted(
segments, key=lambda segment: index_node_id_to_position.get(segment.index_node_id, float("inf"))
)
for segment in sorted_segments:
if segment.answer:
document_context_list.append(f"question:{segment.get_sign_content()} answer:{segment.answer}")
else:
document_context_list.append(segment.get_sign_content())
if show_retrieve_source:
context_list = []
resource_number = 1
index_node_ids = [document.metadata["doc_id"] for document in dify_documents]
segments = DocumentSegment.query.filter(
DocumentSegment.dataset_id.in_(dataset_ids),
DocumentSegment.status == "completed",
DocumentSegment.enabled == True,
DocumentSegment.index_node_id.in_(index_node_ids),
).all()
if segments:
index_node_id_to_position = {id: position for position, id in enumerate(index_node_ids)}
sorted_segments = sorted(
segments, key=lambda segment: index_node_id_to_position.get(segment.index_node_id, float("inf"))
)
for segment in sorted_segments:
dataset = Dataset.query.filter_by(id=segment.dataset_id).first()
document = DatasetDocument.query.filter(
DatasetDocument.id == segment.document_id,
DatasetDocument.enabled == True,
DatasetDocument.archived == False,
).first()
if dataset and document:
source = {
"position": resource_number,
"dataset_id": dataset.id,
"dataset_name": dataset.name,
"document_id": document.id,
"document_name": document.name,
"data_source_type": document.data_source_type,
"segment_id": segment.id,
"retriever_from": invoke_from.to_source(),
"score": document_score_list.get(segment.index_node_id, None),
}
if segment.answer:
document_context_list.append(
DocumentContext(
content=f"question:{segment.get_sign_content()} answer:{segment.answer}",
score=document_score_list.get(segment.index_node_id, None),
)
)
else:
document_context_list.append(
DocumentContext(
content=segment.get_sign_content(),
score=document_score_list.get(segment.index_node_id, None),
)
)
if show_retrieve_source:
for segment in sorted_segments:
dataset = Dataset.query.filter_by(id=segment.dataset_id).first()
document = DatasetDocument.query.filter(
DatasetDocument.id == segment.document_id,
DatasetDocument.enabled == True,
DatasetDocument.archived == False,
).first()
if dataset and document:
source = {
"dataset_id": dataset.id,
"dataset_name": dataset.name,
"document_id": document.id,
"document_name": document.name,
"data_source_type": document.data_source_type,
"segment_id": segment.id,
"retriever_from": invoke_from.to_source(),
"score": document_score_list.get(segment.index_node_id, None),
}
if invoke_from.to_source() == "dev":
source["hit_count"] = segment.hit_count
source["word_count"] = segment.word_count
source["segment_position"] = segment.position
source["index_node_hash"] = segment.index_node_hash
if segment.answer:
source["content"] = f"question:{segment.content} \nanswer:{segment.answer}"
else:
source["content"] = segment.content
context_list.append(source)
resource_number += 1
if hit_callback:
hit_callback.return_retriever_resource_info(context_list)
return str("\n".join(document_context_list))
if invoke_from.to_source() == "dev":
source["hit_count"] = segment.hit_count
source["word_count"] = segment.word_count
source["segment_position"] = segment.position
source["index_node_hash"] = segment.index_node_hash
if segment.answer:
source["content"] = f"question:{segment.content} \nanswer:{segment.answer}"
else:
source["content"] = segment.content
retrieval_resource_list.append(source)
if hit_callback and retrieval_resource_list:
retrieval_resource_list = sorted(retrieval_resource_list, key=lambda x: x.get("score"), reverse=True)
for position, item in enumerate(retrieval_resource_list, start=1):
item["position"] = position
hit_callback.return_retriever_resource_info(retrieval_resource_list)
if document_context_list:
document_context_list = sorted(document_context_list, key=lambda x: x.score, reverse=True)
return str("\n".join([document_context.content for document_context in document_context_list]))
return ""
def single_retrieve(
@ -254,36 +283,58 @@ class DatasetRetrieval:
# get retrieval model config
dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
if dataset:
retrieval_model_config = dataset.retrieval_model or default_retrieval_model
# get top k
top_k = retrieval_model_config["top_k"]
# get retrieval method
if dataset.indexing_technique == "economy":
retrieval_method = "keyword_search"
else:
retrieval_method = retrieval_model_config["search_method"]
# get reranking model
reranking_model = (
retrieval_model_config["reranking_model"] if retrieval_model_config["reranking_enable"] else None
)
# get score threshold
score_threshold = 0.0
score_threshold_enabled = retrieval_model_config.get("score_threshold_enabled")
if score_threshold_enabled:
score_threshold = retrieval_model_config.get("score_threshold")
with measure_time() as timer:
results = RetrievalService.retrieve(
retrieval_method=retrieval_method,
dataset_id=dataset.id,
results = []
if dataset.provider == "external":
external_documents = ExternalDatasetService.fetch_external_knowledge_retrieval(
tenant_id=dataset.tenant_id,
dataset_id=dataset_id,
query=query,
top_k=top_k,
score_threshold=score_threshold,
reranking_model=reranking_model,
reranking_mode=retrieval_model_config.get("reranking_mode", "reranking_model"),
weights=retrieval_model_config.get("weights", None),
external_retrieval_parameters=dataset.retrieval_model,
)
for external_document in external_documents:
document = Document(
page_content=external_document.get("content"),
metadata=external_document.get("metadata"),
provider="external",
)
document.metadata["score"] = external_document.get("score")
document.metadata["title"] = external_document.get("title")
document.metadata["dataset_id"] = dataset_id
document.metadata["dataset_name"] = dataset.name
results.append(document)
else:
retrieval_model_config = dataset.retrieval_model or default_retrieval_model
# get top k
top_k = retrieval_model_config["top_k"]
# get retrieval method
if dataset.indexing_technique == "economy":
retrieval_method = "keyword_search"
else:
retrieval_method = retrieval_model_config["search_method"]
# get reranking model
reranking_model = (
retrieval_model_config["reranking_model"]
if retrieval_model_config["reranking_enable"]
else None
)
# get score threshold
score_threshold = 0.0
score_threshold_enabled = retrieval_model_config.get("score_threshold_enabled")
if score_threshold_enabled:
score_threshold = retrieval_model_config.get("score_threshold")
with measure_time() as timer:
results = RetrievalService.retrieve(
retrieval_method=retrieval_method,
dataset_id=dataset.id,
query=query,
top_k=top_k,
score_threshold=score_threshold,
reranking_model=reranking_model,
reranking_mode=retrieval_model_config.get("reranking_mode", "reranking_model"),
weights=retrieval_model_config.get("weights", None),
)
self._on_query(query, [dataset_id], app_id, user_from, user_id)
if results:
@ -354,7 +405,8 @@ class DatasetRetrieval:
self, documents: list[Document], message_id: Optional[str] = None, timer: Optional[dict] = None
) -> None:
"""Handle retrieval end."""
for document in documents:
dify_documents = [document for document in documents if document.provider == "dify"]
for document in dify_documents:
query = db.session.query(DocumentSegment).filter(
DocumentSegment.index_node_id == document.metadata["doc_id"]
)
@ -407,35 +459,54 @@ class DatasetRetrieval:
if not dataset:
return []
# get retrieval model , if the model is not setting , using default
retrieval_model = dataset.retrieval_model or default_retrieval_model
if dataset.indexing_technique == "economy":
# use keyword table query
documents = RetrievalService.retrieve(
retrieval_method="keyword_search", dataset_id=dataset.id, query=query, top_k=top_k
if dataset.provider == "external":
external_documents = ExternalDatasetService.fetch_external_knowledge_retrieval(
tenant_id=dataset.tenant_id,
dataset_id=dataset_id,
query=query,
external_retrieval_parameters=dataset.retrieval_model,
)
if documents:
all_documents.extend(documents)
else:
if top_k > 0:
# retrieval source
documents = RetrievalService.retrieve(
retrieval_method=retrieval_model["search_method"],
dataset_id=dataset.id,
query=query,
top_k=retrieval_model.get("top_k") or 2,
score_threshold=retrieval_model.get("score_threshold", 0.0)
if retrieval_model["score_threshold_enabled"]
else 0.0,
reranking_model=retrieval_model.get("reranking_model", None)
if retrieval_model["reranking_enable"]
else None,
reranking_mode=retrieval_model.get("reranking_mode") or "reranking_model",
weights=retrieval_model.get("weights", None),
for external_document in external_documents:
document = Document(
page_content=external_document.get("content"),
metadata=external_document.get("metadata"),
provider="external",
)
document.metadata["score"] = external_document.get("score")
document.metadata["title"] = external_document.get("title")
document.metadata["dataset_id"] = dataset_id
document.metadata["dataset_name"] = dataset.name
all_documents.append(document)
else:
# get retrieval model , if the model is not setting , using default
retrieval_model = dataset.retrieval_model or default_retrieval_model
all_documents.extend(documents)
if dataset.indexing_technique == "economy":
# use keyword table query
documents = RetrievalService.retrieve(
retrieval_method="keyword_search", dataset_id=dataset.id, query=query, top_k=top_k
)
if documents:
all_documents.extend(documents)
else:
if top_k > 0:
# retrieval source
documents = RetrievalService.retrieve(
retrieval_method=retrieval_model["search_method"],
dataset_id=dataset.id,
query=query,
top_k=retrieval_model.get("top_k") or 2,
score_threshold=retrieval_model.get("score_threshold", 0.0)
if retrieval_model["score_threshold_enabled"]
else 0.0,
reranking_model=retrieval_model.get("reranking_model", None)
if retrieval_model["reranking_enable"]
else None,
reranking_mode=retrieval_model.get("reranking_mode") or "reranking_model",
weights=retrieval_model.get("weights", None),
)
all_documents.extend(documents)
def to_dataset_retriever_tool(
self,
@ -466,7 +537,7 @@ class DatasetRetrieval:
continue
# pass if dataset is not available
if dataset and dataset.available_document_count == 0:
if dataset and dataset.provider != "external" and dataset.available_document_count == 0:
continue
available_datasets.append(dataset)

View File

@ -0,0 +1,44 @@
from datetime import datetime
from typing import Any, Union
import pytz
from core.tools.builtin_tool.tool import BuiltinTool
from core.tools.entities.tool_entities import ToolInvokeMessage
from core.tools.errors import ToolInvokeError
class LocaltimeToTimestampTool(BuiltinTool):
def _invoke(
self,
user_id: str,
tool_parameters: dict[str, Any],
) -> Union[ToolInvokeMessage, list[ToolInvokeMessage]]:
"""
Convert localtime to timestamp
"""
localtime = tool_parameters.get("localtime")
timezone = tool_parameters.get("timezone", "Asia/Shanghai")
if not timezone:
timezone = None
time_format = "%Y-%m-%d %H:%M:%S"
timestamp = self.localtime_to_timestamp(localtime, time_format, timezone)
if not timestamp:
return self.create_text_message(f"Invalid localtime: {localtime}")
return self.create_text_message(f"{timestamp}")
@staticmethod
def localtime_to_timestamp(localtime: str, time_format: str, local_tz=None) -> int | None:
try:
if local_tz is None:
local_tz = datetime.now().astimezone().tzinfo
if isinstance(local_tz, str):
local_tz = pytz.timezone(local_tz)
local_time = datetime.strptime(localtime, time_format)
localtime = local_tz.localize(local_time)
timestamp = int(localtime.timestamp())
return timestamp
except Exception as e:
raise ToolInvokeError(str(e))

View File

@ -0,0 +1,33 @@
identity:
name: localtime_to_timestamp
author: zhuhao
label:
en_US: localtime to timestamp
zh_Hans: 获取时间戳
description:
human:
en_US: A tool for localtime convert to timestamp
zh_Hans: 获取时间戳
llm: A tool for localtime convert to timestamp
parameters:
- name: localtime
type: string
required: true
form: llm
label:
en_US: localtime
zh_Hans: 本地时间
human_description:
en_US: localtime, such as 2024-1-1 0:0:0
zh_Hans: 本地时间, 比如2024-1-1 0:0:0
- name: timezone
type: string
required: false
form: llm
label:
en_US: Timezone
zh_Hans: 时区
human_description:
en_US: Timezone, such as Asia/Shanghai
zh_Hans: 时区, 比如Asia/Shanghai
default: Asia/Shanghai

View File

@ -0,0 +1,44 @@
from datetime import datetime
from typing import Any, Union
import pytz
from core.tools.builtin_tool.tool import BuiltinTool
from core.tools.entities.tool_entities import ToolInvokeMessage
from core.tools.errors import ToolInvokeError
class TimestampToLocaltimeTool(BuiltinTool):
def _invoke(
self,
user_id: str,
tool_parameters: dict[str, Any],
) -> Union[ToolInvokeMessage, list[ToolInvokeMessage]]:
"""
Convert timestamp to localtime
"""
timestamp = tool_parameters.get("timestamp")
timezone = tool_parameters.get("timezone", "Asia/Shanghai")
if not timezone:
timezone = None
time_format = "%Y-%m-%d %H:%M:%S"
locatime = self.timestamp_to_localtime(timestamp, timezone)
if not locatime:
return self.create_text_message(f"Invalid timestamp: {timestamp}")
localtime_format = locatime.strftime(time_format)
return self.create_text_message(f"{localtime_format}")
@staticmethod
def timestamp_to_localtime(timestamp: int, local_tz=None) -> datetime | None:
try:
if local_tz is None:
local_tz = datetime.now().astimezone().tzinfo
if isinstance(local_tz, str):
local_tz = pytz.timezone(local_tz)
local_time = datetime.fromtimestamp(timestamp, local_tz)
return local_time
except Exception as e:
raise ToolInvokeError(str(e))

View File

@ -0,0 +1,33 @@
identity:
name: timestamp_to_localtime
author: zhuhao
label:
en_US: Timestamp to localtime
zh_Hans: 时间戳转换
description:
human:
en_US: A tool for timestamp convert to localtime
zh_Hans: 时间戳转换
llm: A tool for timestamp convert to localtime
parameters:
- name: timestamp
type: number
required: true
form: llm
label:
en_US: Timestamp
zh_Hans: 时间戳
human_description:
en_US: Timestamp
zh_Hans: 时间戳
- name: timezone
type: string
required: false
form: llm
label:
en_US: Timezone
zh_Hans: 时区
human_description:
en_US: Timezone, such as Asia/Shanghai
zh_Hans: 时区, 比如Asia/Shanghai
default: Asia/Shanghai

View File

@ -1,3 +1,5 @@
from typing import Optional
from core.model_runtime.entities.llm_entities import LLMResult
from core.model_runtime.entities.message_entities import PromptMessage, SystemPromptMessage, UserPromptMessage
from core.tools.__base.tool import Tool

View File

@ -1,10 +1,12 @@
from pydantic import BaseModel, Field
from core.rag.datasource.retrieval_service import RetrievalService
from core.rag.models.document import Document as RetrievalDocument
from core.rag.retrieval.retrieval_methods import RetrievalMethod
from core.tools.tool.dataset_retriever.dataset_retriever_base_tool import DatasetRetrieverBaseTool
from extensions.ext_database import db
from models.dataset import Dataset, Document, DocumentSegment
from services.external_knowledge_service import ExternalDatasetService
default_retrieval_model = {
"search_method": RetrievalMethod.SEMANTIC_SEARCH.value,
@ -53,97 +55,137 @@ class DatasetRetrieverTool(DatasetRetrieverBaseTool):
for hit_callback in self.hit_callbacks:
hit_callback.on_query(query, dataset.id)
# get retrieval model , if the model is not setting , using default
retrieval_model = dataset.retrieval_model or default_retrieval_model
if dataset.indexing_technique == "economy":
# use keyword table query
documents = RetrievalService.retrieve(
retrieval_method="keyword_search", dataset_id=dataset.id, query=query, top_k=self.top_k
if dataset.provider == "external":
results = []
external_documents = ExternalDatasetService.fetch_external_knowledge_retrieval(
tenant_id=dataset.tenant_id,
dataset_id=dataset.id,
query=query,
external_retrieval_parameters=dataset.retrieval_model,
)
return str("\n".join([document.page_content for document in documents]))
else:
if self.top_k > 0:
# retrieval source
documents = RetrievalService.retrieve(
retrieval_method=retrieval_model.get("search_method", "semantic_search"),
dataset_id=dataset.id,
query=query,
top_k=self.top_k,
score_threshold=retrieval_model.get("score_threshold", 0.0)
if retrieval_model["score_threshold_enabled"]
else 0.0,
reranking_model=retrieval_model.get("reranking_model", None)
if retrieval_model["reranking_enable"]
else None,
reranking_mode=retrieval_model.get("reranking_mode") or "reranking_model",
weights=retrieval_model.get("weights", None),
for external_document in external_documents:
document = RetrievalDocument(
page_content=external_document.get("content"),
metadata=external_document.get("metadata"),
provider="external",
)
else:
documents = []
document.metadata["score"] = external_document.get("score")
document.metadata["title"] = external_document.get("title")
document.metadata["dataset_id"] = dataset.id
document.metadata["dataset_name"] = dataset.name
results.append(document)
# deal with external documents
context_list = []
for position, item in enumerate(results, start=1):
source = {
"position": position,
"dataset_id": item.metadata.get("dataset_id"),
"dataset_name": item.metadata.get("dataset_name"),
"document_name": item.metadata.get("title"),
"data_source_type": "external",
"retriever_from": self.retriever_from,
"score": item.metadata.get("score"),
"title": item.metadata.get("title"),
"content": item.page_content,
}
context_list.append(source)
for hit_callback in self.hit_callbacks:
hit_callback.on_tool_end(documents)
document_score_list = {}
if dataset.indexing_technique != "economy":
for item in documents:
if item.metadata.get("score"):
document_score_list[item.metadata["doc_id"]] = item.metadata["score"]
document_context_list = []
index_node_ids = [document.metadata["doc_id"] for document in documents]
segments = DocumentSegment.query.filter(
DocumentSegment.dataset_id == self.dataset_id,
DocumentSegment.completed_at.isnot(None),
DocumentSegment.status == "completed",
DocumentSegment.enabled == True,
DocumentSegment.index_node_id.in_(index_node_ids),
).all()
hit_callback.return_retriever_resource_info(context_list)
if segments:
index_node_id_to_position = {id: position for position, id in enumerate(index_node_ids)}
sorted_segments = sorted(
segments, key=lambda segment: index_node_id_to_position.get(segment.index_node_id, float("inf"))
return str("\n".join([item.page_content for item in results]))
else:
# get retrieval model , if the model is not setting , using default
retrieval_model = dataset.retrieval_model or default_retrieval_model
if dataset.indexing_technique == "economy":
# use keyword table query
documents = RetrievalService.retrieve(
retrieval_method="keyword_search", dataset_id=dataset.id, query=query, top_k=self.top_k
)
for segment in sorted_segments:
if segment.answer:
document_context_list.append(f"question:{segment.get_sign_content()} answer:{segment.answer}")
else:
document_context_list.append(segment.get_sign_content())
if self.return_resource:
context_list = []
resource_number = 1
return str("\n".join([document.page_content for document in documents]))
else:
if self.top_k > 0:
# retrieval source
documents = RetrievalService.retrieve(
retrieval_method=retrieval_model.get("search_method", "semantic_search"),
dataset_id=dataset.id,
query=query,
top_k=self.top_k,
score_threshold=retrieval_model.get("score_threshold", 0.0)
if retrieval_model["score_threshold_enabled"]
else 0.0,
reranking_model=retrieval_model.get("reranking_model", None)
if retrieval_model["reranking_enable"]
else None,
reranking_mode=retrieval_model.get("reranking_mode") or "reranking_model",
weights=retrieval_model.get("weights", None),
)
else:
documents = []
for hit_callback in self.hit_callbacks:
hit_callback.on_tool_end(documents)
document_score_list = {}
if dataset.indexing_technique != "economy":
for item in documents:
if item.metadata.get("score"):
document_score_list[item.metadata["doc_id"]] = item.metadata["score"]
document_context_list = []
index_node_ids = [document.metadata["doc_id"] for document in documents]
segments = DocumentSegment.query.filter(
DocumentSegment.dataset_id == self.dataset_id,
DocumentSegment.completed_at.isnot(None),
DocumentSegment.status == "completed",
DocumentSegment.enabled == True,
DocumentSegment.index_node_id.in_(index_node_ids),
).all()
if segments:
index_node_id_to_position = {id: position for position, id in enumerate(index_node_ids)}
sorted_segments = sorted(
segments, key=lambda segment: index_node_id_to_position.get(segment.index_node_id, float("inf"))
)
for segment in sorted_segments:
context = {}
document = Document.query.filter(
Document.id == segment.document_id,
Document.enabled == True,
Document.archived == False,
).first()
if dataset and document:
source = {
"position": resource_number,
"dataset_id": dataset.id,
"dataset_name": dataset.name,
"document_id": document.id,
"document_name": document.name,
"data_source_type": document.data_source_type,
"segment_id": segment.id,
"retriever_from": self.retriever_from,
"score": document_score_list.get(segment.index_node_id, None),
}
if self.retriever_from == "dev":
source["hit_count"] = segment.hit_count
source["word_count"] = segment.word_count
source["segment_position"] = segment.position
source["index_node_hash"] = segment.index_node_hash
if segment.answer:
source["content"] = f"question:{segment.content} \nanswer:{segment.answer}"
else:
source["content"] = segment.content
context_list.append(source)
resource_number += 1
if segment.answer:
document_context_list.append(
f"question:{segment.get_sign_content()} answer:{segment.answer}"
)
else:
document_context_list.append(segment.get_sign_content())
if self.return_resource:
context_list = []
resource_number = 1
for segment in sorted_segments:
context = {}
document = Document.query.filter(
Document.id == segment.document_id,
Document.enabled == True,
Document.archived == False,
).first()
if dataset and document:
source = {
"position": resource_number,
"dataset_id": dataset.id,
"dataset_name": dataset.name,
"document_id": document.id,
"document_name": document.name,
"data_source_type": document.data_source_type,
"segment_id": segment.id,
"retriever_from": self.retriever_from,
"score": document_score_list.get(segment.index_node_id, None),
}
if self.retriever_from == "dev":
source["hit_count"] = segment.hit_count
source["word_count"] = segment.word_count
source["segment_position"] = segment.position
source["index_node_hash"] = segment.index_node_hash
if segment.answer:
source["content"] = f"question:{segment.content} \nanswer:{segment.answer}"
else:
source["content"] = segment.content
context_list.append(source)
resource_number += 1
for hit_callback in self.hit_callbacks:
hit_callback.return_retriever_resource_info(context_list)
for hit_callback in self.hit_callbacks:
hit_callback.return_retriever_resource_info(context_list)
return str("\n".join(document_context_list))
return str("\n".join(document_context_list))

View File

@ -1,3 +1,5 @@
from typing import Optional
import httpx
from core.tools.errors import ToolProviderCredentialValidationError
@ -32,7 +34,12 @@ class FeishuRequest:
return res.get("tenant_access_token")
def _send_request(
self, url: str, method: str = "post", require_token: bool = True, payload: dict = None, params: dict = None
self,
url: str,
method: str = "post",
require_token: bool = True,
payload: Optional[dict] = None,
params: Optional[dict] = None,
):
headers = {
"Content-Type": "application/json",

View File

@ -3,6 +3,7 @@ import uuid
from json import dumps as json_dumps
from json import loads as json_loads
from json.decoder import JSONDecodeError
from typing import Optional
from requests import get
from yaml import YAMLError, safe_load

View File

@ -92,10 +92,13 @@ class WorkflowTool(Tool):
if data.get("error"):
raise Exception(data.get("error"))
outputs = data.get("outputs", {})
outputs, files = self._extract_files(outputs)
for file in files:
yield self.create_file_var_message(file)
outputs = data.get("outputs")
if outputs == None:
outputs = {}
else:
outputs, files = self._extract_files(outputs)
for file in files:
yield self.create_file_var_message(file)
yield self.create_text_message(json.dumps(outputs, ensure_ascii=False))
yield self.create_json_message(outputs)

View File

@ -11,3 +11,6 @@ class SystemVariableKey(str, Enum):
CONVERSATION_ID = "conversation_id"
USER_ID = "user_id"
DIALOGUE_COUNT = "dialogue_count"
APP_ID = "app_id"
WORKFLOW_ID = "workflow_id"
WORKFLOW_RUN_ID = "workflow_run_id"

View File

@ -79,8 +79,9 @@ class KnowledgeRetrievalNode(BaseNode):
results = (
db.session.query(Dataset)
.join(subquery, Dataset.id == subquery.c.dataset_id)
.outerjoin(subquery, Dataset.id == subquery.c.dataset_id)
.filter(Dataset.tenant_id == self.tenant_id, Dataset.id.in_(dataset_ids))
.filter((subquery.c.available_document_count > 0) | (Dataset.provider == "external"))
.all()
)
@ -121,10 +122,13 @@ class KnowledgeRetrievalNode(BaseNode):
)
elif node_data.retrieval_mode == DatasetRetrieveConfigEntity.RetrieveStrategy.MULTIPLE.value:
if node_data.multiple_retrieval_config.reranking_mode == "reranking_model":
reranking_model = {
"reranking_provider_name": node_data.multiple_retrieval_config.reranking_model.provider,
"reranking_model_name": node_data.multiple_retrieval_config.reranking_model.model,
}
if node_data.multiple_retrieval_config.reranking_model:
reranking_model = {
"reranking_provider_name": node_data.multiple_retrieval_config.reranking_model.provider,
"reranking_model_name": node_data.multiple_retrieval_config.reranking_model.model,
}
else:
reranking_model = None
weights = None
elif node_data.multiple_retrieval_config.reranking_mode == "weighted_score":
reranking_model = None
@ -156,16 +160,34 @@ class KnowledgeRetrievalNode(BaseNode):
weights,
node_data.multiple_retrieval_config.reranking_enable,
)
context_list = []
if all_documents:
dify_documents = [item for item in all_documents if item.provider == "dify"]
external_documents = [item for item in all_documents if item.provider == "external"]
retrieval_resource_list = []
# deal with external documents
for item in external_documents:
source = {
"metadata": {
"_source": "knowledge",
"dataset_id": item.metadata.get("dataset_id"),
"dataset_name": item.metadata.get("dataset_name"),
"document_name": item.metadata.get("title"),
"data_source_type": "external",
"retriever_from": "workflow",
"score": item.metadata.get("score"),
},
"title": item.metadata.get("title"),
"content": item.page_content,
}
retrieval_resource_list.append(source)
document_score_list = {}
# deal with dify documents
if dify_documents:
document_score_list = {}
page_number_list = {}
for item in all_documents:
for item in dify_documents:
if item.metadata.get("score"):
document_score_list[item.metadata["doc_id"]] = item.metadata["score"]
index_node_ids = [document.metadata["doc_id"] for document in all_documents]
index_node_ids = [document.metadata["doc_id"] for document in dify_documents]
segments = DocumentSegment.query.filter(
DocumentSegment.dataset_id.in_(dataset_ids),
DocumentSegment.completed_at.isnot(None),
@ -186,13 +208,10 @@ class KnowledgeRetrievalNode(BaseNode):
Document.enabled == True,
Document.archived == False,
).first()
resource_number = 1
if dataset and document:
source = {
"metadata": {
"_source": "knowledge",
"position": resource_number,
"dataset_id": dataset.id,
"dataset_name": dataset.name,
"document_id": document.id,
@ -212,9 +231,16 @@ class KnowledgeRetrievalNode(BaseNode):
source["content"] = f"question:{segment.get_sign_content()} \nanswer:{segment.answer}"
else:
source["content"] = segment.get_sign_content()
context_list.append(source)
resource_number += 1
return context_list
retrieval_resource_list.append(source)
if retrieval_resource_list:
retrieval_resource_list = sorted(
retrieval_resource_list, key=lambda x: x.get("metadata").get("score"), reverse=True
)
position = 1
for item in retrieval_resource_list:
item["metadata"]["position"] = position
position += 1
return retrieval_resource_list
@classmethod
def _extract_variable_selector_to_variable_mapping(