Pre-Indexing: Pipelines

I don't have direct control over my incoming documents' structure, but before they get inserted into my index I want to:
- skip a document if it comes from a test environment (`"env": "test"`)
- drop all empty fields (empty strings, empty arrays, `null`)
- remove leading underscores from attribute names (`_color` → `color`)
- concatenate two string fields together (`color` + `category`)

Can I do it in one go?
My documents will look like this:
```json
{
  "env": "staging",
  "_tags": [],
  "null": null,
  "_category": "jackets",
  "_color": "white"
}
```
and I'd like to end up with:
```json
{
  "env": "staging",
  "color": "white",
  "category": "jackets",
  "seo_category": "white jackets"
}
```
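Before involving Elasticsearch at all, it can help to pin the four requirements down as a plain function. The sketch below is a hypothetical local reference in Python (the name `cleanse` is made up for illustration). It only mirrors the intended behavior on ordinary dicts, not how Elasticsearch executes a pipeline, and it assumes that only `null`, `""` and `[]` count as "empty".

```python
from typing import Any, Dict, Optional


def cleanse(doc: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Hypothetical local reference for the four cleanup rules.

    Returns None when the document should be skipped entirely.
    """
    # 1. skip documents that come from a test environment
    if doc.get("env") == "test":
        return None

    # 2. drop empty fields: null, empty strings and empty arrays
    kept = {k: v for k, v in doc.items() if v is not None and v != "" and v != []}

    # 3. strip only the first leading underscore from attribute names
    renamed: Dict[str, Any] = {}
    for key, value in kept.items():
        new_key = key[1:] if key.startswith("_") and len(key) > 1 else key
        renamed[new_key] = value

    # 4. concatenate color and category (missing fields render as "")
    renamed["seo_category"] = f"{renamed.get('color', '')} {renamed.get('category', '')}"
    return renamed
```

Running it on the sample document yields exactly the target document shown above, and any document with `"env": "test"` comes back as `None`.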
Yes. Every time a document is about to be inserted, it can be run through an ingest pipeline. A pipeline is composed of blocks called processors, which are executed in the order they're declared. There are dozens of built-in processors available, and when none of them fits, you can write your own logic with the `script` processor in Painless, Elasticsearch's scripting language (documented here and here).
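The "executed in declaration order" rule is what lets a single pipeline handle all four steps: the underscores must be stripped before `color` and `category` can be concatenated. Here is a rough Python model of that sequencing. All names are hypothetical, and it simplifies what Elasticsearch actually does. It treats each processor as a function and a `None` result as a dropped document.

```python
from typing import Any, Callable, Dict, List, Optional

Doc = Dict[str, Any]
# A processor takes a document and returns the new document, or None to drop it.
Processor = Callable[[Doc], Optional[Doc]]


def run_pipeline(doc: Doc, processors: List[Processor]) -> Optional[Doc]:
    """Apply processors in declaration order; a None result drops the document."""
    current = dict(doc)
    for processor in processors:
        result = processor(current)
        if result is None:
            return None  # dropped: later processors never run
        current = result
    return current


# Hypothetical example processors
def drop_test(doc: Doc) -> Optional[Doc]:
    return None if doc.get("env") == "test" else doc


def rename_color(doc: Doc) -> Doc:
    renamed = dict(doc)
    if "_color" in renamed:
        renamed["color"] = renamed.pop("_color")
    return renamed


def set_label(doc: Doc) -> Doc:
    # reads "color", so it only sees a value if the rename already ran
    return {**doc, "label": f"color={doc.get('color', '')}"}
```

With `[rename_color, set_label]` the label becomes `color=white`. Swapping the two leaves the label without a color, which is why the `set` processor comes last in the pipeline below.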
```
PUT _ingest/pipeline/my_data_cleanser
{
  "description": "Runs a doc thru data cleanser...",
  "processors": [
    {
      "drop": {
        "if": "ctx.env == 'test'"
      }
    },
    {
      "script": {
        "source": """
          // For more on empty fields, check out [Field Existence, Length, and Size](../6%20(Painless)%20Scripting%20c678b56cbec3430b913081023ff0d5db/Missing%20vs%20Empty%20Fields%2099b95a21f39645d6a1ad920593eb705c.md)
          // collect the keys first, then remove them, so the map isn't modified while being streamed
          def keys_to_remove = ctx.keySet()
            .stream()
            .filter(field -> ctx[field] == null ||
                             ctx[field] == "" ||
                             (ctx[field] instanceof ArrayList && ctx[field].isEmpty()))
            .collect(Collectors.toList());
          for (key in keys_to_remove) {
            ctx.remove(key);
          }
        """
      }
    },
    {
      "script": {
        "source": """
          // ingest metadata fields must keep their leading underscore
          def forbidden_keys = [
            '_type',
            '_id',
            '_version_type',
            '_index',
            '_version',
            '_routing'
          ];
          def keys_to_remove = [];
          def corrected_keys_map = [:];
          for (def pair : ctx.entrySet()) {
            def key = pair.getKey();
            if (forbidden_keys.contains(key)) {
              continue;
            }
            def value = pair.getValue();
            if (!key.startsWith('_')) {
              continue;
            }
            // strip only the first underscore (`__a` becomes `_a`)
            def new_key = key.substring(1);
            if (new_key.length() < 1) {
              continue;
            }
            corrected_keys_map[new_key] = value;
            keys_to_remove.add(key);
          }
          // remove the underscored keys after the loop to avoid a ConcurrentModificationException
          ctx.entrySet().removeIf(e -> keys_to_remove.contains(e.getKey()));
          // write the renamed entries back to the document
          ctx.putAll(corrected_keys_map);
        """
      }
    },
    {
      "set": {
        "field": "seo_category",
        "value": "{{_source.color}} {{_source.category}}"
      }
    }
  ]
}
```
When inserting a document, we specify the `pipeline` query parameter (java docs, python docs):
```
POST clothing_index/_doc?pipeline=my_data_cleanser
{
  "env": "staging",
  "_tags": [],
  "null": null,
  "_category": "jackets",
  "_color": "white"
}
```
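The same request can be sent from code. The following is a minimal, standard-library-only Python sketch, not the official client. It assumes an unsecured cluster at `http://localhost:9200`, which is a hypothetical address. Real deployments usually need TLS and credentials. The helper names `build_index_request` and `send` are also made up. The request is built separately from sending it, so the URL and body can be inspected first.

```python
import json
import urllib.parse
import urllib.request

# Assumption: a local, unsecured cluster; adjust host and auth for your setup.
ES_URL = "http://localhost:9200"


def build_index_request(index: str, doc: dict, pipeline: str,
                        base_url: str = ES_URL) -> urllib.request.Request:
    """Build (but do not send) POST <index>/_doc?pipeline=<pipeline>."""
    query = urllib.parse.urlencode({"pipeline": pipeline})
    url = f"{base_url}/{urllib.parse.quote(index)}/_doc?{query}"
    body = json.dumps(doc).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request: urllib.request.Request) -> dict:
    """Send the request and decode Elasticsearch's JSON response."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

For example, `send(build_index_request("clothing_index", doc, "my_data_cleanser"))` indexes `doc` through the pipeline, assuming the cluster above is reachable.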
and verify that the pipeline worked by querying the index: