Pulsar EventBus Integration Guide

Overview

This document provides a comprehensive guide for integrating Apache Pulsar as an EventBus in Coze Studio, including architecture design, implementation details, configuration instructions, and usage guidelines.

Integration Background

Why Choose Pulsar?

In Coze Studio's architecture, EventBus plays a critical role in asynchronous message delivery, including workflow execution, Agent communication, data processing pipelines, and other core functions. As user scale grows and business complexity increases, we need a more powerful and flexible message queue solution.

Pulsar, as a next-generation distributed messaging system, brings the following core advantages to Coze Studio:

  1. High Performance: Pulsar provides low-latency, high-throughput messaging that can support Coze Studio's large-scale concurrent Agent execution and workflow processing
  2. Multi-tenancy: Native support for tenants and namespaces, which maps naturally onto Coze Studio's multi-user, multi-workspace business model
  3. Persistence: Supports message persistence storage, ensuring the reliability of Agent execution states and workflow data, preventing task loss due to system restarts
  4. Horizontal Scaling: Supports separation of compute and storage, easy to scale horizontally, enabling smooth scaling as Coze Studio's user base grows
  5. Message Ordering: Pulsar provides strong consistency and guarantees ordering within each partition, and therefore for all messages that share a key, so Agent workflow steps published under the same key are processed in the correct sequence, preventing state confusion and data inconsistency
  6. Rich Features: Supports message deduplication, delayed messages, dead letter queues, and other advanced features, providing stronger reliability guarantees for complex AI workflows

Comparison with Other MQ Systems

Feature                 Pulsar          NSQ            Kafka      RocketMQ
Deployment Complexity   Medium          Low            Medium     Medium
Performance             High            Medium         High       High
Multi-tenancy           Native Support  Not Supported  Limited    Limited
Message Persistence     Strong          Limited        Strong     Strong
Message Ordering        Strong          Weak           Strong     Strong
Horizontal Scaling      Excellent       Medium         Good       Good
Scaling Speed           Fast            Medium         Slow       Medium
Operational Complexity  Medium          Low            High       Medium
Ecosystem               Rich            Simple         Very Rich  Rich

Detailed Comparison of Horizontal Scaling Capabilities

Pulsar's Scaling Advantages:

  • Compute-Storage Separation: Broker (compute) and BookKeeper (storage) scale independently, allowing precise resource adjustment based on business needs
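  • Stateless Brokers: Broker nodes keep no persistent message data (it lives in BookKeeper), so they can start and stop quickly and capacity can be added within seconds
  • Automatic Load Balancing: When new Brokers are added, Pulsar's load manager automatically reassigns namespace bundles (groups of Topics, including the partitions of partitioned Topics) to them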
  • Hot Scaling: Supports dynamic addition/removal of nodes without service interruption

Comparison with Other MQ Systems:

  • Kafka: Requires manual Partition rebalancing, which makes scaling complex and time-consuming and may disrupt business operations
  • RocketMQ: Supports dynamic scaling, but the coordination mechanism between NameServer and Broker is relatively complex
  • NSQ: Its single-machine architecture limits scaling; throughput can only be improved by increasing the Topic count

These scaling properties make Pulsar particularly well suited to scenarios like Coze Studio's, with rapid user growth and fluctuating load.

Architecture Design

Overall Architecture

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Coze Studio   │    │  Pulsar         │    │   EventBus      │
│   Application   │───▶│   Client        │───▶│   Manager       │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                                │
                                ▼
                       ┌─────────────────┐
                       │  Apache Pulsar  │
                       │   Cluster       │
                       └─────────────────┘

Core Components

1. Pulsar Producer

File Location: backend/infra/impl/eventbus/pulsar/producer.go

Core Functions:

type Producer interface {
    Send(ctx context.Context, body []byte, opts ...SendOpt) error
    BatchSend(ctx context.Context, bodyArr [][]byte, opts ...SendOpt) error
}

type producerImpl struct {
    topic    string
    client   pulsar.Client
    producer pulsar.Producer
}

Features:

  • Supports synchronous and asynchronous sending
  • Batch sending for performance optimization
  • JWT authentication support
  • Graceful shutdown handling

2. Pulsar Consumer

File Location: backend/infra/impl/eventbus/pulsar/consumer.go

Core Functions:

func RegisterConsumer(serviceURL, topic, group string, 
    consumerHandler eventbus.ConsumerHandler, 
    opts ...eventbus.ConsumerOpt) error

Features:

  • Exclusive mode consumption for message ordering
  • Automatic retry and error handling
  • Context cancellation support
  • Message acknowledgment and negative acknowledgment mechanisms

3. EventBus Factory

File Location: backend/infra/impl/eventbus/eventbus.go

Integration Point:

case consts.MQTypePulsar:
    return pulsar.NewProducer(nameServer, topic, group)

Configuration

Environment Variables

Required Configuration

# Message queue type
COZE_MQ_TYPE=pulsar

# Pulsar service address (use pulsar://pulsar:6650 instead when Coze Studio runs inside the same Docker network)
MQ_NAME_SERVER=pulsar://localhost:6650

Optional Configuration

# JWT authentication token (if authentication is enabled)
PULSAR_JWT_TOKEN=eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJhZG1pbiJ9.example_token

Docker Configuration

Standalone Pulsar Deployment

services:
  pulsar:
    image: apachepulsar/pulsar:3.0.12
    container_name: coze-pulsar
    restart: always
    command: >
      sh -c "bin/pulsar standalone"
    ports:
      - "6650:6650"   # Pulsar service port
      - "8080:8080"   # Pulsar admin port
    volumes:
      - ./data/pulsar:/pulsar/data
    networks:
      - coze-network
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/admin/v2/clusters"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s

Production Cluster Deployment

For production environments, it's recommended to use Pulsar cluster deployment to achieve high availability and better performance. Cluster deployment involves configuring multiple components including ZooKeeper, BookKeeper, and Broker, which can be quite complex.

Production Environment Recommendations:

  • Use Pulsar cluster mode deployment for high availability
  • Enable JWT authentication for security
  • Configure appropriate resource limits and monitoring

For detailed cluster deployment configuration, please refer to the Apache Pulsar Official Documentation.

Usage Guide

1. Prepare Project

# Clone the project
git clone https://github.com/coze-dev/coze-studio.git
cd coze-studio

2. Modify Docker Compose Configuration

Add the Pulsar service to your docker/docker-compose.yml file:

services:
  # Add Pulsar service
  pulsar:
    image: apachepulsar/pulsar:3.0.12
    container_name: coze-pulsar
    restart: always
    command: >
      sh -c "bin/pulsar standalone"
    ports:
      - "6650:6650"   # Pulsar service port
      - "8080:8080"   # Pulsar admin port
    volumes:
      - ./data/pulsar:/pulsar/data
    networks:
      - coze-network
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/admin/v2/clusters"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s

  # Other existing services...

3. Configure Environment Variables

Modify the .env file to configure Coze Studio to use Pulsar:

# Enter docker directory
cd docker

# Copy environment configuration file
cp .env.example .env

# Edit .env file and add the following configuration:
# Message queue type
COZE_MQ_TYPE=pulsar

# Pulsar service address
MQ_NAME_SERVER=pulsar://pulsar:6650

# JWT authentication token (optional, if authentication is enabled)
# PULSAR_JWT_TOKEN=your_jwt_token_here

4. Start Services

# Start complete Coze Studio services including Pulsar
docker-compose up -d

# Check service startup status
docker-compose ps

5. Verify Deployment

# Check Pulsar container status
docker ps | grep pulsar

# Check Pulsar health status
curl -f http://localhost:8080/admin/v2/clusters

# View Pulsar logs
docker logs coze-pulsar

# Test Pulsar connection
docker exec -it coze-pulsar bin/pulsar-admin clusters list

6. Access Services

  • Coze Studio: http://localhost:3000 (the actual port depends on your deployment configuration)
  • Pulsar Admin: http://localhost:8080

Coze Studio now uses Pulsar as its message queue, and all EventBus traffic is handled through Pulsar.

Appendix

A. Visual Management Tools

For users who need a graphical interface to manage Pulsar clusters, consider using ASP Community Edition. ASP Community Edition is a modern management platform designed specifically for Apache Pulsar, providing an intuitive web interface to manage clusters, tenants, namespaces, topics, and other resources. The platform supports real-time monitoring, performance metrics display, configuration management, and other features that simplify the daily operation of Pulsar clusters.

For more information, please refer to: ASP Community Edition Documentation

B. Integration Features

1. Design Principles

Architecture Compatibility Design:

  • Strictly follows Coze Studio EventBus interface specifications for seamless integration
  • Uses factory pattern for unified management of multiple MQ systems
  • Maintains interface consistency with NSQ, Kafka, and RocketMQ implementations

Performance First:

  • Asynchronous batch sending reduces network overhead
  • Connection pooling reduces connection costs
  • Message acknowledgment mechanism ensures reliability

Easy Deployment:

  • Standalone mode for quick startup
  • Docker containerized deployment
  • Environment variable configuration for flexibility

2. Technical Highlights

JWT Authentication Support:

// Automatically detect and configure JWT authentication
if jwtToken := os.Getenv(consts.PulsarJWTToken); jwtToken != "" {
    clientOptions.Authentication = pulsar.NewAuthenticationToken(jwtToken)
    logs.Debugf("Using JWT authentication, token length: %d", len(jwtToken))
}

Batch Sending Optimization:

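// Asynchronous batch sending for improved performance.
// callback has the client's SendAsync callback signature,
// func(pulsar.MessageID, *pulsar.ProducerMessage, error), and is
// invoked once per message when the broker acknowledges or rejects it.
for _, body := range bodyArr {
    msg := &pulsar.ProducerMessage{Payload: body}
    if option.ShardingKey != nil {
        msg.Key = *option.ShardingKey // same key -> same partition -> ordered
    }
    p.producer.SendAsync(ctx, msg, callback)
}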

Graceful Shutdown Handling:

// Listen for system signals and gracefully close resources
safego.Go(context.Background(), func() {
    signal.WaitExit()
    logs.Infof("shutting down pulsar consumer for topic: %s, group: %s", topic, group)
    cancel()
    consumer.Close()
    client.Close()
})

C. Troubleshooting

1. Common Issues

Connection Issues:

# Check Pulsar service status
docker exec -it coze-pulsar bin/pulsar-admin brokers healthcheck

# Check network connectivity
telnet localhost 6650

# View connection configuration
docker exec -it coze-pulsar cat conf/standalone.conf | grep -E "(advertisedAddress|bindAddress)"

Authentication Issues:

# Check JWT Token configuration
echo $PULSAR_JWT_TOKEN

# Verify token validity
docker exec -it coze-pulsar bin/pulsar-admin --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
  --auth-params token:$PULSAR_JWT_TOKEN \
  clusters list

Performance Issues:

# View topic backlog
docker exec -it coze-pulsar bin/pulsar-admin topics stats persistent://public/default/your-topic

# Adjust batch sending parameters
# Batch size and delay can be configured through SendOpt in code

2. Log Analysis

# View Pulsar service logs
docker logs coze-pulsar

# View Pulsar-related information in application logs
tail -f logs/coze-studio.log | grep -i "pulsar\|eventbus"

# Enable verbose logging
# Set rootLogLevel=DEBUG in Pulsar configuration

3. Monitoring Metrics

# Get broker metrics
curl http://localhost:8080/metrics/

# Get specific topic metrics
curl http://localhost:8080/admin/v2/persistent/public/default/your-topic/stats

# Monitor consumption lag
docker exec -it coze-pulsar bin/pulsar-admin topics subscriptions persistent://public/default/your-topic

D. Best Practices

1. Production Environment Configuration

# Recommended production environment configuration
COZE_MQ_TYPE=pulsar
MQ_NAME_SERVER=pulsar://pulsar-broker-1:6650,pulsar-broker-2:6650,pulsar-broker-3:6650
PULSAR_JWT_TOKEN=your-production-jwt-token

# Pulsar cluster configuration
# Recommend at least 3 Broker nodes
# Recommend at least 3 BookKeeper nodes
# Recommend at least 3 ZooKeeper nodes

2. Performance Tuning

# Producer configuration optimization
# Batch size: 1000 messages or 1MB
# Send timeout: 30 seconds
# Compression algorithm: LZ4

# Consumer configuration optimization
# Receive queue size: 1000
# Acknowledgment timeout: 30 seconds
# Consumer type: Exclusive (ensures ordering)

3. Security Configuration

# Enable JWT authentication
PULSAR_JWT_TOKEN=your-jwt-token

# Configure Access Control Lists (ACL)
# Configure topic-level permissions through Pulsar Admin tools

Summary

The Apache Pulsar EventBus integration in Coze Studio achieves the following goals:

  1. High Performance: Supports high-throughput, low-latency messaging
  2. High Reliability: Message persistence storage with acknowledgment mechanisms
  3. Easy Scaling: Supports horizontal scaling to accommodate business growth
  4. Easy Operations: Rich management tools and monitoring metrics
  5. Enterprise-grade: Multi-tenancy support for enterprise applications

Through this integration, Coze Studio gains a high-performance, reliable, and horizontally scalable message queue option, well suited to workloads that need high throughput, low latency, and multi-tenant isolation.