kafka command line tools

First of all we gonna need kafka itself, e.g.:

docker run -it --rm --name=kafka -e SAMPLEDATA=0 -e RUNNING_SAMPLEDATA=0 -e RUNTESTS=0 -e FORWARDLOGS=0 -e ADV_HOST=127.0.0.1 -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9092:9092 -p 9581-9585:9581-9585 lensesio/fast-data-dev:2.3.0

or

wget https://raw.githubusercontent.com/confluentinc/examples/5.3.1-post/cp-all-in-one/docker-compose.yml
docker-compose up -d

or

run everything by hands on your own like described in quickstart

or

get confluent.cloud

Kafka topics

The most basic and needed

List topics

kafka-topics --bootstrap-server localhost:9092 --list

Create topic

kafka-topics --bootstrap-server localhost:9092 --create --topic demo2 --partitions 3 --replication-factor 1

Delete topic

kafka-topics --bootstrap-server localhost:9092 --delete --topic demo2

Console producer and consumers

Here are examples for following use cases:

  • simple without key
  • simple with string key
  • simple with integer key
  • json without key
  • json with string key
  • json with ingeteger key
  • json with json key
  • avro without key
  • avro with string key
  • avro with integer key
  • avro with avro key

By deafult in all following examples messages delimited by new line, e.g. start producer, type something, press enter.

All follogin examples are run agains

docker run -it --rm --name=kafka -e SAMPLEDATA=0 -e RUNNING_SAMPLEDATA=0 -e RUNTESTS=0 -e FORWARDLOGS=0 -e ADV_HOST=127.0.0.1 -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9092:9092 -p 9581-9585:9581-9585 lensesio/fast-data-dev:2.3.0

String producer and consumer

Simple without key

Create topic

docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic SimpleWithoutKey --partitions 3 --replication-factor 1

Start kafka-console-producer which will produce simple string messages

docker exec -it kafka kafka-console-producer --broker-list localhost:9092 --topic SimpleWithoutKey

Start kafka-console-consumer to consume simple string messages

docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic SimpleWithoutKey --from-beginning

Produce simple messages like:

hello
world

And you should see them in consumer as:

hello
world

Simple with string key

Create topic

docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic SimpleWithStringKey --partitions 3 --replication-factor 1

Start kafka-console-producer which will produce simple string messages with string key

docker exec -it kafka kafka-console-producer --broker-list localhost:9092 --topic SimpleWithStringKey --property parse.key=true --property key.separator=:

Notes:

  • --property parse.key=true our consumer will expect us to enter key along side value
  • --property key.separator=: is optional and by default is space

Start kafka-console-consumer to consume simple string messages with string key

docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic SimpleWithStringKey --property print.key=true --from-beginning

Notes:

  • --property print.key=true will print key

Produce messages like:

1:one
2:two

And you should see them in consumer as:

1	one
2	two

If you try to produce message without key you should see an error:

>message without key
org.apache.kafka.common.KafkaException: No key found on line 3: acme
	at kafka.tools.ConsoleProducer$LineMessageReader.readMessage(ConsoleProducer.scala:265)
	at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:54)
	at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)

Simple with integer key

Not fully possible at moment, here are some links:

The problem is that no matter what you will pass to console producer it still will send bytes

Create topic

docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic SimpleWithIntKey --partitions 3 --replication-factor 1

Start kafka-console-producer which will produce simple string messages with integer key

docker exec -it kafka kafka-console-producer --broker-list localhost:9092 --topic SimpleWithIntKey --property parse.key=true --property key.serializer=org.apache.kafka.common.serialization.IntegerDeserializer --property value.serializer=org.apache.kafka.common.serialization.StringDeserializer --property key.separator=:

Notes:

  • --property key.serializer=org.apache.kafka.common.serialization.IntegerDeserializer defines which deserializer should be used for key
  • --property value.serializer=org.apache.kafka.common.serialization.StringDeserializer defines which deserializer should be user for value
  • both not being applied

Start kafka-console-consumer to consume simple string messages with integer key

docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic SimpleWithIntKey --property print.key=true --from-beginning --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer --skip-message-on-error

Notes:

  • key.deserializer=org.apache.kafka.common.serialization.StringDeserializer we are forced to use string instead of integer deserializer here, otherwise will receive an error ERROR Error processing message, skipping this message: (kafka.tools.ConsoleConsumer$) org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
  • --skip-message-on-error do not crash on bad message, just skip it

Produce messages like:

1:one
2:two

And you should see them in consumer as:

1	one
2	two

Json producer and consumer

Json without key

Create topic

docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic JsonWithoutKey --partitions 3 --replication-factor 1

Start kafka-console-producer which will produce json messages

docker exec -it kafka kafka-console-producer --broker-list localhost:9092 --topic JsonWithoutKey

Note that like in previous example with integer key, kafka-console-producer does not respect given serializers so we will just put string which looks like json but still sent as bytes

Start kafka-console-consumer to consume json messages

docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic JsonWithoutKey --property value.deserializer=org.apache.kafka.connect.json.JsonDeserializer --skip-message-on-error --from-beginning --property print.timestamp=true

Produce json messages like:

{"foo": "bar"}
{"acme": 42}

And you should see them like:

CreateTime:1578081298745        {"foo":"bar"}
CreateTime:1578081304001        {"acme":42}

There is not checks in producer but if you send something wrong you will see an error in consumer

CreateTime:1578081353956        [2020-01-03 19:55:54,970] ERROR Error processing message, skipping this message:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'foo': was expecting 'null', 'true', 'false' or NaN
 at [Source: (byte[])"foo"; line: 1, column: 7]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'foo': was expecting 'null', 'true', 'false' or NaN
 at [Source: (byte[])"foo"; line: 1, column: 7]

Json with string key

Create topic

docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic JsonWithStringKey --partitions 3 --replication-factor 1

Start kafka-console-producer which will produce json messages with string key

docker exec -it kafka kafka-console-producer --broker-list localhost:9092 --topic JsonWithStringKey --property parse.key=true --property key.separator=:

Start kafka-console-consumer to consume json messages with string key

docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic JsonWithStringKey --property print.key=true --from-beginning --property value.deserializer=org.apache.kafka.connect.json.JsonDeserializer

Produce messages:

1:{"foo":"bar"}
2:{"acme":42}

And you should see:

1       {"foo":"bar"}
2       {"acme":42}

Note that there is the same problem with keys as in previous examples, and you can not force integer key.

Json with json key

Create topic

docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic JsonWithJsonKey --partitions 3 --replication-factor 1

Start kafka-console-producer which will produce json messages with json keys

docker exec -it kafka kafka-console-producer --broker-list localhost:9092 --topic JsonWithJsonKey --property parse.key=true --property key.separator="|"

Start kafka-console-consumer to consume json messages

docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic JsonWithJsonKey  --property value.deserializer=org.apache.kafka.connect.json.JsonDeserializer --property key.deserializer=org.apache.kafka.connect.json.JsonDeserializer --skip-message-on-error --from-beginning --property print.key=true

Produce messages:

{"id":1}|{"foo":"bar"}
{"id":2}|{"acme":42}

And you should see:

{"id":1}        {"foo":"bar"}
{"id":2}        {"acme":42}

If you will produce bad key or value you will get:

ERROR Error processing message, skipping this message:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'foo': was expecting 'null', 'true', 'false' or NaN
 at [Source: (byte[])"foo"; line: 1, column: 7]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'foo': was expecting 'null', 'true', 'false' or NaN
 at [Source: (byte[])"foo"; line: 1, column: 7]

Avro producer and consumer

Avro without key

Create topic

docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic AvroWithoutKey --partitions 3 --replication-factor 1

Start kafka-avro-console-producer to produce avro messages

docker exec -it kafka kafka-avro-console-producer --broker-list localhost:9092 --topic AvroWithoutKey --property value.schema='{"type":"record","name":"AvroWithoutKey","fields":[{"name":"foo","type":"string"}]}'

Note that from now on we are using kafka-avro-console-producer instead of kafka-console-producer which has few additional properties like --property value.schema='{"type":"record","name":"AvroWithoutKey","fields":[{"name":"foo","type":"string"}]}' messages published via this consumer will be validated against given schema. Also note that this producer does not show > symbol, so do not wait for it.

Start kafka-avro-console-consumer to consume avro messages

docker exec -it kafka kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic AvroWithoutKey --from-beginning

Try sending something like:

{"foo":"hello"}
{"foo":"world"}

and you should see exactly the same output in consumer.

If you will try send something wrong you will receive an error:

{"acme":42}
org.apache.kafka.common.errors.SerializationException: Error deserializing json {"acme":42} to Avro of schema {"type":"record","name":"AvroWithoutKey","fields":[{"name":"foo","type":"string"}]}
Caused by: org.apache.avro.AvroTypeException: Expected field name not found: foo
        at org.apache.avro.io.JsonDecoder.doAction(JsonDecoder.java:477)
        at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)

but still if something you are sending is schema compatible everything should be ok, try sending {"foo":"bar","acme":42} and you will receive {"foo":"bar"} in your consumer

Avro with string key

Create topic

docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic AvroWithStringKey --partitions 3 --replication-factor 1

Start kafka-avro-console-producer to produce avro messages with primitive string key

docker exec -it kafka kafka-avro-console-producer --broker-list localhost:9092 --topic AvroWithStringKey --property value.schema='{"type":"record","name":"AvroWithStringKey","fields":[{"name":"foo","type":"string"}]}' --property parse.key=true --property key.schema='{"type":"string"}' --property key.separator=" "

Not that we have added --property key.schema='{"type":"string"}' which allow us to use primitives as key and they still will be validated.

Start kafka-avro-console-consumer to consume avro messages with string keys

docker exec -it kafka kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic AvroWithStringKey --from-beginning --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

Try send something like:

"one" {"foo":"1"}
"two" {"foo":"2"}

and you should get:

one     {"foo":"1"}
two     {"foo":"2"}

Do not forget to wrap key with double quotes otherwise you will get an error:

org.apache.kafka.common.errors.SerializationException: Error deserializing json one to Avro of schema "string"
Caused by: org.codehaus.jackson.JsonParseException: Unexpected character ('o' (code 111)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: java.io.StringReader@3feb2dda; line: 1, column: 2]

Avro with int key

Does not work, in example below after trying to send 1|{"foo":"bar"} receiving an error:

org.apache.kafka.common.errors.SerializationException: Error deserializing json 1|{"foo":"hello"} to Avro of schema {"type":"record","name":"AvroWithIntKey","fields":[{"name":"foo","type":"int"}]}
Caused by: org.apache.avro.AvroTypeException: Expected record-start. Got VALUE_NUMBER_INT

Create topic

docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic AvroWithIntKey --partitions 3 --replication-factor 1

Start kafka-avro-console-producer to produce avro messages with integer keys

docker exec -it kafka kafka-avro-console-producer --broker-list localhost:9092 --topic AvroWithIntKey --property value.schema='{"type":"record","name":"AvroWithIntKey","fields":[{"name":"foo","type":"int"}]}' --property key.separator="|"

Start kafka-avro-console-consumer to consume avro messages

docker exec -it kafka kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic AvroWithIntKey --from-beginning

Avro with avro key

Create topic

docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic AvroWithAvroKey --partitions 3 --replication-factor 1

Start kafka-avro-console-producer which will produce avro messages with avro keys

docker exec -it kafka kafka-avro-console-producer --broker-list localhost:9092 --topic AvroWithAvroKey --property value.schema='{"type":"record", "name": "AvroWithAvroKey", "fields":[{"name":"foo","type":"string"}]}' --property parse.key=true --property key.schema='{"type":"record","name": "key", "fields":[{"name":"id","type":"int"}]}' --property key.separator=" "

Start kafka-avro-console-consumer to consume avro messages with avro keys

docker exec -it kafka kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic AvroWithAvroKey --from-beginning --property print.key=true

Try send

{"id":1} {"foo":"hello"}
{"id":2} {"foo":"world"}

and you should receive

{"id":1}        {"foo":"hello"}
{"id":2}        {"foo":"world"}

if you will try send wrong key like {"id":"guid"} you will receive an error

org.apache.kafka.common.errors.SerializationException: Error deserializing json {"id":"guid"} to Avro of schema {"type":"record","name":"key","fields":[{"name":"id","type":"int"}]}
Caused by: org.apache.avro.AvroTypeException: Expected int. Got VALUE_STRING

Confluent Cloud

If you are using confluent.cloud from confluent.io you still able to do all this with few more params added for commands

More examples can be found here

Topics

You gonna need properties file which you can retrieve from https://confluent.cloud/environments/*****/clusters/***-*****/integrations/clients#java by navigating cluster then "CLI & client configuration"

cloud.properties

bootstrap.servers=xxx-xxxxx.us-east1.gcp.confluent.cloud:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="xxxxxxx" password\="xxxxxxx";
docker run -it --rm -v $PWD/cloud.properties:/cloud.properties confluentinc/cp-kafka:5.3.2 kafka-topics \
  --bootstrap-server xxx-xxxxx.us-east1.gcp.confluent.cloud:9092 \
  --command-config /cloud.properties \
  --list

all other commands will work as expected

Produce & consume simple messages

If you are going to run simple producer without avro and schema registry then properties file from previous example should be enough

Create topic

docker run -it --rm -v $PWD/cloud.properties:/cloud.properties confluentinc/cp-kafka:5.3.2 kafka-topics \
  --bootstrap-server xxx-xxxxx.us-east1.gcp.confluent.cloud:9092 \
  --command-config /cloud.properties \
   --create --topic simple1 --partitions 3 --replication-factor 3

Start producer

docker run -it --rm -v $PWD/cloud.properties:/cloud.properties confluentinc/cp-kafka:5.3.2 kafka-console-producer \
  --broker-list xxx-xxxxx.us-east1.gcp.confluent.cloud:9092 \
  --producer.config /cloud.properties \
  --topic simple1

Start consumer

docker run -it --rm -v $PWD/cloud.properties:/cloud.properties confluentinc/cp-kafka:5.3.2 kafka-console-consumer \
  --bootstrap-server xxx-xxxxx.us-east1.gcp.confluent.cloud:9092 \
  --consumer.config /cloud.properties \
  --topic simple1

Cleanup

docker run -it --rm -v $PWD/cloud.properties:/cloud.properties confluentinc/cp-kafka:5.3.2 kafka-topics \
  --bootstrap-server xxx-xxxxx.us-east1.gcp.confluent.cloud:9092 \
  --command-config /cloud.properties \
   --delete --topic simple1

Note that you are not restricted to strings only, you can also use all previous examples with different keys and json

Produce & consume AVRO messages in confluent.cloud

Create topic

docker run -it --rm -v $PWD/cloud.properties:/cloud.properties confluentinc/cp-kafka:5.3.2 kafka-topics \
  --bootstrap-server xxx-xxxxx.us-east1.gcp.confluent.cloud:9092 \
  --command-config /cloud.properties \
   --create --topic avro1 --partitions 3 --replication-factor 3

Start producer

docker run -it --rm -v $PWD/cloud.properties:/cloud.properties confluentinc/cp-schema-registry:5.3.2 kafka-avro-console-producer \
    --broker-list xxx-xxxxx.us-east1.gcp.confluent.cloud:9092 \
    --topic avro1 \
    --property value.schema='{"type":"record","name":"AvroWithoutKey","fields":[{"name":"foo","type":"string"}]}' \
    --producer.config /cloud.properties \
    --property schema.registry.url="https://xxxx-xxxxx.us-east1.gcp.confluent.cloud" \
    --property schema.registry.basic.auth.user.info="xxxxxxx:xxxxxxx" \
    --property basic.auth.credentials.source=USER_INFO

Start consumer

docker run -it --rm -v $PWD/cloud.properties:/cloud.properties confluentinc/cp-schema-registry:5.3.2 kafka-avro-console-consumer \
    --bootstrap-server xxx-xxxxx.us-east1.gcp.confluent.cloud:9092 \
    --topic avro1 \
    --from-beginning \
    --value-deserializer io.confluent.kafka.serializers.KafkaAvroDeserializer \
    --key-deserializer org.apache.kafka.common.serialization.StringDeserializer \
    --consumer.config /cloud.properties \
    --property schema.registry.url="https://xxxx-xxxxx.us-east1.gcp.confluent.cloud" \
    --property schema.registry.basic.auth.user.info="xxxxxxx:xxxxxxx" \
    --property basic.auth.credentials.source=USER_INFO

Notes:

  • we are using another docker image confluentinc/cp-schema-registry:5.3.2 because of kafka avro console consumer and producer
  • cloud.properties is still enough but schema registry settings should be passed via command line arguments

Kafka connect

We are going to use kafka connect to:

  • produce predefined messages from a file to replay some sequence of events
  • produce generated messages to get millions of them for test purposes
  • have sample sink connector to save messages to a file

All example will be made as standalone worker which should not be used in production and used here only because of its easy to use

At very end worker command looks liks like this: connect-standalone worker.properties task1.properties task2.properties where worker.properties contains configuration for worker itself and some defaults for tasks, taskX.properties is task configuration, you can have many of them, for example your worker might have few tasks which will produce messages from different files and one task to consume them into elasticsearch.

Tasks producing data into kafka called source, tasks consuming data from kafka called sink.

Be aware of advertised hosts and rest ports, if you are connecting to dockerized kafka which have localhost as advertised host from your worker which is also run in container nothing will work, use --net=host for such scenarios, but then you gonna need to change rest.port to avoid conflict with already taken port.

More links about worker properties:

Also you can get samples like so:

docker run -it --rm confluentinc/cp-kafka-connect:5.3.2 cat /etc/schema-registry/connect-avro-standalone.properties

Local kafka connect

Start your kafka

docker run -it --rm --name=kafka -e SAMPLEDATA=0 -e RUNNING_SAMPLEDATA=0 -e RUNTESTS=0 -e FORWARDLOGS=0 -e ADV_HOST=127.0.0.1 -p 2181:2181 -p 3030:3030 -p 8081-8082:8081-8082 -p 9092:9092 -p 9581-9585:9581-9585 lensesio/fast-data-dev:2.3.0

Note that I'm not exposing 8083 which is used by kafka connect rest api to avoid conflicts, otherwise do not forget to change rest.port in worker.properties

Simple messages

worker.properties

bootstrap.servers=localhost:9092

# do not forget to change me to avoid conflicts
rest.port=8083

# required for standalone workers
offset.storage.file.filename=/tmp/standalone.offsets

# where to look for additional plugins
plugin.path=/usr/share/java,/usr/share/confluent-hub-components

# optional, defaults for tasks
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

Notes:

  • key and value converters are optional and can be overriden in tasks
  • most used converters are: org.apache.kafka.connect.storage.StringConverter, org.apache.kafka.connect.json.JsonConverter, io.confluent.connect.avro.AvroConverter
  • avro converter requires schema registry
  • for json converter do not forget to add value.converter.schemas.enable=false if you wish not to receive schema, e.g. by sending {"foo":"bar"} you will receive {"schema":{"type":"string","optional":false},"payload":"{\"foo\": \"bar\"}"}

Kafka Connect Source Text File

Notes on task configuration properties:

  • do not forget that each task should have unique name it will be used to watch for offsets and for distributed wrokers it will be used for topic names
  • connector.class is a kind of plugin, you can choose from hub.confluent.io
  • tasks.max control parallelism, for sink tasks can not be bigger that number of topic partitions

source-text-file.properties

name=source-text-file
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
# optional, override worker defaults
value.converter=org.apache.kafka.connect.storage.StringConverter
topic=DemoTextFile
file=demo-text-file.txt

demo-text-file.txt

hello
world
mac
was
here

Create topic

docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic DemoTextFile --partitions 3 --replication-factor 1

Start consumer

docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic DemoTextFile --from-beginning

Start worker

docker run -it --rm \
    --name=standalone \
    --net=host \
    -v $PWD:/data \
    -w /data \
    confluentinc/cp-kafka-connect:5.3.2 connect-standalone worker.properties source-text-file.properties

Note that we are bypassing our current directory into container so worker has access to all configuration files

If everything is ok after some while you will see your messages from a source file in your consumer

Kafka Connect Source JSON File

This one will work same way as previous

source-json-file.properties

name=source-json-file
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
# optional, override worker defaults
# value.converter=org.apache.kafka.connect.json.JsonConverter
# value.converter.schemas.enable=false
# if your will use JsonConverter here you will receive string with escaped json
value.converter=org.apache.kafka.connect.storage.StringConverter
topic=DemoJsonFile
file=demo-json-file.ndjson

demo-json-file.ndjson

{"foo": "hello"}
{"foo": "world"}
{"foo": "bar"}
{"acme": 42}

Create topic

docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic DemoJsonFile --partitions 3 --replication-factor 1

Start consumer

docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic DemoJsonFile --property value.deserializer=org.apache.kafka.connect.json.JsonDeserializer --skip-message-on-error --from-beginning

Start worker

docker run -it --rm \
    --name=standalone \
    --net=host \
    -v $PWD:/data \
    -w /data \
    confluentinc/cp-kafka-connect:5.3.2 connect-standalone worker.properties source-json-file.properties

While everything running, try add more records to a source file and save it, you should immediatelly see them in consumer.

Also try to add non json line to a source file, you will get an error:

[2020-01-04 10:09:00,896] ERROR Error processing message, skipping this message:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'non': was expecting 'null', 'true', 'false' or NaN
 at [Source: (byte[])"non json"; line: 1, column: 5]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'non': was expecting 'null', 'true', 'false' or NaN
 at [Source: (byte[])"non json"; line: 1, column: 5]

but because we are running consumer with a --skip-message-on-error flag it should not die and continue listening to new records

unfortunatelly there is no way to produce messages with keys from simple files, if you will look at sources you will see that null is passed as key

If you wish to have keys you should run configured console producer and pipe file contents into it

Replaying Avro Messages With Key Value

This particular example does not use Kafka Connect but still might be used to replay some sequence of messages

Lets suppose that our source.txt file will look like:

source.txt

{"id":1}|{"foo":"hello"}
{"id":2}|{"foo":"world"}

where each line is an message with key and value separated by pipe

Create topic

docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic AvroFromFile --partitions 3 --replication-factor 1

Start kafka-avro-console-consumer to consume avro messages from file

docker run -it --rm --net=host confluentinc/cp-schema-registry:5.3.2 kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic AvroFromFile --from-beginning --property print.key=true

Start kafka-avro-console-producer which will produce avro messages from a file

docker run -it --rm --net=host -v $PWD:/data -w /data confluentinc/cp-schema-registry:5.3.2 sh -c "kafka-avro-console-producer --broker-list localhost:9092 --topic AvroFromFile --property value.schema='{\"type\":\"record\", \"name\": \"AvroFromFile\", \"fields\":[{\"name\":\"foo\",\"type\":\"string\"}]}' --property parse.key=true --property key.schema='{\"type\":\"record\",\"name\": \"key\", \"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}' --property key.separator=\"|\" < source.txt"

And you should see your desired messages in consumer:

{"id":1}	{"foo":"hello"}
{"id":2}	{"foo":"world"}

Note that I have used sh -c "...." here because of bash can not understand whether last < source.txt should be ran inside docker or not

Kafka Connect Source DataGen Avro

In following example we are going to generate tousand of recods based on given avro schema

source.properties

name=source
connector.class=io.confluent.kafka.connect.datagen.DatagenConnector
kafka.topic=AvroDatagen
# override worker.properties
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
# number of messages to generate
iterations=1000
tasks.max=1
# avro schema
schema.filename=/data/AvroDatagen.avsc

Some additional properties can be found here

Note that by default auto.register.schemas is set to true so you do not need to register schemas upfront everything will be done automatically. Also note that both key.subject.name.strategy and value.subject.name.strategy are set to io.confluent.kafka.serializers.subject.SubjectNameStrategy so schema names will be AvroDatagen-key and AvroDatagen-value retrospectively.

AvroDatagen.avsc

{
  "type": "record",
  "name": "AvroDatagen",
  "namespace": "ua.rabota.topics",
  "fields": [
    {
      "name": "userId",
      "type": {
        "type": "int",
        "arg.properties": {
          "range": {
            "min": 1,
            "max": 100
          }
        }
      }
    },
    {
      "name": "vacancyId",
      "type": {
        "type": "long",
        "arg.properties": {
          "range": {
            "min": 7710732,
            "max": 7711732
          }
        }
      }
    },
    {
      "name": "platform",
      "type": ["null", {
        "type": "string",
        "arg.properties": {
          "options": ["desktop", "mobile", "ios", "android"]
        }
      }],
      "default": null
    }
  ]
}

Note that usually in avro schema you defining properties like {"name": "foo", "type": "string"} where type is usually primitive string with type name, for datagen we are describing type as object with additional arg.properties

Crate topic

docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic AvroDatagen --partitions 3 --replication-factor 1

Start consumer

docker exec -it kafka kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic AvroDatagen --from-beginning

Start avro datagen producer

docker run -it --rm \
    --name=standalone \
    --net=host \
    -v $PWD:/data \
    -w /data \
    confluentinc/cp-kafka-connect:5.3.2 bash -c "confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.2.0 && connect-standalone worker.properties source.properties"

Note how we are installing kafka-connect-datagen before starting connect-standalone it does not shipped by deafult

After a while, when everything will boot up you should see incomming messages in consumer

When datagen will produce desired 1000 messages it will die and you will see something like:

[2020-01-04 11:22:37,984] ERROR WorkerSourceTask{id=source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Stopping connector: generated the configured 1000 number of messages

Unfortunatelly datagen is quite limited about keys only way you can have keys is to provide schema.keyfield which will use one of generated properties as message key, and according to sources it still will be simple string key.

Kafka Connect Simple Sink To Text File

This might be used for debug and log

sink.properties

name=sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=SinkDemo
file=/data/data.txt

Create topic

docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic SinkDemo --partitions 3 --replication-factor 1

Start Kafka Connect Sink to save messages to a text file

docker run -it --rm \
    --name=standalone \
    --net=host \
    -v $PWD:/data \
    -w /data \
    confluentinc/cp-kafka-connect:5.3.2 connect-standalone worker.properties sink.properties

Start console producer

docker exec -it kafka kafka-console-producer --broker-list localhost:9092 --topic SinkDemo

and start typing messages into it, you should immediatelly see them in text file

Do not forget that you can run some tricky setups like connect-standalone worker.properties source.properties sink.properties which might generate data into topic and immediatelly sink them into source

Standalone connect worker with confluent.cloud

All previous examples should work well with confluent.cloud if you will provide required configuration options

What you gonna need

cloud.properties

bootstrap.servers=xxx-xxxxx.us-east1.gcp.confluent.cloud:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="xxxxxxx" password\="xxxxxxx";

this file will be used by kafka-topics to create topic

worker.properties

bootstrap.servers=xxx-xxxxx.us-east1.gcp.confluent.cloud:9092
plugin.path=/usr/share/java,/usr/share/confluent-hub-components

offset.storage.file.filename=/tmp/standalone.offsets

# TODO: check whether this is a deafults
# default 60000
offset.flush.interval.ms=10000
# default 40000
request.timeout.ms=20000
# 100
retry.backoff.ms=500
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=500
producer.request.timeout.ms=20000
producer.retry.backoff.ms=500

# deafult https
ssl.endpoint.identification.algorithm=https
# default PLAINTEXT
security.protocol=SASL_SSL
# default GSSAPI
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="xxxxxxx" password\="xxxxxxx";

# Connect producer and consumer specific configuration
producer.ssl.endpoint.identification.algorithm=https
producer.confluent.monitoring.interceptor.ssl.endpoint.identification.algorithm=https
consumer.ssl.endpoint.identification.algorithm=https
consumer.confluent.monitoring.interceptor.ssl.endpoint.identification.algorithm=https
producer.security.protocol=SASL_SSL
producer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
consumer.security.protocol=SASL_SSL
consumer.confluent.monitoring.interceptor.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
consumer.sasl.mechanism=PLAIN
consumer.confluent.monitoring.interceptor.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="xxxxxxx" password\="xxxxxxx";
producer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="xxxxxxx" password\="xxxxxxx";
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="xxxxxxx" password\="xxxxxxx";
consumer.confluent.monitoring.interceptor.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="xxxxxxx" password\="xxxxxxx";

# Confluent Schema Registry for Kafka Connect
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.schema.registry.basic.auth.user.info=xxxxxxx:xxxxxxx
value.converter.schema.registry.url=https://xxxx-xxxxx.us-east1.gcp.confluent.cloud

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.basic.auth.credentials.source=USER_INFO
key.converter.schema.registry.basic.auth.user.info=xxxxxxx:xxxxxxx
key.converter.schema.registry.url=https://xxxx-xxxxx.us-east1.gcp.confluent.cloud


# additions - https://docs.confluent.io/current/cloud/connect/connect-cloud-config.html

confluent.topic.bootstrap.servers=xxx-xxxxx.us-east1.gcp.confluent.cloud:9092
confluent.topic.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="xxxxxxx" password\="xxxxxxx";
confluent.topic.security.protocol=SASL_SSL
confluent.topic.sasl.mechanism=PLAIN

reporter.admin.bootstrap.servers=xxx-xxxxx.us-east1.gcp.confluent.cloud:9092
reporter.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="xxxxxxx" password\="xxxxxxx";
reporter.admin.security.protocol=SASL_SSL
reporter.admin.sasl.mechanism=PLAIN

reporter.producer.bootstrap.servers=xxx-xxxxx.us-east1.gcp.confluent.cloud:9092
reporter.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="xxxxxxx" password\="xxxxxxx";
reporter.producer.security.protocol=SASL_SSL
reporter.producer.sasl.mechanism=PLAIN

this one is for worker to be able to comminicate with confluent cloud

source.properties

name=source
tasks.max=1

connector.class=io.confluent.kafka.connect.datagen.DatagenConnector
kafka.topic=demo1
iterations=1000
schema.filename=/data/demo1.avsc

this one will be used by datagen connector to generate random data into given topic

sink.properties

name=sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
topics=demo1
file=/data/data.txt

sing generated messages back from cloud to local file

demo1.avsc

{
  "type": "record",
  "name": "demo1",
  "namespace": "ua.rabota.topics",
  "fields": [
    {
      "name": "userId",
      "type": {
        "type": "int",
        "arg.properties": {
          "range": {
            "min": 1,
            "max": 100
          }
        }
      }
    },
    {
      "name": "vacancyId",
      "type": {
        "type": "long",
        "arg.properties": {
          "range": {
            "min": 7710732,
            "max": 7711732
          }
        }
      }
    },
    {
      "name": "platform",
      "type": ["null", {
        "type": "string",
        "arg.properties": {
          "options": ["desktop", "mobile", "ios", "android"]
        }
      }],
      "default": null
    }
  ]
}

schema for messages to be generated

create topic

docker run -it --rm -v $PWD/cloud.properties:/cloud.properties confluentinc/cp-kafka:5.3.2 kafka-topics \
  --bootstrap-server xxx-xxxxx.us-east1.gcp.confluent.cloud:9092 \
  --command-config /cloud.properties \
  --create --topic demo1  --partitions 3 --replication-factor 3

start worker

docker run -it --rm \
    --name=standalone \
    -v $PWD:/data \
    -w /data \
    confluentinc/cp-kafka-connect:5.3.2 bash -c "confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.2.0 && connect-standalone worker.properties source.properties sink.properties"

after a while you will see that your data.txt file becomes full of random generated messages

So now you can quickly send batch of messages both generated and predefined not only to local kafka but also to your confluent cloud one - profit

Distributed Worker

Confluent cloud not giving you distributed workers for some reasons. Seems like it is because they do not know how much of them you gonna need. To start your own connect cluster you will need worker.properties from previous example just remove offset.storage.file.filename from it and add

group.id=mac1
offset.storage.topic=mac1-offsets
config.storage.topic=mac1-configs
status.storage.topic=mac1-status

offset.storage.partitions=3
replication.factor=3
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3

take a closer look to first four settings, make sure they are unique

The difference between standalone and distributed worker is that from now you going to add and remove your tasks via rest api

In most of the cases everything will look the same as in previous examples, except that now you are going to post json instead of property files like in example from docs:

POST /connectors HTTP/1.1
Host: connect.example.com
Content-Type: application/json
Accept: application/json

{
    "name": "hdfs-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "tasks.max": "10",
        "topics": "test-topic",
        "hdfs.url": "hdfs://fakehost:9000",
        "hadoop.conf.dir": "/opt/hadoop/conf",
        "hadoop.home": "/opt/hadoop",
        "flush.size": "100",
        "rotate.interval.ms": "1000"
    }
}

Here is an example of docker run which is a good starting point to run your connect cluster in kubernetes

docker run -it --rm \
    --name=mac1 \
    -p 8083:8083 \
    -e CONNECT_BOOTSTRAP_SERVERS=xxx-xxxxx.us-east1.gcp.confluent.cloud:9092 \
    -e CONNECT_GROUP_ID=mac1 \
    -e CONNECT_OFFSET_STORAGE_TOPIC=mac1-offsets \
    -e CONNECT_CONFIG_STORAGE_TOPIC=mac1-configs \
    -e CONNECT_STATUS_STORAGE_TOPIC=mac1-status \
    -e CONNECT_OFFSET_STORAGE_PARTITIONS=3 \
    -e CONNECT_REPLICATION_FACTOR=3 \
    -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3 \
    -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3 \
    -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3 \
    -e CONNECT_OFFSET_FLUSH_INTERVAL_MS=10000 \
    -e CONNECT_REQUEST_TIMEOUT_MS=20000 \
    -e CONNECT_RETRY_BACKOFF_MS=500 \
    -e CONNECT_CONSUMER_REQUEST_TIMEOUT_MS=20000 \
    -e CONNECT_CONSUMER_RETRY_BACKOFF_MS=500 \
    -e CONNECT_PRODUCER_REQUEST_TIMEOUT_MS=20000 \
    -e CONNECT_PRODUCER_RETRY_BACKOFF_MS=500 \
    -e CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https \
    -e CONNECT_SECURITY_PROTOCOL=SASL_SSL \
    -e CONNECT_SASL_MECHANISM=PLAIN \
    -e CONNECT_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxxxxx\" password=\"xxxxxxx\";" \
    -e CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https \
    -e CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https \
    -e CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https \
    -e CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https \
    -e CONNECT_PRODUCER_SECURITY_PROTOCOL=SASL_SSL \
    -e CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SECURITY_PROTOCOL=SASL_SSL \
    -e CONNECT_CONSUMER_SECURITY_PROTOCOL=SASL_SSL \
    -e CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SECURITY_PROTOCOL=SASL_SSL \
    -e CONNECT_PRODUCER_SASL_MECHANISM=PLAIN \
    -e CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_MECHANISM=PLAIN \
    -e CONNECT_CONSUMER_SASL_MECHANISM=PLAIN \
    -e CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_MECHANISM=PLAIN \
    -e CONNECT_PRODUCER_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxxxxx\" password=\"xxxxxxx\";" \
    -e CONNECT_PRODUCER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxxxxx\" password=\"xxxxxxx\";" \
    -e CONNECT_CONSUMER_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxxxxx\" password=\"xxxxxxx\";" \
    -e CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SASL_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxxxxx\" password=\"xxxxxxx\";" \
    -e CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter \
    -e CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE=USER_INFO \
    -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=xxxxxxx:xxxxxxx \
    -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=https://xxxx-xxxxx.us-east1.gcp.confluent.cloud \
    -e CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter \
    -e CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=true \
    -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=true \
    -e CONNECT_REST_POST=8083 \
    -e CONNECT_REST_ADVERTISED_HOST_NAME=localhost \
    -e CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE=USER_INFO \
    -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO=xxxxxxx:xxxxxxx \
    -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=https://xxxx-xxxxx.us-east1.gcp.confluent.cloud
    confluentinc/cp-kafka-connect:5.3.2

Bash aliases

Even simple operations like creating topic becomes not easy to remember especially if you will have local, dev, prod kafka clusters

If you are using ccloud command line tool you already should have ~/.ccloud/ which to me seems a good place to save my cloud.properties files in my case it will be dev.properties and prod.peroperties

Here are few starting point examples

Local kafka bash aliases

alias local-topic="docker run -it --rm --net=host confluentinc/cp-kafka:5.3.2 kafka-topics --bootstrap-server localhost:9092"

alias local-topic-list="local-topic --list"

alias local-topic-delete="local-topic --delete --topic"

alias local-topic-describe="local-topic --describe --topic"

alias local-topic-create="local-topic --create --replication-factor 1 --topic"

alias local-topic-create1="local-topic --create --replication-factor 1 --partitions 1 --topic"

alias local-topic-create2="local-topic --create --replication-factor 1 --partitions 2 --topic"

alias local-console-consumer="docker run -it --rm --net=host confluentinc/cp-kafka:5.3.2 kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic"

alias local-console-producer="docker run -it --rm --net=host confluentinc/cp-kafka:5.3.2 kafka-console-producer --broker-list localhost:9092 --topic"

Confluent cloud kafka bash aliases

alias dev-topic="docker run -it --rm -v /Users/mac/.ccloud/dev.properties:/dev.properties confluentinc/cp-kafka:5.3.2 kafka-topics --bootstrap-server $(grep bootstrap.server ~/.ccloud/dev.properties | tail -1 | cut -d'=' -f2) --command-config dev.properties"

alias dev-topic-list="dev-topic --list"

alias dev-topic-delete="dev-topic --delete --topic"

alias dev-topic-describe="dev-topic --describe --topic"

alias dev-topic-create="dev-topic --create  --replication-factor 3 --topic"

alias dev-console-consumer="docker run -it --rm -v /Users/mac/.ccloud/dev.properties:/dev.properties confluentinc/cp-kafka:5.3.2 kafka-console-consumer --bootstrap-server $(grep bootstrap.server ~/.ccloud/dev.properties | tail -1 | cut -d'=' -f2) --consumer.config dev.properties --topic"

alias dev-console-producer="docker run -it --rm -v /Users/mac/.ccloud/dev.properties:/dev.properties confluentinc/cp-kafka:5.3.2 kafka-console-producer --broker-list $(grep bootstrap.server ~/.ccloud/dev.properties | tail -1 | cut -d'=' -f2) --producer.config dev.properties --topic"