Merge branch 'main' into feat/node-execution-retry

Novice Lee
2024-12-18 09:38:18 +08:00
179 changed files with 3286 additions and 1295 deletions


@@ -72,7 +72,11 @@ class BaseNode(Generic[GenericNodeData]):
             result = self._run()
         except Exception as e:
             logger.exception(f"Node {self.node_id} failed to run")
-            result = NodeRunResult(status=WorkflowNodeExecutionStatus.FAILED, error=str(e), error_type="SystemError")
+            result = NodeRunResult(
+                status=WorkflowNodeExecutionStatus.FAILED,
+                error=str(e),
+                error_type="WorkflowNodeError",
+            )

         if isinstance(result, NodeRunResult):
             yield RunCompletedEvent(run_result=result)
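
The hunk above widens the single-line NodeRunResult construction and renames the catch-all failure label from "SystemError" to "WorkflowNodeError". A minimal runnable sketch of the pattern, using simplified stand-ins for the real NodeRunResult and RunCompletedEvent types (which are not shown in this diff):

from dataclasses import dataclass
from typing import Callable, Generator

@dataclass
class NodeRunResult:
    status: str
    error: str | None = None
    error_type: str | None = None

def run(node_logic: Callable[[], NodeRunResult]) -> Generator[tuple[str, NodeRunResult], None, None]:
    try:
        result = node_logic()
    except Exception as e:
        # Any uncaught exception becomes a FAILED result with a stable label,
        # mirroring error_type="WorkflowNodeError" above.
        result = NodeRunResult(status="failed", error=str(e), error_type="WorkflowNodeError")
    if isinstance(result, NodeRunResult):
        yield ("run_completed", result)

def boom() -> NodeRunResult:
    raise RuntimeError("tool crashed")

print(next(run(boom)))
# ('run_completed', NodeRunResult(status='failed', error='tool crashed', error_type='WorkflowNodeError'))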


@@ -37,7 +37,7 @@ BODY_TYPE_TO_CONTENT_TYPE = {
 class Executor:
     method: Literal["get", "head", "post", "put", "delete", "patch"]
     url: str
-    params: Mapping[str, str] | None
+    params: list[tuple[str, str]] | None
     content: str | bytes | None
     data: Mapping[str, Any] | None
     files: Mapping[str, tuple[str | None, bytes, str]] | None
@@ -69,7 +69,7 @@ class Executor:
         self.method = node_data.method
         self.auth = node_data.authorization
         self.timeout = timeout
-        self.params = {}
+        self.params = []
         self.headers = {}
         self.content = None
         self.files = None
@@ -92,14 +92,48 @@ class Executor:
         self.url = self.variable_pool.convert_template(self.node_data.url).text

     def _init_params(self):
-        params = _plain_text_to_dict(self.node_data.params)
-        for key in params:
-            params[key] = self.variable_pool.convert_template(params[key]).text
-        self.params = params
+        """
+        Almost the same as _init_headers(), with two differences:
+        1. returns a list of tuples to support repeated keys, like 'aa=1&aa=2'
+        2. a param value may contain '\n', so we split lines first and then extract the variable value
+        """
+        result = []
+        for line in self.node_data.params.splitlines():
+            if not (line := line.strip()):
+                continue
+            key, *value = line.split(":", 1)
+            if not (key := key.strip()):
+                continue
+            value = value[0].strip() if value else ""
+            result.append(
+                (self.variable_pool.convert_template(key).text, self.variable_pool.convert_template(value).text)
+            )
+        self.params = result

     def _init_headers(self):
+        """
+        Convert the header string from the frontend into a dictionary.
+
+        Each line in the header string represents a key-value pair.
+        Keys and values are separated by ':'.
+        Empty values are allowed.
+
+        Examples:
+            'aa:bb\n cc:dd' -> {'aa': 'bb', 'cc': 'dd'}
+            'aa:\n cc:dd\n' -> {'aa': '', 'cc': 'dd'}
+            'aa\n cc : dd' -> {'aa': '', 'cc': 'dd'}
+        """
         headers = self.variable_pool.convert_template(self.node_data.headers).text
-        self.headers = _plain_text_to_dict(headers)
+        self.headers = {
+            key.strip(): (value[0].strip() if value else "")
+            for line in headers.splitlines()
+            if line.strip()
+            for key, *value in [line.split(":", 1)]
+        }

     def _init_body(self):
         body = self.node_data.body
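
Why params becomes a list of tuples: a dict cannot represent a repeated query key such as 'aa=1&aa=2', while HTTP clients like httpx accept a list of (key, value) pairs for exactly this case. A runnable sketch of the same line-based parsing, with the variable templating left out:

def parse_params(text: str) -> list[tuple[str, str]]:
    result: list[tuple[str, str]] = []
    for line in text.splitlines():
        if not (line := line.strip()):
            continue
        key, *value = line.split(":", 1)
        if not (key := key.strip()):
            continue
        result.append((key, value[0].strip() if value else ""))
    return result

print(parse_params("aa:1\naa:2\npage:3"))
# [('aa', '1'), ('aa', '2'), ('page', '3')] -- a dict would have kept only the last 'aa'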
@@ -292,33 +326,6 @@ class Executor:
         return raw

-def _plain_text_to_dict(text: str, /) -> dict[str, str]:
-    """
-    Convert a string of key-value pairs to a dictionary.
-
-    Each line in the input string represents a key-value pair.
-    Keys and values are separated by ':'.
-    Empty values are allowed.
-
-    Examples:
-        'aa:bb\n cc:dd' -> {'aa': 'bb', 'cc': 'dd'}
-        'aa:\n cc:dd\n' -> {'aa': '', 'cc': 'dd'}
-        'aa\n cc : dd' -> {'aa': '', 'cc': 'dd'}
-
-    Args:
-        convert_text (str): The input string to convert.
-
-    Returns:
-        dict[str, str]: A dictionary of key-value pairs.
-    """
-    return {
-        key.strip(): (value[0].strip() if value else "")
-        for line in text.splitlines()
-        if line.strip()
-        for key, *value in [line.split(":", 1)]
-    }

 def _generate_random_string(n: int) -> str:
     """
     Generate a random string of lowercase ASCII letters.
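
Since _plain_text_to_dict is deleted and its comprehension inlined into _init_headers, here is a quick self-contained check, reusing the examples from the docstring above, that the inlined form preserves the documented behaviour:

def to_dict(text: str) -> dict[str, str]:
    return {
        key.strip(): (value[0].strip() if value else "")
        for line in text.splitlines()
        if line.strip()
        for key, *value in [line.split(":", 1)]
    }

assert to_dict("aa:bb\n cc:dd") == {"aa": "bb", "cc": "dd"}
assert to_dict("aa:\n cc:dd\n") == {"aa": "", "cc": "dd"}
assert to_dict("aa\n cc : dd") == {"aa": "", "cc": "dd"}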


@@ -163,7 +163,9 @@ class IterationNode(BaseNode[IterationNodeData]):
         if self.node_data.is_parallel:
             futures: list[Future] = []
             q: Queue = Queue()
-            thread_pool = GraphEngineThreadPool(max_workers=self.node_data.parallel_nums, max_submit_count=100)
+            thread_pool = GraphEngineThreadPool(
+                max_workers=self.node_data.parallel_nums, max_submit_count=dify_config.MAX_SUBMIT_COUNT
+            )
             for index, item in enumerate(iterator_list_value):
                 future: Future = thread_pool.submit(
                     self._run_single_iter_parallel,
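
GraphEngineThreadPool's internals are not part of this diff; the hunk only moves the submission cap from a hard-coded 100 into configuration (dify_config.MAX_SUBMIT_COUNT). A plausible sketch, assuming the pool simply counts submissions and rejects any past the cap:

from concurrent.futures import ThreadPoolExecutor

class BoundedThreadPool(ThreadPoolExecutor):
    """Hypothetical stand-in: rejects submissions once a configured cap is hit."""

    def __init__(self, max_workers: int, max_submit_count: int):
        super().__init__(max_workers=max_workers)
        self.max_submit_count = max_submit_count
        self.submit_count = 0

    def submit(self, fn, /, *args, **kwargs):
        # Count every submission and fail fast once the cap is exceeded.
        self.submit_count += 1
        if self.submit_count > self.max_submit_count:
            raise ValueError("Max submit count reached")
        return super().submit(fn, *args, **kwargs)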


@@ -70,7 +70,20 @@ class KnowledgeRetrievalNode(BaseNode[KnowledgeRetrievalNodeData]):
         except KnowledgeRetrievalNodeError as e:
             logger.warning("Error when running knowledge retrieval node")
-            return NodeRunResult(status=WorkflowNodeExecutionStatus.FAILED, inputs=variables, error=str(e))
+            return NodeRunResult(
+                status=WorkflowNodeExecutionStatus.FAILED,
+                inputs=variables,
+                error=str(e),
+                error_type=type(e).__name__,
+            )
+        # Temporarily handle all exceptions from the DatasetRetrieval class here.
+        except Exception as e:
+            return NodeRunResult(
+                status=WorkflowNodeExecutionStatus.FAILED,
+                inputs=variables,
+                error=str(e),
+                error_type=type(e).__name__,
+            )

     def _fetch_dataset_retriever(self, node_data: KnowledgeRetrievalNodeData, query: str) -> list[dict[str, Any]]:
         available_datasets = []
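
The two except blocks layer a specific domain error under a temporary catch-all; both record the concrete exception class via type(e).__name__. A small runnable illustration (the retrieval body below is a stand-in failure, not the real logic):

class KnowledgeRetrievalNodeError(Exception):
    pass

def run_retrieval(query: str) -> dict:
    try:
        raise TimeoutError(f"dataset query timed out: {query}")  # stand-in failure
    except KnowledgeRetrievalNodeError as e:
        return {"status": "failed", "error": str(e), "error_type": type(e).__name__}
    except Exception as e:  # temporary catch-all, as in the hunk above
        return {"status": "failed", "error": str(e), "error_type": type(e).__name__}

print(run_retrieval("q"))
# {'status': 'failed', 'error': 'dataset query timed out: q', 'error_type': 'TimeoutError'}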
@@ -160,18 +173,18 @@ class KnowledgeRetrievalNode(BaseNode[KnowledgeRetrievalNodeData]):
                 reranking_model = None
                 weights = None
             all_documents = dataset_retrieval.multiple_retrieve(
-                self.app_id,
-                self.tenant_id,
-                self.user_id,
-                self.user_from.value,
-                available_datasets,
-                query,
-                node_data.multiple_retrieval_config.top_k,
-                node_data.multiple_retrieval_config.score_threshold,
-                node_data.multiple_retrieval_config.reranking_mode,
-                reranking_model,
-                weights,
-                node_data.multiple_retrieval_config.reranking_enable,
+                app_id=self.app_id,
+                tenant_id=self.tenant_id,
+                user_id=self.user_id,
+                user_from=self.user_from.value,
+                available_datasets=available_datasets,
+                query=query,
+                top_k=node_data.multiple_retrieval_config.top_k,
+                score_threshold=node_data.multiple_retrieval_config.score_threshold,
+                reranking_mode=node_data.multiple_retrieval_config.reranking_mode,
+                reranking_model=reranking_model,
+                weights=weights,
+                reranking_enable=node_data.multiple_retrieval_config.reranking_enable,
             )
             dify_documents = [item for item in all_documents if item.provider == "dify"]
             external_documents = [item for item in all_documents if item.provider == "external"]
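
The hunk above only switches the multiple_retrieve call from positional to keyword arguments. With a dozen parameters of overlapping types, a positional call can swap two values without raising any error; keyword calls make the site self-describing (toy function below, not the real DatasetRetrieval API):

def retrieve(top_k: int, score_threshold: float, reranking_enable: bool) -> None:
    print(top_k, score_threshold, reranking_enable)

retrieve(5, 0.7, True)                                         # positional: order is load-bearing
retrieve(score_threshold=0.7, top_k=5, reranking_enable=True)  # keyword: order no longer matters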


@@ -92,6 +92,16 @@ class ToolNode(BaseNode[ToolNodeData]):
                 error=f"Failed to invoke tool: {str(e)}",
                 error_type=type(e).__name__,
             )
+        except Exception as e:
+            return NodeRunResult(
+                status=WorkflowNodeExecutionStatus.FAILED,
+                inputs=parameters_for_log,
+                metadata={
+                    NodeRunMetadataKey.TOOL_INFO: tool_info,
+                },
+                error=f"Failed to invoke tool: {str(e)}",
+                error_type="UnknownError",
+            )

         # convert tool messages
         plain_text, files, json = self._convert_tool_messages(messages)