Kafka Message Debugger
One day an engineer came complaining that his consumer had suddenly started restarting every few minutes.
In the end we found that a health check from a dotnet service had somehow started writing plain-text ping messages into the topic.
And the consumer was not ready for that.
Here is a sample app that can dramatically decrease the time spent investigating such scenarios:
package main
import (
"context"
"crypto/tls"
"encoding/binary"
"fmt"
"os"
"strconv"
"time"
"github.com/confluentinc/confluent-kafka-go/schemaregistry"
"github.com/linkedin/goavro"
kafka "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
)
// main reads a single message from partition 0 of a Kafka topic at a given
// offset and prints everything it can about it: topic, timestamp, partition,
// offset, the raw value as bytes and as a string, and whether the value starts
// with a zero "magic" byte. When the magic byte is present, the next four
// bytes are read as a big-endian schema ID, the schema is fetched from the
// schema registry under the subject "<topic>-value", and the remainder of the
// value is decoded as Avro binary and printed.
//
// Configuration comes from environment variables:
//
//	TOPIC            topic to read from
//	OFFSET           offset of the message to inspect (base-10 integer)
//	BROKER           broker address
//	BROKER_USERNAME  SASL/PLAIN username
//	BROKER_PASSWORD  SASL/PLAIN password
//	SCHEMA_URL       schema registry URL
//	SCHEMA_CREDS     schema registry basic-auth user info
//
// Any setup, read, registry, or decode failure panics with the underlying
// error. A value without the magic byte, or one too short to carry a schema
// ID, is reported and the program returns normally.
func main() {
	// Magic byte (1) + big-endian schema ID (4) precede the Avro payload.
	const headerLen = 5

	topic := os.Getenv("TOPIC")
	offset, err := strconv.ParseInt(os.Getenv("OFFSET"), 10, 64)
	if err != nil {
		panic(err)
	}

	client, err := schemaregistry.NewClient(&schemaregistry.Config{
		BasicAuthCredentialsSource: "USER_INFO",
		SchemaRegistryURL:          os.Getenv("SCHEMA_URL"),
		BasicAuthUserInfo:          os.Getenv("SCHEMA_CREDS"),
	})
	if err != nil {
		panic(err)
	}

	dialer := &kafka.Dialer{
		Timeout:   10 * time.Second,
		DualStack: true,
		TLS:       &tls.Config{},
		SASLMechanism: plain.Mechanism{
			Username: os.Getenv("BROKER_USERNAME"),
			Password: os.Getenv("BROKER_PASSWORD"),
		},
	}

	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{os.Getenv("BROKER")},
		Topic:     topic,
		Partition: 0,
		Dialer:    dialer,
	})
	defer reader.Close()

	if err := reader.SetOffset(offset); err != nil {
		panic(err)
	}

	m, err := reader.ReadMessage(context.Background())
	if err != nil {
		// Without a message there is nothing to inspect; continuing would only
		// hide this error behind an index-out-of-range panic further down.
		panic(err)
	}

	fmt.Printf("Topic: %s\n", m.Topic)
	fmt.Printf("Time: %s\n", m.Time)
	fmt.Println()
	fmt.Printf("Partition: %d\n", m.Partition)
	fmt.Printf("Offset: %d\n", m.Offset)
	fmt.Println()
	fmt.Printf("Bytes: %v\n", m.Value)
	fmt.Printf("String: %s\n", m.Value)

	// An empty value (e.g. a tombstone) has no magic byte either.
	hasMagic := len(m.Value) > 0 && m.Value[0] == 0
	fmt.Printf("Magic: %v\n", hasMagic)
	if !hasMagic {
		return
	}

	// A payload such as a lone zero byte passes the magic check but cannot
	// hold a schema ID; report it instead of panicking on the slice below.
	if len(m.Value) < headerLen {
		fmt.Printf("Message too short for a schema ID: need at least %d bytes, got %d\n", headerLen, len(m.Value))
		return
	}

	idBytes := m.Value[1:headerLen]
	schemaID := binary.BigEndian.Uint32(idBytes)
	fmt.Printf("Bytes: %v\n", idBytes)
	fmt.Printf("Schema ID: %d\n", schemaID)

	schema, err := client.GetBySubjectAndID(fmt.Sprintf("%s-value", topic), int(schemaID))
	if err != nil {
		panic(err)
	}
	fmt.Printf("\nSchema:\n%s\n\n", schema.Schema)

	codec, err := goavro.NewCodec(schema.Schema)
	if err != nil {
		panic(err)
	}

	native, _, err := codec.NativeFromBinary(m.Value[headerLen:])
	if err != nil {
		panic(err)
	}
	fmt.Printf("Deserialized:\n%v\n", native)
}
It connects to the desired Kafka topic, reads one specific message, tries to deserialize it, and prints all the details:
env $(cat .env | xargs) go run main.go
Output will be something like:
Topic: VacancyView
Time: 2022-10-20 10:11:16.704 +0000 UTC
Partition: 0
Offset: 2186
Bytes: [0 0 1 134 252 132 252 233 8 162 226 138 13 42 49 51 52 48 55 49 55 51 56 49 46 49 54 53 54 55 48 53 55 54 50]
*1340717381.1656705762
Magic: true
Bytes: [0 1 134 252]
Schema ID: 100092
Schema:
{"type":"record","name":"VacancyView","namespace":"KafkaModels","fields":[{"name":"VacancyId","type":"int","doc":"Vacancy identifier, sql: dbo.Vacancy.id, es: vaclist._id"},{"name":"UserId","type":"int","doc":"User identifier, dbo.EmailSource.id"},{"name":"ClientId","type":"string","doc":"Client identifier, '_ga' cookie being set by Google Analytics"}]}
Deserialized:
map[ClientId:1340717381.1656705762 UserId:13719697 VacancyId:9256706]
And if there is no magic byte, it will do its best to decode the message as JSON or plain text.