mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-03-26 08:49:56 +08:00
refactor: improve memory_message_service current time stamp usage (#12912)
### What problem does this PR solve? improve memory_message_service current time stamp usage ### Type of change - [x] Refactoring
This commit is contained in:
@ -46,6 +46,9 @@ async def save_to_memory(memory_id: str, message_dict: dict):
|
||||
"agent_response": str
|
||||
}
|
||||
"""
|
||||
current_time = current_timestamp()
|
||||
current_date = timestamp_to_date(current_time)
|
||||
|
||||
memory = MemoryService.get_by_memory_id(memory_id)
|
||||
if not memory:
|
||||
return False, f"Memory '{memory_id}' not found."
|
||||
@ -69,7 +72,7 @@ async def save_to_memory(memory_id: str, message_dict: dict):
|
||||
"agent_id": message_dict["agent_id"],
|
||||
"session_id": message_dict["session_id"],
|
||||
"content": f"User Input: {message_dict.get('user_input')}\nAgent Response: {message_dict.get('agent_response')}",
|
||||
"valid_at": timestamp_to_date(current_timestamp()),
|
||||
"valid_at": current_date,
|
||||
"invalid_at": None,
|
||||
"forget_at": None,
|
||||
"status": True
|
||||
@ -91,17 +94,20 @@ async def save_to_memory(memory_id: str, message_dict: dict):
|
||||
|
||||
|
||||
async def save_extracted_to_memory_only(memory_id: str, message_dict, source_message_id: int, task_id: str=None):
|
||||
current_time = current_timestamp()
|
||||
current_date = timestamp_to_date(current_time)
|
||||
|
||||
memory = MemoryService.get_by_memory_id(memory_id)
|
||||
if not memory:
|
||||
msg = f"Memory '{memory_id}' not found."
|
||||
if task_id:
|
||||
TaskService.update_progress(task_id, {"progress": -1, "progress_msg": timestamp_to_date(current_timestamp())+ " " + msg})
|
||||
TaskService.update_progress(task_id, {"progress": -1, "progress_msg": current_date + " " + msg})
|
||||
return False, msg
|
||||
|
||||
if memory.memory_type == MemoryType.RAW.value:
|
||||
msg = f"Memory '{memory_id}' don't need to extract."
|
||||
if task_id:
|
||||
TaskService.update_progress(task_id, {"progress": 1.0, "progress_msg": timestamp_to_date(current_timestamp())+ " " + msg})
|
||||
TaskService.update_progress(task_id, {"progress": 1.0, "progress_msg": current_date + " " + msg})
|
||||
return True, msg
|
||||
|
||||
tenant_id = memory.tenant_id
|
||||
@ -131,23 +137,26 @@ async def save_extracted_to_memory_only(memory_id: str, message_dict, source_mes
|
||||
if not message_list:
|
||||
msg = "No memory extracted from raw message."
|
||||
if task_id:
|
||||
TaskService.update_progress(task_id, {"progress": 1.0, "progress_msg": timestamp_to_date(current_timestamp())+ " " + msg})
|
||||
TaskService.update_progress(task_id, {"progress": 1.0, "progress_msg": current_date + " " + msg})
|
||||
return True, msg
|
||||
|
||||
if task_id:
|
||||
TaskService.update_progress(task_id, {"progress": 0.5, "progress_msg": timestamp_to_date(current_timestamp())+ " " + f"Extracted {len(message_list)} messages from raw dialogue."})
|
||||
TaskService.update_progress(task_id, {"progress": 0.5, "progress_msg": current_date + " " + f"Extracted {len(message_list)} messages from raw dialogue."})
|
||||
return await embed_and_save(memory, message_list, task_id)
|
||||
|
||||
|
||||
async def extract_by_llm(tenant_id: str, llm_id: str, extract_conf: dict, memory_type: List[str], user_input: str,
|
||||
agent_response: str, system_prompt: str = "", user_prompt: str="", task_id: str=None) -> List[dict]:
|
||||
current_time = current_timestamp()
|
||||
current_date = timestamp_to_date(current_time)
|
||||
|
||||
llm_type = TenantLLMService.llm_id2llm_type(llm_id)
|
||||
if not llm_type:
|
||||
raise RuntimeError(f"Unknown type of LLM '{llm_id}'")
|
||||
if not system_prompt:
|
||||
system_prompt = PromptAssembler.assemble_system_prompt({"memory_type": memory_type})
|
||||
conversation_content = f"User Input: {user_input}\nAgent Response: {agent_response}"
|
||||
conversation_time = timestamp_to_date(current_timestamp())
|
||||
conversation_time = current_date
|
||||
user_prompts = []
|
||||
if user_prompt:
|
||||
user_prompts.append({"role": "user", "content": user_prompt})
|
||||
@ -156,11 +165,11 @@ async def extract_by_llm(tenant_id: str, llm_id: str, extract_conf: dict, memory
|
||||
user_prompts.append({"role": "user", "content": PromptAssembler.assemble_user_prompt(conversation_content, conversation_time, conversation_time)})
|
||||
llm = LLMBundle(tenant_id, llm_type, llm_id)
|
||||
if task_id:
|
||||
TaskService.update_progress(task_id, {"progress": 0.15, "progress_msg": timestamp_to_date(current_timestamp())+ " " + "Prepared prompts and LLM."})
|
||||
TaskService.update_progress(task_id, {"progress": 0.15, "progress_msg": current_date + " " + "Prepared prompts and LLM."})
|
||||
res = await llm.async_chat(system_prompt, user_prompts, extract_conf)
|
||||
res_json = get_json_result_from_llm_response(res)
|
||||
if task_id:
|
||||
TaskService.update_progress(task_id, {"progress": 0.35, "progress_msg": timestamp_to_date(current_timestamp())+ " " + "Get extracted result from LLM."})
|
||||
TaskService.update_progress(task_id, {"progress": 0.35, "progress_msg": current_date + " " + "Get extracted result from LLM."})
|
||||
return [{
|
||||
"content": extracted_content["content"],
|
||||
"valid_at": format_iso_8601_to_ymd_hms(extracted_content["valid_at"]),
|
||||
@ -170,21 +179,24 @@ async def extract_by_llm(tenant_id: str, llm_id: str, extract_conf: dict, memory
|
||||
|
||||
|
||||
async def embed_and_save(memory, message_list: list[dict], task_id: str=None):
|
||||
current_time = current_timestamp()
|
||||
current_date = timestamp_to_date(current_time)
|
||||
|
||||
embedding_model = LLMBundle(memory.tenant_id, llm_type=LLMType.EMBEDDING, llm_name=memory.embd_id)
|
||||
if task_id:
|
||||
TaskService.update_progress(task_id, {"progress": 0.65, "progress_msg": timestamp_to_date(current_timestamp())+ " " + "Prepared embedding model."})
|
||||
TaskService.update_progress(task_id, {"progress": 0.65, "progress_msg": current_date + " " + "Prepared embedding model."})
|
||||
vector_list, _ = embedding_model.encode([msg["content"] for msg in message_list])
|
||||
for idx, msg in enumerate(message_list):
|
||||
msg["content_embed"] = vector_list[idx]
|
||||
if task_id:
|
||||
TaskService.update_progress(task_id, {"progress": 0.85, "progress_msg": timestamp_to_date(current_timestamp())+ " " + "Embedded extracted content."})
|
||||
TaskService.update_progress(task_id, {"progress": 0.85, "progress_msg": current_date + " " + "Embedded extracted content."})
|
||||
vector_dimension = len(vector_list[0])
|
||||
if not MessageService.has_index(memory.tenant_id, memory.id):
|
||||
created = MessageService.create_index(memory.tenant_id, memory.id, vector_size=vector_dimension)
|
||||
if not created:
|
||||
error_msg = "Failed to create message index."
|
||||
if task_id:
|
||||
TaskService.update_progress(task_id, {"progress": -1, "progress_msg": timestamp_to_date(current_timestamp())+ " " + error_msg})
|
||||
TaskService.update_progress(task_id, {"progress": -1, "progress_msg": current_date + " " + error_msg})
|
||||
return False, error_msg
|
||||
|
||||
new_msg_size = sum([MessageService.calculate_message_size(m) for m in message_list])
|
||||
@ -199,17 +211,17 @@ async def embed_and_save(memory, message_list: list[dict], task_id: str=None):
|
||||
else:
|
||||
error_msg = "Failed to insert message into memory. Memory size reached limit and cannot decide which to delete."
|
||||
if task_id:
|
||||
TaskService.update_progress(task_id, {"progress": -1, "progress_msg": timestamp_to_date(current_timestamp())+ " " + error_msg})
|
||||
TaskService.update_progress(task_id, {"progress": -1, "progress_msg": current_date + " " + error_msg})
|
||||
return False, error_msg
|
||||
fail_cases = MessageService.insert_message(message_list, memory.tenant_id, memory.id)
|
||||
if fail_cases:
|
||||
error_msg = "Failed to insert message into memory. Details: " + "; ".join(fail_cases)
|
||||
if task_id:
|
||||
TaskService.update_progress(task_id, {"progress": -1, "progress_msg": timestamp_to_date(current_timestamp())+ " " + error_msg})
|
||||
TaskService.update_progress(task_id, {"progress": -1, "progress_msg": current_date + " " + error_msg})
|
||||
return False, error_msg
|
||||
|
||||
if task_id:
|
||||
TaskService.update_progress(task_id, {"progress": 0.95, "progress_msg": timestamp_to_date(current_timestamp())+ " " + "Saved messages to storage."})
|
||||
TaskService.update_progress(task_id, {"progress": 0.95, "progress_msg": current_date + " " + "Saved messages to storage."})
|
||||
increase_memory_size_cache(memory.id, new_msg_size)
|
||||
return True, "Message saved successfully."
|
||||
|
||||
@ -340,6 +352,9 @@ async def queue_save_to_memory_task(memory_ids: list[str], message_dict: dict):
|
||||
"agent_response": str
|
||||
}
|
||||
"""
|
||||
current_time = current_timestamp()
|
||||
current_date = timestamp_to_date(current_time)
|
||||
|
||||
def new_task(_memory_id: str, _source_id: int):
|
||||
return {
|
||||
"id": get_uuid(),
|
||||
@ -367,7 +382,7 @@ async def queue_save_to_memory_task(memory_ids: list[str], message_dict: dict):
|
||||
"agent_id": message_dict["agent_id"],
|
||||
"session_id": message_dict["session_id"],
|
||||
"content": f"User Input: {message_dict.get('user_input')}\nAgent Response: {message_dict.get('agent_response')}",
|
||||
"valid_at": timestamp_to_date(current_timestamp()),
|
||||
"valid_at": current_date,
|
||||
"invalid_at": None,
|
||||
"forget_at": None,
|
||||
"status": True
|
||||
@ -417,22 +432,24 @@ async def handle_save_to_memory_task(task_param: dict):
|
||||
}
|
||||
}
|
||||
"""
|
||||
current_time = current_timestamp()
|
||||
current_date = timestamp_to_date(current_time)
|
||||
|
||||
_, task = TaskService.get_by_id(task_param["id"])
|
||||
if not task:
|
||||
return False, f"Task {task_param['id']} is not found."
|
||||
if task.progress == -1:
|
||||
return False, f"Task {task_param['id']} is already failed."
|
||||
now_time = current_timestamp()
|
||||
TaskService.update_by_id(task_param["id"], {"begin_at": timestamp_to_date(now_time)})
|
||||
TaskService.update_by_id(task_param["id"], {"begin_at": current_date})
|
||||
|
||||
memory_id = task_param["memory_id"]
|
||||
source_id = task_param["source_id"]
|
||||
message_dict = task_param["message_dict"]
|
||||
success, msg = await save_extracted_to_memory_only(memory_id, message_dict, source_id, task.id)
|
||||
if success:
|
||||
TaskService.update_progress(task.id, {"progress": 1.0, "progress_msg": timestamp_to_date(current_timestamp())+ " " + msg})
|
||||
TaskService.update_progress(task.id, {"progress": 1.0, "progress_msg": current_date + " " + msg})
|
||||
return True, msg
|
||||
|
||||
logging.error(msg)
|
||||
TaskService.update_progress(task.id, {"progress": -1, "progress_msg": timestamp_to_date(current_timestamp())+ " " + msg})
|
||||
TaskService.update_progress(task.id, {"progress": -1, "progress_msg": current_date + " " + msg})
|
||||
return False, msg
|
||||
|
||||
Reference in New Issue
Block a user