This commit is contained in:
takatost
2024-07-17 01:02:40 +08:00
parent 775e52db4d
commit 4ef3d4e65c
37 changed files with 241 additions and 267 deletions

View File

@ -14,6 +14,4 @@ class Condition(BaseModel):
# for number
"=", "", ">", "<", "", "", "null", "not null"
]
value_type: Literal["string", "value_selector"] = "string"
value: Optional[str] = None
value_selector: Optional[list[str]] = None

View File

@ -1,95 +1,101 @@
from typing import Literal, Optional
from typing import Any, Optional
from core.file.file_obj import FileVar
from core.workflow.entities.variable_pool import VariablePool
from core.workflow.utils.condition.entities import Condition
from core.workflow.utils.variable_template_parser import VariableTemplateParser
class ConditionProcessor:
def process(self, variable_pool: VariablePool,
logical_operator: Literal["and", "or"],
conditions: list[Condition]) -> tuple[bool, list[dict]]:
"""
Process conditions
:param variable_pool: variable pool
:param logical_operator: logical operator
:param conditions: conditions
"""
def process_conditions(self, variable_pool: VariablePool, conditions: list[Condition]):
input_conditions = []
sub_condition_compare_results = []
group_result = []
try:
for condition in conditions:
actual_value = variable_pool.get_variable_value(
variable_selector=condition.variable_selector
)
index = 0
for condition in conditions:
index += 1
actual_value = variable_pool.get_variable_value(
variable_selector=condition.variable_selector
)
if condition.value_type == "value_selector":
expected_value = variable_pool.get_variable_value(
variable_selector=condition.value_selector
)
expected_value = None
if condition.value is not None:
variable_template_parser = VariableTemplateParser(template=condition.value)
variable_selectors = variable_template_parser.extract_variable_selectors()
if variable_selectors:
for variable_selector in variable_selectors:
value = variable_pool.get_variable_value(
variable_selector=variable_selector.value_selector
)
expected_value = variable_template_parser.format({variable_selector.variable: value})
if expected_value is None:
expected_value = condition.value
else:
expected_value = condition.value
input_conditions.append({
comparison_operator = condition.comparison_operator
input_conditions.append(
{
"actual_value": actual_value,
"expected_value": expected_value,
"comparison_operator": condition.comparison_operator
})
"comparison_operator": comparison_operator
}
)
for input_condition in input_conditions:
actual_value = input_condition["actual_value"]
expected_value = input_condition["expected_value"]
comparison_operator = input_condition["comparison_operator"]
result = self.evaluate_condition(actual_value, comparison_operator, expected_value)
group_result.append(result)
if comparison_operator == "contains":
compare_result = self._assert_contains(actual_value, expected_value)
elif comparison_operator == "not contains":
compare_result = self._assert_not_contains(actual_value, expected_value)
elif comparison_operator == "start with":
compare_result = self._assert_start_with(actual_value, expected_value)
elif comparison_operator == "end with":
compare_result = self._assert_end_with(actual_value, expected_value)
elif comparison_operator == "is":
compare_result = self._assert_is(actual_value, expected_value)
elif comparison_operator == "is not":
compare_result = self._assert_is_not(actual_value, expected_value)
elif comparison_operator == "empty":
compare_result = self._assert_empty(actual_value)
elif comparison_operator == "not empty":
compare_result = self._assert_not_empty(actual_value)
elif comparison_operator == "=":
compare_result = self._assert_equal(actual_value, expected_value)
elif comparison_operator == "":
compare_result = self._assert_not_equal(actual_value, expected_value)
elif comparison_operator == ">":
compare_result = self._assert_greater_than(actual_value, expected_value)
elif comparison_operator == "<":
compare_result = self._assert_less_than(actual_value, expected_value)
elif comparison_operator == "":
compare_result = self._assert_greater_than_or_equal(actual_value, expected_value)
elif comparison_operator == "":
compare_result = self._assert_less_than_or_equal(actual_value, expected_value)
elif comparison_operator == "null":
compare_result = self._assert_null(actual_value)
elif comparison_operator == "not null":
compare_result = self._assert_not_null(actual_value)
else:
continue
return input_conditions, group_result
sub_condition_compare_results.append({
**input_condition,
"result": compare_result
})
except Exception as e:
raise ConditionAssertionError(str(e), input_conditions, sub_condition_compare_results)
def evaluate_condition(
self,
actual_value: Optional[str | int | float | dict[Any, Any] | list[Any] | FileVar | None],
comparison_operator: str,
expected_value: Optional[str] = None
) -> bool:
"""
Evaluate condition
:param actual_value: actual value
:param expected_value: expected value
:param comparison_operator: comparison operator
if logical_operator == "and":
compare_result = False not in [condition["result"] for condition in sub_condition_compare_results]
:return: bool
"""
if comparison_operator == "contains":
return self._assert_contains(actual_value, expected_value) # type: ignore
elif comparison_operator == "not contains":
return self._assert_not_contains(actual_value, expected_value) # type: ignore
elif comparison_operator == "start with":
return self._assert_start_with(actual_value, expected_value) # type: ignore
elif comparison_operator == "end with":
return self._assert_end_with(actual_value, expected_value) # type: ignore
elif comparison_operator == "is":
return self._assert_is(actual_value, expected_value) # type: ignore
elif comparison_operator == "is not":
return self._assert_is_not(actual_value, expected_value) # type: ignore
elif comparison_operator == "empty":
return self._assert_empty(actual_value) # type: ignore
elif comparison_operator == "not empty":
return self._assert_not_empty(actual_value) # type: ignore
elif comparison_operator == "=":
return self._assert_equal(actual_value, expected_value) # type: ignore
elif comparison_operator == "":
return self._assert_not_equal(actual_value, expected_value) # type: ignore
elif comparison_operator == ">":
return self._assert_greater_than(actual_value, expected_value) # type: ignore
elif comparison_operator == "<":
return self._assert_less_than(actual_value, expected_value) # type: ignore
elif comparison_operator == "":
return self._assert_greater_than_or_equal(actual_value, expected_value) # type: ignore
elif comparison_operator == "":
return self._assert_less_than_or_equal(actual_value, expected_value) # type: ignore
elif comparison_operator == "null":
return self._assert_null(actual_value) # type: ignore
elif comparison_operator == "not null":
return self._assert_not_null(actual_value) # type: ignore
else:
compare_result = True in [condition["result"] for condition in sub_condition_compare_results]
return compare_result, sub_condition_compare_results
raise ValueError(f"Invalid comparison operator: {comparison_operator}")
def _assert_contains(self, actual_value: Optional[str | list], expected_value: str) -> bool:
"""
@ -301,7 +307,8 @@ class ConditionProcessor:
return False
return True
def _assert_greater_than_or_equal(self, actual_value: Optional[int | float], expected_value: str | int | float) -> bool:
def _assert_greater_than_or_equal(self, actual_value: Optional[int | float],
expected_value: str | int | float) -> bool:
"""
Assert greater than or equal
:param actual_value: actual value
@ -323,7 +330,8 @@ class ConditionProcessor:
return False
return True
def _assert_less_than_or_equal(self, actual_value: Optional[int | float], expected_value: str | int | float) -> bool:
def _assert_less_than_or_equal(self, actual_value: Optional[int | float],
expected_value: str | int | float) -> bool:
"""
Assert less than or equal
:param actual_value: actual value