Compare commits

..

2 Commits

Author SHA1 Message Date
b7973bd349 [autofix.ci] apply automated fixes 2026-05-30 02:56:43 +00:00
9eb6a8d294 feat(api): add MCP user-identity forwarding (M2)
When an MCP provider has forward_user_identity enabled, MCPTool now asks
dify-enterprise to mint a short-lived per-user SSO id_token (via the M1
/inner/api/mcp/issue-token endpoint) and stamps it as the Authorization
Bearer on every outbound MCP request — so an MCP server can act on behalf
of the verified end user instead of seeing only "Dify is calling."

- Adds forward_user_identity (bool) + identity_mode ("off" | "idp_token")
  to tool_mcp_providers, plumbed through MCPProviderEntity, the controller,
  service-layer CRUD, and the tool/provider runtime.
- EnterpriseService.issue_mcp_token wraps the enterprise endpoint and maps
  428 → MCPNoRefreshTokenError, 401 → MCPIdentityRefreshError so workflows
  halt with a clear "please re-authenticate" instead of silently going
  anonymous.
- Identity_mode is intentionally an enum-shaped string column so future
  modes (e.g. RFC 8693 token exchange) land without UI/DB churn.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-29 19:53:12 -07:00
15 changed files with 283 additions and 15 deletions

0
.codex Normal file
View File

View File

@ -1,8 +0,0 @@
# THIS IS AUTOGENERATED. DO NOT EDIT MANUALLY
version = 1
name = "dify"
[setup]
script = '''
pnpm install --frozen-lockfile --prefer-offline
'''

View File

@ -209,6 +209,11 @@ class MCPProviderBasePayload(BaseModel):
configuration: dict[str, Any] | None = Field(default_factory=dict)
headers: dict[str, Any] | None = Field(default_factory=dict)
authentication: dict[str, Any] | None = Field(default_factory=dict)
# M3 — user-identity forwarding (M2 backend already supports these on the
# service layer). Defaults preserve pre-M3 behavior for clients that don't
# send the fields yet.
forward_user_identity: bool = False
identity_mode: Literal["off", "idp_token"] = "off"
class MCPProviderCreatePayload(MCPProviderBasePayload):
@ -985,6 +990,8 @@ class ToolProviderMCPApi(Resource):
headers=payload.headers or {},
configuration=configuration,
authentication=authentication,
forward_user_identity=payload.forward_user_identity,
identity_mode=payload.identity_mode,
)
# 2) Try to fetch tools immediately after creation so they appear without a second save.
@ -1052,6 +1059,8 @@ class ToolProviderMCPApi(Resource):
configuration=configuration,
authentication=authentication,
validation_result=validation_result,
forward_user_identity=payload.forward_user_identity,
identity_mode=payload.identity_mode,
)
return {"result": "success"}

View File

@ -3,7 +3,7 @@ from __future__ import annotations
import json
from datetime import datetime
from enum import StrEnum
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Literal
from urllib.parse import urlparse
from pydantic import BaseModel
@ -76,6 +76,14 @@ class MCPProviderEntity(BaseModel):
created_at: datetime
updated_at: datetime
# M2 — user-identity forwarding. When forward_user_identity is True AND
# identity_mode is "idp_token", the MCP tool runtime asks dify-enterprise
# to mint a fresh SSO id_token for the calling user and stamps it on the
# outbound MCP request as `Authorization: Bearer <token>`. Defaults keep
# pre-M2 providers unchanged (no forwarding).
forward_user_identity: bool = False
identity_mode: Literal["off", "idp_token"] = "off"
@classmethod
def from_db_model(cls, db_provider: MCPToolProvider) -> MCPProviderEntity:
"""Create entity from database model with decryption"""
@ -96,6 +104,8 @@ class MCPProviderEntity(BaseModel):
icon=db_provider.icon or "",
created_at=db_provider.created_at,
updated_at=db_provider.updated_at,
forward_user_identity=db_provider.forward_user_identity,
identity_mode=db_provider.identity_mode, # type: ignore[arg-type]
)
@property
@ -170,6 +180,8 @@ class MCPProviderEntity(BaseModel):
"updated_at": int(self.updated_at.timestamp()),
"label": I18nObject(en_US=self.name, zh_Hans=self.name).to_dict(),
"description": I18nObject(en_US="", zh_Hans="").to_dict(),
"forward_user_identity": self.forward_user_identity,
"identity_mode": self.identity_mode,
}
# Add configuration

View File

@ -54,6 +54,12 @@ class ToolProviderApiEntity(BaseModel):
configuration: MCPConfiguration | None = Field(
default=None, description="The timeout and sse_read_timeout of the MCP tool"
)
# M3 — user-identity forwarding flags. Round-tripped through the console
# API so the create/edit modal can hydrate the toggle state.
forward_user_identity: bool = Field(
default=False, description="Whether Dify forwards the calling user's SSO identity to this MCP server"
)
identity_mode: str = Field(default="off", description="Identity-forwarding mechanism: 'off' or 'idp_token'")
# Workflow
workflow_app_id: str | None = Field(default=None, description="The app id of the workflow tool")
@ -92,6 +98,10 @@ class ToolProviderApiEntity(BaseModel):
optional_fields.update(self.optional_field("is_dynamic_registration", self.is_dynamic_registration))
optional_fields.update(self.optional_field("masked_headers", self.masked_headers))
optional_fields.update(self.optional_field("original_headers", self.original_headers))
# M3 — forwarding flags. Always emit (False/"off" are valid
# values that the UI must hydrate, not skip).
optional_fields["forward_user_identity"] = self.forward_user_identity
optional_fields["identity_mode"] = self.identity_mode
case ToolProviderType.WORKFLOW:
optional_fields.update(self.optional_field("workflow_app_id", self.workflow_app_id))
case _:

View File

@ -1,4 +1,4 @@
from typing import Any, Self
from typing import Any, Literal, Self
from core.entities.mcp_provider import MCPProviderEntity
from core.mcp.types import Tool as RemoteMCPTool
@ -28,6 +28,8 @@ class MCPToolProviderController(ToolProviderController):
headers: dict[str, str] | None = None,
timeout: float | None = None,
sse_read_timeout: float | None = None,
forward_user_identity: bool = False,
identity_mode: Literal["off", "idp_token"] = "off",
):
super().__init__(entity)
self.entity: ToolProviderEntityWithPlugin = entity
@ -37,6 +39,8 @@ class MCPToolProviderController(ToolProviderController):
self.headers = headers or {}
self.timeout = timeout
self.sse_read_timeout = sse_read_timeout
self.forward_user_identity = forward_user_identity
self.identity_mode: Literal["off", "idp_token"] = identity_mode
@property
def provider_type(self) -> ToolProviderType:
@ -105,6 +109,8 @@ class MCPToolProviderController(ToolProviderController):
headers=entity.headers,
timeout=entity.timeout,
sse_read_timeout=entity.sse_read_timeout,
forward_user_identity=entity.forward_user_identity,
identity_mode=entity.identity_mode,
)
def _validate_credentials(self, user_id: str, credentials: dict[str, Any]):
@ -134,6 +140,8 @@ class MCPToolProviderController(ToolProviderController):
headers=self.headers,
timeout=self.timeout,
sse_read_timeout=self.sse_read_timeout,
forward_user_identity=self.forward_user_identity,
identity_mode=self.identity_mode,
)
def get_tools(self) -> list[MCPTool]:
@ -151,6 +159,8 @@ class MCPToolProviderController(ToolProviderController):
headers=self.headers,
timeout=self.timeout,
sse_read_timeout=self.sse_read_timeout,
forward_user_identity=self.forward_user_identity,
identity_mode=self.identity_mode,
)
for tool_entity in self.entity.tools
]

View File

@ -4,7 +4,7 @@ import base64
import json
import logging
from collections.abc import Generator, Mapping
from typing import Any, cast
from typing import Any, Literal, cast
from core.mcp.auth_client import MCPClientWithAuthRetry
from core.mcp.error import MCPConnectionError
@ -38,6 +38,8 @@ class MCPTool(Tool):
headers: dict[str, str] | None = None,
timeout: float | None = None,
sse_read_timeout: float | None = None,
forward_user_identity: bool = False,
identity_mode: Literal["off", "idp_token"] = "off",
):
super().__init__(entity, runtime)
self.tenant_id = tenant_id
@ -47,6 +49,8 @@ class MCPTool(Tool):
self.headers = headers or {}
self.timeout = timeout
self.sse_read_timeout = sse_read_timeout
self.forward_user_identity = forward_user_identity
self.identity_mode: Literal["off", "idp_token"] = identity_mode
self._latest_usage = LLMUsage.empty_usage()
def tool_provider_type(self) -> ToolProviderType:
@ -60,7 +64,7 @@ class MCPTool(Tool):
app_id: str | None = None,
message_id: str | None = None,
) -> Generator[ToolInvokeMessage, None, None]:
result = self.invoke_remote_mcp_tool(tool_parameters)
result = self.invoke_remote_mcp_tool(tool_parameters, user_id=user_id, app_id=app_id)
# Extract usage metadata from MCP protocol's _meta field
self._latest_usage = self._derive_usage_from_result(result)
@ -234,6 +238,8 @@ class MCPTool(Tool):
headers=self.headers,
timeout=self.timeout,
sse_read_timeout=self.sse_read_timeout,
forward_user_identity=self.forward_user_identity,
identity_mode=self.identity_mode,
)
def _handle_none_parameter(self, parameter: dict[str, Any]) -> dict[str, Any]:
@ -246,7 +252,12 @@ class MCPTool(Tool):
if value is not None and not (isinstance(value, str) and value.strip() == "")
}
def invoke_remote_mcp_tool(self, tool_parameters: dict[str, Any]) -> CallToolResult:
def invoke_remote_mcp_tool(
self,
tool_parameters: dict[str, Any],
user_id: str | None = None,
app_id: str | None = None,
) -> CallToolResult:
headers = self.headers.copy() if self.headers else {}
tool_parameters = self._handle_none_parameter(tool_parameters)
@ -271,6 +282,14 @@ class MCPTool(Tool):
if tokens and tokens.access_token:
headers["Authorization"] = f"{tokens.token_type.capitalize()} {tokens.access_token}"
# User-identity forwarding: if enabled on this provider, ask the
# enterprise side to mint a fresh SSO id_token (audience-scoped to
# the MCP server's URL per RFC 8707) and stamp it as Authorization.
# This OVERRIDES any Authorization already on the request — the
# forwarded identity is what the MCP server should trust.
if self.forward_user_identity and self.identity_mode == "idp_token" and user_id:
self._inject_forwarded_identity(headers, user_id=user_id, app_id=app_id, audience=server_url)
# Step 2: Session is now closed, perform network operations without holding database connection
# MCPClientWithAuthRetry will create a new session lazily only if auth retry is needed
try:
@ -286,3 +305,31 @@ class MCPTool(Tool):
raise ToolInvokeError(f"Failed to connect to MCP server: {e}") from e
except Exception as e:
raise ToolInvokeError(f"Failed to invoke tool: {e}") from e
def _inject_forwarded_identity(
self,
headers: dict[str, str],
*,
user_id: str,
app_id: str | None,
audience: str,
) -> None:
"""Call the enterprise IssueMCPToken endpoint and stamp Authorization.
Errors are surfaced as ToolInvokeError so the workflow halts with a
clear message instead of silently dropping identity and hitting the
MCP server unauthenticated.
"""
from services.enterprise.base import MCPTokenError
from services.enterprise.enterprise_service import EnterpriseService
try:
token, _expires_at = EnterpriseService.issue_mcp_token(
user_id=user_id,
tenant_id=self.tenant_id,
app_id=app_id,
audience=audience,
)
except MCPTokenError as e:
raise ToolInvokeError(f"Failed to obtain forwarded identity token: {e}") from e
headers["Authorization"] = f"Bearer {token}"

View File

@ -0,0 +1,56 @@
"""add identity mode to mcp tool provider
Revision ID: 3df4dbcc1e21
Revises: 7885bd53f9a9
Create Date: 2026-05-29 15:00:00.000000
Adds two columns to `tool_mcp_providers` that drive the M2 MCP user-identity
forwarding feature:
* `forward_user_identity` (bool, default false) — master switch per provider.
* `identity_mode` (string, default "off") — which forwarding mechanism to use:
"off" — no header forwarded (default; pre-M2 behaviour).
"idp_token" — call dify-enterprise /inner/api/mcp/issue-token, stamp
the returned id_token on the outbound MCP request as
`Authorization: Bearer <token>`.
The columns are filled with safe defaults for existing rows so older providers
keep their current behaviour (no identity forwarding) until an admin opts in.
"""
import sqlalchemy as sa
from alembic import op
import models as models
# revision identifiers, used by Alembic.
revision = "3df4dbcc1e21"
down_revision = "7885bd53f9a9"
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
"tool_mcp_providers",
sa.Column(
"forward_user_identity",
sa.Boolean(),
nullable=False,
server_default=sa.text("false"),
),
)
op.add_column(
"tool_mcp_providers",
sa.Column(
"identity_mode",
sa.String(length=32),
nullable=False,
server_default=sa.text("'off'"),
),
)
def downgrade():
op.drop_column("tool_mcp_providers", "identity_mode")
op.drop_column("tool_mcp_providers", "forward_user_identity")

View File

@ -343,6 +343,21 @@ class MCPToolProvider(TypeBase):
# encrypted headers for MCP server requests
encrypted_headers: Mapped[str | None] = mapped_column(LongText, nullable=True, default=None)
# M2 (MCP user-identity forwarding) — master switch per provider. When True
# AND identity_mode is "idp_token", workflows that invoke tools on this
# provider will have the caller's SSO id_token stamped on the outbound
# request as `Authorization: Bearer …`. Off by default so existing
# providers retain pre-M2 behaviour.
forward_user_identity: Mapped[bool] = mapped_column(
sa.Boolean, nullable=False, server_default=sa.text("false"), default=False
)
# M2 — which identity-forwarding mechanism to use. Reserved values:
# "off" — no forwarding (default).
# "idp_token" — forward a Bearer id_token minted by dify-enterprise.
identity_mode: Mapped[str] = mapped_column(
sa.String(32), nullable=False, server_default=sa.text("'off'"), default="off"
)
def load_user(self) -> Account | None:
return db.session.scalar(select(Account).where(Account.id == self.user_id))

View File

@ -13761,10 +13761,12 @@ Enum class for large language model mode.
| ---- | ---- | ----------- | -------- |
| authentication | object | | No |
| configuration | object | | No |
| forward_user_identity | boolean | | No |
| headers | object | | No |
| icon | string | | Yes |
| icon_background | string | | No |
| icon_type | string | | Yes |
| identity_mode | string | *Enum:* `"idp_token"`, `"off"` | No |
| name | string | | Yes |
| server_identifier | string | | Yes |
| server_url | string | | Yes |
@ -13781,10 +13783,12 @@ Enum class for large language model mode.
| ---- | ---- | ----------- | -------- |
| authentication | object | | No |
| configuration | object | | No |
| forward_user_identity | boolean | | No |
| headers | object | | No |
| icon | string | | Yes |
| icon_background | string | | No |
| icon_type | string | | Yes |
| identity_mode | string | *Enum:* `"idp_token"`, `"off"` | No |
| name | string | | Yes |
| provider_id | string | | Yes |
| server_identifier | string | | Yes |

View File

@ -12,8 +12,34 @@ from services.errors.enterprise import (
EnterpriseAPIForbiddenError,
EnterpriseAPINotFoundError,
EnterpriseAPIUnauthorizedError,
EnterpriseServiceError,
)
# M2 — IssueMCPToken specific errors. Co-located here (rather than in
# services/errors/enterprise.py) because services.enterprise.base is part of
# the leaf-mounted file set the local dev override applies; the errors module
# stays at the EE image's baked-in version.
class MCPTokenError(EnterpriseServiceError):
"""Generic failure of the IssueMCPToken RPC."""
class MCPNoRefreshTokenError(MCPTokenError):
"""The user has no stored SSO refresh_token on the enterprise side.
The workflow should ask them to re-authenticate."""
def __init__(self, description: str = ""):
super().__init__(description, status_code=428)
class MCPIdentityRefreshError(MCPTokenError):
"""The enterprise side tried to refresh the user's SSO refresh_token
against the IdP and failed (revoked/expired/IdP error)."""
def __init__(self, description: str = ""):
super().__init__(description, status_code=401)
logger = logging.getLogger(__name__)

View File

@ -11,7 +11,16 @@ from pydantic import BaseModel, ConfigDict, Field, model_validator
from configs import dify_config
from extensions.ext_redis import redis_client
from services.enterprise.base import EnterpriseRequest
from services.enterprise.base import (
EnterpriseRequest,
MCPIdentityRefreshError,
MCPNoRefreshTokenError,
MCPTokenError,
)
from services.errors.enterprise import (
EnterpriseAPIError,
EnterpriseAPIUnauthorizedError,
)
if TYPE_CHECKING:
from services.feature_service import LicenseStatus
@ -121,6 +130,62 @@ class EnterpriseService:
def get_workspace_info(cls, tenant_id: str):
return EnterpriseRequest.send_request("GET", f"/workspace/{tenant_id}/info")
@classmethod
def issue_mcp_token(
cls,
user_id: str,
tenant_id: str,
app_id: str | None,
audience: str,
) -> tuple[str, int]:
"""Mint a short-lived SSO id_token (or OAuth2 access_token) representing
the calling Dify user, audience-scoped to the given MCP server identifier.
Used by MCPTool.invoke_remote_mcp_tool to stamp `Authorization: Bearer
<token>` on outbound MCP requests when the provider has
forward_user_identity=True and identity_mode="idp_token".
Returns:
(token, expires_at_unix_seconds)
Raises:
MCPNoRefreshTokenError: user has no stored SSO refresh_token on the
enterprise side; surface to the workflow as "please log in via SSO".
MCPIdentityRefreshError: enterprise tried to refresh against the IdP
and the IdP rejected (revoked/expired session).
MCPTokenError: any other failure of the enterprise endpoint.
"""
try:
response = EnterpriseRequest.send_request(
"POST",
"/mcp/issue-token",
json={
"user_id": user_id,
"tenant_id": tenant_id,
"app_id": app_id or "",
"audience": audience,
},
)
except EnterpriseAPIUnauthorizedError as e:
# Enterprise side returns 401 when the IdP rejected the refresh.
raise MCPIdentityRefreshError(str(e) or "identity refresh failed; please re-authenticate") from e
except EnterpriseAPIError as e:
# Map the 428 PreconditionRequired we emit on no-stored-refresh-token.
if getattr(e, "status_code", None) == 428:
raise MCPNoRefreshTokenError(
str(e) or "user has no stored SSO refresh token; please re-authenticate"
) from e
raise MCPTokenError(f"issue_mcp_token failed: {e}") from e
if not isinstance(response, dict):
raise MCPTokenError("invalid response shape from enterprise /mcp/issue-token")
token = response.get("token")
expires_at = response.get("expires_at")
if not token or not isinstance(token, str) or not isinstance(expires_at, int):
raise MCPTokenError(f"missing token/expires_at in enterprise response: {response}")
return token, expires_at
@classmethod
def initiate_device_flow_sso(cls, signed_state: str) -> dict:
return EnterpriseRequest.send_request(

View File

@ -4,7 +4,7 @@ import logging
from collections.abc import Mapping
from datetime import datetime
from enum import StrEnum
from typing import Any
from typing import Any, Literal
from urllib.parse import urlparse
from pydantic import BaseModel, Field
@ -136,6 +136,8 @@ class MCPToolManageService:
configuration: MCPConfiguration,
authentication: MCPAuthentication | None = None,
headers: dict[str, str] | None = None,
forward_user_identity: bool = False,
identity_mode: Literal["off", "idp_token"] = "off",
) -> ToolProviderApiEntity:
"""Create a new MCP provider."""
# Validate URL format
@ -171,6 +173,8 @@ class MCPToolManageService:
sse_read_timeout=configuration.sse_read_timeout,
encrypted_headers=encrypted_headers,
encrypted_credentials=encrypted_credentials,
forward_user_identity=forward_user_identity,
identity_mode=identity_mode,
)
self._session.add(mcp_tool)
@ -194,6 +198,8 @@ class MCPToolManageService:
configuration: MCPConfiguration,
authentication: MCPAuthentication | None = None,
validation_result: ServerUrlValidationResult | None = None,
forward_user_identity: bool | None = None,
identity_mode: Literal["off", "idp_token"] | None = None,
) -> None:
"""
Update an MCP provider.
@ -255,6 +261,14 @@ class MCPToolManageService:
if authentication and authentication.client_id:
mcp_provider.encrypted_credentials = self._process_credentials(authentication, mcp_provider, tenant_id)
# Update user-identity forwarding settings if provided.
# None means "leave unchanged" so this stays backwards-compatible
# with existing callers that don't know about M2.
if forward_user_identity is not None:
mcp_provider.forward_user_identity = forward_user_identity
if identity_mode is not None:
mcp_provider.identity_mode = identity_mode
# Flush changes to database
self._session.flush()

View File

@ -392,12 +392,14 @@ export type McpProviderCreatePayload = {
configuration?: {
[key: string]: unknown
} | null
forward_user_identity?: boolean
headers?: {
[key: string]: unknown
} | null
icon: string
icon_background?: string
icon_type: string
identity_mode?: 'idp_token' | 'off'
name: string
server_identifier: string
server_url: string
@ -410,12 +412,14 @@ export type McpProviderUpdatePayload = {
configuration?: {
[key: string]: unknown
} | null
forward_user_identity?: boolean
headers?: {
[key: string]: unknown
} | null
icon: string
icon_background?: string
icon_type: string
identity_mode?: 'idp_token' | 'off'
name: string
provider_id: string
server_identifier: string

View File

@ -361,10 +361,12 @@ export const zMcpProviderDeletePayload = z.object({
export const zMcpProviderCreatePayload = z.object({
authentication: z.record(z.string(), z.unknown()).nullish(),
configuration: z.record(z.string(), z.unknown()).nullish(),
forward_user_identity: z.boolean().optional().default(false),
headers: z.record(z.string(), z.unknown()).nullish(),
icon: z.string(),
icon_background: z.string().optional().default(''),
icon_type: z.string(),
identity_mode: z.enum(['idp_token', 'off']).optional().default('off'),
name: z.string(),
server_identifier: z.string(),
server_url: z.string(),
@ -376,10 +378,12 @@ export const zMcpProviderCreatePayload = z.object({
export const zMcpProviderUpdatePayload = z.object({
authentication: z.record(z.string(), z.unknown()).nullish(),
configuration: z.record(z.string(), z.unknown()).nullish(),
forward_user_identity: z.boolean().optional().default(false),
headers: z.record(z.string(), z.unknown()).nullish(),
icon: z.string(),
icon_background: z.string().optional().default(''),
icon_type: z.string(),
identity_mode: z.enum(['idp_token', 'off']).optional().default('off'),
name: z.string(),
provider_id: z.string(),
server_identifier: z.string(),