Kafka/KSQL Streams Lost When Producing With Golang

Odd one this, and one that took me a little while to debug. I recently set up a Confluent/Kafka data pipeline with transformations being handled by KSQL and data being produced by an application written in Go. As part of the test process I persisted data using a MongoDB Sink connector.

The command line producers had no problems and producing a large file would persist the expected data to MongoDB.

However, I ran into issues when producing from Golang: only somewhere between 7% and 12% of the messages were being persisted to MongoDB, and the rest were lost somewhere in the processing. Brokers were up and operational, and Mongo was running without issue. The data was clearly being lost somewhere between being sent and reaching the connectors.

Skip to the solution if you're not interested in how I worked out what went wrong.

What the Dickens?

My initial suspicion was that there were problems with how I was using the segmentio/kafka-go library to produce messages. But a switch to the confluent-kafka-go library had the same issues.

I started by checking the count of the stream in KSQL, using COUNT(*) grouped by a column.

Aside: to use an aggregate function in KSQL you need to select and group by a column, so for a total count I picked one that holds the same value in every row. SELECT COUNT(*) FROM STREAM; alone doesn't work.

The count was showing that the stream itself was seeing the reduced number of messages, indicating the connector wasn't at fault.

I next checked the health of the stream with KSQL's DESCRIBE EXTENDED command, which yielded the following output at the bottom:

Local runtime statistics
consumer-messages-per-sec: 71.35 consumer-total-bytes: 8702752 consumer-total-messages: 28536 last-message: 2019-03-14T10:01:39.178Z
consumer-failed-messages: 6417 consumer-failed-messages-per-sec: 64.20 last-failed: 2019-03-14T10:01:39.907Z
(Statistics of the local KSQL server interaction with the Kafka topic spotify-tracks)

The statistics were showing a very high number of failed messages (consumer-failed-messages).

This was confirmed when I checked the offset of the main topic I was producing to and compared it with the offset of the topic used by the stream. The offset of the main topic was increasing as expected, whereas the second topic's offset wasn't.

This had me confused as the KSQL query to create the stream was pretty simple, just selecting some columns from the source JSON topic. 

(message VARCHAR, version INT, ...) \

A quick glance at the KSQL logs showed me the problem:

[2019-03-14 10:23:57,586] WARN task [0_0] Skipping record due to deserialization error. topic=[source-topic] partition=[0] offset=[7120] (org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
org.apache.kafka.common.errors.SerializationException: KsqlJsonDeserializer failed to deserialize data for topic: source-topic
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:344)
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.getGenericRow(KsqlJsonDeserializer.java:80)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'rack_artists': was expecting ('true', 'false' or 'null') at [Source: (byte[])"rack_artists":"Artist 1","track_name":"Track 1"}
{"message":"MessageType","version":2,"track_id":"b31bf36ae24641fe8cb0ac9e8429792f","track_artists":"Artist 2""; line: 1, column: 14]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'rack_artists': was expecting ('true', 'false' or 'null')
at [Source: (byte[])"rack_artists":"Artist 1","track_name":"Track 1"}
{"message":"MessageType","version":2,"track_id":"b31bf36ae24641fe8cb0ac9e8429792f","track_artists":"Artist 2""; line: 1, column: 14]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:679)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)

This exception is trimmed (and some of the data itself is removed), but the payload in it starts partway through one line of the source file and runs on into the next. It appears as though the last and the penultimate lines have been merged somehow, suggesting the newline wasn't being used to separate the two final messages.

This was starting to indicate the issue was in the Golang application itself.
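
To confirm that malformed payloads really are landing on the topic, a quick check along these lines can help. This is only a sketch: it assumes the raw message values have already been read off the topic by some means, the sample payloads are hypothetical ones modelled on the trimmed log output above, and invalidPayloads is a name made up for this example. It uses nothing beyond encoding/json from the standard library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// invalidPayloads returns the indexes of the payloads that are not a single
// well-formed JSON value, which is what a JSON deserializer expects per record.
func invalidPayloads(payloads [][]byte) []int {
	var bad []int
	for i, p := range payloads {
		if !json.Valid(p) {
			bad = append(bad, i)
		}
	}
	return bad
}

func main() {
	// Hypothetical sample values: one intact record, and one that starts
	// mid-record and runs across a newline into the next record.
	payloads := [][]byte{
		[]byte(`{"message":"MessageType","version":2,"track_name":"Track 1"}`),
		[]byte("rack_artists\":\"Artist 1\",\"track_name\":\"Track 1\"}\n{\"message\":\"MessageType\",\"version\":2}"),
	}
	fmt.Println("invalid payload indexes:", invalidPayloads(payloads))
}
```

If the indexes it reports line up with the offsets KSQL is skipping, the bad bytes were written by the producer rather than introduced further down the pipeline.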

The producer application itself is pretty simple: it grabs a gz file from an external location, gunzips it, then reads each line into a Kafka message and produces the messages one at a time. The bulk of the work is done in the following code:

scanner := bufio.NewScanner(file)

for scanner.Scan() {
	val := scanner.Bytes()

	p.ProduceChannel() <- &kafka.Message{
		TopicPartition: kafka.TopicPartition{
			Topic:     &topic,
			Partition: kafka.PartitionAny,
		},
		Value: val,
	}
}

(This code uses the confluent-kafka-go library)

Nothing weird going on here, I thought: simply setting up a bufio.Scanner from the os.File reader, separating on newlines, looping over each line and passing the bytes to the producer in the message.

The Solution

So I had no explanation at the time for why this would work but, out of ideas, I decided to get the line as a string and then convert it to bytes rather than rely on the Bytes function:

val := []byte(scanner.Text())

I reran the producer and (lo and behold!) the rows were all being persisted to the connected MongoDB database.

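I couldn't explain this when I found the fix, but the bufio documentation does: the slice returned by Bytes() may point to data that will be overwritten by a subsequent call to Scan, whereas Text() returns a newly allocated string. Because ProduceChannel() only queues the message and the actual send happens later, the scanner had most likely reused its buffer before many of the messages went out. So it isn't that Text() is safer for newline-separated text as such; it simply hands you a copy. Bizarre at first sight, though!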
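
The effect can be reproduced without Kafka at all. The following is a minimal sketch using only the standard library. It holds on to each scanned line in a slice, standing in for a producer's queue (that the delay comes from the queue is my assumption), and then counts how many lines still hold their original contents. Per the bufio documentation, the slice returned by Bytes() may point to data that is overwritten by a later call to Scan, so retained slices can go stale while copies cannot. The names collect, countCorrupted and makeInput are made up for this example.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// makeInput builds n newline-terminated records: record-00000, record-00001, ...
func makeInput(n int) string {
	var sb strings.Builder
	for i := 0; i < n; i++ {
		fmt.Fprintf(&sb, "record-%05d\n", i)
	}
	return sb.String()
}

// collect scans every line of input and keeps it for later, much as a queue
// of pending messages would. With copyLine false it keeps the slice returned
// by scanner.Bytes(), which aliases the scanner's internal buffer; with
// copyLine true it keeps an independent copy of the bytes.
func collect(input string, copyLine bool) [][]byte {
	var kept [][]byte
	scanner := bufio.NewScanner(strings.NewReader(input))
	for scanner.Scan() {
		line := scanner.Bytes()
		if copyLine {
			// Same effect as []byte(scanner.Text()): a fresh allocation.
			line = append([]byte(nil), line...)
		}
		kept = append(kept, line)
	}
	return kept
}

// countCorrupted reports how many kept lines no longer match the record
// that was originally scanned at that position.
func countCorrupted(kept [][]byte) int {
	bad := 0
	for i, line := range kept {
		if string(line) != fmt.Sprintf("record-%05d", i) {
			bad++
		}
	}
	return bad
}

func main() {
	// The input is deliberately larger than the scanner's initial buffer,
	// so the buffer has to be refilled several times.
	input := makeInput(2000)
	fmt.Println("corrupted when keeping Bytes():", countCorrupted(collect(input, false)))
	fmt.Println("corrupted when copying first:  ", countCorrupted(collect(input, true)))
}
```

Copying with append (or via Text(), as in the fix above) costs one allocation per line, which is a small price for messages that are consumed asynchronously.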

Posted on Mar 14, 2019
