lago/events_processor/config/kafka.go
2025-02-20 17:30:50 +01:00

126 lines
2.8 KiB
Go

package config
import (
"context"
"log/slog"
"os"
"time"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl/scram"
"github.com/twmb/franz-go/plugin/kotel"
"github.com/twmb/franz-go/plugin/kslog"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/trace"
)
const (
Scram256 string = "SCRAM-SHA-256"
Scram512 string = "SCRAM-SHA-512"
)
func NewKafkaClient(config []kgo.Opt) (*kgo.Client, error) {
scramAlgorithm := os.Getenv("KAFKA_SCRAM_ALGORITHM")
tls := os.Getenv("KAFKA_TLS") == "true"
logger := slog.Default()
logger = logger.With("component", "kafka")
opts := []kgo.Opt{
kgo.SeedBrokers(os.Getenv("KAFKA_BROKER")),
kgo.WithLogger(kslog.New(logger)),
}
if len(config) > 0 {
opts = append(opts, config...)
}
if os.Getenv("ENV") == "production" {
meterProvider, err := initMeterProvider(context.Background())
if err != nil {
panic(err)
}
meterOpts := []kotel.MeterOpt{kotel.MeterProvider(meterProvider)}
meter := kotel.NewMeter(meterOpts...)
tracerProvider, err := initTracerProvider(context.Background())
if err != nil {
panic(err)
}
tracerOpts := []kotel.TracerOpt{
kotel.TracerProvider(tracerProvider),
kotel.TracerPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{})),
}
tracer := kotel.NewTracer(tracerOpts...)
kotelOps := []kotel.Opt{
kotel.WithTracer(tracer),
kotel.WithMeter(meter),
}
kotelService := kotel.NewKotel(kotelOps...)
kotelOpt := kgo.WithHooks(kotelService.Hooks()...)
opts = append(opts, kotelOpt)
}
if scramAlgorithm != "" {
var scramOpt kgo.Opt
scramAuth := scram.Auth{
User: os.Getenv("KAFKA_USERNAME"),
Pass: os.Getenv("KAFKA_PASSWORD"),
}
switch scramAlgorithm {
case Scram256:
scramOpt = kgo.SASL(scramAuth.AsSha256Mechanism())
case Scram512:
scramOpt = kgo.SASL(scramAuth.AsSha512Mechanism())
}
opts = append(opts, scramOpt)
}
if tls {
tlsOpt := kgo.DialTLS()
opts = append(opts, tlsOpt)
}
client, err := kgo.NewClient(opts...)
if err != nil {
return nil, err
}
return client, nil
}
func initTracerProvider(ctx context.Context) (*trace.TracerProvider, error) {
traceExporter, err := otlptracegrpc.New(ctx)
if err != nil {
return nil, err
}
tracerProvider := trace.NewTracerProvider(
trace.WithBatcher(traceExporter),
)
return tracerProvider, nil
}
func initMeterProvider(ctx context.Context) (*metric.MeterProvider, error) {
metricExporter, err := otlpmetricgrpc.New(ctx)
if err != nil {
return nil, err
}
meterProvider := metric.NewMeterProvider(
metric.WithReader(metric.NewPeriodicReader(metricExporter,
metric.WithInterval(60*time.Second))),
)
return meterProvider, nil
}