Fix: async invocation isssue. (#12634)

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
This commit is contained in:
Kevin Hu
2026-01-15 14:22:16 +08:00
committed by GitHub
parent 97b983fd0b
commit eb35e2b89f
2 changed files with 8 additions and 18 deletions

View File

@ -44,7 +44,7 @@ class KGSearch(Dealer):
return response return response
async def query_rewrite(self, llm, question, idxnms, kb_ids): async def query_rewrite(self, llm, question, idxnms, kb_ids):
ty2ents = get_entity_type2samples(idxnms, kb_ids) ty2ents = await get_entity_type2samples(idxnms, kb_ids)
hint_prompt = PROMPTS["minirag_query2kwd"].format(query=question, hint_prompt = PROMPTS["minirag_query2kwd"].format(query=question,
TYPE_POOL=json.dumps(ty2ents, ensure_ascii=False, indent=2)) TYPE_POOL=json.dumps(ty2ents, ensure_ascii=False, indent=2))
result = await self._chat(llm, hint_prompt, [{"role": "user", "content": "Output:"}], {}) result = await self._chat(llm, hint_prompt, [{"role": "user", "content": "Output:"}], {})
@ -242,7 +242,7 @@ class KGSearch(Dealer):
for (f, t), rel in rels_from_txt: for (f, t), rel in rels_from_txt:
if not rel.get("description"): if not rel.get("description"):
for tid in tenant_ids: for tid in tenant_ids:
rela = get_relation(tid, kb_ids, f, t) rela = await get_relation(tid, kb_ids, f, t)
if rela: if rela:
break break
else: else:

View File

@ -327,7 +327,7 @@ async def graph_node_to_chunk(kb_id, embd_mdl, ent_name, meta, chunks):
@timeout(3, 3) @timeout(3, 3)
def get_relation(tenant_id, kb_id, from_ent_name, to_ent_name, size=1): async def get_relation(tenant_id, kb_id, from_ent_name, to_ent_name, size=1):
ents = from_ent_name ents = from_ent_name
if isinstance(ents, str): if isinstance(ents, str):
ents = [from_ent_name] ents = [from_ent_name]
@ -337,7 +337,7 @@ def get_relation(tenant_id, kb_id, from_ent_name, to_ent_name, size=1):
ents = list(set(ents)) ents = list(set(ents))
conds = {"fields": ["content_with_weight"], "size": size, "from_entity_kwd": ents, "to_entity_kwd": ents, "knowledge_graph_kwd": ["relation"]} conds = {"fields": ["content_with_weight"], "size": size, "from_entity_kwd": ents, "to_entity_kwd": ents, "knowledge_graph_kwd": ["relation"]}
res = [] res = []
es_res = settings.retriever.search(conds, search.index_name(tenant_id), [kb_id] if isinstance(kb_id, str) else kb_id) es_res = await settings.retriever.search(conds, search.index_name(tenant_id), [kb_id] if isinstance(kb_id, str) else kb_id)
for id in es_res.ids: for id in es_res.ids:
try: try:
if size == 1: if size == 1:
@ -404,12 +404,7 @@ async def does_graph_contains(tenant_id, kb_id, doc_id):
async def get_graph_doc_ids(tenant_id, kb_id) -> list[str]: async def get_graph_doc_ids(tenant_id, kb_id) -> list[str]:
conds = {"fields": ["source_id"], "removed_kwd": "N", "size": 1, "knowledge_graph_kwd": ["graph"]} conds = {"fields": ["source_id"], "removed_kwd": "N", "size": 1, "knowledge_graph_kwd": ["graph"]}
res = await asyncio.to_thread( res = await settings.retriever.search(conds, search.index_name(tenant_id), [kb_id])
settings.retriever.search,
conds,
search.index_name(tenant_id),
[kb_id]
)
doc_ids = [] doc_ids = []
if res.total == 0: if res.total == 0:
return doc_ids return doc_ids
@ -420,12 +415,7 @@ async def get_graph_doc_ids(tenant_id, kb_id) -> list[str]:
async def get_graph(tenant_id, kb_id, exclude_rebuild=None): async def get_graph(tenant_id, kb_id, exclude_rebuild=None):
conds = {"fields": ["content_with_weight", "removed_kwd", "source_id"], "size": 1, "knowledge_graph_kwd": ["graph"]} conds = {"fields": ["content_with_weight", "removed_kwd", "source_id"], "size": 1, "knowledge_graph_kwd": ["graph"]}
res = await asyncio.to_thread( res = await settings.retriever.search(conds, search.index_name(tenant_id), [kb_id])
settings.retriever.search,
conds,
search.index_name(tenant_id),
[kb_id]
)
if not res.total == 0: if not res.total == 0:
for id in res.ids: for id in res.ids:
try: try:
@ -626,8 +616,8 @@ def merge_tuples(list1, list2):
return result return result
def get_entity_type2samples(idxnms, kb_ids: list): async def get_entity_type2samples(idxnms, kb_ids: list):
es_res = settings.retriever.search({"knowledge_graph_kwd": "ty2ents", "kb_id": kb_ids, "size": 10000, "fields": ["content_with_weight"]},idxnms,kb_ids) es_res = await settings.retriever.search({"knowledge_graph_kwd": "ty2ents", "kb_id": kb_ids, "size": 10000, "fields": ["content_with_weight"]},idxnms,kb_ids)
res = defaultdict(list) res = defaultdict(list)
for id in es_res.ids: for id in es_res.ids: