Naming Convention Policy for Kafka topics
Creating a topic in a Kafka cluster is easy and well documented, both for `kafka-topics.sh` and in the official API documentation.
bin/kafka-topics.sh --help
The complexity arises when you try to enforce a standard way of naming topics. There are many ways to identify the right convention for your needs; this five-step blog explains how to enforce such a convention at the moment a topic is created.
There is no single right convention; it is always determined by what your business needs.
For my example, I want to define a topic convention with the following semantics:
<organizationname>.<productname>
It is simple enough to get started with, and it can be easily extended, as you will see as you follow along.
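To make the convention concrete, here is a small sketch in plain Java (no Kafka dependency) that checks a name against `<organizationname>.<productname>` and splits it into its two segments. The class `TopicNameParser`, the record `TopicName` and the method `parse` are hypothetical helpers for illustration only, the sample names are made up, and the record syntax assumes Java 16 or later.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: splits a topic name that follows
// <organizationname>.<productname> into its two parts.
public class TopicNameParser {

    // Same shape as the policy pattern, with capturing groups added
    // for the organization and product segments.
    private static final Pattern CONVENTION = Pattern.compile("(\\w+)\\.(\\w+)");

    // Hypothetical value type holding the two segments.
    record TopicName(String organization, String product) {}

    static Optional<TopicName> parse(String topic) {
        Matcher m = CONVENTION.matcher(topic);
        // matches() requires the whole name to fit the pattern,
        // so a name with a third segment is rejected, not partially matched
        if (!m.matches()) {
            return Optional.empty();
        }
        return Optional.of(new TopicName(m.group(1), m.group(2)));
    }

    public static void main(String[] args) {
        for (String topic : new String[] {"acme.payments", "invalid_topic", "acme.payments.orders"}) {
            System.out.println(topic + " -> " + parse(topic));
        }
    }
}
```

Because `Matcher.matches()` must consume the entire name, `acme.payments.orders` is rejected; the policy class in Step 3 relies on the same full-match behaviour.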
According to the official documentation, to define a custom topic creation policy you have to set the following broker property:
create.topic.policy.class.name=mypackage.className
The className should implement the interface
org.apache.kafka.server.policy.CreateTopicPolicy
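Conceptually, the broker reads the class name from its configuration and instantiates it reflectively, which is why the class typically needs a public no-argument constructor and must be on the broker's classpath. The following is a simplified, self-contained sketch of that general pattern only: `NamingPolicy` and `DotRequiredPolicy` are hypothetical stand-ins, not Kafka types, and Kafka's actual loading code differs in detail.

```java
import java.util.Properties;

// Simplified sketch of reflective plugin loading. NamingPolicy and
// DotRequiredPolicy are hypothetical stand-ins, not Kafka types.
public class PluginLoaderSketch {

    // Hypothetical stand-in for org.apache.kafka.server.policy.CreateTopicPolicy
    public interface NamingPolicy {
        void validate(String topic);
    }

    // Hypothetical implementation; a public no-argument constructor lets
    // the loader instantiate it reflectively
    public static class DotRequiredPolicy implements NamingPolicy {
        public DotRequiredPolicy() {}

        @Override
        public void validate(String topic) {
            if (!topic.contains(".")) {
                throw new IllegalArgumentException("Topic name " + topic + " must contain a period");
            }
        }
    }

    // Reads the class name from the properties and creates an instance of it
    static NamingPolicy load(Properties props) {
        String className = props.getProperty("create.topic.policy.class.name");
        try {
            // Throws ClassNotFoundException if the class is not on the classpath
            Class<?> clazz = Class.forName(className);
            // asSubclass throws ClassCastException if the class does not implement NamingPolicy
            return clazz.asSubclass(NamingPolicy.class).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot load policy class " + className, e);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // getName() returns the binary name, e.g. "PluginLoaderSketch$DotRequiredPolicy"
        props.setProperty("create.topic.policy.class.name", DotRequiredPolicy.class.getName());
        NamingPolicy policy = load(props);
        policy.validate("acme.payments"); // accepted, no exception
        System.out.println("Loaded " + policy.getClass().getName());
    }
}
```

A missing class surfaces as a `ClassNotFoundException` (wrapped here in an `IllegalStateException`), which is the kind of failure to expect if the jar is not on the classpath.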
Step 1: Build the project
With these two building blocks (the property and the interface) in place, let’s define a Maven project.
Step 2: Define the dependency
Let’s define a package `me.samarthya` and add the `kafka-clients` dependency to the `pom.xml`:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.0</version>
<scope>compile</scope>
</dependency>
Step 3: Implementation
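Let’s define the main class, `TopicPolicy`, as follows:
package me.samarthya;

import java.util.Map;
import java.util.logging.Logger;
import java.util.regex.Pattern;

import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.server.policy.CreateTopicPolicy;

public class TopicPolicy implements CreateTopicPolicy {
    private static final Logger logger = Logger.getLogger(TopicPolicy.class.getName());
    // <organizationname>.<productname>: two word segments separated by exactly one period
    private static final String TopicPattern = "\\w+\\.{1}\\w+";
    private static final Pattern COMPILED_PATTERN = Pattern.compile(TopicPattern);

    @Override
    public void validate(RequestMetadata requestMetadata) throws PolicyViolationException {
        String topic = requestMetadata.topic();
        logger.info(" Topic Name=" + topic);
        // Reject empty names and any name that does not match the whole pattern
        if (topic == null || topic.isEmpty() || !COMPILED_PATTERN.matcher(topic).matches()) {
            throw new PolicyViolationException("Topic name " + topic + " should match the pattern " + TopicPattern);
        }
    }

    @Override
    public void close() throws Exception {
        logger.info(" Close & release.");
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Log the broker configuration handed to the policy (values may be null)
        if (configs != null) {
            for (Map.Entry<String, ?> entry : configs.entrySet()) {
                logger.info(entry.getKey() + "=" + entry.getValue());
            }
        }
    }
}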
With the class defined, the main thing to observe is `TopicPattern`: it is the format every topic name must match, and if a name does not match, a `PolicyViolationException` is thrown and the topic is not created.
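To try the pattern without a running broker, the validation logic can be mirrored in plain Java. This sketch keeps the same pattern string and error message as `TopicPolicy`, but `PolicyViolation` is a hypothetical stand-in for Kafka’s `PolicyViolationException`, and `TopicPatternCheck` and `accepts` are illustrative names.

```java
import java.util.List;
import java.util.regex.Pattern;

// Stdlib-only mirror of TopicPolicy.validate. PolicyViolation is a
// hypothetical stand-in for Kafka's PolicyViolationException.
public class TopicPatternCheck {

    static final String TOPIC_PATTERN = "\\w+\\.{1}\\w+";

    static class PolicyViolation extends RuntimeException {
        PolicyViolation(String message) {
            super(message);
        }
    }

    static void validate(String topic) {
        // Pattern.matches compiles the regex and requires the entire input to match
        if (topic.isEmpty() || !Pattern.matches(TOPIC_PATTERN, topic)) {
            throw new PolicyViolation("Topic name " + topic + " should match the pattern " + TOPIC_PATTERN);
        }
    }

    // Convenience wrapper: true if validate() does not throw
    static boolean accepts(String topic) {
        try {
            validate(topic);
            return true;
        } catch (PolicyViolation e) {
            return false;
        }
    }

    public static void main(String[] args) {
        for (String topic : List.of("acme.payments", "invalid_topic", "acme.payments.orders", "my_org.my_product")) {
            System.out.println(topic + " accepted=" + accepts(topic));
        }
    }
}
```

One consequence worth noting: `\w` matches letters, digits and the underscore, so `my_org.my_product` passes even though it mixes `_` and `.`, the combination the `kafka-topics.sh` warning advises against.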
Step 4: Repeat for each broker in the cluster
Package the jar and place it under the `lib` folder of the Kafka installation on every broker, so that it is on the broker classpath.
4 -rw-r--r--. 1 vagrant vagrant 3881 Jul 12 06:28 topic-policy-1.0-SNAPSHOT.jar
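Also, in `server.properties`, define the following two properties: the first registers the policy class, and the second disables automatic topic creation so that topics can only be created explicitly.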
create.topic.policy.class.name=me.samarthya.TopicPolicy
auto.create.topics.enable=false
Restart the brokers in your cluster so that the jar and the new properties are picked up.
Step 5: Test your `Topics`
Let’s go back to the Kafka binary folder on the local machine and issue the topic creation command again:
bin/kafka-topics.sh --bootstrap-server mybroker.test:9092 --topic invalid_topic --create
If the jar has been loaded successfully, you should see an error like the following:
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Error while executing topic command : Topic name invalid_topic should match the pattern \w+\.{1}\w+
[2022-07-13 09:49:21,805] ERROR org.apache.kafka.common.errors.PolicyViolationException: Topic name invalid_topic should match the pattern \w+\.{1}\w+
(kafka.admin.TopicCommand$)
You can now modify the pattern to suit your own convention and redeploy the jar to test new custom topic policies. With the current pattern, a compliant name is created normally.
Example:
bin/kafka-topics.sh --bootstrap-server mybroker.test:9092 --topic invalid.valid --create
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic invalid.valid
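As an illustration of modifying the pattern, the sketch below tests a stricter, hypothetical convention `<organizationname>.<productname>[.<environment>]` that allows lowercase letters and digits only. The pattern and the class name `StricterTopicPattern` are assumptions for this example, not part of the original setup; to use it, the same regular expression would replace `TopicPattern` in `TopicPolicy` before repackaging the jar.

```java
import java.util.regex.Pattern;

public class StricterTopicPattern {
    // Hypothetical extended convention: <organizationname>.<productname>[.<environment>]
    // Segments use lowercase letters and digits only, so '_' can never be mixed with '.'
    static final Pattern STRICT = Pattern.compile("[a-z0-9]+\\.[a-z0-9]+(\\.[a-z0-9]+)?");

    static boolean isValid(String topic) {
        // matches() requires the entire name to fit the pattern
        return STRICT.matcher(topic).matches();
    }

    public static void main(String[] args) {
        String[] samples = {"acme.payments", "acme.payments.prod", "my_org.my_product", "Acme.Payments", "a.b.c.d"};
        for (String s : samples) {
            System.out.println(s + " valid=" + isValid(s));
        }
    }
}
```

Excluding `_` means compliant names never mix `.` and `_`, so they cannot collide with each other in metric names; note that `kafka-topics.sh` still prints its warning for any name containing a period, as seen above.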
Note:
Since automatic topic creation has been disabled, trying to create a topic (such as the invalid name `test`) implicitly through a producer will not work:
bin/kafka-console-producer.sh --bootstrap-server mybroker.test:9092 --topic test
This results in the following error:
[2022-07-13 09:54:21,196] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {test=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
For the existing topic `invalid.valid`, it works:
bin/kafka-console-producer.sh --bootstrap-server mybroker.test:9092 --topic invalid.valid