Init storage engine (#13707)

### What problem does this PR solve?

1. Init Minio / S3 / OSS
2. Fix minio / s3 / oss config

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

---------

Signed-off-by: Jin Hai <haijin.chn@gmail.com>
This commit is contained in:
Jin Hai
2026-03-20 13:15:41 +08:00
committed by GitHub
parent 04a60a41e0
commit 9ce766192f
6 changed files with 115 additions and 138 deletions

View File

@ -10,6 +10,7 @@ import (
"ragflow/internal/common"
"ragflow/internal/server"
"ragflow/internal/server/local"
"ragflow/internal/storage"
"ragflow/internal/utility"
"strings"
"syscall"
@ -118,6 +119,10 @@ func main() {
}
defer cache.Close()
if err := storage.InitStorageFactory(); err != nil {
logger.Fatal("Failed to initialize storage factory", zap.Error(err))
}
// Initialize server variables (runtime variables that can change during operation)
// This must be done after Cache is initialized
if err := server.InitVariables(cache.Get()); err != nil {

View File

@ -159,8 +159,8 @@ const (
// OSSConfig holds Aliyun OSS storage configuration
// OSS is compatible with S3 API
type OSSConfig struct {
AccessKeyID string `mapstructure:"access_key"` // OSS Access Key ID
SecretAccessKey string `mapstructure:"secret_key"` // OSS Secret Access Key
AccessKey string `mapstructure:"access_key"` // OSS Access Key ID
SecretKey string `mapstructure:"secret_key"` // OSS Secret Access Key
EndpointURL string `mapstructure:"endpoint_url"` // OSS Endpoint (e.g., "https://oss-cn-hangzhou.aliyuncs.com")
Region string `mapstructure:"region"` // Region (e.g., "cn-hangzhou")
Bucket string `mapstructure:"bucket"` // Default bucket (optional)
@ -182,10 +182,10 @@ type MinioConfig struct {
// S3Config holds AWS S3 storage configuration
type S3Config struct {
AccessKeyID string `mapstructure:"access_key"` // AWS Access Key ID
SecretAccessKey string `mapstructure:"secret_key"` // AWS Secret Access Key
SessionToken string `mapstructure:"session_token"` // AWS Session Token (optional)
AccessKey string `mapstructure:"access_key"` // AWS Access Key ID
SecretKey string `mapstructure:"secret_key"` // AWS Secret Access Key
Region string `mapstructure:"region_name"` // AWS Region
SessionToken string `mapstructure:"session_token"` // AWS Session Token (optional)
EndpointURL string `mapstructure:"endpoint_url"` // Custom endpoint (optional)
SignatureVersion string `mapstructure:"signature_version"` // Signature version
AddressingStyle string `mapstructure:"addressing_style"` // Addressing style
@ -425,17 +425,22 @@ func FromEnvironments() error {
}
// Storage
//storageType := strings.ToLower(os.Getenv("STORAGE_IMPL"))
//switch storageType {
//case "minio":
// globalConfig.StorageEngine.Type = StorageMinio
//case "s3":
// globalConfig.StorageEngine.Type = StorageS3
//case "oss":
// globalConfig.StorageEngine.Type = StorageOSS
//default:
// return fmt.Errorf("invalid storage type: %s", storageType)
//}
storageType := strings.ToLower(os.Getenv("STORAGE_IMPL"))
switch storageType {
case "minio":
globalConfig.StorageEngine.Type = StorageMinio
case "s3":
globalConfig.StorageEngine.Type = StorageS3
case "oss":
globalConfig.StorageEngine.Type = StorageOSS
case "":
// Default
if globalConfig.StorageEngine.Type == "" {
globalConfig.StorageEngine.Type = StorageMinio
}
default:
return fmt.Errorf("invalid storage type: %s", storageType)
}
// Language
if globalConfig.Language == "" {
@ -566,11 +571,6 @@ func FromConfigFile(configPath string) error {
// Map doc_engine section to DocEngineConfig
if globalConfig != nil && globalConfig.DocEngine.Type == "" {
// Use DOC_ENGINE env var if set
//if docEngine != "" {
// globalConfig.DocEngine.Type = EngineType(docEngine)
//}
// Try to map from doc_engine section (overrides env var if present)
if v.IsSet("doc_engine") {
docEngineConfig := v.Sub("doc_engine")
if docEngineConfig != nil {
@ -610,6 +610,56 @@ func FromConfigFile(configPath string) error {
}
}
if globalConfig != nil && globalConfig.StorageEngine.Type == "" {
// Also check legacy es section for backward compatibility
if v.IsSet("minio") {
minioConfig := v.Sub("minio")
if minioConfig != nil {
if globalConfig.StorageEngine.Minio == nil {
globalConfig.StorageEngine.Minio = &MinioConfig{
Host: minioConfig.GetString("host"),
User: minioConfig.GetString("user"),
Password: minioConfig.GetString("password"),
Secure: minioConfig.GetBool("secure"),
PrefixPath: minioConfig.GetString("prefix_path"),
Verify: minioConfig.GetBool("verify"),
Bucket: minioConfig.GetString("bucket"),
}
}
}
}
if v.IsSet("s3") {
s3Config := v.Sub("s3")
if s3Config != nil {
if globalConfig.StorageEngine.S3 == nil {
globalConfig.StorageEngine.S3 = &S3Config{
AccessKey: s3Config.GetString("access_key"),
SecretKey: s3Config.GetString("secret_key"),
Region: s3Config.GetString("region"),
}
}
}
}
if v.IsSet("oss") {
ossConfig := v.Sub("oss")
if ossConfig != nil {
if globalConfig.StorageEngine.OSS == nil {
globalConfig.StorageEngine.OSS = &OSSConfig{
AccessKey: ossConfig.GetString("access_key"),
SecretKey: ossConfig.GetString("secret_key"),
EndpointURL: ossConfig.GetString("endpoint_url"),
Region: ossConfig.GetString("region"),
Bucket: ossConfig.GetString("bucket"),
SignatureVersion: ossConfig.GetString("signature_version"),
AddressingStyle: ossConfig.GetString("addressing_style"),
}
}
}
}
}
// Map user_default_llm section to UserDefaultLLMConfig
if v.IsSet("user_default_llm") {
userDefaultLLMConfig := v.Sub("user_default_llm")

View File

@ -22,6 +22,7 @@ import (
"crypto/tls"
"fmt"
"net/http"
"ragflow/internal/server"
"time"
"github.com/minio/minio-go/v7"
@ -34,11 +35,11 @@ type MinioStorage struct {
client *minio.Client
bucket string
prefixPath string
config *MinioConfig
config *server.MinioConfig
}
// NewMinioStorage creates a new MinIO storage instance
func NewMinioStorage(config *MinioConfig) (*MinioStorage, error) {
func NewMinioStorage(config *server.MinioConfig) (*MinioStorage, error) {
storage := &MinioStorage{
bucket: config.Bucket,
prefixPath: config.PrefixPath,

View File

@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
"ragflow/internal/server"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
@ -37,11 +38,11 @@ type OSSStorage struct {
client *s3.Client
bucket string
prefixPath string
config *OSSConfig
config *server.OSSConfig
}
// NewOSSStorage creates a new OSS storage instance
func NewOSSStorage(config *OSSConfig) (*OSSStorage, error) {
func NewOSSStorage(config *server.OSSConfig) (*OSSStorage, error) {
storage := &OSSStorage{
bucket: config.Bucket,
prefixPath: config.PrefixPath,
@ -60,8 +61,8 @@ func (o *OSSStorage) connect() error {
// Create static credentials
creds := credentials.NewStaticCredentialsProvider(
o.config.AccessKeyID,
o.config.SecretAccessKey,
o.config.AccessKey,
o.config.SecretKey,
"",
)

View File

@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
"ragflow/internal/server"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
@ -36,15 +37,13 @@ type S3Storage struct {
client *s3.Client
bucket string
prefixPath string
config *S3Config
config *server.S3Config
}
// NewS3Storage creates a new S3 storage instance
func NewS3Storage(config *S3Config) (*S3Storage, error) {
func NewS3Storage(config *server.S3Config) (*S3Storage, error) {
storage := &S3Storage{
bucket: config.Bucket,
prefixPath: config.PrefixPath,
config: config,
config: config,
}
if err := storage.connect(); err != nil {
@ -65,10 +64,10 @@ func (s *S3Storage) connect() error {
}
// Configure credentials if provided
if s.config.AccessKeyID != "" && s.config.SecretAccessKey != "" {
if s.config.AccessKey != "" && s.config.SecretKey != "" {
creds := credentials.NewStaticCredentialsProvider(
s.config.AccessKeyID,
s.config.SecretAccessKey,
s.config.AccessKey,
s.config.SecretKey,
s.config.SessionToken,
)
opts = append(opts, config.WithCredentialsProvider(creds))

View File

@ -18,12 +18,9 @@ package storage
import (
"fmt"
"os"
"ragflow/internal/logger"
"ragflow/internal/server"
"sync"
"github.com/spf13/viper"
"go.uber.org/zap"
)
var (
@ -48,81 +45,37 @@ func GetStorageFactory() *StorageFactory {
}
// InitStorageFactory initializes the storage factory with configuration
func InitStorageFactory(v *viper.Viper) error {
func InitStorageFactory() error {
factory := GetStorageFactory()
// Get storage type from environment or config
storageType := os.Getenv("STORAGE_IMPL")
if storageType == "" {
storageType = v.GetString("storage_type")
}
if storageType == "" {
storageType = "MINIO" // Default storage type
}
storageConfig := &server.StorageConfig{}
if err := v.UnmarshalKey("storage", storageConfig); err != nil {
return fmt.Errorf("failed to unmarshal storage config: %w", err)
}
storageConfig.StorageType = storageType
factory.config = storageConfig
globalConfig := server.GetConfig()
factory.config = &globalConfig.StorageEngine
// Initialize storage based on type
if err := factory.initStorage(storageType, v); err != nil {
if err := factory.initStorage(); err != nil {
return err
}
zap.L().Info("Storage factory initialized",
zap.String("storage_type", storageType),
)
logger.Info(fmt.Sprintf("Storage initialized: %s", factory.config.Type))
return nil
}
// initStorage initializes the specific storage implementation
func (f *StorageFactory) initStorage(storageType string, v *viper.Viper) error {
switch storageType {
case "MINIO":
return f.initMinio(v)
case "AWS_S3":
return f.initS3(v)
case "OSS":
return f.initOSS(v)
func (f *StorageFactory) initStorage() error {
switch f.config.Type {
case "minio":
return f.initMinio(f.config.Minio)
case "s3":
return f.initS3(f.config.S3)
case "oss":
return f.initOSS(f.config.OSS)
default:
return fmt.Errorf("unsupported storage type: %s", storageType)
return fmt.Errorf("unsupported storage type: %s", f.config.Type)
}
}
func (f *StorageFactory) initMinio(v *viper.Viper) error {
config := &server.MinioConfig{}
// Try to load from minio section first
if v.IsSet("minio") {
minioConfig := v.Sub("minio")
if minioConfig != nil {
config.Host = minioConfig.GetString("host")
config.User = minioConfig.GetString("user")
config.Password = minioConfig.GetString("password")
config.Secure = minioConfig.GetBool("secure")
config.Verify = minioConfig.GetBool("verify")
config.Bucket = minioConfig.GetString("bucket")
config.PrefixPath = minioConfig.GetString("prefix_path")
}
}
// Apply defaults
if config.Host == "" {
config.Host = "localhost:9000"
}
if config.User == "" {
config.User = "minioadmin"
}
if config.Password == "" {
config.Password = "minioadmin"
}
storage, err := NewMinioStorage(config)
func (f *StorageFactory) initMinio(minioConfig *server.MinioConfig) error {
storage, err := NewMinioStorage(minioConfig)
if err != nil {
return fmt.Errorf("failed to create MinIO storage: %w", err)
}
@ -131,30 +84,13 @@ func (f *StorageFactory) initMinio(v *viper.Viper) error {
defer f.mu.Unlock()
f.storageType = StorageMinio
f.storage = storage
f.config.Minio = config
f.config.Minio = minioConfig
return nil
}
func (f *StorageFactory) initS3(v *viper.Viper) error {
config := &server.S3Config{}
if v.IsSet("s3") {
s3Config := v.Sub("s3")
if s3Config != nil {
config.AccessKeyID = s3Config.GetString("access_key")
config.SecretAccessKey = s3Config.GetString("secret_key")
config.SessionToken = s3Config.GetString("session_token")
config.Region = s3Config.GetString("region_name")
config.EndpointURL = s3Config.GetString("endpoint_url")
config.SignatureVersion = s3Config.GetString("signature_version")
config.AddressingStyle = s3Config.GetString("addressing_style")
config.Bucket = s3Config.GetString("bucket")
config.PrefixPath = s3Config.GetString("prefix_path")
}
}
storage, err := NewS3Storage(config)
func (f *StorageFactory) initS3(s3Config *server.S3Config) error {
storage, err := NewS3Storage(s3Config)
if err != nil {
return fmt.Errorf("failed to create S3 storage: %w", err)
}
@ -163,29 +99,14 @@ func (f *StorageFactory) initS3(v *viper.Viper) error {
defer f.mu.Unlock()
f.storageType = StorageAWSS3
f.storage = storage
f.config.S3 = config
f.config.S3 = s3Config
return nil
}
func (f *StorageFactory) initOSS(v *viper.Viper) error {
config := &server.OSSConfig{}
func (f *StorageFactory) initOSS(ossConfig *server.OSSConfig) error {
if v.IsSet("oss") {
ossConfig := v.Sub("oss")
if ossConfig != nil {
config.AccessKeyID = ossConfig.GetString("access_key")
config.SecretAccessKey = ossConfig.GetString("secret_key")
config.EndpointURL = ossConfig.GetString("endpoint_url")
config.Region = ossConfig.GetString("region")
config.Bucket = ossConfig.GetString("bucket")
config.PrefixPath = ossConfig.GetString("prefix_path")
config.SignatureVersion = ossConfig.GetString("signature_version")
config.AddressingStyle = ossConfig.GetString("addressing_style")
}
}
storage, err := NewOSSStorage(config)
storage, err := NewOSSStorage(ossConfig)
if err != nil {
return fmt.Errorf("failed to create OSS storage: %w", err)
}
@ -194,7 +115,7 @@ func (f *StorageFactory) initOSS(v *viper.Viper) error {
defer f.mu.Unlock()
f.storageType = StorageOSS
f.storage = storage
f.config.OSS = config
f.config.OSS = ossConfig
return nil
}