Add User API Token Management to Admin API and CLI (#12595)

## Summary

This PR extends the RAGFlow Admin API and CLI with comprehensive user
API token management capabilities. Administrators can now generate,
list, and delete API tokens for users through both the REST API and the
Admin CLI interface.

## Changes

### Backend API (`admin/server/`)

#### New Endpoints
- **POST `/api/v1/admin/users/<username>/new_token`** - Generate a new
API token for a user
- **GET `/api/v1/admin/users/<username>/token_list`** - List all API
tokens for a user
- **DELETE `/api/v1/admin/users/<username>/token/<token>`** - Delete a
specific API token for a user

#### Service Layer Updates (`services.py`)
- Added `get_user_api_key(username)` - Retrieves all API tokens for a
user
- Added `save_api_token(api_token)` - Saves a new API token to the
database
- Added `delete_api_token(username, token)` - Deletes an API token for a
user

### Admin CLI (`admin/client/`)

#### New Commands
- **`GENERATE TOKEN FOR USER <username>;`** - Generate a new API token
for the specified user
- **`LIST TOKENS OF <username>;`** - List all API tokens associated with
a user
- **`DROP TOKEN <token> OF <username>;`** - Delete a specific API token
for a user

### Testing

Added comprehensive test suite in `test/testcases/test_admin_api/`:
- **`test_generate_user_api_key.py`** - Tests for API token generation
- **`test_get_user_api_key.py`** - Tests for listing user API tokens
- **`test_delete_user_api_key.py`** - Tests for deleting API tokens
- **`conftest.py`** - Shared test fixtures and utilities

## Technical Details

### Token Generation
- Tokens are generated using `generate_confirmation_token()` utility
- Each token includes metadata: `tenant_id`, `token`, `beta`,
`create_time`, `create_date`
- Tokens are associated with user tenants automatically

### Security Considerations
- All endpoints require admin authentication (`@check_admin_auth`)
- Tokens are URL-encoded when passed in DELETE requests to handle
special characters
- Proper error handling for unauthorized access and missing resources

### API Response Format
All endpoints follow the standard RAGFlow response format:
```json
{
  "code": 0,
  "data": {...},
  "message": "Success message"
}
```

## Files Changed

- `admin/client/admin_client.py` - CLI token management commands
- `admin/server/routes.py` - New API endpoints
- `admin/server/services.py` - Token management service methods
- `docs/guides/admin/admin_cli.md` - CLI documentation updates
- `test/testcases/test_admin_api/conftest.py` - Test fixtures
- `test/testcases/test_admin_api/test_user_api_key_management/*` - Test
suites

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

---------

Co-authored-by: Alexander Strasser <alexander.strasser@ondewo.com>
Co-authored-by: Hetavi Shah <your.email@example.com>
This commit is contained in:
Hetavi Shah
2026-01-17 12:51:00 +05:30
committed by GitHub
parent bd9163904a
commit 46305ef35e
8 changed files with 1124 additions and 141 deletions

View File

@ -0,0 +1,120 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import urllib.parse
from typing import Any
import pytest
import requests
from configs import VERSION
# Admin API runs on port 9381
ADMIN_HOST_ADDRESS = os.getenv("ADMIN_HOST_ADDRESS", "http://127.0.0.1:9381")
UNAUTHORIZED_ERROR_MESSAGE = "<!doctype html>\n<html lang=en>\n<title>401 unauthorized</title>\n<h1>unauthorized</h1>\n<p>the server could not verify that you are authorized to access the url requested. you either supplied the wrong credentials (e.g. a bad password), or your browser doesn&#39;t understand how to supply the credentials required.</p>\n"
# password is "admin"
ENCRYPTED_ADMIN_PASSWORD: str = """WBPsJbL/W+1HN+hchm5pgu1YC3yMEb/9MFtsanZrpKEE9kAj4u09EIIVDtIDZhJOdTjz5pp5QW9TwqXBfQ2qzDqVJiwK7HGcNsoPi4wQPCmnLo0fs62QklMlg7l1Q7fjGRgV+KWtvNUce2PFzgrcAGDqRIuA/slSclKUEISEiK4z62rdDgvHT8LyuACuF1lPUY5wV0m/MbmGijRJlgvglAF8BX0BP8rQr8wZeaJdcnAy/keuODCjltMZDL06tYluN7HoiU+qlhBB+ltqG411oO/+vVhBgWsuVVOHd8uMjJEL320GUWUicprDUZvjlLaSSqVyyOiRMHpqAE9eHEecWg=="""
def admin_login(session: requests.Session, email: str = "admin@ragflow.io", password: str = "admin") -> str:
"""Helper function to login as admin and return authorization token"""
url: str = f"{ADMIN_HOST_ADDRESS}/api/{VERSION}/admin/login"
response: requests.Response = session.post(url, json={"email": email, "password": ENCRYPTED_ADMIN_PASSWORD})
res_json: dict[str, Any] = response.json()
if res_json.get("code") != 0:
raise Exception(res_json.get("message"))
# Admin login uses session cookies and Authorization header
# Set Authorization header for subsequent requests
auth: str = response.headers.get("Authorization", "")
if auth:
session.headers.update({"Authorization": auth})
return auth
@pytest.fixture(scope="session")
def admin_session() -> requests.Session:
"""Fixture to create an admin session with login"""
session: requests.Session = requests.Session()
try:
admin_login(session)
except Exception as e:
pytest.skip(f"Admin login failed: {e}")
return session
def generate_user_api_key(session: requests.Session, user_name: str) -> dict[str, Any]:
"""Helper function to generate API key for a user
Returns:
Dict containing the full API response with keys: code, message, data
"""
url: str = f"{ADMIN_HOST_ADDRESS}/api/{VERSION}/admin/users/{user_name}/new_token"
response: requests.Response = session.post(url)
# Some error responses (e.g., 401) may return HTML instead of JSON.
try:
res_json: dict[str, Any] = response.json()
except requests.exceptions.JSONDecodeError:
return {
"code": response.status_code,
"message": response.text,
"data": None,
}
return res_json
def get_user_api_key(session: requests.Session, username: str) -> dict[str, Any]:
"""Helper function to get API keys for a user
Returns:
Dict containing the full API response with keys: code, message, data
"""
url: str = f"{ADMIN_HOST_ADDRESS}/api/{VERSION}/admin/users/{username}/token_list"
response: requests.Response = session.get(url)
try:
res_json: dict[str, Any] = response.json()
except requests.exceptions.JSONDecodeError:
return {
"code": response.status_code,
"message": response.text,
"data": None,
}
return res_json
def delete_user_api_key(session: requests.Session, username: str, token: str) -> dict[str, Any]:
"""Helper function to delete an API key for a user
Returns:
Dict containing the full API response with keys: code, message, data
"""
# URL encode the token to handle special characters
encoded_token: str = urllib.parse.quote(token, safe="")
url: str = f"{ADMIN_HOST_ADDRESS}/api/{VERSION}/admin/users/{username}/token/{encoded_token}"
response: requests.Response = session.delete(url)
try:
res_json: dict[str, Any] = response.json()
except requests.exceptions.JSONDecodeError:
return {
"code": response.status_code,
"message": response.text,
"data": None,
}
return res_json

View File

@ -0,0 +1,191 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Any
import pytest
import requests
from conftest import delete_user_api_key, generate_user_api_key, get_user_api_key, UNAUTHORIZED_ERROR_MESSAGE
from common.constants import RetCode
from configs import EMAIL, HOST_ADDRESS, PASSWORD, VERSION
class TestDeleteUserApiKey:
@pytest.mark.p1
def test_delete_user_api_key_success(self, admin_session: requests.Session) -> None:
"""Test successfully deleting an API key for a user"""
user_name: str = EMAIL
# Generate an API key first
generate_response: dict[str, Any] = generate_user_api_key(admin_session, user_name)
assert generate_response.get("code") == RetCode.SUCCESS, f"Generate should succeed, got code {generate_response.get('code')}"
generated_key: dict[str, Any] = generate_response["data"]
token: str = generated_key["token"]
# Delete the API key
delete_response: dict[str, Any] = delete_user_api_key(admin_session, user_name, token)
# Verify response
assert delete_response.get("code") == RetCode.SUCCESS, f"Delete should succeed, got code {delete_response.get('code')}"
assert "message" in delete_response, "Response should contain message"
message: str = delete_response.get("message", "")
assert message == "API key deleted successfully", f"Message should indicate success, got: {message}"
@pytest.mark.p1
def test_user_api_key_removed_from_list_after_deletion(self, admin_session: requests.Session) -> None:
"""Test that deleted API key is removed from the list"""
user_name: str = EMAIL
# Generate an API key
generate_response: dict[str, Any] = generate_user_api_key(admin_session, user_name)
assert generate_response.get("code") == RetCode.SUCCESS, f"Generate should succeed, got code {generate_response.get('code')}"
generated_key: dict[str, Any] = generate_response["data"]
token: str = generated_key["token"]
# Verify the key exists in the list
get_response_before: dict[str, Any] = get_user_api_key(admin_session, user_name)
assert get_response_before.get("code") == RetCode.SUCCESS, f"Get should succeed, got code {get_response_before.get('code')}"
api_keys_before: list[dict[str, Any]] = get_response_before["data"]
token_found_before: bool = any(key.get("token") == token for key in api_keys_before)
assert token_found_before, "Generated API key should be in the list before deletion"
# Delete the API key
delete_response: dict[str, Any] = delete_user_api_key(admin_session, user_name, token)
assert delete_response.get("code") == RetCode.SUCCESS, f"Delete should succeed, got code {delete_response.get('code')}"
# Verify the key is no longer in the list
get_response_after: dict[str, Any] = get_user_api_key(admin_session, user_name)
assert get_response_after.get("code") == RetCode.SUCCESS, f"Get should succeed, got code {get_response_after.get('code')}"
api_keys_after: list[dict[str, Any]] = get_response_after["data"]
token_found_after: bool = any(key.get("token") == token for key in api_keys_after)
assert not token_found_after, "Deleted API key should not be in the list after deletion"
@pytest.mark.p2
def test_delete_user_api_key_response_structure(self, admin_session: requests.Session) -> None:
"""Test that delete_user_api_key returns correct response structure"""
user_name: str = EMAIL
# Generate an API key
generate_response: dict[str, Any] = generate_user_api_key(admin_session, user_name)
assert generate_response.get("code") == RetCode.SUCCESS, f"Generate should succeed, got code {generate_response.get('code')}"
token: str = generate_response["data"]["token"]
# Delete the API key
delete_response: dict[str, Any] = delete_user_api_key(admin_session, user_name, token)
# Verify response structure
assert delete_response.get("code") == RetCode.SUCCESS, f"Response code should be {RetCode.SUCCESS}, got {delete_response.get('code')}"
assert "message" in delete_response, "Response should contain message"
# Data can be None for delete operations
assert "data" in delete_response, "Response should contain data field"
@pytest.mark.p2
def test_delete_user_api_key_twice(self, admin_session: requests.Session) -> None:
"""Test that deleting the same token twice behaves correctly"""
user_name: str = EMAIL
# Generate an API key
generate_response: dict[str, Any] = generate_user_api_key(admin_session, user_name)
assert generate_response.get("code") == RetCode.SUCCESS, f"Generate should succeed, got code {generate_response.get('code')}"
token: str = generate_response["data"]["token"]
# Delete the API key first time
delete_response1: dict[str, Any] = delete_user_api_key(admin_session, user_name, token)
assert delete_response1.get("code") == RetCode.SUCCESS, f"First delete should succeed, got code {delete_response1.get('code')}"
# Try to delete the same token again
delete_response2: dict[str, Any] = delete_user_api_key(admin_session, user_name, token)
# Second delete should fail since token no longer exists
assert delete_response2.get("code") == RetCode.NOT_FOUND, "Second delete should fail for already deleted token"
assert "message" in delete_response2, "Response should contain message"
@pytest.mark.p2
def test_delete_user_api_key_with_nonexistent_token(self, admin_session: requests.Session) -> None:
"""Test deleting a non-existent API key fails"""
user_name: str = EMAIL
nonexistent_token: str = "ragflow-nonexistent-token-12345"
# Try to delete a non-existent token
delete_response: dict[str, Any] = delete_user_api_key(admin_session, user_name, nonexistent_token)
# Should return error
assert delete_response.get("code") == RetCode.NOT_FOUND, "Delete should fail for non-existent token"
assert "message" in delete_response, "Response should contain message"
message: str = delete_response.get("message", "")
assert message == "API key not found or could not be deleted", f"Message should indicate token not found, got: {message}"
@pytest.mark.p2
def test_delete_user_api_key_with_nonexistent_user(self, admin_session: requests.Session) -> None:
"""Test deleting API key for non-existent user fails"""
nonexistent_user: str = "nonexistent_user_12345@example.com"
token: str = "ragflow-test-token-12345"
# Try to delete token for non-existent user
delete_response: dict[str, Any] = delete_user_api_key(admin_session, nonexistent_user, token)
# Should return error
assert delete_response.get("code") == RetCode.NOT_FOUND, "Delete should fail for non-existent user"
assert "message" in delete_response, "Response should contain message"
message: str = delete_response.get("message", "")
expected_message: str = f"User '{nonexistent_user}' not found"
assert message == expected_message, f"Message should indicate user not found, got: {message}"
@pytest.mark.p2
def test_delete_user_api_key_wrong_user_token(self, admin_session: requests.Session) -> None:
"""Test that deleting a token belonging to another user fails"""
user_name: str = EMAIL
# create second user
url: str = HOST_ADDRESS + f"/{VERSION}/user/register"
user2_email: str = "qa2@ragflow.io"
register_data: dict[str, str] = {"email": user2_email, "nickname": "qa2", "password": PASSWORD}
res: Any = requests.post(url=url, json=register_data)
res: dict[str, Any] = res.json()
if res.get("code") != 0 and "has already registered" not in res.get("message"):
raise Exception(f"Failed to create second user: {res.get("message")}")
# Generate a token for the test user
generate_response: dict[str, Any] = generate_user_api_key(admin_session, user_name)
assert generate_response.get("code") == RetCode.SUCCESS, f"Generate should succeed, got code {generate_response.get('code')}"
token: str = generate_response["data"]["token"]
# Try to delete with the second username
delete_response: dict[str, Any] = delete_user_api_key(admin_session, user2_email, token)
# Should fail because user doesn't exist or token doesn't belong to that user
assert delete_response.get("code") == RetCode.NOT_FOUND, "Delete should fail for wrong user"
assert "message" in delete_response, "Response should contain message"
message: str = delete_response.get("message", "")
expected_message: str = "API key not found or could not be deleted"
assert message == expected_message, f"Message should indicate user not found, got: {message}"
@pytest.mark.p3
def test_delete_user_api_key_without_auth(self) -> None:
"""Test that deleting API key without admin auth fails"""
session: requests.Session = requests.Session()
user_name: str = EMAIL
token: str = "ragflow-test-token-12345"
response: dict[str, Any] = delete_user_api_key(session, user_name, token)
# Verify error response
assert response.get("code") == RetCode.UNAUTHORIZED, "Response code should indicate error"
assert "message" in response, "Response should contain message"
message: str = response.get("message", "").lower()
# The message is an HTML string indicating unauthorized user.
assert message == UNAUTHORIZED_ERROR_MESSAGE

View File

@ -0,0 +1,232 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Any, Dict, List
import pytest
import requests
from common.constants import RetCode
from conftest import generate_user_api_key, get_user_api_key, UNAUTHORIZED_ERROR_MESSAGE
from configs import EMAIL
class TestGenerateUserApiKey:
@pytest.mark.p1
def test_generate_user_api_key_success(self, admin_session: requests.Session) -> None:
"""Test successfully generating API key for a user"""
# Use the test user email (get_user_details expects email)
user_name: str = EMAIL
# Generate API key
response: Dict[str, Any] = generate_user_api_key(admin_session, user_name)
# Verify response code, message, and data
assert response.get("code") == RetCode.SUCCESS, f"Response code should be {RetCode.SUCCESS}, got {response.get('code')}"
assert "message" in response, "Response should contain message"
assert "data" in response, "Response should contain data"
assert response.get("data") is not None, "API key generation should return data"
result: Dict[str, Any] = response["data"]
# Verify response structure
assert "tenant_id" in result, "Response should contain tenant_id"
assert "token" in result, "Response should contain token"
assert "beta" in result, "Response should contain beta"
assert "create_time" in result, "Response should contain create_time"
assert "create_date" in result, "Response should contain create_date"
# Verify token format (should start with "ragflow-")
token: str = result["token"]
assert isinstance(token, str), "Token should be a string"
assert len(token) > 0, "Token should not be empty"
# Verify beta is independently generated
beta: str = result["beta"]
assert isinstance(beta, str), "Beta should be a string"
assert len(beta) == 32, "Beta should be 32 characters"
# Beta should be independent from token (not derived from it)
if token.startswith("ragflow-"):
token_without_prefix: str = token.replace("ragflow-", "")[:32]
assert beta != token_without_prefix, "Beta should be independently generated, not derived from token"
@pytest.mark.p1
def test_generate_user_api_key_appears_in_list(self, admin_session: requests.Session) -> None:
"""Test that generated API key appears in get_user_api_key list"""
user_name: str = EMAIL
# Generate API key
generate_response: Dict[str, Any] = generate_user_api_key(admin_session, user_name)
assert generate_response.get("code") == RetCode.SUCCESS, f"Generate should succeed, got code {generate_response.get('code')}"
generated_key: Dict[str, Any] = generate_response["data"]
token: str = generated_key["token"]
# Get all API keys for the user
get_response: Dict[str, Any] = get_user_api_key(admin_session, user_name)
assert get_response.get("code") == RetCode.SUCCESS, f"Get should succeed, got code {get_response.get('code')}"
api_keys: List[Dict[str, Any]] = get_response["data"]
# Verify the generated key is in the list
assert len(api_keys) > 0, "User should have at least one API key"
token_found: bool = any(key.get("token") == token for key in api_keys)
assert token_found, "Generated API key should appear in the list"
@pytest.mark.p1
def test_generate_user_api_key_response_structure(self, admin_session: requests.Session) -> None:
"""Test that generate_user_api_key returns correct response structure"""
user_name: str = EMAIL
response: Dict[str, Any] = generate_user_api_key(admin_session, user_name)
# Verify response code, message, and data
assert response.get("code") == RetCode.SUCCESS, f"Response code should be {RetCode.SUCCESS}, got {response.get('code')}"
assert "message" in response, "Response should contain message"
assert "data" in response, "Response should contain data"
result: Dict[str, Any] = response["data"]
# Verify all required fields
assert "tenant_id" in result, "Response should have tenant_id"
assert "token" in result, "Response should have token"
assert "beta" in result, "Response should have beta"
assert "create_time" in result, "Response should have create_time"
assert "create_date" in result, "Response should have create_date"
assert "update_time" in result, "Response should have update_time"
assert "update_date" in result, "Response should have update_date"
# Verify field types
assert isinstance(result["tenant_id"], str), "tenant_id should be string"
assert isinstance(result["token"], str), "token should be string"
assert isinstance(result["beta"], str), "beta should be string"
assert isinstance(result["create_time"], (int, type(None))), "create_time should be int or None"
assert isinstance(result["create_date"], (str, type(None))), "create_date should be string or None"
@pytest.mark.p2
def test_generate_user_api_key_multiple_times(self, admin_session: requests.Session) -> None:
"""Test generating multiple API keys for the same user"""
user_name: str = EMAIL
# Generate first API key
response1: Dict[str, Any] = generate_user_api_key(admin_session, user_name)
assert response1.get("code") == RetCode.SUCCESS, f"First generate should succeed, got code {response1.get('code')}"
key1: Dict[str, Any] = response1["data"]
token1: str = key1["token"]
# Generate second API key
response2: Dict[str, Any] = generate_user_api_key(admin_session, user_name)
assert response2.get("code") == RetCode.SUCCESS, f"Second generate should succeed, got code {response2.get('code')}"
key2: Dict[str, Any] = response2["data"]
token2: str = key2["token"]
# Tokens should be different
assert token1 != token2, "Multiple API keys should have different tokens"
# Both should appear in the list
get_response: Dict[str, Any] = get_user_api_key(admin_session, user_name)
assert get_response.get("code") == RetCode.SUCCESS, f"Get should succeed, got code {get_response.get('code')}"
api_keys: List[Dict[str, Any]] = get_response["data"]
tokens: List[str] = [key.get("token") for key in api_keys]
assert token1 in tokens, "First token should be in the list"
assert token2 in tokens, "Second token should be in the list"
@pytest.mark.p2
def test_generate_user_api_key_nonexistent_user(self, admin_session: requests.Session) -> None:
"""Test generating API key for non-existent user fails"""
response: Dict[str, Any] = generate_user_api_key(admin_session, "nonexistent_user_12345")
# Verify error response
assert response.get("code") == RetCode.NOT_FOUND, "Response code should indicate error"
assert "message" in response, "Response should contain message"
message: str = response.get("message", "")
assert message == "User not found!", f"Message should indicate user not found, got: {message}"
@pytest.mark.p2
def test_generate_user_api_key_tenant_id_consistency(self, admin_session: requests.Session) -> None:
"""Test that generated API keys have consistent tenant_id"""
user_name: str = EMAIL
# Generate multiple API keys
response1: Dict[str, Any] = generate_user_api_key(admin_session, user_name)
assert response1.get("code") == RetCode.SUCCESS, f"First generate should succeed, got code {response1.get('code')}"
key1: Dict[str, Any] = response1["data"]
response2: Dict[str, Any] = generate_user_api_key(admin_session, user_name)
assert response2.get("code") == RetCode.SUCCESS, f"Second generate should succeed, got code {response2.get('code')}"
key2: Dict[str, Any] = response2["data"]
# Tenant IDs should be the same for the same user
assert key1["tenant_id"] == key2["tenant_id"], "Same user should have same tenant_id"
@pytest.mark.p2
def test_generate_user_api_key_token_format(self, admin_session: requests.Session) -> None:
"""Test that generated API key has correct format"""
user_name: str = EMAIL
response: Dict[str, Any] = generate_user_api_key(admin_session, user_name)
assert response.get("code") == RetCode.SUCCESS, f"Response code should be {RetCode.SUCCESS}, got {response.get('code')}"
result: Dict[str, Any] = response["data"]
token: str = result["token"]
# Token should be a non-empty string
assert isinstance(token, str), "Token should be a string"
assert len(token) > 0, "Token should not be empty"
# Beta should be independently generated (32 chars, not derived from token)
beta: str = result["beta"]
assert isinstance(beta, str), "Beta should be a string"
assert len(beta) == 32, "Beta should be 32 characters"
# Beta should be independent from token (not derived from it)
if token.startswith("ragflow-"):
token_without_prefix: str = token.replace("ragflow-", "")[:32]
assert beta != token_without_prefix, "Beta should be independently generated, not derived from token"
@pytest.mark.p1
def test_generate_user_api_key_without_auth(self) -> None:
"""Test that generating API key without admin auth fails"""
session: requests.Session = requests.Session()
user_name: str = EMAIL
response: Dict[str, Any] = generate_user_api_key(session, user_name)
# Verify error response
assert response.get("code") == RetCode.UNAUTHORIZED, "Response code should indicate error"
assert "message" in response, "Response should contain message"
message: str = response.get("message", "").lower()
# The message is an HTML string indicating unauthorized user .
assert message == UNAUTHORIZED_ERROR_MESSAGE
@pytest.mark.p3
def test_generate_user_api_key_timestamp_fields(self, admin_session: requests.Session) -> None:
"""Test that generated API key has correct timestamp fields"""
user_name: str = EMAIL
response: Dict[str, Any] = generate_user_api_key(admin_session, user_name)
assert response.get("code") == RetCode.SUCCESS, f"Response code should be {RetCode.SUCCESS}, got {response.get('code')}"
result: Dict[str, Any] = response["data"]
# create_time should be a timestamp (int)
create_time: Any = result.get("create_time")
assert create_time is None or isinstance(create_time, int), "create_time should be int or None"
if create_time is not None:
assert create_time > 0, "create_time should be positive"
# create_date should be a date string
create_date: Any = result.get("create_date")
assert create_date is None or isinstance(create_date, str), "create_date should be string or None"
# update_time and update_date should be None for new keys
assert result.get("update_time") is None, "update_time should be None for new keys"
assert result.get("update_date") is None, "update_date should be None for new keys"

View File

@ -0,0 +1,169 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Any, Dict, List
import pytest
import requests
from conftest import generate_user_api_key, get_user_api_key, UNAUTHORIZED_ERROR_MESSAGE
from common.constants import RetCode
from configs import EMAIL
class TestGetUserApiKey:
@pytest.mark.p1
def test_get_user_api_key_success(self, admin_session: requests.Session) -> None:
"""Test successfully getting API keys for a user with correct response structure"""
user_name: str = EMAIL
# Generate a test API key first
generate_response: Dict[str, Any] = generate_user_api_key(admin_session, user_name)
assert generate_response["code"] == RetCode.SUCCESS, generate_response
generated_key: Dict[str, Any] = generate_response["data"]
generated_token: str = generated_key["token"]
# Get all API keys for the user
get_response: Dict[str, Any] = get_user_api_key(admin_session, user_name)
assert get_response["code"] == RetCode.SUCCESS, get_response
assert "message" in get_response, "Response should contain message"
assert "data" in get_response, "Response should contain data"
api_keys: List[Dict[str, Any]] = get_response["data"]
# Verify response is a list with at least one key
assert isinstance(api_keys, list), "API keys should be returned as a list"
assert len(api_keys) > 0, "User should have at least one API key"
# Verify structure of each API key
for key in api_keys:
assert isinstance(key, dict), "Each API key should be a dictionary"
assert "token" in key, "API key should contain token"
assert "beta" in key, "API key should contain beta"
assert "tenant_id" in key, "API key should contain tenant_id"
assert "create_date" in key, "API key should contain create_date"
# Verify field types
assert isinstance(key["token"], str), "token should be string"
assert isinstance(key["beta"], str), "beta should be string"
assert isinstance(key["tenant_id"], str), "tenant_id should be string"
assert isinstance(key.get("create_date"), (str, type(None))), "create_date should be string or None"
assert isinstance(key.get("update_date"), (str, type(None))), "update_date should be string or None"
# Verify the generated key is in the list
token_found: bool = any(key.get("token") == generated_token for key in api_keys)
assert token_found, "Generated API key should appear in the list"
@pytest.mark.p2
def test_get_user_api_key_nonexistent_user(self, admin_session: requests.Session) -> None:
"""Test getting API keys for non-existent user fails"""
nonexistent_user: str = "nonexistent_user_12345"
response: Dict[str, Any] = get_user_api_key(admin_session, nonexistent_user)
assert response["code"] == RetCode.NOT_FOUND, response
assert "message" in response, "Response should contain message"
message: str = response["message"]
expected_message: str = f"User '{nonexistent_user}' not found"
assert message == expected_message, f"Message should indicate user not found, got: {message}"
@pytest.mark.p2
def test_get_user_api_key_empty_username(self, admin_session: requests.Session) -> None:
"""Test getting API keys with empty username"""
response: Dict[str, Any] = get_user_api_key(admin_session, "")
# Empty username should either return error or empty list
if response["code"] == RetCode.SUCCESS:
assert "data" in response, "Response should contain data"
api_keys: List[Dict[str, Any]] = response["data"]
assert isinstance(api_keys, list), "Should return a list"
assert len(api_keys) == 0, "Empty username should return empty list"
else:
assert "message" in response, "Error response should contain message"
assert len(response["message"]) > 0, "Error message should not be empty"
@pytest.mark.p2
def test_get_user_api_key_token_uniqueness(self, admin_session: requests.Session) -> None:
"""Test that all API keys in the list have unique tokens"""
user_name: str = EMAIL
# Generate multiple API keys
response1: Dict[str, Any] = generate_user_api_key(admin_session, user_name)
assert response1["code"] == RetCode.SUCCESS, response1
response2: Dict[str, Any] = generate_user_api_key(admin_session, user_name)
assert response2["code"] == RetCode.SUCCESS, response2
# Get all API keys
get_response: Dict[str, Any] = get_user_api_key(admin_session, user_name)
assert get_response["code"] == RetCode.SUCCESS, get_response
api_keys: List[Dict[str, Any]] = get_response["data"]
# Verify all tokens are unique
tokens: List[str] = [key.get("token") for key in api_keys if key.get("token")]
assert len(tokens) == len(set(tokens)), "All API keys should have unique tokens"
@pytest.mark.p2
def test_get_user_api_key_tenant_id_consistency(self, admin_session: requests.Session) -> None:
"""Test that all API keys for a user have the same tenant_id"""
user_name: str = EMAIL
# Generate multiple API keys
response1: Dict[str, Any] = generate_user_api_key(admin_session, user_name)
assert response1["code"] == RetCode.SUCCESS, response1
response2: Dict[str, Any] = generate_user_api_key(admin_session, user_name)
assert response2["code"] == RetCode.SUCCESS, response2
# Get all API keys
get_response: Dict[str, Any] = get_user_api_key(admin_session, user_name)
assert get_response["code"] == RetCode.SUCCESS, get_response
api_keys: List[Dict[str, Any]] = get_response["data"]
# Verify all keys have the same tenant_id
tenant_ids: List[str] = [key.get("tenant_id") for key in api_keys if key.get("tenant_id")]
if len(tenant_ids) > 0:
assert all(tid == tenant_ids[0] for tid in tenant_ids), "All API keys should have the same tenant_id"
@pytest.mark.p2
def test_get_user_api_key_beta_format(self, admin_session: requests.Session) -> None:
"""Test that beta field in API keys has correct format (32 characters)"""
user_name: str = EMAIL
# Generate a test API key
generate_response: Dict[str, Any] = generate_user_api_key(admin_session, user_name)
assert generate_response["code"] == RetCode.SUCCESS, generate_response
# Get all API keys
get_response: Dict[str, Any] = get_user_api_key(admin_session, user_name)
assert get_response["code"] == RetCode.SUCCESS, get_response
api_keys: List[Dict[str, Any]] = get_response["data"]
# Verify beta format for all keys
for key in api_keys:
beta: str = key.get("beta", "")
assert isinstance(beta, str), "beta should be a string"
assert len(beta) == 32, f"beta should be 32 characters, got {len(beta)}"
@pytest.mark.p3
def test_get_user_api_key_without_auth(self) -> None:
"""Test that getting API keys without admin auth fails"""
session: requests.Session = requests.Session()
user_name: str = EMAIL
response: Dict[str, Any] = get_user_api_key(session, user_name)
assert response["code"] == RetCode.UNAUTHORIZED, response
assert "message" in response, "Response should contain message"
message: str = response["message"].lower()
assert message == UNAUTHORIZED_ERROR_MESSAGE