{"id":581,"date":"2020-04-23T11:08:40","date_gmt":"2020-04-23T11:08:40","guid":{"rendered":"https:\/\/blog.samarthya.me\/wps\/?p=581"},"modified":"2020-05-12T13:35:08","modified_gmt":"2020-05-12T13:35:08","slug":"pipeline-_ingest-api","status":"publish","type":"post","link":"https:\/\/blog.samarthya.me\/wps\/2020\/04\/23\/pipeline-_ingest-api\/","title":{"rendered":"Pipeline: _ingest API"},"content":{"rendered":"<p>Pipelines are a convenient way of pre-processing incoming data before it is indexed. In this blog, I will walk through some of the things I have tried my hand at.<\/p>\n<h2>Pipeline<\/h2>\n<p>A pipeline is a definition of a series of processors that are executed in the same order in which they are declared.<\/p>\n<p>A pipeline consists of two main fields:<\/p>\n<ol>\n<li><code>description<\/code>: a description of the purpose of the pipeline.<\/li>\n<li><code>processors<\/code>: a list of processors. Each processor transforms the document in a specific way.<\/li>\n<\/ol>\n<ul>\n<li>You might also hear about the ingest node and the bulk API. These concepts are distinct, but they do overlap.<\/li>\n<li>To use a pipeline, specify the <code>pipeline<\/code> parameter on an index or bulk request.<\/li>\n<\/ul>\n<h2>Ingest Node<\/h2>\n<p>An ingest node pre-processes documents before the actual indexing happens. 
The ingest node intercepts bulk and index requests, applies the transformations, and then passes the documents back to the index or bulk APIs.<\/p>\n<blockquote>\n<p>All nodes in a cluster <code>enable ingest by default<\/code>, so any node can handle ingest tasks.<\/p>\n<\/blockquote>\n<p><code>PIPELINES<\/code> are used to pre-process documents before indexing.<\/p>\n<p>Some key considerations:<\/p>\n<ul>\n<li>An <a href=\"https:\/\/blog.samarthya.me\/wps\/2020\/04\/23\/index-modules\/\">index<\/a> may also declare a default pipeline (<code>index.default_pipeline<\/code>) that is used when the request does not specify the <code>pipeline<\/code> parameter.<\/li>\n<li>An index may also declare a <a href=\"https:\/\/www.elastic.co\/guide\/en\/elasticsearch\/reference\/master\/index-modules.html#dynamic-index-settings\">final pipeline<\/a> (<code>index.final_pipeline<\/code>) that is executed after the request or default pipeline (if any).<\/li>\n<\/ul>\n<h3>How can we use it?<\/h3>\n<p>It is as simple as specifying the <code>pipeline<\/code> parameter in the URL:<\/p>\n<pre>PUT my-index\/_doc\/my-id?pipeline=my_pipeline_id<br \/>{<br \/>  \"foo\": \"bar\"<br \/>}<\/pre>\n<h2>Ingest APIs<\/h2>\n<p>There are four main <code>_ingest<\/code> pipeline APIs:<\/p>\n<ol>\n<li>PUT pipeline<\/li>\n<li>GET pipeline<\/li>\n<li>DELETE pipeline<\/li>\n<li>Simulate pipeline<\/li>\n<\/ol>\n<h2>PUT Pipeline<\/h2>\n<p>Creates a new pipeline or updates an existing one.<\/p>\n<pre>PUT _ingest\/pipeline\/fix_my_locales<br \/>{<br \/>  \"description\": \"My first pipeline\",<br \/>  \"processors\": []<br \/>}<\/pre>\n<p>Official documentation <a href=\"https:\/\/www.elastic.co\/guide\/en\/elasticsearch\/reference\/master\/put-pipeline-api.html\">here<\/a>.<\/p>\n<p><code>fix_my_locales<\/code> is the ID you choose for the pipeline.<\/p>\n<div class=\"variablelist\">\n<h3>Input Parameters<\/h3>\n<dl class=\"variablelist\">\n<dt><span class=\"term\"><code class=\"literal\">description<\/code><\/span><\/dt>\n<dd>(string) Description of the ingest pipeline.<\/dd>\n<dt><span class=\"term\"><code class=\"literal\">processors<\/code><\/span><\/dt>\n<dd>(array of <a class=\"xref\" title=\"Processors\" href=\"https:\/\/www.elastic.co\/guide\/en\/elasticsearch\/reference\/master\/ingest-processors.html\">processor objects<\/a>) Processors, applied in order, used to pre-process documents before indexing.<\/dd>\n<dt><span class=\"term\"><code class=\"literal\">version<\/code><\/span><\/dt>\n<dd>(integer, optional) Version number used by external systems to manage ingest pipelines.<\/dd>\n<\/dl>\n<p>For example:<\/p>\n<pre>PUT _ingest\/pipeline\/transf_date<br \/>{<br \/>  \"description\": \" Transforms the date.\",<br \/>  \"version\": 1,<br \/>  \"processors\": [<br \/>    {<br \/>      \"set\": {<br \/>        \"field\": \"cpem\",<br \/>        \"value\": \"Covid\"<br \/>      }<br \/>    }<br \/>  ]<br \/>}<\/pre>\n<\/div>\n<p>Unsetting the version is as simple as 
removing <code>version<\/code> from the request above and reissuing it.<\/p>\n<p>A successful request is acknowledged by the node:<\/p>\n<pre>{<br \/>  \"acknowledged\" : true<br \/>}<\/pre>\n<h3>Processors<\/h3>\n<ul>\n<li>The processors in a pipeline have <strong>read<\/strong> and <strong>write<\/strong> access to documents that pass through the pipeline.<\/li>\n<li>The processors can access fields in the source of a document and the document\u2019s metadata fields.<\/li>\n<\/ul>\n<h3>Accessing Data<\/h3>\n<p>In the processors you define, you can reference fields from the document&#8217;s <code>_source<\/code>, its metadata fields, and the ingest metadata (such as <code>_ingest.timestamp<\/code>).<\/p>\n<h4>Example<\/h4>\n<pre>PUT _ingest\/pipeline\/transf_date<br \/>{<br \/>  \"description\": \" Transforms the date.\",<br \/>  \"version\": 1,<br \/>  \"processors\": [<br \/>    {<br \/>      \"set\": {<br \/>        \"field\": \"recieved\",<br \/>        \"value\": \"{{_ingest.timestamp}}\"<br \/>      }<br \/>    },<br \/>    {<br \/>      \"set\": {<br \/>        \"field\": \"firstname\",<br \/>        \"value\": \"{{_source.name}}\"<br \/>      }<br \/>    }<br \/>  ]<br \/>}<\/pre>\n<p>Let&#8217;s simulate the pipeline to check the results:<\/p>\n<pre>POST _ingest\/pipeline\/transf_date\/_simulate?verbose<br \/>{<br \/>  \"docs\": [<br \/>    {<br \/>      \"_index\": \"rama\",<br \/>      \"_id\": 1,<br \/>      \"_source\": {<br \/>        \"name\": \"abar\"<br \/>      }<br \/>    }<br \/>  ]<br \/>}<\/pre>\n<p>The pipeline sets the field <code>recieved<\/code> to the time at which the document is ingested, and the field <code>firstname<\/code> to the value of the <code>name<\/code> field in <code>_source<\/code>. Because of the <code>verbose<\/code> parameter, the response shows the document after each processor runs:<\/p>\n<pre>{<br \/>  \"docs\" : [<br \/>    {<br \/>      \"processor_results\" : [<br \/>        {<br \/>          \"doc\" : {<br \/>            \"_index\" 
: \"rama\",<br \/>            \"_type\" : \"_doc\",<br \/>            \"_id\" : \"1\",<br \/>            \"_source\" : {<br \/>              \"name\" : \"abar\",<br \/>              \"recieved\" : \"2020-04-23T16:03:23.213722Z\"<br \/>            },<br \/>            \"_ingest\" : {<br \/>              \"timestamp\" : \"2020-04-23T16:03:23.213722Z\"<br \/>            }<br \/>          }<br \/>        },<br \/>        {<br \/>          \"doc\" : {<br \/>            \"_index\" : \"rama\",<br \/>            \"_type\" : \"_doc\",<br \/>            \"_id\" : \"1\",<br \/>            \"_source\" : {<br \/>              \"name\" : \"abar\",<br \/>              \"firstname\" : \"abar\",<br \/>              \"recieved\" : \"2020-04-23T16:03:23.213722Z\"<br \/>            },<br \/>            \"_ingest\" : {<br \/>              \"timestamp\" : \"2020-04-23T16:03:23.213722Z\"<br \/>            }<br \/>          }<br \/>        }<br \/>      ]<br \/>    }<br \/>  ]<br \/>}<\/pre>\n<p>The metadata fields that can be accessed are:<\/p>\n<ul>\n<li><code>_index<\/code><\/li>\n<li><code>_id<\/code><\/li>\n<li><code>_routing<\/code><\/li>\n<\/ul>\n<p>Beyond metadata fields and source fields, ingest also adds ingest metadata, such as <code>_ingest.timestamp<\/code>, to the documents it processes.<\/p>\n<h2>DELETE Pipeline<\/h2>\n<p>Deletes a pipeline:<\/p>\n<pre>DELETE _ingest\/pipeline\/Testing<\/pre>\n<h2>GET Pipeline<\/h2>\n<p>Returns information about one or more pipelines. Without a pipeline ID, the request returns all the pipelines defined in the cluster:<\/p>\n<pre>GET _ingest\/pipeline<\/pre>\n<p><img fetchpriority=\"high\" decoding=\"async\" class=\"aligncenter wp-image-587 size-full\" src=\"https:\/\/blog.samarthya.me\/wps\/wp-content\/uploads\/2020\/04\/Screenshot-2020-04-23-at-3.15.47-PM.png\" alt=\"\" width=\"970\" height=\"544\" srcset=\"https:\/\/blog.samarthya.me\/wps\/wp-content\/uploads\/2020\/04\/Screenshot-2020-04-23-at-3.15.47-PM.png 
970w, https:\/\/blog.samarthya.me\/wps\/wp-content\/uploads\/2020\/04\/Screenshot-2020-04-23-at-3.15.47-PM-300x168.png 300w, https:\/\/blog.samarthya.me\/wps\/wp-content\/uploads\/2020\/04\/Screenshot-2020-04-23-at-3.15.47-PM-768x431.png 768w\" sizes=\"(max-width: 970px) 100vw, 970px\" \/><\/p>\n<p>To get information about a specific pipeline, specify its ID in the request:<\/p>\n<pre>GET _ingest\/pipeline\/transf_date<\/pre>\n<p>The response contains the pipeline definition:<\/p>\n<pre>{<br \/>  \"transf_date\" : {<br \/>    \"description\" : \" Transforms the date.\",<br \/>    \"version\" : 1,<br \/>    \"processors\" : [<br \/>      {<br \/>        \"set\" : {<br \/>          \"field\" : \"cpem\",<br \/>          \"value\" : \"Covid\"<br \/>        }<br \/>      }<br \/>    ]<br \/>  }<br \/>}<\/pre>\n<p>If we only want to see the version, we can use <a href=\"https:\/\/www.elastic.co\/guide\/en\/elasticsearch\/reference\/master\/common-options.html#common-options-response-filtering\">response 
filtering<\/a>:<\/p>\n<pre>GET _ingest\/pipeline\/transf_date?filter_path=*.version<\/pre>\n<p>All REST APIs accept a <code>filter_path<\/code> parameter that can be used to reduce the response returned by Elasticsearch. For example, the following request returns only the license information from the X-Pack info API:<\/p>\n<pre>GET _xpack?filter_path=license<\/pre>\n<h2>Simulate Pipeline<\/h2>\n<p>Runs a pipeline against a set of sample documents, supplied in the <code>docs<\/code> array, so you can validate the logic of its processors before using it for real indexing:<\/p>\n<pre>POST _ingest\/pipeline\/fix_my_locales\/_simulate<br \/>{<br \/>  \"docs\": []<br \/>}<\/pre>\n<p>The API is available at the following endpoints:<\/p>\n<ul>\n<li><code>POST \/_ingest\/pipeline\/&lt;pipeline&gt;\/_simulate<\/code><\/li>\n<li><code>GET \/_ingest\/pipeline\/&lt;pipeline&gt;\/_simulate<\/code><\/li>\n<li><code>POST \/_ingest\/pipeline\/_simulate<\/code><\/li>\n<li><code>GET \/_ingest\/pipeline\/_simulate<\/code><\/li>\n<\/ul>\n<p>When no pipeline ID is given in the path, the pipeline definition is supplied in a <code>pipeline<\/code> object in the request body, alongside <code>docs<\/code>. For example, here is how to run the <code>transf_date<\/code> pipeline against a sample document in verbose mode:<\/p>\n\n\n<pre class=\"wp-block-code\"><code>POST _ingest\/pipeline\/transf_date\/_simulate?verbose\n{\n  \"docs\": &#91;\n    {\n      \"_index\": \"rama\",\n      \"_id\": 1,\n      \"_source\": {\n        \"foo\": \"abar\"\n      }\n    }\n  ]\n}<\/code><\/pre>\n\n\n\n<h2 class=\"has-text-align-center wp-block-heading\">&#8212; THE &#8211; END &#8212;<\/h2>\n\n\n\n","protected":false},"excerpt":{"rendered":"<p>Pipelines is a smooth way of processing some incoming data. In this blog, I will walk through some of the items I have tried my hands on. Pipeline A pipeline is a definition of a series of\u00a0processors\u00a0that are to be executed in the same order as they are declared. 
Consists of two main fields\u00a0 description [&hellip;]<\/p>\n","protected":false},"author":2,"featured_media":585,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"image","meta":{"_exactmetrics_skip_tracking":false,"_exactmetrics_sitenote_active":false,"_exactmetrics_sitenote_note":"","_exactmetrics_sitenote_category":0,"footnotes":""},"categories":[34],"tags":[53,61],"class_list":["post-581","post","type-post","status-publish","format-image","has-post-thumbnail","hentry","category-technical","tag-elasticsearch","tag-pipeline","post_format-post-format-image"],"_links":{"self":[{"href":"https:\/\/blog.samarthya.me\/wps\/wp-json\/wp\/v2\/posts\/581","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/blog.samarthya.me\/wps\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/blog.samarthya.me\/wps\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/blog.samarthya.me\/wps\/wp-json\/wp\/v2\/users\/2"}],"replies":[{"embeddable":true,"href":"https:\/\/blog.samarthya.me\/wps\/wp-json\/wp\/v2\/comments?post=581"}],"version-history":[{"count":0,"href":"https:\/\/blog.samarthya.me\/wps\/wp-json\/wp\/v2\/posts\/581\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/blog.samarthya.me\/wps\/wp-json\/wp\/v2\/media\/585"}],"wp:attachment":[{"href":"https:\/\/blog.samarthya.me\/wps\/wp-json\/wp\/v2\/media?parent=581"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/blog.samarthya.me\/wps\/wp-json\/wp\/v2\/categories?post=581"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/blog.samarthya.me\/wps\/wp-json\/wp\/v2\/tags?post=581"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}