When using Kafka for communication between systems, you may find yourself having to change your consumer configuration. This may happen because the publisher changed topics, or because a new message broker was created along with a company-wide mandate that everyone switch to it by a set date. This sort of migration isn't very complicated, but any errors may prove costly: if we re-consume the new topic from the beginning, for example, we may accidentally fill up our database. We want to start consuming the new topic at around the same point where we stopped consuming the old one (preferably with a buffer of a few minutes, just to be safe).
The first step here is to ensure that your consumer code is idempotent, but that's something you should always ensure anyway: if you ever have to re-consume a couple of messages, a lack of idempotency may leave your system with strange data and bugs.
Setting up Kafka on your machine
(Feel free to skip this section if you have everything set up.)
Kafka releases aren't included in most package managers, with the exception of Homebrew on macOS. They are available, however, on the official download page. For Unix systems (macOS, Linux, WSL, *BSD), simply get one of the binary downloads and extract it wherever you prefer. Then add the bin folder to your $PATH, so that you can use these scripts in any folder. In my case, I put the extracted archive in $HOME/.local/share/kafka, so I edit my .zshrc (or .bashrc on a plainer Linux/WSL system) and add:
export PATH=$HOME/.local/share/kafka/bin:$PATH
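To confirm the scripts are actually reachable, you can reload your shell config and run one of them. Any of the bundled scripts will do; here I'm using kafka-topics.sh, which ships with every binary release:

```shell
# Reload the shell config so the new PATH takes effect,
# then check that the Kafka scripts resolve.
source ~/.zshrc
command -v kafka-topics.sh
kafka-topics.sh --version
```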
Now we can use the official Kafka scripts in any directory. That is, unless our broker requires some form of authentication. In that case, we can put the additional configs in a file and reference that file in our commands. Let's create a file called dev.cfg in $HOME/.config/kafka/ (the exact directory doesn't matter, but .config is the conventional place for configs), and write the following in it:
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="me" \
password="noneofyourbusiness";
This is just an example; your broker's security config may vary. After doing this, just add the following to any of the Kafka commands you use:
--command-config $HOME/.config/kafka/dev.cfg
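For example, listing the topics on an authenticated broker would look something like this (the endpoint is a placeholder for your own brokers):

```shell
# List all topics, passing the auth config created above.
kafka-topics.sh \
  --bootstrap-server='<your broker endpoints>' \
  --list \
  --command-config $HOME/.config/kafka/dev.cfg
```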
Checking consumer offsets
Before migrating topics, it’s best if we make sure that the old topic consumer is totally up-to-date. We can use this command to check the consumption lag:
kafka-consumer-groups.sh \
--bootstrap-server='<your broker endpoints, separated by commas>' \
--describe \
--group='<your consumer group>'
This should yield something like the output below:
TOPIC PARTITION NEWEST_OFFSET OLDEST_OFFSET CONSUMER_OFFSET LEAD LAG
your-topic 0 30000000 0 30000000 30000000 0
your-topic 1 23900000 0 23900000 23900000 0
CLIENT_HOST CLIENT_ID TOPIC ASSIGNED_PARTITIONS
/128.0.0.0 your-consumer your-topic 0
/128.0.0.1 your-consumer your-topic 1
From this we can see that there is zero lag, since the newest offset is equal to the consumer offset on both partitions. We can also see that our consumers are still running (they’re the clients with ID “your-consumer”).
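If the consumers are still catching up, it can be handy to watch the lag shrink instead of re-running the command by hand. A simple loop over the same describe command works; this sketch assumes your topic is literally named your-topic and that the LAG column comes last, as in the output above:

```shell
# Print the total lag across all partitions every five seconds.
while true; do
  kafka-consumer-groups.sh \
    --bootstrap-server='<your broker endpoints>' \
    --describe \
    --group='<your consumer group>' \
    | awk '$1 == "your-topic" { lag += $NF } END { print "total lag:", lag }'
  sleep 5
done
```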
Performing the Kafka consumer migration
Before switching topics or brokers, we should first create a consumer group for the new configuration. This will allow us to change the offsets before the message consumption proper begins. The easiest way to do this is to use kcat, a simple command-line tool that allows us to consume and produce Kafka messages. Since a consumer group is only created by consuming messages through it, it's a matter of setting up kcat (check the install guide on the kcat repo) and consuming a few messages from the new topic:
kcat \
-b '<your brokers>' \
-G new-group \
new-topic

(In kcat's -G mode, topic names go at the end as plain arguments.) If you have authentication, also add these flags before the topic name:

-X security.protocol=SASL_SSL \
-X sasl.mechanisms=SCRAM-SHA-512 \
-X sasl.username="me" \
-X sasl.password="noneofyourbusiness" \
Just run this for a second, check that messages come out, and then run the describe command for the new group:
kafka-consumer-groups.sh \
--bootstrap-server='new-broker' \
--describe \
--group='new-group'
TOPIC PARTITION NEWEST_OFFSET OLDEST_OFFSET CONSUMER_OFFSET LEAD LAG
new-topic 0 12387000 0 12 12 12386988
new-topic 1 12360000 0 38 38 12359962
CLIENT_HOST CLIENT_ID TOPIC ASSIGNED_PARTITIONS
We have huge lag here, but that's fine, since we're going to reset the offsets anyway. Stop the workers consuming from the old topic by whatever means you have and note down the time at which they stopped. We will now reset the offsets for this new consumer group. One note: in this case we don't have any consumers connected to the new group, but if we did, we'd have to turn them off as well; you cannot reset a consumer group's offsets while clients are connected to it.
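If you'd rather not do the time arithmetic in your head, GNU date (on macOS, the gdate command from coreutils) can subtract the buffer and print the timestamp in the exact format the reset command expects. For example, for a 10:00 UTC stop time with a ten-minute buffer:

```shell
# Compute "ten minutes before 10:00 UTC on March 15th, 2023"
# in the format kafka-consumer-groups.sh expects.
date -u -d '2023-03-15 10:00 UTC 10 minutes ago' +%Y-%m-%dT%H:%M:%S.000
# 2023-03-15T09:50:00.000
```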
To reset the offsets, just run the following command (here we’ll assume that we stopped the previous consumers at 10:00 of March 15th, 2023, and subtract ten minutes just to be safe):
kafka-consumer-groups.sh \
--bootstrap-server='new-brokers' \
--group='new-group' \
--topic='new-topic' \
--reset-offsets \
--to-datetime 2023-03-15T09:50:00.000 \
--dry-run
Running this dry run will not make any changes; it will only show you the new offsets for the consumer group, e.g.:
TOPIC PARTITION NEWEST_OFFSET OLDEST_OFFSET CONSUMER_OFFSET LEAD LAG
new-topic 0 12387000 0 12386998 12386998 2
new-topic 1 12360000 0 12359951 12359951 49
CLIENT_HOST CLIENT_ID TOPIC ASSIGNED_PARTITIONS
If nothing looks strange to you, go ahead: substitute --dry-run with --execute and run the command. Afterwards, deploy the new consumer configuration, turn on the worker, and let it run.
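For reference, the final command is identical to the dry run except for the last flag:

```shell
# Actually apply the offset reset (no --dry-run this time).
kafka-consumer-groups.sh \
  --bootstrap-server='new-brokers' \
  --group='new-group' \
  --topic='new-topic' \
  --reset-offsets \
  --to-datetime 2023-03-15T09:50:00.000 \
  --execute
```

Once the new consumer is running, the same describe command from earlier will show its lag, which should shrink back toward zero.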