add workflow logics

This commit is contained in:
takatost
2024-02-20 21:30:43 +08:00
parent 9ad6bd78f5
commit f067947266
44 changed files with 894 additions and 389 deletions

View File

@ -1,7 +1,6 @@
import copy
from core.entities.application_entities import AppMode
from core.prompt.advanced_prompt_templates import (
BAICHUAN_CHAT_APP_CHAT_PROMPT_CONFIG,
BAICHUAN_CHAT_APP_COMPLETION_PROMPT_CONFIG,
@ -14,6 +13,7 @@ from core.prompt.advanced_prompt_templates import (
COMPLETION_APP_COMPLETION_PROMPT_CONFIG,
CONTEXT,
)
from models.model import AppMode
class AdvancedPromptTemplateService:

View File

@ -9,6 +9,7 @@ from core.model_runtime.model_providers import model_provider_factory
from core.moderation.factory import ModerationFactory
from core.provider_manager import ProviderManager
from models.account import Account
from models.model import AppMode
from services.dataset_service import DatasetService
SUPPORT_TOOLS = ["dataset", "google_search", "web_reader", "wikipedia", "current_datetime"]
@ -315,9 +316,6 @@ class AppModelConfigService:
if "tool_parameters" not in tool:
raise ValueError("tool_parameters is required in agent_mode.tools")
# dataset_query_variable
cls.is_dataset_query_variable_valid(config, app_mode)
# advanced prompt validation
cls.is_advanced_prompt_valid(config, app_mode)
@ -443,21 +441,6 @@ class AppModelConfigService:
config=config
)
@classmethod
def is_dataset_query_variable_valid(cls, config: dict, mode: str) -> None:
# Only check when mode is completion
if mode != 'completion':
return
agent_mode = config.get("agent_mode", {})
tools = agent_mode.get("tools", [])
dataset_exists = "dataset" in str(tools)
dataset_query_variable = config.get("dataset_query_variable")
if dataset_exists and not dataset_query_variable:
raise ValueError("Dataset query variable is required when dataset is exist")
@classmethod
def is_advanced_prompt_valid(cls, config: dict, app_mode: str) -> None:
# prompt_type

View File

@ -8,12 +8,10 @@ from core.application_manager import ApplicationManager
from core.entities.application_entities import InvokeFrom
from core.file.message_file_parser import MessageFileParser
from extensions.ext_database import db
from models.model import Account, App, AppModelConfig, Conversation, EndUser, Message
from models.model import Account, App, AppModelConfig, Conversation, EndUser
from services.app_model_config_service import AppModelConfigService
from services.errors.app import MoreLikeThisDisabledError
from services.errors.app_model_config import AppModelConfigBrokenError
from services.errors.conversation import ConversationCompletedError, ConversationNotExistsError
from services.errors.message import MessageNotExistsError
class CompletionService:
@ -157,62 +155,6 @@ class CompletionService:
}
)
@classmethod
def generate_more_like_this(cls, app_model: App, user: Union[Account, EndUser],
message_id: str, invoke_from: InvokeFrom, streaming: bool = True) \
-> Union[dict, Generator]:
if not user:
raise ValueError('user cannot be None')
message = db.session.query(Message).filter(
Message.id == message_id,
Message.app_id == app_model.id,
Message.from_source == ('api' if isinstance(user, EndUser) else 'console'),
Message.from_end_user_id == (user.id if isinstance(user, EndUser) else None),
Message.from_account_id == (user.id if isinstance(user, Account) else None),
).first()
if not message:
raise MessageNotExistsError()
current_app_model_config = app_model.app_model_config
more_like_this = current_app_model_config.more_like_this_dict
if not current_app_model_config.more_like_this or more_like_this.get("enabled", False) is False:
raise MoreLikeThisDisabledError()
app_model_config = message.app_model_config
model_dict = app_model_config.model_dict
completion_params = model_dict.get('completion_params')
completion_params['temperature'] = 0.9
model_dict['completion_params'] = completion_params
app_model_config.model = json.dumps(model_dict)
# parse files
message_file_parser = MessageFileParser(tenant_id=app_model.tenant_id, app_id=app_model.id)
file_objs = message_file_parser.transform_message_files(
message.files, app_model_config
)
application_manager = ApplicationManager()
return application_manager.generate(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
app_model_config_id=app_model_config.id,
app_model_config_dict=app_model_config.to_dict(),
app_model_config_override=True,
user=user,
invoke_from=invoke_from,
inputs=message.inputs,
query=message.query,
files=file_objs,
conversation=None,
stream=streaming,
extras={
"auto_generate_conversation_name": False
}
)
@classmethod
def get_cleaned_inputs(cls, user_inputs: dict, app_model_config: AppModelConfig):
if user_inputs is None:

View File

@ -1,7 +1,7 @@
# -*- coding:utf-8 -*-
__all__ = [
'base', 'conversation', 'message', 'index', 'app_model_config', 'account', 'document', 'dataset',
'app', 'completion', 'audio', 'file'
'completion', 'audio', 'file'
]
from . import *

View File

@ -1,2 +0,0 @@
class MoreLikeThisDisabledError(Exception):
pass

View File

View File

@ -0,0 +1,72 @@
# default block config
default_block_configs = [
{
"type": "llm",
"config": {
"prompt_templates": {
"chat_model": {
"prompts": [
{
"role": "system",
"text": "You are a helpful AI assistant."
}
]
},
"completion_model": {
"conversation_histories_role": {
"user_prefix": "Human",
"assistant_prefix": "Assistant"
},
"prompt": {
"text": "Here is the chat histories between human and assistant, inside "
"<histories></histories> XML tags.\n\n<histories>\n{{"
"#histories#}}\n</histories>\n\n\nHuman: {{#query#}}\n\nAssistant:"
},
"stop": ["Human:"]
}
}
}
},
{
"type": "code",
"config": {
"variables": [
{
"variable": "arg1",
"value_selector": []
},
{
"variable": "arg2",
"value_selector": []
}
],
"code_language": "python3",
"code": "def main(\n arg1: int,\n arg2: int,\n) -> int:\n return {\n \"result\": arg1 "
"+ arg2\n }",
"outputs": [
{
"variable": "result",
"variable_type": "number"
}
]
}
},
{
"type": "template-transform",
"config": {
"variables": [
{
"variable": "arg1",
"value_selector": []
}
],
"template": "{{ arg1 }}"
}
},
{
"type": "question-classifier",
"config": {
"instructions": "" # TODO
}
}
]

View File

@ -0,0 +1,259 @@
import json
from typing import Optional
from core.application_manager import ApplicationManager
from core.entities.application_entities import ModelConfigEntity, PromptTemplateEntity, FileUploadEntity, \
ExternalDataVariableEntity, DatasetEntity, VariableEntity
from core.model_runtime.utils import helper
from core.workflow.entities.NodeEntities import NodeType
from core.workflow.nodes.end.entities import EndNodeOutputType
from extensions.ext_database import db
from models.account import Account
from models.model import App, AppMode, ChatbotAppEngine
from models.workflow import Workflow, WorkflowType
class WorkflowConverter:
"""
App Convert to Workflow Mode
"""
def convert_to_workflow(self, app_model: App, account: Account) -> Workflow:
"""
Convert to workflow mode
- basic mode of chatbot app
- advanced mode of assistant app (for migration)
- completion app (for migration)
:param app_model: App instance
:param account: Account instance
:return: workflow instance
"""
# get original app config
app_model_config = app_model.app_model_config
# convert app model config
application_manager = ApplicationManager()
application_manager.convert_from_app_model_config_dict(
tenant_id=app_model.tenant_id,
app_model_config_dict=app_model_config.to_dict()
)
# init workflow graph
graph = {
"nodes": [],
"edges": []
}
# Convert list:
# - variables -> start
# - model_config -> llm
# - prompt_template -> llm
# - file_upload -> llm
# - external_data_variables -> http-request
# - dataset -> knowledge-retrieval
# - show_retrieve_source -> knowledge-retrieval
# convert to start node
start_node = self._convert_to_start_node(
variables=app_model_config.variables
)
graph['nodes'].append(start_node)
# convert to http request node
if app_model_config.external_data_variables:
http_request_node = self._convert_to_http_request_node(
external_data_variables=app_model_config.external_data_variables
)
graph = self._append_node(graph, http_request_node)
# convert to knowledge retrieval node
if app_model_config.dataset:
knowledge_retrieval_node = self._convert_to_knowledge_retrieval_node(
dataset=app_model_config.dataset,
show_retrieve_source=app_model_config.show_retrieve_source
)
graph = self._append_node(graph, knowledge_retrieval_node)
# convert to llm node
llm_node = self._convert_to_llm_node(
model_config=app_model_config.model_config,
prompt_template=app_model_config.prompt_template,
file_upload=app_model_config.file_upload
)
graph = self._append_node(graph, llm_node)
# convert to end node by app mode
end_node = self._convert_to_end_node(app_model=app_model)
graph = self._append_node(graph, end_node)
# get new app mode
app_mode = self._get_new_app_mode(app_model)
# create workflow record
workflow = Workflow(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
type=WorkflowType.from_app_mode(app_mode).value,
version='draft',
graph=json.dumps(graph),
created_by=account.id
)
db.session.add(workflow)
db.session.flush()
# create new app model config record
new_app_model_config = app_model_config.copy()
new_app_model_config.external_data_tools = ''
new_app_model_config.model = ''
new_app_model_config.user_input_form = ''
new_app_model_config.dataset_query_variable = None
new_app_model_config.pre_prompt = None
new_app_model_config.agent_mode = ''
new_app_model_config.prompt_type = 'simple'
new_app_model_config.chat_prompt_config = ''
new_app_model_config.completion_prompt_config = ''
new_app_model_config.dataset_configs = ''
new_app_model_config.chatbot_app_engine = ChatbotAppEngine.WORKFLOW.value \
if app_mode == AppMode.CHAT else ChatbotAppEngine.NORMAL.value
new_app_model_config.workflow_id = workflow.id
db.session.add(new_app_model_config)
db.session.commit()
return workflow
def _convert_to_start_node(self, variables: list[VariableEntity]) -> dict:
"""
Convert to Start Node
:param variables: list of variables
:return:
"""
return {
"id": "start",
"position": None,
"data": {
"title": "START",
"type": NodeType.START.value,
"variables": [helper.dump_model(v) for v in variables]
}
}
def _convert_to_http_request_node(self, external_data_variables: list[ExternalDataVariableEntity]) -> dict:
"""
Convert API Based Extension to HTTP Request Node
:param external_data_variables: list of external data variables
:return:
"""
# TODO: implement
pass
def _convert_to_knowledge_retrieval_node(self, new_app_mode: AppMode, dataset: DatasetEntity) -> dict:
"""
Convert datasets to Knowledge Retrieval Node
:param new_app_mode: new app mode
:param dataset: dataset
:return:
"""
# TODO: implement
if new_app_mode == AppMode.CHAT:
query_variable_selector = ["start", "sys.query"]
else:
pass
return {
"id": "knowledge-retrieval",
"position": None,
"data": {
"title": "KNOWLEDGE RETRIEVAL",
"type": NodeType.KNOWLEDGE_RETRIEVAL.value,
}
}
def _convert_to_llm_node(self, model_config: ModelConfigEntity,
prompt_template: PromptTemplateEntity,
file_upload: Optional[FileUploadEntity] = None) -> dict:
"""
Convert to LLM Node
:param model_config: model config
:param prompt_template: prompt template
:param file_upload: file upload config (optional)
"""
# TODO: implement
pass
def _convert_to_end_node(self, app_model: App) -> dict:
"""
Convert to End Node
:param app_model: App instance
:return:
"""
if app_model.mode == AppMode.CHAT.value:
return {
"id": "end",
"position": None,
"data": {
"title": "END",
"type": NodeType.END.value,
}
}
elif app_model.mode == "completion":
# for original completion app
return {
"id": "end",
"position": None,
"data": {
"title": "END",
"type": NodeType.END.value,
"outputs": {
"type": EndNodeOutputType.PLAIN_TEXT.value,
"plain_text_selector": ["llm", "text"]
}
}
}
def _create_edge(self, source: str, target: str) -> dict:
"""
Create Edge
:param source: source node id
:param target: target node id
:return:
"""
return {
"id": f"{source}-{target}",
"source": source,
"target": target
}
def _append_node(self, graph: dict, node: dict) -> dict:
"""
Append Node to Graph
:param graph: Graph, include: nodes, edges
:param node: Node to append
:return:
"""
previous_node = graph['nodes'][-1]
graph['nodes'].append(node)
graph['edges'].append(self._create_edge(previous_node['id'], node['id']))
return graph
def _get_new_app_mode(self, app_model: App) -> AppMode:
"""
Get new app mode
:param app_model: App instance
:return: AppMode
"""
if app_model.mode == "completion":
return AppMode.WORKFLOW
else:
return AppMode.value_of(app_model.mode)

View File

@ -0,0 +1,83 @@
import json
from datetime import datetime
from extensions.ext_database import db
from models.account import Account
from models.model import App, ChatbotAppEngine
from models.workflow import Workflow, WorkflowType
from services.workflow.defaults import default_block_configs
from services.workflow.workflow_converter import WorkflowConverter
class WorkflowService:
"""
Workflow Service
"""
def get_draft_workflow(self, app_model: App) -> Workflow:
"""
Get draft workflow
"""
# fetch draft workflow by app_model
workflow = db.session.query(Workflow).filter(
Workflow.tenant_id == app_model.tenant_id,
Workflow.app_id == app_model.id,
Workflow.version == 'draft'
).first()
# return draft workflow
return workflow
def sync_draft_workflow(self, app_model: App, graph: dict, account: Account) -> Workflow:
"""
Sync draft workflow
"""
# fetch draft workflow by app_model
workflow = self.get_draft_workflow(app_model=app_model)
# create draft workflow if not found
if not workflow:
workflow = Workflow(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
type=WorkflowType.from_app_mode(app_model.mode).value,
version='draft',
graph=json.dumps(graph),
created_by=account.id
)
db.session.add(workflow)
# update draft workflow if found
else:
workflow.graph = json.dumps(graph)
workflow.updated_by = account.id
workflow.updated_at = datetime.utcnow()
# commit db session changes
db.session.commit()
# return draft workflow
return workflow
def get_default_block_configs(self) -> dict:
"""
Get default block configs
"""
# return default block config
return default_block_configs
def chatbot_convert_to_workflow(self, app_model: App) -> Workflow:
"""
basic mode of chatbot app to workflow
:param app_model: App instance
:return:
"""
# check if chatbot app is in basic mode
if app_model.app_model_config.chatbot_app_engine != ChatbotAppEngine.NORMAL:
raise ValueError('Chatbot app already in workflow mode')
# convert to workflow mode
workflow_converter = WorkflowConverter()
workflow = workflow_converter.convert_to_workflow(app_model=app_model)
return workflow