diff --git a/admin/client/admin_client.py b/admin/client/admin_client.py index 174cd5857..79284a53f 100644 --- a/admin/client/admin_client.py +++ b/admin/client/admin_client.py @@ -17,6 +17,7 @@ import argparse import base64 import getpass +import urllib.parse from cmd import Cmd from typing import Any, Dict, List @@ -60,6 +61,9 @@ sql_command: list_services | list_variables | list_configs | list_environments + | generate_key + | list_keys + | drop_key // meta command definition meta_command: "\\" meta_command_name [meta_args] @@ -107,6 +111,9 @@ VAR: "VAR"i VARS: "VARS"i CONFIGS: "CONFIGS"i ENVS: "ENVS"i +KEY: "KEY"i +KEYS: "KEYS"i +GENERATE: "GENERATE"i list_services: LIST SERVICES ";" show_service: SHOW SERVICE NUMBER ";" @@ -144,6 +151,10 @@ list_variables: LIST VARS ";" list_configs: LIST CONFIGS ";" list_environments: LIST ENVS ";" +generate_key: GENERATE KEY FOR USER quoted_string ";" +list_keys: LIST KEYS OF quoted_string ";" +drop_key: DROP KEY quoted_string OF quoted_string ";" + show_version: SHOW VERSION ";" action_list: identifier ("," identifier)* @@ -296,6 +307,19 @@ class AdminTransformer(Transformer): def list_environments(self, items): return {"type": "list_environments"} + def generate_key(self, items): + user_name = items[4] + return {"type": "generate_key", "user_name": user_name} + + def list_keys(self, items): + user_name = items[3] + return {"type": "list_keys", "user_name": user_name} + + def drop_key(self, items): + key = items[2] + user_name = items[4] + return {"type": "drop_key", "key": key, "user_name": user_name} + def action_list(self, items): return items @@ -362,6 +386,9 @@ SHOW USER PERMISSION SHOW VERSION GRANT ADMIN REVOKE ADMIN +GENERATE KEY FOR USER +LIST KEYS OF +DROP KEY OF Meta Commands: \\?, \\h, \\help Show this help @@ -664,6 +691,12 @@ class AdminCLI(Cmd): self._list_configs(command_dict) case "list_environments": self._list_environments(command_dict) + case "generate_key": + self._generate_key(command_dict) + case "list_keys": + self._list_keys(command_dict) + case "drop_key": + self._drop_key(command_dict) case "meta": self._handle_meta_command(command_dict) case _: @@ -796,7 +829,6 @@ class AdminCLI(Cmd): else: print(f"Unknown activate status: {activate_status}.") - def _grant_admin(self, command): user_name_tree: Tree = command["user_name"] user_name: str = user_name_tree.children[0].strip("'\"") @@ -1044,6 +1076,46 @@ class AdminCLI(Cmd): else: print(f"Fail to show version, code: {res_json['code']}, message: {res_json['message']}") + def _generate_key(self, command: dict[str, Any]) -> None: + username_tree: Tree = command["user_name"] + user_name: str = username_tree.children[0].strip("'\"") + print(f"Generating API key for user: {user_name}") + url: str = f"http://{self.host}:{self.port}/api/v1/admin/users/{user_name}/new_token" + response: requests.Response = self.session.post(url) + res_json: dict[str, Any] = response.json() + if response.status_code == 200: + self._print_table_simple(res_json["data"]) + else: + print(f"Failed to generate key for user {user_name}, code: {res_json['code']}, message: {res_json['message']}") + + def _list_keys(self, command: dict[str, Any]) -> None: + username_tree: Tree = command["user_name"] + user_name: str = username_tree.children[0].strip("'\"") + print(f"Listing API keys for user: {user_name}") + url: str = f"http://{self.host}:{self.port}/api/v1/admin/users/{user_name}/token_list" + response: requests.Response = self.session.get(url) + res_json: dict[str, Any] = response.json() + if response.status_code == 200: + self._print_table_simple(res_json["data"]) + else: + print(f"Failed to list keys for user {user_name}, code: {res_json['code']}, message: {res_json['message']}") + + def _drop_key(self, command: dict[str, Any]) -> None: + key_tree: Tree = command["key"] + key: str = key_tree.children[0].strip("'\"") + username_tree: Tree = command["user_name"] + user_name: str = username_tree.children[0].strip("'\"") + print(f"Dropping API key for user: {user_name}") + # URL encode the key to handle special characters + encoded_key: str = urllib.parse.quote(key, safe="") + url: str = f"http://{self.host}:{self.port}/api/v1/admin/users/{user_name}/token/{encoded_key}" + response: requests.Response = self.session.delete(url) + res_json: dict[str, Any] = response.json() + if response.status_code == 200: + print(res_json["message"]) + else: + print(f"Failed to drop key for user {user_name}, code: {res_json['code']}, message: {res_json['message']}") + def _handle_meta_command(self, command): meta_command = command["command"] args = command.get("args", []) @@ -1077,11 +1149,11 @@ def main(): else: if cli.verify_admin(args, single_command=False): print(r""" - ____ ___ ______________ ___ __ _ - / __ \/ | / ____/ ____/ /___ _ __ / | ____/ /___ ___ (_)___ + ____ ___ ______________ ___ __ _ + / __ \/ | / ____/ ____/ /___ _ __ / | ____/ /___ ___ (_)___ / /_/ / /| |/ / __/ /_ / / __ \ | /| / / / /| |/ __ / __ `__ \/ / __ \ / _, _/ ___ / /_/ / __/ / / /_/ / |/ |/ / / ___ / /_/ / / / / / / / / / / - /_/ |_/_/ |_\____/_/ /_/\____/|__/|__/ /_/ |_\__,_/_/ /_/ /_/_/_/ /_/ + /_/ |_/_/ |_\____/_/ /_/\____/|__/|__/ /_/ |_\__,_/_/ /_/ /_/_/_/ /_/ """) cli.cmdloop() diff --git a/admin/server/routes.py b/admin/server/routes.py index ec63dbfe1..d7d3e53f8 100644 --- a/admin/server/routes.py +++ b/admin/server/routes.py @@ -15,8 +15,11 @@ # import secrets +from typing import Any -from flask import Blueprint, request +from common.time_utils import current_timestamp, datetime_format +from datetime import datetime +from flask import Blueprint, Response, request from flask_login import current_user, login_required, logout_user from auth import login_verify, login_admin, check_admin_auth @@ -25,19 +28,20 @@ from services import UserMgr, ServiceMgr, UserServiceMgr, SettingsMgr, ConfigMgr from roles import RoleMgr from api.common.exceptions import AdminException from common.versions import get_ragflow_version +from api.utils.api_utils import generate_confirmation_token -admin_bp = Blueprint('admin', __name__, url_prefix='/api/v1/admin') +admin_bp = Blueprint("admin", __name__, url_prefix="/api/v1/admin") -@admin_bp.route('/ping', methods=['GET']) +@admin_bp.route("/ping", methods=["GET"]) def ping(): - return success_response('PONG') + return success_response("PONG") -@admin_bp.route('/login', methods=['POST']) +@admin_bp.route("/login", methods=["POST"]) def login(): if not request.json: - return error_response('Authorize admin failed.' ,400) + return error_response("Authorize admin failed.", 400) try: email = request.json.get("email", "") password = request.json.get("password", "") @@ -46,7 +50,7 @@ def login(): return error_response(str(e), 500) -@admin_bp.route('/logout', methods=['GET']) +@admin_bp.route("/logout", methods=["GET"]) @login_required def logout(): try: @@ -58,7 +62,7 @@ def logout(): return error_response(str(e), 500) -@admin_bp.route('/auth', methods=['GET']) +@admin_bp.route("/auth", methods=["GET"]) @login_verify def auth_admin(): try: @@ -67,7 +71,7 @@ def auth_admin(): return error_response(str(e), 500) -@admin_bp.route('/users', methods=['GET']) +@admin_bp.route("/users", methods=["GET"]) @login_required @check_admin_auth def list_users(): @@ -78,18 +82,18 @@ def list_users(): return error_response(str(e), 500) -@admin_bp.route('/users', methods=['POST']) +@admin_bp.route("/users", methods=["POST"]) @login_required @check_admin_auth def create_user(): try: data = request.get_json() - if not data or 'username' not in data or 'password' not in data: + if not data or "username" not in data or "password" not in data: return error_response("Username and password are required", 400) - username = data['username'] - password = data['password'] - role = data.get('role', 'user') + username = data["username"] + password = data["password"] + role = data.get("role", "user") res = UserMgr.create_user(username, password, role) if res["success"]: @@ -105,7 +109,7 @@ def create_user(): return error_response(str(e)) -@admin_bp.route('/users/', methods=['DELETE']) +@admin_bp.route("/users/", methods=["DELETE"]) @login_required @check_admin_auth def delete_user(username): @@ -122,16 +126,16 @@ def delete_user(username): return error_response(str(e), 500) -@admin_bp.route('/users//password', methods=['PUT']) +@admin_bp.route("/users//password", methods=["PUT"]) @login_required @check_admin_auth def change_password(username): try: data = request.get_json() - if not data or 'new_password' not in data: + if not data or "new_password" not in data: return error_response("New password is required", 400) - new_password = data['new_password'] + new_password = data["new_password"] msg = UserMgr.update_user_password(username, new_password) return success_response(None, msg) @@ -141,15 +145,15 @@ def change_password(username): return error_response(str(e), 500) -@admin_bp.route('/users//activate', methods=['PUT']) +@admin_bp.route("/users//activate", methods=["PUT"]) @login_required @check_admin_auth def alter_user_activate_status(username): try: data = request.get_json() - if not data or 'activate_status' not in data: + if not data or "activate_status" not in data: return error_response("Activation status is required", 400) - activate_status = data['activate_status'] + activate_status = data["activate_status"] msg = UserMgr.update_user_activate_status(username, activate_status) return success_response(None, msg) except AdminException as e: @@ -158,7 +162,7 @@ def alter_user_activate_status(username): return error_response(str(e), 500) -@admin_bp.route('/users//admin', methods=['PUT']) +@admin_bp.route("/users//admin", methods=["PUT"]) @login_required @check_admin_auth def grant_admin(username): @@ -173,7 +177,8 @@ def grant_admin(username): except Exception as e: return error_response(str(e), 500) -@admin_bp.route('/users//admin', methods=['DELETE']) + +@admin_bp.route("/users//admin", methods=["DELETE"]) @login_required @check_admin_auth def revoke_admin(username): @@ -188,7 +193,8 @@ def revoke_admin(username): except Exception as e: return error_response(str(e), 500) -@admin_bp.route('/users/', methods=['GET']) + +@admin_bp.route("/users/", methods=["GET"]) @login_required @check_admin_auth def get_user_details(username): @@ -202,7 +208,7 @@ def get_user_details(username): return error_response(str(e), 500) -@admin_bp.route('/users//datasets', methods=['GET']) +@admin_bp.route("/users//datasets", methods=["GET"]) @login_required @check_admin_auth def get_user_datasets(username): @@ -216,7 +222,7 @@ def get_user_datasets(username): return error_response(str(e), 500) -@admin_bp.route('/users//agents', methods=['GET']) +@admin_bp.route("/users//agents", methods=["GET"]) @login_required @check_admin_auth def get_user_agents(username): @@ -230,7 +236,7 @@ def get_user_agents(username): return error_response(str(e), 500) -@admin_bp.route('/services', methods=['GET']) +@admin_bp.route("/services", methods=["GET"]) @login_required @check_admin_auth def get_services(): @@ -241,7 +247,7 @@ def get_services(): return error_response(str(e), 500) -@admin_bp.route('/service_types/', methods=['GET']) +@admin_bp.route("/service_types/", methods=["GET"]) @login_required @check_admin_auth def get_services_by_type(service_type_str): @@ -252,7 +258,7 @@ def get_services_by_type(service_type_str): return error_response(str(e), 500) -@admin_bp.route('/services/', methods=['GET']) +@admin_bp.route("/services/", methods=["GET"]) @login_required @check_admin_auth def get_service(service_id): @@ -263,7 +269,7 @@ def get_service(service_id): return error_response(str(e), 500) -@admin_bp.route('/services/', methods=['DELETE']) +@admin_bp.route("/services/", methods=["DELETE"]) @login_required @check_admin_auth def shutdown_service(service_id): @@ -274,7 +280,7 @@ def shutdown_service(service_id): return error_response(str(e), 500) -@admin_bp.route('/services/', methods=['PUT']) +@admin_bp.route("/services/", methods=["PUT"]) @login_required @check_admin_auth def restart_service(service_id): @@ -285,38 +291,38 @@ def restart_service(service_id): return error_response(str(e), 500) -@admin_bp.route('/roles', methods=['POST']) +@admin_bp.route("/roles", methods=["POST"]) @login_required @check_admin_auth def create_role(): try: data = request.get_json() - if not data or 'role_name' not in data: + if not data or "role_name" not in data: return error_response("Role name is required", 400) - role_name: str = data['role_name'] - description: str = data['description'] + role_name: str = data["role_name"] + description: str = data["description"] res = RoleMgr.create_role(role_name, description) return success_response(res) except Exception as e: return error_response(str(e), 500) -@admin_bp.route('/roles/', methods=['PUT']) +@admin_bp.route("/roles/", methods=["PUT"]) @login_required @check_admin_auth def update_role(role_name: str): try: data = request.get_json() - if not data or 'description' not in data: + if not data or "description" not in data: return error_response("Role description is required", 400) - description: str = data['description'] + description: str = data["description"] res = RoleMgr.update_role_description(role_name, description) return success_response(res) except Exception as e: return error_response(str(e), 500) -@admin_bp.route('/roles/', methods=['DELETE']) +@admin_bp.route("/roles/", methods=["DELETE"]) @login_required @check_admin_auth def delete_role(role_name: str): @@ -327,7 +333,7 @@ def delete_role(role_name: str): return error_response(str(e), 500) -@admin_bp.route('/roles', methods=['GET']) +@admin_bp.route("/roles", methods=["GET"]) @login_required @check_admin_auth def list_roles(): @@ -338,7 +344,7 @@ def list_roles(): return error_response(str(e), 500) -@admin_bp.route('/roles//permission', methods=['GET']) +@admin_bp.route("/roles//permission", methods=["GET"]) @login_required @check_admin_auth def get_role_permission(role_name: str): @@ -349,54 +355,54 @@ def get_role_permission(role_name: str): return error_response(str(e), 500) -@admin_bp.route('/roles//permission', methods=['POST']) +@admin_bp.route("/roles//permission", methods=["POST"]) @login_required @check_admin_auth def grant_role_permission(role_name: str): try: data = request.get_json() - if not data or 'actions' not in data or 'resource' not in data: + if not data or "actions" not in data or "resource" not in data: return error_response("Permission is required", 400) - actions: list = data['actions'] - resource: str = data['resource'] + actions: list = data["actions"] + resource: str = data["resource"] res = RoleMgr.grant_role_permission(role_name, actions, resource) return success_response(res) except Exception as e: return error_response(str(e), 500) -@admin_bp.route('/roles//permission', methods=['DELETE']) +@admin_bp.route("/roles//permission", methods=["DELETE"]) @login_required @check_admin_auth def revoke_role_permission(role_name: str): try: data = request.get_json() - if not data or 'actions' not in data or 'resource' not in data: + if not data or "actions" not in data or "resource" not in data: return error_response("Permission is required", 400) - actions: list = data['actions'] - resource: str = data['resource'] + actions: list = data["actions"] + resource: str = data["resource"] res = RoleMgr.revoke_role_permission(role_name, actions, resource) return success_response(res) except Exception as e: return error_response(str(e), 500) -@admin_bp.route('/users//role', methods=['PUT']) +@admin_bp.route("/users//role", methods=["PUT"]) @login_required @check_admin_auth def update_user_role(user_name: str): try: data = request.get_json() - if not data or 'role_name' not in data: + if not data or "role_name" not in data: return error_response("Role name is required", 400) - role_name: str = data['role_name'] + role_name: str = data["role_name"] res = RoleMgr.update_user_role(user_name, role_name) return success_response(res) except Exception as e: return error_response(str(e), 500) -@admin_bp.route('/users//permission', methods=['GET']) +@admin_bp.route("/users//permission", methods=["GET"]) @login_required @check_admin_auth def get_user_permission(user_name: str): @@ -406,19 +412,20 @@ def get_user_permission(user_name: str): except Exception as e: return error_response(str(e), 500) -@admin_bp.route('/variables', methods=['PUT']) + +@admin_bp.route("/variables", methods=["PUT"]) @login_required @check_admin_auth def set_variable(): try: data = request.get_json() - if not data and 'var_name' not in data: + if not data and "var_name" not in data: return error_response("Var name is required", 400) - if 'var_value' not in data: + if "var_value" not in data: return error_response("Var value is required", 400) - var_name: str = data['var_name'] - var_value: str = data['var_value'] + var_name: str = data["var_name"] + var_value: str = data["var_value"] SettingsMgr.update_by_name(var_name, var_value) return success_response(None, "Set variable successfully") @@ -427,7 +434,8 @@ def set_variable(): except Exception as e: return error_response(str(e), 500) -@admin_bp.route('/variables', methods=['GET']) + +@admin_bp.route("/variables", methods=["GET"]) @login_required @check_admin_auth def get_variable(): @@ -439,9 +447,9 @@ def get_variable(): # get var data = request.get_json() - if not data and 'var_name' not in data: + if not data and "var_name" not in data: return error_response("Var name is required", 400) - var_name: str = data['var_name'] + var_name: str = data["var_name"] res = SettingsMgr.get_by_name(var_name) return success_response(res) except AdminException as e: @@ -449,7 +457,8 @@ def get_variable(): except Exception as e: return error_response(str(e), 500) -@admin_bp.route('/configs', methods=['GET']) + +@admin_bp.route("/configs", methods=["GET"]) @login_required @check_admin_auth def get_config(): @@ -461,7 +470,8 @@ def get_config(): except Exception as e: return error_response(str(e), 500) -@admin_bp.route('/environments', methods=['GET']) + +@admin_bp.route("/environments", methods=["GET"]) @login_required @check_admin_auth def get_environments(): @@ -473,7 +483,69 @@ def get_environments(): except Exception as e: return error_response(str(e), 500) -@admin_bp.route('/version', methods=['GET']) + +@admin_bp.route("/users//new_token", methods=["POST"]) +@login_required +@check_admin_auth +def generate_user_api_key(username: str) -> tuple[Response, int]: + try: + user_details: list[dict[str, Any]] = UserMgr.get_user_details(username) + if not user_details: + return error_response("User not found!", 404) + tenants: list[dict[str, Any]] = UserServiceMgr.get_user_tenants(username) + if not tenants: + return error_response("Tenant not found!", 404) + tenant_id: str = tenants[0]["tenant_id"] + token: str = generate_confirmation_token() + obj: dict[str, Any] = { + "tenant_id": tenant_id, + "token": token, + "beta": generate_confirmation_token().replace("ragflow-", "")[:32], + "create_time": current_timestamp(), + "create_date": datetime_format(datetime.now()), + "update_time": None, + "update_date": None, + } + + if not UserMgr.save_api_token(obj): + return error_response("Failed to generate API key!", 500) + return success_response(obj, "API key generated successfully") + except AdminException as e: + return error_response(e.message, e.code) + except Exception as e: + return error_response(str(e), 500) + + +@admin_bp.route("/users//token_list", methods=["GET"]) +@login_required +@check_admin_auth +def get_user_api_keys(username: str) -> tuple[Response, int]: + try: + api_keys: list[dict[str, Any]] = UserMgr.get_user_api_key(username) + return success_response(api_keys, "Get user API keys") + except AdminException as e: + return error_response(e.message, e.code) + except Exception as e: + return error_response(str(e), 500) + + +@admin_bp.route("/users//token/", methods=["DELETE"]) +@login_required +@check_admin_auth +def delete_user_api_key(username: str, token: str) -> tuple[Response, int]: + try: + deleted = UserMgr.delete_api_token(username, token) + if deleted: + return success_response(None, "API key deleted successfully") + else: + return error_response("API key not found or could not be deleted", 404) + except AdminException as e: + return error_response(e.message, e.code) + except Exception as e: + return error_response(str(e), 500) + + +@admin_bp.route("/version", methods=["GET"]) @login_required @check_admin_auth def show_version(): diff --git a/admin/server/services.py b/admin/server/services.py index a3e29a51c..8b4e23476 100644 --- a/admin/server/services.py +++ b/admin/server/services.py @@ -17,14 +17,18 @@ import os import logging import re +from typing import Any + from werkzeug.security import check_password_hash from common.constants import ActiveEnum from api.db.services import UserService from api.db.joint_services.user_account_service import create_new_user, delete_user_data from api.db.services.canvas_service import UserCanvasService -from api.db.services.user_service import TenantService +from api.db.services.user_service import TenantService, UserTenantService from api.db.services.knowledgebase_service import KnowledgebaseService from api.db.services.system_settings_service import SystemSettingsService +from api.db.services.api_service import APITokenService +from api.db.db_models import APIToken from api.utils.crypt import decrypt from api.utils import health_utils @@ -38,13 +42,15 @@ class UserMgr: users = UserService.get_all_users() result = [] for user in users: - result.append({ - 'email': user.email, - 'nickname': user.nickname, - 'create_date': user.create_date, - 'is_active': user.is_active, - 'is_superuser': user.is_superuser, - }) + result.append( + { + "email": user.email, + "nickname": user.nickname, + "create_date": user.create_date, + "is_active": user.is_active, + "is_superuser": user.is_superuser, + } + ) return result @staticmethod @@ -53,19 +59,21 @@ class UserMgr: users = UserService.query_user_by_email(username) result = [] for user in users: - result.append({ - 'avatar': user.avatar, - 'email': user.email, - 'language': user.language, - 'last_login_time': user.last_login_time, - 'is_active': user.is_active, - 'is_anonymous': user.is_anonymous, - 'login_channel': user.login_channel, - 'status': user.status, - 'is_superuser': user.is_superuser, - 'create_date': user.create_date, - 'update_date': user.update_date - }) + result.append( + { + "avatar": user.avatar, + "email": user.email, + "language": user.language, + "last_login_time": user.last_login_time, + "is_active": user.is_active, + "is_anonymous": user.is_anonymous, + "login_channel": user.login_channel, + "status": user.status, + "is_superuser": user.is_superuser, + "create_date": user.create_date, + "update_date": user.update_date, + } + ) return result @staticmethod @@ -127,8 +135,8 @@ class UserMgr: # format activate_status before handle _activate_status = activate_status.lower() target_status = { - 'on': ActiveEnum.ACTIVE.value, - 'off': ActiveEnum.INACTIVE.value, + "on": ActiveEnum.ACTIVE.value, + "off": ActiveEnum.INACTIVE.value, }.get(_activate_status) if not target_status: raise AdminException(f"Invalid activate_status: {activate_status}") @@ -138,6 +146,49 @@ class UserMgr: UserService.update_user(usr.id, {"is_active": target_status}) return f"Turn {_activate_status} user activate status successfully!" + @staticmethod + def get_user_api_key(username: str) -> list[dict[str, Any]]: + # use email to find user. check exist and unique. + user_list: list[Any] = UserService.query_user_by_email(username) + if not user_list: + raise UserNotFoundError(username) + elif len(user_list) > 1: + raise AdminException(f"More than one user with username '{username}' found!") + + usr: Any = user_list[0] + # tenant_id is typically the same as user_id for the owner tenant + tenant_id: str = usr.id + + # Query all API tokens for this tenant + api_tokens: Any = APITokenService.query(tenant_id=tenant_id) + + result: list[dict[str, Any]] = [] + for token_obj in api_tokens: + result.append(token_obj.to_dict()) + + return result + + @staticmethod + def save_api_token(api_token: dict[str, Any]) -> bool: + return APITokenService.save(**api_token) + + @staticmethod + def delete_api_token(username: str, token: str) -> bool: + # use email to find user. check exist and unique. + user_list: list[Any] = UserService.query_user_by_email(username) + if not user_list: + raise UserNotFoundError(username) + elif len(user_list) > 1: + raise AdminException(f"Exist more than 1 user: {username}!") + + usr: Any = user_list[0] + # tenant_id is typically the same as user_id for the owner tenant + tenant_id: str = usr.id + + # Delete the API token + deleted_count: int = APITokenService.filter_delete([APIToken.tenant_id == tenant_id, APIToken.token == token]) + return deleted_count > 0 + @staticmethod def grant_admin(username: str): # use email to find user. check exist and unique. @@ -146,6 +197,7 @@ class UserMgr: raise UserNotFoundError(username) elif len(user_list) > 1: raise AdminException(f"Exist more than 1 user: {username}!") + # check activate status different from new usr = user_list[0] if usr.is_superuser: @@ -172,7 +224,6 @@ class UserMgr: class UserServiceMgr: - @staticmethod def get_user_datasets(username): # use email to find user. @@ -202,39 +253,43 @@ class UserServiceMgr: tenant_ids = [m["tenant_id"] for m in tenants] # filter permitted agents and owned agents res = UserCanvasService.get_all_agents_by_tenant_ids(tenant_ids, usr.id) - return [{ - 'title': r['title'], - 'permission': r['permission'], - 'canvas_category': r['canvas_category'].split('_')[0], - 'avatar': r['avatar'] - } for r in res] + return [{"title": r["title"], "permission": r["permission"], "canvas_category": r["canvas_category"].split("_")[0], "avatar": r["avatar"]} for r in res] + + @staticmethod + def get_user_tenants(email: str) -> list[dict[str, Any]]: + users: list[Any] = UserService.query_user_by_email(email) + if not users: + raise UserNotFoundError(email) + user: Any = users[0] + + tenants: list[dict[str, Any]] = UserTenantService.get_tenants_by_user_id(user.id) + return tenants class ServiceMgr: - @staticmethod def get_all_services(): - doc_engine = os.getenv('DOC_ENGINE', 'elasticsearch') + doc_engine = os.getenv("DOC_ENGINE", "elasticsearch") result = [] configs = SERVICE_CONFIGS.configs for service_id, config in enumerate(configs): config_dict = config.to_dict() - if config_dict['service_type'] == 'retrieval': - if config_dict['extra']['retrieval_type'] != doc_engine: + if config_dict["service_type"] == "retrieval": + if config_dict["extra"]["retrieval_type"] != doc_engine: continue try: service_detail = ServiceMgr.get_service_details(service_id) if "status" in service_detail: - config_dict['status'] = service_detail['status'] + config_dict["status"] = service_detail["status"] else: - config_dict['status'] = 'timeout' + config_dict["status"] = "timeout" except Exception as e: logging.warning(f"Can't get service details, error: {e}") - config_dict['status'] = 'timeout' - if not config_dict['host']: - config_dict['host'] = '-' - if not config_dict['port']: - config_dict['port'] = '-' + config_dict["status"] = "timeout" + if not config_dict["host"]: + config_dict["host"] = "-" + if not config_dict["port"]: + config_dict["port"] = "-" result.append(config_dict) return result @@ -250,11 +305,11 @@ class ServiceMgr: raise AdminException(f"invalid service_index: {service_idx}") service_config = configs[service_idx] - service_info = {'name': service_config.name, 'detail_func_name': service_config.detail_func_name} + service_info = {"name": service_config.name, "detail_func_name": service_config.detail_func_name} - detail_func = getattr(health_utils, service_info.get('detail_func_name')) + detail_func = getattr(health_utils, service_info.get("detail_func_name")) res = detail_func() - res.update({'service_name': service_info.get('name')}) + res.update({"service_name": service_info.get("name")}) return res @staticmethod @@ -265,19 +320,21 @@ class ServiceMgr: def restart_service(service_id: int): raise AdminException("restart_service: not implemented") + class SettingsMgr: @staticmethod def get_all(): - settings = SystemSettingsService.get_all() result = [] for setting in settings: - result.append({ - 'name': setting.name, - 'source': setting.source, - 'data_type': setting.data_type, - 'value': setting.value, - }) + result.append( + { + "name": setting.name, + "source": setting.source, + "data_type": setting.data_type, + "value": setting.value, + } + ) return result @staticmethod @@ -287,12 +344,14 @@ class SettingsMgr: raise AdminException(f"Can't get setting: {name}") result = [] for setting in settings: - result.append({ - 'name': setting.name, - 'source': setting.source, - 'data_type': setting.data_type, - 'value': setting.value, - }) + result.append( + { + "name": setting.name, + "source": setting.source, + "data_type": setting.data_type, + "value": setting.value, + } + ) return result @staticmethod @@ -308,8 +367,8 @@ class SettingsMgr: else: raise AdminException(f"No setting: {name}") -class ConfigMgr: +class ConfigMgr: @staticmethod def get_all(): result = [] @@ -319,12 +378,13 @@ class ConfigMgr: result.append(config_dict) return result + class EnvironmentsMgr: @staticmethod def get_all(): result = [] - env_kv = {"env": "DOC_ENGINE", "value": os.getenv('DOC_ENGINE')} + env_kv = {"env": "DOC_ENGINE", "value": os.getenv("DOC_ENGINE")} result.append(env_kv) env_kv = {"env": "DEFAULT_SUPERUSER_EMAIL", "value": os.getenv("DEFAULT_SUPERUSER_EMAIL", "admin@ragflow.io")} diff --git a/docs/guides/admin/admin_cli.md b/docs/guides/admin/admin_cli.md index a8a7f0983..fed8a6264 100644 --- a/docs/guides/admin/admin_cli.md +++ b/docs/guides/admin/admin_cli.md @@ -93,6 +93,21 @@ Commands are case-insensitive and must be terminated with a semicolon(;). - Changes the user to active or inactive. - [Example](#example-alter-user-active) +`GENERATE KEY FOR USER ;` + +- Generates a new API key for the specified user. +- [Example](#example-generate-key) + +`LIST KEYS OF ;` + +- Lists all API keys associated with the specified user. +- [Example](#example-list-keys) + +`DROP KEY OF ;` + +- Deletes a specific API key for the specified user. +- [Example](#example-drop-key) + ### Data and Agent Commands `LIST DATASETS OF ;` @@ -345,6 +360,44 @@ Delete done! Delete user's data at the same time. + + +- Generate API key for user. + +``` +admin> generate key for user "example@ragflow.io"; +Generating API key for user: example@ragflow.io ++----------------------------------+-------------------------------+---------------+----------------------------------+-----------------------------------------------------+-------------+-------------+ +| beta | create_date | create_time | tenant_id | token | update_date | update_time | ++----------------------------------+-------------------------------+---------------+----------------------------------+-----------------------------------------------------+-------------+-------------+ +| Es9OpZ6hrnPGeYA3VU1xKUkj6NCb7cp- | Mon, 12 Jan 2026 15:19:11 GMT | 1768227551361 | 5d5ea8a3efc111f0a79b80fa5b90e659 | ragflow-piwVJHEk09M5UN3LS_Xx9HA7yehs3yNOc9GGsD4jzus | None | None | ++----------------------------------+-------------------------------+---------------+----------------------------------+-----------------------------------------------------+-------------+-------------+ +``` + + + +- List all API keys for user. + +``` +admin> list keys of "example@ragflow.io"; +Listing API keys for user: example@ragflow.io ++----------------------------------+-------------------------------+---------------+-----------+--------+----------------------------------+-----------------------------------------------------+-------------------------------+---------------+ +| beta | create_date | create_time | dialog_id | source | tenant_id | token | update_date | update_time | ++----------------------------------+-------------------------------+---------------+-----------+--------+----------------------------------+-----------------------------------------------------+-------------------------------+---------------+ +| Es9OpZ6hrnPGeYA3VU1xKUkj6NCb7cp- | Mon, 12 Jan 2026 15:19:11 GMT | 1768227551361 | None | None | 5d5ea8a3efc111f0a79b80fa5b90e659 | ragflow-piwVJHEk09M5UN3LS_Xx9HA7yehs3yNOc9GGsD4jzus | Mon, 12 Jan 2026 15:19:11 GMT | 1768227551361 | ++----------------------------------+-------------------------------+---------------+-----------+--------+----------------------------------+-----------------------------------------------------+-------------------------------+---------------+ +``` + + + +- Drop API key for user. + +``` +admin> drop key "ragflow-piwVJHEk09M5UN3LS_Xx9HA7yehs3yNOc9GGsD4jzus" of "example@ragflow.io"; +Dropping API key for user: example@ragflow.io +API key deleted successfully +``` + - List the specified user's dataset. @@ -499,19 +552,34 @@ admin> \help command: \help Commands: - LIST SERVICES - SHOW SERVICE - STARTUP SERVICE - SHUTDOWN SERVICE - RESTART SERVICE - LIST USERS - SHOW USER - DROP USER - CREATE USER - ALTER USER PASSWORD - ALTER USER ACTIVE - LIST DATASETS OF - LIST AGENTS OF +LIST SERVICES +SHOW SERVICE +STARTUP SERVICE +SHUTDOWN SERVICE +RESTART SERVICE +LIST USERS +SHOW USER +DROP USER +CREATE USER +ALTER USER PASSWORD +ALTER USER ACTIVE +LIST DATASETS OF +LIST AGENTS OF +CREATE ROLE +DROP ROLE +ALTER ROLE SET DESCRIPTION +LIST ROLES +SHOW ROLE +GRANT ON TO ROLE +REVOKE ON TO ROLE +ALTER USER SET ROLE +SHOW USER PERMISSION +SHOW VERSION +GRANT ADMIN +REVOKE ADMIN +GENERATE KEY FOR USER +LIST KEYS OF +DROP KEY OF Meta Commands: \?, \h, \help Show this help @@ -525,4 +593,3 @@ admin> \q command: \q Goodbye! ``` - diff --git a/test/testcases/test_admin_api/conftest.py b/test/testcases/test_admin_api/conftest.py new file mode 100644 index 000000000..45c9875f7 --- /dev/null +++ b/test/testcases/test_admin_api/conftest.py @@ -0,0 +1,120 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import urllib.parse +from typing import Any + +import pytest +import requests +from configs import VERSION + +# Admin API runs on port 9381 +ADMIN_HOST_ADDRESS = os.getenv("ADMIN_HOST_ADDRESS", "http://127.0.0.1:9381") + +UNAUTHORIZED_ERROR_MESSAGE = "\n\n401 unauthorized\n

unauthorized

\n

the server could not verify that you are authorized to access the url requested. you either supplied the wrong credentials (e.g. a bad password), or your browser doesn't understand how to supply the credentials required.

\n" + +# password is "admin" +ENCRYPTED_ADMIN_PASSWORD: str = """WBPsJbL/W+1HN+hchm5pgu1YC3yMEb/9MFtsanZrpKEE9kAj4u09EIIVDtIDZhJOdTjz5pp5QW9TwqXBfQ2qzDqVJiwK7HGcNsoPi4wQPCmnLo0fs62QklMlg7l1Q7fjGRgV+KWtvNUce2PFzgrcAGDqRIuA/slSclKUEISEiK4z62rdDgvHT8LyuACuF1lPUY5wV0m/MbmGijRJlgvglAF8BX0BP8rQr8wZeaJdcnAy/keuODCjltMZDL06tYluN7HoiU+qlhBB+ltqG411oO/+vVhBgWsuVVOHd8uMjJEL320GUWUicprDUZvjlLaSSqVyyOiRMHpqAE9eHEecWg==""" + + +def admin_login(session: requests.Session, email: str = "admin@ragflow.io", password: str = "admin") -> str: + """Helper function to login as admin and return authorization token""" + url: str = f"{ADMIN_HOST_ADDRESS}/api/{VERSION}/admin/login" + response: requests.Response = session.post(url, json={"email": email, "password": ENCRYPTED_ADMIN_PASSWORD}) + res_json: dict[str, Any] = response.json() + if res_json.get("code") != 0: + raise Exception(res_json.get("message")) + # Admin login uses session cookies and Authorization header + # Set Authorization header for subsequent requests + auth: str = response.headers.get("Authorization", "") + if auth: + session.headers.update({"Authorization": auth}) + return auth + + +@pytest.fixture(scope="session") +def admin_session() -> requests.Session: + """Fixture to create an admin session with login""" + session: requests.Session = requests.Session() + try: + admin_login(session) + except Exception as e: + pytest.skip(f"Admin login failed: {e}") + return session + + +def generate_user_api_key(session: requests.Session, user_name: str) -> dict[str, Any]: + """Helper function to generate API key for a user + + Returns: + Dict containing the full API response with keys: code, message, data + """ + url: str = f"{ADMIN_HOST_ADDRESS}/api/{VERSION}/admin/users/{user_name}/new_token" + response: requests.Response = session.post(url) + + # Some error responses (e.g., 401) may return HTML instead of JSON. + try: + res_json: dict[str, Any] = response.json() + except requests.exceptions.JSONDecodeError: + return { + "code": response.status_code, + "message": response.text, + "data": None, + } + return res_json + + +def get_user_api_key(session: requests.Session, username: str) -> dict[str, Any]: + """Helper function to get API keys for a user + + Returns: + Dict containing the full API response with keys: code, message, data + """ + url: str = f"{ADMIN_HOST_ADDRESS}/api/{VERSION}/admin/users/{username}/token_list" + response: requests.Response = session.get(url) + + try: + res_json: dict[str, Any] = response.json() + except requests.exceptions.JSONDecodeError: + return { + "code": response.status_code, + "message": response.text, + "data": None, + } + return res_json + + +def delete_user_api_key(session: requests.Session, username: str, token: str) -> dict[str, Any]: + """Helper function to delete an API key for a user + + Returns: + Dict containing the full API response with keys: code, message, data + """ + # URL encode the token to handle special characters + encoded_token: str = urllib.parse.quote(token, safe="") + url: str = f"{ADMIN_HOST_ADDRESS}/api/{VERSION}/admin/users/{username}/token/{encoded_token}" + response: requests.Response = session.delete(url) + + try: + res_json: dict[str, Any] = response.json() + except requests.exceptions.JSONDecodeError: + return { + "code": response.status_code, + "message": response.text, + "data": None, + } + return res_json diff --git a/test/testcases/test_admin_api/test_user_api_key_management/test_delete_user_api_key.py b/test/testcases/test_admin_api/test_user_api_key_management/test_delete_user_api_key.py new file mode 100644 index 000000000..5e89f57a5 --- /dev/null +++ b/test/testcases/test_admin_api/test_user_api_key_management/test_delete_user_api_key.py @@ -0,0 +1,191 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from typing import Any + +import pytest +import requests + +from conftest import delete_user_api_key, generate_user_api_key, get_user_api_key, UNAUTHORIZED_ERROR_MESSAGE +from common.constants import RetCode +from configs import EMAIL, HOST_ADDRESS, PASSWORD, VERSION + + +class TestDeleteUserApiKey: + @pytest.mark.p1 + def test_delete_user_api_key_success(self, admin_session: requests.Session) -> None: + """Test successfully deleting an API key for a user""" + user_name: str = EMAIL + + # Generate an API key first + generate_response: dict[str, Any] = generate_user_api_key(admin_session, user_name) + assert generate_response.get("code") == RetCode.SUCCESS, f"Generate should succeed, got code {generate_response.get('code')}" + generated_key: dict[str, Any] = generate_response["data"] + token: str = generated_key["token"] + + # Delete the API key + delete_response: dict[str, Any] = delete_user_api_key(admin_session, user_name, token) + + # Verify response + assert delete_response.get("code") == RetCode.SUCCESS, f"Delete should succeed, got code {delete_response.get('code')}" + assert "message" in delete_response, "Response should contain message" + message: str = delete_response.get("message", "") + assert message == "API key deleted successfully", f"Message should indicate success, got: {message}" + + @pytest.mark.p1 + def test_user_api_key_removed_from_list_after_deletion(self, admin_session: requests.Session) -> None: + """Test that deleted API key is removed from the list""" + user_name: str = EMAIL + + # Generate an API key + generate_response: dict[str, Any] = generate_user_api_key(admin_session, user_name) + assert generate_response.get("code") == RetCode.SUCCESS, f"Generate should succeed, got code {generate_response.get('code')}" + generated_key: dict[str, Any] = generate_response["data"] + token: str = generated_key["token"] + + # Verify the key exists in the list + get_response_before: dict[str, Any] = get_user_api_key(admin_session, user_name) + assert get_response_before.get("code") == RetCode.SUCCESS, f"Get should succeed, got code {get_response_before.get('code')}" + api_keys_before: list[dict[str, Any]] = get_response_before["data"] + token_found_before: bool = any(key.get("token") == token for key in api_keys_before) + assert token_found_before, "Generated API key should be in the list before deletion" + + # Delete the API key + delete_response: dict[str, Any] = delete_user_api_key(admin_session, user_name, token) + assert delete_response.get("code") == RetCode.SUCCESS, f"Delete should succeed, got code {delete_response.get('code')}" + + # Verify the key is no longer in the list + get_response_after: dict[str, Any] = get_user_api_key(admin_session, user_name) + assert get_response_after.get("code") == RetCode.SUCCESS, f"Get should succeed, got code {get_response_after.get('code')}" + api_keys_after: list[dict[str, Any]] = get_response_after["data"] + token_found_after: bool = any(key.get("token") == token for key in api_keys_after) + assert not token_found_after, "Deleted API key should not be in the list after deletion" + + @pytest.mark.p2 + def test_delete_user_api_key_response_structure(self, admin_session: requests.Session) -> None: + """Test that delete_user_api_key returns correct response structure""" + user_name: str = EMAIL + + # Generate an API key + generate_response: dict[str, Any] = generate_user_api_key(admin_session, user_name) + assert generate_response.get("code") == RetCode.SUCCESS, f"Generate should succeed, got code {generate_response.get('code')}" + token: str = generate_response["data"]["token"] + + # Delete the API key + delete_response: dict[str, Any] = delete_user_api_key(admin_session, user_name, token) + + # Verify response structure + assert delete_response.get("code") == RetCode.SUCCESS, f"Response code should be {RetCode.SUCCESS}, got {delete_response.get('code')}" + assert "message" in delete_response, "Response should contain message" + # Data can be None for delete operations + assert "data" in delete_response, "Response should contain data field" + + @pytest.mark.p2 + def test_delete_user_api_key_twice(self, admin_session: requests.Session) -> None: + """Test that deleting the same token twice behaves correctly""" + user_name: str = EMAIL + + # Generate an API key + generate_response: dict[str, Any] = generate_user_api_key(admin_session, user_name) + assert generate_response.get("code") == RetCode.SUCCESS, f"Generate should succeed, got code {generate_response.get('code')}" + token: str = generate_response["data"]["token"] + + # Delete the API key first time + delete_response1: dict[str, Any] = delete_user_api_key(admin_session, user_name, token) + assert delete_response1.get("code") == RetCode.SUCCESS, f"First delete should succeed, got code {delete_response1.get('code')}" + + # Try to delete the same token again + delete_response2: dict[str, Any] = delete_user_api_key(admin_session, user_name, token) + + # Second delete should fail since token no longer exists + assert delete_response2.get("code") == RetCode.NOT_FOUND, "Second delete should fail for already deleted token" + assert "message" in delete_response2, "Response should contain message" + + @pytest.mark.p2 + def test_delete_user_api_key_with_nonexistent_token(self, admin_session: requests.Session) -> None: + """Test deleting a non-existent API key fails""" + user_name: str = EMAIL + nonexistent_token: str = "ragflow-nonexistent-token-12345" + + # Try to delete a non-existent token + delete_response: dict[str, Any] = delete_user_api_key(admin_session, user_name, nonexistent_token) + + # Should return error + assert delete_response.get("code") == RetCode.NOT_FOUND, "Delete should fail for non-existent token" + assert "message" in delete_response, "Response should contain message" + message: str = delete_response.get("message", "") + assert message == "API key not found or could not be deleted", f"Message should indicate token not found, got: {message}" + + @pytest.mark.p2 + def test_delete_user_api_key_with_nonexistent_user(self, admin_session: requests.Session) -> None: + """Test deleting API key for non-existent user fails""" + nonexistent_user: str = "nonexistent_user_12345@example.com" + token: str = "ragflow-test-token-12345" + + # Try to delete token for non-existent user + delete_response: dict[str, Any] = delete_user_api_key(admin_session, nonexistent_user, token) + + # Should return error + assert delete_response.get("code") == RetCode.NOT_FOUND, "Delete should fail for non-existent user" + assert "message" in delete_response, "Response should contain message" + message: str = delete_response.get("message", "") + expected_message: str = f"User '{nonexistent_user}' not found" + assert message == expected_message, f"Message should indicate user not found, got: {message}" + + @pytest.mark.p2 + def test_delete_user_api_key_wrong_user_token(self, admin_session: requests.Session) -> None: + """Test that deleting a token belonging to another user fails""" + user_name: str = EMAIL + + # create second user + url: str = HOST_ADDRESS + f"/{VERSION}/user/register" + user2_email: str = "qa2@ragflow.io" + register_data: dict[str, str] = {"email": user2_email, "nickname": "qa2", "password": PASSWORD} + res: Any = requests.post(url=url, json=register_data) + res: dict[str, Any] = res.json() + if res.get("code") != 0 and "has already registered" not in res.get("message"): + raise Exception(f"Failed to create second user: {res.get("message")}") + + # Generate a token for the test user + generate_response: dict[str, Any] = generate_user_api_key(admin_session, user_name) + assert generate_response.get("code") == RetCode.SUCCESS, f"Generate should succeed, got code {generate_response.get('code')}" + token: str = generate_response["data"]["token"] + + # Try to delete with the second username + delete_response: dict[str, Any] = delete_user_api_key(admin_session, user2_email, token) + + # Should fail because user doesn't exist or token doesn't belong to that user + assert delete_response.get("code") == RetCode.NOT_FOUND, "Delete should fail for wrong user" + assert "message" in delete_response, "Response should contain message" + message: str = delete_response.get("message", "") + expected_message: str = "API key not found or could not be deleted" + assert message == expected_message, f"Message should indicate user not found, got: {message}" + + @pytest.mark.p3 + def test_delete_user_api_key_without_auth(self) -> None: + """Test that deleting API key without admin auth fails""" + session: requests.Session = requests.Session() + user_name: str = EMAIL + token: str = "ragflow-test-token-12345" + + response: dict[str, Any] = delete_user_api_key(session, user_name, token) + + # Verify error response + assert response.get("code") == RetCode.UNAUTHORIZED, "Response code should indicate error" + assert "message" in response, "Response should contain message" + message: str = response.get("message", "").lower() + # The message is an HTML string indicating unauthorized user. + assert message == UNAUTHORIZED_ERROR_MESSAGE diff --git a/test/testcases/test_admin_api/test_user_api_key_management/test_generate_user_api_key.py b/test/testcases/test_admin_api/test_user_api_key_management/test_generate_user_api_key.py new file mode 100644 index 000000000..3dc502967 --- /dev/null +++ b/test/testcases/test_admin_api/test_user_api_key_management/test_generate_user_api_key.py @@ -0,0 +1,232 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from typing import Any, Dict, List + +import pytest +import requests + +from common.constants import RetCode +from conftest import generate_user_api_key, get_user_api_key, UNAUTHORIZED_ERROR_MESSAGE +from configs import EMAIL + + +class TestGenerateUserApiKey: + @pytest.mark.p1 + def test_generate_user_api_key_success(self, admin_session: requests.Session) -> None: + """Test successfully generating API key for a user""" + # Use the test user email (get_user_details expects email) + user_name: str = EMAIL + + # Generate API key + response: Dict[str, Any] = generate_user_api_key(admin_session, user_name) + + # Verify response code, message, and data + assert response.get("code") == RetCode.SUCCESS, f"Response code should be {RetCode.SUCCESS}, got {response.get('code')}" + assert "message" in response, "Response should contain message" + assert "data" in response, "Response should contain data" + assert response.get("data") is not None, "API key generation should return data" + + result: Dict[str, Any] = response["data"] + + # Verify response structure + assert "tenant_id" in result, "Response should contain tenant_id" + assert "token" in result, "Response should contain token" + assert "beta" in result, "Response should contain beta" + assert "create_time" in result, "Response should contain create_time" + assert "create_date" in result, "Response should contain create_date" + + # Verify token format (should start with "ragflow-") + token: str = result["token"] + assert isinstance(token, str), "Token should be a string" + assert len(token) > 0, "Token should not be empty" + + # Verify beta is independently generated + beta: str = result["beta"] + assert isinstance(beta, str), "Beta should be a string" + assert len(beta) == 32, "Beta should be 32 characters" + # Beta should be independent from token (not derived from it) + if token.startswith("ragflow-"): + token_without_prefix: str = token.replace("ragflow-", "")[:32] + assert beta != token_without_prefix, "Beta should be independently generated, not derived from token" + + @pytest.mark.p1 + def test_generate_user_api_key_appears_in_list(self, admin_session: requests.Session) -> None: + """Test that generated API key appears in get_user_api_key list""" + user_name: str = EMAIL + + # Generate API key + generate_response: Dict[str, Any] = generate_user_api_key(admin_session, user_name) + assert generate_response.get("code") == RetCode.SUCCESS, f"Generate should succeed, got code {generate_response.get('code')}" + generated_key: Dict[str, Any] = generate_response["data"] + token: str = generated_key["token"] + + # Get all API keys for the user + get_response: Dict[str, Any] = get_user_api_key(admin_session, user_name) + assert get_response.get("code") == RetCode.SUCCESS, f"Get should succeed, got code {get_response.get('code')}" + api_keys: List[Dict[str, Any]] = get_response["data"] + + # Verify the generated key is in the list + assert len(api_keys) > 0, "User should have at least one API key" + token_found: bool = any(key.get("token") == token for key in api_keys) + assert token_found, "Generated API key should appear in the list" + + @pytest.mark.p1 + def test_generate_user_api_key_response_structure(self, admin_session: requests.Session) -> None: + """Test that generate_user_api_key returns correct response structure""" + user_name: str = EMAIL + + response: Dict[str, Any] = generate_user_api_key(admin_session, user_name) + + # Verify response code, message, and data + assert response.get("code") == RetCode.SUCCESS, f"Response code should be {RetCode.SUCCESS}, got {response.get('code')}" + assert "message" in response, "Response should contain message" + assert "data" in response, "Response should contain data" + + result: Dict[str, Any] = response["data"] + + # Verify all required fields + assert "tenant_id" in result, "Response should have tenant_id" + assert "token" in result, "Response should have token" + assert "beta" in result, "Response should have beta" + assert "create_time" in result, "Response should have create_time" + assert "create_date" in result, "Response should have create_date" + assert "update_time" in result, "Response should have update_time" + assert "update_date" in result, "Response should have update_date" + + # Verify field types + assert isinstance(result["tenant_id"], str), "tenant_id should be string" + assert isinstance(result["token"], str), "token should be string" + assert isinstance(result["beta"], str), "beta should be string" + assert isinstance(result["create_time"], (int, type(None))), "create_time should be int or None" + assert isinstance(result["create_date"], (str, type(None))), "create_date should be string or None" + + @pytest.mark.p2 + def test_generate_user_api_key_multiple_times(self, admin_session: requests.Session) -> None: + """Test generating multiple API keys for the same user""" + user_name: str = EMAIL + + # Generate first API key + response1: Dict[str, Any] = generate_user_api_key(admin_session, user_name) + assert response1.get("code") == RetCode.SUCCESS, f"First generate should succeed, got code {response1.get('code')}" + key1: Dict[str, Any] = response1["data"] + token1: str = key1["token"] + + # Generate second API key + response2: Dict[str, Any] = generate_user_api_key(admin_session, user_name) + assert response2.get("code") == RetCode.SUCCESS, f"Second generate should succeed, got code {response2.get('code')}" + key2: Dict[str, Any] = response2["data"] + token2: str = key2["token"] + + # Tokens should be different + assert token1 != token2, "Multiple API keys should have different tokens" + + # Both should appear in the list + get_response: Dict[str, Any] = get_user_api_key(admin_session, user_name) + assert get_response.get("code") == RetCode.SUCCESS, f"Get should succeed, got code {get_response.get('code')}" + api_keys: List[Dict[str, Any]] = get_response["data"] + tokens: List[str] = [key.get("token") for key in api_keys] + assert token1 in tokens, "First token should be in the list" + assert token2 in tokens, "Second token should be in the list" + + @pytest.mark.p2 + def test_generate_user_api_key_nonexistent_user(self, admin_session: requests.Session) -> None: + """Test generating API key for non-existent user fails""" + response: Dict[str, Any] = generate_user_api_key(admin_session, "nonexistent_user_12345") + + # Verify error response + assert response.get("code") == RetCode.NOT_FOUND, "Response code should indicate error" + assert "message" in response, "Response should contain message" + message: str = response.get("message", "") + assert message == "User not found!", f"Message should indicate user not found, got: {message}" + + @pytest.mark.p2 + def test_generate_user_api_key_tenant_id_consistency(self, admin_session: requests.Session) -> None: + """Test that generated API keys have consistent tenant_id""" + user_name: str = EMAIL + + # Generate multiple API keys + response1: Dict[str, Any] = generate_user_api_key(admin_session, user_name) + assert response1.get("code") == RetCode.SUCCESS, f"First generate should succeed, got code {response1.get('code')}" + key1: Dict[str, Any] = response1["data"] + + response2: Dict[str, Any] = generate_user_api_key(admin_session, user_name) + assert response2.get("code") == RetCode.SUCCESS, f"Second generate should succeed, got code {response2.get('code')}" + key2: Dict[str, Any] = response2["data"] + + # Tenant IDs should be the same for the same user + assert key1["tenant_id"] == key2["tenant_id"], "Same user should have same tenant_id" + + @pytest.mark.p2 + def test_generate_user_api_key_token_format(self, admin_session: requests.Session) -> None: + """Test that generated API key has correct format""" + user_name: str = EMAIL + + response: Dict[str, Any] = generate_user_api_key(admin_session, user_name) + assert response.get("code") == RetCode.SUCCESS, f"Response code should be {RetCode.SUCCESS}, got {response.get('code')}" + result: Dict[str, Any] = response["data"] + token: str = result["token"] + + # Token should be a non-empty string + assert isinstance(token, str), "Token should be a string" + assert len(token) > 0, "Token should not be empty" + + # Beta should be independently generated (32 chars, not derived from token) + beta: str = result["beta"] + assert isinstance(beta, str), "Beta should be a string" + assert len(beta) == 32, "Beta should be 32 characters" + # Beta should be independent from token (not derived from it) + if token.startswith("ragflow-"): + token_without_prefix: str = token.replace("ragflow-", "")[:32] + assert beta != token_without_prefix, "Beta should be independently generated, not derived from token" + + @pytest.mark.p1 + def test_generate_user_api_key_without_auth(self) -> None: + """Test that generating API key without admin auth fails""" + session: requests.Session = requests.Session() + user_name: str = EMAIL + + response: Dict[str, Any] = generate_user_api_key(session, user_name) + + # Verify error response + assert response.get("code") == RetCode.UNAUTHORIZED, "Response code should indicate error" + assert "message" in response, "Response should contain message" + message: str = response.get("message", "").lower() + # The message is an HTML string indicating unauthorized user . + assert message == UNAUTHORIZED_ERROR_MESSAGE + + @pytest.mark.p3 + def test_generate_user_api_key_timestamp_fields(self, admin_session: requests.Session) -> None: + """Test that generated API key has correct timestamp fields""" + user_name: str = EMAIL + + response: Dict[str, Any] = generate_user_api_key(admin_session, user_name) + assert response.get("code") == RetCode.SUCCESS, f"Response code should be {RetCode.SUCCESS}, got {response.get('code')}" + result: Dict[str, Any] = response["data"] + + # create_time should be a timestamp (int) + create_time: Any = result.get("create_time") + assert create_time is None or isinstance(create_time, int), "create_time should be int or None" + if create_time is not None: + assert create_time > 0, "create_time should be positive" + + # create_date should be a date string + create_date: Any = result.get("create_date") + assert create_date is None or isinstance(create_date, str), "create_date should be string or None" + + # update_time and update_date should be None for new keys + assert result.get("update_time") is None, "update_time should be None for new keys" + assert result.get("update_date") is None, "update_date should be None for new keys" diff --git a/test/testcases/test_admin_api/test_user_api_key_management/test_get_user_api_key.py b/test/testcases/test_admin_api/test_user_api_key_management/test_get_user_api_key.py new file mode 100644 index 000000000..f2941a5af --- /dev/null +++ b/test/testcases/test_admin_api/test_user_api_key_management/test_get_user_api_key.py @@ -0,0 +1,169 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from typing import Any, Dict, List + +import pytest +import requests + +from conftest import generate_user_api_key, get_user_api_key, UNAUTHORIZED_ERROR_MESSAGE +from common.constants import RetCode +from configs import EMAIL + + +class TestGetUserApiKey: + @pytest.mark.p1 + def test_get_user_api_key_success(self, admin_session: requests.Session) -> None: + """Test successfully getting API keys for a user with correct response structure""" + user_name: str = EMAIL + + # Generate a test API key first + generate_response: Dict[str, Any] = generate_user_api_key(admin_session, user_name) + assert generate_response["code"] == RetCode.SUCCESS, generate_response + generated_key: Dict[str, Any] = generate_response["data"] + generated_token: str = generated_key["token"] + + # Get all API keys for the user + get_response: Dict[str, Any] = get_user_api_key(admin_session, user_name) + assert get_response["code"] == RetCode.SUCCESS, get_response + assert "message" in get_response, "Response should contain message" + assert "data" in get_response, "Response should contain data" + + api_keys: List[Dict[str, Any]] = get_response["data"] + + # Verify response is a list with at least one key + assert isinstance(api_keys, list), "API keys should be returned as a list" + assert len(api_keys) > 0, "User should have at least one API key" + + # Verify structure of each API key + for key in api_keys: + assert isinstance(key, dict), "Each API key should be a dictionary" + assert "token" in key, "API key should contain token" + assert "beta" in key, "API key should contain beta" + assert "tenant_id" in key, "API key should contain tenant_id" + assert "create_date" in key, "API key should contain create_date" + + # Verify field types + assert isinstance(key["token"], str), "token should be string" + assert isinstance(key["beta"], str), "beta should be string" + assert isinstance(key["tenant_id"], str), "tenant_id should be string" + assert isinstance(key.get("create_date"), (str, type(None))), "create_date should be string or None" + assert isinstance(key.get("update_date"), (str, type(None))), "update_date should be string or None" + + # Verify the generated key is in the list + token_found: bool = any(key.get("token") == generated_token for key in api_keys) + assert token_found, "Generated API key should appear in the list" + + @pytest.mark.p2 + def test_get_user_api_key_nonexistent_user(self, admin_session: requests.Session) -> None: + """Test getting API keys for non-existent user fails""" + nonexistent_user: str = "nonexistent_user_12345" + response: Dict[str, Any] = get_user_api_key(admin_session, nonexistent_user) + + assert response["code"] == RetCode.NOT_FOUND, response + assert "message" in response, "Response should contain message" + message: str = response["message"] + expected_message: str = f"User '{nonexistent_user}' not found" + assert message == expected_message, f"Message should indicate user not found, got: {message}" + + @pytest.mark.p2 + def test_get_user_api_key_empty_username(self, admin_session: requests.Session) -> None: + """Test getting API keys with empty username""" + response: Dict[str, Any] = get_user_api_key(admin_session, "") + + # Empty username should either return error or empty list + if response["code"] == RetCode.SUCCESS: + assert "data" in response, "Response should contain data" + api_keys: List[Dict[str, Any]] = response["data"] + assert isinstance(api_keys, list), "Should return a list" + assert len(api_keys) == 0, "Empty username should return empty list" + else: + assert "message" in response, "Error response should contain message" + assert len(response["message"]) > 0, "Error message should not be empty" + + @pytest.mark.p2 + def test_get_user_api_key_token_uniqueness(self, admin_session: requests.Session) -> None: + """Test that all API keys in the list have unique tokens""" + user_name: str = EMAIL + + # Generate multiple API keys + response1: Dict[str, Any] = generate_user_api_key(admin_session, user_name) + assert response1["code"] == RetCode.SUCCESS, response1 + response2: Dict[str, Any] = generate_user_api_key(admin_session, user_name) + assert response2["code"] == RetCode.SUCCESS, response2 + + # Get all API keys + get_response: Dict[str, Any] = get_user_api_key(admin_session, user_name) + assert get_response["code"] == RetCode.SUCCESS, get_response + api_keys: List[Dict[str, Any]] = get_response["data"] + + # Verify all tokens are unique + tokens: List[str] = [key.get("token") for key in api_keys if key.get("token")] + assert len(tokens) == len(set(tokens)), "All API keys should have unique tokens" + + @pytest.mark.p2 + def test_get_user_api_key_tenant_id_consistency(self, admin_session: requests.Session) -> None: + """Test that all API keys for a user have the same tenant_id""" + user_name: str = EMAIL + + # Generate multiple API keys + response1: Dict[str, Any] = generate_user_api_key(admin_session, user_name) + assert response1["code"] == RetCode.SUCCESS, response1 + response2: Dict[str, Any] = generate_user_api_key(admin_session, user_name) + assert response2["code"] == RetCode.SUCCESS, response2 + + # Get all API keys + get_response: Dict[str, Any] = get_user_api_key(admin_session, user_name) + assert get_response["code"] == RetCode.SUCCESS, get_response + api_keys: List[Dict[str, Any]] = get_response["data"] + + # Verify all keys have the same tenant_id + tenant_ids: List[str] = [key.get("tenant_id") for key in api_keys if key.get("tenant_id")] + if len(tenant_ids) > 0: + assert all(tid == tenant_ids[0] for tid in tenant_ids), "All API keys should have the same tenant_id" + + @pytest.mark.p2 + def test_get_user_api_key_beta_format(self, admin_session: requests.Session) -> None: + """Test that beta field in API keys has correct format (32 characters)""" + user_name: str = EMAIL + + # Generate a test API key + generate_response: Dict[str, Any] = generate_user_api_key(admin_session, user_name) + assert generate_response["code"] == RetCode.SUCCESS, generate_response + + # Get all API keys + get_response: Dict[str, Any] = get_user_api_key(admin_session, user_name) + assert get_response["code"] == RetCode.SUCCESS, get_response + api_keys: List[Dict[str, Any]] = get_response["data"] + + # Verify beta format for all keys + for key in api_keys: + beta: str = key.get("beta", "") + assert isinstance(beta, str), "beta should be a string" + assert len(beta) == 32, f"beta should be 32 characters, got {len(beta)}" + + @pytest.mark.p3 + def test_get_user_api_key_without_auth(self) -> None: + """Test that getting API keys without admin auth fails""" + session: requests.Session = requests.Session() + user_name: str = EMAIL + + response: Dict[str, Any] = get_user_api_key(session, user_name) + + assert response["code"] == RetCode.UNAUTHORIZED, response + assert "message" in response, "Response should contain message" + message: str = response["message"].lower() + assert message == UNAUTHORIZED_ERROR_MESSAGE