When using Kafka for communication between systems, you may eventually find yourself having to change your consumer configuration. This may happen because the publisher changed topics, or because a new message broker was created and a company-wide mandate requires everyone to switch by a set date. This sort of migration isn’t very complicated, but any errors may prove costly: if we re-consume the new topic from the beginning, for example, we may accidentally flood our database. We want to start consuming the new topic at around the same point where we stopped consuming the old one (preferably with a buffer of a few minutes, just to be safe).

The first step is to ensure that your consumer code is idempotent, but that’s something you should always ensure anyway: if you ever have to re-consume a couple of messages, a lack of idempotency may leave your system with duplicate data and strange bugs.
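To illustrate, idempotency can be as simple as keying writes on a unique message identifier so that processing the same message twice is a no-op. This is a minimal sketch, with an in-memory dict standing in for a real database table with a unique constraint; the message fields and the `process` function are hypothetical:

```python
# Minimal idempotent-consumer sketch: an in-memory dict stands in for a
# database table with a unique constraint on the message key.
store = {}

def process(message):
    """Upsert by message ID: consuming the same message twice is harmless."""
    key = message["id"]              # hypothetical unique message identifier
    store[key] = message["payload"]  # overwrite instead of blindly inserting

# Re-consuming the same message leaves the store unchanged.
process({"id": "order-42", "payload": {"amount": 10}})
process({"id": "order-42", "payload": {"amount": 10}})
print(len(store))  # 1
```

With a real database, the same effect is usually achieved with an upsert (e.g. `INSERT ... ON CONFLICT DO UPDATE`) keyed on the message ID.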

Setting up Kafka on your machine

(Feel free to skip this section if you have everything set up.)

Kafka releases aren’t included in most package managers, with the exception of Homebrew on macOS. They are available, however, on the official download page. For Unix systems (macOS, Linux, WSL, *BSD), simply grab one of the binary downloads and extract it wherever you prefer. Then add the bin folder to your $PATH, so that you can use the scripts from any directory. In my case, I put the extracted archive in $HOME/.local/share/kafka, so I edit my .zshrc (or .bashrc on a plainer Linux/WSL setup) and add:

export PATH=$HOME/.local/share/kafka/bin:$PATH


Now we can use the official Kafka scripts in any directory. That is, unless the broker requires some form of authentication. In that case, we can put the additional configs in a file and reference that file in our commands. Let’s create a file called dev.cfg in $HOME/.config/kafka/ (the exact directory doesn’t matter, but .config is the conventional place for configuration) with the following contents:

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="me" \
    password="noneofyourbusiness";

This is just an example; your broker’s security config may vary. After doing this, just add the following to any Kafka command you run:

--command-config $HOME/.config/kafka/dev.cfg

Checking consumer offsets

Before migrating topics, we should make sure that the consumer on the old topic is fully caught up. We can check the consumption lag with this command:

kafka-consumer-groups.sh \
	--bootstrap-server='<your broker endpoints, separated by commas>' \
	--describe \
	--group='<your consumer group>'

This should yield something like the output below:

TOPIC      PARTITION   NEWEST_OFFSET   OLDEST_OFFSET     CONSUMER_OFFSET     LEAD         LAG
your-topic 0           30000000        0                 30000000            30000000     0
your-topic 1           23900000        0                 23900000            23900000     0

CLIENT_HOST      CLIENT_ID         TOPIC          ASSIGNED_PARTITIONS
/128.0.0.0       your-consumer     your-topic     0
/128.0.0.1       your-consumer     your-topic     1

From this we can see that there is zero lag, since the newest offset is equal to the consumer offset on both partitions. We can also see that our consumers are still running (they’re the clients with ID “your-consumer”).
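If the consumer group has many partitions, eyeballing the lag column gets tedious, so it may help to script the check. A small sketch, assuming the describe output has been captured as text in the column layout shown above (your tool version’s headers may differ):

```python
# Parse a `kafka-consumer-groups.sh --describe` dump and sum the lag.
# Assumes the whitespace-separated layout from the example output above,
# with LAG as the last column of each topic row.
def total_lag(describe_output: str) -> int:
    lag = 0
    for line in describe_output.splitlines():
        fields = line.split()
        # Topic rows have 7 columns and end with a numeric lag value.
        if len(fields) == 7 and fields[-1].isdigit() and fields[0] != "TOPIC":
            lag += int(fields[-1])
    return lag

sample = """\
TOPIC      PARTITION   NEWEST_OFFSET   OLDEST_OFFSET     CONSUMER_OFFSET     LEAD         LAG
your-topic 0           30000000        0                 30000000            30000000     0
your-topic 1           23900000        0                 23900000            23900000     0
"""
print(total_lag(sample))  # 0
```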

Performing the Kafka consumer migration

Before switching topics or brokers, we should first create a consumer group for the new configuration. This will allow us to change the offsets before the message consumption proper begins. The easiest way to do this is with kcat, a simple command-line tool for consuming and producing Kafka messages. A consumer group is created implicitly the first time it consumes, so it’s just a matter of setting up kcat (check the install guide in the kcat repo) and consuming a few messages from the new topic:

kcat \
	-b '<your brokers>' \
	-G new-consumer-group \
	new-topic

# If the broker requires authentication, also add these flags
# before the topic name:

	-X security.protocol=SASL_SSL \
	-X sasl.mechanisms=SCRAM-SHA-512 \
	-X sasl.username="me" \
	-X sasl.password="noneofyourbusiness"

Just run this for a second, check that messages come out, and then run the describe command for the new group:

kafka-consumer-groups.sh \
	--bootstrap-server='new-broker' \
	--describe \
	--group='new-group'


TOPIC     PARTITION   NEWEST_OFFSET   OLDEST_OFFSET     CONSUMER_OFFSET     LEAD         LAG
new-topic 0           12387000        0                 12                  12           12386988
new-topic 1           12360000        0                 38                  38           12359962

CLIENT_HOST      CLIENT_ID         TOPIC          ASSIGNED_PARTITIONS

We have huge lag here, but that’s fine, since we’re going to reset the offsets anyway. Stop the workers consuming from the old topic by whatever means you can, and note down the time at which they stopped. We will now reset the offsets for this new consumer group. One caveat: in this case we don’t have any consumers connected to the new group, but if we did, we’d have to turn them off as well. You cannot reset a consumer group’s offsets while clients are connected to it.

To reset the offsets, just run the following command (here we’ll assume that we stopped the previous consumers at 10:00 on March 15th, 2023, and subtract ten minutes just to be safe):

kafka-consumer-groups.sh \
	--bootstrap-server='new-brokers' \
	--group='new-group' \
	--topic='new-topic' \
	--reset-offsets \
	--to-datetime 2023-03-15T09:50:00.000 \
	--dry-run
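The buffered timestamp itself is easy to compute; a quick sketch using the example’s 10:00 stop time:

```python
from datetime import datetime, timedelta

# Time at which the old consumers were stopped (from the example above).
stopped_at = datetime(2023, 3, 15, 10, 0, 0)

# Subtract a ten-minute safety buffer: re-consuming a few extra messages
# is harmless (the consumer is idempotent), skipping messages is not.
reset_to = stopped_at - timedelta(minutes=10)

# Format accepted by kafka-consumer-groups.sh --to-datetime.
print(reset_to.strftime("%Y-%m-%dT%H:%M:%S.000"))  # 2023-03-15T09:50:00.000
```

Note that the buffer trades a small amount of duplicate processing for a guarantee of no gaps, which is exactly why the idempotency discussed earlier matters.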

Running this dry run will not make any changes; it will only show you the new offsets for the consumer group, e.g.:

TOPIC     PARTITION   NEWEST_OFFSET   OLDEST_OFFSET     CONSUMER_OFFSET     LEAD         LAG
new-topic 0           12387000        0                 12386998            12386998     2
new-topic 1           12360000        0                 12359951            12359951     49

CLIENT_HOST      CLIENT_ID         TOPIC          ASSIGNED_PARTITIONS

If nothing looks strange to you, go ahead: substitute --dry-run with --execute and run the command again. Afterwards, deploy the new consumer configuration, start the workers, and let them run.
