Files
dify/api/core/app/file_access/controller.py

120 lines
3.6 KiB
Python

from __future__ import annotations
from collections.abc import Callable
from typing import override
from sqlalchemy import and_, or_, select
from sqlalchemy.orm import Session
from sqlalchemy.sql import Select
from models import ToolFile, UploadFile
from models.enums import CreatorUserRole
from .protocols import FileAccessControllerProtocol
from .scope import FileAccessScope, get_current_file_access_scope
class DatabaseFileAccessController(FileAccessControllerProtocol):
"""Workflow-layer authorization helper for database-backed file lookups.
Tenant scoping remains mandatory. When the current execution belongs to an
end user, the lookup is additionally constrained to that end user's file
ownership markers, plus upload files explicitly granted by the current
execution context.
"""
_scope_getter: Callable[[], FileAccessScope | None]
def __init__(
self,
*,
scope_getter: Callable[[], FileAccessScope | None] = get_current_file_access_scope,
) -> None:
self._scope_getter = scope_getter
@override
def current_scope(self) -> FileAccessScope | None:
return self._scope_getter()
@override
def apply_upload_file_filters(
self,
stmt: Select[tuple[UploadFile]],
*,
scope: FileAccessScope | None = None,
) -> Select[tuple[UploadFile]]:
resolved_scope = scope or self.current_scope()
if resolved_scope is None:
return stmt
scoped_stmt = stmt.where(UploadFile.tenant_id == resolved_scope.tenant_id)
if not resolved_scope.requires_user_ownership:
return scoped_stmt
user_owned_filter = and_(
UploadFile.created_by_role == CreatorUserRole.END_USER,
UploadFile.created_by == resolved_scope.user_id,
)
if not resolved_scope.granted_upload_file_ids:
return scoped_stmt.where(user_owned_filter)
return scoped_stmt.where(
or_(
user_owned_filter,
UploadFile.id.in_(resolved_scope.granted_upload_file_ids),
)
)
@override
def apply_tool_file_filters(
self,
stmt: Select[tuple[ToolFile]],
*,
scope: FileAccessScope | None = None,
) -> Select[tuple[ToolFile]]:
resolved_scope = scope or self.current_scope()
if resolved_scope is None:
return stmt
scoped_stmt = stmt.where(ToolFile.tenant_id == resolved_scope.tenant_id)
if not resolved_scope.requires_user_ownership:
return scoped_stmt
return scoped_stmt.where(ToolFile.user_id == resolved_scope.user_id)
@override
def get_upload_file(
self,
*,
session: Session,
file_id: str,
scope: FileAccessScope | None = None,
) -> UploadFile | None:
resolved_scope = scope or self.current_scope()
if resolved_scope is None:
return session.get(UploadFile, file_id)
stmt = self.apply_upload_file_filters(
select(UploadFile).where(UploadFile.id == file_id),
scope=resolved_scope,
)
return session.scalar(stmt)
@override
def get_tool_file(
self,
*,
session: Session,
file_id: str,
scope: FileAccessScope | None = None,
) -> ToolFile | None:
resolved_scope = scope or self.current_scope()
if resolved_scope is None:
return session.get(ToolFile, file_id)
stmt = self.apply_tool_file_filters(
select(ToolFile).where(ToolFile.id == file_id),
scope=resolved_scope,
)
return session.scalar(stmt)