Merge remote-tracking branch 'origin/feat/plugins' into dev/plugin-deploy

This commit is contained in:
Yeuoly
2024-11-21 14:35:14 +08:00
290 changed files with 3601 additions and 3350 deletions

View File

@ -68,6 +68,7 @@ def init_app(app: Flask) -> Celery:
"schedule.clean_unused_datasets_task",
"schedule.create_tidb_serverless_task",
"schedule.update_tidb_serverless_status_task",
"schedule.clean_messages",
]
day = dify_config.CELERY_BEAT_SCHEDULER_TIME
beat_schedule = {
@ -87,6 +88,10 @@ def init_app(app: Flask) -> Celery:
"task": "schedule.update_tidb_serverless_status_task.update_tidb_serverless_status_task",
"schedule": crontab(minute="30", hour="*"),
},
"clean_messages": {
"task": "schedule.clean_messages.clean_messages",
"schedule": timedelta(days=day),
},
}
celery_app.conf.update(beat_schedule=beat_schedule, imports=imports)

View File

@ -1,11 +1,12 @@
import redis
from redis.cluster import ClusterNode, RedisCluster
from redis.connection import Connection, SSLConnection
from redis.sentinel import Sentinel
from configs import dify_config
class RedisClientWrapper(redis.Redis):
class RedisClientWrapper:
"""
A wrapper class for the Redis client that addresses the issue where the global
`redis_client` variable cannot be updated when a new Redis instance is returned
@ -71,6 +72,12 @@ def init_app(app):
)
master = sentinel.master_for(dify_config.REDIS_SENTINEL_SERVICE_NAME, **redis_params)
redis_client.initialize(master)
elif dify_config.REDIS_USE_CLUSTERS:
nodes = [
ClusterNode(host=node.split(":")[0], port=int(node.split.split(":")[1]))
for node in dify_config.REDIS_CLUSTERS.split(",")
]
redis_client.initialize(RedisCluster(startup_nodes=nodes, password=dify_config.REDIS_CLUSTERS_PASSWORD))
else:
redis_params.update(
{

View File

@ -70,7 +70,7 @@ class Storage:
try:
self.storage_runner.save(filename, data)
except Exception as e:
logging.exception("Failed to save file: %s", e)
logging.exception(f"Failed to save file {filename}")
raise e
def load(self, filename: str, /, *, stream: bool = False) -> Union[bytes, Generator]:
@ -80,42 +80,42 @@ class Storage:
else:
return self.load_once(filename)
except Exception as e:
logging.exception("Failed to load file: %s", e)
logging.exception(f"Failed to load file {filename}")
raise e
def load_once(self, filename: str) -> bytes:
try:
return self.storage_runner.load_once(filename)
except Exception as e:
logging.exception("Failed to load_once file: %s", e)
logging.exception(f"Failed to load_once file {filename}")
raise e
def load_stream(self, filename: str) -> Generator:
try:
return self.storage_runner.load_stream(filename)
except Exception as e:
logging.exception("Failed to load_stream file: %s", e)
logging.exception(f"Failed to load_stream file {filename}")
raise e
def download(self, filename, target_filepath):
try:
self.storage_runner.download(filename, target_filepath)
except Exception as e:
logging.exception("Failed to download file: %s", e)
logging.exception(f"Failed to download file {filename}")
raise e
def exists(self, filename):
try:
return self.storage_runner.exists(filename)
except Exception as e:
logging.exception("Failed to check file exists: %s", e)
logging.exception(f"Failed to check file exists {filename}")
raise e
def delete(self, filename):
try:
return self.storage_runner.delete(filename)
except Exception as e:
logging.exception("Failed to delete file: %s", e)
logging.exception(f"Failed to delete file {filename}")
raise e