Compare commits
1 Commits
main
...
feat/telem
| Author | SHA1 | Date | |
|---|---|---|---|
| 9335447977 |
@ -35,7 +35,7 @@ import (
|
||||
|
||||
func TestClearConversationCtx(t *testing.T) {
|
||||
h := server.Default()
|
||||
err := application.Init(context.Background())
|
||||
_, err := application.Init(context.Background())
|
||||
|
||||
t.Logf("application init err: %v", err)
|
||||
h.POST("/api/conversation/create_section", ClearConversationCtx)
|
||||
@ -55,7 +55,7 @@ func TestClearConversationCtx(t *testing.T) {
|
||||
|
||||
func TestClearConversationHistory(t *testing.T) {
|
||||
h := server.Default()
|
||||
err := application.Init(context.Background())
|
||||
_, err := application.Init(context.Background())
|
||||
t.Logf("application init err: %v", err)
|
||||
h.POST("/api/conversation/clear_message", ClearConversationHistory)
|
||||
req := &conversation.ClearConversationHistoryRequest{
|
||||
|
||||
@ -35,7 +35,7 @@ import (
|
||||
|
||||
func TestGetMessageList(t *testing.T) {
|
||||
h := server.Default()
|
||||
err := application.Init(context.Background())
|
||||
_, err := application.Init(context.Background())
|
||||
|
||||
t.Logf("application init err: %v", err)
|
||||
|
||||
|
||||
@ -20,9 +20,11 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/cloudwego/hertz/pkg/route"
|
||||
"github.com/coze-dev/coze-studio/backend/application/openauth"
|
||||
"github.com/coze-dev/coze-studio/backend/application/template"
|
||||
"github.com/coze-dev/coze-studio/backend/crossdomain/contract/crosssearch"
|
||||
"github.com/coze-dev/coze-studio/backend/pkg/logs"
|
||||
|
||||
"github.com/coze-dev/coze-studio/backend/application/app"
|
||||
"github.com/coze-dev/coze-studio/backend/application/base/appinfra"
|
||||
@ -102,27 +104,33 @@ type complexServices struct {
|
||||
conversationSVC *conversation.ConversationApplicationService
|
||||
}
|
||||
|
||||
func Init(ctx context.Context) (err error) {
|
||||
func Init(ctx context.Context) (shutdown []route.CtxCallback, err error) {
|
||||
infra, err := appinfra.Init(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
shutdown = append(shutdown, func(ctx context.Context) {
|
||||
if e := infra.TracerProvider.Shutdown(ctx); e != nil {
|
||||
logs.CtxErrorf(ctx, "shut down tracer provider failed, trace might loss, err=%v", e)
|
||||
}
|
||||
})
|
||||
|
||||
eventbus := initEventBus(infra)
|
||||
|
||||
basicServices, err := initBasicServices(ctx, infra, eventbus)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Init - initBasicServices failed, err: %v", err)
|
||||
return nil, fmt.Errorf("Init - initBasicServices failed, err: %v", err)
|
||||
}
|
||||
|
||||
primaryServices, err := initPrimaryServices(ctx, basicServices)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Init - initPrimaryServices failed, err: %v", err)
|
||||
return nil, fmt.Errorf("Init - initPrimaryServices failed, err: %v", err)
|
||||
}
|
||||
|
||||
complexServices, err := initComplexServices(ctx, primaryServices)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Init - initVitalServices failed, err: %v", err)
|
||||
return nil, fmt.Errorf("Init - initVitalServices failed, err: %v", err)
|
||||
}
|
||||
|
||||
crossconnector.SetDefaultSVC(connectorImpl.InitDomainService(basicServices.connectorSVC.DomainSVC))
|
||||
@ -139,7 +147,7 @@ func Init(ctx context.Context) (err error) {
|
||||
crossdatacopy.SetDefaultSVC(dataCopyImpl.InitDomainService(basicServices.infra))
|
||||
crosssearch.SetDefaultSVC(searchImpl.InitDomainService(complexServices.searchSVC.DomainSVC))
|
||||
|
||||
return nil
|
||||
return shutdown, nil
|
||||
}
|
||||
|
||||
func initEventBus(infra *appinfra.AppDependencies) *eventbusImpl {
|
||||
|
||||
@ -22,7 +22,10 @@ import (
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"github.com/coze-dev/coze-studio/backend/infra/contract/telemetry"
|
||||
"gorm.io/gorm"
|
||||
|
||||
"github.com/coze-dev/coze-studio/backend/infra/contract/coderunner"
|
||||
@ -37,6 +40,7 @@ import (
|
||||
"github.com/coze-dev/coze-studio/backend/infra/impl/imagex/veimagex"
|
||||
"github.com/coze-dev/coze-studio/backend/infra/impl/mysql"
|
||||
"github.com/coze-dev/coze-studio/backend/infra/impl/storage"
|
||||
ck "github.com/coze-dev/coze-studio/backend/infra/impl/telemetry/clickhouse"
|
||||
"github.com/coze-dev/coze-studio/backend/types/consts"
|
||||
)
|
||||
|
||||
@ -51,6 +55,8 @@ type AppDependencies struct {
|
||||
AppEventProducer eventbus.Producer
|
||||
ModelMgr modelmgr.Manager
|
||||
CodeRunner coderunner.Runner
|
||||
TracerProvider telemetry.TracerProvider
|
||||
QueryClient telemetry.QueryClient
|
||||
}
|
||||
|
||||
func Init(ctx context.Context) (*AppDependencies, error) {
|
||||
@ -101,6 +107,11 @@ func Init(ctx context.Context) (*AppDependencies, error) {
|
||||
|
||||
deps.CodeRunner = initCodeRunner()
|
||||
|
||||
deps.TracerProvider, deps.QueryClient, err = initTelemetry()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return deps, nil
|
||||
}
|
||||
|
||||
@ -182,3 +193,72 @@ func initCodeRunner() coderunner.Runner {
|
||||
return direct.NewRunner()
|
||||
}
|
||||
}
|
||||
|
||||
func initTelemetry() (telemetry.TracerProvider, telemetry.QueryClient, error) {
|
||||
typ := os.Getenv(consts.TelemetryType)
|
||||
switch typ {
|
||||
case "clickhouse":
|
||||
opts := &clickhouse.Options{
|
||||
Addr: strings.Split(os.Getenv(consts.ClickhouseAddr), ";"),
|
||||
Auth: clickhouse.Auth{
|
||||
Database: getEnvOrDefault(os.Getenv(consts.ClickhouseDBName), "default"),
|
||||
Username: getEnvOrDefault(os.Getenv(consts.ClickhouseUserName), "default"),
|
||||
Password: getEnvOrDefault(os.Getenv(consts.ClickhousePassword), "clickhouse123"),
|
||||
},
|
||||
// Debug: true,
|
||||
// Debugf: func(format string, v ...any) {
|
||||
// fmt.Printf(format+"\n", v...)
|
||||
// },
|
||||
Settings: clickhouse.Settings{
|
||||
"max_execution_time": 60,
|
||||
},
|
||||
Compression: &clickhouse.Compression{
|
||||
Method: clickhouse.CompressionZSTD,
|
||||
Level: 1,
|
||||
},
|
||||
DialTimeout: time.Second * 30,
|
||||
MaxOpenConns: 5,
|
||||
MaxIdleConns: 5,
|
||||
ConnMaxLifetime: time.Duration(10) * time.Minute,
|
||||
ConnOpenStrategy: clickhouse.ConnOpenInOrder,
|
||||
BlockBufferSize: 10,
|
||||
MaxCompressionBuffer: 10240,
|
||||
}
|
||||
|
||||
indexRootOnly := os.Getenv(consts.TelemetryIndexRootOnly) == "true"
|
||||
tracerConfig := &ck.TracerConfig{
|
||||
ClickhouseOptions: opts,
|
||||
TracerProviderOptions: nil,
|
||||
IndexRootOnly: indexRootOnly,
|
||||
}
|
||||
tp, err := ck.NewTracerProvider(tracerConfig)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
var emptySpanID *string
|
||||
if v := os.Getenv(consts.ClickhouseEmptySpanID); v != "" {
|
||||
emptySpanID = &v
|
||||
}
|
||||
queryClientConfig := &ck.QueryClientConfig{
|
||||
ClickhouseOptions: opts,
|
||||
EmptySpanID: emptySpanID,
|
||||
}
|
||||
qc, err := ck.NewQueryClient(queryClientConfig)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return tp, qc, nil
|
||||
|
||||
default:
|
||||
// TODO: not return errors to achieve compatible upgrades ?
|
||||
return nil, nil, fmt.Errorf("unknown telemetry type: %s", typ)
|
||||
}
|
||||
}
|
||||
|
||||
func getEnvOrDefault(key, defaultValue string) string {
|
||||
if v := os.Getenv(key); v != "" {
|
||||
return v
|
||||
}
|
||||
return defaultValue
|
||||
}
|
||||
|
||||
@ -7,6 +7,7 @@ replace github.com/apache/thrift => github.com/apache/thrift v0.13.0
|
||||
replace google.golang.org/grpc => google.golang.org/grpc v1.68.0
|
||||
|
||||
require (
|
||||
github.com/ClickHouse/clickhouse-go/v2 v2.37.2
|
||||
github.com/IBM/sarama v1.45.1
|
||||
github.com/apache/rocketmq-client-go/v2 v2.1.3-0.20250427084711-67ec50b93040
|
||||
github.com/apache/thrift v0.21.0
|
||||
@ -75,13 +76,16 @@ require (
|
||||
golang.org/x/image v0.22.0
|
||||
golang.org/x/oauth2 v0.23.0
|
||||
google.golang.org/genai v1.13.0
|
||||
gorm.io/driver/clickhouse v0.7.0
|
||||
)
|
||||
|
||||
require (
|
||||
cloud.google.com/go v0.116.0 // indirect
|
||||
cloud.google.com/go/auth v0.9.3 // indirect
|
||||
cloud.google.com/go/compute/metadata v0.5.0 // indirect
|
||||
github.com/ClickHouse/ch-go v0.66.1 // indirect
|
||||
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect
|
||||
github.com/andybalholm/brotli v1.1.1 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.37 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.7.5 // indirect
|
||||
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.18 // indirect
|
||||
@ -89,14 +93,18 @@ require (
|
||||
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
|
||||
github.com/extrame/ole2 v0.0.0-20160812065207-d69429661ad7 // indirect
|
||||
github.com/frankban/quicktest v1.14.6 // indirect
|
||||
github.com/go-faster/city v1.0.1 // indirect
|
||||
github.com/go-faster/errors v0.7.1 // indirect
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
|
||||
github.com/google/btree v1.1.2 // indirect
|
||||
github.com/google/s2a-go v0.1.8 // indirect
|
||||
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
|
||||
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
||||
github.com/hashicorp/go-version v1.7.0 // indirect
|
||||
github.com/meguminnnnnnnnn/go-openai v0.0.0-20250620092828-0d508a1dcdde // indirect
|
||||
github.com/paulmach/orb v0.11.1 // indirect
|
||||
github.com/peterbourgon/diskv/v3 v3.0.1 // indirect
|
||||
github.com/rogpeppe/fastuuid v1.2.0 // indirect
|
||||
github.com/segmentio/asm v1.2.0 // indirect
|
||||
github.com/shabbyrobe/xmlwriter v0.0.0-20200208144257-9fca06d00ffa // indirect
|
||||
github.com/yuin/gopher-lua v1.1.1 // indirect
|
||||
go.opencensus.io v0.24.0 // indirect
|
||||
@ -264,12 +272,12 @@ require (
|
||||
go.etcd.io/etcd/server/v3 v3.5.5 // indirect
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect
|
||||
go.opentelemetry.io/otel v1.36.0 // indirect
|
||||
go.opentelemetry.io/otel v1.36.0
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.36.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk v1.36.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.36.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk v1.36.0
|
||||
go.opentelemetry.io/otel/trace v1.36.0
|
||||
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
|
||||
go.uber.org/atomic v1.11.0 // indirect
|
||||
go.uber.org/automaxprocs v1.5.3 // indirect
|
||||
|
||||
@ -766,6 +766,10 @@ github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOv
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
|
||||
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
||||
github.com/ClickHouse/ch-go v0.66.1 h1:LQHFslfVYZsISOY0dnOYOXGkOUvpv376CCm8g7W74A4=
|
||||
github.com/ClickHouse/ch-go v0.66.1/go.mod h1:NEYcg3aOFv2EmTJfo4m2WF7sHB/YFbLUuIWv9iq76xY=
|
||||
github.com/ClickHouse/clickhouse-go/v2 v2.37.2 h1:wRLNKoynvHQEN4znnVHNLaYnrqVc9sGJmGYg+GGCfto=
|
||||
github.com/ClickHouse/clickhouse-go/v2 v2.37.2/go.mod h1:pH2zrBGp5Y438DMwAxXMm1neSXPPjSI7tD4MURVULw8=
|
||||
github.com/CloudyKit/fastprinter v0.0.0-20200109182630-33d98a066a53/go.mod h1:+3IMCy2vIlbG1XG/0ggNQv0SvxCAIpPM5b1nCz56Xno=
|
||||
github.com/CloudyKit/jet/v3 v3.0.0/go.mod h1:HKQPgSJmdK8hdoAbKUUWajkHyHo4RaU5rMdUywE7VMo=
|
||||
github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU=
|
||||
@ -802,6 +806,8 @@ github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302/go.mod h1:SGn
|
||||
github.com/alicebob/miniredis/v2 v2.34.0 h1:mBFWMaJSNL9RwdGRyEDoAAv8OQc5UlEhLDQggTglU/0=
|
||||
github.com/alicebob/miniredis/v2 v2.34.0/go.mod h1:kWShP4b58T1CW0Y5dViCd5ztzrDqRWqM3nksiyXk5s8=
|
||||
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
|
||||
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
|
||||
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
|
||||
github.com/anthropics/anthropic-sdk-go v1.4.0 h1:fU1jKxYbQdQDiEXCxeW5XZRIOwKevn/PMg8Ay1nnUx0=
|
||||
github.com/anthropics/anthropic-sdk-go v1.4.0/go.mod h1:AapDW22irxK2PSumZiQXYUFvsdQgkwIWlpESweWZI/c=
|
||||
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
|
||||
@ -1090,6 +1096,10 @@ github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclK
|
||||
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
|
||||
github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA=
|
||||
github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
|
||||
github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw=
|
||||
github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw=
|
||||
github.com/go-faster/errors v0.7.1 h1:MkJTnDoEdi9pDabt1dpWf7AA8/BaSYZqibYyhZ20AYg=
|
||||
github.com/go-faster/errors v0.7.1/go.mod h1:5ySTjWFiphBs07IKuiL69nxdfd5+fzh1u7FPGZP2quo=
|
||||
github.com/go-fonts/dejavu v0.1.0/go.mod h1:4Wt4I4OU2Nq9asgDCteaAaWZOV24E+0/Pwo0gppep4g=
|
||||
github.com/go-fonts/latin-modern v0.2.0/go.mod h1:rQVLdDMK+mK1xscDwsqM5J8U2jrRa3T0ecnM9pNujks=
|
||||
github.com/go-fonts/liberation v0.1.1/go.mod h1:K6qoJYypsmfVjWg8KOVDQhLc8UDgIK2HYqyqAO9z7GY=
|
||||
@ -1354,6 +1364,8 @@ github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b
|
||||
github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8=
|
||||
github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
|
||||
github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
|
||||
github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY=
|
||||
github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
|
||||
github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
|
||||
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
|
||||
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
|
||||
@ -1581,6 +1593,7 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
|
||||
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
|
||||
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 h1:RWengNIwukTxcDr9M+97sNutRR1RKhG96O6jWumTTnw=
|
||||
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826/go.mod h1:TaXosZuwdSHYgviHp1DAtfrULt5eUgsSMsZf+YrPgl8=
|
||||
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
|
||||
github.com/moul/http2curl v1.0.0/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOAZ3H0fQ=
|
||||
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
||||
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
||||
@ -1639,6 +1652,9 @@ github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FI
|
||||
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
|
||||
github.com/paulmach/orb v0.11.1 h1:3koVegMC4X/WeiXYz9iswopaTwMem53NzTJuTF20JzU=
|
||||
github.com/paulmach/orb v0.11.1/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU=
|
||||
github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY=
|
||||
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
|
||||
github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M=
|
||||
github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc=
|
||||
@ -1759,6 +1775,8 @@ github.com/samber/lo v1.27.0 h1:GOyDWxsblvqYobqsmUuMddPa2/mMzkKyojlXol4+LaQ=
|
||||
github.com/samber/lo v1.27.0/go.mod h1:it33p9UtPMS7z72fP4gw/EIfQB2eI8ke7GR2wc6+Rhg=
|
||||
github.com/schollz/closestmatch v2.1.0+incompatible/go.mod h1:RtP1ddjLong6gTkbtmuhtR2uUrrJOpYzYRvbcPAid+g=
|
||||
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
|
||||
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
|
||||
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
|
||||
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
|
||||
github.com/shabbyrobe/xmlwriter v0.0.0-20200208144257-9fca06d00ffa h1:2cO3RojjYl3hVTbEvJVqrMaFmORhL6O06qdW42toftk=
|
||||
github.com/shabbyrobe/xmlwriter v0.0.0-20200208144257-9fca06d00ffa/go.mod h1:Yjr3bdWaVWyME1kha7X0jsz3k2DgXNa1Pj3XGyUAbx8=
|
||||
@ -1842,6 +1860,7 @@ github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
|
||||
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
|
||||
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
|
||||
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
|
||||
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
|
||||
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
|
||||
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
|
||||
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
|
||||
@ -1890,7 +1909,9 @@ github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
|
||||
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
|
||||
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
|
||||
github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs=
|
||||
github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
|
||||
github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM=
|
||||
github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
|
||||
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
|
||||
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
|
||||
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
|
||||
@ -1903,9 +1924,12 @@ github.com/xuri/excelize/v2 v2.9.0 h1:1tgOaEq92IOEumR1/JfYS/eR0KHOCsRv/rYXXh6YJQ
|
||||
github.com/xuri/excelize/v2 v2.9.0/go.mod h1:uqey4QBZ9gdMeWApPLdhm9x+9o2lq4iVmjiLfBS5hdE=
|
||||
github.com/xuri/nfp v0.0.0-20240318013403-ab9948c2c4a7 h1:hPVCafDV85blFTabnqKgNhDCkJX25eik94Si9cTER4A=
|
||||
github.com/xuri/nfp v0.0.0-20240318013403-ab9948c2c4a7/go.mod h1:WwHg+CVyzlv/TX9xqBFXEZAuxOPxn2k1GNHwG41IIUQ=
|
||||
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
|
||||
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
|
||||
github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0/go.mod h1:/LWChgwKmvncFJFHJ7Gvn9wZArjbV5/FppcK2fKk/tI=
|
||||
github.com/yargevad/filepathx v1.0.0 h1:SYcT+N3tYGi+NvazubCNlvgIPbzAk7i7y2dwg3I5FYc=
|
||||
github.com/yargevad/filepathx v1.0.0/go.mod h1:BprfX/gpYNJHJfc35GjRRpVcwWXS89gGulUIU5tK3tA=
|
||||
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
|
||||
github.com/yudai/gojsondiff v1.0.0/go.mod h1:AY32+k2cwILAkW1fbgxQ5mUmMiZFgLIV+FBNExI05xg=
|
||||
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82/go.mod h1:lgjkn3NuSvDfVJdfcVVdX+jpBxNmX4rDAzaS45IcYoM=
|
||||
github.com/yudai/pp v2.0.1+incompatible/go.mod h1:PuxR/8QJ7cyCkFp/aUDS+JY727OFEZkTdatxwunjIkc=
|
||||
@ -1945,6 +1969,7 @@ go.etcd.io/etcd/raft/v3 v3.5.5 h1:Ibz6XyZ60OYyRopu73lLM/P+qco3YtlZMOhnXNS051I=
|
||||
go.etcd.io/etcd/raft/v3 v3.5.5/go.mod h1:76TA48q03g1y1VpTue92jZLr9lIHKUNcYdZOOGyx8rI=
|
||||
go.etcd.io/etcd/server/v3 v3.5.5 h1:jNjYm/9s+f9A9r6+SC4RvNaz6AqixpOvhrFdT0PvIj0=
|
||||
go.etcd.io/etcd/server/v3 v3.5.5/go.mod h1:rZ95vDw/jrvsbj9XpTqPrTAB9/kzchVdhRirySPkUBc=
|
||||
go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g=
|
||||
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
|
||||
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
|
||||
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
|
||||
@ -2036,6 +2061,7 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
|
||||
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.0.0-20220314234659-1baeb1ce4c0b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
|
||||
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
|
||||
@ -2861,6 +2887,8 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gorm.io/datatypes v1.1.1-0.20230130040222-c43177d3cf8c h1:jWdr7cHgl8c/ua5vYbR2WhSp+NQmzhsj0xoY3foTzW8=
|
||||
gorm.io/datatypes v1.1.1-0.20230130040222-c43177d3cf8c/go.mod h1:SH2K9R+2RMjuX1CkCONrPwoe9JzVv2hkQvEu4bXGojE=
|
||||
gorm.io/driver/clickhouse v0.7.0 h1:BCrqvgONayvZRgtuA6hdya+eAW5P2QVagV3OlEp1vtA=
|
||||
gorm.io/driver/clickhouse v0.7.0/go.mod h1:TmNo0wcVTsD4BBObiRnCahUgHJHjBIwuRejHwYt3JRs=
|
||||
gorm.io/driver/mysql v1.5.7 h1:MndhOPYOfEp2rHKgkZIhJ16eVUIRf2HmzgoPmh7FCWo=
|
||||
gorm.io/driver/mysql v1.5.7/go.mod h1:sEtPWMiqiN1N1cMXoXmBbd8C6/l+TESwriotuRRpkDM=
|
||||
gorm.io/driver/postgres v1.5.11 h1:ubBVAfbKEUld/twyKZ0IYn9rSQh448EdelLYk9Mv314=
|
||||
|
||||
98
backend/infra/contract/telemetry/attributes.go
Normal file
98
backend/infra/contract/telemetry/attributes.go
Normal file
@ -0,0 +1,98 @@
|
||||
package telemetry
|
||||
|
||||
import "go.opentelemetry.io/otel/attribute"
|
||||
|
||||
const (
|
||||
AttributeLogID = "_log_id" // string
|
||||
AttributeSpaceID = "_space_id" // int64
|
||||
AttributeType = "_type" // int
|
||||
AttributeUserID = "_user_id" // string
|
||||
AttributeEntityID = "_entity_id" // int64
|
||||
AttributeEnvironment = "_env" // string
|
||||
AttributeVersion = "_version" // string
|
||||
|
||||
AttributeInput = "_input" // string
|
||||
AttributeOutput = "_output" // string
|
||||
AttributeInputTokens = "_input_tokens" // int64
|
||||
AttributeOutputToken = "_output_token" // int64
|
||||
|
||||
AttributeModel = "_model" // string of chatmodel.Config
|
||||
AttributeTemperature = "_temperature" // float64
|
||||
AttributeMessageID = "_message_id" // string
|
||||
AttributeTimeToFirstToken = "_ttft" // int64, ms
|
||||
AttributePrompt = "_prompt" // string
|
||||
AttributeToolName = "_tool_name" // string
|
||||
AttributeExecuteID = "_execute_id" // string
|
||||
)
|
||||
|
||||
func NewSpanAttrLogID(logID string) attribute.KeyValue {
|
||||
return attribute.String(AttributeLogID, logID)
|
||||
}
|
||||
|
||||
func NewSpanAttrSpaceID(spaceID int64) attribute.KeyValue {
|
||||
return attribute.Int64(AttributeSpaceID, spaceID)
|
||||
}
|
||||
|
||||
func NewSpanAttrType(typ int64) attribute.KeyValue {
|
||||
return attribute.Int64(AttributeType, typ)
|
||||
}
|
||||
|
||||
func NewSpanAttrUserID(userID int64) attribute.KeyValue {
|
||||
return attribute.Int64(AttributeUserID, userID)
|
||||
}
|
||||
|
||||
func NewSpanAttrEntityID(entityID int64) attribute.KeyValue {
|
||||
return attribute.Int64(AttributeEntityID, entityID)
|
||||
}
|
||||
|
||||
func NewSpanAttrEnvironment(env string) attribute.KeyValue {
|
||||
return attribute.String(AttributeEnvironment, env)
|
||||
}
|
||||
|
||||
func NewSpanAttrVersion(version string) attribute.KeyValue {
|
||||
return attribute.String(AttributeVersion, version)
|
||||
}
|
||||
|
||||
func NewSpanAttrInput(input string) attribute.KeyValue {
|
||||
return attribute.String(AttributeInput, input)
|
||||
}
|
||||
|
||||
func NewSpanAttrInputTokens(inputTokens int64) attribute.KeyValue {
|
||||
return attribute.Int64(AttributeInputTokens, inputTokens)
|
||||
}
|
||||
|
||||
func NewSpanAttrOutput(output string) attribute.KeyValue {
|
||||
return attribute.String(AttributeOutput, output)
|
||||
}
|
||||
|
||||
func NewSpanAttrOutputTokens(outputTokens int64) attribute.KeyValue {
|
||||
return attribute.Int64(AttributeOutputToken, outputTokens)
|
||||
}
|
||||
|
||||
func NewSpanAttrModel(model string) attribute.KeyValue {
|
||||
return attribute.String(AttributeModel, model)
|
||||
}
|
||||
|
||||
func NewSpanAttrTemperature(temperature float64) attribute.KeyValue {
|
||||
return attribute.Float64(AttributeTemperature, temperature)
|
||||
}
|
||||
|
||||
func NewSpanAttrMessageID(messageID string) attribute.KeyValue {
|
||||
return attribute.String(AttributeMessageID, messageID)
|
||||
}
|
||||
|
||||
func NewSpanAttrTimeToFirstToken(timeToFirstToken int64) attribute.KeyValue {
|
||||
return attribute.Int64(AttributeTimeToFirstToken, timeToFirstToken)
|
||||
}
|
||||
|
||||
func NewSpanAttrPrompt(prompt string) attribute.KeyValue {
|
||||
return attribute.String(AttributePrompt, prompt)
|
||||
}
|
||||
|
||||
func NewSpanAttrToolName(toolName string) attribute.KeyValue {
|
||||
return attribute.String(AttributeToolName, toolName)
|
||||
}
|
||||
|
||||
func NewSpanAttrExecuteID(executeID string) attribute.KeyValue {
|
||||
return attribute.String(AttributeExecuteID, executeID)
|
||||
}
|
||||
54
backend/infra/contract/telemetry/span_type.go
Normal file
54
backend/infra/contract/telemetry/span_type.go
Normal file
@ -0,0 +1,54 @@
|
||||
package telemetry
|
||||
|
||||
type SpanType int64
|
||||
|
||||
const (
|
||||
Unknown SpanType = 1
|
||||
UserInput SpanType = 2
|
||||
ThirdParty SpanType = 3
|
||||
ScheduledTasks SpanType = 4
|
||||
OpenDialog SpanType = 5
|
||||
InvokeAgent SpanType = 6
|
||||
RestartAgent SpanType = 7
|
||||
SwitchAgent SpanType = 8
|
||||
LLMCall SpanType = 9
|
||||
LLMBatchCall SpanType = 10
|
||||
Workflow SpanType = 11
|
||||
WorkflowStart SpanType = 12
|
||||
WorkflowEnd SpanType = 13
|
||||
PluginTool SpanType = 14
|
||||
PluginToolBatch SpanType = 15
|
||||
Knowledge SpanType = 16
|
||||
Code SpanType = 17
|
||||
CodeBatch SpanType = 18
|
||||
Condition SpanType = 19
|
||||
Chain SpanType = 20
|
||||
Card SpanType = 21
|
||||
WorkflowMessage SpanType = 22
|
||||
WorkflowLLMCall SpanType = 23
|
||||
WorkflowLLMBatchCall SpanType = 24
|
||||
WorkflowCode SpanType = 25
|
||||
WorkflowCodeBatch SpanType = 26
|
||||
WorkflowCondition SpanType = 27
|
||||
WorkflowPluginTool SpanType = 28
|
||||
WorkflowPluginToolBatch SpanType = 29
|
||||
WorkflowKnowledge SpanType = 30
|
||||
WorkflowVariable SpanType = 31
|
||||
WorkflowDatabase SpanType = 32
|
||||
Variable SpanType = 33
|
||||
Database SpanType = 34
|
||||
LongTermMemory SpanType = 35
|
||||
Hook SpanType = 36
|
||||
BWStart SpanType = 37
|
||||
BWEnd SpanType = 38
|
||||
BWBatch SpanType = 39
|
||||
BWLoop SpanType = 40
|
||||
BWCondition SpanType = 41
|
||||
BWLLM SpanType = 42
|
||||
BWParallel SpanType = 43
|
||||
BWScript SpanType = 44
|
||||
BWVariable SpanType = 45
|
||||
BWCallFlow SpanType = 46
|
||||
BWConnector SpanType = 47
|
||||
UserInputV2 SpanType = 48
|
||||
)
|
||||
43
backend/infra/contract/telemetry/tracer.go
Normal file
43
backend/infra/contract/telemetry/tracer.go
Normal file
@ -0,0 +1,43 @@
|
||||
package telemetry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
tracesdk "go.opentelemetry.io/otel/sdk/trace"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
type TracerProvider interface {
|
||||
trace.TracerProvider
|
||||
|
||||
Shutdown(ctx context.Context) error
|
||||
}
|
||||
|
||||
type QueryClient interface {
|
||||
ListSpan(ctx context.Context, request *ListTracesRequest) (
|
||||
spans []tracesdk.ReadOnlySpan, nextCursor *string, hasMore bool, err error)
|
||||
|
||||
GetTrace(ctx context.Context, request *GetTraceRequest) ([]tracesdk.ReadOnlySpan, error)
|
||||
}
|
||||
|
||||
type ListTracesRequest struct {
|
||||
RootOnly bool
|
||||
SpaceID int64
|
||||
EntityID int64
|
||||
Status codes.Code
|
||||
|
||||
StartAt time.Time
|
||||
EndAt time.Time
|
||||
|
||||
Limit int
|
||||
Cursor *string
|
||||
}
|
||||
|
||||
type GetTraceRequest struct {
|
||||
SpaceID int64
|
||||
EntityID int64
|
||||
TraceID *trace.TraceID
|
||||
LogID *string
|
||||
}
|
||||
38
backend/infra/impl/telemetry/clickhouse/conn.go
Normal file
38
backend/infra/impl/telemetry/clickhouse/conn.go
Normal file
@ -0,0 +1,38 @@
|
||||
package clickhouse
|
||||
|
||||
import (
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
ckdriver "gorm.io/driver/clickhouse"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
func newClickhouseDB(cfg *clickhouse.Options) (*gorm.DB, error) {
|
||||
opt := *cfg
|
||||
opt.MaxOpenConns = 0
|
||||
opt.MaxIdleConns = 0
|
||||
opt.ConnMaxLifetime = 0
|
||||
|
||||
conn := clickhouse.OpenDB(&opt)
|
||||
if cfg.MaxIdleConns > 0 {
|
||||
conn.SetMaxIdleConns(cfg.MaxIdleConns)
|
||||
}
|
||||
if cfg.MaxOpenConns > 0 {
|
||||
conn.SetMaxOpenConns(cfg.MaxOpenConns)
|
||||
}
|
||||
if cfg.ConnMaxLifetime > 0 {
|
||||
conn.SetConnMaxLifetime(cfg.ConnMaxLifetime)
|
||||
}
|
||||
|
||||
if err := conn.Ping(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
db, err := gorm.Open(ckdriver.New(ckdriver.Config{
|
||||
Conn: conn,
|
||||
}))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
250
backend/infra/impl/telemetry/clickhouse/convert.go
Normal file
250
backend/infra/impl/telemetry/clickhouse/convert.go
Normal file
@ -0,0 +1,250 @@
|
||||
package clickhouse
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
tracesdk "go.opentelemetry.io/otel/sdk/trace"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"github.com/coze-dev/coze-studio/backend/infra/contract/telemetry"
|
||||
"github.com/coze-dev/coze-studio/backend/infra/impl/telemetry/clickhouse/internal/model"
|
||||
)
|
||||
|
||||
func toSpanIndexModel(span tracesdk.ReadOnlySpan) (*model.SpansIndex, error) {
|
||||
index := &model.SpansIndex{
|
||||
SpanID: span.SpanContext().SpanID().String(),
|
||||
TraceID: span.SpanContext().TraceID().String(),
|
||||
ParentSpanID: span.Parent().SpanID().String(),
|
||||
Name: span.Name(),
|
||||
Kind: int8(span.SpanKind()),
|
||||
StatusCode: int64(span.Status().Code),
|
||||
StatusMsg: span.Status().Description,
|
||||
StartTimeMs: uint64(span.StartTime().UnixMilli()),
|
||||
}
|
||||
|
||||
for _, attr := range span.Attributes() {
|
||||
switch attr.Key {
|
||||
case telemetry.AttributeLogID:
|
||||
index.LogID = attr.Value.AsString()
|
||||
case telemetry.AttributeSpaceID:
|
||||
index.SpaceID = attr.Value.AsInt64()
|
||||
case telemetry.AttributeType:
|
||||
index.Type = int32(attr.Value.AsInt64())
|
||||
case telemetry.AttributeUserID:
|
||||
index.UserID = attr.Value.AsInt64()
|
||||
case telemetry.AttributeEntityID:
|
||||
index.EntityID = attr.Value.AsInt64()
|
||||
case telemetry.AttributeEnvironment:
|
||||
index.Env = attr.Value.AsString()
|
||||
case telemetry.AttributeVersion:
|
||||
index.Version = attr.Value.AsString()
|
||||
case telemetry.AttributeInput:
|
||||
index.Input = attr.Value.AsString()
|
||||
default:
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
|
||||
return index, nil
|
||||
}
|
||||
|
||||
func toSpanDataModel(span tracesdk.ReadOnlySpan) (*model.SpansData, error) {
|
||||
data := &model.SpansData{
|
||||
SpanID: span.SpanContext().SpanID().String(),
|
||||
TraceID: span.SpanContext().TraceID().String(),
|
||||
ParentSpanID: span.Parent().SpanID().String(),
|
||||
Name: span.Name(),
|
||||
Kind: int8(span.SpanKind()),
|
||||
StatusCode: int64(span.Status().Code),
|
||||
StatusMsg: span.Status().Description,
|
||||
ResourceAttributes: make(map[string]string),
|
||||
LogID: "",
|
||||
StartTimeMs: uint64(span.StartTime().UnixMilli()),
|
||||
AttrKeys: make([]string, 0, len(span.Attributes())),
|
||||
AttrValues: make([]string, 0, len(span.Attributes())),
|
||||
}
|
||||
|
||||
for _, attr := range span.Resource().Attributes() {
|
||||
data.ResourceAttributes[string(attr.Key)] = attr.Value.Emit()
|
||||
}
|
||||
|
||||
for _, attr := range span.Attributes() {
|
||||
switch attr.Key {
|
||||
case telemetry.AttributeLogID:
|
||||
data.LogID = attr.Value.AsString()
|
||||
default:
|
||||
data.AttrKeys = append(data.AttrKeys, string(attr.Key))
|
||||
data.AttrValues = append(data.AttrValues, attr.Value.Emit())
|
||||
}
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func fromSpanIndexModel(emptySpanID string) func(index *model.SpansIndex) (tracesdk.ReadOnlySpan, error) {
|
||||
return func(index *model.SpansIndex) (tracesdk.ReadOnlySpan, error) {
|
||||
traceID, err := hex.DecodeString(index.TraceID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(traceID) != 16 {
|
||||
return nil, fmt.Errorf("[fromSpanIndexModel] invalid trace ID: %s", index.TraceID)
|
||||
}
|
||||
|
||||
spanID, err := hex.DecodeString(index.SpanID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(spanID) != 8 {
|
||||
return nil, fmt.Errorf("[fromSpanIndexModel] invalid span ID: %s", index.SpanID)
|
||||
}
|
||||
|
||||
span := &ckSpan{
|
||||
name: index.Name,
|
||||
spanContext: trace.NewSpanContext(trace.SpanContextConfig{
|
||||
TraceID: trace.TraceID(traceID),
|
||||
SpanID: trace.SpanID(spanID),
|
||||
}),
|
||||
parent: trace.SpanContext{},
|
||||
kind: trace.SpanKind(index.Kind),
|
||||
startTime: time.UnixMilli(int64(index.StartTimeMs)),
|
||||
resource: nil,
|
||||
attributes: []attribute.KeyValue{
|
||||
telemetry.NewSpanAttrLogID(index.LogID),
|
||||
telemetry.NewSpanAttrSpaceID(index.SpaceID),
|
||||
telemetry.NewSpanAttrType(int64(index.Type)),
|
||||
telemetry.NewSpanAttrUserID(index.UserID),
|
||||
telemetry.NewSpanAttrEntityID(index.EntityID),
|
||||
telemetry.NewSpanAttrEnvironment(index.Env),
|
||||
telemetry.NewSpanAttrVersion(index.Version),
|
||||
telemetry.NewSpanAttrInput(index.Input),
|
||||
},
|
||||
status: tracesdk.Status{
|
||||
Code: codes.Code(index.StatusCode),
|
||||
Description: index.StatusMsg,
|
||||
},
|
||||
csCount: 0,
|
||||
}
|
||||
|
||||
if index.ParentSpanID != emptySpanID {
|
||||
parentSpanID, err := hex.DecodeString(index.ParentSpanID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
span.parent = trace.NewSpanContext(trace.SpanContextConfig{
|
||||
TraceID: trace.TraceID(traceID),
|
||||
SpanID: trace.SpanID(parentSpanID),
|
||||
})
|
||||
}
|
||||
|
||||
return span, nil
|
||||
}
|
||||
}
|
||||
|
||||
func fromSpanDataModel(emptySpanID string) func(data *model.SpansData) (tracesdk.ReadOnlySpan, error) {
|
||||
return func(data *model.SpansData) (tracesdk.ReadOnlySpan, error) {
|
||||
traceID, err := hex.DecodeString(data.TraceID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(traceID) != 16 {
|
||||
return nil, fmt.Errorf("[fromSpanDataModel] invalid trace ID: %s", data.TraceID)
|
||||
}
|
||||
|
||||
spanID, err := hex.DecodeString(data.SpanID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(spanID) != 8 {
|
||||
return nil, fmt.Errorf("[fromSpanDataModel] invalid span ID: %s", data.SpanID)
|
||||
}
|
||||
|
||||
span := &ckSpan{
|
||||
name: data.Name,
|
||||
spanContext: trace.NewSpanContext(trace.SpanContextConfig{
|
||||
TraceID: trace.TraceID(traceID),
|
||||
SpanID: trace.SpanID(spanID),
|
||||
}),
|
||||
parent: trace.SpanContext{},
|
||||
kind: trace.SpanKind(data.Kind),
|
||||
startTime: time.UnixMilli(int64(data.StartTimeMs)),
|
||||
endTime: time.UnixMilli(int64(data.EndTimeMs)),
|
||||
resource: nil,
|
||||
attributes: nil,
|
||||
status: tracesdk.Status{
|
||||
Code: codes.Code(data.StatusCode),
|
||||
Description: data.StatusMsg,
|
||||
},
|
||||
csCount: 0,
|
||||
}
|
||||
|
||||
if data.ParentSpanID != emptySpanID {
|
||||
parentSpanID, err := hex.DecodeString(data.ParentSpanID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
span.parent = trace.NewSpanContext(trace.SpanContextConfig{
|
||||
TraceID: trace.TraceID(traceID),
|
||||
SpanID: trace.SpanID(parentSpanID),
|
||||
})
|
||||
}
|
||||
|
||||
if len(data.AttrKeys) != len(data.AttrValues) {
|
||||
return nil, fmt.Errorf("[fromSpanDataModel] invalid attribute count, keys=%d, vals=%d", len(data.AttrKeys), len(data.AttrValues))
|
||||
}
|
||||
|
||||
var ra []attribute.KeyValue
|
||||
for k, v := range data.ResourceAttributes {
|
||||
// todo: resource value type
|
||||
ra = append(ra, attribute.String(k, v))
|
||||
}
|
||||
rs, err := resource.New(context.Background(), resource.WithAttributes(ra...))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
span.resource = rs
|
||||
|
||||
for i := range data.AttrKeys {
|
||||
key, val := data.AttrKeys[i], data.AttrValues[i]
|
||||
var kv attribute.KeyValue
|
||||
|
||||
switch key {
|
||||
case telemetry.AttributeSpaceID,
|
||||
telemetry.AttributeType,
|
||||
telemetry.AttributeUserID,
|
||||
telemetry.AttributeEntityID,
|
||||
telemetry.AttributeInputTokens,
|
||||
telemetry.AttributeOutputToken:
|
||||
i64, err := strconv.ParseInt(val, 10, 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("[fromSpanDataModel] invalid attribute, key=%s, value=%s, expect value as int64, err=%w", key, val, err)
|
||||
}
|
||||
kv = attribute.Int64(key, i64)
|
||||
case telemetry.AttributeLogID,
|
||||
telemetry.AttributeEnvironment,
|
||||
telemetry.AttributeVersion,
|
||||
telemetry.AttributeInput,
|
||||
telemetry.AttributeOutput,
|
||||
telemetry.AttributeModel:
|
||||
kv = attribute.String(key, val)
|
||||
default:
|
||||
kv = assertAttribute(key, val)
|
||||
}
|
||||
|
||||
span.attributes = append(span.attributes, kv)
|
||||
}
|
||||
|
||||
return span, nil
|
||||
}
|
||||
}
|
||||
|
||||
func assertAttribute(key string, val string) attribute.KeyValue {
|
||||
return attribute.String(key, val)
|
||||
}
|
||||
66
backend/infra/impl/telemetry/clickhouse/exporter.go
Normal file
66
backend/infra/impl/telemetry/clickhouse/exporter.go
Normal file
@ -0,0 +1,66 @@
|
||||
package clickhouse
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"go.opentelemetry.io/otel/sdk/trace"
|
||||
|
||||
"github.com/coze-dev/coze-studio/backend/infra/impl/telemetry/clickhouse/internal/model"
|
||||
"github.com/coze-dev/coze-studio/backend/infra/impl/telemetry/clickhouse/internal/query"
|
||||
"github.com/coze-dev/coze-studio/backend/pkg/logs"
|
||||
)
|
||||
|
||||
type exporter struct {
|
||||
query *query.Query
|
||||
|
||||
indexRootOnly bool
|
||||
}
|
||||
|
||||
func (e *exporter) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) (err error) {
|
||||
defer func() {
|
||||
if err != nil {
|
||||
err = fmt.Errorf("[ExportSpans] failed when exporting spans: %w", err)
|
||||
logs.CtxErrorf(ctx, "%v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
var (
|
||||
spansIndex []*model.SpansIndex
|
||||
spansData []*model.SpansData
|
||||
)
|
||||
|
||||
for _, span := range spans {
|
||||
if !e.indexRootOnly || !span.Parent().HasSpanID() {
|
||||
index, err := toSpanIndexModel(span)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
spansIndex = append(spansIndex, index)
|
||||
}
|
||||
|
||||
data, err := toSpanDataModel(span)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
spansData = append(spansData, data)
|
||||
}
|
||||
|
||||
if len(spansData) > 0 {
|
||||
if err = e.query.SpansData.WithContext(ctx).Debug().Create(spansData...); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if len(spansIndex) > 0 {
|
||||
if err = e.query.SpansIndex.WithContext(ctx).Debug().Create(spansIndex...); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *exporter) Shutdown(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
@ -0,0 +1,29 @@
|
||||
// Code generated by gorm.io/gen. DO NOT EDIT.
|
||||
// Code generated by gorm.io/gen. DO NOT EDIT.
|
||||
// Code generated by gorm.io/gen. DO NOT EDIT.
|
||||
|
||||
package model
|
||||
|
||||
const TableNameSpansData = "spans_data"
|
||||
|
||||
// SpansData mapped from table <spans_data>
|
||||
type SpansData struct {
|
||||
SpanID string `gorm:"column:span_id;type:String;primaryKey" json:"span_id"`
|
||||
TraceID string `gorm:"column:trace_id;type:String;primaryKey" json:"trace_id"`
|
||||
ParentSpanID string `gorm:"column:parent_span_id;type:String;not null" json:"parent_span_id"`
|
||||
Name string `gorm:"column:name;type:String;not null" json:"name"`
|
||||
Kind int8 `gorm:"column:kind;type:Int8;not null" json:"kind"`
|
||||
StatusCode int64 `gorm:"column:status_code;type:Int64;not null" json:"status_code"`
|
||||
StatusMsg string `gorm:"column:status_msg;type:String;not null" json:"status_msg"`
|
||||
ResourceAttributes map[string]string `gorm:"column:resource_attributes;type:Map(String, String);not null" json:"resource_attributes"`
|
||||
StartTimeMs uint64 `gorm:"column:start_time_ms;type:UInt64;not null" json:"start_time_ms"`
|
||||
EndTimeMs uint64 `gorm:"column:end_time_ms;type:UInt64;not null" json:"end_time_ms"`
|
||||
LogID string `gorm:"column:log_id;type:String;not null" json:"log_id"`
|
||||
AttrKeys []string `gorm:"column:attr_keys;type:Array(LowCardinality(String));not null" json:"attr_keys"`
|
||||
AttrValues []string `gorm:"column:attr_values;type:Array(String);not null" json:"attr_values"`
|
||||
}
|
||||
|
||||
// TableName SpansData's table name
|
||||
func (*SpansData) TableName() string {
|
||||
return TableNameSpansData
|
||||
}
|
||||
@ -0,0 +1,32 @@
|
||||
// Code generated by gorm.io/gen. DO NOT EDIT.
|
||||
// Code generated by gorm.io/gen. DO NOT EDIT.
|
||||
// Code generated by gorm.io/gen. DO NOT EDIT.
|
||||
|
||||
package model
|
||||
|
||||
const TableNameSpansIndex = "spans_index"
|
||||
|
||||
// SpansIndex mapped from table <spans_index>
|
||||
type SpansIndex struct {
|
||||
SpanID string `gorm:"column:span_id;type:String;not null" json:"span_id"`
|
||||
TraceID string `gorm:"column:trace_id;type:String;not null" json:"trace_id"`
|
||||
ParentSpanID string `gorm:"column:parent_span_id;type:String;not null" json:"parent_span_id"`
|
||||
Name string `gorm:"column:name;type:String;not null" json:"name"`
|
||||
Kind int8 `gorm:"column:kind;type:Int8;not null" json:"kind"`
|
||||
StatusCode int64 `gorm:"column:status_code;type:Int64;not null" json:"status_code"`
|
||||
StatusMsg string `gorm:"column:status_msg;type:String;not null" json:"status_msg"`
|
||||
LogID string `gorm:"column:log_id;type:String;not null" json:"log_id"`
|
||||
SpaceID int64 `gorm:"column:space_id;type:Int64;primaryKey" json:"space_id"`
|
||||
Type int32 `gorm:"column:type;type:Int32;not null" json:"type"`
|
||||
UserID int64 `gorm:"column:user_id;type:Int64;not null" json:"user_id"`
|
||||
EntityID int64 `gorm:"column:entity_id;type:Int64;primaryKey" json:"entity_id"`
|
||||
Env string `gorm:"column:env;type:String;not null" json:"env"`
|
||||
Version string `gorm:"column:version;type:String;not null" json:"version"`
|
||||
Input string `gorm:"column:input;type:String;not null" json:"input"`
|
||||
StartTimeMs uint64 `gorm:"column:start_time_ms;type:UInt64;primaryKey" json:"start_time_ms"`
|
||||
}
|
||||
|
||||
// TableName SpansIndex's table name
|
||||
func (*SpansIndex) TableName() string {
|
||||
return TableNameSpansIndex
|
||||
}
|
||||
111
backend/infra/impl/telemetry/clickhouse/internal/query/gen.go
Normal file
111
backend/infra/impl/telemetry/clickhouse/internal/query/gen.go
Normal file
@ -0,0 +1,111 @@
|
||||
// Code generated by gorm.io/gen. DO NOT EDIT.
|
||||
// Code generated by gorm.io/gen. DO NOT EDIT.
|
||||
// Code generated by gorm.io/gen. DO NOT EDIT.
|
||||
|
||||
package query
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"gorm.io/gorm"
|
||||
|
||||
"gorm.io/gen"
|
||||
|
||||
"gorm.io/plugin/dbresolver"
|
||||
)
|
||||
|
||||
var (
|
||||
Q = new(Query)
|
||||
SpansData *spansData
|
||||
SpansIndex *spansIndex
|
||||
)
|
||||
|
||||
func SetDefault(db *gorm.DB, opts ...gen.DOOption) {
|
||||
*Q = *Use(db, opts...)
|
||||
SpansData = &Q.SpansData
|
||||
SpansIndex = &Q.SpansIndex
|
||||
}
|
||||
|
||||
func Use(db *gorm.DB, opts ...gen.DOOption) *Query {
|
||||
return &Query{
|
||||
db: db,
|
||||
SpansData: newSpansData(db, opts...),
|
||||
SpansIndex: newSpansIndex(db, opts...),
|
||||
}
|
||||
}
|
||||
|
||||
type Query struct {
|
||||
db *gorm.DB
|
||||
|
||||
SpansData spansData
|
||||
SpansIndex spansIndex
|
||||
}
|
||||
|
||||
func (q *Query) Available() bool { return q.db != nil }
|
||||
|
||||
func (q *Query) clone(db *gorm.DB) *Query {
|
||||
return &Query{
|
||||
db: db,
|
||||
SpansData: q.SpansData.clone(db),
|
||||
SpansIndex: q.SpansIndex.clone(db),
|
||||
}
|
||||
}
|
||||
|
||||
func (q *Query) ReadDB() *Query {
|
||||
return q.ReplaceDB(q.db.Clauses(dbresolver.Read))
|
||||
}
|
||||
|
||||
func (q *Query) WriteDB() *Query {
|
||||
return q.ReplaceDB(q.db.Clauses(dbresolver.Write))
|
||||
}
|
||||
|
||||
func (q *Query) ReplaceDB(db *gorm.DB) *Query {
|
||||
return &Query{
|
||||
db: db,
|
||||
SpansData: q.SpansData.replaceDB(db),
|
||||
SpansIndex: q.SpansIndex.replaceDB(db),
|
||||
}
|
||||
}
|
||||
|
||||
type queryCtx struct {
|
||||
SpansData ISpansDataDo
|
||||
SpansIndex ISpansIndexDo
|
||||
}
|
||||
|
||||
func (q *Query) WithContext(ctx context.Context) *queryCtx {
|
||||
return &queryCtx{
|
||||
SpansData: q.SpansData.WithContext(ctx),
|
||||
SpansIndex: q.SpansIndex.WithContext(ctx),
|
||||
}
|
||||
}
|
||||
|
||||
func (q *Query) Transaction(fc func(tx *Query) error, opts ...*sql.TxOptions) error {
|
||||
return q.db.Transaction(func(tx *gorm.DB) error { return fc(q.clone(tx)) }, opts...)
|
||||
}
|
||||
|
||||
func (q *Query) Begin(opts ...*sql.TxOptions) *QueryTx {
|
||||
tx := q.db.Begin(opts...)
|
||||
return &QueryTx{Query: q.clone(tx), Error: tx.Error}
|
||||
}
|
||||
|
||||
type QueryTx struct {
|
||||
*Query
|
||||
Error error
|
||||
}
|
||||
|
||||
func (q *QueryTx) Commit() error {
|
||||
return q.db.Commit().Error
|
||||
}
|
||||
|
||||
func (q *QueryTx) Rollback() error {
|
||||
return q.db.Rollback().Error
|
||||
}
|
||||
|
||||
func (q *QueryTx) SavePoint(name string) error {
|
||||
return q.db.SavePoint(name).Error
|
||||
}
|
||||
|
||||
func (q *QueryTx) RollbackTo(name string) error {
|
||||
return q.db.RollbackTo(name).Error
|
||||
}
|
||||
@ -0,0 +1,428 @@
|
||||
// Code generated by gorm.io/gen. DO NOT EDIT.
|
||||
// Code generated by gorm.io/gen. DO NOT EDIT.
|
||||
// Code generated by gorm.io/gen. DO NOT EDIT.
|
||||
|
||||
package query
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"gorm.io/gorm"
|
||||
"gorm.io/gorm/clause"
|
||||
"gorm.io/gorm/schema"
|
||||
|
||||
"gorm.io/gen"
|
||||
"gorm.io/gen/field"
|
||||
|
||||
"gorm.io/plugin/dbresolver"
|
||||
|
||||
"github.com/coze-dev/coze-studio/backend/infra/impl/telemetry/clickhouse/internal/model"
|
||||
)
|
||||
|
||||
func newSpansData(db *gorm.DB, opts ...gen.DOOption) spansData {
|
||||
_spansData := spansData{}
|
||||
|
||||
_spansData.spansDataDo.UseDB(db, opts...)
|
||||
_spansData.spansDataDo.UseModel(&model.SpansData{})
|
||||
|
||||
tableName := _spansData.spansDataDo.TableName()
|
||||
_spansData.ALL = field.NewAsterisk(tableName)
|
||||
_spansData.SpanID = field.NewString(tableName, "span_id")
|
||||
_spansData.TraceID = field.NewString(tableName, "trace_id")
|
||||
_spansData.ParentSpanID = field.NewString(tableName, "parent_span_id")
|
||||
_spansData.Name = field.NewString(tableName, "name")
|
||||
_spansData.Kind = field.NewInt8(tableName, "kind")
|
||||
_spansData.StatusCode = field.NewInt64(tableName, "status_code")
|
||||
_spansData.StatusMsg = field.NewString(tableName, "status_msg")
|
||||
_spansData.ResourceAttributes = field.NewField(tableName, "resource_attributes")
|
||||
_spansData.StartTimeMs = field.NewUint64(tableName, "start_time_ms")
|
||||
_spansData.EndTimeMs = field.NewUint64(tableName, "end_time_ms")
|
||||
_spansData.LogID = field.NewString(tableName, "log_id")
|
||||
_spansData.AttrKeys = field.NewField(tableName, "attr_keys")
|
||||
_spansData.AttrValues = field.NewField(tableName, "attr_values")
|
||||
|
||||
_spansData.fillFieldMap()
|
||||
|
||||
return _spansData
|
||||
}
|
||||
|
||||
type spansData struct {
|
||||
spansDataDo
|
||||
|
||||
ALL field.Asterisk
|
||||
SpanID field.String
|
||||
TraceID field.String
|
||||
ParentSpanID field.String
|
||||
Name field.String
|
||||
Kind field.Int8
|
||||
StatusCode field.Int64
|
||||
StatusMsg field.String
|
||||
ResourceAttributes field.Field
|
||||
StartTimeMs field.Uint64
|
||||
EndTimeMs field.Uint64
|
||||
LogID field.String
|
||||
AttrKeys field.Field
|
||||
AttrValues field.Field
|
||||
|
||||
fieldMap map[string]field.Expr
|
||||
}
|
||||
|
||||
func (s spansData) Table(newTableName string) *spansData {
|
||||
s.spansDataDo.UseTable(newTableName)
|
||||
return s.updateTableName(newTableName)
|
||||
}
|
||||
|
||||
func (s spansData) As(alias string) *spansData {
|
||||
s.spansDataDo.DO = *(s.spansDataDo.As(alias).(*gen.DO))
|
||||
return s.updateTableName(alias)
|
||||
}
|
||||
|
||||
func (s *spansData) updateTableName(table string) *spansData {
|
||||
s.ALL = field.NewAsterisk(table)
|
||||
s.SpanID = field.NewString(table, "span_id")
|
||||
s.TraceID = field.NewString(table, "trace_id")
|
||||
s.ParentSpanID = field.NewString(table, "parent_span_id")
|
||||
s.Name = field.NewString(table, "name")
|
||||
s.Kind = field.NewInt8(table, "kind")
|
||||
s.StatusCode = field.NewInt64(table, "status_code")
|
||||
s.StatusMsg = field.NewString(table, "status_msg")
|
||||
s.ResourceAttributes = field.NewField(table, "resource_attributes")
|
||||
s.StartTimeMs = field.NewUint64(table, "start_time_ms")
|
||||
s.EndTimeMs = field.NewUint64(table, "end_time_ms")
|
||||
s.LogID = field.NewString(table, "log_id")
|
||||
s.AttrKeys = field.NewField(table, "attr_keys")
|
||||
s.AttrValues = field.NewField(table, "attr_values")
|
||||
|
||||
s.fillFieldMap()
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *spansData) GetFieldByName(fieldName string) (field.OrderExpr, bool) {
|
||||
_f, ok := s.fieldMap[fieldName]
|
||||
if !ok || _f == nil {
|
||||
return nil, false
|
||||
}
|
||||
_oe, ok := _f.(field.OrderExpr)
|
||||
return _oe, ok
|
||||
}
|
||||
|
||||
func (s *spansData) fillFieldMap() {
|
||||
s.fieldMap = make(map[string]field.Expr, 13)
|
||||
s.fieldMap["span_id"] = s.SpanID
|
||||
s.fieldMap["trace_id"] = s.TraceID
|
||||
s.fieldMap["parent_span_id"] = s.ParentSpanID
|
||||
s.fieldMap["name"] = s.Name
|
||||
s.fieldMap["kind"] = s.Kind
|
||||
s.fieldMap["status_code"] = s.StatusCode
|
||||
s.fieldMap["status_msg"] = s.StatusMsg
|
||||
s.fieldMap["resource_attributes"] = s.ResourceAttributes
|
||||
s.fieldMap["start_time_ms"] = s.StartTimeMs
|
||||
s.fieldMap["end_time_ms"] = s.EndTimeMs
|
||||
s.fieldMap["log_id"] = s.LogID
|
||||
s.fieldMap["attr_keys"] = s.AttrKeys
|
||||
s.fieldMap["attr_values"] = s.AttrValues
|
||||
}
|
||||
|
||||
func (s spansData) clone(db *gorm.DB) spansData {
|
||||
s.spansDataDo.ReplaceConnPool(db.Statement.ConnPool)
|
||||
return s
|
||||
}
|
||||
|
||||
func (s spansData) replaceDB(db *gorm.DB) spansData {
|
||||
s.spansDataDo.ReplaceDB(db)
|
||||
return s
|
||||
}
|
||||
|
||||
type spansDataDo struct{ gen.DO }
|
||||
|
||||
type ISpansDataDo interface {
|
||||
gen.SubQuery
|
||||
Debug() ISpansDataDo
|
||||
WithContext(ctx context.Context) ISpansDataDo
|
||||
WithResult(fc func(tx gen.Dao)) gen.ResultInfo
|
||||
ReplaceDB(db *gorm.DB)
|
||||
ReadDB() ISpansDataDo
|
||||
WriteDB() ISpansDataDo
|
||||
As(alias string) gen.Dao
|
||||
Session(config *gorm.Session) ISpansDataDo
|
||||
Columns(cols ...field.Expr) gen.Columns
|
||||
Clauses(conds ...clause.Expression) ISpansDataDo
|
||||
Not(conds ...gen.Condition) ISpansDataDo
|
||||
Or(conds ...gen.Condition) ISpansDataDo
|
||||
Select(conds ...field.Expr) ISpansDataDo
|
||||
Where(conds ...gen.Condition) ISpansDataDo
|
||||
Order(conds ...field.Expr) ISpansDataDo
|
||||
Distinct(cols ...field.Expr) ISpansDataDo
|
||||
Omit(cols ...field.Expr) ISpansDataDo
|
||||
Join(table schema.Tabler, on ...field.Expr) ISpansDataDo
|
||||
LeftJoin(table schema.Tabler, on ...field.Expr) ISpansDataDo
|
||||
RightJoin(table schema.Tabler, on ...field.Expr) ISpansDataDo
|
||||
Group(cols ...field.Expr) ISpansDataDo
|
||||
Having(conds ...gen.Condition) ISpansDataDo
|
||||
Limit(limit int) ISpansDataDo
|
||||
Offset(offset int) ISpansDataDo
|
||||
Count() (count int64, err error)
|
||||
Scopes(funcs ...func(gen.Dao) gen.Dao) ISpansDataDo
|
||||
Unscoped() ISpansDataDo
|
||||
Create(values ...*model.SpansData) error
|
||||
CreateInBatches(values []*model.SpansData, batchSize int) error
|
||||
Save(values ...*model.SpansData) error
|
||||
First() (*model.SpansData, error)
|
||||
Take() (*model.SpansData, error)
|
||||
Last() (*model.SpansData, error)
|
||||
Find() ([]*model.SpansData, error)
|
||||
FindInBatch(batchSize int, fc func(tx gen.Dao, batch int) error) (results []*model.SpansData, err error)
|
||||
FindInBatches(result *[]*model.SpansData, batchSize int, fc func(tx gen.Dao, batch int) error) error
|
||||
Pluck(column field.Expr, dest interface{}) error
|
||||
Delete(...*model.SpansData) (info gen.ResultInfo, err error)
|
||||
Update(column field.Expr, value interface{}) (info gen.ResultInfo, err error)
|
||||
UpdateSimple(columns ...field.AssignExpr) (info gen.ResultInfo, err error)
|
||||
Updates(value interface{}) (info gen.ResultInfo, err error)
|
||||
UpdateColumn(column field.Expr, value interface{}) (info gen.ResultInfo, err error)
|
||||
UpdateColumnSimple(columns ...field.AssignExpr) (info gen.ResultInfo, err error)
|
||||
UpdateColumns(value interface{}) (info gen.ResultInfo, err error)
|
||||
UpdateFrom(q gen.SubQuery) gen.Dao
|
||||
Attrs(attrs ...field.AssignExpr) ISpansDataDo
|
||||
Assign(attrs ...field.AssignExpr) ISpansDataDo
|
||||
Joins(fields ...field.RelationField) ISpansDataDo
|
||||
Preload(fields ...field.RelationField) ISpansDataDo
|
||||
FirstOrInit() (*model.SpansData, error)
|
||||
FirstOrCreate() (*model.SpansData, error)
|
||||
FindByPage(offset int, limit int) (result []*model.SpansData, count int64, err error)
|
||||
ScanByPage(result interface{}, offset int, limit int) (count int64, err error)
|
||||
Scan(result interface{}) (err error)
|
||||
Returning(value interface{}, columns ...string) ISpansDataDo
|
||||
UnderlyingDB() *gorm.DB
|
||||
schema.Tabler
|
||||
}
|
||||
|
||||
func (s spansDataDo) Debug() ISpansDataDo {
|
||||
return s.withDO(s.DO.Debug())
|
||||
}
|
||||
|
||||
func (s spansDataDo) WithContext(ctx context.Context) ISpansDataDo {
|
||||
return s.withDO(s.DO.WithContext(ctx))
|
||||
}
|
||||
|
||||
func (s spansDataDo) ReadDB() ISpansDataDo {
|
||||
return s.Clauses(dbresolver.Read)
|
||||
}
|
||||
|
||||
func (s spansDataDo) WriteDB() ISpansDataDo {
|
||||
return s.Clauses(dbresolver.Write)
|
||||
}
|
||||
|
||||
func (s spansDataDo) Session(config *gorm.Session) ISpansDataDo {
|
||||
return s.withDO(s.DO.Session(config))
|
||||
}
|
||||
|
||||
func (s spansDataDo) Clauses(conds ...clause.Expression) ISpansDataDo {
|
||||
return s.withDO(s.DO.Clauses(conds...))
|
||||
}
|
||||
|
||||
func (s spansDataDo) Returning(value interface{}, columns ...string) ISpansDataDo {
|
||||
return s.withDO(s.DO.Returning(value, columns...))
|
||||
}
|
||||
|
||||
func (s spansDataDo) Not(conds ...gen.Condition) ISpansDataDo {
|
||||
return s.withDO(s.DO.Not(conds...))
|
||||
}
|
||||
|
||||
func (s spansDataDo) Or(conds ...gen.Condition) ISpansDataDo {
|
||||
return s.withDO(s.DO.Or(conds...))
|
||||
}
|
||||
|
||||
func (s spansDataDo) Select(conds ...field.Expr) ISpansDataDo {
|
||||
return s.withDO(s.DO.Select(conds...))
|
||||
}
|
||||
|
||||
func (s spansDataDo) Where(conds ...gen.Condition) ISpansDataDo {
|
||||
return s.withDO(s.DO.Where(conds...))
|
||||
}
|
||||
|
||||
func (s spansDataDo) Order(conds ...field.Expr) ISpansDataDo {
|
||||
return s.withDO(s.DO.Order(conds...))
|
||||
}
|
||||
|
||||
func (s spansDataDo) Distinct(cols ...field.Expr) ISpansDataDo {
|
||||
return s.withDO(s.DO.Distinct(cols...))
|
||||
}
|
||||
|
||||
func (s spansDataDo) Omit(cols ...field.Expr) ISpansDataDo {
|
||||
return s.withDO(s.DO.Omit(cols...))
|
||||
}
|
||||
|
||||
func (s spansDataDo) Join(table schema.Tabler, on ...field.Expr) ISpansDataDo {
|
||||
return s.withDO(s.DO.Join(table, on...))
|
||||
}
|
||||
|
||||
func (s spansDataDo) LeftJoin(table schema.Tabler, on ...field.Expr) ISpansDataDo {
|
||||
return s.withDO(s.DO.LeftJoin(table, on...))
|
||||
}
|
||||
|
||||
func (s spansDataDo) RightJoin(table schema.Tabler, on ...field.Expr) ISpansDataDo {
|
||||
return s.withDO(s.DO.RightJoin(table, on...))
|
||||
}
|
||||
|
||||
func (s spansDataDo) Group(cols ...field.Expr) ISpansDataDo {
|
||||
return s.withDO(s.DO.Group(cols...))
|
||||
}
|
||||
|
||||
func (s spansDataDo) Having(conds ...gen.Condition) ISpansDataDo {
|
||||
return s.withDO(s.DO.Having(conds...))
|
||||
}
|
||||
|
||||
func (s spansDataDo) Limit(limit int) ISpansDataDo {
|
||||
return s.withDO(s.DO.Limit(limit))
|
||||
}
|
||||
|
||||
func (s spansDataDo) Offset(offset int) ISpansDataDo {
|
||||
return s.withDO(s.DO.Offset(offset))
|
||||
}
|
||||
|
||||
func (s spansDataDo) Scopes(funcs ...func(gen.Dao) gen.Dao) ISpansDataDo {
|
||||
return s.withDO(s.DO.Scopes(funcs...))
|
||||
}
|
||||
|
||||
func (s spansDataDo) Unscoped() ISpansDataDo {
|
||||
return s.withDO(s.DO.Unscoped())
|
||||
}
|
||||
|
||||
func (s spansDataDo) Create(values ...*model.SpansData) error {
|
||||
if len(values) == 0 {
|
||||
return nil
|
||||
}
|
||||
return s.DO.Create(values)
|
||||
}
|
||||
|
||||
func (s spansDataDo) CreateInBatches(values []*model.SpansData, batchSize int) error {
|
||||
return s.DO.CreateInBatches(values, batchSize)
|
||||
}
|
||||
|
||||
// Save : !!! underlying implementation is different with GORM
|
||||
// The method is equivalent to executing the statement: db.Clauses(clause.OnConflict{UpdateAll: true}).Create(values)
|
||||
func (s spansDataDo) Save(values ...*model.SpansData) error {
|
||||
if len(values) == 0 {
|
||||
return nil
|
||||
}
|
||||
return s.DO.Save(values)
|
||||
}
|
||||
|
||||
func (s spansDataDo) First() (*model.SpansData, error) {
|
||||
if result, err := s.DO.First(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
return result.(*model.SpansData), nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s spansDataDo) Take() (*model.SpansData, error) {
|
||||
if result, err := s.DO.Take(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
return result.(*model.SpansData), nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s spansDataDo) Last() (*model.SpansData, error) {
|
||||
if result, err := s.DO.Last(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
return result.(*model.SpansData), nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s spansDataDo) Find() ([]*model.SpansData, error) {
|
||||
result, err := s.DO.Find()
|
||||
return result.([]*model.SpansData), err
|
||||
}
|
||||
|
||||
func (s spansDataDo) FindInBatch(batchSize int, fc func(tx gen.Dao, batch int) error) (results []*model.SpansData, err error) {
|
||||
buf := make([]*model.SpansData, 0, batchSize)
|
||||
err = s.DO.FindInBatches(&buf, batchSize, func(tx gen.Dao, batch int) error {
|
||||
defer func() { results = append(results, buf...) }()
|
||||
return fc(tx, batch)
|
||||
})
|
||||
return results, err
|
||||
}
|
||||
|
||||
func (s spansDataDo) FindInBatches(result *[]*model.SpansData, batchSize int, fc func(tx gen.Dao, batch int) error) error {
|
||||
return s.DO.FindInBatches(result, batchSize, fc)
|
||||
}
|
||||
|
||||
func (s spansDataDo) Attrs(attrs ...field.AssignExpr) ISpansDataDo {
|
||||
return s.withDO(s.DO.Attrs(attrs...))
|
||||
}
|
||||
|
||||
func (s spansDataDo) Assign(attrs ...field.AssignExpr) ISpansDataDo {
|
||||
return s.withDO(s.DO.Assign(attrs...))
|
||||
}
|
||||
|
||||
func (s spansDataDo) Joins(fields ...field.RelationField) ISpansDataDo {
|
||||
for _, _f := range fields {
|
||||
s = *s.withDO(s.DO.Joins(_f))
|
||||
}
|
||||
return &s
|
||||
}
|
||||
|
||||
func (s spansDataDo) Preload(fields ...field.RelationField) ISpansDataDo {
|
||||
for _, _f := range fields {
|
||||
s = *s.withDO(s.DO.Preload(_f))
|
||||
}
|
||||
return &s
|
||||
}
|
||||
|
||||
func (s spansDataDo) FirstOrInit() (*model.SpansData, error) {
|
||||
if result, err := s.DO.FirstOrInit(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
return result.(*model.SpansData), nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s spansDataDo) FirstOrCreate() (*model.SpansData, error) {
|
||||
if result, err := s.DO.FirstOrCreate(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
return result.(*model.SpansData), nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s spansDataDo) FindByPage(offset int, limit int) (result []*model.SpansData, count int64, err error) {
|
||||
result, err = s.Offset(offset).Limit(limit).Find()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if size := len(result); 0 < limit && 0 < size && size < limit {
|
||||
count = int64(size + offset)
|
||||
return
|
||||
}
|
||||
|
||||
count, err = s.Offset(-1).Limit(-1).Count()
|
||||
return
|
||||
}
|
||||
|
||||
func (s spansDataDo) ScanByPage(result interface{}, offset int, limit int) (count int64, err error) {
|
||||
count, err = s.Count()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = s.Offset(offset).Limit(limit).Scan(result)
|
||||
return
|
||||
}
|
||||
|
||||
func (s spansDataDo) Scan(result interface{}) (err error) {
|
||||
return s.DO.Scan(result)
|
||||
}
|
||||
|
||||
func (s spansDataDo) Delete(models ...*model.SpansData) (result gen.ResultInfo, err error) {
|
||||
return s.DO.Delete(models)
|
||||
}
|
||||
|
||||
func (s *spansDataDo) withDO(do gen.Dao) *spansDataDo {
|
||||
s.DO = *do.(*gen.DO)
|
||||
return s
|
||||
}
|
||||
@ -0,0 +1,440 @@
|
||||
// Code generated by gorm.io/gen. DO NOT EDIT.
|
||||
// Code generated by gorm.io/gen. DO NOT EDIT.
|
||||
// Code generated by gorm.io/gen. DO NOT EDIT.
|
||||
|
||||
package query
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"gorm.io/gorm"
|
||||
"gorm.io/gorm/clause"
|
||||
"gorm.io/gorm/schema"
|
||||
|
||||
"gorm.io/gen"
|
||||
"gorm.io/gen/field"
|
||||
|
||||
"gorm.io/plugin/dbresolver"
|
||||
|
||||
"github.com/coze-dev/coze-studio/backend/infra/impl/telemetry/clickhouse/internal/model"
|
||||
)
|
||||
|
||||
func newSpansIndex(db *gorm.DB, opts ...gen.DOOption) spansIndex {
|
||||
_spansIndex := spansIndex{}
|
||||
|
||||
_spansIndex.spansIndexDo.UseDB(db, opts...)
|
||||
_spansIndex.spansIndexDo.UseModel(&model.SpansIndex{})
|
||||
|
||||
tableName := _spansIndex.spansIndexDo.TableName()
|
||||
_spansIndex.ALL = field.NewAsterisk(tableName)
|
||||
_spansIndex.SpanID = field.NewString(tableName, "span_id")
|
||||
_spansIndex.TraceID = field.NewString(tableName, "trace_id")
|
||||
_spansIndex.ParentSpanID = field.NewString(tableName, "parent_span_id")
|
||||
_spansIndex.Name = field.NewString(tableName, "name")
|
||||
_spansIndex.Kind = field.NewInt8(tableName, "kind")
|
||||
_spansIndex.StatusCode = field.NewInt64(tableName, "status_code")
|
||||
_spansIndex.StatusMsg = field.NewString(tableName, "status_msg")
|
||||
_spansIndex.LogID = field.NewString(tableName, "log_id")
|
||||
_spansIndex.SpaceID = field.NewInt64(tableName, "space_id")
|
||||
_spansIndex.Type = field.NewInt32(tableName, "type")
|
||||
_spansIndex.UserID = field.NewInt64(tableName, "user_id")
|
||||
_spansIndex.EntityID = field.NewInt64(tableName, "entity_id")
|
||||
_spansIndex.Env = field.NewString(tableName, "env")
|
||||
_spansIndex.Version = field.NewString(tableName, "version")
|
||||
_spansIndex.Input = field.NewString(tableName, "input")
|
||||
_spansIndex.StartTimeMs = field.NewUint64(tableName, "start_time_ms")
|
||||
|
||||
_spansIndex.fillFieldMap()
|
||||
|
||||
return _spansIndex
|
||||
}
|
||||
|
||||
type spansIndex struct {
|
||||
spansIndexDo
|
||||
|
||||
ALL field.Asterisk
|
||||
SpanID field.String
|
||||
TraceID field.String
|
||||
ParentSpanID field.String
|
||||
Name field.String
|
||||
Kind field.Int8
|
||||
StatusCode field.Int64
|
||||
StatusMsg field.String
|
||||
LogID field.String
|
||||
SpaceID field.Int64
|
||||
Type field.Int32
|
||||
UserID field.Int64
|
||||
EntityID field.Int64
|
||||
Env field.String
|
||||
Version field.String
|
||||
Input field.String
|
||||
StartTimeMs field.Uint64
|
||||
|
||||
fieldMap map[string]field.Expr
|
||||
}
|
||||
|
||||
func (s spansIndex) Table(newTableName string) *spansIndex {
|
||||
s.spansIndexDo.UseTable(newTableName)
|
||||
return s.updateTableName(newTableName)
|
||||
}
|
||||
|
||||
func (s spansIndex) As(alias string) *spansIndex {
|
||||
s.spansIndexDo.DO = *(s.spansIndexDo.As(alias).(*gen.DO))
|
||||
return s.updateTableName(alias)
|
||||
}
|
||||
|
||||
func (s *spansIndex) updateTableName(table string) *spansIndex {
|
||||
s.ALL = field.NewAsterisk(table)
|
||||
s.SpanID = field.NewString(table, "span_id")
|
||||
s.TraceID = field.NewString(table, "trace_id")
|
||||
s.ParentSpanID = field.NewString(table, "parent_span_id")
|
||||
s.Name = field.NewString(table, "name")
|
||||
s.Kind = field.NewInt8(table, "kind")
|
||||
s.StatusCode = field.NewInt64(table, "status_code")
|
||||
s.StatusMsg = field.NewString(table, "status_msg")
|
||||
s.LogID = field.NewString(table, "log_id")
|
||||
s.SpaceID = field.NewInt64(table, "space_id")
|
||||
s.Type = field.NewInt32(table, "type")
|
||||
s.UserID = field.NewInt64(table, "user_id")
|
||||
s.EntityID = field.NewInt64(table, "entity_id")
|
||||
s.Env = field.NewString(table, "env")
|
||||
s.Version = field.NewString(table, "version")
|
||||
s.Input = field.NewString(table, "input")
|
||||
s.StartTimeMs = field.NewUint64(table, "start_time_ms")
|
||||
|
||||
s.fillFieldMap()
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *spansIndex) GetFieldByName(fieldName string) (field.OrderExpr, bool) {
|
||||
_f, ok := s.fieldMap[fieldName]
|
||||
if !ok || _f == nil {
|
||||
return nil, false
|
||||
}
|
||||
_oe, ok := _f.(field.OrderExpr)
|
||||
return _oe, ok
|
||||
}
|
||||
|
||||
func (s *spansIndex) fillFieldMap() {
|
||||
s.fieldMap = make(map[string]field.Expr, 16)
|
||||
s.fieldMap["span_id"] = s.SpanID
|
||||
s.fieldMap["trace_id"] = s.TraceID
|
||||
s.fieldMap["parent_span_id"] = s.ParentSpanID
|
||||
s.fieldMap["name"] = s.Name
|
||||
s.fieldMap["kind"] = s.Kind
|
||||
s.fieldMap["status_code"] = s.StatusCode
|
||||
s.fieldMap["status_msg"] = s.StatusMsg
|
||||
s.fieldMap["log_id"] = s.LogID
|
||||
s.fieldMap["space_id"] = s.SpaceID
|
||||
s.fieldMap["type"] = s.Type
|
||||
s.fieldMap["user_id"] = s.UserID
|
||||
s.fieldMap["entity_id"] = s.EntityID
|
||||
s.fieldMap["env"] = s.Env
|
||||
s.fieldMap["version"] = s.Version
|
||||
s.fieldMap["input"] = s.Input
|
||||
s.fieldMap["start_time_ms"] = s.StartTimeMs
|
||||
}
|
||||
|
||||
func (s spansIndex) clone(db *gorm.DB) spansIndex {
|
||||
s.spansIndexDo.ReplaceConnPool(db.Statement.ConnPool)
|
||||
return s
|
||||
}
|
||||
|
||||
func (s spansIndex) replaceDB(db *gorm.DB) spansIndex {
|
||||
s.spansIndexDo.ReplaceDB(db)
|
||||
return s
|
||||
}
|
||||
|
||||
type spansIndexDo struct{ gen.DO }
|
||||
|
||||
type ISpansIndexDo interface {
|
||||
gen.SubQuery
|
||||
Debug() ISpansIndexDo
|
||||
WithContext(ctx context.Context) ISpansIndexDo
|
||||
WithResult(fc func(tx gen.Dao)) gen.ResultInfo
|
||||
ReplaceDB(db *gorm.DB)
|
||||
ReadDB() ISpansIndexDo
|
||||
WriteDB() ISpansIndexDo
|
||||
As(alias string) gen.Dao
|
||||
Session(config *gorm.Session) ISpansIndexDo
|
||||
Columns(cols ...field.Expr) gen.Columns
|
||||
Clauses(conds ...clause.Expression) ISpansIndexDo
|
||||
Not(conds ...gen.Condition) ISpansIndexDo
|
||||
Or(conds ...gen.Condition) ISpansIndexDo
|
||||
Select(conds ...field.Expr) ISpansIndexDo
|
||||
Where(conds ...gen.Condition) ISpansIndexDo
|
||||
Order(conds ...field.Expr) ISpansIndexDo
|
||||
Distinct(cols ...field.Expr) ISpansIndexDo
|
||||
Omit(cols ...field.Expr) ISpansIndexDo
|
||||
Join(table schema.Tabler, on ...field.Expr) ISpansIndexDo
|
||||
LeftJoin(table schema.Tabler, on ...field.Expr) ISpansIndexDo
|
||||
RightJoin(table schema.Tabler, on ...field.Expr) ISpansIndexDo
|
||||
Group(cols ...field.Expr) ISpansIndexDo
|
||||
Having(conds ...gen.Condition) ISpansIndexDo
|
||||
Limit(limit int) ISpansIndexDo
|
||||
Offset(offset int) ISpansIndexDo
|
||||
Count() (count int64, err error)
|
||||
Scopes(funcs ...func(gen.Dao) gen.Dao) ISpansIndexDo
|
||||
Unscoped() ISpansIndexDo
|
||||
Create(values ...*model.SpansIndex) error
|
||||
CreateInBatches(values []*model.SpansIndex, batchSize int) error
|
||||
Save(values ...*model.SpansIndex) error
|
||||
First() (*model.SpansIndex, error)
|
||||
Take() (*model.SpansIndex, error)
|
||||
Last() (*model.SpansIndex, error)
|
||||
Find() ([]*model.SpansIndex, error)
|
||||
FindInBatch(batchSize int, fc func(tx gen.Dao, batch int) error) (results []*model.SpansIndex, err error)
|
||||
FindInBatches(result *[]*model.SpansIndex, batchSize int, fc func(tx gen.Dao, batch int) error) error
|
||||
Pluck(column field.Expr, dest interface{}) error
|
||||
Delete(...*model.SpansIndex) (info gen.ResultInfo, err error)
|
||||
Update(column field.Expr, value interface{}) (info gen.ResultInfo, err error)
|
||||
UpdateSimple(columns ...field.AssignExpr) (info gen.ResultInfo, err error)
|
||||
Updates(value interface{}) (info gen.ResultInfo, err error)
|
||||
UpdateColumn(column field.Expr, value interface{}) (info gen.ResultInfo, err error)
|
||||
UpdateColumnSimple(columns ...field.AssignExpr) (info gen.ResultInfo, err error)
|
||||
UpdateColumns(value interface{}) (info gen.ResultInfo, err error)
|
||||
UpdateFrom(q gen.SubQuery) gen.Dao
|
||||
Attrs(attrs ...field.AssignExpr) ISpansIndexDo
|
||||
Assign(attrs ...field.AssignExpr) ISpansIndexDo
|
||||
Joins(fields ...field.RelationField) ISpansIndexDo
|
||||
Preload(fields ...field.RelationField) ISpansIndexDo
|
||||
FirstOrInit() (*model.SpansIndex, error)
|
||||
FirstOrCreate() (*model.SpansIndex, error)
|
||||
FindByPage(offset int, limit int) (result []*model.SpansIndex, count int64, err error)
|
||||
ScanByPage(result interface{}, offset int, limit int) (count int64, err error)
|
||||
Scan(result interface{}) (err error)
|
||||
Returning(value interface{}, columns ...string) ISpansIndexDo
|
||||
UnderlyingDB() *gorm.DB
|
||||
schema.Tabler
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Debug() ISpansIndexDo {
|
||||
return s.withDO(s.DO.Debug())
|
||||
}
|
||||
|
||||
func (s spansIndexDo) WithContext(ctx context.Context) ISpansIndexDo {
|
||||
return s.withDO(s.DO.WithContext(ctx))
|
||||
}
|
||||
|
||||
func (s spansIndexDo) ReadDB() ISpansIndexDo {
|
||||
return s.Clauses(dbresolver.Read)
|
||||
}
|
||||
|
||||
func (s spansIndexDo) WriteDB() ISpansIndexDo {
|
||||
return s.Clauses(dbresolver.Write)
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Session(config *gorm.Session) ISpansIndexDo {
|
||||
return s.withDO(s.DO.Session(config))
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Clauses(conds ...clause.Expression) ISpansIndexDo {
|
||||
return s.withDO(s.DO.Clauses(conds...))
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Returning(value interface{}, columns ...string) ISpansIndexDo {
|
||||
return s.withDO(s.DO.Returning(value, columns...))
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Not(conds ...gen.Condition) ISpansIndexDo {
|
||||
return s.withDO(s.DO.Not(conds...))
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Or(conds ...gen.Condition) ISpansIndexDo {
|
||||
return s.withDO(s.DO.Or(conds...))
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Select(conds ...field.Expr) ISpansIndexDo {
|
||||
return s.withDO(s.DO.Select(conds...))
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Where(conds ...gen.Condition) ISpansIndexDo {
|
||||
return s.withDO(s.DO.Where(conds...))
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Order(conds ...field.Expr) ISpansIndexDo {
|
||||
return s.withDO(s.DO.Order(conds...))
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Distinct(cols ...field.Expr) ISpansIndexDo {
|
||||
return s.withDO(s.DO.Distinct(cols...))
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Omit(cols ...field.Expr) ISpansIndexDo {
|
||||
return s.withDO(s.DO.Omit(cols...))
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Join(table schema.Tabler, on ...field.Expr) ISpansIndexDo {
|
||||
return s.withDO(s.DO.Join(table, on...))
|
||||
}
|
||||
|
||||
func (s spansIndexDo) LeftJoin(table schema.Tabler, on ...field.Expr) ISpansIndexDo {
|
||||
return s.withDO(s.DO.LeftJoin(table, on...))
|
||||
}
|
||||
|
||||
func (s spansIndexDo) RightJoin(table schema.Tabler, on ...field.Expr) ISpansIndexDo {
|
||||
return s.withDO(s.DO.RightJoin(table, on...))
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Group(cols ...field.Expr) ISpansIndexDo {
|
||||
return s.withDO(s.DO.Group(cols...))
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Having(conds ...gen.Condition) ISpansIndexDo {
|
||||
return s.withDO(s.DO.Having(conds...))
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Limit(limit int) ISpansIndexDo {
|
||||
return s.withDO(s.DO.Limit(limit))
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Offset(offset int) ISpansIndexDo {
|
||||
return s.withDO(s.DO.Offset(offset))
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Scopes(funcs ...func(gen.Dao) gen.Dao) ISpansIndexDo {
|
||||
return s.withDO(s.DO.Scopes(funcs...))
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Unscoped() ISpansIndexDo {
|
||||
return s.withDO(s.DO.Unscoped())
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Create(values ...*model.SpansIndex) error {
|
||||
if len(values) == 0 {
|
||||
return nil
|
||||
}
|
||||
return s.DO.Create(values)
|
||||
}
|
||||
|
||||
func (s spansIndexDo) CreateInBatches(values []*model.SpansIndex, batchSize int) error {
|
||||
return s.DO.CreateInBatches(values, batchSize)
|
||||
}
|
||||
|
||||
// Save : !!! underlying implementation is different with GORM
|
||||
// The method is equivalent to executing the statement: db.Clauses(clause.OnConflict{UpdateAll: true}).Create(values)
|
||||
func (s spansIndexDo) Save(values ...*model.SpansIndex) error {
|
||||
if len(values) == 0 {
|
||||
return nil
|
||||
}
|
||||
return s.DO.Save(values)
|
||||
}
|
||||
|
||||
func (s spansIndexDo) First() (*model.SpansIndex, error) {
|
||||
if result, err := s.DO.First(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
return result.(*model.SpansIndex), nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Take() (*model.SpansIndex, error) {
|
||||
if result, err := s.DO.Take(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
return result.(*model.SpansIndex), nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Last() (*model.SpansIndex, error) {
|
||||
if result, err := s.DO.Last(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
return result.(*model.SpansIndex), nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Find() ([]*model.SpansIndex, error) {
|
||||
result, err := s.DO.Find()
|
||||
return result.([]*model.SpansIndex), err
|
||||
}
|
||||
|
||||
func (s spansIndexDo) FindInBatch(batchSize int, fc func(tx gen.Dao, batch int) error) (results []*model.SpansIndex, err error) {
|
||||
buf := make([]*model.SpansIndex, 0, batchSize)
|
||||
err = s.DO.FindInBatches(&buf, batchSize, func(tx gen.Dao, batch int) error {
|
||||
defer func() { results = append(results, buf...) }()
|
||||
return fc(tx, batch)
|
||||
})
|
||||
return results, err
|
||||
}
|
||||
|
||||
func (s spansIndexDo) FindInBatches(result *[]*model.SpansIndex, batchSize int, fc func(tx gen.Dao, batch int) error) error {
|
||||
return s.DO.FindInBatches(result, batchSize, fc)
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Attrs(attrs ...field.AssignExpr) ISpansIndexDo {
|
||||
return s.withDO(s.DO.Attrs(attrs...))
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Assign(attrs ...field.AssignExpr) ISpansIndexDo {
|
||||
return s.withDO(s.DO.Assign(attrs...))
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Joins(fields ...field.RelationField) ISpansIndexDo {
|
||||
for _, _f := range fields {
|
||||
s = *s.withDO(s.DO.Joins(_f))
|
||||
}
|
||||
return &s
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Preload(fields ...field.RelationField) ISpansIndexDo {
|
||||
for _, _f := range fields {
|
||||
s = *s.withDO(s.DO.Preload(_f))
|
||||
}
|
||||
return &s
|
||||
}
|
||||
|
||||
func (s spansIndexDo) FirstOrInit() (*model.SpansIndex, error) {
|
||||
if result, err := s.DO.FirstOrInit(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
return result.(*model.SpansIndex), nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s spansIndexDo) FirstOrCreate() (*model.SpansIndex, error) {
|
||||
if result, err := s.DO.FirstOrCreate(); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
return result.(*model.SpansIndex), nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s spansIndexDo) FindByPage(offset int, limit int) (result []*model.SpansIndex, count int64, err error) {
|
||||
result, err = s.Offset(offset).Limit(limit).Find()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if size := len(result); 0 < limit && 0 < size && size < limit {
|
||||
count = int64(size + offset)
|
||||
return
|
||||
}
|
||||
|
||||
count, err = s.Offset(-1).Limit(-1).Count()
|
||||
return
|
||||
}
|
||||
|
||||
func (s spansIndexDo) ScanByPage(result interface{}, offset int, limit int) (count int64, err error) {
|
||||
count, err = s.Count()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = s.Offset(offset).Limit(limit).Scan(result)
|
||||
return
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Scan(result interface{}) (err error) {
|
||||
return s.DO.Scan(result)
|
||||
}
|
||||
|
||||
func (s spansIndexDo) Delete(models ...*model.SpansIndex) (result gen.ResultInfo, err error) {
|
||||
return s.DO.Delete(models)
|
||||
}
|
||||
|
||||
func (s *spansIndexDo) withDO(do gen.Dao) *spansIndexDo {
|
||||
s.DO = *do.(*gen.DO)
|
||||
return s
|
||||
}
|
||||
139
backend/infra/impl/telemetry/clickhouse/query_client.go
Normal file
139
backend/infra/impl/telemetry/clickhouse/query_client.go
Normal file
@ -0,0 +1,139 @@
|
||||
package clickhouse
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/sdk/trace"
|
||||
"gorm.io/gen"
|
||||
|
||||
"github.com/coze-dev/coze-studio/backend/infra/contract/telemetry"
|
||||
"github.com/coze-dev/coze-studio/backend/infra/impl/telemetry/clickhouse/internal/query"
|
||||
"github.com/coze-dev/coze-studio/backend/pkg/lang/ptr"
|
||||
"github.com/coze-dev/coze-studio/backend/pkg/lang/slices"
|
||||
)
|
||||
|
||||
const (
|
||||
emptySpanID = "0000000000000000"
|
||||
)
|
||||
|
||||
type QueryClientConfig struct {
|
||||
ClickhouseOptions *clickhouse.Options
|
||||
EmptySpanID *string
|
||||
}
|
||||
|
||||
func NewQueryClient(config *QueryClientConfig) (telemetry.QueryClient, error) {
|
||||
db, err := newClickhouseDB(config.ClickhouseOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
}
|
||||
|
||||
qc := &queryClient{
|
||||
query: query.Use(db),
|
||||
emptySpanID: emptySpanID,
|
||||
}
|
||||
|
||||
if config.EmptySpanID != nil {
|
||||
qc.emptySpanID = *config.EmptySpanID
|
||||
}
|
||||
|
||||
return qc, nil
|
||||
}
|
||||
|
||||
type queryClient struct {
|
||||
query *query.Query
|
||||
emptySpanID string
|
||||
}
|
||||
|
||||
func (q *queryClient) ListSpan(ctx context.Context, request *telemetry.ListTracesRequest) (
|
||||
spans []trace.ReadOnlySpan, nextCursor *string, hasMore bool, err error) {
|
||||
|
||||
if request.SpaceID == 0 || request.EntityID == 0 || (request.StartAt.IsZero() && request.EndAt.IsZero()) {
|
||||
return nil, nil, false, fmt.Errorf("[ListSpan] invalid request params")
|
||||
}
|
||||
|
||||
si := q.query.SpansIndex
|
||||
|
||||
conds := []gen.Condition{
|
||||
si.SpaceID.Eq(request.SpaceID),
|
||||
si.EntityID.Eq(request.EntityID),
|
||||
}
|
||||
if !request.StartAt.IsZero() {
|
||||
conds = append(conds, si.StartTimeMs.Gte(uint64(request.StartAt.UnixMilli())))
|
||||
}
|
||||
if !request.EndAt.IsZero() {
|
||||
conds = append(conds, si.StartTimeMs.Lte(uint64(request.EndAt.UnixMilli())))
|
||||
}
|
||||
if request.RootOnly {
|
||||
conds = append(conds, si.ParentSpanID.Eq(q.emptySpanID))
|
||||
}
|
||||
if request.Status != codes.Unset {
|
||||
conds = append(conds, si.StatusCode.Eq(int64(request.Status)))
|
||||
}
|
||||
if request.Cursor != nil {
|
||||
ms, err := strconv.ParseInt(*request.Cursor, 10, 64)
|
||||
if err != nil {
|
||||
return nil, nil, false, err
|
||||
}
|
||||
conds = append(conds, si.StartTimeMs.Lt(uint64(ms)))
|
||||
}
|
||||
|
||||
limit := request.Limit
|
||||
if limit == 0 {
|
||||
limit = 30
|
||||
}
|
||||
indexes, err := si.WithContext(ctx).Debug().
|
||||
Where(conds...).
|
||||
Order(si.StartTimeMs.Desc()).
|
||||
Limit(limit).
|
||||
Find()
|
||||
if err != nil {
|
||||
return nil, nil, false, err
|
||||
}
|
||||
|
||||
spans, err = slices.TransformWithErrorCheck(indexes, fromSpanIndexModel(q.emptySpanID))
|
||||
if err != nil {
|
||||
return nil, nil, false, err
|
||||
}
|
||||
|
||||
if len(indexes) == limit {
|
||||
hasMore = true
|
||||
ms := indexes[len(indexes)-1].StartTimeMs
|
||||
nextCursor = ptr.Of(strconv.FormatInt(int64(ms), 10))
|
||||
}
|
||||
|
||||
return spans, nextCursor, hasMore, nil
|
||||
}
|
||||
|
||||
func (q *queryClient) GetTrace(ctx context.Context, request *telemetry.GetTraceRequest) ([]trace.ReadOnlySpan, error) {
|
||||
|
||||
sd := q.query.SpansData
|
||||
|
||||
// TODO: 看下 space_id 和 entity_id 是否要加到 spans_data 表
|
||||
var conds []gen.Condition
|
||||
if request.TraceID == nil && request.LogID == nil {
|
||||
return nil, fmt.Errorf("[GetTrace] both traceID and logID is nil")
|
||||
}
|
||||
if request.TraceID != nil {
|
||||
conds = append(conds, sd.TraceID.Eq(request.TraceID.String()))
|
||||
}
|
||||
if request.LogID != nil {
|
||||
conds = append(conds, sd.LogID.Eq(*request.LogID))
|
||||
}
|
||||
|
||||
rows, err := sd.WithContext(ctx).Debug().Where(conds...).Limit(-1).Find()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
spans, err := slices.TransformWithErrorCheck(rows, fromSpanDataModel(q.emptySpanID))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return spans, nil
|
||||
}
|
||||
98
backend/infra/impl/telemetry/clickhouse/span.go
Normal file
98
backend/infra/impl/telemetry/clickhouse/span.go
Normal file
@ -0,0 +1,98 @@
|
||||
package clickhouse
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
tracesdk "go.opentelemetry.io/otel/sdk/trace"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
type ckSpan struct {
|
||||
tracesdk.ReadOnlySpan
|
||||
|
||||
name string
|
||||
spanContext trace.SpanContext
|
||||
parent trace.SpanContext
|
||||
kind trace.SpanKind
|
||||
|
||||
startTime time.Time
|
||||
endTime time.Time
|
||||
|
||||
resource *resource.Resource
|
||||
attributes []attribute.KeyValue
|
||||
status tracesdk.Status
|
||||
csCount int
|
||||
}
|
||||
|
||||
var _ tracesdk.ReadOnlySpan = ckSpan{}
|
||||
|
||||
func (c ckSpan) Name() string {
|
||||
return c.name
|
||||
}
|
||||
|
||||
func (c ckSpan) SpanContext() trace.SpanContext {
|
||||
return c.spanContext
|
||||
}
|
||||
|
||||
func (c ckSpan) Parent() trace.SpanContext {
|
||||
return c.parent
|
||||
}
|
||||
|
||||
func (c ckSpan) SpanKind() trace.SpanKind {
|
||||
return c.kind
|
||||
}
|
||||
|
||||
func (c ckSpan) StartTime() time.Time {
|
||||
return c.startTime
|
||||
}
|
||||
|
||||
func (c ckSpan) EndTime() time.Time {
|
||||
return c.endTime
|
||||
}
|
||||
|
||||
func (c ckSpan) Attributes() []attribute.KeyValue {
|
||||
return c.attributes
|
||||
}
|
||||
|
||||
func (c ckSpan) Links() []tracesdk.Link {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c ckSpan) Events() []tracesdk.Event {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c ckSpan) Status() tracesdk.Status {
|
||||
return c.status
|
||||
}
|
||||
|
||||
func (c ckSpan) InstrumentationScope() instrumentation.Scope {
|
||||
return instrumentation.Scope{}
|
||||
}
|
||||
|
||||
func (c ckSpan) InstrumentationLibrary() instrumentation.Library {
|
||||
return instrumentation.Library{}
|
||||
}
|
||||
|
||||
func (c ckSpan) Resource() *resource.Resource {
|
||||
return c.resource
|
||||
}
|
||||
|
||||
func (c ckSpan) DroppedAttributes() int {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (c ckSpan) DroppedLinks() int {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (c ckSpan) DroppedEvents() int {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (c ckSpan) ChildSpanCount() int {
|
||||
return c.csCount
|
||||
}
|
||||
310
backend/infra/impl/telemetry/clickhouse/telemetry_test.go
Normal file
310
backend/infra/impl/telemetry/clickhouse/telemetry_test.go
Normal file
@ -0,0 +1,310 @@
|
||||
package clickhouse
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"github.com/cloudwego/eino-ext/components/model/openai"
|
||||
"github.com/cloudwego/eino/callbacks"
|
||||
"github.com/cloudwego/eino/components"
|
||||
"github.com/cloudwego/eino/components/model"
|
||||
"github.com/cloudwego/eino/compose"
|
||||
"github.com/cloudwego/eino/schema"
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"github.com/coze-dev/coze-studio/backend/infra/contract/telemetry"
|
||||
"github.com/coze-dev/coze-studio/backend/pkg/lang/ptr"
|
||||
)
|
||||
|
||||
func TestExporter(t *testing.T) {
|
||||
if os.Getenv("ENABLE_CK_TELEMETRY_TEST") != "true" {
|
||||
return
|
||||
}
|
||||
username, pwd := os.Getenv("CK_USERNAME"), os.Getenv("CK_PASSWORD")
|
||||
baseURL, apiKey := os.Getenv("OPENAI_BASE_URL"), os.Getenv("OPENAI_API_KEY")
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
tp, err := NewTracerProvider(&TracerConfig{
|
||||
ClickhouseOptions: &clickhouse.Options{
|
||||
Addr: []string{"localhost:8124"},
|
||||
Auth: clickhouse.Auth{
|
||||
Database: "default",
|
||||
Username: username,
|
||||
Password: pwd,
|
||||
},
|
||||
Debug: true,
|
||||
Debugf: func(format string, v ...any) {
|
||||
fmt.Printf(format+"\n", v...)
|
||||
},
|
||||
Settings: clickhouse.Settings{
|
||||
"max_execution_time": 60,
|
||||
},
|
||||
Compression: &clickhouse.Compression{
|
||||
Method: clickhouse.CompressionZSTD,
|
||||
Level: 1,
|
||||
},
|
||||
DialTimeout: time.Second * 30,
|
||||
MaxOpenConns: 5,
|
||||
MaxIdleConns: 5,
|
||||
ConnMaxLifetime: time.Duration(10) * time.Minute,
|
||||
ConnOpenStrategy: clickhouse.ConnOpenInOrder,
|
||||
BlockBufferSize: 10,
|
||||
MaxCompressionBuffer: 10240,
|
||||
},
|
||||
TracerProviderOptions: nil,
|
||||
IndexRootOnly: false,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
|
||||
defer tp.Shutdown(ctx)
|
||||
|
||||
ch := &callbackHandler{tp.Tracer("test_tracer")}
|
||||
|
||||
cm, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
|
||||
APIKey: apiKey,
|
||||
ByAzure: true,
|
||||
BaseURL: baseURL,
|
||||
Model: "gpt-4o-2024-05-13",
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
|
||||
g := compose.NewChain[string, string]().
|
||||
AppendLambda(compose.InvokableLambda(func(ctx context.Context, input string) (output []*schema.Message, err error) {
|
||||
return []*schema.Message{
|
||||
schema.UserMessage(input),
|
||||
}, nil
|
||||
})).
|
||||
AppendChatModel(cm).
|
||||
AppendLambda(compose.InvokableLambda(func(ctx context.Context, input *schema.Message) (output string, err error) {
|
||||
return input.Content, nil
|
||||
}))
|
||||
r, err := g.Compile(ctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
resp, err := r.Invoke(
|
||||
context.WithValue(ctx, "log-id", uuid.New().String()),
|
||||
"hello",
|
||||
compose.WithCallbacks(ch),
|
||||
)
|
||||
assert.NoError(t, err)
|
||||
fmt.Println(resp)
|
||||
}
|
||||
|
||||
func TestNewQueryClient(t *testing.T) {
|
||||
if os.Getenv("ENABLE_CK_TELEMETRY_TEST") != "true" {
|
||||
return
|
||||
}
|
||||
username, pwd := os.Getenv("CK_USERNAME"), os.Getenv("CK_PASSWORD")
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
qc, err := NewQueryClient(&QueryClientConfig{
|
||||
ClickhouseOptions: &clickhouse.Options{
|
||||
Addr: []string{"localhost:8124"},
|
||||
Auth: clickhouse.Auth{
|
||||
Database: "default",
|
||||
Username: username,
|
||||
Password: pwd,
|
||||
},
|
||||
Debug: true,
|
||||
Debugf: func(format string, v ...any) {
|
||||
fmt.Printf(format+"\n", v...)
|
||||
},
|
||||
Settings: clickhouse.Settings{
|
||||
"max_execution_time": 60,
|
||||
},
|
||||
Compression: &clickhouse.Compression{
|
||||
Method: clickhouse.CompressionZSTD,
|
||||
Level: 1,
|
||||
},
|
||||
DialTimeout: time.Second * 30,
|
||||
MaxOpenConns: 5,
|
||||
MaxIdleConns: 5,
|
||||
ConnMaxLifetime: time.Duration(10) * time.Minute,
|
||||
ConnOpenStrategy: clickhouse.ConnOpenInOrder,
|
||||
BlockBufferSize: 10,
|
||||
MaxCompressionBuffer: 10240,
|
||||
},
|
||||
EmptySpanID: nil,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
spans, nextCursor, hasMore, err := qc.ListSpan(ctx, &telemetry.ListTracesRequest{
|
||||
RootOnly: true,
|
||||
SpaceID: 7521695817242001408,
|
||||
EntityID: 5566,
|
||||
Status: 0,
|
||||
StartAt: time.Unix(1751461407, 0),
|
||||
EndAt: time.Unix(1753547807, 0),
|
||||
Limit: 10,
|
||||
Cursor: nil,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.False(t, hasMore)
|
||||
fmt.Println(nextCursor)
|
||||
assert.True(t, len(spans) > 0)
|
||||
|
||||
first := spans[0]
|
||||
fullTrace, err := qc.GetTrace(ctx, &telemetry.GetTraceRequest{
|
||||
SpaceID: 0,
|
||||
EntityID: 0,
|
||||
TraceID: ptr.Of(first.SpanContext().TraceID()),
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, len(fullTrace) == 4)
|
||||
for _, item := range fullTrace {
|
||||
if item.Name() == "Chain" {
|
||||
assert.False(t, item.Parent().IsValid())
|
||||
} else {
|
||||
assert.True(t, item.Parent().IsValid())
|
||||
}
|
||||
}
|
||||
|
||||
var logID string
|
||||
for _, attr := range first.Attributes() {
|
||||
if attr.Key == telemetry.AttributeLogID {
|
||||
logID = attr.Value.AsString()
|
||||
}
|
||||
}
|
||||
assert.True(t, logID != "")
|
||||
fullTrace, err = qc.GetTrace(ctx, &telemetry.GetTraceRequest{
|
||||
SpaceID: 0,
|
||||
EntityID: 0,
|
||||
LogID: ptr.Of(logID),
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.True(t, len(fullTrace) == 4)
|
||||
for _, item := range fullTrace {
|
||||
if item.Name() == "Chain" {
|
||||
assert.False(t, item.Parent().IsValid())
|
||||
} else {
|
||||
assert.True(t, item.Parent().IsValid())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type callbackHandler struct {
|
||||
tracer trace.Tracer
|
||||
}
|
||||
|
||||
func (c callbackHandler) OnStart(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
|
||||
name := info.Name
|
||||
if name == "" {
|
||||
name = fmt.Sprintf("%s%s", info.Component, info.Type)
|
||||
}
|
||||
ctx, span := c.tracer.Start(ctx, info.Name)
|
||||
span.SetName(name)
|
||||
|
||||
var (
|
||||
spanType telemetry.SpanType
|
||||
strInput string
|
||||
)
|
||||
|
||||
switch info.Component {
|
||||
case components.ComponentOfChatModel:
|
||||
spanType = telemetry.LLMCall
|
||||
i := model.ConvCallbackInput(input)
|
||||
b, _ := json.Marshal(i.Messages)
|
||||
strInput = string(b)
|
||||
case compose.ComponentOfGraph, compose.ComponentOfChain:
|
||||
spanType = telemetry.UserInput
|
||||
if i, ok := input.(string); ok {
|
||||
strInput = i
|
||||
} else {
|
||||
b, _ := json.Marshal(input)
|
||||
strInput = string(b)
|
||||
}
|
||||
default:
|
||||
spanType = telemetry.Unknown
|
||||
if i, ok := input.(string); ok {
|
||||
strInput = i
|
||||
} else {
|
||||
b, _ := json.Marshal(input)
|
||||
strInput = string(b)
|
||||
}
|
||||
}
|
||||
|
||||
attrs := []attribute.KeyValue{
|
||||
telemetry.NewSpanAttrLogID(ctx.Value("log-id").(string)),
|
||||
telemetry.NewSpanAttrSpaceID(int64(7521695817242001408)),
|
||||
telemetry.NewSpanAttrType(int64(spanType)),
|
||||
telemetry.NewSpanAttrUserID(int64(3344)),
|
||||
telemetry.NewSpanAttrEntityID(int64(5566)),
|
||||
telemetry.NewSpanAttrEnvironment("dev"),
|
||||
telemetry.NewSpanAttrVersion("1"),
|
||||
telemetry.NewSpanAttrInput(strInput),
|
||||
}
|
||||
span.SetAttributes(attrs...)
|
||||
|
||||
return ctx
|
||||
}
|
||||
|
||||
func (c callbackHandler) OnEnd(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
|
||||
span := trace.SpanFromContext(ctx)
|
||||
|
||||
var (
|
||||
strOutput string
|
||||
attrs []attribute.KeyValue
|
||||
)
|
||||
|
||||
switch info.Component {
|
||||
case components.ComponentOfChatModel:
|
||||
o := model.ConvCallbackOutput(output)
|
||||
attrs = append(attrs)
|
||||
b, _ := json.Marshal(o.Message)
|
||||
strOutput = string(b)
|
||||
|
||||
if o.TokenUsage != nil {
|
||||
attrs = append(attrs,
|
||||
telemetry.NewSpanAttrInputTokens(int64(o.TokenUsage.PromptTokens)),
|
||||
telemetry.NewSpanAttrOutputTokens(int64(o.TokenUsage.CompletionTokens)),
|
||||
)
|
||||
}
|
||||
|
||||
if o.Config != nil {
|
||||
attrs = append(attrs, telemetry.NewSpanAttrModel(o.Config.Model))
|
||||
}
|
||||
|
||||
default:
|
||||
if i, ok := output.(string); ok {
|
||||
strOutput = i
|
||||
} else {
|
||||
b, _ := json.Marshal(output)
|
||||
strOutput = string(b)
|
||||
}
|
||||
}
|
||||
|
||||
attrs = append(attrs, telemetry.NewSpanAttrOutput(strOutput))
|
||||
|
||||
span.SetAttributes(attrs...)
|
||||
span.SetStatus(codes.Ok, "")
|
||||
span.End()
|
||||
|
||||
return ctx
|
||||
}
|
||||
|
||||
func (c callbackHandler) OnError(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
|
||||
span := trace.SpanFromContext(ctx)
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
span.End()
|
||||
return ctx
|
||||
}
|
||||
|
||||
func (c callbackHandler) OnStartWithStreamInput(ctx context.Context, info *callbacks.RunInfo, input *schema.StreamReader[callbacks.CallbackInput]) context.Context {
|
||||
input.Close()
|
||||
return ctx
|
||||
}
|
||||
|
||||
func (c callbackHandler) OnEndWithStreamOutput(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
|
||||
output.Close()
|
||||
return ctx
|
||||
}
|
||||
45
backend/infra/impl/telemetry/clickhouse/tracer_provider.go
Normal file
45
backend/infra/impl/telemetry/clickhouse/tracer_provider.go
Normal file
@ -0,0 +1,45 @@
|
||||
package clickhouse
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
"go.opentelemetry.io/otel/sdk/trace"
|
||||
|
||||
"github.com/coze-dev/coze-studio/backend/infra/contract/telemetry"
|
||||
"github.com/coze-dev/coze-studio/backend/infra/impl/telemetry/clickhouse/internal/query"
|
||||
)
|
||||
|
||||
type TracerConfig struct {
|
||||
ClickhouseOptions *clickhouse.Options
|
||||
TracerProviderOptions []trace.TracerProviderOption
|
||||
IndexRootOnly bool
|
||||
}
|
||||
|
||||
func NewTracerProvider(cfg *TracerConfig) (telemetry.TracerProvider, error) {
|
||||
db, err := newClickhouseDB(cfg.ClickhouseOptions)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
exp := &exporter{query: query.Use(db), indexRootOnly: cfg.IndexRootOnly}
|
||||
rcs, err := resource.New(
|
||||
context.Background(),
|
||||
resource.WithHost(),
|
||||
resource.WithFromEnv(),
|
||||
resource.WithProcessPID(),
|
||||
resource.WithTelemetrySDK())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bsp := trace.NewBatchSpanProcessor(exp)
|
||||
tp := trace.NewTracerProvider(append([]trace.TracerProviderOption{
|
||||
trace.WithSpanProcessor(bsp),
|
||||
trace.WithResource(rcs),
|
||||
trace.WithSampler(trace.AlwaysSample()),
|
||||
}, cfg.TracerProviderOptions...)...)
|
||||
|
||||
return tp, nil
|
||||
}
|
||||
0
backend/infra/impl/telemetry/cozeloop/.gitkeep
Normal file
0
backend/infra/impl/telemetry/cozeloop/.gitkeep
Normal file
@ -32,6 +32,7 @@ import (
|
||||
|
||||
"github.com/cloudwego/hertz/pkg/app/server"
|
||||
"github.com/cloudwego/hertz/pkg/common/config"
|
||||
"github.com/cloudwego/hertz/pkg/route"
|
||||
"github.com/hertz-contrib/cors"
|
||||
"github.com/joho/godotenv"
|
||||
|
||||
@ -56,15 +57,16 @@ func main() {
|
||||
|
||||
setLogLevel()
|
||||
|
||||
if err := application.Init(ctx); err != nil {
|
||||
shutdown, err := application.Init(ctx)
|
||||
if err != nil {
|
||||
panic("InitializeInfra failed, err=" + err.Error())
|
||||
}
|
||||
|
||||
asyncStartMinioProxyServer(ctx)
|
||||
startHttpServer()
|
||||
startHttpServer(shutdown)
|
||||
}
|
||||
|
||||
func startHttpServer() {
|
||||
func startHttpServer(shutdown []route.CtxCallback) {
|
||||
maxRequestBodySize := os.Getenv("MAX_REQUEST_BODY_SIZE")
|
||||
maxSize := conv.StrToInt64D(maxRequestBodySize, 1024*1024*200)
|
||||
addr := getEnv("LISTEN_ADDR", ":8888")
|
||||
@ -105,7 +107,8 @@ func startHttpServer() {
|
||||
s.Use(middleware.OpenapiAuthMW())
|
||||
s.Use(middleware.SessionAuthMW())
|
||||
s.Use(middleware.I18nMW()) // must after SessionAuthMW
|
||||
|
||||
s.OnShutdown = append(s.OnShutdown, shutdown...)
|
||||
|
||||
router.GeneratedRegister(s)
|
||||
s.Spin()
|
||||
}
|
||||
|
||||
@ -86,6 +86,14 @@ const (
|
||||
UseSSL = "USE_SSL"
|
||||
SSLCertFile = "SSL_CERT_FILE"
|
||||
SSLKeyFile = "SSL_KEY_FILE"
|
||||
|
||||
TelemetryType = "TELEMETRY_TYPE" // clickhouse / cozeloop
|
||||
TelemetryIndexRootOnly = "TELEMETRY_INDEX_ROOT_ONLY"
|
||||
ClickhouseAddr = "CLICKHOUSE_ADDR"
|
||||
ClickhouseDBName = "CLICKHOUSE_DB_NAME"
|
||||
ClickhouseUserName = "CLICKHOUSE_USERNAME"
|
||||
ClickhousePassword = "CLICKHOUSE_PASSWORD"
|
||||
ClickhouseEmptySpanID = "CLICKHOUSE_EMPTY_SPAN_ID"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@ -505,6 +505,30 @@ services:
|
||||
- coze-network
|
||||
restart: 'no'
|
||||
|
||||
clickhouse:
|
||||
image: bitnami/clickhouse:25
|
||||
container_name: coze-clickhouse
|
||||
environment:
|
||||
- ALLOW_EMPTY_PASSWORD=no
|
||||
- CLICKHOUSE_ADMIN_USER=${CLICKHOUSE_ADMIN_USER:-default}
|
||||
- CLICKHOUSE_ADMIN_PASSWORD=${CLICKHOUSE_ADMIN_PASSWORD:-clickhouse123}
|
||||
ports:
|
||||
- '8123:8123'
|
||||
- '8124:9000' # CLICKHOUSE_TCP_PORT
|
||||
- '9004:9004'
|
||||
- '9005:9005'
|
||||
- '9009:9009'
|
||||
volumes:
|
||||
- ./data/bitnami/clickhouse:/bitnami/clickhouse:rw,Z
|
||||
healthcheck:
|
||||
test: wget --no-verbose --tries=1 --spider http://localhost:8123/ping || exit 1
|
||||
interval: 5s
|
||||
timeout: 10s
|
||||
retries: 10
|
||||
start_period: 10s
|
||||
networks:
|
||||
- coze-network
|
||||
|
||||
coze-server:
|
||||
build:
|
||||
context: ../
|
||||
|
||||
@ -341,6 +341,30 @@ services:
|
||||
networks:
|
||||
- coze-network
|
||||
|
||||
clickhouse:
|
||||
image: bitnami/clickhouse:25
|
||||
container_name: coze-clickhouse
|
||||
environment:
|
||||
- ALLOW_EMPTY_PASSWORD=no
|
||||
- CLICKHOUSE_ADMIN_USER=${CLICKHOUSE_ADMIN_USER:-default}
|
||||
- CLICKHOUSE_ADMIN_PASSWORD=${CLICKHOUSE_ADMIN_PASSWORD:-clickhouse123}
|
||||
ports:
|
||||
- '8123:8123'
|
||||
- '8124:9000' # CLICKHOUSE_TCP_PORT
|
||||
- '9004:9004'
|
||||
- '9005:9005'
|
||||
- '9009:9009'
|
||||
volumes:
|
||||
- ./data/bitnami/clickhouse:/bitnami/clickhouse:rw,Z
|
||||
healthcheck:
|
||||
test: wget --no-verbose --tries=1 --spider http://localhost:8123/ping || exit 1
|
||||
interval: 5s
|
||||
timeout: 10s
|
||||
retries: 10
|
||||
start_period: 10s
|
||||
networks:
|
||||
- coze-network
|
||||
|
||||
coze-server:
|
||||
# build:
|
||||
# context: ../
|
||||
|
||||
52
docker/volumes/clickhouse/schema.sql
Normal file
52
docker/volumes/clickhouse/schema.sql
Normal file
@ -0,0 +1,52 @@
|
||||
CREATE DATABASE IF NOT EXISTS `default`;
|
||||
CREATE TABLE IF NOT EXISTS spans_index (
|
||||
span_id String Codec(ZSTD(1)), -- span id: [8]byte
|
||||
trace_id String Codec(ZSTD(1)), -- trace id: [16]byte
|
||||
parent_span_id String Codec(ZSTD(1)), -- parent span id: [8]byte
|
||||
name String Codec(ZSTD(1)), -- name
|
||||
kind Int8 Codec(ZSTD(1)), -- kind: https://github.com/open-telemetry/opentelemetry-proto/blob/30d237e1ff3ab7aa50e0922b5bebdd93505090af/opentelemetry/proto/trace/v1/trace.proto#L101-L129
|
||||
status_code Int64 Codec(ZSTD(1)), -- 状态码
|
||||
status_msg String Codec(ZSTD(1)), -- 状态信息
|
||||
log_id String Codec(ZSTD(1)), -- log_id
|
||||
space_id Int64 Codec(ZSTD(1)), -- space_id
|
||||
type Int32 Codec(ZSTD(1)), -- span 节点类型,和 idl enum 对应
|
||||
user_id Int64 CODEC(ZSTD(1)), -- user id
|
||||
entity_id Int64 Codec(ZSTD(1)), -- 写入实体 id,对应 agent / knowledge id
|
||||
env String Codec(ZSTD(1)), -- 环境,开发 / 线上
|
||||
version String Codec(ZSTD(1)), -- 版本
|
||||
input String Codec(ZSTD(1)), -- 输入,需要展示和全文搜索过滤,单独提取出来
|
||||
|
||||
start_time_ms UInt64 CODEC(ZSTD(1)), -- 开始时间,单位毫秒
|
||||
INDEX idx(trace_id) TYPE minmax granularity 8192
|
||||
)
|
||||
ENGINE = MergeTree()
|
||||
ORDER BY (space_id, entity_id, start_time_ms)
|
||||
PARTITION BY toDate(start_time_ms / 1000)
|
||||
TTL toDate(start_time_ms / 1000) + INTERVAL 7 DAY
|
||||
SETTINGS ttl_only_drop_parts = 1,
|
||||
storage_policy = 'default';
|
||||
CREATE TABLE IF NOT EXISTS spans_data (
|
||||
span_id String Codec(ZSTD(1)), -- span id: [8]byte
|
||||
trace_id String Codec(ZSTD(1)), -- trace id: [16]byte
|
||||
parent_span_id String Codec(ZSTD(1)), -- parent span id: [8]byte
|
||||
name String Codec(ZSTD(1)), -- name
|
||||
kind Int8 Codec(ZSTD(1)), -- kind: https://github.com/open-telemetry/opentelemetry-proto/blob/30d237e1ff3ab7aa50e0922b5bebdd93505090af/opentelemetry/proto/trace/v1/trace.proto#L101-L129
|
||||
status_code Int64 Codec(ZSTD(1)), -- 状态码
|
||||
status_msg String Codec(ZSTD(1)), -- 状态信息
|
||||
resource_attributes Map(String, String) CODEC(ZSTD(1)), -- 元信息
|
||||
start_time_ms UInt64 CODEC(ZSTD(1)), -- 开始时间,单位毫秒
|
||||
end_time_ms UInt64 CODEC(ZSTD(1)), -- 结束时间,单位毫秒
|
||||
log_id String Codec(ZSTD(1)), -- log_id
|
||||
|
||||
attr_keys Array(LowCardinality(String)) Codec(ZSTD(1)), -- 其他 attr keys
|
||||
attr_values Array(String) Codec(ZSTD(1)), -- 其他 attr values
|
||||
INDEX idx_log_id log_id TYPE bloom_filter GRANULARITY 1024
|
||||
)
|
||||
|
||||
ENGINE = MergeTree()
|
||||
ORDER BY (trace_id, span_id)
|
||||
PARTITION BY toDate(start_time_ms / 1000)
|
||||
TTL toDate(start_time_ms / 1000) + INTERVAL 7 DAY
|
||||
SETTINGS ttl_only_drop_parts = 1,
|
||||
index_granularity = 2048,
|
||||
storage_policy = 'default';
|
||||
Reference in New Issue
Block a user