mirror of
https://github.com/langgenius/dify.git
synced 2026-04-19 18:27:27 +08:00
fix endpoint get struct
This commit is contained in:
@ -27,21 +27,29 @@ class TidbService:
|
||||
def fetch_qdrant_endpoint(api_url: str, public_key: str, private_key: str, cluster_id: str) -> str | None:
|
||||
"""Fetch the qdrant endpoint for a cluster by calling the Get Cluster API.
|
||||
|
||||
The Get Cluster response contains ``status.connection_strings.standard.host``
|
||||
(e.g. ``gateway01.xx.tidbcloud.com``). We prepend ``qdrant-`` and wrap it
|
||||
as an ``https://`` URL.
|
||||
The v1beta1 serverless Get Cluster response contains
|
||||
``endpoints.public.host`` (e.g. ``gateway01.xx.tidbcloud.com``).
|
||||
We prepend ``qdrant-`` and wrap it as an ``https://`` URL.
|
||||
"""
|
||||
try:
|
||||
logger.info("Fetching qdrant endpoint for cluster %s", cluster_id)
|
||||
cluster_response = TidbService.get_tidb_serverless_cluster(api_url, public_key, private_key, cluster_id)
|
||||
if not cluster_response:
|
||||
logger.warning("Empty response from Get Cluster API for cluster %s", cluster_id)
|
||||
return None
|
||||
# v1beta: status.connection_strings.standard.host
|
||||
status = cluster_response.get("status") or {}
|
||||
connection_strings = status.get("connection_strings") or {}
|
||||
standard = connection_strings.get("standard") or {}
|
||||
host = standard.get("host")
|
||||
# v1beta1 serverless: endpoints.public.host
|
||||
endpoints = cluster_response.get("endpoints") or {}
|
||||
public = endpoints.get("public") or {}
|
||||
host = public.get("host")
|
||||
if host:
|
||||
return f"https://qdrant-{host}"
|
||||
qdrant_url = f"https://qdrant-{host}"
|
||||
logger.info("Resolved qdrant endpoint for cluster %s: %s", cluster_id, qdrant_url)
|
||||
return qdrant_url
|
||||
logger.warning(
|
||||
"No endpoints.public.host found for cluster %s, response keys: %s",
|
||||
cluster_id,
|
||||
list(cluster_response.keys()),
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Failed to fetch qdrant endpoint for cluster %s", cluster_id)
|
||||
return None
|
||||
@ -83,6 +91,7 @@ class TidbService:
|
||||
"rootPassword": password,
|
||||
}
|
||||
|
||||
logger.info("Creating TiDB serverless cluster: display_name=%s, region=%s", display_name, region)
|
||||
response = _tidb_http_client.post(
|
||||
f"{api_url}/clusters", json=cluster_data, auth=DigestAuth(public_key, private_key)
|
||||
)
|
||||
@ -90,6 +99,7 @@ class TidbService:
|
||||
if response.status_code == 200:
|
||||
response_data = response.json()
|
||||
cluster_id = response_data["clusterId"]
|
||||
logger.info("Cluster created, cluster_id=%s, waiting for ACTIVE state", cluster_id)
|
||||
retry_count = 0
|
||||
max_retries = 30
|
||||
while retry_count < max_retries:
|
||||
@ -97,6 +107,10 @@ class TidbService:
|
||||
if cluster_response["state"] == "ACTIVE":
|
||||
user_prefix = cluster_response["userPrefix"]
|
||||
qdrant_endpoint = TidbService.fetch_qdrant_endpoint(api_url, public_key, private_key, cluster_id)
|
||||
logger.info(
|
||||
"Cluster %s is ACTIVE, user_prefix=%s, qdrant_endpoint=%s",
|
||||
cluster_id, user_prefix, qdrant_endpoint,
|
||||
)
|
||||
return {
|
||||
"cluster_id": cluster_id,
|
||||
"cluster_name": display_name,
|
||||
@ -104,9 +118,12 @@ class TidbService:
|
||||
"password": password,
|
||||
"qdrant_endpoint": qdrant_endpoint,
|
||||
}
|
||||
time.sleep(30) # wait 30 seconds before retrying
|
||||
logger.info("Cluster %s state=%s, retry %d/%d", cluster_id, cluster_response["state"], retry_count + 1, max_retries)
|
||||
time.sleep(30)
|
||||
retry_count += 1
|
||||
logger.error("Cluster %s did not become ACTIVE after %d retries", cluster_id, max_retries)
|
||||
else:
|
||||
logger.error("Failed to create cluster: status=%d, body=%s", response.status_code, response.text)
|
||||
response.raise_for_status()
|
||||
|
||||
@staticmethod
|
||||
@ -271,22 +288,30 @@ class TidbService:
|
||||
if response.status_code == 200:
|
||||
response_data = response.json()
|
||||
cluster_infos = []
|
||||
logger.info("Batch created %d clusters", len(response_data.get("clusters", [])))
|
||||
for item in response_data["clusters"]:
|
||||
cache_key = f"tidb_serverless_cluster_password:{item['displayName']}"
|
||||
cached_password = redis_client.get(cache_key)
|
||||
if not cached_password:
|
||||
logger.warning("No cached password for cluster %s, skipping", item["displayName"])
|
||||
continue
|
||||
qdrant_endpoint = TidbService.fetch_qdrant_endpoint(
|
||||
api_url, public_key, private_key, item["clusterId"]
|
||||
)
|
||||
logger.info(
|
||||
"Batch cluster %s: qdrant_endpoint=%s",
|
||||
item["clusterId"], qdrant_endpoint,
|
||||
)
|
||||
cluster_info = {
|
||||
"cluster_id": item["clusterId"],
|
||||
"cluster_name": item["displayName"],
|
||||
"account": "root",
|
||||
"password": cached_password.decode("utf-8"),
|
||||
"qdrant_endpoint": TidbService.fetch_qdrant_endpoint(
|
||||
api_url, public_key, private_key, item["clusterId"]
|
||||
),
|
||||
"qdrant_endpoint": qdrant_endpoint,
|
||||
}
|
||||
cluster_infos.append(cluster_info)
|
||||
return cluster_infos
|
||||
else:
|
||||
logger.error("Batch create failed: status=%d, body=%s", response.status_code, response.text)
|
||||
response.raise_for_status()
|
||||
return []
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import uuid
|
||||
from collections.abc import Generator, Iterable, Sequence
|
||||
@ -7,6 +8,8 @@ from typing import TYPE_CHECKING, Any
|
||||
|
||||
import httpx
|
||||
import qdrant_client
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
from flask import current_app
|
||||
from httpx import DigestAuth
|
||||
from pydantic import BaseModel
|
||||
@ -421,13 +424,16 @@ class TidbOnQdrantVector(BaseVector):
|
||||
|
||||
class TidbOnQdrantVectorFactory(AbstractVectorFactory):
|
||||
def init_vector(self, dataset: Dataset, attributes: list, embeddings: Embeddings) -> TidbOnQdrantVector:
|
||||
logger.info("init_vector: tenant_id=%s, dataset_id=%s", dataset.tenant_id, dataset.id)
|
||||
stmt = select(TidbAuthBinding).where(TidbAuthBinding.tenant_id == dataset.tenant_id)
|
||||
tidb_auth_binding = db.session.scalars(stmt).one_or_none()
|
||||
if not tidb_auth_binding:
|
||||
logger.info("No existing TidbAuthBinding for tenant %s, acquiring lock", dataset.tenant_id)
|
||||
with redis_client.lock("create_tidb_serverless_cluster_lock", timeout=900):
|
||||
stmt = select(TidbAuthBinding).where(TidbAuthBinding.tenant_id == dataset.tenant_id)
|
||||
tidb_auth_binding = db.session.scalars(stmt).one_or_none()
|
||||
if tidb_auth_binding:
|
||||
logger.info("Found binding after lock: cluster_id=%s", tidb_auth_binding.cluster_id)
|
||||
TIDB_ON_QDRANT_API_KEY = f"{tidb_auth_binding.account}:{tidb_auth_binding.password}"
|
||||
|
||||
else:
|
||||
@ -437,12 +443,17 @@ class TidbOnQdrantVectorFactory(AbstractVectorFactory):
|
||||
.limit(1)
|
||||
)
|
||||
if idle_tidb_auth_binding:
|
||||
logger.info(
|
||||
"Assigning idle cluster %s to tenant %s",
|
||||
idle_tidb_auth_binding.cluster_id, dataset.tenant_id,
|
||||
)
|
||||
idle_tidb_auth_binding.active = True
|
||||
idle_tidb_auth_binding.tenant_id = dataset.tenant_id
|
||||
db.session.commit()
|
||||
tidb_auth_binding = idle_tidb_auth_binding
|
||||
TIDB_ON_QDRANT_API_KEY = f"{idle_tidb_auth_binding.account}:{idle_tidb_auth_binding.password}"
|
||||
else:
|
||||
logger.info("No idle clusters available, creating new cluster for tenant %s", dataset.tenant_id)
|
||||
new_cluster = TidbService.create_tidb_serverless_cluster(
|
||||
dify_config.TIDB_PROJECT_ID or "",
|
||||
dify_config.TIDB_API_URL or "",
|
||||
@ -451,6 +462,10 @@ class TidbOnQdrantVectorFactory(AbstractVectorFactory):
|
||||
dify_config.TIDB_PRIVATE_KEY or "",
|
||||
dify_config.TIDB_REGION or "",
|
||||
)
|
||||
logger.info(
|
||||
"New cluster created: cluster_id=%s, qdrant_endpoint=%s",
|
||||
new_cluster["cluster_id"], new_cluster.get("qdrant_endpoint"),
|
||||
)
|
||||
new_tidb_auth_binding = TidbAuthBinding(
|
||||
cluster_id=new_cluster["cluster_id"],
|
||||
cluster_name=new_cluster["cluster_name"],
|
||||
@ -466,11 +481,18 @@ class TidbOnQdrantVectorFactory(AbstractVectorFactory):
|
||||
tidb_auth_binding = new_tidb_auth_binding
|
||||
TIDB_ON_QDRANT_API_KEY = f"{new_tidb_auth_binding.account}:{new_tidb_auth_binding.password}"
|
||||
else:
|
||||
logger.info("Existing binding found: cluster_id=%s", tidb_auth_binding.cluster_id)
|
||||
TIDB_ON_QDRANT_API_KEY = f"{tidb_auth_binding.account}:{tidb_auth_binding.password}"
|
||||
|
||||
qdrant_url = (
|
||||
(tidb_auth_binding.qdrant_endpoint if tidb_auth_binding else None) or dify_config.TIDB_ON_QDRANT_URL or ""
|
||||
)
|
||||
logger.info(
|
||||
"Using qdrant endpoint: %s (from_binding=%s, fallback_global=%s)",
|
||||
qdrant_url,
|
||||
tidb_auth_binding.qdrant_endpoint if tidb_auth_binding else None,
|
||||
dify_config.TIDB_ON_QDRANT_URL,
|
||||
)
|
||||
|
||||
if dataset.index_struct_dict:
|
||||
class_prefix: str = dataset.index_struct_dict["vector_store"]["class_prefix"]
|
||||
|
||||
Reference in New Issue
Block a user