# File: dify/api/extensions/logstore/aliyun_logstore_pg.py
# (273 lines, 10 KiB, Python)
import logging
import os
import socket
import time
from collections.abc import Sequence
from contextlib import contextmanager
from typing import Any
import psycopg2
from sqlalchemy import create_engine
from configs import dify_config
logger = logging.getLogger(__name__)
class AliyunLogStorePG:
    """PostgreSQL protocol support for Aliyun SLS LogStore using a SQLAlchemy connection pool.

    When the PG endpoint is unreachable or engine creation fails,
    ``init_connection()`` returns False and callers are expected to fall
    back to SDK mode.
    """

    def __init__(self, access_key_id: str, access_key_secret: str, endpoint: str, project_name: str):
        """
        Initialize PG connection settings for SLS (no network I/O happens here).

        Args:
            access_key_id: Aliyun access key ID
            access_key_secret: Aliyun access key secret
            endpoint: SLS endpoint (any http:// or https:// prefix is stripped later)
            project_name: SLS project name (used as the PG database name)
        """
        self._access_key_id = access_key_id
        self._access_key_secret = access_key_secret
        self._endpoint = endpoint
        self.project_name = project_name
        self._engine: Any = None  # SQLAlchemy Engine; created lazily by init_connection()
        self._use_pg_protocol = False  # set True only after init_connection() succeeds

    def _check_port_connectivity(self, host: str, port: int, timeout: float = 2.0) -> bool:
        """Fast TCP port check to avoid long waits on unsupported regions.

        Args:
            host: Hostname or IP to probe.
            port: TCP port to probe.
            timeout: Socket timeout in seconds.

        Returns:
            True when a TCP connection to host:port succeeds within the timeout.
        """
        try:
            # Context manager guarantees the socket is closed even when
            # settimeout()/connect_ex() raises; the previous implementation
            # leaked the descriptor on that exception path.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(timeout)
                return sock.connect_ex((host, port)) == 0
        except Exception as e:
            logger.debug("Port connectivity check failed for %s:%d: %s", host, port, str(e))
            return False

    def init_connection(self) -> bool:
        """Initialize SQLAlchemy connection pool with pool_recycle and TCP keepalive support.

        Returns:
            True when the PG protocol is usable and the pool was created,
            False when the caller should fall back to SDK mode.
        """
        try:
            pg_host = self._endpoint.replace("http://", "").replace("https://", "")

            # Pool configuration, overridable via environment variables.
            pool_size = int(os.environ.get("ALIYUN_SLS_PG_POOL_SIZE", 5))
            max_overflow = int(os.environ.get("ALIYUN_SLS_PG_MAX_OVERFLOW", 5))
            pool_recycle = int(os.environ.get("ALIYUN_SLS_PG_POOL_RECYCLE", 3600))
            pool_pre_ping = os.environ.get("ALIYUN_SLS_PG_POOL_PRE_PING", "false").lower() == "true"

            logger.debug("Check PG protocol connection to SLS: host=%s, project=%s", pg_host, self.project_name)

            # Fast port check to avoid long connect timeouts on regions
            # that do not expose the PG protocol at all.
            if not self._check_port_connectivity(pg_host, 5432, timeout=1.0):
                logger.debug("Using SDK mode for host=%s", pg_host)
                return False

            # Build connection URL; credentials must be URL-escaped because
            # access keys may contain characters meaningful in URLs.
            from urllib.parse import quote_plus

            username = quote_plus(self._access_key_id)
            password = quote_plus(self._access_key_secret)
            database_url = (
                f"postgresql+psycopg2://{username}:{password}@{pg_host}:5432/{self.project_name}?sslmode=require"
            )

            # Create SQLAlchemy engine with connection pool and TCP keepalives.
            self._engine = create_engine(
                database_url,
                pool_size=pool_size,
                max_overflow=max_overflow,
                pool_recycle=pool_recycle,
                pool_pre_ping=pool_pre_ping,
                pool_timeout=30,
                connect_args={
                    "connect_timeout": 5,
                    "application_name": f"Dify-{dify_config.project.version}-fixautocommit",
                    "keepalives": 1,
                    "keepalives_idle": 60,
                    "keepalives_interval": 10,
                    "keepalives_count": 5,
                },
            )
            self._use_pg_protocol = True
            logger.info(
                "PG protocol initialized for SLS project=%s (pool_size=%d, pool_recycle=%ds)",
                self.project_name,
                pool_size,
                pool_recycle,
            )
            return True
        except Exception as e:
            # Any failure here means "fall back to SDK mode"; clean up a
            # half-built engine so we do not hold pooled resources.
            self._use_pg_protocol = False
            if self._engine:
                try:
                    self._engine.dispose()
                except Exception:
                    logger.debug("Failed to dispose engine during cleanup, ignoring")
                self._engine = None
            logger.debug("Using SDK mode for region: %s", str(e))
            return False

    @contextmanager
    def _get_connection(self):
        """Yield a raw DBAPI connection from the pool; always hand it back on exit.

        The pool handles recycle, invalidation, and keepalive automatically.

        Raises:
            RuntimeError: When init_connection() has not created an engine.
        """
        if not self._engine:
            raise RuntimeError("SQLAlchemy engine is not initialized")
        connection = self._engine.raw_connection()
        try:
            connection.autocommit = True  # SLS PG protocol does not support transactions
            yield connection
        finally:
            # For pooled raw connections, close() returns the connection to
            # the pool rather than tearing it down (SQLAlchemy semantics).
            connection.close()

    def close(self) -> None:
        """Dispose the SQLAlchemy engine and close all pooled connections.

        Safe to call multiple times: after the first call the engine
        reference is cleared, so a disposed engine can never be reused by
        a later _get_connection() call (the original left it assigned).
        """
        if self._engine:
            try:
                self._engine.dispose()
                logger.info("SQLAlchemy engine disposed")
            except Exception:
                logger.exception("Failed to dispose engine")
            finally:
                self._engine = None
                self._use_pg_protocol = False

    def _is_retriable_error(self, error: Exception) -> bool:
        """Return True when the error looks connection-related and is worth retrying."""
        # psycopg2 connection-level errors are always retriable.
        if isinstance(error, (psycopg2.OperationalError, psycopg2.InterfaceError)):
            return True
        # Otherwise fall back to substring heuristics on the error text.
        error_msg = str(error).lower()
        retriable_patterns = (
            "connection",
            "timeout",
            "closed",
            "broken pipe",
            "reset by peer",
            "no route to host",
            "network",
            "operational error",
            "interface error",
        )
        return any(pattern in error_msg for pattern in retriable_patterns)

    def put_log(self, logstore: str, contents: Sequence[tuple[str, str]], log_enabled: bool = False) -> None:
        """Write one log row to SLS via INSERT, retrying up to 3 times with exponential backoff.

        Args:
            logstore: Target logstore (used as the table name).
            contents: (field_name, value) pairs forming a single row; empty input is a no-op.
            log_enabled: When True, emit an INFO line describing the write.

        Raises:
            psycopg2.Error: On non-retriable errors, or after all retries fail.
        """
        if not contents:
            return

        fields = [field_name for field_name, _ in contents]
        values = [value for _, value in contents]
        field_list = ", ".join(f'"{field}"' for field in fields)

        if log_enabled:
            logger.info(
                "[LogStore-PG] PUT_LOG | logstore=%s | project=%s | items_count=%d",
                logstore,
                self.project_name,
                len(contents),
            )

        max_retries = 3
        retry_delay = 0.1
        for attempt in range(max_retries):
            try:
                with self._get_connection() as conn:
                    with conn.cursor() as cursor:
                        # mogrify() produces safely-escaped value literals; the
                        # INSERT is sent as a single statement because the SLS
                        # PG protocol does not support transactions.
                        placeholders = ", ".join(["%s"] * len(fields))
                        values_literal = cursor.mogrify(f"({placeholders})", values).decode("utf-8")
                        insert_sql = f'INSERT INTO "{logstore}" ({field_list}) VALUES {values_literal}'
                        cursor.execute(insert_sql)
                        return
            except psycopg2.Error as e:
                if not self._is_retriable_error(e):
                    logger.exception("Failed to put logs to logstore %s (non-retriable error)", logstore)
                    raise
                if attempt < max_retries - 1:
                    logger.warning(
                        "Failed to put logs to logstore %s (attempt %d/%d): %s. Retrying...",
                        logstore,
                        attempt + 1,
                        max_retries,
                        str(e),
                    )
                    time.sleep(retry_delay)
                    retry_delay *= 2  # exponential backoff: 0.1s, then 0.2s
                else:
                    logger.exception("Failed to put logs to logstore %s after %d attempts", logstore, max_retries)
                    raise

    def execute_sql(self, sql: str, logstore: str, log_enabled: bool = False) -> list[dict[str, Any]]:
        """Execute a SQL query, retrying up to 3 times with exponential backoff.

        Args:
            sql: Query to run over the PG protocol.
            logstore: Logstore name, used only for logging context.
            log_enabled: When True, emit INFO lines for the query and result count.

        Returns:
            One dict per row with every value stringified (None becomes "").
            Statements that produce no result set return [].

        Raises:
            psycopg2.Error: On non-retriable errors, or after all retries fail.
        """
        if log_enabled:
            logger.info(
                "[LogStore-PG] EXECUTE_SQL | logstore=%s | project=%s | sql=%s",
                logstore,
                self.project_name,
                sql,
            )

        max_retries = 3
        retry_delay = 0.1
        for attempt in range(max_retries):
            try:
                with self._get_connection() as conn:
                    with conn.cursor() as cursor:
                        cursor.execute(sql)
                        # Statements without a result set leave description as
                        # None; the original code raised TypeError here.
                        if cursor.description is None:
                            return []
                        columns = [desc[0] for desc in cursor.description]
                        result = [
                            {col: "" if val is None else str(val) for col, val in zip(columns, row)}
                            for row in cursor.fetchall()
                        ]
                        if log_enabled:
                            logger.info(
                                "[LogStore-PG] EXECUTE_SQL RESULT | logstore=%s | returned_count=%d",
                                logstore,
                                len(result),
                            )
                        return result
            except psycopg2.Error as e:
                if not self._is_retriable_error(e):
                    logger.exception(
                        "Failed to execute SQL on logstore %s (non-retriable error): sql=%s",
                        logstore,
                        sql,
                    )
                    raise
                if attempt < max_retries - 1:
                    logger.warning(
                        "Failed to execute SQL on logstore %s (attempt %d/%d): %s. Retrying...",
                        logstore,
                        attempt + 1,
                        max_retries,
                        str(e),
                    )
                    time.sleep(retry_delay)
                    retry_delay *= 2
                else:
                    logger.exception(
                        "Failed to execute SQL on logstore %s after %d attempts: sql=%s",
                        logstore,
                        max_retries,
                        sql,
                    )
                    raise
        return []  # unreachable with max_retries >= 1; keeps the return type total