{"id":2283,"date":"2022-07-13T14:21:34","date_gmt":"2022-07-13T14:21:34","guid":{"rendered":"https:\/\/blog.samarthya.me\/wps\/?p=2283"},"modified":"2022-07-13T14:21:36","modified_gmt":"2022-07-13T14:21:36","slug":"naming-convention-policy-for-kafka-topics","status":"publish","type":"post","link":"https:\/\/blog.samarthya.me\/wps\/2022\/07\/13\/naming-convention-policy-for-kafka-topics\/","title":{"rendered":"Naming Convention Policy for Kafka topics"},"content":{"rendered":"\n<p>Creating a\u00a0<strong>Topic<\/strong>\u00a0in a kafka-cluster is easy and well documented for the `kafka-topics.sh` or even the official API documentation.<\/p>\n\n\n\n<pre class=\"wp-block-code has-black-color has-cyan-bluish-gray-background-color has-text-color has-background\"><code>&nbsp;bin\/kafka-topics.sh --help<\/code><\/pre>\n\n\n\n<p>The complexity arises, when you are trying to enforce a standard way of defining topic naming. There are many ways to identify the&nbsp;<strong>right convention<\/strong>&nbsp;based on your need, but to enforce such a topic convention while you are creating one is explained in this 5 steps blog.<\/p>\n\n\n\n<blockquote class=\"wp-block-quote is-layout-flow wp-block-quote-is-layout-flow\"><p>There is no right&nbsp;<em>convention<\/em>&nbsp;it is always determined based on what your business needs.<\/p><\/blockquote>\n\n\n\n<p>For my example, I wish to define to define a topic convention that follows the semantics as under<\/p>\n\n\n\n<pre class=\"wp-block-code has-black-color has-pale-pink-background-color has-text-color has-background\"><code>&lt;organizationname&gt;.&lt;productname&gt;<\/code><\/pre>\n\n\n\n<p>It is simple enough to get started, and can be easily extended as you will observe as you follow along.<\/p>\n\n\n\n<p>From the&nbsp;<a href=\"https:\/\/kafka.apache.org\/documentation\/#brokerconfigs_create.topic.policy.class.name\" target=\"_blank\" rel=\"noreferrer noopener\">official documentation<\/a>, if you wish to define a custom topic policy creation you will have to define property as under<\/p>\n\n\n\n<p>Properties files<\/p>\n\n\n\n<pre class=\"wp-block-code has-black-color has-light-green-cyan-background-color has-text-color has-background\"><code>create.topic.policy.class.name=mypackage.className<\/code><\/pre>\n\n\n\n<p>The className should implement the interface&nbsp;<\/p>\n\n\n\n<pre class=\"wp-block-code has-black-color has-light-green-cyan-background-color has-text-color has-background\"><code> org.apache.kafka.server.policy.CreateTopicPolicy<\/code><\/pre>\n\n\n\n<h2 class=\"wp-block-heading\">Step 1: Building Project<\/h2>\n\n\n\n<p>With these two building blocks let&#8217;s define a maven project<\/p>\n\n\n\n<figure class=\"wp-block-image\"><img decoding=\"async\" src=\"https:\/\/dz2cdn1.dzone.com\/storage\/temp\/16042395-screen-shot-2022-07-13-at-93700-am.png\" alt=\"\"\/><figcaption>Project Space<\/figcaption><\/figure>\n\n\n\n<h2 class=\"wp-block-heading\">Step 2: Define the dependency<\/h2>\n\n\n\n<p>Let&#8217;s define a package `me.samarthya` and also add dependency of the kafka-clients in the `pom.xml`<\/p>\n\n\n\n<pre class=\"wp-block-code has-black-color has-pale-cyan-blue-background-color has-text-color has-background\"><code> &lt;dependency&gt;<br>   &lt;groupId&gt;org.apache.kafka&lt;\/groupId&gt;<br>   &lt;artifactId&gt;kafka-clients&lt;\/artifactId&gt;<br>   &lt;version&gt;3.2.0&lt;\/version&gt;<br>   &lt;scope&gt;compile&lt;\/scope&gt;<br>&lt;\/dependency&gt;<\/code><\/pre>\n\n\n\n<h2 class=\"wp-block-heading\">Step 3: Implementation<\/h2>\n\n\n\n<p>Lets define the main class Topic Policy as under<\/p>\n\n\n\n<pre class=\"wp-block-code has-black-color has-light-green-cyan-background-color has-text-color has-background\"><code>public class TopicPolicy implements CreateTopicPolicy {<br>    private final Logger logger = Logger.getLogger(TopicPolicy.class.toString());<br><br>    private final static String TopicPattern = \"\\\\w+\\\\.{1}\\\\w+\";<br><br>    @Override<br>    public void validate(RequestMetadata requestMetadata) throws PolicyViolationException {<br>        StringBuilder bd = new StringBuilder().append(\" Topic Name=\").append(requestMetadata.topic());<br>        logger.info(bd.toString());<br>        if ( requestMetadata.topic().isEmpty() || !Pattern.matches(TopicPattern, requestMetadata.topic())) {<br>            throw new PolicyViolationException(\"Topic name \" + requestMetadata.topic() + \" should match the pattern \" + TopicPattern);<br>        }<br>    }<br><br>    @Override<br>    public void close() throws Exception {<br>        logger.info(\" Close &amp; release.\");<br>    }<br><br>    @Override<br>    public void configure(Map&lt;String, ?&gt; configs) {<br>        if (configs != null) {<br>            for( String k: configs.keySet()) {<br>                logger.info(configs.get(k).toString());<br>            }<br>        }<br>    }<br>}<\/code><\/pre>\n\n\n\n<p>With the class defined the main thing to observe is, the&nbsp;<strong>TopicPattern&nbsp;<\/strong>that has been defined as the format that will be matched for the name, and if it is not found a PolicyViolationException will be thrown.<\/p>\n\n\n\n<h2 class=\"wp-block-heading\"><strong>Step 4: Repeat for each broker in the cluster<\/strong><\/h2>\n\n\n\n<p>Package the jar and it has to be placed under the `lib` folder of the kafka (classpath).<strong>&nbsp;&nbsp;<\/strong><\/p>\n\n\n\n<p>PowerShell<\/p>\n\n\n\n<pre class=\"wp-block-code has-black-color has-cyan-bluish-gray-background-color has-text-color has-background\"><code>&nbsp; 4 -rw-r--r--. 1 vagrant vagrant &nbsp; &nbsp; 3881 Jul 12 06:28 topic-policy-1.0-SNAPSHOT.jar<\/code><\/pre>\n\n\n\n<p>also, in the `server.properties` you can define two properties<\/p>\n\n\n\n<pre class=\"wp-block-code has-black-color has-light-green-cyan-background-color has-text-color has-background\"><code>create.topic.policy.class.name=me.samarthya.TopicPolicy\nauto.create.topics.enable=false<\/code><\/pre>\n\n\n\n<p>Restart your cluster.<\/p>\n\n\n\n<h2 class=\"wp-block-heading\">Step 5: Test your `Topics`<\/h2>\n\n\n\n<p>Let&#8217;s go back to the kafka binary folder (local machine) and issue the topic creation command again<\/p>\n\n\n\n<pre class=\"wp-block-code has-black-color has-cyan-bluish-gray-background-color has-text-color has-background\"><code>&nbsp;bin\/kafka-topics.sh --bootstrap-server mybroker.test:9092 &nbsp;--topic invalid_topic --create<\/code><\/pre>\n\n\n\n<p>If the jar has been loaded successfully you should see an error reported as below<\/p>\n\n\n\n<pre class=\"wp-block-code has-black-color has-cyan-bluish-gray-background-color has-text-color has-background\"><code>WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.<br>Error while executing topic command : Topic name invalid_topic should match the pattern \\w+\\.{1}\\w+<br>&#91;2022-07-13 09:49:21,805] ERROR org.apache.kafka.common.errors.PolicyViolationException: Topic name invalid_topic should match the pattern \\w+\\.{1}\\w+<br>&nbsp;(kafka.admin.TopicCommand$)<\/code><\/pre>\n\n\n\n<p>You can modify the pattern now as per your convenience and re-deploy the jar to check the new custom topic policies.<\/p>\n\n\n\n<h3 class=\"wp-block-heading\">Example:<\/h3>\n\n\n\n<pre class=\"wp-block-code has-black-color has-cyan-bluish-gray-background-color has-text-color has-background\"><code>bin\/kafka-topics.sh --bootstrap-server mybroker.test.test:9092 &nbsp;--topic invalid.valid --create<\/code><\/pre>\n\n\n\n<pre class=\"wp-block-code has-black-color has-cyan-bluish-gray-background-color has-text-color has-background\"><code>WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.\nCreated topic invalid.valid<\/code><\/pre>\n\n\n\n<h2 class=\"wp-block-heading\">Note:<\/h2>\n\n\n\n<p>Since the auto-topic creation has been disabled if you try and create an invalid topic through producer it will not work (below).<\/p>\n\n\n\n<p>PowerShell<\/p>\n\n\n\n<pre class=\"wp-block-code has-black-color has-cyan-bluish-gray-background-color has-text-color has-background\"><code> bin\/kafka-console-producer.sh --bootstrap-server mybroker.test:9092 --topic test<\/code><\/pre>\n\n\n\n<p>Will result in following error<\/p>\n\n\n\n<pre class=\"wp-block-code has-black-color has-cyan-bluish-gray-background-color has-text-color has-background\"><code>&#91;2022-07-13 09:54:21,196] WARN &#91;Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {test=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)<\/code><\/pre>\n\n\n\n<p>For an existing topic `invalid.valid` it should work<\/p>\n\n\n\n<pre class=\"wp-block-code has-black-color has-cyan-bluish-gray-background-color has-text-color has-background\"><code>bin\/kafka-console-producer.sh --bootstrap-server mybrokers.test:9092 --topic invalid.valid<\/code><\/pre>\n","protected":false},"excerpt":{"rendered":"<p>Creating a\u00a0Topic\u00a0in a kafka-cluster is easy and well documented for the `kafka-topics.sh` or even the official API documentation. The complexity arises, when you are trying to enforce a standard way of defining topic naming. There are many ways to identify the&nbsp;right convention&nbsp;based on your need, but to enforce such a topic convention while you are [&hellip;]<\/p>\n","protected":false},"author":2,"featured_media":2284,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"image","meta":{"_exactmetrics_skip_tracking":false,"_exactmetrics_sitenote_active":false,"_exactmetrics_sitenote_note":"","_exactmetrics_sitenote_category":0,"footnotes":""},"categories":[120,34,239],"tags":[225,257],"class_list":["post-2283","post","type-post","status-publish","format-image","has-post-thumbnail","hentry","category-java","category-technical","category-technical-2","tag-kafka","tag-topics","post_format-post-format-image"],"_links":{"self":[{"href":"https:\/\/blog.samarthya.me\/wps\/wp-json\/wp\/v2\/posts\/2283","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/blog.samarthya.me\/wps\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/blog.samarthya.me\/wps\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/blog.samarthya.me\/wps\/wp-json\/wp\/v2\/users\/2"}],"replies":[{"embeddable":true,"href":"https:\/\/blog.samarthya.me\/wps\/wp-json\/wp\/v2\/comments?post=2283"}],"version-history":[{"count":3,"href":"https:\/\/blog.samarthya.me\/wps\/wp-json\/wp\/v2\/posts\/2283\/revisions"}],"predecessor-version":[{"id":2287,"href":"https:\/\/blog.samarthya.me\/wps\/wp-json\/wp\/v2\/posts\/2283\/revisions\/2287"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/blog.samarthya.me\/wps\/wp-json\/wp\/v2\/media\/2284"}],"wp:attachment":[{"href":"https:\/\/blog.samarthya.me\/wps\/wp-json\/wp\/v2\/media?parent=2283"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/blog.samarthya.me\/wps\/wp-json\/wp\/v2\/categories?post=2283"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/blog.samarthya.me\/wps\/wp-json\/wp\/v2\/tags?post=2283"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}