mirror of
https://github.com/langgenius/dify.git
synced 2026-05-05 09:58:04 +08:00
feat:update latest commit (#51)
* test: adding some web tests (#27792) * feat: add validation to prevent saving empty opening statement in conversation opener modal (#27843) * fix(web): improve the consistency of the inputs-form UI (#27837) * fix(web): increase z-index of PortalToFollowElemContent (#27823) * fix: installation_id is missing when in tools page (#27849) * fix: avoid passing empty uniqueIdentifier to InstallFromMarketplace (#27802) Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * test: create new test scripts and update some existing test scripts o… (#27850) * feat: change feedback to forum (#27862) * chore: translate i18n files and update type definitions (#27868) Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com> * Fix/template transformer line number (#27867) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com> * bump vite to 6.4.1 (#27877) * Add WEAVIATE_GRPC_ENDPOINT as designed in weaviate migration guide (#27861) Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> * Fix: correct DraftWorkflowApi.post response model (#27289) Signed-off-by: Yongtao Huang <yongtaoh2022@gmail.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> * fix Version 2.0.0-beta.2: Chat annotations Api Error #25506 (#27206) Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Co-authored-by: Asuka Minato <i@asukaminato.eu.org> * fix jina reader creadential migration command (#27883) * fix agent putout the output of workflow-tool twice (#26835) (#27087) * fix jina reader transform (#27922) * fix: prevent fetch version info in enterprise edition (#27923) * fix(api): fix `VariablePool.get` adding unexpected keys to variable_dictionary (#26767) Co-authored-by: -LAN- <laipz8200@outlook.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * refactor: implement tenant self queue for rag tasks (#27559) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: -LAN- <laipz8200@outlook.com> * fix: bump brotli to 1.2.0 resloved CVE-2025-6176 (#27950) Signed-off-by: kenwoodjw <blackxin55+@gmail.com> --------- Signed-off-by: Yongtao Huang <yongtaoh2022@gmail.com> Signed-off-by: kenwoodjw <blackxin55+@gmail.com> Co-authored-by: aka James4u <smart.jamesjin@gmail.com> Co-authored-by: Novice <novice12185727@gmail.com> Co-authored-by: yangzheli <43645580+yangzheli@users.noreply.github.com> Co-authored-by: Elliott <105957288+Elliott-byte@users.noreply.github.com> Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com> Co-authored-by: johnny0120 <johnny0120@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Co-authored-by: Gritty_dev <101377478+codomposer@users.noreply.github.com> Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: wangjifeng <163279492+kk-wangjifeng@users.noreply.github.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: Boris Polonsky <BorisPolonsky@users.noreply.github.com> Co-authored-by: Yongtao Huang <yongtaoh2022@gmail.com> Co-authored-by: Cursx <33718736+Cursx@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Asuka Minato <i@asukaminato.eu.org> Co-authored-by: Jyong <76649700+JohnJyong@users.noreply.github.com> Co-authored-by: red_sun <56100962+redSun64@users.noreply.github.com> Co-authored-by: NFish <douxc512@gmail.com> Co-authored-by: QuantumGhost <obelisk.reg+git@gmail.com> Co-authored-by: -LAN- <laipz8200@outlook.com> Co-authored-by: hj24 <huangjian@dify.ai> Co-authored-by: kenwoodjw <blackxin55+@gmail.com>
This commit is contained in:
@ -40,20 +40,15 @@ from core.workflow.repositories.draft_variable_repository import DraftVariableSa
|
||||
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
|
||||
from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository
|
||||
from core.workflow.variable_loader import DUMMY_VARIABLE_LOADER, VariableLoader
|
||||
from enums.cloud_plan import CloudPlan
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_redis import redis_client
|
||||
from libs.flask_utils import preserve_flask_contexts
|
||||
from models import Account, EndUser, Workflow, WorkflowNodeExecutionTriggeredFrom
|
||||
from models.dataset import Document, DocumentPipelineExecutionLog, Pipeline
|
||||
from models.enums import WorkflowRunTriggeredFrom
|
||||
from models.model import AppMode
|
||||
from services.datasource_provider_service import DatasourceProviderService
|
||||
from services.feature_service import FeatureService
|
||||
from services.file_service import FileService
|
||||
from services.rag_pipeline.rag_pipeline_task_proxy import RagPipelineTaskProxy
|
||||
from services.workflow_draft_variable_service import DraftVarLoader, WorkflowDraftVariableService
|
||||
from tasks.rag_pipeline.priority_rag_pipeline_run_task import priority_rag_pipeline_run_task
|
||||
from tasks.rag_pipeline.rag_pipeline_run_task import rag_pipeline_run_task
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -249,34 +244,7 @@ class PipelineGenerator(BaseAppGenerator):
|
||||
)
|
||||
|
||||
if rag_pipeline_invoke_entities:
|
||||
# store the rag_pipeline_invoke_entities to object storage
|
||||
text = [item.model_dump() for item in rag_pipeline_invoke_entities]
|
||||
name = "rag_pipeline_invoke_entities.json"
|
||||
# Convert list to proper JSON string
|
||||
json_text = json.dumps(text)
|
||||
upload_file = FileService(db.engine).upload_text(json_text, name, user.id, dataset.tenant_id)
|
||||
features = FeatureService.get_features(dataset.tenant_id)
|
||||
if features.billing.enabled and features.billing.subscription.plan == CloudPlan.SANDBOX:
|
||||
tenant_pipeline_task_key = f"tenant_pipeline_task:{dataset.tenant_id}"
|
||||
tenant_self_pipeline_task_queue = f"tenant_self_pipeline_task_queue:{dataset.tenant_id}"
|
||||
|
||||
if redis_client.get(tenant_pipeline_task_key):
|
||||
# Add to waiting queue using List operations (lpush)
|
||||
redis_client.lpush(tenant_self_pipeline_task_queue, upload_file.id)
|
||||
else:
|
||||
# Set flag and execute task
|
||||
redis_client.set(tenant_pipeline_task_key, 1, ex=60 * 60)
|
||||
rag_pipeline_run_task.delay( # type: ignore
|
||||
rag_pipeline_invoke_entities_file_id=upload_file.id,
|
||||
tenant_id=dataset.tenant_id,
|
||||
)
|
||||
|
||||
else:
|
||||
priority_rag_pipeline_run_task.delay( # type: ignore
|
||||
rag_pipeline_invoke_entities_file_id=upload_file.id,
|
||||
tenant_id=dataset.tenant_id,
|
||||
)
|
||||
|
||||
RagPipelineTaskProxy(dataset.tenant_id, user.id, rag_pipeline_invoke_entities).delay()
|
||||
# return batch, dataset, documents
|
||||
return {
|
||||
"batch": batch,
|
||||
|
||||
15
api/core/entities/document_task.py
Normal file
15
api/core/entities/document_task.py
Normal file
@ -0,0 +1,15 @@
|
||||
from collections.abc import Sequence
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class DocumentTask:
|
||||
"""Document task entity for document indexing operations.
|
||||
|
||||
This class represents a document indexing task that can be queued
|
||||
and processed by the document indexing system.
|
||||
"""
|
||||
|
||||
tenant_id: str
|
||||
dataset_id: str
|
||||
document_ids: Sequence[str]
|
||||
@ -6,10 +6,7 @@ from core.helper.code_executor.template_transformer import TemplateTransformer
|
||||
class NodeJsTemplateTransformer(TemplateTransformer):
|
||||
@classmethod
|
||||
def get_runner_script(cls) -> str:
|
||||
runner_script = dedent(
|
||||
f"""
|
||||
// declare main function
|
||||
{cls._code_placeholder}
|
||||
runner_script = dedent(f""" {cls._code_placeholder}
|
||||
|
||||
// decode and prepare input object
|
||||
var inputs_obj = JSON.parse(Buffer.from('{cls._inputs_placeholder}', 'base64').toString('utf-8'))
|
||||
@ -21,6 +18,5 @@ class NodeJsTemplateTransformer(TemplateTransformer):
|
||||
var output_json = JSON.stringify(output_obj)
|
||||
var result = `<<RESULT>>${{output_json}}<<RESULT>>`
|
||||
console.log(result)
|
||||
"""
|
||||
)
|
||||
""")
|
||||
return runner_script
|
||||
|
||||
@ -6,9 +6,7 @@ from core.helper.code_executor.template_transformer import TemplateTransformer
|
||||
class Python3TemplateTransformer(TemplateTransformer):
|
||||
@classmethod
|
||||
def get_runner_script(cls) -> str:
|
||||
runner_script = dedent(f"""
|
||||
# declare main function
|
||||
{cls._code_placeholder}
|
||||
runner_script = dedent(f""" {cls._code_placeholder}
|
||||
|
||||
import json
|
||||
from base64 import b64decode
|
||||
|
||||
@ -39,11 +39,13 @@ class WeaviateConfig(BaseModel):
|
||||
|
||||
Attributes:
|
||||
endpoint: Weaviate server endpoint URL
|
||||
grpc_endpoint: Optional Weaviate gRPC server endpoint URL
|
||||
api_key: Optional API key for authentication
|
||||
batch_size: Number of objects to batch per insert operation
|
||||
"""
|
||||
|
||||
endpoint: str
|
||||
grpc_endpoint: str | None = None
|
||||
api_key: str | None = None
|
||||
batch_size: int = 100
|
||||
|
||||
@ -88,9 +90,22 @@ class WeaviateVector(BaseVector):
|
||||
http_secure = p.scheme == "https"
|
||||
http_port = p.port or (443 if http_secure else 80)
|
||||
|
||||
grpc_host = host
|
||||
grpc_secure = http_secure
|
||||
grpc_port = 443 if grpc_secure else 50051
|
||||
# Parse gRPC configuration
|
||||
if config.grpc_endpoint:
|
||||
# Urls without scheme won't be parsed correctly in some python verions,
|
||||
# see https://bugs.python.org/issue27657
|
||||
grpc_endpoint_with_scheme = (
|
||||
config.grpc_endpoint if "://" in config.grpc_endpoint else f"grpc://{config.grpc_endpoint}"
|
||||
)
|
||||
grpc_p = urlparse(grpc_endpoint_with_scheme)
|
||||
grpc_host = grpc_p.hostname or "localhost"
|
||||
grpc_port = grpc_p.port or (443 if grpc_p.scheme == "grpcs" else 50051)
|
||||
grpc_secure = grpc_p.scheme == "grpcs"
|
||||
else:
|
||||
# Infer from HTTP endpoint as fallback
|
||||
grpc_host = host
|
||||
grpc_secure = http_secure
|
||||
grpc_port = 443 if grpc_secure else 50051
|
||||
|
||||
client = weaviate.connect_to_custom(
|
||||
http_host=host,
|
||||
@ -432,6 +447,7 @@ class WeaviateVectorFactory(AbstractVectorFactory):
|
||||
collection_name=collection_name,
|
||||
config=WeaviateConfig(
|
||||
endpoint=dify_config.WEAVIATE_ENDPOINT or "",
|
||||
grpc_endpoint=dify_config.WEAVIATE_GRPC_ENDPOINT or "",
|
||||
api_key=dify_config.WEAVIATE_API_KEY,
|
||||
batch_size=dify_config.WEAVIATE_BATCH_SIZE,
|
||||
),
|
||||
|
||||
0
api/core/rag/pipeline/__init__.py
Normal file
0
api/core/rag/pipeline/__init__.py
Normal file
79
api/core/rag/pipeline/queue.py
Normal file
79
api/core/rag/pipeline/queue.py
Normal file
@ -0,0 +1,79 @@
|
||||
import json
|
||||
from collections.abc import Sequence
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel, ValidationError
|
||||
|
||||
from extensions.ext_redis import redis_client
|
||||
|
||||
_DEFAULT_TASK_TTL = 60 * 60 # 1 hour
|
||||
|
||||
|
||||
class TaskWrapper(BaseModel):
|
||||
data: Any
|
||||
|
||||
def serialize(self) -> str:
|
||||
return self.model_dump_json()
|
||||
|
||||
@classmethod
|
||||
def deserialize(cls, serialized_data: str) -> "TaskWrapper":
|
||||
return cls.model_validate_json(serialized_data)
|
||||
|
||||
|
||||
class TenantIsolatedTaskQueue:
|
||||
"""
|
||||
Simple queue for tenant isolated tasks, used for rag related tenant tasks isolation.
|
||||
It uses Redis list to store tasks, and Redis key to store task waiting flag.
|
||||
Support tasks that can be serialized by json.
|
||||
"""
|
||||
|
||||
def __init__(self, tenant_id: str, unique_key: str):
|
||||
self._tenant_id = tenant_id
|
||||
self._unique_key = unique_key
|
||||
self._queue = f"tenant_self_{unique_key}_task_queue:{tenant_id}"
|
||||
self._task_key = f"tenant_{unique_key}_task:{tenant_id}"
|
||||
|
||||
def get_task_key(self):
|
||||
return redis_client.get(self._task_key)
|
||||
|
||||
def set_task_waiting_time(self, ttl: int = _DEFAULT_TASK_TTL):
|
||||
redis_client.setex(self._task_key, ttl, 1)
|
||||
|
||||
def delete_task_key(self):
|
||||
redis_client.delete(self._task_key)
|
||||
|
||||
def push_tasks(self, tasks: Sequence[Any]):
|
||||
serialized_tasks = []
|
||||
for task in tasks:
|
||||
# Store str list directly, maintaining full compatibility for pipeline scenarios
|
||||
if isinstance(task, str):
|
||||
serialized_tasks.append(task)
|
||||
else:
|
||||
# Use TaskWrapper to do JSON serialization for non-string tasks
|
||||
wrapper = TaskWrapper(data=task)
|
||||
serialized_data = wrapper.serialize()
|
||||
serialized_tasks.append(serialized_data)
|
||||
|
||||
redis_client.lpush(self._queue, *serialized_tasks)
|
||||
|
||||
def pull_tasks(self, count: int = 1) -> Sequence[Any]:
|
||||
if count <= 0:
|
||||
return []
|
||||
|
||||
tasks = []
|
||||
for _ in range(count):
|
||||
serialized_task = redis_client.rpop(self._queue)
|
||||
if not serialized_task:
|
||||
break
|
||||
|
||||
if isinstance(serialized_task, bytes):
|
||||
serialized_task = serialized_task.decode("utf-8")
|
||||
|
||||
try:
|
||||
wrapper = TaskWrapper.deserialize(serialized_task)
|
||||
tasks.append(wrapper.data)
|
||||
except (json.JSONDecodeError, ValidationError, TypeError, ValueError):
|
||||
# Fall back to raw string for legacy format or invalid JSON
|
||||
tasks.append(serialized_task)
|
||||
|
||||
return tasks
|
||||
@ -210,12 +210,13 @@ class Tool(ABC):
|
||||
meta=meta,
|
||||
)
|
||||
|
||||
def create_json_message(self, object: dict) -> ToolInvokeMessage:
|
||||
def create_json_message(self, object: dict, suppress_output: bool = False) -> ToolInvokeMessage:
|
||||
"""
|
||||
create a json message
|
||||
"""
|
||||
return ToolInvokeMessage(
|
||||
type=ToolInvokeMessage.MessageType.JSON, message=ToolInvokeMessage.JsonMessage(json_object=object)
|
||||
type=ToolInvokeMessage.MessageType.JSON,
|
||||
message=ToolInvokeMessage.JsonMessage(json_object=object, suppress_output=suppress_output),
|
||||
)
|
||||
|
||||
def create_variable_message(
|
||||
|
||||
@ -129,6 +129,7 @@ class ToolInvokeMessage(BaseModel):
|
||||
|
||||
class JsonMessage(BaseModel):
|
||||
json_object: dict
|
||||
suppress_output: bool = Field(default=False, description="Whether to suppress JSON output in result string")
|
||||
|
||||
class BlobMessage(BaseModel):
|
||||
blob: bytes
|
||||
|
||||
@ -245,6 +245,9 @@ class ToolEngine:
|
||||
+ "you do not need to create it, just tell the user to check it now."
|
||||
)
|
||||
elif response.type == ToolInvokeMessage.MessageType.JSON:
|
||||
json_message = cast(ToolInvokeMessage.JsonMessage, response.message)
|
||||
if json_message.suppress_output:
|
||||
continue
|
||||
json_parts.append(
|
||||
json.dumps(
|
||||
safe_json_value(cast(ToolInvokeMessage.JsonMessage, response.message).json_object),
|
||||
|
||||
@ -117,7 +117,7 @@ class WorkflowTool(Tool):
|
||||
self._latest_usage = self._derive_usage_from_result(data)
|
||||
|
||||
yield self.create_text_message(json.dumps(outputs, ensure_ascii=False))
|
||||
yield self.create_json_message(outputs)
|
||||
yield self.create_json_message(outputs, suppress_output=True)
|
||||
|
||||
@property
|
||||
def latest_usage(self) -> LLMUsage:
|
||||
|
||||
@ -153,7 +153,11 @@ class VariablePool(BaseModel):
|
||||
return None
|
||||
|
||||
node_id, name = self._selector_to_keys(selector)
|
||||
segment: Segment | None = self.variable_dictionary[node_id].get(name)
|
||||
node_map = self.variable_dictionary.get(node_id)
|
||||
if node_map is None:
|
||||
return None
|
||||
|
||||
segment: Segment | None = node_map.get(name)
|
||||
|
||||
if segment is None:
|
||||
return None
|
||||
|
||||
Reference in New Issue
Block a user