Compare commits

..

7 Commits

21 changed files with 393 additions and 505 deletions

View File

@ -158,7 +158,7 @@ WEB_API_CORS_ALLOW_ORIGINS=http://localhost:3000,*
CONSOLE_CORS_ALLOW_ORIGINS=http://localhost:3000,*
# Vector database configuration
# Supported values are `weaviate`, `qdrant`, `milvus`, `myscale`, `relyt`, `pgvector`, `pgvecto-rs`, `chroma`, `opensearch`, `oracle`, `tencent`, `elasticsearch`, `elasticsearch-ja`, `analyticdb`, `couchbase`, `vikingdb`, `oceanbase`, `opengauss`, `tablestore`,`vastbase`,`tidb`,`tidb_on_qdrant`,`baidu`,`lindorm`,`huawei_cloud`,`upstash`, `matrixone`, `pinecone`.
# Supported values are `weaviate`, `qdrant`, `milvus`, `myscale`, `relyt`, `pgvector`, `pgvecto-rs`, `chroma`, `opensearch`, `oracle`, `tencent`, `elasticsearch`, `elasticsearch-ja`, `analyticdb`, `couchbase`, `vikingdb`, `oceanbase`, `opengauss`, `tablestore`,`vastbase`,`tidb`,`tidb_on_qdrant`,`baidu`,`lindorm`,`huawei_cloud`,`upstash`, `matrixone`.
VECTOR_STORE=weaviate
# Prefix used to create collection name in vector database
VECTOR_INDEX_NAME_PREFIX=Vector_index
@ -365,16 +365,6 @@ PROMPT_GENERATION_MAX_TOKENS=512
CODE_GENERATION_MAX_TOKENS=1024
PLUGIN_BASED_TOKEN_COUNTING_ENABLED=false
# Pinecone configuration, only available when VECTOR_STORE is `pinecone`
PINECONE_API_KEY=your-pinecone-api-key
PINECONE_ENVIRONMENT=your-pinecone-environment
PINECONE_INDEX_NAME=dify-index
PINECONE_CLIENT_TIMEOUT=30
PINECONE_BATCH_SIZE=100
PINECONE_METRIC=cosine
PINECONE_PODS=1
PINECONE_POD_TYPE=s1
# Mail configuration, support: resend, smtp, sendgrid
MAIL_TYPE=
# If using SendGrid, use the 'from' field for authentication if necessary.

View File

@ -18,3 +18,18 @@ class EnterpriseFeatureConfig(BaseSettings):
description="Allow customization of the enterprise logo.",
default=False,
)
UPLOAD_KNOWLEDGE_PIPELINE_TEMPLATE_TOKEN: str = Field(
description="Token for uploading knowledge pipeline template.",
default="",
)
KNOWLEDGE_PIPELINE_TEMPLATE_COPYRIGHT: str = Field(
description="Knowledge pipeline template copyright.",
default="Copyright 2023 Dify",
)
KNOWLEDGE_PIPELINE_TEMPLATE_PRIVACY_POLICY: str = Field(
description="Knowledge pipeline template privacy policy.",
default="https://dify.ai",
)

View File

@ -35,7 +35,6 @@ from .vdb.opensearch_config import OpenSearchConfig
from .vdb.oracle_config import OracleConfig
from .vdb.pgvector_config import PGVectorConfig
from .vdb.pgvectors_config import PGVectoRSConfig
from .vdb.pinecone_config import PineconeConfig
from .vdb.qdrant_config import QdrantConfig
from .vdb.relyt_config import RelytConfig
from .vdb.tablestore_config import TableStoreConfig
@ -337,7 +336,6 @@ class MiddlewareConfig(
PGVectorConfig,
VastbaseVectorConfig,
PGVectoRSConfig,
PineconeConfig,
QdrantConfig,
RelytConfig,
TencentVectorDBConfig,

View File

@ -1,38 +0,0 @@
from pydantic import Field, PositiveInt
from pydantic_settings import BaseSettings
class PineconeConfig(BaseSettings):
"""
Configuration settings for Pinecone vector database
"""
PINECONE_API_KEY: str | None = Field(
description="API key for authenticating with Pinecone service",
default=None,
)
PINECONE_ENVIRONMENT: str | None = Field(
description="Pinecone environment (e.g., 'us-west1-gcp', 'us-east-1-aws')",
default=None,
)
PINECONE_INDEX_NAME: str | None = Field(
description="Default Pinecone index name",
default=None,
)
PINECONE_CLIENT_TIMEOUT: PositiveInt = Field(
description="Timeout in seconds for Pinecone client operations (default is 30 seconds)",
default=30,
)
PINECONE_BATCH_SIZE: PositiveInt = Field(
description="Batch size for Pinecone operations (default is 100)",
default=100,
)
PINECONE_METRIC: str = Field(
description="Distance metric for Pinecone index (cosine, euclidean, dotproduct)",
default="cosine",
)

View File

@ -784,7 +784,6 @@ class DatasetRetrievalSettingApi(Resource):
| VectorType.PGVECTO_RS
| VectorType.VIKINGDB
| VectorType.UPSTASH
| VectorType.PINECONE
):
return {"retrieval_method": [RetrievalMethod.SEMANTIC_SEARCH.value]}
case (
@ -841,7 +840,6 @@ class DatasetRetrievalSettingMockApi(Resource):
| VectorType.PGVECTO_RS
| VectorType.VIKINGDB
| VectorType.UPSTASH
| VectorType.PINECONE
):
return {"retrieval_method": [RetrievalMethod.SEMANTIC_SEARCH.value]}
case (

View File

@ -14,7 +14,10 @@ from controllers.console.wraps import (
from extensions.ext_database import db
from libs.login import login_required
from models.dataset import PipelineCustomizedTemplate
from services.entities.knowledge_entities.rag_pipeline_entities import PipelineTemplateInfoEntity
from services.entities.knowledge_entities.rag_pipeline_entities import (
PipelineBuiltInTemplateEntity,
PipelineTemplateInfoEntity,
)
from services.rag_pipeline.rag_pipeline import RagPipelineService
logger = logging.getLogger(__name__)
@ -26,12 +29,6 @@ def _validate_name(name):
return name
def _validate_description_length(description):
if len(description) > 400:
raise ValueError("Description cannot exceed 400 characters.")
return description
class PipelineTemplateListApi(Resource):
@setup_required
@login_required
@ -146,6 +143,186 @@ class PublishCustomizedPipelineTemplateApi(Resource):
return {"result": "success"}
class PipelineTemplateInstallApi(Resource):
"""API endpoint for installing built-in pipeline templates"""
def post(self):
"""
Install a built-in pipeline template
Args:
template_id: The template ID from URL parameter
Returns:
Success response or error with appropriate HTTP status
"""
try:
# Extract and validate Bearer token
auth_token = self._extract_bearer_token()
# Parse and validate request parameters
template_args = self._parse_template_args()
# Process uploaded template file
file_content = self._process_template_file()
# Create template entity
pipeline_built_in_template_entity = PipelineBuiltInTemplateEntity(**template_args)
# Install the template
rag_pipeline_service = RagPipelineService()
rag_pipeline_service.install_built_in_pipeline_template(
pipeline_built_in_template_entity, file_content, auth_token
)
return {"result": "success", "message": "Template installed successfully"}, 200
except ValueError as e:
logger.exception("Validation error in template installation")
return {"error": str(e)}, 400
except Exception as e:
logger.exception("Unexpected error in template installation")
return {"error": "An unexpected error occurred during template installation"}, 500
def _extract_bearer_token(self) -> str:
"""
Extract and validate Bearer token from Authorization header
Returns:
The extracted token string
Raises:
ValueError: If token is missing or invalid
"""
auth_header = request.headers.get("Authorization", "").strip()
if not auth_header:
raise ValueError("Authorization header is required")
if not auth_header.startswith("Bearer "):
raise ValueError("Authorization header must start with 'Bearer '")
token_parts = auth_header.split(" ", 1)
if len(token_parts) != 2:
raise ValueError("Invalid Authorization header format")
auth_token = token_parts[1].strip()
if not auth_token:
raise ValueError("Bearer token cannot be empty")
return auth_token
def _parse_template_args(self) -> dict:
"""
Parse and validate template arguments from form data
Args:
template_id: The template ID from URL
Returns:
Dictionary of validated template arguments
"""
# Use reqparse for consistent parameter parsing
parser = reqparse.RequestParser()
parser.add_argument(
"template_id",
type=str,
location="form",
required=False,
help="Template ID for updating existing template"
)
parser.add_argument(
"language",
type=str,
location="form",
required=True,
default="en-US",
choices=["en-US", "zh-CN", "ja-JP"],
help="Template language code"
)
parser.add_argument(
"name",
type=str,
location="form",
required=True,
default="New Pipeline Template",
help="Template name (1-200 characters)"
)
parser.add_argument(
"description",
type=str,
location="form",
required=False,
default="",
help="Template description (max 1000 characters)"
)
args = parser.parse_args()
# Additional validation
if args.get("name"):
args["name"] = self._validate_name(args["name"])
if args.get("description") and len(args["description"]) > 1000:
raise ValueError("Description must not exceed 1000 characters")
# Filter out None values
return {k: v for k, v in args.items() if v is not None}
def _validate_name(self, name: str) -> str:
"""
Validate template name
Args:
name: Template name to validate
Returns:
Validated and trimmed name
Raises:
ValueError: If name is invalid
"""
name = name.strip()
if not name or len(name) < 1 or len(name) > 200:
raise ValueError("Template name must be between 1 and 200 characters")
return name
def _process_template_file(self) -> str:
"""
Process and validate uploaded template file
Returns:
File content as string
Raises:
ValueError: If file is missing or invalid
"""
if "file" not in request.files:
raise ValueError("Template file is required")
file = request.files["file"]
# Validate file
if not file or not file.filename:
raise ValueError("No file selected")
filename = file.filename.strip()
if not filename:
raise ValueError("File name cannot be empty")
# Check file extension
if not filename.lower().endswith(".pipeline"):
raise ValueError("Template file must be a pipeline file (.pipeline)")
try:
file_content = file.read().decode("utf-8")
except UnicodeDecodeError:
raise ValueError("Template file must be valid UTF-8 text")
return file_content
api.add_resource(
PipelineTemplateListApi,
"/rag/pipeline/templates",
@ -162,3 +339,7 @@ api.add_resource(
PublishCustomizedPipelineTemplateApi,
"/rag/pipelines/<string:pipeline_id>/customized/publish",
)
api.add_resource(
PipelineTemplateInstallApi,
"/rag/pipeline/built-in/templates/install",
)

View File

@ -1,338 +0,0 @@
import json
import time
from typing import Any
from pinecone import Pinecone, ServerlessSpec
from pydantic import BaseModel
from configs import dify_config
from core.rag.datasource.vdb.field import Field
from core.rag.datasource.vdb.vector_base import BaseVector
from core.rag.datasource.vdb.vector_factory import AbstractVectorFactory
from core.rag.datasource.vdb.vector_type import VectorType
from core.rag.embedding.embedding_base import Embeddings
from core.rag.models.document import Document
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.dataset import Dataset, DatasetCollectionBinding
class PineconeConfig(BaseModel):
"""Pinecone configuration class"""
api_key: str
environment: str
index_name: str | None = None
timeout: float = 30
batch_size: int = 100
metric: str = "cosine"
class PineconeVector(BaseVector):
"""Pinecone vector database concrete implementation class"""
def __init__(self, collection_name: str, group_id: str, config: PineconeConfig):
super().__init__(collection_name)
self._client_config = config
self._group_id = group_id
# Initialize Pinecone client with SSL configuration
try:
self._pc = Pinecone(
api_key=config.api_key,
# Configure SSL to handle connection issues
ssl_ca_certs=None, # Use system default CA certificates
)
except Exception as e:
# Fallback to basic initialization if SSL config fails
self._pc = Pinecone(api_key=config.api_key)
# Normalize index name: lowercase, only a-z0-9- and <=45 chars
import hashlib
import re
base_name = collection_name.lower()
base_name = re.sub(r"[^a-z0-9-]+", "-", base_name) # replace invalid chars with '-'
base_name = re.sub(r"-+", "-", base_name).strip("-")
# Use longer secure suffix to reduce collision risk
suffix_len = 24 # 24 hex digits (96-bit entropy)
if len(base_name) > 45:
hash_suffix = hashlib.sha256(base_name.encode()).hexdigest()[:suffix_len]
truncated_name = base_name[: 45 - (suffix_len + 1)].rstrip("-")
self._index_name = f"{truncated_name}-{hash_suffix}"
else:
self._index_name = base_name
# Guard empty name
if not self._index_name:
self._index_name = f"index-{hashlib.sha256(collection_name.encode()).hexdigest()[:suffix_len]}"
# Pinecone index handle, lazily initialized
self._index: Any | None = None
def get_type(self) -> str:
"""Return vector database type identifier."""
return VectorType.PINECONE
def _ensure_index_initialized(self) -> None:
"""Ensure that self._index is attached to an existing Pinecone index."""
if self._index is not None:
return
try:
existing_indexes = self._pc.list_indexes().names()
if self._index_name in existing_indexes:
self._index = self._pc.Index(self._index_name)
else:
raise ValueError("Index not initialized. Please ingest documents to create index.")
except Exception:
raise
def to_index_struct(self) -> dict:
"""Generate index structure dictionary"""
return {"type": self.get_type(), "vector_store": {"class_prefix": self._collection_name}}
def create(self, texts: list[Document], embeddings: list[list[float]], **kwargs):
"""Create vector index"""
if texts:
# Get vector dimension
vector_size = len(embeddings[0])
# Create Pinecone index
self.create_index(vector_size)
# Add vector data
self.add_texts(texts, embeddings, **kwargs)
def create_index(self, dimension: int):
"""Create Pinecone index"""
lock_name = f"vector_indexing_lock_{self._index_name}"
with redis_client.lock(lock_name, timeout=30):
# Check Redis cache
index_exist_cache_key = f"vector_indexing_{self._index_name}"
if redis_client.get(index_exist_cache_key):
self._index = self._pc.Index(self._index_name)
return
# Check if index already exists
existing_indexes = self._pc.list_indexes().names()
if self._index_name not in existing_indexes:
# Create new index using ServerlessSpec
self._pc.create_index(
name=self._index_name,
dimension=dimension,
metric=self._client_config.metric,
spec=ServerlessSpec(cloud="aws", region=self._client_config.environment),
)
# Wait for index creation to complete
while not self._pc.describe_index(self._index_name).status["ready"]:
time.sleep(1)
else:
# Get index instance
self._index = self._pc.Index(self._index_name)
# Set cache
redis_client.set(index_exist_cache_key, 1, ex=3600)
def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs):
"""Batch add document vectors"""
if not self._index:
raise ValueError("Index not initialized. Call create() first.")
total_docs = len(documents)
uuids = self._get_uuids(documents)
batch_size = self._client_config.batch_size
added_ids = []
# Batch processing
total_batches = (total_docs + batch_size - 1) // batch_size # Ceiling division
for batch_idx, i in enumerate(range(0, len(documents), batch_size), 1):
batch_documents = documents[i : i + batch_size]
batch_embeddings = embeddings[i : i + batch_size]
batch_uuids = uuids[i : i + batch_size]
batch_size_actual = len(batch_documents)
# Build Pinecone vector data (metadata must be primitives or list[str])
vectors_to_upsert = []
for doc, embedding, doc_id in zip(batch_documents, batch_embeddings, batch_uuids):
raw_meta = doc.metadata or {}
safe_meta: dict[str, Any] = {}
# lift common identifiers to top-level fields for filtering
for k, v in raw_meta.items():
if isinstance(v, (str, int, float, bool)) or (
isinstance(v, list) and all(isinstance(x, str) for x in v)
):
safe_meta[k] = v
else:
safe_meta[k] = json.dumps(v, ensure_ascii=False)
# keep content as string metadata if needed
safe_meta[Field.CONTENT_KEY.value] = doc.page_content
# group id as string
safe_meta[Field.GROUP_KEY.value] = str(self._group_id)
vectors_to_upsert.append({"id": doc_id, "values": embedding, "metadata": safe_meta})
# Batch insert to Pinecone
try:
self._index.upsert(vectors=vectors_to_upsert)
added_ids.extend(batch_uuids)
except Exception as e:
raise
return added_ids
def search_by_vector(self, query_vector: list[float], **kwargs) -> list[Document]:
"""Vector similarity search"""
# Lazily attach to an existing index if needed
self._ensure_index_initialized()
top_k = kwargs.get("top_k", 4)
score_threshold = float(kwargs.get("score_threshold", 0.0))
# Build filter conditions
filter_dict = {Field.GROUP_KEY.value: {"$eq": str(self._group_id)}}
# Document scope filtering
document_ids_filter = kwargs.get("document_ids_filter")
if document_ids_filter:
filter_dict["document_id"] = {"$in": document_ids_filter}
# Execute search
try:
index = self._index
assert index is not None
response = index.query(vector=query_vector, top_k=top_k, include_metadata=True, filter=filter_dict)
except Exception as e:
raise
# Convert results
docs = []
filtered_count = 0
for match in response.matches:
if match.score >= score_threshold:
page_content = match.metadata.get(Field.CONTENT_KEY.value, "")
metadata = dict(match.metadata or {})
metadata.pop(Field.CONTENT_KEY.value, None)
metadata.pop(Field.GROUP_KEY.value, None)
metadata["score"] = match.score
doc = Document(page_content=page_content, metadata=metadata)
docs.append(doc)
else:
filtered_count += 1
# Sort by similarity score in descending order
docs.sort(key=lambda x: x.metadata.get("score", 0), reverse=True)
return docs
def search_by_full_text(self, query: str, **kwargs) -> list[Document]:
"""Full-text search - Pinecone does not natively support it, returns empty list"""
return []
def delete_by_metadata_field(self, key: str, value: str):
"""Delete by metadata field"""
self._ensure_index_initialized()
try:
# Build filter conditions
filter_dict = {
Field.GROUP_KEY.value: {"$eq": self._group_id},
f"{Field.METADATA_KEY.value}.{key}": {"$eq": value},
}
# Pinecone delete operation
index = self._index
assert index is not None
index.delete(filter=filter_dict)
except Exception as e:
# Ignore delete errors
pass
def delete_by_ids(self, ids: list[str]) -> None:
"""Batch delete by ID list"""
self._ensure_index_initialized()
try:
# Pinecone delete by ID
index = self._index
assert index is not None
index.delete(ids=ids)
except Exception as e:
raise
def delete(self) -> None:
"""Delete all vector data for the entire dataset"""
self._ensure_index_initialized()
try:
# Delete all vectors by group_id
filter_dict = {Field.GROUP_KEY.value: {"$eq": self._group_id}}
index = self._index
assert index is not None
index.delete(filter=filter_dict)
except Exception as e:
raise
def text_exists(self, id: str) -> bool:
"""Check if document exists"""
try:
self._ensure_index_initialized()
except Exception:
return False
try:
# Check if vector exists through query
index = self._index
assert index is not None
response = index.fetch(ids=[id])
exists = id in response.vectors
return exists
except Exception as e:
return False
class PineconeVectorFactory(AbstractVectorFactory):
"""Pinecone vector database factory class"""
def init_vector(self, dataset: Dataset, attributes: list, embeddings: Embeddings) -> PineconeVector:
"""Create PineconeVector instance"""
# Determine index name
if dataset.collection_binding_id:
dataset_collection_binding = (
db.session.query(DatasetCollectionBinding)
.where(DatasetCollectionBinding.id == dataset.collection_binding_id)
.one_or_none()
)
if dataset_collection_binding:
collection_name = dataset_collection_binding.collection_name
else:
raise ValueError("Dataset Collection Bindings does not exist!")
else:
if dataset.index_struct_dict:
class_prefix: str = dataset.index_struct_dict["vector_store"]["class_prefix"]
collection_name = class_prefix
else:
dataset_id = dataset.id
collection_name = Dataset.gen_collection_name_by_id(dataset_id)
# Set index structure
if not dataset.index_struct_dict:
dataset.index_struct = json.dumps(self.gen_index_struct_dict(VectorType.PINECONE, collection_name))
# Create PineconeVector instance
return PineconeVector(
collection_name=collection_name,
group_id=dataset.id,
config=PineconeConfig(
api_key=dify_config.PINECONE_API_KEY or "",
environment=dify_config.PINECONE_ENVIRONMENT or "",
index_name=dify_config.PINECONE_INDEX_NAME,
timeout=dify_config.PINECONE_CLIENT_TIMEOUT,
batch_size=dify_config.PINECONE_BATCH_SIZE,
metric=dify_config.PINECONE_METRIC,
),
)

View File

@ -87,10 +87,6 @@ class Vector:
from core.rag.datasource.vdb.pgvecto_rs.pgvecto_rs import PGVectoRSFactory
return PGVectoRSFactory
case VectorType.PINECONE:
from core.rag.datasource.vdb.pinecone.pinecone_vector import PineconeVectorFactory
return PineconeVectorFactory
case VectorType.QDRANT:
from core.rag.datasource.vdb.qdrant.qdrant_vector import QdrantVectorFactory

View File

@ -31,4 +31,3 @@ class VectorType(StrEnum):
HUAWEI_CLOUD = "huawei_cloud"
MATRIXONE = "matrixone"
CLICKZETTA = "clickzetta"
PINECONE = "pinecone"

View File

@ -0,0 +1,37 @@
"""remove-builtin-template-user
Revision ID: bf0bcbf45396
Revises: 68519ad5cd18
Create Date: 2025-09-25 16:50:32.245503
"""
from alembic import op
import models as models
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'bf0bcbf45396'
down_revision = '68519ad5cd18'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('pipeline_built_in_templates', schema=None) as batch_op:
batch_op.drop_column('updated_by')
batch_op.drop_column('created_by')
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('pipeline_built_in_templates', schema=None) as batch_op:
batch_op.add_column(sa.Column('created_by', sa.UUID(), autoincrement=False, nullable=False))
batch_op.add_column(sa.Column('updated_by', sa.UUID(), autoincrement=False, nullable=True))
# ### end Alembic commands ###

View File

@ -1239,15 +1239,6 @@ class PipelineBuiltInTemplate(Base): # type: ignore[name-defined]
language = db.Column(db.String(255), nullable=False)
created_at = db.Column(db.DateTime, nullable=False, server_default=func.current_timestamp())
updated_at = db.Column(db.DateTime, nullable=False, server_default=func.current_timestamp())
created_by = db.Column(StringUUID, nullable=False)
updated_by = db.Column(StringUUID, nullable=True)
@property
def created_user_name(self):
account = db.session.query(Account).where(Account.id == self.created_by).first()
if account:
return account.name
return ""
class PipelineCustomizedTemplate(Base): # type: ignore[name-defined]

View File

@ -88,8 +88,7 @@ dependencies = [
"httpx-sse~=0.4.0",
"sendgrid~=6.12.3",
"flask-restx~=1.3.0",
"packaging>=24.2,<25.0",
"pinecone>=7.3.0",
"packaging~=23.2",
]
# Before adding new dependency, consider place it in
# alphabet order (a-z) and suitable group.

View File

@ -128,3 +128,10 @@ class KnowledgeConfiguration(BaseModel):
if v is None:
return ""
return v
class PipelineBuiltInTemplateEntity(BaseModel):
template_id: str | None = None
name: str
description: str
language: str

View File

@ -74,5 +74,4 @@ class DatabasePipelineTemplateRetrieval(PipelineTemplateRetrievalBase):
"chunk_structure": pipeline_template.chunk_structure,
"export_data": pipeline_template.yaml_content,
"graph": graph_data,
"created_by": pipeline_template.created_user_name,
}

View File

@ -8,6 +8,7 @@ from datetime import UTC, datetime
from typing import Any, Union, cast
from uuid import uuid4
import yaml
from flask_login import current_user
from sqlalchemy import func, or_, select
from sqlalchemy.orm import Session, sessionmaker
@ -60,6 +61,7 @@ from models.dataset import ( # type: ignore
Document,
DocumentPipelineExecutionLog,
Pipeline,
PipelineBuiltInTemplate,
PipelineCustomizedTemplate,
PipelineRecommendedPlugin,
)
@ -76,6 +78,7 @@ from repositories.factory import DifyAPIRepositoryFactory
from services.datasource_provider_service import DatasourceProviderService
from services.entities.knowledge_entities.rag_pipeline_entities import (
KnowledgeConfiguration,
PipelineBuiltInTemplateEntity,
PipelineTemplateInfoEntity,
)
from services.errors.app import WorkflowHashNotEqualError
@ -1454,3 +1457,140 @@ class RagPipelineService:
if not pipeline:
raise ValueError("Pipeline not found")
return pipeline
def install_built_in_pipeline_template(
self, args: PipelineBuiltInTemplateEntity, file_content: str, auth_token: str
) -> None:
"""
Install built-in pipeline template
Args:
args: Pipeline built-in template entity with template metadata
file_content: YAML content of the pipeline template
auth_token: Authentication token for authorization
Raises:
ValueError: If validation fails or template processing errors occur
"""
# Validate authentication
self._validate_auth_token(auth_token)
# Parse and validate template content
pipeline_template_dsl = self._parse_template_content(file_content)
# Extract template metadata
icon = self._extract_icon_metadata(pipeline_template_dsl)
chunk_structure = self._extract_chunk_structure(pipeline_template_dsl)
# Prepare template data
template_data = {
"name": args.name,
"description": args.description,
"chunk_structure": chunk_structure,
"icon": icon,
"language": args.language,
"yaml_content": file_content,
}
# Use transaction for database operations
try:
if args.template_id:
self._update_existing_template(args.template_id, template_data)
else:
self._create_new_template(template_data)
db.session.commit()
except Exception as e:
db.session.rollback()
raise ValueError(f"Failed to install pipeline template: {str(e)}")
def _validate_auth_token(self, auth_token: str) -> None:
"""Validate the authentication token"""
config_auth_token = dify_config.UPLOAD_KNOWLEDGE_PIPELINE_TEMPLATE_TOKEN
if not config_auth_token:
raise ValueError("Auth token configuration is required")
if config_auth_token != auth_token:
raise ValueError("Auth token is incorrect")
def _parse_template_content(self, file_content: str) -> dict:
"""Parse and validate YAML template content"""
try:
pipeline_template_dsl = yaml.safe_load(file_content)
except yaml.YAMLError as e:
raise ValueError(f"Invalid YAML content: {str(e)}")
if not pipeline_template_dsl:
raise ValueError("Pipeline template DSL is required")
return pipeline_template_dsl
def _extract_icon_metadata(self, pipeline_template_dsl: dict) -> dict:
"""Extract icon metadata from template DSL"""
rag_pipeline_info = pipeline_template_dsl.get("rag_pipeline", {})
return {
"icon": rag_pipeline_info.get("icon", "📙"),
"icon_type": rag_pipeline_info.get("icon_type", "emoji"),
"icon_background": rag_pipeline_info.get("icon_background", "#FFEAD5"),
"icon_url": rag_pipeline_info.get("icon_url"),
}
def _extract_chunk_structure(self, pipeline_template_dsl: dict) -> str:
"""Extract chunk structure from template DSL"""
nodes = pipeline_template_dsl.get("workflow", {}).get("graph", {}).get("nodes", [])
# Use generator expression for efficiency
chunk_structure = next(
(
node.get("data", {}).get("chunk_structure")
for node in nodes
if node.get("data", {}).get("type") == NodeType.KNOWLEDGE_INDEX.value
),
None
)
if not chunk_structure:
raise ValueError("Chunk structure is required in template")
return chunk_structure
def _update_existing_template(self, template_id: str, template_data: dict) -> None:
"""Update an existing pipeline template"""
pipeline_built_in_template = (
db.session.query(PipelineBuiltInTemplate)
.filter(PipelineBuiltInTemplate.id == template_id)
.first()
)
if not pipeline_built_in_template:
raise ValueError(f"Pipeline built-in template not found: {template_id}")
# Update template fields
for key, value in template_data.items():
setattr(pipeline_built_in_template, key, value)
db.session.add(pipeline_built_in_template)
def _create_new_template(self, template_data: dict) -> None:
"""Create a new pipeline template"""
# Get the next available position
position = self._get_next_position(template_data["language"])
# Add additional fields for new template
template_data.update({
"position": position,
"install_count": 0,
"copyright": dify_config.KNOWLEDGE_PIPELINE_TEMPLATE_COPYRIGHT,
"privacy_policy": dify_config.KNOWLEDGE_PIPELINE_TEMPLATE_PRIVACY_POLICY,
})
new_template = PipelineBuiltInTemplate(**template_data)
db.session.add(new_template)
def _get_next_position(self, language: str) -> int:
"""Get the next available position for a template in the specified language"""
max_position = (
db.session.query(func.max(PipelineBuiltInTemplate.position))
.filter(PipelineBuiltInTemplate.language == language)
.scalar()
)
return (max_position or 0) + 1

View File

@ -1,27 +0,0 @@
from core.rag.datasource.vdb.pinecone.pinecone_vector import PineconeConfig, PineconeVector
from tests.integration_tests.vdb.test_vector_store import (
AbstractVectorTest,
setup_mock_redis,
)
class PineconeVectorTest(AbstractVectorTest):
def __init__(self):
super().__init__()
self.attributes = ["doc_id", "dataset_id", "document_id", "doc_hash"]
self.vector = PineconeVector(
collection_name=self.collection_name,
group_id=self.dataset_id,
config=PineconeConfig(
api_key="test_api_key",
environment="test_environment",
index_name="test_index",
),
)
def search_by_vector(self):
super().search_by_vector()
def test_pinecone_vector():
PineconeVectorTest().run_all_tests()

49
api/uv.lock generated
View File

@ -1334,7 +1334,6 @@ dependencies = [
{ name = "packaging" },
{ name = "pandas", extra = ["excel", "output-formatting", "performance"] },
{ name = "pandoc" },
{ name = "pinecone" },
{ name = "psycogreen" },
{ name = "psycopg2-binary" },
{ name = "pycryptodome" },
@ -1526,10 +1525,9 @@ requires-dist = [
{ name = "opentelemetry-semantic-conventions", specifier = "==0.48b0" },
{ name = "opentelemetry-util-http", specifier = "==0.48b0" },
{ name = "opik", specifier = "~=1.7.25" },
{ name = "packaging", specifier = ">=24.2,<25.0" },
{ name = "packaging", specifier = "~=23.2" },
{ name = "pandas", extras = ["excel", "output-formatting", "performance"], specifier = "~=2.2.2" },
{ name = "pandoc", specifier = "~=2.4" },
{ name = "pinecone", specifier = ">=7.3.0" },
{ name = "psycogreen", specifier = "~=1.0.2" },
{ name = "psycopg2-binary", specifier = "~=2.9.6" },
{ name = "pycryptodome", specifier = "==3.19.1" },
@ -4163,11 +4161,11 @@ wheels = [
[[package]]
name = "packaging"
version = "24.2"
version = "23.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d0/63/68dbb6eb2de9cb10ee4c9c14a0148804425e13c4fb20d61cce69f53106da/packaging-24.2.tar.gz", hash = "sha256:c228a6dc5e932d346bc5739379109d49e8853dd8223571c7c5b55260edc0b97f", size = 163950, upload-time = "2024-11-08T09:47:47.202Z" }
sdist = { url = "https://files.pythonhosted.org/packages/fb/2b/9b9c33ffed44ee921d0967086d653047286054117d584f1b1a7c22ceaf7b/packaging-23.2.tar.gz", hash = "sha256:048fb0e9405036518eaaf48a55953c750c11e1a1b68e0dd1a9d62ed0c092cfc5", size = 146714, upload-time = "2023-10-01T13:50:05.279Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/88/ef/eb23f262cca3c0c4eb7ab1933c3b1f03d021f2c48f54763065b6f0e321be/packaging-24.2-py3-none-any.whl", hash = "sha256:09abb1bccd265c01f4a3aa3f7a7db064b36514d2cba19a2f694fe6150451a759", size = 65451, upload-time = "2024-11-08T09:47:44.722Z" },
{ url = "https://files.pythonhosted.org/packages/ec/1a/610693ac4ee14fcdf2d9bf3c493370e4f2ef7ae2e19217d7a237ff42367d/packaging-23.2-py3-none-any.whl", hash = "sha256:8c491190033a9af7e1d931d0b5dacc2ef47509b34dd0de67ed209b5203fc88c7", size = 53011, upload-time = "2023-10-01T13:50:03.745Z" },
]
[[package]]
@ -4328,45 +4326,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/34/e7/ae39f538fd6844e982063c3a5e4598b8ced43b9633baa3a85ef33af8c05c/pillow-11.3.0-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:c84d689db21a1c397d001aa08241044aa2069e7587b398c8cc63020390b1c1b8", size = 6984598, upload-time = "2025-07-01T09:16:27.732Z" },
]
[[package]]
name = "pinecone"
version = "7.3.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "certifi" },
{ name = "pinecone-plugin-assistant" },
{ name = "pinecone-plugin-interface" },
{ name = "python-dateutil" },
{ name = "typing-extensions" },
{ name = "urllib3" },
]
sdist = { url = "https://files.pythonhosted.org/packages/fa/38/12731d4af470851b4963eba616605868a8599ef4df51c7b6c928e5f3166d/pinecone-7.3.0.tar.gz", hash = "sha256:307edc155621d487c20dc71b76c3ad5d6f799569ba42064190d03917954f9a7b", size = 235256, upload-time = "2025-06-27T20:03:51.498Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b7/a6/c5d54a5fb1de3983a8739c1a1660e7a7074db2cbadfa875b823fcf29b629/pinecone-7.3.0-py3-none-any.whl", hash = "sha256:315b8fef20320bef723ecbb695dec0aafa75d8434d86e01e5a0e85933e1009a8", size = 587563, upload-time = "2025-06-27T20:03:50.249Z" },
]
[[package]]
name = "pinecone-plugin-assistant"
version = "1.8.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "packaging" },
{ name = "requests" },
]
sdist = { url = "https://files.pythonhosted.org/packages/b2/01/65c4c3a81732fa379f8e7f78a8c18aa57a1139f5b79d58b93a69f2fc8cb0/pinecone_plugin_assistant-1.8.0.tar.gz", hash = "sha256:8e8682cff30f9bae9243b384021aba71c91f4e6ef1650e9d63ee64aab83cba87", size = 150435, upload-time = "2025-08-31T14:31:18.046Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/dd/49/62ab8e2f9098bf8593e36bbe6e729fcc0500bafca7d88be7b62eac66c8b0/pinecone_plugin_assistant-1.8.0-py3-none-any.whl", hash = "sha256:71ae42c3b4478d23138cbc4fe3505db561319a826f5aff4ef2e306a25ac56686", size = 259281, upload-time = "2025-08-31T14:31:16.587Z" },
]
[[package]]
name = "pinecone-plugin-interface"
version = "0.0.7"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f4/fb/e8a4063264953ead9e2b24d9b390152c60f042c951c47f4592e9996e57ff/pinecone_plugin_interface-0.0.7.tar.gz", hash = "sha256:b8e6675e41847333aa13923cc44daa3f85676d7157324682dc1640588a982846", size = 3370, upload-time = "2024-06-05T01:57:52.093Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/3b/1d/a21fdfcd6d022cb64cef5c2a29ee6691c6c103c4566b41646b080b7536a5/pinecone_plugin_interface-0.0.7-py3-none-any.whl", hash = "sha256:875857ad9c9fc8bbc074dbe780d187a2afd21f5bfe0f3b08601924a61ef1bba8", size = 6249, upload-time = "2024-06-05T01:57:50.583Z" },
]
[[package]]
name = "platformdirs"
version = "4.4.0"

View File

@ -708,16 +708,6 @@ CLICKZETTA_ANALYZER_TYPE=chinese
CLICKZETTA_ANALYZER_MODE=smart
CLICKZETTA_VECTOR_DISTANCE_FUNCTION=cosine_distance
# Pinecone configuration, only available when VECTOR_STORE is `pinecone`
PINECONE_API_KEY=your-pinecone-api-key
PINECONE_ENVIRONMENT=your-pinecone-environment
PINECONE_INDEX_NAME=dify-index
PINECONE_CLIENT_TIMEOUT=30
PINECONE_BATCH_SIZE=100
PINECONE_METRIC=cosine
PINECONE_PODS=1
PINECONE_POD_TYPE=s1
# ------------------------------
# Knowledge Configuration
# ------------------------------

View File

@ -339,14 +339,6 @@ x-shared-env: &shared-api-worker-env
CLICKZETTA_ANALYZER_TYPE: ${CLICKZETTA_ANALYZER_TYPE:-chinese}
CLICKZETTA_ANALYZER_MODE: ${CLICKZETTA_ANALYZER_MODE:-smart}
CLICKZETTA_VECTOR_DISTANCE_FUNCTION: ${CLICKZETTA_VECTOR_DISTANCE_FUNCTION:-cosine_distance}
PINECONE_API_KEY: ${PINECONE_API_KEY:-your-pinecone-api-key}
PINECONE_ENVIRONMENT: ${PINECONE_ENVIRONMENT:-your-pinecone-environment}
PINECONE_INDEX_NAME: ${PINECONE_INDEX_NAME:-dify-index}
PINECONE_CLIENT_TIMEOUT: ${PINECONE_CLIENT_TIMEOUT:-30}
PINECONE_BATCH_SIZE: ${PINECONE_BATCH_SIZE:-100}
PINECONE_METRIC: ${PINECONE_METRIC:-cosine}
PINECONE_PODS: ${PINECONE_PODS:-1}
PINECONE_POD_TYPE: ${PINECONE_POD_TYPE:-s1}
UPLOAD_FILE_SIZE_LIMIT: ${UPLOAD_FILE_SIZE_LIMIT:-15}
UPLOAD_FILE_BATCH_LIMIT: ${UPLOAD_FILE_BATCH_LIMIT:-5}
ETL_TYPE: ${ETL_TYPE:-dify}