Refact: system apis (#14298)

### What problem does this PR solve?
Refact: system apis

### Type of change

- [x] Refactoring
This commit is contained in:
Magicbook1108
2026-04-23 14:09:42 +08:00
committed by GitHub
parent 57f527eb02
commit 76b017ca32
6 changed files with 185 additions and 207 deletions

View File

@ -14,18 +14,25 @@
# limitations under the License.
#
import json
import logging
from datetime import datetime
from timeit import default_timer as timer
from quart import jsonify
from api.apps import login_required, current_user
from api.utils.api_utils import get_json_result, get_data_error_result, server_error_response, generate_confirmation_token
from api.utils.health_utils import run_health_checks
from api.utils.health_utils import run_health_checks, get_oceanbase_status
from common.versions import get_ragflow_version
from datetime import datetime
from common.time_utils import current_timestamp, datetime_format
from api.db.db_models import APIToken
from api.db.services.api_service import APITokenService
from api.db.services.knowledgebase_service import KnowledgebaseService
from api.db.services.user_service import UserTenantService
from common.log_utils import get_log_levels, set_log_level
from common import settings
from rag.utils.redis_conn import REDIS_CONN
@manager.route("/system/ping", methods=["GET"]) # noqa: F821
async def ping():
@ -53,6 +60,174 @@ def version():
"""
return get_json_result(data=get_ragflow_version())
@manager.route("/system/status", methods=["GET"]) # noqa: F821
@login_required
def status():
"""
Get the system status.
---
tags:
- System
security:
- ApiKeyAuth: []
responses:
200:
description: System is operational.
schema:
type: object
properties:
es:
type: object
description: Elasticsearch status.
storage:
type: object
description: Storage status.
database:
type: object
description: Database status.
503:
description: Service unavailable.
schema:
type: object
properties:
error:
type: string
description: Error message.
"""
res = {}
st = timer()
try:
res["doc_engine"] = settings.docStoreConn.health()
res["doc_engine"]["elapsed"] = "{:.1f}".format((timer() - st) * 1000.0)
except Exception as e:
res["doc_engine"] = {
"type": "unknown",
"status": "red",
"elapsed": "{:.1f}".format((timer() - st) * 1000.0),
"error": str(e),
}
st = timer()
try:
settings.STORAGE_IMPL.health()
res["storage"] = {
"storage": settings.STORAGE_IMPL_TYPE.lower(),
"status": "green",
"elapsed": "{:.1f}".format((timer() - st) * 1000.0),
}
except Exception as e:
res["storage"] = {
"storage": settings.STORAGE_IMPL_TYPE.lower(),
"status": "red",
"elapsed": "{:.1f}".format((timer() - st) * 1000.0),
"error": str(e),
}
st = timer()
try:
KnowledgebaseService.get_by_id("x")
res["database"] = {
"database": settings.DATABASE_TYPE.lower(),
"status": "green",
"elapsed": "{:.1f}".format((timer() - st) * 1000.0),
}
except Exception as e:
res["database"] = {
"database": settings.DATABASE_TYPE.lower(),
"status": "red",
"elapsed": "{:.1f}".format((timer() - st) * 1000.0),
"error": str(e),
}
st = timer()
try:
if not REDIS_CONN.health():
raise Exception("Lost connection!")
res["redis"] = {
"status": "green",
"elapsed": "{:.1f}".format((timer() - st) * 1000.0),
}
except Exception as e:
res["redis"] = {
"status": "red",
"elapsed": "{:.1f}".format((timer() - st) * 1000.0),
"error": str(e),
}
task_executor_heartbeats = {}
try:
task_executors = REDIS_CONN.smembers("TASKEXE")
now = datetime.now().timestamp()
for task_executor_id in task_executors:
heartbeats = REDIS_CONN.zrangebyscore(task_executor_id, now - 60 * 30, now)
heartbeats = [json.loads(heartbeat) for heartbeat in heartbeats]
task_executor_heartbeats[task_executor_id] = heartbeats
except Exception:
logging.exception("get task executor heartbeats failed!")
res["task_executor_heartbeats"] = task_executor_heartbeats
return get_json_result(data=res)
@manager.route("/system/oceanbase/status", methods=["GET"]) # noqa: F821
@login_required
def oceanbase_status():
"""
Get OceanBase health status and performance metrics.
---
tags:
- System
security:
- ApiKeyAuth: []
responses:
200:
description: OceanBase status retrieved successfully.
schema:
type: object
properties:
status:
type: string
description: Status (alive/timeout).
message:
type: object
description: Detailed status information including health and performance metrics.
"""
try:
status_info = get_oceanbase_status()
return get_json_result(data=status_info)
except Exception as e:
return get_json_result(
data={
"status": "error",
"message": f"Failed to get OceanBase status: {str(e)}"
},
code=500
)
@manager.route("/system/config", methods=["GET"]) # noqa: F821
def get_config():
"""
Get system configuration.
---
tags:
- System
responses:
200:
description: Return system configuration
schema:
type: object
properties:
registerEnable:
type: integer 0 means disabled, 1 means enabled
description: Whether user registration is enabled
"""
return get_json_result(data={
"registerEnabled": settings.REGISTER_ENABLED,
"disablePasswordLogin": settings.DISABLE_PASSWORD_LOGIN,
})
@manager.route("/system/healthz", methods=["GET"]) # noqa: F821
def healthz():
result, all_ok = run_health_checks()

View File

@ -1,197 +0,0 @@
#
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
#
import logging
from datetime import datetime
import json
from api.apps import login_required
from api.db.services.knowledgebase_service import KnowledgebaseService
from api.utils.api_utils import (
get_json_result,
)
from timeit import default_timer as timer
from rag.utils.redis_conn import REDIS_CONN
from api.utils.health_utils import get_oceanbase_status
from common import settings
@manager.route("/status", methods=["GET"]) # noqa: F821
@login_required
def status():
"""
Get the system status.
---
tags:
- System
security:
- ApiKeyAuth: []
responses:
200:
description: System is operational.
schema:
type: object
properties:
es:
type: object
description: Elasticsearch status.
storage:
type: object
description: Storage status.
database:
type: object
description: Database status.
503:
description: Service unavailable.
schema:
type: object
properties:
error:
type: string
description: Error message.
"""
res = {}
st = timer()
try:
res["doc_engine"] = settings.docStoreConn.health()
res["doc_engine"]["elapsed"] = "{:.1f}".format((timer() - st) * 1000.0)
except Exception as e:
res["doc_engine"] = {
"type": "unknown",
"status": "red",
"elapsed": "{:.1f}".format((timer() - st) * 1000.0),
"error": str(e),
}
st = timer()
try:
settings.STORAGE_IMPL.health()
res["storage"] = {
"storage": settings.STORAGE_IMPL_TYPE.lower(),
"status": "green",
"elapsed": "{:.1f}".format((timer() - st) * 1000.0),
}
except Exception as e:
res["storage"] = {
"storage": settings.STORAGE_IMPL_TYPE.lower(),
"status": "red",
"elapsed": "{:.1f}".format((timer() - st) * 1000.0),
"error": str(e),
}
st = timer()
try:
KnowledgebaseService.get_by_id("x")
res["database"] = {
"database": settings.DATABASE_TYPE.lower(),
"status": "green",
"elapsed": "{:.1f}".format((timer() - st) * 1000.0),
}
except Exception as e:
res["database"] = {
"database": settings.DATABASE_TYPE.lower(),
"status": "red",
"elapsed": "{:.1f}".format((timer() - st) * 1000.0),
"error": str(e),
}
st = timer()
try:
if not REDIS_CONN.health():
raise Exception("Lost connection!")
res["redis"] = {
"status": "green",
"elapsed": "{:.1f}".format((timer() - st) * 1000.0),
}
except Exception as e:
res["redis"] = {
"status": "red",
"elapsed": "{:.1f}".format((timer() - st) * 1000.0),
"error": str(e),
}
task_executor_heartbeats = {}
try:
task_executors = REDIS_CONN.smembers("TASKEXE")
now = datetime.now().timestamp()
for task_executor_id in task_executors:
heartbeats = REDIS_CONN.zrangebyscore(task_executor_id, now - 60 * 30, now)
heartbeats = [json.loads(heartbeat) for heartbeat in heartbeats]
task_executor_heartbeats[task_executor_id] = heartbeats
except Exception:
logging.exception("get task executor heartbeats failed!")
res["task_executor_heartbeats"] = task_executor_heartbeats
return get_json_result(data=res)
@manager.route("/oceanbase/status", methods=["GET"]) # noqa: F821
@login_required
def oceanbase_status():
"""
Get OceanBase health status and performance metrics.
---
tags:
- System
security:
- ApiKeyAuth: []
responses:
200:
description: OceanBase status retrieved successfully.
schema:
type: object
properties:
status:
type: string
description: Status (alive/timeout).
message:
type: object
description: Detailed status information including health and performance metrics.
"""
try:
status_info = get_oceanbase_status()
return get_json_result(data=status_info)
except Exception as e:
return get_json_result(
data={
"status": "error",
"message": f"Failed to get OceanBase status: {str(e)}"
},
code=500
)
@manager.route("/config", methods=["GET"]) # noqa: F821
def get_config():
"""
Get system configuration.
---
tags:
- System
responses:
200:
description: Return system configuration
schema:
type: object
properties:
registerEnable:
type: integer 0 means disabled, 1 means enabled
description: Whether user registration is enabled
"""
return get_json_result(data={
"registerEnabled": settings.REGISTER_ENABLED,
"disablePasswordLogin": settings.DISABLE_PASSWORD_LOGIN,
})