Kafka Message Debugger

One day an engineer came complaining that his consumer had suddenly started restarting every few minutes.

In the end we found that a health check from a .NET service had somehow started writing plain-text ping messages into the topic.

And the consumer was not ready for that.

Here is a sample app that can dramatically decrease the research time for such scenarios:

package main

import (
	"context"
	"crypto/tls"
	"encoding/binary"
	"fmt"
	"os"
	"strconv"
	"time"

	"github.com/confluentinc/confluent-kafka-go/schemaregistry"
	"github.com/linkedin/goavro"
	kafka "github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/plain"
)

// main reads a single message from partition 0 of the Kafka topic named by
// TOPIC, at the offset given by OFFSET, and prints its metadata and raw
// payload. When the payload starts with a zero magic byte, the next four
// bytes are read as a big-endian schema ID, the schema is fetched from the
// schema registry under the "<topic>-value" subject, and the remaining bytes
// are decoded as Avro binary and printed.
//
// Configuration is taken from the environment: TOPIC, OFFSET, SCHEMA_URL,
// SCHEMA_CREDS, BROKER, BROKER_USERNAME and BROKER_PASSWORD. Any failure
// panics with the underlying error.
func main() {
	topic := os.Getenv("TOPIC")
	offset, err := strconv.ParseInt(os.Getenv("OFFSET"), 10, 64)
	if err != nil {
		panic(err)
	}

	client, err := schemaregistry.NewClient(&schemaregistry.Config{
		BasicAuthCredentialsSource: "USER_INFO",
		SchemaRegistryURL:          os.Getenv("SCHEMA_URL"),
		BasicAuthUserInfo:          os.Getenv("SCHEMA_CREDS"),
	})
	if err != nil {
		panic(err)
	}

	dialer := &kafka.Dialer{
		Timeout:   10 * time.Second,
		DualStack: true,
		TLS:       &tls.Config{},
		SASLMechanism: plain.Mechanism{
			Username: os.Getenv("BROKER_USERNAME"),
			Password: os.Getenv("BROKER_PASSWORD"),
		},
	}

	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{os.Getenv("BROKER")},
		Topic:     topic,
		Partition: 0,
		Dialer:    dialer,
	})
	defer reader.Close()
	if err := reader.SetOffset(offset); err != nil {
		panic(err)
	}

	m, err := reader.ReadMessage(context.Background())
	if err != nil {
		// Without a message there is nothing to inspect; continuing would
		// only index into an empty payload below.
		panic(err)
	}

	fmt.Printf("Topic:     %s\n", m.Topic)
	fmt.Printf("Time:      %s\n", m.Time)
	fmt.Println()
	fmt.Printf("Partition: %d\n", m.Partition)
	fmt.Printf("Offset:    %d\n", m.Offset)
	fmt.Println()
	fmt.Printf("Bytes:     %v\n", m.Value)
	fmt.Printf("String:    %s\n", m.Value)

	// An empty payload has no magic byte at all; treat it like any other
	// payload that is not schema-framed instead of indexing out of range.
	hasMagic := len(m.Value) > 0 && m.Value[0] == 0
	fmt.Printf("Magic:     %v\n", hasMagic)
	if !hasMagic {
		return
	}

	// The header is one magic byte plus a four-byte schema ID.
	if len(m.Value) < 5 {
		fmt.Printf("Payload too short for a schema ID: %d byte(s)\n", len(m.Value))
		return
	}
	idBytes := m.Value[1:5]
	schemaID := binary.BigEndian.Uint32(idBytes)
	fmt.Printf("Bytes:     %v\n", idBytes)
	fmt.Printf("Schema ID: %d\n", schemaID)

	schema, err := client.GetBySubjectAndID(fmt.Sprintf("%s-value", topic), int(schemaID))
	if err != nil {
		panic(err)
	}
	fmt.Printf("\nSchema:\n%s\n\n", schema.Schema)

	codec, err := goavro.NewCodec(schema.Schema)
	if err != nil {
		panic(err)
	}
	native, _, err := codec.NativeFromBinary(m.Value[5:])
	if err != nil {
		panic(err)
	}
	fmt.Printf("Deserialized:\n%v\n", native)
}

It connects to the desired Kafka topic, reads the message at the given offset from partition 0, tries to deserialize it, and prints all the details.

env $(cat .env | xargs) go run main.go

Output will be something like:

Topic:     VacancyView
Time:      2022-10-20 10:11:16.704 +0000 UTC

Partition: 0
Offset:    2186

Bytes:     [0 0 1 134 252 132 252 233 8 162 226 138 13 42 49 51 52 48 55 49 55 51 56 49 46 49 54 53 54 55 48 53 55 54 50]
*1340717381.1656705762
Magic:     true
Bytes:     [0 1 134 252]
Schema ID: 100092

Schema:
{"type":"record","name":"VacancyView","namespace":"KafkaModels","fields":[{"name":"VacancyId","type":"int","doc":"Vacancy identifier, sql: dbo.Vacancy.id, es: vaclist._id"},{"name":"UserId","type":"int","doc":"User identifier, dbo.EmailSource.id"},{"name":"ClientId","type":"string","doc":"Client identifier, '_ga' cookie being set by Google Analytics"}]}

Deserialized:
map[ClientId:1340717381.1656705762 UserId:13719697 VacancyId:9256706]

And if there is no magic byte, the message is not schema-encoded: the tool stops after printing the raw bytes and their string form, which is enough to read JSON or plain-text payloads.