# # Copyright 2024 The InfiniFlow Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # from datetime import datetime, timedelta from quart import request from api.db.services.api_service import API4ConversationService from api.db.services.user_service import UserTenantService from api.utils.api_utils import get_data_error_result, get_json_result, server_error_response from api.apps import login_required, current_user @manager.route('/stats', methods=['GET']) # noqa: F821 @login_required def stats(): try: tenants = UserTenantService.query(user_id=current_user.id) if not tenants: return get_data_error_result(message="Tenant not found!") objs = API4ConversationService.stats( tenants[0].tenant_id, request.args.get( "from_date", (datetime.now() - timedelta( days=7)).strftime("%Y-%m-%d 00:00:00")), request.args.get( "to_date", datetime.now().strftime("%Y-%m-%d %H:%M:%S")), "agent" if "canvas_id" in request.args else None) res = {"pv": [], "uv": [], "speed": [], "tokens": [], "round": [], "thumb_up": []} for obj in objs: dt = obj["dt"] res["pv"].append((dt, obj["pv"])) res["uv"].append((dt, obj["uv"])) res["speed"].append((dt, float(obj["tokens"]) / (float(obj["duration"]) + 0.1))) # +0.1 to avoid division by zero res["tokens"].append((dt, float(obj["tokens"]) / 1000.0)) # convert to thousands res["round"].append((dt, obj["round"])) res["thumb_up"].append((dt, obj["thumb_up"])) return get_json_result(data=res) except Exception as e: return server_error_response(e)