From 16396f5693dde89c47996dadf411ff3ee7c2f2bc Mon Sep 17 00:00:00 2001 From: pozen Date: Tue, 4 Nov 2025 09:48:46 +0800 Subject: [PATCH] feat: add NATS EventBus implementation (#2385) Co-authored-by: pozen --- backend/go.mod | 3 + backend/go.sum | 5 + backend/infra/eventbus/impl/eventbus.go | 9 +- backend/infra/eventbus/impl/nats/consumer.go | 283 ++++++++++ backend/infra/eventbus/impl/nats/nats_test.go | 275 ++++++++++ backend/infra/eventbus/impl/nats/producer.go | 243 +++++++++ backend/types/consts/consts.go | 6 + docker/.env.example | 23 +- docs/nats-eventbus-integration-guide-en.md | 507 ++++++++++++++++++ docs/nats-eventbus-integration-guide.md | 507 ++++++++++++++++++ 10 files changed, 1858 insertions(+), 3 deletions(-) create mode 100644 backend/infra/eventbus/impl/nats/consumer.go create mode 100644 backend/infra/eventbus/impl/nats/nats_test.go create mode 100644 backend/infra/eventbus/impl/nats/producer.go create mode 100644 docs/nats-eventbus-integration-guide-en.md create mode 100644 docs/nats-eventbus-integration-guide.md diff --git a/backend/go.mod b/backend/go.mod index 15243ed15..bef54a89d 100755 --- a/backend/go.mod +++ b/backend/go.mod @@ -285,6 +285,7 @@ require ( require ( github.com/apache/pulsar-client-go v0.16.0 github.com/eino-contrib/ollama v0.1.0 + github.com/nats-io/nats.go v1.34.1 ) require ( @@ -312,6 +313,8 @@ require ( github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mtibben/percent v0.2.1 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 // indirect diff --git a/backend/go.sum b/backend/go.sum index d4b5080cc..39b0c3bd3 100644 --- a/backend/go.sum +++ b/backend/go.sum @@ -870,9 +870,14 @@ github.com/nats-io/jwt/v2 v2.0.3/go.mod 
h1:VRP+deawSXyhNjXmxPCHskrR6Mq50BqpEI5SE github.com/nats-io/nats-server/v2 v2.5.0/go.mod h1:Kj86UtrXAL6LwYRA6H4RqzkHhK0Vcv2ZnKD5WbQ1t3g= github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= github.com/nats-io/nats.go v1.12.1/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4= +github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nikolalohinski/gonja v1.5.3 h1:GsA+EEaZDZPGJ8JtpeGN78jidhOlxeJROpqMT9fTj9c= diff --git a/backend/infra/eventbus/impl/eventbus.go b/backend/infra/eventbus/impl/eventbus.go index 5061c79b7..abe0d7964 100644 --- a/backend/infra/eventbus/impl/eventbus.go +++ b/backend/infra/eventbus/impl/eventbus.go @@ -22,6 +22,7 @@ import ( "github.com/coze-dev/coze-studio/backend/infra/eventbus" "github.com/coze-dev/coze-studio/backend/infra/eventbus/impl/kafka" + "github.com/coze-dev/coze-studio/backend/infra/eventbus/impl/nats" "github.com/coze-dev/coze-studio/backend/infra/eventbus/impl/nsq" "github.com/coze-dev/coze-studio/backend/infra/eventbus/impl/pulsar" "github.com/coze-dev/coze-studio/backend/infra/eventbus/impl/rmq" @@ -57,9 +58,11 @@ func (consumerServiceImpl) 
RegisterConsumer(nameServer, topic, group string, con return rmq.RegisterConsumer(nameServer, topic, group, consumerHandler, opts...) case "pulsar": return pulsar.RegisterConsumer(nameServer, topic, group, consumerHandler, opts...) + case "nats": + return nats.RegisterConsumer(nameServer, topic, group, consumerHandler, opts...) } - return fmt.Errorf("invalid mq type: %s , only support nsq, kafka, rmq, pulsar", tp) + return fmt.Errorf("invalid mq type: %s , only support nsq, kafka, rmq, pulsar, nats", tp) } func NewProducer(nameServer, topic, group string, retries int) (eventbus.Producer, error) { @@ -73,9 +76,11 @@ func NewProducer(nameServer, topic, group string, retries int) (eventbus.Produce return rmq.NewProducer(nameServer, topic, group, retries) case "pulsar": return pulsar.NewProducer(nameServer, topic, group) + case "nats": + return nats.NewProducer(nameServer, topic, group) } - return nil, fmt.Errorf("invalid mq type: %s , only support nsq, kafka, rmq, pulsar", tp) + return nil, fmt.Errorf("invalid mq type: %s , only support nsq, kafka, rmq, pulsar, nats", tp) } func InitResourceEventBusProducer() (eventbus.Producer, error) { diff --git a/backend/infra/eventbus/impl/nats/consumer.go b/backend/infra/eventbus/impl/nats/consumer.go new file mode 100644 index 000000000..0a7f0524b --- /dev/null +++ b/backend/infra/eventbus/impl/nats/consumer.go @@ -0,0 +1,283 @@ +/* + * Copyright 2025 coze-dev Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package nats + +import ( + "context" + "fmt" + "os" + "strings" + "time" + + "github.com/nats-io/nats.go" + + "github.com/coze-dev/coze-studio/backend/infra/eventbus" + "github.com/coze-dev/coze-studio/backend/pkg/lang/signal" + "github.com/coze-dev/coze-studio/backend/pkg/logs" + "github.com/coze-dev/coze-studio/backend/pkg/safego" + "github.com/coze-dev/coze-studio/backend/types/consts" +) + +func RegisterConsumer(serverURL, topic, group string, consumerHandler eventbus.ConsumerHandler, opts ...eventbus.ConsumerOpt) error { + // Validate input parameters + if serverURL == "" { + return fmt.Errorf("NATS server URL is empty") + } + if topic == "" { + return fmt.Errorf("topic is empty") + } + if group == "" { + return fmt.Errorf("group is empty") + } + if consumerHandler == nil { + return fmt.Errorf("consumer handler is nil") + } + + // Parse consumer options + option := &eventbus.ConsumerOption{} + for _, opt := range opts { + opt(option) + } + + // Prepare connection options + natsOptions := []nats.Option{ + nats.Name(fmt.Sprintf("%s-consumer", group)), + nats.ReconnectWait(2 * time.Second), + nats.MaxReconnects(-1), // Unlimited reconnects + nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { + logs.Warnf("NATS consumer disconnected: %v", err) + }), + nats.ReconnectHandler(func(nc *nats.Conn) { + logs.Infof("NATS consumer reconnected to %s", nc.ConnectedUrl()) + }), + } + + // Add authentication support + if err := addAuthentication(&natsOptions); err != nil { + return fmt.Errorf("setup authentication failed: %w", err) + } + + // Create NATS connection + nc, err := nats.Connect(serverURL, natsOptions...) 
+ if err != nil { + return fmt.Errorf("create NATS connection failed: %w", err) + } + + // Check if JetStream is enabled + useJetStream := os.Getenv(consts.NATSUseJetStream) == "true" + + // Create cancellable context for better resource management + ctx, cancel := context.WithCancel(context.Background()) + + if useJetStream { + // Use JetStream for persistent messaging + err = startJetStreamConsumer(ctx, nc, topic, group, consumerHandler) + } else { + // Use core NATS for simple pub/sub + err = startCoreConsumer(ctx, nc, topic, group, consumerHandler) + } + + if err != nil { + nc.Close() + cancel() // Cancel context to prevent leak + return err + } + + // Handle graceful shutdown + safego.Go(context.Background(), func() { + signal.WaitExit() + logs.Infof("shutting down NATS consumer for topic: %s, group: %s", topic, group) + cancel() // Cancel the context to stop consumer loop + nc.Close() + }) + + return nil +} + +// startJetStreamConsumer starts a JetStream-based consumer for persistent messaging +func startJetStreamConsumer(ctx context.Context, nc *nats.Conn, topic, group string, consumerHandler eventbus.ConsumerHandler) error { + // Create JetStream context + js, err := nc.JetStream() + if err != nil { + return fmt.Errorf("create JetStream context failed: %w", err) + } + + // Ensure Stream exists + if err := ensureStream(js, topic); err != nil { + return fmt.Errorf("ensure stream failed: %w", err) + } + + // Start consuming messages in a goroutine + safego.Go(ctx, func() { + defer nc.Close() + + // Create durable pull subscription + sub, err := js.PullSubscribe(topic, group) + if err != nil { + logs.Errorf("create NATS JetStream subscription failed: %v", err) + return + } + defer sub.Unsubscribe() + + for { + select { + case <-ctx.Done(): + logs.Infof("NATS JetStream consumer stopped for topic: %s, group: %s", topic, group) + return + default: + // Fetch one message at a time for better control and resource management + msgs, err := sub.Fetch(1, 
nats.MaxWait(1*time.Second)) + if err != nil { + if ctx.Err() != nil { + return + } + // Handle timeout and other non-fatal errors + if err == nats.ErrTimeout { + continue + } + logs.Errorf("fetch NATS JetStream message error: %v", err) + continue + } + + // Process the single message + if len(msgs) > 0 { + msg := msgs[0] + eventMsg := &eventbus.Message{ + Topic: topic, + Group: group, + Body: msg.Data, + } + + // Handle message with context + if err := consumerHandler.HandleMessage(ctx, eventMsg); err != nil { + logs.Errorf("handle NATS JetStream message failed, topic: %s, group: %s, err: %v", topic, group, err) + // Negative acknowledge on error + msg.Nak() + continue + } + + // Acknowledge message on success + msg.Ack() + } + } + } + }) + + return nil +} + +// startCoreConsumer starts a core NATS consumer for simple pub/sub +func startCoreConsumer(ctx context.Context, nc *nats.Conn, topic, group string, consumerHandler eventbus.ConsumerHandler) error { + // Start consuming messages in a goroutine + safego.Go(ctx, func() { + defer nc.Close() + + // Create queue subscription for load balancing + sub, err := nc.QueueSubscribe(topic, group, func(msg *nats.Msg) { + eventMsg := &eventbus.Message{ + Topic: topic, + Group: group, + Body: msg.Data, + } + + // Handle message with context + if err := consumerHandler.HandleMessage(ctx, eventMsg); err != nil { + logs.Errorf("handle NATS core message failed, topic: %s, group: %s, err: %v", topic, group, err) + // For core NATS, we can't nack, just log the error + return + } + + logs.Debugf("successfully processed NATS core message, topic: %s, group: %s", topic, group) + }) + + if err != nil { + logs.Errorf("create NATS core subscription failed: %v", err) + return + } + defer sub.Unsubscribe() + + // Wait for context cancellation + <-ctx.Done() + logs.Infof("NATS core consumer stopped for topic: %s, group: %s", topic, group) + }) + + return nil +} + +// addAuthentication adds authentication options to NATS connection +func 
addAuthentication(options *[]nats.Option) error { + // JWT authentication with NKey + if jwtToken := os.Getenv(consts.NATSJWTToken); jwtToken != "" { + nkeySeed := os.Getenv(consts.NATSNKeySeed) + if nkeySeed == "" { + return fmt.Errorf("NATS_NKEY_SEED is required when using JWT authentication") + } + *options = append(*options, nats.UserJWTAndSeed(jwtToken, nkeySeed)) + return nil + } + + // Username/password authentication + if username := os.Getenv(consts.NATSUsername); username != "" { + password := os.Getenv(consts.NATSPassword) + *options = append(*options, nats.UserInfo(username, password)) + return nil + } + + // Token authentication + if token := os.Getenv(consts.NATSToken); token != "" { + *options = append(*options, nats.Token(token)) + return nil + } + + // No authentication configured + return nil +} + +// ensureStream ensures that a JetStream stream exists for the given subject +func ensureStream(js nats.JetStreamContext, subject string) error { + // Replace dots and other invalid characters with underscores for stream name + // NATS stream names cannot contain dots, spaces, or other special characters + streamName := strings.ReplaceAll(subject, ".", "_") + "_STREAM" + + // Check if Stream already exists + _, err := js.StreamInfo(streamName) + if err == nil { + return nil // Stream already exists + } + + // Only create stream if it's specifically a "stream not found" error + if err != nats.ErrStreamNotFound { + return fmt.Errorf("failed to check stream %s: %w", streamName, err) + } + + // Create Stream if it doesn't exist + _, err = js.AddStream(&nats.StreamConfig{ + Name: streamName, + Subjects: []string{subject}, + Storage: nats.FileStorage, // File storage for persistence + MaxAge: 24 * time.Hour, // Retain messages for 24 hours + MaxMsgs: 1000000, // Maximum number of messages + MaxBytes: 1024 * 1024 * 1024, // Maximum storage size (1GB) + }) + + if err != nil { + return fmt.Errorf("failed to create stream %s: %w", streamName, err) + } + + 
logs.Infof("created NATS JetStream stream: %s", streamName) + return nil +} diff --git a/backend/infra/eventbus/impl/nats/nats_test.go b/backend/infra/eventbus/impl/nats/nats_test.go new file mode 100644 index 000000000..c425dcd2d --- /dev/null +++ b/backend/infra/eventbus/impl/nats/nats_test.go @@ -0,0 +1,275 @@ +/* + * Copyright 2025 coze-dev Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nats + +import ( + "context" + "os" + "sync" + "testing" + "time" + + "github.com/nats-io/nats.go" + "github.com/stretchr/testify/assert" + + "github.com/coze-dev/coze-studio/backend/infra/eventbus" + "github.com/coze-dev/coze-studio/backend/types/consts" +) + +var serviceURL = "nats://localhost:4222" + +func TestNATSProducer(t *testing.T) { + if os.Getenv("NATS_LOCAL_TEST") != "true" { + return + } + + // Set up NATS connection options + opts := []nats.Option{nats.Name("test-producer")} + + // Add authentication if provided + if jwtToken := os.Getenv(consts.NATSJWTToken); jwtToken != "" { + opts = append(opts, nats.UserJWT(func() (string, error) { + return jwtToken, nil + }, func(nonce []byte) ([]byte, error) { + return []byte(os.Getenv(consts.NATSNKeySeed)), nil + })) + } else if username := os.Getenv(consts.NATSUsername); username != "" { + password := os.Getenv(consts.NATSPassword) + opts = append(opts, nats.UserInfo(username, password)) + } else if token := os.Getenv(consts.NATSToken); token != "" { + opts = append(opts, nats.Token(token)) + 
} + + nc, err := nats.Connect(serviceURL, opts...) + assert.NoError(t, err) + defer nc.Close() + + // Test core NATS publishing + err = nc.Publish("test.subject", []byte("hello from core NATS")) + assert.NoError(t, err) + t.Log("Message sent via core NATS") + + // Test JetStream publishing if enabled + if os.Getenv(consts.NATSUseJetStream) == "true" { + js, err := nc.JetStream() + assert.NoError(t, err) + + // Ensure stream exists + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST_STREAM", + Subjects: []string{"test.jetstream.>"}, + }) + if err != nil && err != nats.ErrStreamNameAlreadyInUse { + assert.NoError(t, err) + } + + _, err = js.Publish("test.jetstream.subject", []byte("hello from JetStream")) + assert.NoError(t, err) + t.Log("Message sent via JetStream") + } +} + +func TestNATSConsumer(t *testing.T) { + if os.Getenv("NATS_LOCAL_TEST") != "true" { + return + } + + // Set up NATS connection options + opts := []nats.Option{nats.Name("test-consumer")} + + // Add authentication if provided + if jwtToken := os.Getenv(consts.NATSJWTToken); jwtToken != "" { + opts = append(opts, nats.UserJWT(func() (string, error) { + return jwtToken, nil + }, func(nonce []byte) ([]byte, error) { + return []byte(os.Getenv(consts.NATSNKeySeed)), nil + })) + } else if username := os.Getenv(consts.NATSUsername); username != "" { + password := os.Getenv(consts.NATSPassword) + opts = append(opts, nats.UserInfo(username, password)) + } else if token := os.Getenv(consts.NATSToken); token != "" { + opts = append(opts, nats.Token(token)) + } + + nc, err := nats.Connect(serviceURL, opts...) 
+ assert.NoError(t, err) + defer nc.Close() + + // Test core NATS subscription + t.Run("CoreNATSConsumer", func(t *testing.T) { + wg := sync.WaitGroup{} + wg.Add(1) + + // Subscribe to messages + sub, err := nc.QueueSubscribe("test.subject", "test-queue", func(msg *nats.Msg) { + defer wg.Done() + t.Logf("Received core NATS message: %s", string(msg.Data)) + assert.Equal(t, "hello from core NATS", string(msg.Data)) + }) + assert.NoError(t, err) + defer sub.Unsubscribe() + + // Send a test message + err = nc.Publish("test.subject", []byte("hello from core NATS")) + assert.NoError(t, err) + + // Wait for message with timeout + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + // Success + case <-time.After(5 * time.Second): + t.Error("Timeout waiting for core NATS message") + } + }) + + // Test JetStream subscription if enabled + if os.Getenv(consts.NATSUseJetStream) == "true" { + t.Run("JetStreamConsumer", func(t *testing.T) { + js, err := nc.JetStream() + assert.NoError(t, err) + + // Ensure stream exists + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST_STREAM", + Subjects: []string{"test.jetstream.>"}, + }) + if err != nil && err != nats.ErrStreamNameAlreadyInUse { + assert.NoError(t, err) + } + + wg := sync.WaitGroup{} + wg.Add(1) + + // Subscribe to JetStream messages + sub, err := js.PullSubscribe("test.jetstream.subject", "test-consumer") + assert.NoError(t, err) + defer sub.Unsubscribe() + + // Send a test message + _, err = js.Publish("test.jetstream.subject", []byte("hello from JetStream")) + assert.NoError(t, err) + + go func() { + defer wg.Done() + msgs, err := sub.Fetch(1, nats.MaxWait(5*time.Second)) + if err != nil { + t.Errorf("Failed to fetch JetStream message: %v", err) + return + } + if len(msgs) > 0 { + msg := msgs[0] + t.Logf("Received JetStream message: %s", string(msg.Data)) + assert.Equal(t, "hello from JetStream", string(msg.Data)) + msg.Ack() + } + }() + + // Wait for message with 
timeout + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + // Success + case <-time.After(10 * time.Second): + t.Error("Timeout waiting for JetStream message") + } + }) + } +} + +func TestNATSProducerImpl(t *testing.T) { + if os.Getenv("NATS_LOCAL_TEST") != "true" { + return + } + + producer, err := NewProducer(serviceURL, "test.topic", "test-group") + assert.NoError(t, err) + // Note: eventbus.Producer interface doesn't have Close method + // The underlying connection will be closed when the producer is garbage collected + + // Test single message send + err = producer.Send(context.Background(), []byte("single message test")) + assert.NoError(t, err) + t.Log("Single message sent successfully") + + // Test batch message send + messages := [][]byte{ + []byte("batch message 1"), + []byte("batch message 2"), + } + + err = producer.BatchSend(context.Background(), messages) + assert.NoError(t, err) + t.Log("Batch messages sent successfully") +} + +func TestNATSConsumerImpl(t *testing.T) { + if os.Getenv("NATS_LOCAL_TEST") != "true" { + return + } + + // Create a test message handler + messageReceived := make(chan *eventbus.Message, 1) + handler := &testHandler{ + messageReceived: messageReceived, + t: t, + } + + // Register consumer + err := RegisterConsumer(serviceURL, "test.consumer.impl", "test-group", handler) + assert.NoError(t, err) + + // Send a test message using producer + producer, err := NewProducer(serviceURL, "test.consumer.impl", "test-group") + assert.NoError(t, err) + // Note: eventbus.Producer interface doesn't have Close method + // The underlying connection will be closed when the producer is garbage collected + + err = producer.Send(context.Background(), []byte("consumer implementation test")) + assert.NoError(t, err) + + // Wait for message to be received + select { + case receivedMsg := <-messageReceived: + assert.Equal(t, []byte("consumer implementation test"), receivedMsg.Body) + 
t.Log("Consumer implementation test passed") + case <-time.After(10 * time.Second): + t.Error("Timeout waiting for message in consumer implementation test") + } +} + +// testHandler implements eventbus.ConsumerHandler for testing +type testHandler struct { + messageReceived chan *eventbus.Message + t *testing.T +} + +func (h *testHandler) HandleMessage(ctx context.Context, message *eventbus.Message) error { + h.t.Logf("Handler received message: %s", string(message.Body)) + h.messageReceived <- message + return nil +} diff --git a/backend/infra/eventbus/impl/nats/producer.go b/backend/infra/eventbus/impl/nats/producer.go new file mode 100644 index 000000000..37f012b4b --- /dev/null +++ b/backend/infra/eventbus/impl/nats/producer.go @@ -0,0 +1,243 @@ +/* + * Copyright 2025 coze-dev Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package nats + +import ( + "context" + "fmt" + "os" + "sync" + + "github.com/nats-io/nats.go" + + "github.com/coze-dev/coze-studio/backend/infra/eventbus" + "github.com/coze-dev/coze-studio/backend/pkg/logs" + "github.com/coze-dev/coze-studio/backend/pkg/taskgroup" + "github.com/coze-dev/coze-studio/backend/types/consts" +) + +type producerImpl struct { + nc *nats.Conn + js nats.JetStreamContext + useJetStream bool + topic string // Store the topic for this producer instance + closed bool + mu sync.RWMutex +} + +// NewProducer creates a new NATS producer +func NewProducer(serverURL, topic, group string) (eventbus.Producer, error) { + if serverURL == "" { + return nil, fmt.Errorf("server URL is empty") + } + + if topic == "" { + return nil, fmt.Errorf("topic is empty") + } + + // Set up NATS connection options + opts := []nats.Option{ + nats.Name("coze-studio-producer"), + nats.MaxReconnects(-1), // Unlimited reconnects + } + + // Add authentication if provided + if jwtToken := os.Getenv(consts.NATSJWTToken); jwtToken != "" { + nkeySeed := os.Getenv(consts.NATSNKeySeed) + opts = append(opts, nats.UserJWTAndSeed(jwtToken, nkeySeed)) + } else if username := os.Getenv(consts.NATSUsername); username != "" { + password := os.Getenv(consts.NATSPassword) + opts = append(opts, nats.UserInfo(username, password)) + } else if token := os.Getenv(consts.NATSToken); token != "" { + opts = append(opts, nats.Token(token)) + } + + // Connect to NATS + nc, err := nats.Connect(serverURL, opts...) 
+ if err != nil { + return nil, fmt.Errorf("connect to NATS failed: %w", err) + } + + // Check if JetStream should be used + useJetStream := os.Getenv(consts.NATSUseJetStream) == "true" + + producer := &producerImpl{ + nc: nc, + useJetStream: useJetStream, + topic: topic, // Store the topic for this producer instance + closed: false, + } + + // Initialize JetStream if needed + if useJetStream { + js, err := nc.JetStream() + if err != nil { + nc.Close() + return nil, fmt.Errorf("create JetStream context failed: %w", err) + } + producer.js = js + } + + return producer, nil +} + +// Send sends a single message using the stored topic +func (p *producerImpl) Send(ctx context.Context, body []byte, opts ...eventbus.SendOpt) error { + return p.BatchSend(ctx, [][]byte{body}, opts...) +} + +// BatchSend sends multiple messages using the stored topic +func (p *producerImpl) BatchSend(ctx context.Context, bodyArr [][]byte, opts ...eventbus.SendOpt) error { + p.mu.RLock() + if p.closed { + p.mu.RUnlock() + return fmt.Errorf("producer is closed") + } + p.mu.RUnlock() + + if len(bodyArr) == 0 { + return fmt.Errorf("no messages to send") + } + + // Use the stored topic + topic := p.topic + if topic == "" { + return fmt.Errorf("topic is not set") + } + + // Parse producer options + option := &eventbus.SendOption{} + for _, opt := range opts { + opt(option) + } + + if p.useJetStream { + return p.batchSendJetStream(ctx, topic, bodyArr, option) + } else { + return p.batchSendCore(ctx, topic, bodyArr, option) + } +} + +// batchSendJetStream sends messages using JetStream for persistence +func (p *producerImpl) batchSendJetStream(ctx context.Context, topic string, messages [][]byte, option *eventbus.SendOption) error { + // Ensure Stream exists + if err := ensureStream(p.js, topic); err != nil { + return fmt.Errorf("ensure stream failed: %w", err) + } + + // Use TaskGroup to wait for all async publishes + tg := taskgroup.NewTaskGroup(ctx, min(len(messages), 5)) + + for i, message := 
range messages { + tg.Go(func() error { + // Prepare publish options + pubOpts := []nats.PubOpt{} + + // Add message ID for deduplication if sharding key is provided + if option.ShardingKey != nil && *option.ShardingKey != "" { + msgID := fmt.Sprintf("%s-%d", *option.ShardingKey, i) + pubOpts = append(pubOpts, nats.MsgId(msgID)) + } + + // Add context for timeout + pubOpts = append(pubOpts, nats.Context(ctx)) + + // Publish message asynchronously + _, err := p.js.Publish(topic, message, pubOpts...) + if err != nil { + return fmt.Errorf("publish message %d failed: %w", i, err) + } + return nil + }) + } + + // Wait for all messages to be sent + if err := tg.Wait(); err != nil { + return err + } + + logs.Debugf("successfully sent %d messages to NATS JetStream topic: %s", len(messages), topic) + return nil +} + +// batchSendCore sends messages using core NATS for simple pub/sub +func (p *producerImpl) batchSendCore(ctx context.Context, topic string, messages [][]byte, option *eventbus.SendOption) error { + // Use TaskGroup to wait for all async publishes + tg := taskgroup.NewTaskGroup(ctx, min(len(messages), 5)) + + for i, message := range messages { + tg.Go(func() error { + // For core NATS, we can add headers if sharding key is provided + if option.ShardingKey != nil && *option.ShardingKey != "" { + // Create message with headers + natsMsg := &nats.Msg{ + Subject: topic, + Data: message, + Header: nats.Header{}, + } + natsMsg.Header.Set("Sharding-Key", *option.ShardingKey) + + err := p.nc.PublishMsg(natsMsg) + if err != nil { + return fmt.Errorf("publish message %d with header failed: %w", i, err) + } + } else { + // Simple publish without headers + err := p.nc.Publish(topic, message) + if err != nil { + return fmt.Errorf("publish message %d failed: %w", i, err) + } + } + return nil + }) + } + + // Wait for all messages to be sent + if err := tg.Wait(); err != nil { + return err + } + + // Flush to ensure all messages are sent + if err := p.nc.Flush(); err != nil { + 
return fmt.Errorf("flush NATS connection failed: %w", err) + } + logs.Debugf("successfully sent %d messages to NATS core topic: %s", len(messages), topic) + return nil +} + +// Close closes the producer and releases resources +func (p *producerImpl) Close() error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.closed { + return nil + } + + p.closed = true + + if p.nc != nil { + // Drain connection to ensure all pending messages are sent + if err := p.nc.Drain(); err != nil { + logs.Warnf("drain NATS connection failed: %v", err) + } + p.nc.Close() + } + + logs.Infof("NATS producer closed successfully") + return nil +} diff --git a/backend/types/consts/consts.go b/backend/types/consts/consts.go index 7ace2d187..cf457ba4c 100644 --- a/backend/types/consts/consts.go +++ b/backend/types/consts/consts.go @@ -60,6 +60,12 @@ const ( RMQAccessKey = "RMQ_ACCESS_KEY" PulsarServiceURL = "PULSAR_SERVICE_URL" PulsarJWTToken = "PULSAR_JWT_TOKEN" + NATSJWTToken = "NATS_JWT_TOKEN" + NATSNKeySeed = "NATS_NKEY_SEED" + NATSUsername = "NATS_USERNAME" + NATSPassword = "NATS_PASSWORD" + NATSToken = "NATS_TOKEN" + NATSUseJetStream = "NATS_USE_JETSTREAM" RMQTopicApp = "opencoze_search_app" RMQTopicResource = "opencoze_search_resource" RMQTopicKnowledge = "opencoze_knowledge" diff --git a/docker/.env.example b/docker/.env.example index b8643f42a..99664de7c 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -73,7 +73,7 @@ export ES_NUMBER_OF_SHARDS="1" export ES_NUMBER_OF_REPLICAS="1" # Backend Event Bus -export COZE_MQ_TYPE="nsq" # nsq / kafka / rmq / pulsar +export COZE_MQ_TYPE="nsq" # nsq / kafka / rmq / pulsar / nats export MQ_NAME_SERVER="nsqd:4150" # RocketMQ export RMQ_ACCESS_KEY="" @@ -83,6 +83,27 @@ export RMQ_SECRET_KEY="" # Fill PULSAR_JWT_TOKEN for JWT auth, leave empty for no auth export PULSAR_SERVICE_URL="pulsar://pulsar-service:6650" export PULSAR_JWT_TOKEN="" +# NATS +# Use NATS as backend eventbus with JetStream support +# Set COZE_MQ_TYPE="nats" and 
MQ_NAME_SERVER="nats:4222" to enable NATS +# NATS_SERVER_URL: NATS server connection URL, supports nats:// and tls:// protocols +# For cluster setup, use comma-separated URLs: "nats://nats1:4222,nats://nats2:4222" +# For TLS connection: "tls://nats:4222" +export NATS_SERVER_URL="nats://nats:4222" +# NATS_JWT_TOKEN: JWT token for NATS authentication (leave empty for no auth) +export NATS_JWT_TOKEN="" +# NATS_NKEY_SEED: NKey seed value (the seed string itself, not a file path); required when NATS_JWT_TOKEN is set +export NATS_NKEY_SEED="" +# NATS_USERNAME: Username for NATS authentication (optional) +export NATS_USERNAME="" +# NATS_PASSWORD: Password for NATS authentication (optional) +export NATS_PASSWORD="" +# NATS_TOKEN: Token for NATS authentication (optional) +export NATS_TOKEN="" +# NATS_STREAM_REPLICAS: Number of replicas for JetStream streams (default: 1) +export NATS_STREAM_REPLICAS="1" +# NATS_USE_JETSTREAM: Enable JetStream mode for message persistence and reliability (default: false) +export NATS_USE_JETSTREAM="true" # Settings for VectorStore # VectorStore type: milvus / vikingdb / oceanbase diff --git a/docs/nats-eventbus-integration-guide-en.md b/docs/nats-eventbus-integration-guide-en.md new file mode 100644 index 000000000..3decdb219 --- /dev/null +++ b/docs/nats-eventbus-integration-guide-en.md @@ -0,0 +1,507 @@ +# NATS EventBus Integration Guide + +## Overview + +This document provides a comprehensive guide for integrating NATS as an EventBus in Coze Studio, including architecture design, implementation details, configuration instructions, and usage guidelines. + +## Integration Background + +### Why Choose NATS? + +In Coze Studio's architecture, EventBus plays a critical role in asynchronous message delivery, including workflow execution, Agent communication, data processing pipelines, and other core functions. NATS, as a lightweight and high-performance messaging system, brings the following core advantages to Coze Studio: + +1. 
**Lightweight**: NATS has minimal resource footprint and simple deployment architecture, perfect for cloud-native environments +2. **High Performance**: Provides low-latency, high-throughput messaging that can support Coze Studio's large-scale concurrent Agent execution +3. **Simplicity**: Clean and intuitive API that reduces development and maintenance costs +4. **JetStream Support**: Provides message persistence, replay, and stream processing capabilities through JetStream +5. **Cloud Native**: Native support for Kubernetes, easy to deploy and manage in containerized environments +6. **Security**: Built-in authentication and authorization mechanisms with TLS encryption support + +### Comparison with Other MQ Systems + +| Feature | NATS | NSQ | Kafka | RocketMQ | Pulsar | +| ---------------------- | -------------- | -------------- | -------------- | -------------- | -------------- | +| **Deployment Complexity** | Very Low | Low | Medium | Medium | Medium | +| **Performance** | Very High | Medium | High | High | High | +| **Resource Usage** | Very Low | Low | Medium | Medium | Medium | +| **Message Persistence** | JetStream | Limited | Strong | Strong | Strong | +| **Message Ordering** | Supported | Weak | Strong | Strong | Strong | +| **Horizontal Scaling** | Good | Medium | Good | Good | Excellent | +| **Operational Complexity** | Very Low | Low | High | Medium | Medium | +| **Cloud Native Support** | Excellent | Medium | Medium | Medium | Good | + +#### NATS Core Advantages + +**Lightweight and High Performance**: +- **Memory Usage**: NATS server typically requires only tens of MB to handle millions of messages +- **Startup Speed**: Second-level startup, perfect for microservices and containerized deployments +- **Latency**: Sub-millisecond message latency, suitable for real-time scenarios +- **Throughput**: Single node can handle millions of messages per second + +**Simplicity**: +- **Simple Configuration**: Minimal configuration required to run, no complex 
cluster setup needed +- **Clean API**: Publish/subscribe pattern is simple and intuitive with low learning curve +- **Operations Friendly**: Rich monitoring and debugging tools, easy troubleshooting + +**Cloud Native Features**: +- **Kubernetes Integration**: Official Helm Charts and Operators available +- **Service Discovery**: Built-in service discovery mechanism, no external dependencies +- **Elastic Scaling**: Supports dynamic cluster membership changes + +## Architecture Design + +### Overall Architecture + +``` +┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ +│ Coze Studio │ │ NATS Server │ │ JetStream │ +│ Application │ │ │ │ Storage │ +├─────────────────┤ ├─────────────────┤ ├─────────────────┤ +│ Producer │───▶│ Core NATS │ │ Streams │ +│ Consumer │◀───│ JetStream │◀───│ Consumers │ +│ EventBus │ │ Clustering │ │ Key-Value │ +└─────────────────┘ └─────────────────┘ └─────────────────┘ +``` + +### Message Flow Patterns + +NATS supports two messaging modes in Coze Studio: + +1. **Core NATS**: For real-time, lightweight message delivery + - Publish/Subscribe pattern + - Request/Response pattern + - Queue Group pattern + +2. 
**JetStream**: For messages requiring persistence and high reliability + - Stream storage + - Message replay + - Consumer acknowledgment mechanism + +## Implementation Details + +### Producer Implementation + +The Producer is responsible for sending messages to NATS, supporting the following features: + +```go +type Producer struct { + nc *nats.Conn + js nats.JetStreamContext + closed bool + mu sync.RWMutex +} + +func (p *Producer) SendMessage(ctx context.Context, topic string, message []byte) error { + // Supports both Core NATS and JetStream modes + if p.js != nil { + // JetStream mode: supports message persistence + _, err := p.js.Publish(topic, message) + return err + } else { + // Core NATS mode: lightweight publishing + return p.nc.Publish(topic, message) + } +} +``` + +### Consumer Implementation + +The Consumer is responsible for receiving and processing messages from NATS: + +```go +func (c *Consumer) RegisterConsumer(serverURL, topic, group string, handler ConsumerHandler) error { + // Choose JetStream or Core NATS based on configuration + if c.useJetStream { + return c.startJetStreamConsumer(ctx, topic, group, handler) + } else { + return c.startCoreConsumer(ctx, topic, group, handler) + } +} +``` + +#### JetStream Consumer Features + +- **Message Acknowledgment**: Supports manual acknowledgment mechanism to ensure successful message processing +- **Retry Mechanism**: Automatic retry for failed messages with exponential backoff support +- **Sequential Processing**: Single message processing to avoid complexity from batch processing +- **Flow Control**: Precise message flow control to prevent consumer overload + +#### Core NATS Consumer Features + +- **Queue Groups**: Supports load-balanced message distribution +- **Lightweight**: No persistence overhead, suitable for real-time message processing +- **High Performance**: Extremely low message processing latency + +## Configuration Guide + +### Environment Variables + +Add the following NATS-related
configurations in `docker/.env.example`: + +```bash +# Backend Event Bus +export COZE_MQ_TYPE="nats" # Set message queue type to NATS +export MQ_NAME_SERVER="nats:4222" # NATS server address + +# NATS specific configuration +# NATS_SERVER_URL: NATS server connection URL, supports nats:// and tls:// protocols +# For cluster setup, use comma-separated URLs: "nats://nats1:4222,nats://nats2:4222" +# For TLS connection: "tls://nats:4222" +export NATS_SERVER_URL="nats://nats:4222" + +# NATS_JWT_TOKEN: JWT token for NATS authentication (leave empty for no auth) +export NATS_JWT_TOKEN="" + +# NATS_NKEY_SEED: Path to NATS seed file for NKey authentication (optional) +export NATS_NKEY_SEED="" + +# NATS_USERNAME: Username for NATS authentication (optional) +export NATS_USERNAME="" + +# NATS_PASSWORD: Password for NATS authentication (optional) +export NATS_PASSWORD="" + +# NATS_TOKEN: Token for NATS authentication (optional) +export NATS_TOKEN="" + +# NATS_STREAM_REPLICAS: Number of replicas for JetStream streams (default: 1) +export NATS_STREAM_REPLICAS="1" + +# NATS_USE_JETSTREAM: Enable JetStream mode for message persistence and reliability (default: false) +export NATS_USE_JETSTREAM="true" +``` + +### Docker Compose Configuration + +NATS service configuration in `docker-compose.yml`: + +```yaml +nats: + image: nats:2.10.24-alpine + container_name: nats + restart: unless-stopped + command: + - "--jetstream" # Enable JetStream + - "--store_dir=/data" # Data storage directory + - "--max_memory_store=1GB" # Memory storage limit + - "--max_file_store=10GB" # File storage limit + ports: + - "4222:4222" # Client connection port + - "8222:8222" # HTTP monitoring port + - "6222:6222" # Cluster communication port + volumes: + - ./volumes/nats:/data + networks: + - coze-network + healthcheck: + test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://localhost:8222/"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 40s +``` + +### Application Configuration 
+ +Configure NATS in Coze Studio application through environment variables: + +```go +// Read configuration from environment variables +mqType := os.Getenv("COZE_MQ_TYPE") +natsURL := os.Getenv("NATS_SERVER_URL") +jwtToken := os.Getenv("NATS_JWT_TOKEN") +seedFile := os.Getenv("NATS_NKEY_SEED") +streamReplicas := os.Getenv("NATS_STREAM_REPLICAS") + +// Create NATS EventBus +if mqType == "nats" { + config := &nats.Config{ + ServerURL: natsURL, + JWTToken: jwtToken, + SeedFile: seedFile, + StreamReplicas: streamReplicas, + } + + eventBus, err := nats.NewProducer(config) + if err != nil { + log.Fatal("Failed to create NATS producer:", err) + } +} +``` + +## Deployment Guide + +### Docker Deployment + +1. **Configure Environment Variables**: + ```bash + cp docker/.env.example docker/.env + # Edit .env file, set COZE_MQ_TYPE="nats" + ``` + +2. **Start Services**: + ```bash + cd docker + docker-compose up -d nats + ``` + +3. **Verify Deployment**: + ```bash + # Check NATS service status + docker-compose ps nats + + # View NATS monitoring interface + curl http://localhost:8222/varz + ``` + +### Kubernetes Deployment + +Deploy NATS using the official Helm Chart: + +```bash +# Add NATS Helm repository +helm repo add nats https://nats-io.github.io/k8s/helm/charts/ + +# Install NATS +helm install nats nats/nats --set nats.jetstream.enabled=true +``` + +### Production Environment Configuration + +For production environments, the following configuration optimizations are recommended: + +1. **Cluster Deployment**: + ```yaml + nats: + cluster: + enabled: true + replicas: 3 + ``` + +2. **Persistent Storage**: + ```yaml + nats: + jetstream: + fileStore: + pvc: + size: 100Gi + storageClassName: fast-ssd + ``` + +3. **Resource Limits**: + ```yaml + nats: + resources: + limits: + cpu: 2000m + memory: 4Gi + requests: + cpu: 500m + memory: 1Gi + ``` + +4. 
**Security Configuration**: + ```yaml + nats: + auth: + enabled: true + token: "your-secure-token" + tls: + enabled: true + ``` + +## Monitoring and Operations + +### Monitoring Metrics + +NATS provides rich monitoring metrics accessible through HTTP endpoints: + +- **Server Information**: `GET /varz` +- **Connection Information**: `GET /connz` +- **Subscription Information**: `GET /subsz` +- **JetStream Information**: `GET /jsz` + +### Key Monitoring Metrics + +1. **Performance Metrics**: + - Message throughput (messages/sec) + - Message latency (latency) + - Connection count (connections) + +2. **Resource Metrics**: + - Memory usage (memory usage) + - CPU utilization (cpu usage) + - Disk usage (disk usage) + +3. **JetStream Metrics**: + - Stream count (streams) + - Consumer count (consumers) + - Storage usage (storage usage) + +### Log Management + +NATS supports multiple log levels and output formats: + +```bash +# Enable debug logging +nats-server --debug + +# Log output to file +nats-server --log /var/log/nats.log + +# Timestamped log entries with size-based log file rotation (limit in bytes, here 100 MiB) +nats-server --logtime --log /var/log/nats.log --log_size_limit 104857600 +``` + +## Performance Optimization + +### Connection Pool Optimization + +```go +// Configure connection options +opts := []nats.Option{ + nats.MaxReconnects(10), + nats.ReconnectWait(2 * time.Second), + nats.Timeout(5 * time.Second), +} + +nc, err := nats.Connect(serverURL, opts...) +``` + +### JetStream Optimization + +```go +// Configure JetStream options +jsOpts := []nats.JSOpt{ + nats.PublishAsyncMaxPending(1000), + nats.PublishAsyncErrHandler(func(js nats.JetStream, originalMsg *nats.Msg, err error) { + log.Printf("Async publish error: %v", err) + }), +} + +js, err := nc.JetStream(jsOpts...)
+``` + +### Consumer Optimization + +```go +// Configure consumer options +consumerOpts := []nats.SubOpt{ + // The durable name is PullSubscribe's second argument; also passing nats.Durable(...) here is rejected as a duplicate Durable option + nats.MaxDeliver(3), + nats.AckWait(30 * time.Second), + nats.MaxAckPending(100), +} + +sub, err := js.PullSubscribe(topic, "coze-group", consumerOpts...) +``` + +## Troubleshooting + +### Common Issues + +1. **Connection Failures**: + - Check if NATS service is running + - Verify network connectivity + - Confirm port configuration is correct + +2. **Message Loss**: + - Check if JetStream is enabled + - Verify message acknowledgment mechanism + - Review error logs + +3. **Performance Issues**: + - Monitor resource usage + - Check for message backlog + - Optimize consumer configuration + +### Debugging Tools + +NATS provides rich debugging tools: + +```bash +# NATS CLI tools +nats server info +nats stream list +nats consumer list + +# Monitor message flow +nats sub "coze.>" +nats pub "coze.test" "hello world" +``` + +## Best Practices + +### Subject Naming Conventions + +Recommend using hierarchical subject naming: + +``` +coze.workflow.{workflow_id}.{event_type} +coze.agent.{agent_id}.{action} +coze.knowledge.{kb_id}.{operation} +``` + +### Error Handling + +Implement comprehensive error handling mechanisms: + +```go +func (c *Consumer) handleMessage(msg *nats.Msg) { + defer func() { + if r := recover(); r != nil { + log.Printf("Message processing panic: %v", r) + msg.Nak() // Reject message, trigger retry + } + }() + + if err := c.processMessage(msg.Data); err != nil { + log.Printf("Message processing error: %v", err) + msg.Nak() + return + } + + msg.Ack() // Acknowledge successful message processing +} +``` + +### Resource Management + +Properly manage NATS connections and resources: + +```go +func (p *Producer) Close() error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.closed { + return nil + } + + p.closed = true + + if p.nc != nil { + p.nc.Close() + } + + return nil +} +``` + +## Summary + +NATS as Coze Studio's EventBus
solution provides lightweight, high-performance, and easy-to-deploy messaging capabilities. Through JetStream extensions, NATS can also provide enterprise-grade message persistence and stream processing functionality. + +Key advantages of choosing NATS: +- **Simplicity**: Low deployment and maintenance costs +- **Performance**: Extremely high message processing performance +- **Cloud Native**: Perfect fit for containerized and Kubernetes environments +- **Reliability**: JetStream provides message persistence and acknowledgment mechanisms +- **Scalability**: Supports cluster deployment and horizontal scaling + +NATS is particularly suitable for the following scenarios: +- Inter-service communication in microservice architectures +- Real-time data stream processing +- Message delivery for cloud-native applications +- Low-latency messaging systems +- Resource-constrained deployment environments \ No newline at end of file diff --git a/docs/nats-eventbus-integration-guide.md b/docs/nats-eventbus-integration-guide.md new file mode 100644 index 000000000..ced81aaab --- /dev/null +++ b/docs/nats-eventbus-integration-guide.md @@ -0,0 +1,507 @@ +# NATS EventBus 集成指南 + +## 概述 + +本文档详细介绍了 NATS 作为 EventBus 在 Coze Studio 中的集成适配情况,包括架构设计、实现细节、配置说明和使用指南。 + +## 集成背景 + +### 为什么选择 NATS? + +在 Coze Studio 的架构中,EventBus 承担着关键的异步消息传递任务,包括工作流执行、Agent 通信、数据处理管道等核心功能。NATS 作为一个轻量级、高性能的消息系统,为 Coze Studio 带来了以下核心优势: + +1. **轻量级**: NATS 具有极小的资源占用和简单的部署架构,非常适合云原生环境 +2. **高性能**: 提供低延迟、高吞吐量的消息传递,能够支撑 Coze Studio 大规模并发的 Agent 执行 +3. **简单易用**: API 简洁直观,降低了开发和维护成本 +4. **JetStream 支持**: 通过 JetStream 提供消息持久化、重放和流处理能力 +5. **云原生**: 原生支持 Kubernetes,易于在容器化环境中部署和管理 +6. 
**安全性**: 内置多种认证和授权机制,支持 TLS 加密 + +### 与其他 MQ 的对比 + +| 特性 | NATS | NSQ | Kafka | RocketMQ | Pulsar | +| ---------------------- | -------------- | -------------- | -------------- | -------------- | -------------- | +| **部署复杂度** | 极低 | 低 | 中等 | 中等 | 中等 | +| **性能** | 极高 | 中等 | 高 | 高 | 高 | +| **资源占用** | 极低 | 低 | 中等 | 中等 | 中等 | +| **消息持久化** | JetStream | 有限 | 强 | 强 | 强 | +| **顺序性保障** | 支持 | 弱 | 强 | 强 | 强 | +| **水平扩展性** | 良好 | 中等 | 良好 | 良好 | 优秀 | +| **运维复杂度** | 极低 | 低 | 高 | 中等 | 中等 | +| **云原生支持** | 优秀 | 中等 | 中等 | 中等 | 良好 | + +#### NATS 的核心优势 + +**轻量级和高性能**: +- **内存占用**:NATS 服务器通常只需要几十 MB 内存即可处理数百万消息 +- **启动速度**:秒级启动,非常适合微服务和容器化部署 +- **延迟**:亚毫秒级消息延迟,适合实时性要求高的场景 +- **吞吐量**:单节点可处理数百万消息/秒 + +**简单性**: +- **配置简单**:最小化配置即可运行,无需复杂的集群配置 +- **API 简洁**:发布/订阅模式简单直观,学习成本低 +- **运维友好**:监控和调试工具丰富,问题排查容易 + +**云原生特性**: +- **Kubernetes 集成**:官方提供 Helm Charts 和 Operator +- **服务发现**:内置服务发现机制,无需外部依赖 +- **弹性伸缩**:支持动态集群成员变更 + +## 架构设计 + +### 整体架构 + +``` +┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ +│ Coze Studio │ │ NATS Server │ │ JetStream │ +│ Application │ │ │ │ Storage │ +├─────────────────┤ ├─────────────────┤ ├─────────────────┤ +│ Producer │───▶│ Core NATS │ │ Streams │ +│ Consumer │◀───│ JetStream │◀───│ Consumers │ +│ EventBus │ │ Clustering │ │ Key-Value │ +└─────────────────┘ └─────────────────┘ └─────────────────┘ +``` + +### 消息流转模式 + +NATS 在 Coze Studio 中支持两种消息模式: + +1. **Core NATS**: 用于实时、轻量级的消息传递 + - 发布/订阅模式 + - 请求/响应模式 + - 队列组模式 + +2. 
**JetStream**: 用于需要持久化和高可靠性的消息 + - 流式存储 + - 消息重放 + - 消费者确认机制 + +## 实现细节 + +### Producer 实现 + +Producer 负责向 NATS 发送消息,支持以下特性: + +```go +type Producer struct { + nc *nats.Conn + js nats.JetStreamContext + closed bool + mu sync.RWMutex +} + +func (p *Producer) SendMessage(ctx context.Context, topic string, message []byte) error { + // 支持 Core NATS 和 JetStream 两种模式 + if p.js != nil { + // JetStream 模式:支持消息持久化 + _, err := p.js.Publish(topic, message) + return err + } else { + // Core NATS 模式:轻量级发布 + return p.nc.Publish(topic, message) + } +} +``` + +### Consumer 实现 + +Consumer 负责从 NATS 接收和处理消息: + +```go +func (c *Consumer) RegisterConsumer(serverURL, topic, group string, handler ConsumerHandler) error { + // 根据配置选择 JetStream 或 Core NATS + if c.useJetStream { + return c.startJetStreamConsumer(ctx, topic, group, handler) + } else { + return c.startCoreConsumer(ctx, topic, group, handler) + } +} +``` + +#### JetStream Consumer 特性 + +- **消息确认**: 支持手动确认机制,确保消息处理成功 +- **重试机制**: 失败消息自动重试,支持指数退避 +- **顺序处理**: 单条消息处理,避免批处理带来的复杂性 +- **流控制**: 精确的消息流控制,防止消费者过载 + +#### Core NATS Consumer 特性 + +- **队列组**: 支持负载均衡的消息分发 +- **轻量级**: 无持久化开销,适合实时消息处理 +- **高性能**: 极低的消息处理延迟 + +## 配置说明 + +### 环境变量配置 + +在 `docker/.env.example` 中添加以下 NATS 相关配置: + +```bash +# Backend Event Bus +export COZE_MQ_TYPE="nats" # 设置消息队列类型为 NATS +export MQ_NAME_SERVER="nats:4222" # NATS 服务器地址 + +# NATS 特定配置 +# NATS_SERVER_URL: NATS 服务器连接 URL,支持 nats:// 和 tls:// 协议 +# 集群模式使用逗号分隔的 URL: "nats://nats1:4222,nats://nats2:4222" +# TLS 连接: "tls://nats:4222" +export NATS_SERVER_URL="nats://nats:4222" + +# NATS_JWT_TOKEN: NATS JWT 认证令牌(留空表示无认证) +export NATS_JWT_TOKEN="" + +# NATS_NKEY_SEED: NATS NKey 认证种子文件路径(可选) +export NATS_NKEY_SEED="" + +# NATS_USERNAME: NATS 用户名认证(可选) +export NATS_USERNAME="" + +# NATS_PASSWORD: NATS 密码认证(可选) +export NATS_PASSWORD="" + +# NATS_TOKEN: NATS 令牌认证(可选) +export NATS_TOKEN="" + +# NATS_STREAM_REPLICAS: JetStream 流的副本数量(默认: 1) +export NATS_STREAM_REPLICAS="1" + +# NATS_USE_JETSTREAM: 启用 JetStream
模式以获得消息持久化和可靠性(默认: false) +export NATS_USE_JETSTREAM="true" +``` + +### Docker Compose 配置 + +在 `docker-compose.yml` 中的 NATS 服务配置: + +```yaml +nats: + image: nats:2.10.24-alpine + container_name: nats + restart: unless-stopped + command: + - "--jetstream" # 启用 JetStream + - "--store_dir=/data" # 数据存储目录 + - "--max_memory_store=1GB" # 内存存储限制 + - "--max_file_store=10GB" # 文件存储限制 + ports: + - "4222:4222" # 客户端连接端口 + - "8222:8222" # HTTP 监控端口 + - "6222:6222" # 集群通信端口 + volumes: + - ./volumes/nats:/data + networks: + - coze-network + healthcheck: + test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://localhost:8222/"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 40s +``` + +### 应用程序配置 + +在 Coze Studio 应用中,通过环境变量配置 NATS: + +```go +// 从环境变量读取配置 +mqType := os.Getenv("COZE_MQ_TYPE") +natsURL := os.Getenv("NATS_SERVER_URL") +jwtToken := os.Getenv("NATS_JWT_TOKEN") +seedFile := os.Getenv("NATS_NKEY_SEED") +streamReplicas := os.Getenv("NATS_STREAM_REPLICAS") + +// 创建 NATS EventBus +if mqType == "nats" { + config := &nats.Config{ + ServerURL: natsURL, + JWTToken: jwtToken, + SeedFile: seedFile, + StreamReplicas: streamReplicas, + } + + eventBus, err := nats.NewProducer(config) + if err != nil { + log.Fatal("Failed to create NATS producer:", err) + } +} +``` + +## 部署指南 + +### Docker 部署 + +1. **配置环境变量**: + ```bash + cp docker/.env.example docker/.env + # 编辑 .env 文件,设置 COZE_MQ_TYPE="nats" + ``` + +2. **启动服务**: + ```bash + cd docker + docker-compose up -d nats + ``` + +3. **验证部署**: + ```bash + # 检查 NATS 服务状态 + docker-compose ps nats + + # 查看 NATS 监控界面 + curl http://localhost:8222/varz + ``` + +### Kubernetes 部署 + +使用官方 Helm Chart 部署 NATS: + +```bash +# 添加 NATS Helm 仓库 +helm repo add nats https://nats-io.github.io/k8s/helm/charts/ + +# 安装 NATS +helm install nats nats/nats --set nats.jetstream.enabled=true +``` + +### 生产环境配置 + +对于生产环境,建议进行以下配置优化: + +1. **集群部署**: + ```yaml + nats: + cluster: + enabled: true + replicas: 3 + ``` + +2. 
**持久化存储**: + ```yaml + nats: + jetstream: + fileStore: + pvc: + size: 100Gi + storageClassName: fast-ssd + ``` + +3. **资源限制**: + ```yaml + nats: + resources: + limits: + cpu: 2000m + memory: 4Gi + requests: + cpu: 500m + memory: 1Gi + ``` + +4. **安全配置**: + ```yaml + nats: + auth: + enabled: true + token: "your-secure-token" + tls: + enabled: true + ``` + +## 监控和运维 + +### 监控指标 + +NATS 提供丰富的监控指标,可通过 HTTP 端点获取: + +- **服务器信息**: `GET /varz` +- **连接信息**: `GET /connz` +- **订阅信息**: `GET /subsz` +- **JetStream 信息**: `GET /jsz` + +### 关键监控指标 + +1. **性能指标**: + - 消息吞吐量 (messages/sec) + - 消息延迟 (latency) + - 连接数 (connections) + +2. **资源指标**: + - 内存使用量 (memory usage) + - CPU 使用率 (cpu usage) + - 磁盘使用量 (disk usage) + +3. **JetStream 指标**: + - 流数量 (streams) + - 消费者数量 (consumers) + - 存储使用量 (storage usage) + +### 日志管理 + +NATS 支持多种日志级别和输出格式: + +```bash +# 启用调试日志 +nats-server --debug + +# 日志输出到文件 +nats-server --log /var/log/nats.log + +# 带时间戳的日志,并按大小轮转日志文件(限制以字节为单位,此处为 100 MiB) +nats-server --logtime --log /var/log/nats.log --log_size_limit 104857600 +``` + +## 性能优化 + +### 连接池优化 + +```go +// 配置连接选项 +opts := []nats.Option{ + nats.MaxReconnects(10), + nats.ReconnectWait(2 * time.Second), + nats.Timeout(5 * time.Second), +} + +nc, err := nats.Connect(serverURL, opts...) +``` + +### JetStream 优化 + +```go +// 配置 JetStream 选项 +jsOpts := []nats.JSOpt{ + nats.PublishAsyncMaxPending(1000), + nats.PublishAsyncErrHandler(func(js nats.JetStream, originalMsg *nats.Msg, err error) { + log.Printf("Async publish error: %v", err) + }), +} + +js, err := nc.JetStream(jsOpts...) +``` + +### 消费者优化 + +```go +// 配置消费者选项 +consumerOpts := []nats.SubOpt{ + // durable 名称由 PullSubscribe 的第二个参数指定;此处再传 nats.Durable(...) 会因重复设置 Durable 选项而报错 + nats.MaxDeliver(3), + nats.AckWait(30 * time.Second), + nats.MaxAckPending(100), +} + +sub, err := js.PullSubscribe(topic, "coze-group", consumerOpts...) +``` + +## 故障排查 + +### 常见问题 + +1. **连接失败**: + - 检查 NATS 服务是否启动 + - 验证网络连通性 + - 确认端口配置正确 + +2. **消息丢失**: + - 检查 JetStream 是否启用 + - 验证消息确认机制 + - 查看错误日志 + +3.
**性能问题**: + - 监控资源使用情况 + - 检查消息积压 + - 优化消费者配置 + +### 调试工具 + +NATS 提供了丰富的调试工具: + +```bash +# NATS CLI 工具 +nats server info +nats stream list +nats consumer list + +# 监控消息流 +nats sub "coze.>" +nats pub "coze.test" "hello world" +``` + +## 最佳实践 + +### 主题命名规范 + +建议使用层次化的主题命名: + +``` +coze.workflow.{workflow_id}.{event_type} +coze.agent.{agent_id}.{action} +coze.knowledge.{kb_id}.{operation} +``` + +### 错误处理 + +实现完善的错误处理机制: + +```go +func (c *Consumer) handleMessage(msg *nats.Msg) { + defer func() { + if r := recover(); r != nil { + log.Printf("Message processing panic: %v", r) + msg.Nak() // 拒绝消息,触发重试 + } + }() + + if err := c.processMessage(msg.Data); err != nil { + log.Printf("Message processing error: %v", err) + msg.Nak() + return + } + + msg.Ack() // 确认消息处理成功 +} +``` + +### 资源管理 + +正确管理 NATS 连接和资源: + +```go +func (p *Producer) Close() error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.closed { + return nil + } + + p.closed = true + + if p.nc != nil { + p.nc.Close() + } + + return nil +} +``` + +## 总结 + +NATS 作为 Coze Studio 的 EventBus 解决方案,提供了轻量级、高性能、易于部署的消息传递能力。通过 JetStream 扩展,NATS 还能提供企业级的消息持久化和流处理功能。 + +选择 NATS 的主要优势: +- **简单性**: 部署和维护成本低 +- **性能**: 极高的消息处理性能 +- **云原生**: 完美适配容器化和 Kubernetes 环境 +- **可靠性**: JetStream 提供消息持久化和确认机制 +- **扩展性**: 支持集群部署和水平扩展 + +NATS 特别适合以下场景: +- 微服务架构的服务间通信 +- 实时数据流处理 +- 云原生应用的消息传递 +- 需要低延迟的消息系统 +- 资源受限的部署环境 \ No newline at end of file