Add a configuration for controlling the redis instance / type used for streaming events between celery worker and api (vibe-kanban 08e07904)

Currently, the celery worker executing workflows / chatflows uses redis pubsub to publish events to api.
(See \_topic\_msg\_generator and \_publish\_streaming\_response)

The current implementation uses the default redis client.

For large scale deployment, we need to use a dedicated redis cluster to ensure performance.

To achieve this, you should:

1. introduce a dedicated configuration class to control

  the redis address used for pubsub. (Ideally, there should only be one configuration item such as `pubsub_redis_url`, and its default value should be the original redis confugration.)

2. Add an option to switch between pubsub and sharded pubsub. When shared pubsub is specified, the ShardedRedisBroadcastChannel should be used instead.

COmplete the task above, add some unit tests.
This commit is contained in:
QuantumGhost
2026-01-19 07:40:44 +08:00
parent afdf2397f2
commit 2db638b992
8 changed files with 194 additions and 7 deletions

View File

@ -15,6 +15,9 @@ from redis.sentinel import Sentinel
from configs import dify_config
from dify_app import DifyApp
from libs.broadcast_channel.channel import BroadcastChannel as BroadcastChannelProtocol
from libs.broadcast_channel.redis.channel import BroadcastChannel as RedisBroadcastChannel
from libs.broadcast_channel.redis.sharded_channel import ShardedRedisBroadcastChannel
if TYPE_CHECKING:
from redis.lock import Lock
@ -116,6 +119,7 @@ class RedisClientWrapper:
redis_client: RedisClientWrapper = RedisClientWrapper()
pubsub_redis_client: RedisClientWrapper = RedisClientWrapper()
def _get_ssl_configuration() -> tuple[type[Union[Connection, SSLConnection]], dict[str, Any]]:
@ -228,6 +232,12 @@ def _create_standalone_client(redis_params: dict[str, Any]) -> Union[redis.Redis
return client
def _create_pubsub_client(pubsub_url: str, use_clusters: bool) -> Union[redis.Redis, RedisCluster]:
if use_clusters:
return RedisCluster.from_url(pubsub_url)
return redis.Redis.from_url(pubsub_url)
def init_app(app: DifyApp):
"""Initialize Redis client and attach it to the app."""
global redis_client
@ -246,6 +256,22 @@ def init_app(app: DifyApp):
redis_client.initialize(client)
app.extensions["redis"] = redis_client
pubsub_client = client
if dify_config.PUBSUB_REDIS_URL:
pubsub_client = _create_pubsub_client(dify_config.PUBSUB_REDIS_URL, dify_config.PUBSUB_REDIS_USE_CLUSTERS)
pubsub_redis_client.initialize(pubsub_client)
def get_pubsub_redis_client() -> RedisClientWrapper:
return pubsub_redis_client
def get_pubsub_broadcast_channel() -> BroadcastChannelProtocol:
redis_conn = get_pubsub_redis_client()
if dify_config.PUBSUB_REDIS_CHANNEL_TYPE == "sharded":
return ShardedRedisBroadcastChannel(redis_conn)
return RedisBroadcastChannel(redis_conn)
P = ParamSpec("P")
R = TypeVar("R")