mirror of
https://github.com/langgenius/dify.git
synced 2026-04-26 13:45:57 +08:00
merge feat/plugins
This commit is contained in:
@ -1,6 +1,7 @@
|
||||
from datetime import timedelta
|
||||
|
||||
from celery import Celery, Task
|
||||
from celery.schedules import crontab
|
||||
from flask import Flask
|
||||
|
||||
from configs import dify_config
|
||||
@ -55,6 +56,8 @@ def init_app(app: Flask) -> Celery:
|
||||
imports = [
|
||||
"schedule.clean_embedding_cache_task",
|
||||
"schedule.clean_unused_datasets_task",
|
||||
"schedule.create_tidb_serverless_task",
|
||||
"schedule.update_tidb_serverless_status_task",
|
||||
]
|
||||
day = dify_config.CELERY_BEAT_SCHEDULER_TIME
|
||||
beat_schedule = {
|
||||
@ -66,6 +69,14 @@ def init_app(app: Flask) -> Celery:
|
||||
"task": "schedule.clean_unused_datasets_task.clean_unused_datasets_task",
|
||||
"schedule": timedelta(days=day),
|
||||
},
|
||||
"create_tidb_serverless_task": {
|
||||
"task": "schedule.create_tidb_serverless_task.create_tidb_serverless_task",
|
||||
"schedule": crontab(minute="0", hour="*"),
|
||||
},
|
||||
"update_tidb_serverless_status_task": {
|
||||
"task": "schedule.update_tidb_serverless_status_task.update_tidb_serverless_status_task",
|
||||
"schedule": crontab(minute="30", hour="*"),
|
||||
},
|
||||
}
|
||||
celery_app.conf.update(beat_schedule=beat_schedule, imports=imports)
|
||||
|
||||
|
||||
@ -1,8 +1,10 @@
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from logging.handlers import RotatingFileHandler
|
||||
|
||||
import pytz
|
||||
from flask import Flask
|
||||
|
||||
from configs import dify_config
|
||||
@ -17,8 +19,8 @@ def init_app(app: Flask):
|
||||
log_handlers = [
|
||||
RotatingFileHandler(
|
||||
filename=log_file,
|
||||
maxBytes=1024 * 1024 * 1024,
|
||||
backupCount=5,
|
||||
maxBytes=dify_config.LOG_FILE_MAX_SIZE * 1024 * 1024,
|
||||
backupCount=dify_config.LOG_FILE_BACKUP_COUNT,
|
||||
),
|
||||
logging.StreamHandler(sys.stdout),
|
||||
]
|
||||
@ -30,16 +32,10 @@ def init_app(app: Flask):
|
||||
handlers=log_handlers,
|
||||
force=True,
|
||||
)
|
||||
|
||||
log_tz = dify_config.LOG_TZ
|
||||
if log_tz:
|
||||
from datetime import datetime
|
||||
|
||||
import pytz
|
||||
|
||||
timezone = pytz.timezone(log_tz)
|
||||
|
||||
def time_converter(seconds):
|
||||
return datetime.utcfromtimestamp(seconds).astimezone(timezone).timetuple()
|
||||
|
||||
for handler in logging.root.handlers:
|
||||
handler.formatter.converter = time_converter
|
||||
handler.formatter.converter = lambda seconds: (
|
||||
datetime.fromtimestamp(seconds, tz=pytz.UTC).astimezone(log_tz).timetuple()
|
||||
)
|
||||
|
||||
@ -36,12 +36,9 @@ class AliyunOssStorage(BaseStorage):
|
||||
return data
|
||||
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
def generate(filename: str = filename) -> Generator:
|
||||
obj = self.client.get_object(self.__wrapper_folder_filename(filename))
|
||||
while chunk := obj.read(4096):
|
||||
yield chunk
|
||||
|
||||
return generate()
|
||||
obj = self.client.get_object(self.__wrapper_folder_filename(filename))
|
||||
while chunk := obj.read(4096):
|
||||
yield chunk
|
||||
|
||||
def download(self, filename, target_filepath):
|
||||
self.client.get_object_to_file(self.__wrapper_folder_filename(filename), target_filepath)
|
||||
|
||||
@ -62,17 +62,14 @@ class AwsS3Storage(BaseStorage):
|
||||
return data
|
||||
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
def generate(filename: str = filename) -> Generator:
|
||||
try:
|
||||
response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
|
||||
yield from response["Body"].iter_chunks()
|
||||
except ClientError as ex:
|
||||
if ex.response["Error"]["Code"] == "NoSuchKey":
|
||||
raise FileNotFoundError("File not found")
|
||||
else:
|
||||
raise
|
||||
|
||||
return generate()
|
||||
try:
|
||||
response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
|
||||
yield from response["Body"].iter_chunks()
|
||||
except ClientError as ex:
|
||||
if ex.response["Error"]["Code"] == "NoSuchKey":
|
||||
raise FileNotFoundError("File not found")
|
||||
else:
|
||||
raise
|
||||
|
||||
def download(self, filename, target_filepath):
|
||||
self.client.download_file(self.bucket_name, filename, target_filepath)
|
||||
|
||||
@ -32,13 +32,9 @@ class AzureBlobStorage(BaseStorage):
|
||||
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
client = self._sync_client()
|
||||
|
||||
def generate(filename: str = filename) -> Generator:
|
||||
blob = client.get_blob_client(container=self.bucket_name, blob=filename)
|
||||
blob_data = blob.download_blob()
|
||||
yield from blob_data.chunks()
|
||||
|
||||
return generate(filename)
|
||||
blob = client.get_blob_client(container=self.bucket_name, blob=filename)
|
||||
blob_data = blob.download_blob()
|
||||
yield from blob_data.chunks()
|
||||
|
||||
def download(self, filename, target_filepath):
|
||||
client = self._sync_client()
|
||||
|
||||
@ -39,12 +39,9 @@ class BaiduObsStorage(BaseStorage):
|
||||
return response.data.read()
|
||||
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
def generate(filename: str = filename) -> Generator:
|
||||
response = self.client.get_object(bucket_name=self.bucket_name, key=filename).data
|
||||
while chunk := response.read(4096):
|
||||
yield chunk
|
||||
|
||||
return generate()
|
||||
response = self.client.get_object(bucket_name=self.bucket_name, key=filename).data
|
||||
while chunk := response.read(4096):
|
||||
yield chunk
|
||||
|
||||
def download(self, filename, target_filepath):
|
||||
self.client.get_object_to_file(bucket_name=self.bucket_name, key=filename, file_name=target_filepath)
|
||||
|
||||
@ -39,14 +39,11 @@ class GoogleCloudStorage(BaseStorage):
|
||||
return data
|
||||
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
def generate(filename: str = filename) -> Generator:
|
||||
bucket = self.client.get_bucket(self.bucket_name)
|
||||
blob = bucket.get_blob(filename)
|
||||
with blob.open(mode="rb") as blob_stream:
|
||||
while chunk := blob_stream.read(4096):
|
||||
yield chunk
|
||||
|
||||
return generate()
|
||||
bucket = self.client.get_bucket(self.bucket_name)
|
||||
blob = bucket.get_blob(filename)
|
||||
with blob.open(mode="rb") as blob_stream:
|
||||
while chunk := blob_stream.read(4096):
|
||||
yield chunk
|
||||
|
||||
def download(self, filename, target_filepath):
|
||||
bucket = self.client.get_bucket(self.bucket_name)
|
||||
|
||||
@ -27,12 +27,9 @@ class HuaweiObsStorage(BaseStorage):
|
||||
return data
|
||||
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
def generate(filename: str = filename) -> Generator:
|
||||
response = self.client.getObject(bucketName=self.bucket_name, objectKey=filename)["body"].response
|
||||
while chunk := response.read(4096):
|
||||
yield chunk
|
||||
|
||||
return generate()
|
||||
response = self.client.getObject(bucketName=self.bucket_name, objectKey=filename)["body"].response
|
||||
while chunk := response.read(4096):
|
||||
yield chunk
|
||||
|
||||
def download(self, filename, target_filepath):
|
||||
self.client.getObject(bucketName=self.bucket_name, objectKey=filename, downloadPath=target_filepath)
|
||||
|
||||
@ -19,68 +19,44 @@ class LocalFsStorage(BaseStorage):
|
||||
folder = os.path.join(current_app.root_path, folder)
|
||||
self.folder = folder
|
||||
|
||||
def save(self, filename, data):
|
||||
def _build_filepath(self, filename: str) -> str:
|
||||
"""Build the full file path based on the folder and filename."""
|
||||
if not self.folder or self.folder.endswith("/"):
|
||||
filename = self.folder + filename
|
||||
return self.folder + filename
|
||||
else:
|
||||
filename = self.folder + "/" + filename
|
||||
return self.folder + "/" + filename
|
||||
|
||||
folder = os.path.dirname(filename)
|
||||
def save(self, filename, data):
|
||||
filepath = self._build_filepath(filename)
|
||||
folder = os.path.dirname(filepath)
|
||||
os.makedirs(folder, exist_ok=True)
|
||||
|
||||
Path(os.path.join(os.getcwd(), filename)).write_bytes(data)
|
||||
Path(os.path.join(os.getcwd(), filepath)).write_bytes(data)
|
||||
|
||||
def load_once(self, filename: str) -> bytes:
|
||||
if not self.folder or self.folder.endswith("/"):
|
||||
filename = self.folder + filename
|
||||
else:
|
||||
filename = self.folder + "/" + filename
|
||||
|
||||
if not os.path.exists(filename):
|
||||
filepath = self._build_filepath(filename)
|
||||
if not os.path.exists(filepath):
|
||||
raise FileNotFoundError("File not found")
|
||||
|
||||
data = Path(filename).read_bytes()
|
||||
return data
|
||||
return Path(filepath).read_bytes()
|
||||
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
def generate(filename: str = filename) -> Generator:
|
||||
if not self.folder or self.folder.endswith("/"):
|
||||
filename = self.folder + filename
|
||||
else:
|
||||
filename = self.folder + "/" + filename
|
||||
|
||||
if not os.path.exists(filename):
|
||||
raise FileNotFoundError("File not found")
|
||||
|
||||
with open(filename, "rb") as f:
|
||||
while chunk := f.read(4096): # Read in chunks of 4KB
|
||||
yield chunk
|
||||
|
||||
return generate()
|
||||
filepath = self._build_filepath(filename)
|
||||
if not os.path.exists(filepath):
|
||||
raise FileNotFoundError("File not found")
|
||||
with open(filepath, "rb") as f:
|
||||
while chunk := f.read(4096): # Read in chunks of 4KB
|
||||
yield chunk
|
||||
|
||||
def download(self, filename, target_filepath):
|
||||
if not self.folder or self.folder.endswith("/"):
|
||||
filename = self.folder + filename
|
||||
else:
|
||||
filename = self.folder + "/" + filename
|
||||
|
||||
if not os.path.exists(filename):
|
||||
filepath = self._build_filepath(filename)
|
||||
if not os.path.exists(filepath):
|
||||
raise FileNotFoundError("File not found")
|
||||
|
||||
shutil.copyfile(filename, target_filepath)
|
||||
shutil.copyfile(filepath, target_filepath)
|
||||
|
||||
def exists(self, filename):
|
||||
if not self.folder or self.folder.endswith("/"):
|
||||
filename = self.folder + filename
|
||||
else:
|
||||
filename = self.folder + "/" + filename
|
||||
|
||||
return os.path.exists(filename)
|
||||
filepath = self._build_filepath(filename)
|
||||
return os.path.exists(filepath)
|
||||
|
||||
def delete(self, filename):
|
||||
if not self.folder or self.folder.endswith("/"):
|
||||
filename = self.folder + filename
|
||||
else:
|
||||
filename = self.folder + "/" + filename
|
||||
if os.path.exists(filename):
|
||||
os.remove(filename)
|
||||
filepath = self._build_filepath(filename)
|
||||
if os.path.exists(filepath):
|
||||
os.remove(filepath)
|
||||
|
||||
@ -36,17 +36,14 @@ class OracleOCIStorage(BaseStorage):
|
||||
return data
|
||||
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
def generate(filename: str = filename) -> Generator:
|
||||
try:
|
||||
response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
|
||||
yield from response["Body"].iter_chunks()
|
||||
except ClientError as ex:
|
||||
if ex.response["Error"]["Code"] == "NoSuchKey":
|
||||
raise FileNotFoundError("File not found")
|
||||
else:
|
||||
raise
|
||||
|
||||
return generate()
|
||||
try:
|
||||
response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
|
||||
yield from response["Body"].iter_chunks()
|
||||
except ClientError as ex:
|
||||
if ex.response["Error"]["Code"] == "NoSuchKey":
|
||||
raise FileNotFoundError("File not found")
|
||||
else:
|
||||
raise
|
||||
|
||||
def download(self, filename, target_filepath):
|
||||
self.client.download_file(self.bucket_name, filename, target_filepath)
|
||||
|
||||
@ -36,17 +36,14 @@ class SupabaseStorage(BaseStorage):
|
||||
return content
|
||||
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
def generate(filename: str = filename) -> Generator:
|
||||
result = self.client.storage.from_(self.bucket_name).download(filename)
|
||||
byte_stream = io.BytesIO(result)
|
||||
while chunk := byte_stream.read(4096): # Read in chunks of 4KB
|
||||
yield chunk
|
||||
|
||||
return generate()
|
||||
result = self.client.storage.from_(self.bucket_name).download(filename)
|
||||
byte_stream = io.BytesIO(result)
|
||||
while chunk := byte_stream.read(4096): # Read in chunks of 4KB
|
||||
yield chunk
|
||||
|
||||
def download(self, filename, target_filepath):
|
||||
result = self.client.storage.from_(self.bucket_name).download(filename)
|
||||
Path(result).write_bytes(result)
|
||||
Path(target_filepath).write_bytes(result)
|
||||
|
||||
def exists(self, filename):
|
||||
result = self.client.storage.from_(self.bucket_name).list(filename)
|
||||
|
||||
@ -29,11 +29,8 @@ class TencentCosStorage(BaseStorage):
|
||||
return data
|
||||
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
def generate(filename: str = filename) -> Generator:
|
||||
response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
|
||||
yield from response["Body"].get_stream(chunk_size=4096)
|
||||
|
||||
return generate()
|
||||
response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
|
||||
yield from response["Body"].get_stream(chunk_size=4096)
|
||||
|
||||
def download(self, filename, target_filepath):
|
||||
response = self.client.get_object(Bucket=self.bucket_name, Key=filename)
|
||||
|
||||
@ -27,12 +27,9 @@ class VolcengineTosStorage(BaseStorage):
|
||||
return data
|
||||
|
||||
def load_stream(self, filename: str) -> Generator:
|
||||
def generate(filename: str = filename) -> Generator:
|
||||
response = self.client.get_object(bucket=self.bucket_name, key=filename)
|
||||
while chunk := response.read(4096):
|
||||
yield chunk
|
||||
|
||||
return generate()
|
||||
response = self.client.get_object(bucket=self.bucket_name, key=filename)
|
||||
while chunk := response.read(4096):
|
||||
yield chunk
|
||||
|
||||
def download(self, filename, target_filepath):
|
||||
self.client.get_object_to_file(bucket=self.bucket_name, key=filename, file_path=target_filepath)
|
||||
|
||||
Reference in New Issue
Block a user