Pre-Indexing: Pipelines
I don't have direct control over my incoming documents' structure, but before they get inserted into my index I want to:
- skip a document if it comes from a test environment ("env": "test")
- drop all empty fields (empty strings, empty arrays, null)
- remove leading underscores from attribute names (_color → color)
- concatenate two string fields together (color + category)

Can I do it in one go?
My documents will look like this:
```json
{
  "env": "staging",
  "_tags": [],
  "null": null,
  "_category": "jackets",
  "_color": "white"
}
```
and I'd like to end up with:
```json
{
  "env": "staging",
  "color": "white",
  "category": "jackets",
  "seo_category": "white jackets"
}
```
Every time a document is about to be indexed, you can let it run through an ingest pipeline. A pipeline is made up of blocks called processors, which are executed in the order they're declared. There are dozens of built-in processors available. When none of them fits, you can compose your own in a scripting language called Painless (documented here and here).
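The set of built-in processors depends on your Elasticsearch version and installed plugins. One way to list the ones your own cluster supports is the nodes info API with its ingest metric. In this sketch, filter_path only trims the response down to each node's processor list:

```json
GET _nodes/ingest?filter_path=nodes.*.ingest.processors
```

Each entry in the returned processors arrays names one processor type, such as drop, script, or set, which are the three used below. With those, the whole cleanup from the question fits into a single pipeline of four processors: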
```json
PUT _ingest/pipeline/my_data_cleanser
{
  "description": "Drops test docs, removes empty fields, strips leading underscores, builds seo_category",
  "processors": [
    {
      "drop": {
        "if": "ctx.env == 'test'"
      }
    },
    {
      "script": {
        "source": """
          // Remove top-level fields that are null, empty strings, or empty arrays.
          // For more on empty fields, see "Field Existence, Length, and Size".
          // Keys are collected first so ctx isn't modified while it's being iterated.
          def keys_to_remove = ctx.keySet()
            .stream()
            .filter(field -> ctx[field] == null ||
                             ctx[field] == "" ||
                             (ctx[field] instanceof List && ctx[field].isEmpty()))
            .collect(Collectors.toList());
          for (key in keys_to_remove) {
            ctx.remove(key);
          }
        """
      }
    },
    {
      "script": {
        "source": """
          // ingest metadata keys legitimately start with an underscore: leave them alone
          def forbidden_keys = [
            '_index',
            '_type',
            '_id',
            '_routing',
            '_version',
            '_version_type',
            '_if_seq_no',
            '_if_primary_term'
          ];
          def keys_to_remove = [];
          def corrected_keys_map = [:];
          for (def pair : ctx.entrySet()) {
            def key = pair.getKey();
            if (forbidden_keys.contains(key) || !key.startsWith('_')) {
              continue;
            }
            // we care only about the first underscore
            def new_key = key.substring(1);
            if (new_key.length() < 1) {
              continue;
            }
            corrected_keys_map[new_key] = pair.getValue();
            keys_to_remove.add(key);
          }
          // delete underscored pairs after the loop to prevent a ConcurrentModificationException
          ctx.entrySet().removeIf(e -> keys_to_remove.contains(e.getKey()));
          // save the corrected entries back to the doc context
          ctx.putAll(corrected_keys_map);
        """
      }
    },
    {
      "set": {
        "field": "seo_category",
        "value": "{{{_source.color}}} {{{_source.category}}}"
      }
    }
  ]
}
```
When indexing a document, we pass the pipeline's ID through the ?pipeline query parameter (java docs, python docs):
```json
POST clothing_index/_doc?pipeline=my_data_cleanser
{
  "env": "staging",
  "_tags": [],
  "null": null,
  "_category": "jackets",
  "_color": "white"
}
```
Then verify that the pipeline worked by querying the index:
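```json
POST clothing_index/_search
```
Limiting a field to a set of allowed values can be handled with a single drop processor. Note that a document with no my_field at all gets dropped too, because its value is null and null isn't in the allowed list: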
```json
PUT _ingest/pipeline/enforce_abc
{
  "description": "Enforces a, b, or c in 'my_field'",
  "processors": [
    {
      "drop": {
        "if": "!['a', 'b', 'c'].contains(ctx.my_field)"
      }
    }
  ]
}
```
When putting together a pipeline, it's onerous to save it under an ID, index some docs with the applied pipeline, delete them after that, adjust the pipeline, and then repeat the whole process. You can instead simulate a pipeline without saving it and without indexing any docs by employing the Simulate pipeline API: