A pipeline is a definition of a series of processors that are executed in the order in which they are declared.
Think of a processor as a single step that applies one specific change to each incoming document.
In this post we are going to create a pipeline to add a field named doc_timestamp to all the documents that are added to the index.
Creating a pipeline
With the PUT request below we create a pipeline named doc_timestamp. This pipeline has only one processor, a set processor, which adds a field named doc_timestamp and sets its value to the time at which the document is ingested, that is, added to the index.
curl -X PUT 'http://localhost:9200/_ingest/pipeline/doc_timestamp?pretty' -H 'Content-Type: application/json' -d '
{
  "description": "pipeline to add timestamp to documents",
  "processors": [
    {
      "set": {
        "field": "_source.doc_timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}'

{
  "acknowledged" : true
}
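Before attaching the pipeline to an index, it can be tried out against sample documents with the simulate pipeline API. The following is a minimal Python sketch, assuming the same unsecured local cluster as in the curl examples; build_simulate_request is a hypothetical helper name, and the sample document is made up for illustration. The request is only built and printed here, not sent.

```python
import json
import urllib.request

# Assumption: same local, unsecured cluster as in the curl examples.
ES_URL = "http://localhost:9200"


def build_simulate_request(pipeline_id, sources, base_url=ES_URL):
    """Build (but do not send) a POST to the simulate pipeline API.

    `sources` is a list of dicts, each used as the _source of one test doc.
    """
    body = {"docs": [{"_source": source} for source in sources]}
    return urllib.request.Request(
        url=f"{base_url}/_ingest/pipeline/{pipeline_id}/_simulate",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical sample document, used only to exercise the pipeline.
simulate_request = build_simulate_request("doc_timestamp", [{"account_number": 2000}])
print(simulate_request.get_method(), simulate_request.full_url)
print(simulate_request.data.decode("utf-8"))

# To run it against a live cluster:
#   with urllib.request.urlopen(simulate_request) as response:
#       for item in json.load(response)["docs"]:
#           print(item["doc"]["_source"])
```

Against a running cluster, each simulated document in the response should carry the doc_timestamp field in its _source, without anything being written to an index.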
Attach the pipeline to an index
Here we attach the pipeline doc_timestamp to the account_v2 index by setting it as the default_pipeline for the index.
curl -X PUT 'http://localhost:9200/account_v2/_settings?pretty' -H 'Content-Type: application/json' -d '
{
  "index.default_pipeline": "doc_timestamp"
}'

{
  "acknowledged" : true
}
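To confirm that the setting took effect, the index settings can be read back with a GET on account_v2/_settings. The sketch below is one way to do that from Python, assuming the same local cluster; fetch_index_settings and default_pipeline are hypothetical helper names, and the sample dictionary is a hand-written, abbreviated response shape, not real cluster output.

```python
import json
import urllib.request

# Assumption: same local, unsecured cluster as in the curl examples.
ES_URL = "http://localhost:9200"


def fetch_index_settings(index, base_url=ES_URL):
    """GET <index>/_settings and return the decoded JSON (needs a live cluster)."""
    with urllib.request.urlopen(f"{base_url}/{index}/_settings") as response:
        return json.load(response)


def default_pipeline(settings_response, index):
    """Return the index's default_pipeline, or None if it is not set."""
    index_settings = (
        settings_response.get(index, {}).get("settings", {}).get("index", {})
    )
    return index_settings.get("default_pipeline")


# Hand-written, abbreviated response shape for illustration only.
sample_settings = {
    "account_v2": {"settings": {"index": {"default_pipeline": "doc_timestamp"}}}
}
print(default_pipeline(sample_settings, "account_v2"))  # doc_timestamp
```

With a cluster running, default_pipeline(fetch_index_settings("account_v2"), "account_v2") performs the same check against the real settings.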
Now that the pipeline is attached to the index, every document added to the index from here on gets a new doc_timestamp field. Documents that were already in the index are not affected.
Let’s look up an existing document. As expected, it has no doc_timestamp field.
curl -X GET 'localhost:9200/account_v2/_doc/735?pretty'

{
  "_index" : "account_v2",
  "_type" : "_doc",
  "_id" : "735",
  "_version" : 1,
  "_seq_no" : 344,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "account_number" : 735,
    "balance" : 3984,
    "firstname" : "Loraine",
    "lastname" : "Willis",
    "age" : 32,
    "gender" : "F",
    "address" : "928 Grove Street",
    "employer" : "Gadtron",
    "email" : "lorainewillis@gadtron.com",
    "city" : "Lowgap",
    "state" : "NY"
  }
}
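If existing documents such as 735 should also get the field, one option is to run them through the pipeline with the update by query API and its pipeline parameter. The sketch below only builds the request, assuming the same local cluster; build_backfill_request is a hypothetical helper name. Note that the timestamp written this way is the time of the backfill, not the time the document was originally indexed.

```python
import json
import urllib.parse
import urllib.request

# Assumption: same local, unsecured cluster as in the curl examples.
ES_URL = "http://localhost:9200"


def build_backfill_request(index, pipeline_id, field, base_url=ES_URL):
    """Build (but do not send) an _update_by_query call that routes every
    document lacking `field` through the given ingest pipeline."""
    query_string = urllib.parse.urlencode({"pipeline": pipeline_id})
    body = {"query": {"bool": {"must_not": {"exists": {"field": field}}}}}
    return urllib.request.Request(
        url=f"{base_url}/{index}/_update_by_query?{query_string}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


backfill_request = build_backfill_request("account_v2", "doc_timestamp", "doc_timestamp")
print(backfill_request.get_method(), backfill_request.full_url)
print(backfill_request.data.decode("utf-8"))

# To run it against a live cluster:
#   with urllib.request.urlopen(backfill_request) as response:
#       print(json.load(response))
```

The must_not/exists query limits the update to documents that do not have doc_timestamp yet, so documents that were indexed after the pipeline was attached keep their original value.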
Add a new document to the index
Let’s add a new document to the index with id 2000.
curl -X PUT 'http://localhost:9200/account_v2/_doc/2000?pretty' -H 'Content-Type: application/json' -d '{
  "account_number": 2000,
  "balance": 16418,
  "firstname": "Elinor",
  "lastname": "Ratliff",
  "age": 36,
  "gender": "M",
  "address": "282 Kings Place",
  "employer": "Scentric",
  "email": "elinorratliff@scentric.com",
  "city": "Ribera",
  "state": "WA"
}'

{
  "_index" : "account_v2",
  "_type" : "_doc",
  "_id" : "2000",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 993,
  "_primary_term" : 1
}
Now that the document is added, let’s look it up. The new field doc_timestamp is present, holding the time at which the document was added to the index.
curl -X GET 'localhost:9200/account_v2/_doc/2000?pretty'

{
  "_index" : "account_v2",
  "_type" : "_doc",
  "_id" : "2000",
  "_version" : 1,
  "_seq_no" : 993,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "account_number" : 2000,
    "firstname" : "Elinor",
    "address" : "282 Kings Place",
    "gender" : "M",
    "city" : "Ribera",
    "lastname" : "Ratliff",
    "balance" : 16418,
    "employer" : "Scentric",
    "state" : "WA",
    "age" : 36,
    "email" : "elinorratliff@scentric.com",
    "doc_timestamp" : "2020-11-19T20:39:33.639398617Z"
  }
}
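The doc_timestamp value above has nanosecond precision, which some client libraries cannot represent directly. As an illustration, a Python client reading the field might parse it as sketched below. This assumes the value always ends in Z (UTC), as in the sample output; parse_ingest_timestamp is a hypothetical helper name.

```python
from datetime import datetime, timezone


def parse_ingest_timestamp(value):
    """Parse an ISO-8601 UTC timestamp with up to nanosecond precision.

    datetime only stores microseconds, so any fractional digits beyond
    the sixth are dropped (truncated, not rounded).
    """
    if not value.endswith("Z"):
        raise ValueError(f"expected a UTC timestamp ending in 'Z': {value!r}")
    main, _, fraction = value[:-1].partition(".")
    micros = (fraction + "000000")[:6]  # pad or truncate to exactly 6 digits
    parsed = datetime.strptime(f"{main}.{micros}", "%Y-%m-%dT%H:%M:%S.%f")
    return parsed.replace(tzinfo=timezone.utc)


ts = parse_ingest_timestamp("2020-11-19T20:39:33.639398617Z")
print(ts.isoformat())  # 2020-11-19T20:39:33.639398+00:00
```

The last three digits (617 nanoseconds) are lost in the conversion, which is usually acceptable when the field is only used to tell when a document was ingested.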

