feat: telemetry infra

This commit is contained in:
xuzhaonan
2025-08-04 20:59:17 +08:00
parent 3fe4031531
commit 9335447977
27 changed files with 2433 additions and 17 deletions

View File

@ -35,7 +35,7 @@ import (
func TestClearConversationCtx(t *testing.T) {
h := server.Default()
err := application.Init(context.Background())
_, err := application.Init(context.Background())
t.Logf("application init err: %v", err)
h.POST("/api/conversation/create_section", ClearConversationCtx)
@ -55,7 +55,7 @@ func TestClearConversationCtx(t *testing.T) {
func TestClearConversationHistory(t *testing.T) {
h := server.Default()
err := application.Init(context.Background())
_, err := application.Init(context.Background())
t.Logf("application init err: %v", err)
h.POST("/api/conversation/clear_message", ClearConversationHistory)
req := &conversation.ClearConversationHistoryRequest{

View File

@ -35,7 +35,7 @@ import (
func TestGetMessageList(t *testing.T) {
h := server.Default()
err := application.Init(context.Background())
_, err := application.Init(context.Background())
t.Logf("application init err: %v", err)

View File

@ -20,9 +20,11 @@ import (
"context"
"fmt"
"github.com/cloudwego/hertz/pkg/route"
"github.com/coze-dev/coze-studio/backend/application/openauth"
"github.com/coze-dev/coze-studio/backend/application/template"
"github.com/coze-dev/coze-studio/backend/crossdomain/contract/crosssearch"
"github.com/coze-dev/coze-studio/backend/pkg/logs"
"github.com/coze-dev/coze-studio/backend/application/app"
"github.com/coze-dev/coze-studio/backend/application/base/appinfra"
@ -102,27 +104,33 @@ type complexServices struct {
conversationSVC *conversation.ConversationApplicationService
}
func Init(ctx context.Context) (err error) {
func Init(ctx context.Context) (shutdown []route.CtxCallback, err error) {
infra, err := appinfra.Init(ctx)
if err != nil {
return err
return nil, err
}
shutdown = append(shutdown, func(ctx context.Context) {
if e := infra.TracerProvider.Shutdown(ctx); e != nil {
logs.CtxErrorf(ctx, "shut down tracer provider failed, trace might loss, err=%v", e)
}
})
eventbus := initEventBus(infra)
basicServices, err := initBasicServices(ctx, infra, eventbus)
if err != nil {
return fmt.Errorf("Init - initBasicServices failed, err: %v", err)
return nil, fmt.Errorf("Init - initBasicServices failed, err: %v", err)
}
primaryServices, err := initPrimaryServices(ctx, basicServices)
if err != nil {
return fmt.Errorf("Init - initPrimaryServices failed, err: %v", err)
return nil, fmt.Errorf("Init - initPrimaryServices failed, err: %v", err)
}
complexServices, err := initComplexServices(ctx, primaryServices)
if err != nil {
return fmt.Errorf("Init - initVitalServices failed, err: %v", err)
return nil, fmt.Errorf("Init - initVitalServices failed, err: %v", err)
}
crossconnector.SetDefaultSVC(connectorImpl.InitDomainService(basicServices.connectorSVC.DomainSVC))
@ -139,7 +147,7 @@ func Init(ctx context.Context) (err error) {
crossdatacopy.SetDefaultSVC(dataCopyImpl.InitDomainService(basicServices.infra))
crosssearch.SetDefaultSVC(searchImpl.InitDomainService(complexServices.searchSVC.DomainSVC))
return nil
return shutdown, nil
}
func initEventBus(infra *appinfra.AppDependencies) *eventbusImpl {

View File

@ -22,7 +22,10 @@ import (
"os"
"strconv"
"strings"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/coze-dev/coze-studio/backend/infra/contract/telemetry"
"gorm.io/gorm"
"github.com/coze-dev/coze-studio/backend/infra/contract/coderunner"
@ -37,6 +40,7 @@ import (
"github.com/coze-dev/coze-studio/backend/infra/impl/imagex/veimagex"
"github.com/coze-dev/coze-studio/backend/infra/impl/mysql"
"github.com/coze-dev/coze-studio/backend/infra/impl/storage"
ck "github.com/coze-dev/coze-studio/backend/infra/impl/telemetry/clickhouse"
"github.com/coze-dev/coze-studio/backend/types/consts"
)
@ -51,6 +55,8 @@ type AppDependencies struct {
AppEventProducer eventbus.Producer
ModelMgr modelmgr.Manager
CodeRunner coderunner.Runner
TracerProvider telemetry.TracerProvider
QueryClient telemetry.QueryClient
}
func Init(ctx context.Context) (*AppDependencies, error) {
@ -101,6 +107,11 @@ func Init(ctx context.Context) (*AppDependencies, error) {
deps.CodeRunner = initCodeRunner()
deps.TracerProvider, deps.QueryClient, err = initTelemetry()
if err != nil {
return nil, err
}
return deps, nil
}
@ -182,3 +193,72 @@ func initCodeRunner() coderunner.Runner {
return direct.NewRunner()
}
}
func initTelemetry() (telemetry.TracerProvider, telemetry.QueryClient, error) {
typ := os.Getenv(consts.TelemetryType)
switch typ {
case "clickhouse":
opts := &clickhouse.Options{
Addr: strings.Split(os.Getenv(consts.ClickhouseAddr), ";"),
Auth: clickhouse.Auth{
Database: getEnvOrDefault(os.Getenv(consts.ClickhouseDBName), "default"),
Username: getEnvOrDefault(os.Getenv(consts.ClickhouseUserName), "default"),
Password: getEnvOrDefault(os.Getenv(consts.ClickhousePassword), "clickhouse123"),
},
// Debug: true,
// Debugf: func(format string, v ...any) {
// fmt.Printf(format+"\n", v...)
// },
Settings: clickhouse.Settings{
"max_execution_time": 60,
},
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionZSTD,
Level: 1,
},
DialTimeout: time.Second * 30,
MaxOpenConns: 5,
MaxIdleConns: 5,
ConnMaxLifetime: time.Duration(10) * time.Minute,
ConnOpenStrategy: clickhouse.ConnOpenInOrder,
BlockBufferSize: 10,
MaxCompressionBuffer: 10240,
}
indexRootOnly := os.Getenv(consts.TelemetryIndexRootOnly) == "true"
tracerConfig := &ck.TracerConfig{
ClickhouseOptions: opts,
TracerProviderOptions: nil,
IndexRootOnly: indexRootOnly,
}
tp, err := ck.NewTracerProvider(tracerConfig)
if err != nil {
return nil, nil, err
}
var emptySpanID *string
if v := os.Getenv(consts.ClickhouseEmptySpanID); v != "" {
emptySpanID = &v
}
queryClientConfig := &ck.QueryClientConfig{
ClickhouseOptions: opts,
EmptySpanID: emptySpanID,
}
qc, err := ck.NewQueryClient(queryClientConfig)
if err != nil {
return nil, nil, err
}
return tp, qc, nil
default:
// TODO: not return errors to achieve compatible upgrades ?
return nil, nil, fmt.Errorf("unknown telemetry type: %s", typ)
}
}
func getEnvOrDefault(key, defaultValue string) string {
if v := os.Getenv(key); v != "" {
return v
}
return defaultValue
}

View File

@ -7,6 +7,7 @@ replace github.com/apache/thrift => github.com/apache/thrift v0.13.0
replace google.golang.org/grpc => google.golang.org/grpc v1.68.0
require (
github.com/ClickHouse/clickhouse-go/v2 v2.37.2
github.com/IBM/sarama v1.45.1
github.com/apache/rocketmq-client-go/v2 v2.1.3-0.20250427084711-67ec50b93040
github.com/apache/thrift v0.21.0
@ -75,13 +76,16 @@ require (
golang.org/x/image v0.22.0
golang.org/x/oauth2 v0.23.0
google.golang.org/genai v1.13.0
gorm.io/driver/clickhouse v0.7.0
)
require (
cloud.google.com/go v0.116.0 // indirect
cloud.google.com/go/auth v0.9.3 // indirect
cloud.google.com/go/compute/metadata v0.5.0 // indirect
github.com/ClickHouse/ch-go v0.66.1 // indirect
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect
github.com/andybalholm/brotli v1.1.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.37 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.5 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.18 // indirect
@ -89,14 +93,18 @@ require (
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/extrame/ole2 v0.0.0-20160812065207-d69429661ad7 // indirect
github.com/frankban/quicktest v1.14.6 // indirect
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.7.1 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/s2a-go v0.1.8 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/meguminnnnnnnnn/go-openai v0.0.0-20250620092828-0d508a1dcdde // indirect
github.com/paulmach/orb v0.11.1 // indirect
github.com/peterbourgon/diskv/v3 v3.0.1 // indirect
github.com/rogpeppe/fastuuid v1.2.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shabbyrobe/xmlwriter v0.0.0-20200208144257-9fca06d00ffa // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
go.opencensus.io v0.24.0 // indirect
@ -264,12 +272,12 @@ require (
go.etcd.io/etcd/server/v3 v3.5.5 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect
go.opentelemetry.io/otel v1.36.0 // indirect
go.opentelemetry.io/otel v1.36.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0 // indirect
go.opentelemetry.io/otel/metric v1.36.0 // indirect
go.opentelemetry.io/otel/sdk v1.36.0 // indirect
go.opentelemetry.io/otel/trace v1.36.0 // indirect
go.opentelemetry.io/otel/sdk v1.36.0
go.opentelemetry.io/otel/trace v1.36.0
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/automaxprocs v1.5.3 // indirect

View File

@ -766,6 +766,10 @@ github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOv
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/ClickHouse/ch-go v0.66.1 h1:LQHFslfVYZsISOY0dnOYOXGkOUvpv376CCm8g7W74A4=
github.com/ClickHouse/ch-go v0.66.1/go.mod h1:NEYcg3aOFv2EmTJfo4m2WF7sHB/YFbLUuIWv9iq76xY=
github.com/ClickHouse/clickhouse-go/v2 v2.37.2 h1:wRLNKoynvHQEN4znnVHNLaYnrqVc9sGJmGYg+GGCfto=
github.com/ClickHouse/clickhouse-go/v2 v2.37.2/go.mod h1:pH2zrBGp5Y438DMwAxXMm1neSXPPjSI7tD4MURVULw8=
github.com/CloudyKit/fastprinter v0.0.0-20200109182630-33d98a066a53/go.mod h1:+3IMCy2vIlbG1XG/0ggNQv0SvxCAIpPM5b1nCz56Xno=
github.com/CloudyKit/jet/v3 v3.0.0/go.mod h1:HKQPgSJmdK8hdoAbKUUWajkHyHo4RaU5rMdUywE7VMo=
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
@ -802,6 +806,8 @@ github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302/go.mod h1:SGn
github.com/alicebob/miniredis/v2 v2.34.0 h1:mBFWMaJSNL9RwdGRyEDoAAv8OQc5UlEhLDQggTglU/0=
github.com/alicebob/miniredis/v2 v2.34.0/go.mod h1:kWShP4b58T1CW0Y5dViCd5ztzrDqRWqM3nksiyXk5s8=
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
github.com/anthropics/anthropic-sdk-go v1.4.0 h1:fU1jKxYbQdQDiEXCxeW5XZRIOwKevn/PMg8Ay1nnUx0=
github.com/anthropics/anthropic-sdk-go v1.4.0/go.mod h1:AapDW22irxK2PSumZiQXYUFvsdQgkwIWlpESweWZI/c=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
@ -1090,6 +1096,10 @@ github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclK
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA=
github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw=
github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw=
github.com/go-faster/errors v0.7.1 h1:MkJTnDoEdi9pDabt1dpWf7AA8/BaSYZqibYyhZ20AYg=
github.com/go-faster/errors v0.7.1/go.mod h1:5ySTjWFiphBs07IKuiL69nxdfd5+fzh1u7FPGZP2quo=
github.com/go-fonts/dejavu v0.1.0/go.mod h1:4Wt4I4OU2Nq9asgDCteaAaWZOV24E+0/Pwo0gppep4g=
github.com/go-fonts/latin-modern v0.2.0/go.mod h1:rQVLdDMK+mK1xscDwsqM5J8U2jrRa3T0ecnM9pNujks=
github.com/go-fonts/liberation v0.1.1/go.mod h1:K6qoJYypsmfVjWg8KOVDQhLc8UDgIK2HYqyqAO9z7GY=
@ -1354,6 +1364,8 @@ github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b
github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8=
github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY=
github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
@ -1581,6 +1593,7 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 h1:RWengNIwukTxcDr9M+97sNutRR1RKhG96O6jWumTTnw=
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826/go.mod h1:TaXosZuwdSHYgviHp1DAtfrULt5eUgsSMsZf+YrPgl8=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/moul/http2curl v1.0.0/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOAZ3H0fQ=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
@ -1639,6 +1652,9 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/paulmach/orb v0.11.1 h1:3koVegMC4X/WeiXYz9iswopaTwMem53NzTJuTF20JzU=
github.com/paulmach/orb v0.11.1/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU=
github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M=
github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc=
@ -1759,6 +1775,8 @@ github.com/samber/lo v1.27.0 h1:GOyDWxsblvqYobqsmUuMddPa2/mMzkKyojlXol4+LaQ=
github.com/samber/lo v1.27.0/go.mod h1:it33p9UtPMS7z72fP4gw/EIfQB2eI8ke7GR2wc6+Rhg=
github.com/schollz/closestmatch v2.1.0+incompatible/go.mod h1:RtP1ddjLong6gTkbtmuhtR2uUrrJOpYzYRvbcPAid+g=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shabbyrobe/xmlwriter v0.0.0-20200208144257-9fca06d00ffa h1:2cO3RojjYl3hVTbEvJVqrMaFmORhL6O06qdW42toftk=
github.com/shabbyrobe/xmlwriter v0.0.0-20200208144257-9fca06d00ffa/go.mod h1:Yjr3bdWaVWyME1kha7X0jsz3k2DgXNa1Pj3XGyUAbx8=
@ -1842,6 +1860,7 @@ github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
@ -1890,7 +1909,9 @@ github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs=
github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM=
github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
@ -1903,9 +1924,12 @@ github.com/xuri/excelize/v2 v2.9.0 h1:1tgOaEq92IOEumR1/JfYS/eR0KHOCsRv/rYXXh6YJQ
github.com/xuri/excelize/v2 v2.9.0/go.mod h1:uqey4QBZ9gdMeWApPLdhm9x+9o2lq4iVmjiLfBS5hdE=
github.com/xuri/nfp v0.0.0-20240318013403-ab9948c2c4a7 h1:hPVCafDV85blFTabnqKgNhDCkJX25eik94Si9cTER4A=
github.com/xuri/nfp v0.0.0-20240318013403-ab9948c2c4a7/go.mod h1:WwHg+CVyzlv/TX9xqBFXEZAuxOPxn2k1GNHwG41IIUQ=
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0/go.mod h1:/LWChgwKmvncFJFHJ7Gvn9wZArjbV5/FppcK2fKk/tI=
github.com/yargevad/filepathx v1.0.0 h1:SYcT+N3tYGi+NvazubCNlvgIPbzAk7i7y2dwg3I5FYc=
github.com/yargevad/filepathx v1.0.0/go.mod h1:BprfX/gpYNJHJfc35GjRRpVcwWXS89gGulUIU5tK3tA=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
github.com/yudai/gojsondiff v1.0.0/go.mod h1:AY32+k2cwILAkW1fbgxQ5mUmMiZFgLIV+FBNExI05xg=
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82/go.mod h1:lgjkn3NuSvDfVJdfcVVdX+jpBxNmX4rDAzaS45IcYoM=
github.com/yudai/pp v2.0.1+incompatible/go.mod h1:PuxR/8QJ7cyCkFp/aUDS+JY727OFEZkTdatxwunjIkc=
@ -1945,6 +1969,7 @@ go.etcd.io/etcd/raft/v3 v3.5.5 h1:Ibz6XyZ60OYyRopu73lLM/P+qco3YtlZMOhnXNS051I=
go.etcd.io/etcd/raft/v3 v3.5.5/go.mod h1:76TA48q03g1y1VpTue92jZLr9lIHKUNcYdZOOGyx8rI=
go.etcd.io/etcd/server/v3 v3.5.5 h1:jNjYm/9s+f9A9r6+SC4RvNaz6AqixpOvhrFdT0PvIj0=
go.etcd.io/etcd/server/v3 v3.5.5/go.mod h1:rZ95vDw/jrvsbj9XpTqPrTAB9/kzchVdhRirySPkUBc=
go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
@ -2036,6 +2061,7 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220314234659-1baeb1ce4c0b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
@ -2861,6 +2887,8 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/datatypes v1.1.1-0.20230130040222-c43177d3cf8c h1:jWdr7cHgl8c/ua5vYbR2WhSp+NQmzhsj0xoY3foTzW8=
gorm.io/datatypes v1.1.1-0.20230130040222-c43177d3cf8c/go.mod h1:SH2K9R+2RMjuX1CkCONrPwoe9JzVv2hkQvEu4bXGojE=
gorm.io/driver/clickhouse v0.7.0 h1:BCrqvgONayvZRgtuA6hdya+eAW5P2QVagV3OlEp1vtA=
gorm.io/driver/clickhouse v0.7.0/go.mod h1:TmNo0wcVTsD4BBObiRnCahUgHJHjBIwuRejHwYt3JRs=
gorm.io/driver/mysql v1.5.7 h1:MndhOPYOfEp2rHKgkZIhJ16eVUIRf2HmzgoPmh7FCWo=
gorm.io/driver/mysql v1.5.7/go.mod h1:sEtPWMiqiN1N1cMXoXmBbd8C6/l+TESwriotuRRpkDM=
gorm.io/driver/postgres v1.5.11 h1:ubBVAfbKEUld/twyKZ0IYn9rSQh448EdelLYk9Mv314=

View File

@ -0,0 +1,98 @@
package telemetry
import "go.opentelemetry.io/otel/attribute"
const (
AttributeLogID = "_log_id" // string
AttributeSpaceID = "_space_id" // int64
AttributeType = "_type" // int
AttributeUserID = "_user_id" // string
AttributeEntityID = "_entity_id" // int64
AttributeEnvironment = "_env" // string
AttributeVersion = "_version" // string
AttributeInput = "_input" // string
AttributeOutput = "_output" // string
AttributeInputTokens = "_input_tokens" // int64
AttributeOutputToken = "_output_token" // int64
AttributeModel = "_model" // string of chatmodel.Config
AttributeTemperature = "_temperature" // float64
AttributeMessageID = "_message_id" // string
AttributeTimeToFirstToken = "_ttft" // int64, ms
AttributePrompt = "_prompt" // string
AttributeToolName = "_tool_name" // string
AttributeExecuteID = "_execute_id" // string
)
func NewSpanAttrLogID(logID string) attribute.KeyValue {
return attribute.String(AttributeLogID, logID)
}
func NewSpanAttrSpaceID(spaceID int64) attribute.KeyValue {
return attribute.Int64(AttributeSpaceID, spaceID)
}
func NewSpanAttrType(typ int64) attribute.KeyValue {
return attribute.Int64(AttributeType, typ)
}
func NewSpanAttrUserID(userID int64) attribute.KeyValue {
return attribute.Int64(AttributeUserID, userID)
}
func NewSpanAttrEntityID(entityID int64) attribute.KeyValue {
return attribute.Int64(AttributeEntityID, entityID)
}
func NewSpanAttrEnvironment(env string) attribute.KeyValue {
return attribute.String(AttributeEnvironment, env)
}
func NewSpanAttrVersion(version string) attribute.KeyValue {
return attribute.String(AttributeVersion, version)
}
func NewSpanAttrInput(input string) attribute.KeyValue {
return attribute.String(AttributeInput, input)
}
func NewSpanAttrInputTokens(inputTokens int64) attribute.KeyValue {
return attribute.Int64(AttributeInputTokens, inputTokens)
}
func NewSpanAttrOutput(output string) attribute.KeyValue {
return attribute.String(AttributeOutput, output)
}
func NewSpanAttrOutputTokens(outputTokens int64) attribute.KeyValue {
return attribute.Int64(AttributeOutputToken, outputTokens)
}
func NewSpanAttrModel(model string) attribute.KeyValue {
return attribute.String(AttributeModel, model)
}
func NewSpanAttrTemperature(temperature float64) attribute.KeyValue {
return attribute.Float64(AttributeTemperature, temperature)
}
func NewSpanAttrMessageID(messageID string) attribute.KeyValue {
return attribute.String(AttributeMessageID, messageID)
}
func NewSpanAttrTimeToFirstToken(timeToFirstToken int64) attribute.KeyValue {
return attribute.Int64(AttributeTimeToFirstToken, timeToFirstToken)
}
func NewSpanAttrPrompt(prompt string) attribute.KeyValue {
return attribute.String(AttributePrompt, prompt)
}
func NewSpanAttrToolName(toolName string) attribute.KeyValue {
return attribute.String(AttributeToolName, toolName)
}
func NewSpanAttrExecuteID(executeID string) attribute.KeyValue {
return attribute.String(AttributeExecuteID, executeID)
}

View File

@ -0,0 +1,54 @@
package telemetry
type SpanType int64
const (
Unknown SpanType = 1
UserInput SpanType = 2
ThirdParty SpanType = 3
ScheduledTasks SpanType = 4
OpenDialog SpanType = 5
InvokeAgent SpanType = 6
RestartAgent SpanType = 7
SwitchAgent SpanType = 8
LLMCall SpanType = 9
LLMBatchCall SpanType = 10
Workflow SpanType = 11
WorkflowStart SpanType = 12
WorkflowEnd SpanType = 13
PluginTool SpanType = 14
PluginToolBatch SpanType = 15
Knowledge SpanType = 16
Code SpanType = 17
CodeBatch SpanType = 18
Condition SpanType = 19
Chain SpanType = 20
Card SpanType = 21
WorkflowMessage SpanType = 22
WorkflowLLMCall SpanType = 23
WorkflowLLMBatchCall SpanType = 24
WorkflowCode SpanType = 25
WorkflowCodeBatch SpanType = 26
WorkflowCondition SpanType = 27
WorkflowPluginTool SpanType = 28
WorkflowPluginToolBatch SpanType = 29
WorkflowKnowledge SpanType = 30
WorkflowVariable SpanType = 31
WorkflowDatabase SpanType = 32
Variable SpanType = 33
Database SpanType = 34
LongTermMemory SpanType = 35
Hook SpanType = 36
BWStart SpanType = 37
BWEnd SpanType = 38
BWBatch SpanType = 39
BWLoop SpanType = 40
BWCondition SpanType = 41
BWLLM SpanType = 42
BWParallel SpanType = 43
BWScript SpanType = 44
BWVariable SpanType = 45
BWCallFlow SpanType = 46
BWConnector SpanType = 47
UserInputV2 SpanType = 48
)

View File

@ -0,0 +1,43 @@
package telemetry
import (
"context"
"time"
"go.opentelemetry.io/otel/codes"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
)
type TracerProvider interface {
trace.TracerProvider
Shutdown(ctx context.Context) error
}
type QueryClient interface {
ListSpan(ctx context.Context, request *ListTracesRequest) (
spans []tracesdk.ReadOnlySpan, nextCursor *string, hasMore bool, err error)
GetTrace(ctx context.Context, request *GetTraceRequest) ([]tracesdk.ReadOnlySpan, error)
}
type ListTracesRequest struct {
RootOnly bool
SpaceID int64
EntityID int64
Status codes.Code
StartAt time.Time
EndAt time.Time
Limit int
Cursor *string
}
type GetTraceRequest struct {
SpaceID int64
EntityID int64
TraceID *trace.TraceID
LogID *string
}

View File

@ -0,0 +1,38 @@
package clickhouse
import (
"github.com/ClickHouse/clickhouse-go/v2"
ckdriver "gorm.io/driver/clickhouse"
"gorm.io/gorm"
)
func newClickhouseDB(cfg *clickhouse.Options) (*gorm.DB, error) {
opt := *cfg
opt.MaxOpenConns = 0
opt.MaxIdleConns = 0
opt.ConnMaxLifetime = 0
conn := clickhouse.OpenDB(&opt)
if cfg.MaxIdleConns > 0 {
conn.SetMaxIdleConns(cfg.MaxIdleConns)
}
if cfg.MaxOpenConns > 0 {
conn.SetMaxOpenConns(cfg.MaxOpenConns)
}
if cfg.ConnMaxLifetime > 0 {
conn.SetConnMaxLifetime(cfg.ConnMaxLifetime)
}
if err := conn.Ping(); err != nil {
return nil, err
}
db, err := gorm.Open(ckdriver.New(ckdriver.Config{
Conn: conn,
}))
if err != nil {
return nil, err
}
return db, nil
}

View File

@ -0,0 +1,250 @@
package clickhouse
import (
"context"
"encoding/hex"
"fmt"
"strconv"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"github.com/coze-dev/coze-studio/backend/infra/contract/telemetry"
"github.com/coze-dev/coze-studio/backend/infra/impl/telemetry/clickhouse/internal/model"
)
func toSpanIndexModel(span tracesdk.ReadOnlySpan) (*model.SpansIndex, error) {
index := &model.SpansIndex{
SpanID: span.SpanContext().SpanID().String(),
TraceID: span.SpanContext().TraceID().String(),
ParentSpanID: span.Parent().SpanID().String(),
Name: span.Name(),
Kind: int8(span.SpanKind()),
StatusCode: int64(span.Status().Code),
StatusMsg: span.Status().Description,
StartTimeMs: uint64(span.StartTime().UnixMilli()),
}
for _, attr := range span.Attributes() {
switch attr.Key {
case telemetry.AttributeLogID:
index.LogID = attr.Value.AsString()
case telemetry.AttributeSpaceID:
index.SpaceID = attr.Value.AsInt64()
case telemetry.AttributeType:
index.Type = int32(attr.Value.AsInt64())
case telemetry.AttributeUserID:
index.UserID = attr.Value.AsInt64()
case telemetry.AttributeEntityID:
index.EntityID = attr.Value.AsInt64()
case telemetry.AttributeEnvironment:
index.Env = attr.Value.AsString()
case telemetry.AttributeVersion:
index.Version = attr.Value.AsString()
case telemetry.AttributeInput:
index.Input = attr.Value.AsString()
default:
// do nothing
}
}
return index, nil
}
func toSpanDataModel(span tracesdk.ReadOnlySpan) (*model.SpansData, error) {
data := &model.SpansData{
SpanID: span.SpanContext().SpanID().String(),
TraceID: span.SpanContext().TraceID().String(),
ParentSpanID: span.Parent().SpanID().String(),
Name: span.Name(),
Kind: int8(span.SpanKind()),
StatusCode: int64(span.Status().Code),
StatusMsg: span.Status().Description,
ResourceAttributes: make(map[string]string),
LogID: "",
StartTimeMs: uint64(span.StartTime().UnixMilli()),
AttrKeys: make([]string, 0, len(span.Attributes())),
AttrValues: make([]string, 0, len(span.Attributes())),
}
for _, attr := range span.Resource().Attributes() {
data.ResourceAttributes[string(attr.Key)] = attr.Value.Emit()
}
for _, attr := range span.Attributes() {
switch attr.Key {
case telemetry.AttributeLogID:
data.LogID = attr.Value.AsString()
default:
data.AttrKeys = append(data.AttrKeys, string(attr.Key))
data.AttrValues = append(data.AttrValues, attr.Value.Emit())
}
}
return data, nil
}
func fromSpanIndexModel(emptySpanID string) func(index *model.SpansIndex) (tracesdk.ReadOnlySpan, error) {
return func(index *model.SpansIndex) (tracesdk.ReadOnlySpan, error) {
traceID, err := hex.DecodeString(index.TraceID)
if err != nil {
return nil, err
}
if len(traceID) != 16 {
return nil, fmt.Errorf("[fromSpanIndexModel] invalid trace ID: %s", index.TraceID)
}
spanID, err := hex.DecodeString(index.SpanID)
if err != nil {
return nil, err
}
if len(spanID) != 8 {
return nil, fmt.Errorf("[fromSpanIndexModel] invalid span ID: %s", index.SpanID)
}
span := &ckSpan{
name: index.Name,
spanContext: trace.NewSpanContext(trace.SpanContextConfig{
TraceID: trace.TraceID(traceID),
SpanID: trace.SpanID(spanID),
}),
parent: trace.SpanContext{},
kind: trace.SpanKind(index.Kind),
startTime: time.UnixMilli(int64(index.StartTimeMs)),
resource: nil,
attributes: []attribute.KeyValue{
telemetry.NewSpanAttrLogID(index.LogID),
telemetry.NewSpanAttrSpaceID(index.SpaceID),
telemetry.NewSpanAttrType(int64(index.Type)),
telemetry.NewSpanAttrUserID(index.UserID),
telemetry.NewSpanAttrEntityID(index.EntityID),
telemetry.NewSpanAttrEnvironment(index.Env),
telemetry.NewSpanAttrVersion(index.Version),
telemetry.NewSpanAttrInput(index.Input),
},
status: tracesdk.Status{
Code: codes.Code(index.StatusCode),
Description: index.StatusMsg,
},
csCount: 0,
}
if index.ParentSpanID != emptySpanID {
parentSpanID, err := hex.DecodeString(index.ParentSpanID)
if err != nil {
return nil, err
}
span.parent = trace.NewSpanContext(trace.SpanContextConfig{
TraceID: trace.TraceID(traceID),
SpanID: trace.SpanID(parentSpanID),
})
}
return span, nil
}
}
func fromSpanDataModel(emptySpanID string) func(data *model.SpansData) (tracesdk.ReadOnlySpan, error) {
return func(data *model.SpansData) (tracesdk.ReadOnlySpan, error) {
traceID, err := hex.DecodeString(data.TraceID)
if err != nil {
return nil, err
}
if len(traceID) != 16 {
return nil, fmt.Errorf("[fromSpanDataModel] invalid trace ID: %s", data.TraceID)
}
spanID, err := hex.DecodeString(data.SpanID)
if err != nil {
return nil, err
}
if len(spanID) != 8 {
return nil, fmt.Errorf("[fromSpanDataModel] invalid span ID: %s", data.SpanID)
}
span := &ckSpan{
name: data.Name,
spanContext: trace.NewSpanContext(trace.SpanContextConfig{
TraceID: trace.TraceID(traceID),
SpanID: trace.SpanID(spanID),
}),
parent: trace.SpanContext{},
kind: trace.SpanKind(data.Kind),
startTime: time.UnixMilli(int64(data.StartTimeMs)),
endTime: time.UnixMilli(int64(data.EndTimeMs)),
resource: nil,
attributes: nil,
status: tracesdk.Status{
Code: codes.Code(data.StatusCode),
Description: data.StatusMsg,
},
csCount: 0,
}
if data.ParentSpanID != emptySpanID {
parentSpanID, err := hex.DecodeString(data.ParentSpanID)
if err != nil {
return nil, err
}
span.parent = trace.NewSpanContext(trace.SpanContextConfig{
TraceID: trace.TraceID(traceID),
SpanID: trace.SpanID(parentSpanID),
})
}
if len(data.AttrKeys) != len(data.AttrValues) {
return nil, fmt.Errorf("[fromSpanDataModel] invalid attribute count, keys=%d, vals=%d", len(data.AttrKeys), len(data.AttrValues))
}
var ra []attribute.KeyValue
for k, v := range data.ResourceAttributes {
// todo: resource value type
ra = append(ra, attribute.String(k, v))
}
rs, err := resource.New(context.Background(), resource.WithAttributes(ra...))
if err != nil {
return nil, err
}
span.resource = rs
for i := range data.AttrKeys {
key, val := data.AttrKeys[i], data.AttrValues[i]
var kv attribute.KeyValue
switch key {
case telemetry.AttributeSpaceID,
telemetry.AttributeType,
telemetry.AttributeUserID,
telemetry.AttributeEntityID,
telemetry.AttributeInputTokens,
telemetry.AttributeOutputToken:
i64, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("[fromSpanDataModel] invalid attribute, key=%s, value=%s, expect value as int64, err=%w", key, val, err)
}
kv = attribute.Int64(key, i64)
case telemetry.AttributeLogID,
telemetry.AttributeEnvironment,
telemetry.AttributeVersion,
telemetry.AttributeInput,
telemetry.AttributeOutput,
telemetry.AttributeModel:
kv = attribute.String(key, val)
default:
kv = assertAttribute(key, val)
}
span.attributes = append(span.attributes, kv)
}
return span, nil
}
}
func assertAttribute(key string, val string) attribute.KeyValue {
return attribute.String(key, val)
}

View File

@ -0,0 +1,66 @@
package clickhouse
import (
"context"
"fmt"
"go.opentelemetry.io/otel/sdk/trace"
"github.com/coze-dev/coze-studio/backend/infra/impl/telemetry/clickhouse/internal/model"
"github.com/coze-dev/coze-studio/backend/infra/impl/telemetry/clickhouse/internal/query"
"github.com/coze-dev/coze-studio/backend/pkg/logs"
)
type exporter struct {
query *query.Query
indexRootOnly bool
}
func (e *exporter) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) (err error) {
defer func() {
if err != nil {
err = fmt.Errorf("[ExportSpans] failed when exporting spans: %w", err)
logs.CtxErrorf(ctx, "%v", err)
}
}()
var (
spansIndex []*model.SpansIndex
spansData []*model.SpansData
)
for _, span := range spans {
if !e.indexRootOnly || !span.Parent().HasSpanID() {
index, err := toSpanIndexModel(span)
if err != nil {
return err
}
spansIndex = append(spansIndex, index)
}
data, err := toSpanDataModel(span)
if err != nil {
return err
}
spansData = append(spansData, data)
}
if len(spansData) > 0 {
if err = e.query.SpansData.WithContext(ctx).Debug().Create(spansData...); err != nil {
return err
}
}
if len(spansIndex) > 0 {
if err = e.query.SpansIndex.WithContext(ctx).Debug().Create(spansIndex...); err != nil {
return err
}
}
return nil
}
func (e *exporter) Shutdown(ctx context.Context) error {
return nil
}

View File

@ -0,0 +1,29 @@
// Code generated by gorm.io/gen. DO NOT EDIT.
// Code generated by gorm.io/gen. DO NOT EDIT.
// Code generated by gorm.io/gen. DO NOT EDIT.
package model
const TableNameSpansData = "spans_data"
// SpansData mapped from table <spans_data>
type SpansData struct {
SpanID string `gorm:"column:span_id;type:String;primaryKey" json:"span_id"`
TraceID string `gorm:"column:trace_id;type:String;primaryKey" json:"trace_id"`
ParentSpanID string `gorm:"column:parent_span_id;type:String;not null" json:"parent_span_id"`
Name string `gorm:"column:name;type:String;not null" json:"name"`
Kind int8 `gorm:"column:kind;type:Int8;not null" json:"kind"`
StatusCode int64 `gorm:"column:status_code;type:Int64;not null" json:"status_code"`
StatusMsg string `gorm:"column:status_msg;type:String;not null" json:"status_msg"`
ResourceAttributes map[string]string `gorm:"column:resource_attributes;type:Map(String, String);not null" json:"resource_attributes"`
StartTimeMs uint64 `gorm:"column:start_time_ms;type:UInt64;not null" json:"start_time_ms"`
EndTimeMs uint64 `gorm:"column:end_time_ms;type:UInt64;not null" json:"end_time_ms"`
LogID string `gorm:"column:log_id;type:String;not null" json:"log_id"`
AttrKeys []string `gorm:"column:attr_keys;type:Array(LowCardinality(String));not null" json:"attr_keys"`
AttrValues []string `gorm:"column:attr_values;type:Array(String);not null" json:"attr_values"`
}
// TableName SpansData's table name
func (*SpansData) TableName() string {
return TableNameSpansData
}

View File

@ -0,0 +1,32 @@
// Code generated by gorm.io/gen. DO NOT EDIT.
// Code generated by gorm.io/gen. DO NOT EDIT.
// Code generated by gorm.io/gen. DO NOT EDIT.
package model
const TableNameSpansIndex = "spans_index"
// SpansIndex mapped from table <spans_index>
type SpansIndex struct {
SpanID string `gorm:"column:span_id;type:String;not null" json:"span_id"`
TraceID string `gorm:"column:trace_id;type:String;not null" json:"trace_id"`
ParentSpanID string `gorm:"column:parent_span_id;type:String;not null" json:"parent_span_id"`
Name string `gorm:"column:name;type:String;not null" json:"name"`
Kind int8 `gorm:"column:kind;type:Int8;not null" json:"kind"`
StatusCode int64 `gorm:"column:status_code;type:Int64;not null" json:"status_code"`
StatusMsg string `gorm:"column:status_msg;type:String;not null" json:"status_msg"`
LogID string `gorm:"column:log_id;type:String;not null" json:"log_id"`
SpaceID int64 `gorm:"column:space_id;type:Int64;primaryKey" json:"space_id"`
Type int32 `gorm:"column:type;type:Int32;not null" json:"type"`
UserID int64 `gorm:"column:user_id;type:Int64;not null" json:"user_id"`
EntityID int64 `gorm:"column:entity_id;type:Int64;primaryKey" json:"entity_id"`
Env string `gorm:"column:env;type:String;not null" json:"env"`
Version string `gorm:"column:version;type:String;not null" json:"version"`
Input string `gorm:"column:input;type:String;not null" json:"input"`
StartTimeMs uint64 `gorm:"column:start_time_ms;type:UInt64;primaryKey" json:"start_time_ms"`
}
// TableName SpansIndex's table name
func (*SpansIndex) TableName() string {
return TableNameSpansIndex
}

View File

@ -0,0 +1,111 @@
// Code generated by gorm.io/gen. DO NOT EDIT.
// Code generated by gorm.io/gen. DO NOT EDIT.
// Code generated by gorm.io/gen. DO NOT EDIT.
package query
import (
"context"
"database/sql"
"gorm.io/gorm"
"gorm.io/gen"
"gorm.io/plugin/dbresolver"
)
var (
Q = new(Query)
SpansData *spansData
SpansIndex *spansIndex
)
func SetDefault(db *gorm.DB, opts ...gen.DOOption) {
*Q = *Use(db, opts...)
SpansData = &Q.SpansData
SpansIndex = &Q.SpansIndex
}
func Use(db *gorm.DB, opts ...gen.DOOption) *Query {
return &Query{
db: db,
SpansData: newSpansData(db, opts...),
SpansIndex: newSpansIndex(db, opts...),
}
}
type Query struct {
db *gorm.DB
SpansData spansData
SpansIndex spansIndex
}
func (q *Query) Available() bool { return q.db != nil }
func (q *Query) clone(db *gorm.DB) *Query {
return &Query{
db: db,
SpansData: q.SpansData.clone(db),
SpansIndex: q.SpansIndex.clone(db),
}
}
func (q *Query) ReadDB() *Query {
return q.ReplaceDB(q.db.Clauses(dbresolver.Read))
}
func (q *Query) WriteDB() *Query {
return q.ReplaceDB(q.db.Clauses(dbresolver.Write))
}
func (q *Query) ReplaceDB(db *gorm.DB) *Query {
return &Query{
db: db,
SpansData: q.SpansData.replaceDB(db),
SpansIndex: q.SpansIndex.replaceDB(db),
}
}
type queryCtx struct {
SpansData ISpansDataDo
SpansIndex ISpansIndexDo
}
func (q *Query) WithContext(ctx context.Context) *queryCtx {
return &queryCtx{
SpansData: q.SpansData.WithContext(ctx),
SpansIndex: q.SpansIndex.WithContext(ctx),
}
}
func (q *Query) Transaction(fc func(tx *Query) error, opts ...*sql.TxOptions) error {
return q.db.Transaction(func(tx *gorm.DB) error { return fc(q.clone(tx)) }, opts...)
}
func (q *Query) Begin(opts ...*sql.TxOptions) *QueryTx {
tx := q.db.Begin(opts...)
return &QueryTx{Query: q.clone(tx), Error: tx.Error}
}
type QueryTx struct {
*Query
Error error
}
func (q *QueryTx) Commit() error {
return q.db.Commit().Error
}
func (q *QueryTx) Rollback() error {
return q.db.Rollback().Error
}
func (q *QueryTx) SavePoint(name string) error {
return q.db.SavePoint(name).Error
}
func (q *QueryTx) RollbackTo(name string) error {
return q.db.RollbackTo(name).Error
}

View File

@ -0,0 +1,428 @@
// Code generated by gorm.io/gen. DO NOT EDIT.
// Code generated by gorm.io/gen. DO NOT EDIT.
// Code generated by gorm.io/gen. DO NOT EDIT.
package query
import (
"context"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"gorm.io/gorm/schema"
"gorm.io/gen"
"gorm.io/gen/field"
"gorm.io/plugin/dbresolver"
"github.com/coze-dev/coze-studio/backend/infra/impl/telemetry/clickhouse/internal/model"
)
func newSpansData(db *gorm.DB, opts ...gen.DOOption) spansData {
_spansData := spansData{}
_spansData.spansDataDo.UseDB(db, opts...)
_spansData.spansDataDo.UseModel(&model.SpansData{})
tableName := _spansData.spansDataDo.TableName()
_spansData.ALL = field.NewAsterisk(tableName)
_spansData.SpanID = field.NewString(tableName, "span_id")
_spansData.TraceID = field.NewString(tableName, "trace_id")
_spansData.ParentSpanID = field.NewString(tableName, "parent_span_id")
_spansData.Name = field.NewString(tableName, "name")
_spansData.Kind = field.NewInt8(tableName, "kind")
_spansData.StatusCode = field.NewInt64(tableName, "status_code")
_spansData.StatusMsg = field.NewString(tableName, "status_msg")
_spansData.ResourceAttributes = field.NewField(tableName, "resource_attributes")
_spansData.StartTimeMs = field.NewUint64(tableName, "start_time_ms")
_spansData.EndTimeMs = field.NewUint64(tableName, "end_time_ms")
_spansData.LogID = field.NewString(tableName, "log_id")
_spansData.AttrKeys = field.NewField(tableName, "attr_keys")
_spansData.AttrValues = field.NewField(tableName, "attr_values")
_spansData.fillFieldMap()
return _spansData
}
type spansData struct {
spansDataDo
ALL field.Asterisk
SpanID field.String
TraceID field.String
ParentSpanID field.String
Name field.String
Kind field.Int8
StatusCode field.Int64
StatusMsg field.String
ResourceAttributes field.Field
StartTimeMs field.Uint64
EndTimeMs field.Uint64
LogID field.String
AttrKeys field.Field
AttrValues field.Field
fieldMap map[string]field.Expr
}
func (s spansData) Table(newTableName string) *spansData {
s.spansDataDo.UseTable(newTableName)
return s.updateTableName(newTableName)
}
func (s spansData) As(alias string) *spansData {
s.spansDataDo.DO = *(s.spansDataDo.As(alias).(*gen.DO))
return s.updateTableName(alias)
}
func (s *spansData) updateTableName(table string) *spansData {
s.ALL = field.NewAsterisk(table)
s.SpanID = field.NewString(table, "span_id")
s.TraceID = field.NewString(table, "trace_id")
s.ParentSpanID = field.NewString(table, "parent_span_id")
s.Name = field.NewString(table, "name")
s.Kind = field.NewInt8(table, "kind")
s.StatusCode = field.NewInt64(table, "status_code")
s.StatusMsg = field.NewString(table, "status_msg")
s.ResourceAttributes = field.NewField(table, "resource_attributes")
s.StartTimeMs = field.NewUint64(table, "start_time_ms")
s.EndTimeMs = field.NewUint64(table, "end_time_ms")
s.LogID = field.NewString(table, "log_id")
s.AttrKeys = field.NewField(table, "attr_keys")
s.AttrValues = field.NewField(table, "attr_values")
s.fillFieldMap()
return s
}
func (s *spansData) GetFieldByName(fieldName string) (field.OrderExpr, bool) {
_f, ok := s.fieldMap[fieldName]
if !ok || _f == nil {
return nil, false
}
_oe, ok := _f.(field.OrderExpr)
return _oe, ok
}
func (s *spansData) fillFieldMap() {
s.fieldMap = make(map[string]field.Expr, 13)
s.fieldMap["span_id"] = s.SpanID
s.fieldMap["trace_id"] = s.TraceID
s.fieldMap["parent_span_id"] = s.ParentSpanID
s.fieldMap["name"] = s.Name
s.fieldMap["kind"] = s.Kind
s.fieldMap["status_code"] = s.StatusCode
s.fieldMap["status_msg"] = s.StatusMsg
s.fieldMap["resource_attributes"] = s.ResourceAttributes
s.fieldMap["start_time_ms"] = s.StartTimeMs
s.fieldMap["end_time_ms"] = s.EndTimeMs
s.fieldMap["log_id"] = s.LogID
s.fieldMap["attr_keys"] = s.AttrKeys
s.fieldMap["attr_values"] = s.AttrValues
}
func (s spansData) clone(db *gorm.DB) spansData {
s.spansDataDo.ReplaceConnPool(db.Statement.ConnPool)
return s
}
func (s spansData) replaceDB(db *gorm.DB) spansData {
s.spansDataDo.ReplaceDB(db)
return s
}
type spansDataDo struct{ gen.DO }
type ISpansDataDo interface {
gen.SubQuery
Debug() ISpansDataDo
WithContext(ctx context.Context) ISpansDataDo
WithResult(fc func(tx gen.Dao)) gen.ResultInfo
ReplaceDB(db *gorm.DB)
ReadDB() ISpansDataDo
WriteDB() ISpansDataDo
As(alias string) gen.Dao
Session(config *gorm.Session) ISpansDataDo
Columns(cols ...field.Expr) gen.Columns
Clauses(conds ...clause.Expression) ISpansDataDo
Not(conds ...gen.Condition) ISpansDataDo
Or(conds ...gen.Condition) ISpansDataDo
Select(conds ...field.Expr) ISpansDataDo
Where(conds ...gen.Condition) ISpansDataDo
Order(conds ...field.Expr) ISpansDataDo
Distinct(cols ...field.Expr) ISpansDataDo
Omit(cols ...field.Expr) ISpansDataDo
Join(table schema.Tabler, on ...field.Expr) ISpansDataDo
LeftJoin(table schema.Tabler, on ...field.Expr) ISpansDataDo
RightJoin(table schema.Tabler, on ...field.Expr) ISpansDataDo
Group(cols ...field.Expr) ISpansDataDo
Having(conds ...gen.Condition) ISpansDataDo
Limit(limit int) ISpansDataDo
Offset(offset int) ISpansDataDo
Count() (count int64, err error)
Scopes(funcs ...func(gen.Dao) gen.Dao) ISpansDataDo
Unscoped() ISpansDataDo
Create(values ...*model.SpansData) error
CreateInBatches(values []*model.SpansData, batchSize int) error
Save(values ...*model.SpansData) error
First() (*model.SpansData, error)
Take() (*model.SpansData, error)
Last() (*model.SpansData, error)
Find() ([]*model.SpansData, error)
FindInBatch(batchSize int, fc func(tx gen.Dao, batch int) error) (results []*model.SpansData, err error)
FindInBatches(result *[]*model.SpansData, batchSize int, fc func(tx gen.Dao, batch int) error) error
Pluck(column field.Expr, dest interface{}) error
Delete(...*model.SpansData) (info gen.ResultInfo, err error)
Update(column field.Expr, value interface{}) (info gen.ResultInfo, err error)
UpdateSimple(columns ...field.AssignExpr) (info gen.ResultInfo, err error)
Updates(value interface{}) (info gen.ResultInfo, err error)
UpdateColumn(column field.Expr, value interface{}) (info gen.ResultInfo, err error)
UpdateColumnSimple(columns ...field.AssignExpr) (info gen.ResultInfo, err error)
UpdateColumns(value interface{}) (info gen.ResultInfo, err error)
UpdateFrom(q gen.SubQuery) gen.Dao
Attrs(attrs ...field.AssignExpr) ISpansDataDo
Assign(attrs ...field.AssignExpr) ISpansDataDo
Joins(fields ...field.RelationField) ISpansDataDo
Preload(fields ...field.RelationField) ISpansDataDo
FirstOrInit() (*model.SpansData, error)
FirstOrCreate() (*model.SpansData, error)
FindByPage(offset int, limit int) (result []*model.SpansData, count int64, err error)
ScanByPage(result interface{}, offset int, limit int) (count int64, err error)
Scan(result interface{}) (err error)
Returning(value interface{}, columns ...string) ISpansDataDo
UnderlyingDB() *gorm.DB
schema.Tabler
}
func (s spansDataDo) Debug() ISpansDataDo {
return s.withDO(s.DO.Debug())
}
func (s spansDataDo) WithContext(ctx context.Context) ISpansDataDo {
return s.withDO(s.DO.WithContext(ctx))
}
func (s spansDataDo) ReadDB() ISpansDataDo {
return s.Clauses(dbresolver.Read)
}
func (s spansDataDo) WriteDB() ISpansDataDo {
return s.Clauses(dbresolver.Write)
}
func (s spansDataDo) Session(config *gorm.Session) ISpansDataDo {
return s.withDO(s.DO.Session(config))
}
func (s spansDataDo) Clauses(conds ...clause.Expression) ISpansDataDo {
return s.withDO(s.DO.Clauses(conds...))
}
func (s spansDataDo) Returning(value interface{}, columns ...string) ISpansDataDo {
return s.withDO(s.DO.Returning(value, columns...))
}
func (s spansDataDo) Not(conds ...gen.Condition) ISpansDataDo {
return s.withDO(s.DO.Not(conds...))
}
func (s spansDataDo) Or(conds ...gen.Condition) ISpansDataDo {
return s.withDO(s.DO.Or(conds...))
}
func (s spansDataDo) Select(conds ...field.Expr) ISpansDataDo {
return s.withDO(s.DO.Select(conds...))
}
func (s spansDataDo) Where(conds ...gen.Condition) ISpansDataDo {
return s.withDO(s.DO.Where(conds...))
}
func (s spansDataDo) Order(conds ...field.Expr) ISpansDataDo {
return s.withDO(s.DO.Order(conds...))
}
func (s spansDataDo) Distinct(cols ...field.Expr) ISpansDataDo {
return s.withDO(s.DO.Distinct(cols...))
}
func (s spansDataDo) Omit(cols ...field.Expr) ISpansDataDo {
return s.withDO(s.DO.Omit(cols...))
}
func (s spansDataDo) Join(table schema.Tabler, on ...field.Expr) ISpansDataDo {
return s.withDO(s.DO.Join(table, on...))
}
func (s spansDataDo) LeftJoin(table schema.Tabler, on ...field.Expr) ISpansDataDo {
return s.withDO(s.DO.LeftJoin(table, on...))
}
func (s spansDataDo) RightJoin(table schema.Tabler, on ...field.Expr) ISpansDataDo {
return s.withDO(s.DO.RightJoin(table, on...))
}
func (s spansDataDo) Group(cols ...field.Expr) ISpansDataDo {
return s.withDO(s.DO.Group(cols...))
}
func (s spansDataDo) Having(conds ...gen.Condition) ISpansDataDo {
return s.withDO(s.DO.Having(conds...))
}
func (s spansDataDo) Limit(limit int) ISpansDataDo {
return s.withDO(s.DO.Limit(limit))
}
func (s spansDataDo) Offset(offset int) ISpansDataDo {
return s.withDO(s.DO.Offset(offset))
}
func (s spansDataDo) Scopes(funcs ...func(gen.Dao) gen.Dao) ISpansDataDo {
return s.withDO(s.DO.Scopes(funcs...))
}
func (s spansDataDo) Unscoped() ISpansDataDo {
return s.withDO(s.DO.Unscoped())
}
func (s spansDataDo) Create(values ...*model.SpansData) error {
if len(values) == 0 {
return nil
}
return s.DO.Create(values)
}
func (s spansDataDo) CreateInBatches(values []*model.SpansData, batchSize int) error {
return s.DO.CreateInBatches(values, batchSize)
}
// Save : !!! underlying implementation is different with GORM
// The method is equivalent to executing the statement: db.Clauses(clause.OnConflict{UpdateAll: true}).Create(values)
func (s spansDataDo) Save(values ...*model.SpansData) error {
if len(values) == 0 {
return nil
}
return s.DO.Save(values)
}
func (s spansDataDo) First() (*model.SpansData, error) {
if result, err := s.DO.First(); err != nil {
return nil, err
} else {
return result.(*model.SpansData), nil
}
}
func (s spansDataDo) Take() (*model.SpansData, error) {
if result, err := s.DO.Take(); err != nil {
return nil, err
} else {
return result.(*model.SpansData), nil
}
}
func (s spansDataDo) Last() (*model.SpansData, error) {
if result, err := s.DO.Last(); err != nil {
return nil, err
} else {
return result.(*model.SpansData), nil
}
}
func (s spansDataDo) Find() ([]*model.SpansData, error) {
result, err := s.DO.Find()
return result.([]*model.SpansData), err
}
func (s spansDataDo) FindInBatch(batchSize int, fc func(tx gen.Dao, batch int) error) (results []*model.SpansData, err error) {
buf := make([]*model.SpansData, 0, batchSize)
err = s.DO.FindInBatches(&buf, batchSize, func(tx gen.Dao, batch int) error {
defer func() { results = append(results, buf...) }()
return fc(tx, batch)
})
return results, err
}
func (s spansDataDo) FindInBatches(result *[]*model.SpansData, batchSize int, fc func(tx gen.Dao, batch int) error) error {
return s.DO.FindInBatches(result, batchSize, fc)
}
func (s spansDataDo) Attrs(attrs ...field.AssignExpr) ISpansDataDo {
return s.withDO(s.DO.Attrs(attrs...))
}
func (s spansDataDo) Assign(attrs ...field.AssignExpr) ISpansDataDo {
return s.withDO(s.DO.Assign(attrs...))
}
func (s spansDataDo) Joins(fields ...field.RelationField) ISpansDataDo {
for _, _f := range fields {
s = *s.withDO(s.DO.Joins(_f))
}
return &s
}
func (s spansDataDo) Preload(fields ...field.RelationField) ISpansDataDo {
for _, _f := range fields {
s = *s.withDO(s.DO.Preload(_f))
}
return &s
}
func (s spansDataDo) FirstOrInit() (*model.SpansData, error) {
if result, err := s.DO.FirstOrInit(); err != nil {
return nil, err
} else {
return result.(*model.SpansData), nil
}
}
func (s spansDataDo) FirstOrCreate() (*model.SpansData, error) {
if result, err := s.DO.FirstOrCreate(); err != nil {
return nil, err
} else {
return result.(*model.SpansData), nil
}
}
func (s spansDataDo) FindByPage(offset int, limit int) (result []*model.SpansData, count int64, err error) {
result, err = s.Offset(offset).Limit(limit).Find()
if err != nil {
return
}
if size := len(result); 0 < limit && 0 < size && size < limit {
count = int64(size + offset)
return
}
count, err = s.Offset(-1).Limit(-1).Count()
return
}
func (s spansDataDo) ScanByPage(result interface{}, offset int, limit int) (count int64, err error) {
count, err = s.Count()
if err != nil {
return
}
err = s.Offset(offset).Limit(limit).Scan(result)
return
}
func (s spansDataDo) Scan(result interface{}) (err error) {
return s.DO.Scan(result)
}
func (s spansDataDo) Delete(models ...*model.SpansData) (result gen.ResultInfo, err error) {
return s.DO.Delete(models)
}
func (s *spansDataDo) withDO(do gen.Dao) *spansDataDo {
s.DO = *do.(*gen.DO)
return s
}

View File

@ -0,0 +1,440 @@
// Code generated by gorm.io/gen. DO NOT EDIT.
// Code generated by gorm.io/gen. DO NOT EDIT.
// Code generated by gorm.io/gen. DO NOT EDIT.
package query
import (
"context"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"gorm.io/gorm/schema"
"gorm.io/gen"
"gorm.io/gen/field"
"gorm.io/plugin/dbresolver"
"github.com/coze-dev/coze-studio/backend/infra/impl/telemetry/clickhouse/internal/model"
)
func newSpansIndex(db *gorm.DB, opts ...gen.DOOption) spansIndex {
_spansIndex := spansIndex{}
_spansIndex.spansIndexDo.UseDB(db, opts...)
_spansIndex.spansIndexDo.UseModel(&model.SpansIndex{})
tableName := _spansIndex.spansIndexDo.TableName()
_spansIndex.ALL = field.NewAsterisk(tableName)
_spansIndex.SpanID = field.NewString(tableName, "span_id")
_spansIndex.TraceID = field.NewString(tableName, "trace_id")
_spansIndex.ParentSpanID = field.NewString(tableName, "parent_span_id")
_spansIndex.Name = field.NewString(tableName, "name")
_spansIndex.Kind = field.NewInt8(tableName, "kind")
_spansIndex.StatusCode = field.NewInt64(tableName, "status_code")
_spansIndex.StatusMsg = field.NewString(tableName, "status_msg")
_spansIndex.LogID = field.NewString(tableName, "log_id")
_spansIndex.SpaceID = field.NewInt64(tableName, "space_id")
_spansIndex.Type = field.NewInt32(tableName, "type")
_spansIndex.UserID = field.NewInt64(tableName, "user_id")
_spansIndex.EntityID = field.NewInt64(tableName, "entity_id")
_spansIndex.Env = field.NewString(tableName, "env")
_spansIndex.Version = field.NewString(tableName, "version")
_spansIndex.Input = field.NewString(tableName, "input")
_spansIndex.StartTimeMs = field.NewUint64(tableName, "start_time_ms")
_spansIndex.fillFieldMap()
return _spansIndex
}
type spansIndex struct {
spansIndexDo
ALL field.Asterisk
SpanID field.String
TraceID field.String
ParentSpanID field.String
Name field.String
Kind field.Int8
StatusCode field.Int64
StatusMsg field.String
LogID field.String
SpaceID field.Int64
Type field.Int32
UserID field.Int64
EntityID field.Int64
Env field.String
Version field.String
Input field.String
StartTimeMs field.Uint64
fieldMap map[string]field.Expr
}
func (s spansIndex) Table(newTableName string) *spansIndex {
s.spansIndexDo.UseTable(newTableName)
return s.updateTableName(newTableName)
}
func (s spansIndex) As(alias string) *spansIndex {
s.spansIndexDo.DO = *(s.spansIndexDo.As(alias).(*gen.DO))
return s.updateTableName(alias)
}
func (s *spansIndex) updateTableName(table string) *spansIndex {
s.ALL = field.NewAsterisk(table)
s.SpanID = field.NewString(table, "span_id")
s.TraceID = field.NewString(table, "trace_id")
s.ParentSpanID = field.NewString(table, "parent_span_id")
s.Name = field.NewString(table, "name")
s.Kind = field.NewInt8(table, "kind")
s.StatusCode = field.NewInt64(table, "status_code")
s.StatusMsg = field.NewString(table, "status_msg")
s.LogID = field.NewString(table, "log_id")
s.SpaceID = field.NewInt64(table, "space_id")
s.Type = field.NewInt32(table, "type")
s.UserID = field.NewInt64(table, "user_id")
s.EntityID = field.NewInt64(table, "entity_id")
s.Env = field.NewString(table, "env")
s.Version = field.NewString(table, "version")
s.Input = field.NewString(table, "input")
s.StartTimeMs = field.NewUint64(table, "start_time_ms")
s.fillFieldMap()
return s
}
func (s *spansIndex) GetFieldByName(fieldName string) (field.OrderExpr, bool) {
_f, ok := s.fieldMap[fieldName]
if !ok || _f == nil {
return nil, false
}
_oe, ok := _f.(field.OrderExpr)
return _oe, ok
}
func (s *spansIndex) fillFieldMap() {
s.fieldMap = make(map[string]field.Expr, 16)
s.fieldMap["span_id"] = s.SpanID
s.fieldMap["trace_id"] = s.TraceID
s.fieldMap["parent_span_id"] = s.ParentSpanID
s.fieldMap["name"] = s.Name
s.fieldMap["kind"] = s.Kind
s.fieldMap["status_code"] = s.StatusCode
s.fieldMap["status_msg"] = s.StatusMsg
s.fieldMap["log_id"] = s.LogID
s.fieldMap["space_id"] = s.SpaceID
s.fieldMap["type"] = s.Type
s.fieldMap["user_id"] = s.UserID
s.fieldMap["entity_id"] = s.EntityID
s.fieldMap["env"] = s.Env
s.fieldMap["version"] = s.Version
s.fieldMap["input"] = s.Input
s.fieldMap["start_time_ms"] = s.StartTimeMs
}
func (s spansIndex) clone(db *gorm.DB) spansIndex {
s.spansIndexDo.ReplaceConnPool(db.Statement.ConnPool)
return s
}
func (s spansIndex) replaceDB(db *gorm.DB) spansIndex {
s.spansIndexDo.ReplaceDB(db)
return s
}
type spansIndexDo struct{ gen.DO }
type ISpansIndexDo interface {
gen.SubQuery
Debug() ISpansIndexDo
WithContext(ctx context.Context) ISpansIndexDo
WithResult(fc func(tx gen.Dao)) gen.ResultInfo
ReplaceDB(db *gorm.DB)
ReadDB() ISpansIndexDo
WriteDB() ISpansIndexDo
As(alias string) gen.Dao
Session(config *gorm.Session) ISpansIndexDo
Columns(cols ...field.Expr) gen.Columns
Clauses(conds ...clause.Expression) ISpansIndexDo
Not(conds ...gen.Condition) ISpansIndexDo
Or(conds ...gen.Condition) ISpansIndexDo
Select(conds ...field.Expr) ISpansIndexDo
Where(conds ...gen.Condition) ISpansIndexDo
Order(conds ...field.Expr) ISpansIndexDo
Distinct(cols ...field.Expr) ISpansIndexDo
Omit(cols ...field.Expr) ISpansIndexDo
Join(table schema.Tabler, on ...field.Expr) ISpansIndexDo
LeftJoin(table schema.Tabler, on ...field.Expr) ISpansIndexDo
RightJoin(table schema.Tabler, on ...field.Expr) ISpansIndexDo
Group(cols ...field.Expr) ISpansIndexDo
Having(conds ...gen.Condition) ISpansIndexDo
Limit(limit int) ISpansIndexDo
Offset(offset int) ISpansIndexDo
Count() (count int64, err error)
Scopes(funcs ...func(gen.Dao) gen.Dao) ISpansIndexDo
Unscoped() ISpansIndexDo
Create(values ...*model.SpansIndex) error
CreateInBatches(values []*model.SpansIndex, batchSize int) error
Save(values ...*model.SpansIndex) error
First() (*model.SpansIndex, error)
Take() (*model.SpansIndex, error)
Last() (*model.SpansIndex, error)
Find() ([]*model.SpansIndex, error)
FindInBatch(batchSize int, fc func(tx gen.Dao, batch int) error) (results []*model.SpansIndex, err error)
FindInBatches(result *[]*model.SpansIndex, batchSize int, fc func(tx gen.Dao, batch int) error) error
Pluck(column field.Expr, dest interface{}) error
Delete(...*model.SpansIndex) (info gen.ResultInfo, err error)
Update(column field.Expr, value interface{}) (info gen.ResultInfo, err error)
UpdateSimple(columns ...field.AssignExpr) (info gen.ResultInfo, err error)
Updates(value interface{}) (info gen.ResultInfo, err error)
UpdateColumn(column field.Expr, value interface{}) (info gen.ResultInfo, err error)
UpdateColumnSimple(columns ...field.AssignExpr) (info gen.ResultInfo, err error)
UpdateColumns(value interface{}) (info gen.ResultInfo, err error)
UpdateFrom(q gen.SubQuery) gen.Dao
Attrs(attrs ...field.AssignExpr) ISpansIndexDo
Assign(attrs ...field.AssignExpr) ISpansIndexDo
Joins(fields ...field.RelationField) ISpansIndexDo
Preload(fields ...field.RelationField) ISpansIndexDo
FirstOrInit() (*model.SpansIndex, error)
FirstOrCreate() (*model.SpansIndex, error)
FindByPage(offset int, limit int) (result []*model.SpansIndex, count int64, err error)
ScanByPage(result interface{}, offset int, limit int) (count int64, err error)
Scan(result interface{}) (err error)
Returning(value interface{}, columns ...string) ISpansIndexDo
UnderlyingDB() *gorm.DB
schema.Tabler
}
func (s spansIndexDo) Debug() ISpansIndexDo {
return s.withDO(s.DO.Debug())
}
func (s spansIndexDo) WithContext(ctx context.Context) ISpansIndexDo {
return s.withDO(s.DO.WithContext(ctx))
}
func (s spansIndexDo) ReadDB() ISpansIndexDo {
return s.Clauses(dbresolver.Read)
}
func (s spansIndexDo) WriteDB() ISpansIndexDo {
return s.Clauses(dbresolver.Write)
}
func (s spansIndexDo) Session(config *gorm.Session) ISpansIndexDo {
return s.withDO(s.DO.Session(config))
}
func (s spansIndexDo) Clauses(conds ...clause.Expression) ISpansIndexDo {
return s.withDO(s.DO.Clauses(conds...))
}
func (s spansIndexDo) Returning(value interface{}, columns ...string) ISpansIndexDo {
return s.withDO(s.DO.Returning(value, columns...))
}
func (s spansIndexDo) Not(conds ...gen.Condition) ISpansIndexDo {
return s.withDO(s.DO.Not(conds...))
}
func (s spansIndexDo) Or(conds ...gen.Condition) ISpansIndexDo {
return s.withDO(s.DO.Or(conds...))
}
func (s spansIndexDo) Select(conds ...field.Expr) ISpansIndexDo {
return s.withDO(s.DO.Select(conds...))
}
func (s spansIndexDo) Where(conds ...gen.Condition) ISpansIndexDo {
return s.withDO(s.DO.Where(conds...))
}
func (s spansIndexDo) Order(conds ...field.Expr) ISpansIndexDo {
return s.withDO(s.DO.Order(conds...))
}
func (s spansIndexDo) Distinct(cols ...field.Expr) ISpansIndexDo {
return s.withDO(s.DO.Distinct(cols...))
}
func (s spansIndexDo) Omit(cols ...field.Expr) ISpansIndexDo {
return s.withDO(s.DO.Omit(cols...))
}
func (s spansIndexDo) Join(table schema.Tabler, on ...field.Expr) ISpansIndexDo {
return s.withDO(s.DO.Join(table, on...))
}
func (s spansIndexDo) LeftJoin(table schema.Tabler, on ...field.Expr) ISpansIndexDo {
return s.withDO(s.DO.LeftJoin(table, on...))
}
func (s spansIndexDo) RightJoin(table schema.Tabler, on ...field.Expr) ISpansIndexDo {
return s.withDO(s.DO.RightJoin(table, on...))
}
func (s spansIndexDo) Group(cols ...field.Expr) ISpansIndexDo {
return s.withDO(s.DO.Group(cols...))
}
func (s spansIndexDo) Having(conds ...gen.Condition) ISpansIndexDo {
return s.withDO(s.DO.Having(conds...))
}
func (s spansIndexDo) Limit(limit int) ISpansIndexDo {
return s.withDO(s.DO.Limit(limit))
}
func (s spansIndexDo) Offset(offset int) ISpansIndexDo {
return s.withDO(s.DO.Offset(offset))
}
func (s spansIndexDo) Scopes(funcs ...func(gen.Dao) gen.Dao) ISpansIndexDo {
return s.withDO(s.DO.Scopes(funcs...))
}
func (s spansIndexDo) Unscoped() ISpansIndexDo {
return s.withDO(s.DO.Unscoped())
}
func (s spansIndexDo) Create(values ...*model.SpansIndex) error {
if len(values) == 0 {
return nil
}
return s.DO.Create(values)
}
func (s spansIndexDo) CreateInBatches(values []*model.SpansIndex, batchSize int) error {
return s.DO.CreateInBatches(values, batchSize)
}
// Save : !!! underlying implementation is different with GORM
// The method is equivalent to executing the statement: db.Clauses(clause.OnConflict{UpdateAll: true}).Create(values)
func (s spansIndexDo) Save(values ...*model.SpansIndex) error {
if len(values) == 0 {
return nil
}
return s.DO.Save(values)
}
func (s spansIndexDo) First() (*model.SpansIndex, error) {
if result, err := s.DO.First(); err != nil {
return nil, err
} else {
return result.(*model.SpansIndex), nil
}
}
func (s spansIndexDo) Take() (*model.SpansIndex, error) {
if result, err := s.DO.Take(); err != nil {
return nil, err
} else {
return result.(*model.SpansIndex), nil
}
}
func (s spansIndexDo) Last() (*model.SpansIndex, error) {
if result, err := s.DO.Last(); err != nil {
return nil, err
} else {
return result.(*model.SpansIndex), nil
}
}
func (s spansIndexDo) Find() ([]*model.SpansIndex, error) {
result, err := s.DO.Find()
return result.([]*model.SpansIndex), err
}
func (s spansIndexDo) FindInBatch(batchSize int, fc func(tx gen.Dao, batch int) error) (results []*model.SpansIndex, err error) {
buf := make([]*model.SpansIndex, 0, batchSize)
err = s.DO.FindInBatches(&buf, batchSize, func(tx gen.Dao, batch int) error {
defer func() { results = append(results, buf...) }()
return fc(tx, batch)
})
return results, err
}
func (s spansIndexDo) FindInBatches(result *[]*model.SpansIndex, batchSize int, fc func(tx gen.Dao, batch int) error) error {
return s.DO.FindInBatches(result, batchSize, fc)
}
func (s spansIndexDo) Attrs(attrs ...field.AssignExpr) ISpansIndexDo {
return s.withDO(s.DO.Attrs(attrs...))
}
func (s spansIndexDo) Assign(attrs ...field.AssignExpr) ISpansIndexDo {
return s.withDO(s.DO.Assign(attrs...))
}
func (s spansIndexDo) Joins(fields ...field.RelationField) ISpansIndexDo {
for _, _f := range fields {
s = *s.withDO(s.DO.Joins(_f))
}
return &s
}
func (s spansIndexDo) Preload(fields ...field.RelationField) ISpansIndexDo {
for _, _f := range fields {
s = *s.withDO(s.DO.Preload(_f))
}
return &s
}
func (s spansIndexDo) FirstOrInit() (*model.SpansIndex, error) {
if result, err := s.DO.FirstOrInit(); err != nil {
return nil, err
} else {
return result.(*model.SpansIndex), nil
}
}
func (s spansIndexDo) FirstOrCreate() (*model.SpansIndex, error) {
if result, err := s.DO.FirstOrCreate(); err != nil {
return nil, err
} else {
return result.(*model.SpansIndex), nil
}
}
func (s spansIndexDo) FindByPage(offset int, limit int) (result []*model.SpansIndex, count int64, err error) {
result, err = s.Offset(offset).Limit(limit).Find()
if err != nil {
return
}
if size := len(result); 0 < limit && 0 < size && size < limit {
count = int64(size + offset)
return
}
count, err = s.Offset(-1).Limit(-1).Count()
return
}
func (s spansIndexDo) ScanByPage(result interface{}, offset int, limit int) (count int64, err error) {
count, err = s.Count()
if err != nil {
return
}
err = s.Offset(offset).Limit(limit).Scan(result)
return
}
func (s spansIndexDo) Scan(result interface{}) (err error) {
return s.DO.Scan(result)
}
func (s spansIndexDo) Delete(models ...*model.SpansIndex) (result gen.ResultInfo, err error) {
return s.DO.Delete(models)
}
func (s *spansIndexDo) withDO(do gen.Dao) *spansIndexDo {
s.DO = *do.(*gen.DO)
return s
}

View File

@ -0,0 +1,139 @@
package clickhouse
import (
"context"
"fmt"
"strconv"
"github.com/ClickHouse/clickhouse-go/v2"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/sdk/trace"
"gorm.io/gen"
"github.com/coze-dev/coze-studio/backend/infra/contract/telemetry"
"github.com/coze-dev/coze-studio/backend/infra/impl/telemetry/clickhouse/internal/query"
"github.com/coze-dev/coze-studio/backend/pkg/lang/ptr"
"github.com/coze-dev/coze-studio/backend/pkg/lang/slices"
)
const (
emptySpanID = "0000000000000000"
)
type QueryClientConfig struct {
ClickhouseOptions *clickhouse.Options
EmptySpanID *string
}
func NewQueryClient(config *QueryClientConfig) (telemetry.QueryClient, error) {
db, err := newClickhouseDB(config.ClickhouseOptions)
if err != nil {
return nil, err
}
qc := &queryClient{
query: query.Use(db),
emptySpanID: emptySpanID,
}
if config.EmptySpanID != nil {
qc.emptySpanID = *config.EmptySpanID
}
return qc, nil
}
type queryClient struct {
query *query.Query
emptySpanID string
}
func (q *queryClient) ListSpan(ctx context.Context, request *telemetry.ListTracesRequest) (
spans []trace.ReadOnlySpan, nextCursor *string, hasMore bool, err error) {
if request.SpaceID == 0 || request.EntityID == 0 || (request.StartAt.IsZero() && request.EndAt.IsZero()) {
return nil, nil, false, fmt.Errorf("[ListSpan] invalid request params")
}
si := q.query.SpansIndex
conds := []gen.Condition{
si.SpaceID.Eq(request.SpaceID),
si.EntityID.Eq(request.EntityID),
}
if !request.StartAt.IsZero() {
conds = append(conds, si.StartTimeMs.Gte(uint64(request.StartAt.UnixMilli())))
}
if !request.EndAt.IsZero() {
conds = append(conds, si.StartTimeMs.Lte(uint64(request.EndAt.UnixMilli())))
}
if request.RootOnly {
conds = append(conds, si.ParentSpanID.Eq(q.emptySpanID))
}
if request.Status != codes.Unset {
conds = append(conds, si.StatusCode.Eq(int64(request.Status)))
}
if request.Cursor != nil {
ms, err := strconv.ParseInt(*request.Cursor, 10, 64)
if err != nil {
return nil, nil, false, err
}
conds = append(conds, si.StartTimeMs.Lt(uint64(ms)))
}
limit := request.Limit
if limit == 0 {
limit = 30
}
indexes, err := si.WithContext(ctx).Debug().
Where(conds...).
Order(si.StartTimeMs.Desc()).
Limit(limit).
Find()
if err != nil {
return nil, nil, false, err
}
spans, err = slices.TransformWithErrorCheck(indexes, fromSpanIndexModel(q.emptySpanID))
if err != nil {
return nil, nil, false, err
}
if len(indexes) == limit {
hasMore = true
ms := indexes[len(indexes)-1].StartTimeMs
nextCursor = ptr.Of(strconv.FormatInt(int64(ms), 10))
}
return spans, nextCursor, hasMore, nil
}
func (q *queryClient) GetTrace(ctx context.Context, request *telemetry.GetTraceRequest) ([]trace.ReadOnlySpan, error) {
sd := q.query.SpansData
// TODO: 看下 space_id 和 entity_id 是否要加到 spans_data 表
var conds []gen.Condition
if request.TraceID == nil && request.LogID == nil {
return nil, fmt.Errorf("[GetTrace] both traceID and logID is nil")
}
if request.TraceID != nil {
conds = append(conds, sd.TraceID.Eq(request.TraceID.String()))
}
if request.LogID != nil {
conds = append(conds, sd.LogID.Eq(*request.LogID))
}
rows, err := sd.WithContext(ctx).Debug().Where(conds...).Limit(-1).Find()
if err != nil {
return nil, err
}
spans, err := slices.TransformWithErrorCheck(rows, fromSpanDataModel(q.emptySpanID))
if err != nil {
return nil, err
}
return spans, nil
}

View File

@ -0,0 +1,98 @@
package clickhouse
import (
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
)
type ckSpan struct {
tracesdk.ReadOnlySpan
name string
spanContext trace.SpanContext
parent trace.SpanContext
kind trace.SpanKind
startTime time.Time
endTime time.Time
resource *resource.Resource
attributes []attribute.KeyValue
status tracesdk.Status
csCount int
}
var _ tracesdk.ReadOnlySpan = ckSpan{}
func (c ckSpan) Name() string {
return c.name
}
func (c ckSpan) SpanContext() trace.SpanContext {
return c.spanContext
}
func (c ckSpan) Parent() trace.SpanContext {
return c.parent
}
func (c ckSpan) SpanKind() trace.SpanKind {
return c.kind
}
func (c ckSpan) StartTime() time.Time {
return c.startTime
}
func (c ckSpan) EndTime() time.Time {
return c.endTime
}
func (c ckSpan) Attributes() []attribute.KeyValue {
return c.attributes
}
func (c ckSpan) Links() []tracesdk.Link {
return nil
}
func (c ckSpan) Events() []tracesdk.Event {
return nil
}
func (c ckSpan) Status() tracesdk.Status {
return c.status
}
func (c ckSpan) InstrumentationScope() instrumentation.Scope {
return instrumentation.Scope{}
}
func (c ckSpan) InstrumentationLibrary() instrumentation.Library {
return instrumentation.Library{}
}
func (c ckSpan) Resource() *resource.Resource {
return c.resource
}
func (c ckSpan) DroppedAttributes() int {
return 0
}
func (c ckSpan) DroppedLinks() int {
return 0
}
func (c ckSpan) DroppedEvents() int {
return 0
}
func (c ckSpan) ChildSpanCount() int {
return c.csCount
}

View File

@ -0,0 +1,310 @@
package clickhouse
import (
"context"
"encoding/json"
"fmt"
"os"
"testing"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/cloudwego/eino-ext/components/model/openai"
"github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/components"
"github.com/cloudwego/eino/components/model"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"github.com/coze-dev/coze-studio/backend/infra/contract/telemetry"
"github.com/coze-dev/coze-studio/backend/pkg/lang/ptr"
)
func TestExporter(t *testing.T) {
if os.Getenv("ENABLE_CK_TELEMETRY_TEST") != "true" {
return
}
username, pwd := os.Getenv("CK_USERNAME"), os.Getenv("CK_PASSWORD")
baseURL, apiKey := os.Getenv("OPENAI_BASE_URL"), os.Getenv("OPENAI_API_KEY")
ctx := context.Background()
tp, err := NewTracerProvider(&TracerConfig{
ClickhouseOptions: &clickhouse.Options{
Addr: []string{"localhost:8124"},
Auth: clickhouse.Auth{
Database: "default",
Username: username,
Password: pwd,
},
Debug: true,
Debugf: func(format string, v ...any) {
fmt.Printf(format+"\n", v...)
},
Settings: clickhouse.Settings{
"max_execution_time": 60,
},
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionZSTD,
Level: 1,
},
DialTimeout: time.Second * 30,
MaxOpenConns: 5,
MaxIdleConns: 5,
ConnMaxLifetime: time.Duration(10) * time.Minute,
ConnOpenStrategy: clickhouse.ConnOpenInOrder,
BlockBufferSize: 10,
MaxCompressionBuffer: 10240,
},
TracerProviderOptions: nil,
IndexRootOnly: false,
})
assert.NoError(t, err)
defer tp.Shutdown(ctx)
ch := &callbackHandler{tp.Tracer("test_tracer")}
cm, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
APIKey: apiKey,
ByAzure: true,
BaseURL: baseURL,
Model: "gpt-4o-2024-05-13",
})
assert.NoError(t, err)
g := compose.NewChain[string, string]().
AppendLambda(compose.InvokableLambda(func(ctx context.Context, input string) (output []*schema.Message, err error) {
return []*schema.Message{
schema.UserMessage(input),
}, nil
})).
AppendChatModel(cm).
AppendLambda(compose.InvokableLambda(func(ctx context.Context, input *schema.Message) (output string, err error) {
return input.Content, nil
}))
r, err := g.Compile(ctx)
assert.NoError(t, err)
resp, err := r.Invoke(
context.WithValue(ctx, "log-id", uuid.New().String()),
"hello",
compose.WithCallbacks(ch),
)
assert.NoError(t, err)
fmt.Println(resp)
}
func TestNewQueryClient(t *testing.T) {
if os.Getenv("ENABLE_CK_TELEMETRY_TEST") != "true" {
return
}
username, pwd := os.Getenv("CK_USERNAME"), os.Getenv("CK_PASSWORD")
ctx := context.Background()
qc, err := NewQueryClient(&QueryClientConfig{
ClickhouseOptions: &clickhouse.Options{
Addr: []string{"localhost:8124"},
Auth: clickhouse.Auth{
Database: "default",
Username: username,
Password: pwd,
},
Debug: true,
Debugf: func(format string, v ...any) {
fmt.Printf(format+"\n", v...)
},
Settings: clickhouse.Settings{
"max_execution_time": 60,
},
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionZSTD,
Level: 1,
},
DialTimeout: time.Second * 30,
MaxOpenConns: 5,
MaxIdleConns: 5,
ConnMaxLifetime: time.Duration(10) * time.Minute,
ConnOpenStrategy: clickhouse.ConnOpenInOrder,
BlockBufferSize: 10,
MaxCompressionBuffer: 10240,
},
EmptySpanID: nil,
})
assert.NoError(t, err)
spans, nextCursor, hasMore, err := qc.ListSpan(ctx, &telemetry.ListTracesRequest{
RootOnly: true,
SpaceID: 7521695817242001408,
EntityID: 5566,
Status: 0,
StartAt: time.Unix(1751461407, 0),
EndAt: time.Unix(1753547807, 0),
Limit: 10,
Cursor: nil,
})
assert.NoError(t, err)
assert.False(t, hasMore)
fmt.Println(nextCursor)
assert.True(t, len(spans) > 0)
first := spans[0]
fullTrace, err := qc.GetTrace(ctx, &telemetry.GetTraceRequest{
SpaceID: 0,
EntityID: 0,
TraceID: ptr.Of(first.SpanContext().TraceID()),
})
assert.NoError(t, err)
assert.True(t, len(fullTrace) == 4)
for _, item := range fullTrace {
if item.Name() == "Chain" {
assert.False(t, item.Parent().IsValid())
} else {
assert.True(t, item.Parent().IsValid())
}
}
var logID string
for _, attr := range first.Attributes() {
if attr.Key == telemetry.AttributeLogID {
logID = attr.Value.AsString()
}
}
assert.True(t, logID != "")
fullTrace, err = qc.GetTrace(ctx, &telemetry.GetTraceRequest{
SpaceID: 0,
EntityID: 0,
LogID: ptr.Of(logID),
})
assert.NoError(t, err)
assert.True(t, len(fullTrace) == 4)
for _, item := range fullTrace {
if item.Name() == "Chain" {
assert.False(t, item.Parent().IsValid())
} else {
assert.True(t, item.Parent().IsValid())
}
}
}
type callbackHandler struct {
tracer trace.Tracer
}
func (c callbackHandler) OnStart(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
name := info.Name
if name == "" {
name = fmt.Sprintf("%s%s", info.Component, info.Type)
}
ctx, span := c.tracer.Start(ctx, info.Name)
span.SetName(name)
var (
spanType telemetry.SpanType
strInput string
)
switch info.Component {
case components.ComponentOfChatModel:
spanType = telemetry.LLMCall
i := model.ConvCallbackInput(input)
b, _ := json.Marshal(i.Messages)
strInput = string(b)
case compose.ComponentOfGraph, compose.ComponentOfChain:
spanType = telemetry.UserInput
if i, ok := input.(string); ok {
strInput = i
} else {
b, _ := json.Marshal(input)
strInput = string(b)
}
default:
spanType = telemetry.Unknown
if i, ok := input.(string); ok {
strInput = i
} else {
b, _ := json.Marshal(input)
strInput = string(b)
}
}
attrs := []attribute.KeyValue{
telemetry.NewSpanAttrLogID(ctx.Value("log-id").(string)),
telemetry.NewSpanAttrSpaceID(int64(7521695817242001408)),
telemetry.NewSpanAttrType(int64(spanType)),
telemetry.NewSpanAttrUserID(int64(3344)),
telemetry.NewSpanAttrEntityID(int64(5566)),
telemetry.NewSpanAttrEnvironment("dev"),
telemetry.NewSpanAttrVersion("1"),
telemetry.NewSpanAttrInput(strInput),
}
span.SetAttributes(attrs...)
return ctx
}
func (c callbackHandler) OnEnd(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
span := trace.SpanFromContext(ctx)
var (
strOutput string
attrs []attribute.KeyValue
)
switch info.Component {
case components.ComponentOfChatModel:
o := model.ConvCallbackOutput(output)
attrs = append(attrs)
b, _ := json.Marshal(o.Message)
strOutput = string(b)
if o.TokenUsage != nil {
attrs = append(attrs,
telemetry.NewSpanAttrInputTokens(int64(o.TokenUsage.PromptTokens)),
telemetry.NewSpanAttrOutputTokens(int64(o.TokenUsage.CompletionTokens)),
)
}
if o.Config != nil {
attrs = append(attrs, telemetry.NewSpanAttrModel(o.Config.Model))
}
default:
if i, ok := output.(string); ok {
strOutput = i
} else {
b, _ := json.Marshal(output)
strOutput = string(b)
}
}
attrs = append(attrs, telemetry.NewSpanAttrOutput(strOutput))
span.SetAttributes(attrs...)
span.SetStatus(codes.Ok, "")
span.End()
return ctx
}
func (c callbackHandler) OnError(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
span := trace.SpanFromContext(ctx)
span.SetStatus(codes.Error, err.Error())
span.End()
return ctx
}
func (c callbackHandler) OnStartWithStreamInput(ctx context.Context, info *callbacks.RunInfo, input *schema.StreamReader[callbacks.CallbackInput]) context.Context {
input.Close()
return ctx
}
func (c callbackHandler) OnEndWithStreamOutput(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
output.Close()
return ctx
}

View File

@ -0,0 +1,45 @@
package clickhouse
import (
"context"
"github.com/ClickHouse/clickhouse-go/v2"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
"github.com/coze-dev/coze-studio/backend/infra/contract/telemetry"
"github.com/coze-dev/coze-studio/backend/infra/impl/telemetry/clickhouse/internal/query"
)
type TracerConfig struct {
ClickhouseOptions *clickhouse.Options
TracerProviderOptions []trace.TracerProviderOption
IndexRootOnly bool
}
func NewTracerProvider(cfg *TracerConfig) (telemetry.TracerProvider, error) {
db, err := newClickhouseDB(cfg.ClickhouseOptions)
if err != nil {
return nil, err
}
exp := &exporter{query: query.Use(db), indexRootOnly: cfg.IndexRootOnly}
rcs, err := resource.New(
context.Background(),
resource.WithHost(),
resource.WithFromEnv(),
resource.WithProcessPID(),
resource.WithTelemetrySDK())
if err != nil {
return nil, err
}
bsp := trace.NewBatchSpanProcessor(exp)
tp := trace.NewTracerProvider(append([]trace.TracerProviderOption{
trace.WithSpanProcessor(bsp),
trace.WithResource(rcs),
trace.WithSampler(trace.AlwaysSample()),
}, cfg.TracerProviderOptions...)...)
return tp, nil
}

View File

@ -32,6 +32,7 @@ import (
"github.com/cloudwego/hertz/pkg/app/server"
"github.com/cloudwego/hertz/pkg/common/config"
"github.com/cloudwego/hertz/pkg/route"
"github.com/hertz-contrib/cors"
"github.com/joho/godotenv"
@ -56,15 +57,16 @@ func main() {
setLogLevel()
if err := application.Init(ctx); err != nil {
shutdown, err := application.Init(ctx)
if err != nil {
panic("InitializeInfra failed, err=" + err.Error())
}
asyncStartMinioProxyServer(ctx)
startHttpServer()
startHttpServer(shutdown)
}
func startHttpServer() {
func startHttpServer(shutdown []route.CtxCallback) {
maxRequestBodySize := os.Getenv("MAX_REQUEST_BODY_SIZE")
maxSize := conv.StrToInt64D(maxRequestBodySize, 1024*1024*200)
addr := getEnv("LISTEN_ADDR", ":8888")
@ -105,7 +107,8 @@ func startHttpServer() {
s.Use(middleware.OpenapiAuthMW())
s.Use(middleware.SessionAuthMW())
s.Use(middleware.I18nMW()) // must after SessionAuthMW
s.OnShutdown = append(s.OnShutdown, shutdown...)
router.GeneratedRegister(s)
s.Spin()
}

View File

@ -86,6 +86,14 @@ const (
UseSSL = "USE_SSL"
SSLCertFile = "SSL_CERT_FILE"
SSLKeyFile = "SSL_KEY_FILE"
TelemetryType = "TELEMETRY_TYPE" // clickhouse / cozeloop
TelemetryIndexRootOnly = "TELEMETRY_INDEX_ROOT_ONLY"
ClickhouseAddr = "CLICKHOUSE_ADDR"
ClickhouseDBName = "CLICKHOUSE_DB_NAME"
ClickhouseUserName = "CLICKHOUSE_USERNAME"
ClickhousePassword = "CLICKHOUSE_PASSWORD"
ClickhouseEmptySpanID = "CLICKHOUSE_EMPTY_SPAN_ID"
)
const (

View File

@ -505,6 +505,30 @@ services:
- coze-network
restart: 'no'
clickhouse:
image: bitnami/clickhouse:25
container_name: coze-clickhouse
environment:
- ALLOW_EMPTY_PASSWORD=no
- CLICKHOUSE_ADMIN_USER=${CLICKHOUSE_ADMIN_USER:-default}
- CLICKHOUSE_ADMIN_PASSWORD=${CLICKHOUSE_ADMIN_PASSWORD:-clickhouse123}
ports:
- '8123:8123'
- '8124:9000' # CLICKHOUSE_TCP_PORT
- '9004:9004'
- '9005:9005'
- '9009:9009'
volumes:
- ./data/bitnami/clickhouse:/bitnami/clickhouse:rw,Z
healthcheck:
test: wget --no-verbose --tries=1 --spider http://localhost:8123/ping || exit 1
interval: 5s
timeout: 10s
retries: 10
start_period: 10s
networks:
- coze-network
coze-server:
build:
context: ../

View File

@ -341,6 +341,30 @@ services:
networks:
- coze-network
clickhouse:
image: bitnami/clickhouse:25
container_name: coze-clickhouse
environment:
- ALLOW_EMPTY_PASSWORD=no
- CLICKHOUSE_ADMIN_USER=${CLICKHOUSE_ADMIN_USER:-default}
- CLICKHOUSE_ADMIN_PASSWORD=${CLICKHOUSE_ADMIN_PASSWORD:-clickhouse123}
ports:
- '8123:8123'
- '8124:9000' # CLICKHOUSE_TCP_PORT
- '9004:9004'
- '9005:9005'
- '9009:9009'
volumes:
- ./data/bitnami/clickhouse:/bitnami/clickhouse:rw,Z
healthcheck:
test: wget --no-verbose --tries=1 --spider http://localhost:8123/ping || exit 1
interval: 5s
timeout: 10s
retries: 10
start_period: 10s
networks:
- coze-network
coze-server:
# build:
# context: ../

View File

@ -0,0 +1,52 @@
CREATE DATABASE IF NOT EXISTS `default`;
CREATE TABLE IF NOT EXISTS spans_index (
span_id String Codec(ZSTD(1)), -- span id: [8]byte
trace_id String Codec(ZSTD(1)), -- trace id: [16]byte
parent_span_id String Codec(ZSTD(1)), -- parent span id: [8]byte
name String Codec(ZSTD(1)), -- name
kind Int8 Codec(ZSTD(1)), -- kind: https://github.com/open-telemetry/opentelemetry-proto/blob/30d237e1ff3ab7aa50e0922b5bebdd93505090af/opentelemetry/proto/trace/v1/trace.proto#L101-L129
status_code Int64 Codec(ZSTD(1)), -- 状态码
status_msg String Codec(ZSTD(1)), -- 状态信息
log_id String Codec(ZSTD(1)), -- log_id
space_id Int64 Codec(ZSTD(1)), -- space_id
type Int32 Codec(ZSTD(1)), -- span 节点类型,和 idl enum 对应
user_id Int64 CODEC(ZSTD(1)), -- user id
entity_id Int64 Codec(ZSTD(1)), -- 写入实体 id对应 agent / knowledge id
env String Codec(ZSTD(1)), -- 环境,开发 / 线上
version String Codec(ZSTD(1)), -- 版本
input String Codec(ZSTD(1)), -- 输入,需要展示和全文搜索过滤,单独提取出来
start_time_ms UInt64 CODEC(ZSTD(1)), -- 开始时间,单位毫秒
INDEX idx(trace_id) TYPE minmax granularity 8192
)
ENGINE = MergeTree()
ORDER BY (space_id, entity_id, start_time_ms)
PARTITION BY toDate(start_time_ms / 1000)
TTL toDate(start_time_ms / 1000) + INTERVAL 7 DAY
SETTINGS ttl_only_drop_parts = 1,
storage_policy = 'default';
CREATE TABLE IF NOT EXISTS spans_data (
span_id String Codec(ZSTD(1)), -- span id: [8]byte
trace_id String Codec(ZSTD(1)), -- trace id: [16]byte
parent_span_id String Codec(ZSTD(1)), -- parent span id: [8]byte
name String Codec(ZSTD(1)), -- name
kind Int8 Codec(ZSTD(1)), -- kind: https://github.com/open-telemetry/opentelemetry-proto/blob/30d237e1ff3ab7aa50e0922b5bebdd93505090af/opentelemetry/proto/trace/v1/trace.proto#L101-L129
status_code Int64 Codec(ZSTD(1)), -- 状态码
status_msg String Codec(ZSTD(1)), -- 状态信息
resource_attributes Map(String, String) CODEC(ZSTD(1)), -- 元信息
start_time_ms UInt64 CODEC(ZSTD(1)), -- 开始时间,单位毫秒
end_time_ms UInt64 CODEC(ZSTD(1)), -- 结束时间,单位毫秒
log_id String Codec(ZSTD(1)), -- log_id
attr_keys Array(LowCardinality(String)) Codec(ZSTD(1)), -- 其他 attr keys
attr_values Array(String) Codec(ZSTD(1)), -- 其他 attr values
INDEX idx_log_id log_id TYPE bloom_filter GRANULARITY 1024
)
ENGINE = MergeTree()
ORDER BY (trace_id, span_id)
PARTITION BY toDate(start_time_ms / 1000)
TTL toDate(start_time_ms / 1000) + INTERVAL 7 DAY
SETTINGS ttl_only_drop_parts = 1,
index_granularity = 2048,
storage_policy = 'default';