Pre-Indexing: Pipelines

Jozef Soročin, founder of Spatialized
Updated 07/13/2025

I don't have direct control over the structure of my incoming documents, but before they get inserted into my index I want to:

  1. skip a document if it comes from a test environment ("env": "test")
  2. drop all empty fields (empty strings, empty arrays, null)
  3. remove leading underscores from attribute names (_color → color)
  4. concatenate two string fields together (color + category)

Can I do it in one go?

My documents will look like this:

{
  "env": "staging",
  "_tags": [],
  "null": null,
  "_category": "jackets",
  "_color": "white"
}

and I'd like to end up with:

{
  "env" : "staging",
  "color" : "white",
  "category" : "jackets",
  "seo_category" : "white jackets"
}

Every time a document is about to be inserted, it can be run through a pipeline. A pipeline is composed of blocks called processors, which are executed in the order in which they were declared. There are dozens of built-in processors available, and you can also write custom logic in a scripting language called Painless through the script processor (both are covered in the Elasticsearch documentation).
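To make the "executed in order" part concrete, here is a minimal plain-Python model of that behaviour. It is not Elasticsearch code: `run_pipeline`, `drop_test_docs`, `rename_color`, and `add_label` are hypothetical names invented for this sketch, and returning `None` is simply how the sketch represents a dropped document.

```python
from typing import Callable, Optional

Doc = dict
Processor = Callable[[Doc], Optional[Doc]]  # returning None models "drop this doc"


def run_pipeline(doc: Doc, processors: list[Processor]) -> Optional[Doc]:
    """Apply processors in declaration order; stop as soon as one drops the doc."""
    current: Optional[Doc] = dict(doc)  # work on a copy, leave the caller's dict alone
    for processor in processors:
        current = processor(current)
        if current is None:
            return None  # later processors never see a dropped document
    return current


# Hypothetical processors, for illustration only.
def drop_test_docs(doc: Doc) -> Optional[Doc]:
    return None if doc.get("env") == "test" else doc


def rename_color(doc: Doc) -> Doc:
    if "_color" in doc:
        doc["color"] = doc.pop("_color")
    return doc


def add_label(doc: Doc) -> Doc:
    # Reads "color", so it only works as intended after rename_color has run.
    doc["label"] = f"{doc.get('color', '')} item".strip()
    return doc


in_order = run_pipeline({"env": "staging", "_color": "white"},
                        [drop_test_docs, rename_color, add_label])
out_of_order = run_pipeline({"env": "staging", "_color": "white"},
                            [drop_test_docs, add_label, rename_color])
dropped = run_pipeline({"env": "test", "_color": "white"},
                       [drop_test_docs, rename_color, add_label])
```

In this model the out-of-order run still succeeds but produces a label without the colour, which is the kind of silent mistake that declaration order can cause in a real pipeline as well.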

PUT _ingest/pipeline/my_data_cleanser
{
  "description": "Runs a doc thru data cleanser...",
  "processors": [
    {
      "drop": {
        "if" : "ctx.env == 'test'"
      }
    },

    {
      "script": {
        "source": """
          // For more on empty fields, see "Field Existence, Length, and Size"
          def keys_to_remove = ctx.keySet()
                          .stream()
                          .filter(field -> ctx[field] == null ||
                                           ctx[field] == "" ||
                                           (ctx[field] instanceof ArrayList && ctx[field].isEmpty()))
                          .collect(Collectors.toList());

          for (key in keys_to_remove) {
            ctx.remove(key);
          }
        """
      }
    },

    {
      "script": {
        "source": """
          def forbidden_keys = [
            '_type',
            '_id',
            '_version_type',
            '_index',
            '_version'
          ];

          def keys_to_remove = [];
          def corrected_keys_map = [:];

          for (def pair : ctx.entrySet()) {
            def key = pair.getKey();
            if (forbidden_keys.contains(key)) {
              continue;
            }
            def value = pair.getValue();

            if (!key.startsWith('_')) {
              continue;
            }

            // we care only about the first underscore
            def new_key = key.substring(1);
            if (new_key.length() < 1) {
              continue;
            }
            corrected_keys_map[new_key] = value;

            keys_to_remove.add(key);
          }

          // delete underscored pairs & prevent ConcurrentModificationException
          ctx.entrySet().removeIf(e -> keys_to_remove.contains(e.getKey()));

          // save the corrected entries back to the doc context
          ctx.putAll(corrected_keys_map);
        """
      }
    },

    {
      "set" : {
        "field": "seo_category",
        "value": "{{_source.color}} {{_source.category}}"
      }
    }
  ]
}
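As a sanity check on what the four processors are meant to do, the same steps can be modelled in plain Python and run against the sample document. This is only a sketch of the logic, not how Elasticsearch executes the pipeline: `cleanse`, `is_empty`, and `RESERVED_KEYS` are names made up for the example, and treating a missing `color` or `category` as an empty string in the last step is an assumption of this model.

```python
from typing import Optional

# Mirrors the forbidden_keys list in the Painless script above.
RESERVED_KEYS = {"_type", "_id", "_version_type", "_index", "_version"}


def is_empty(value) -> bool:
    """True for null, empty strings, and empty lists."""
    return value is None or value == "" or (isinstance(value, list) and not value)


def cleanse(doc: dict) -> Optional[dict]:
    """Return the cleaned document, or None if the document should be dropped."""
    # 1. "drop" processor: skip documents from the test environment
    if doc.get("env") == "test":
        return None

    # 2. first "script" processor: remove empty fields
    doc = {key: value for key, value in doc.items() if not is_empty(value)}

    # 3. second "script" processor: strip one leading underscore,
    #    leaving reserved metadata keys and a bare "_" untouched
    def needs_rename(key: str) -> bool:
        return key.startswith("_") and key not in RESERVED_KEYS and len(key) > 1

    kept = {k: v for k, v in doc.items() if not needs_rename(k)}
    renamed = {k[1:]: v for k, v in doc.items() if needs_rename(k)}
    kept.update(renamed)  # renamed entries win, like ctx.putAll(...)

    # 4. "set" processor: concatenate color and category
    #    (assumption: missing fields count as empty strings in this model)
    kept["seo_category"] = f"{kept.get('color', '')} {kept.get('category', '')}"
    return kept


sample = {
    "env": "staging",
    "_tags": [],
    "null": None,
    "_category": "jackets",
    "_color": "white",
}
result = cleanse(sample)
```

For the sample document, `result` contains `env`, `color`, `category`, and `seo_category`, matching the target document shown earlier.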

When inserting a document, we specify the pipeline query parameter (the Java and Python clients support it as well):

POST clothing_index/_doc?pipeline=my_data_cleanser
{
  "env": "staging",
  "_tags": [],
  "null": null,
  "_category": "jackets",
  "_color": "white"
}

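We can then verify that the pipeline worked by querying the index, for example with GET clothing_index/_search, which should return the cleaned document shown earlier.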
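If you'd rather script these two calls than use the console, the requests could be assembled with the Python standard library roughly as follows. This is a sketch under assumptions: `BASE_URL` points at a local, unsecured cluster, and the helper names `build_index_request` and `build_search_request` are hypothetical. The block only builds the requests and does not send them.

```python
import json
from urllib.parse import quote, urlencode
from urllib.request import Request

BASE_URL = "http://localhost:9200"  # assumption: local cluster without authentication


def build_index_request(index: str, pipeline: str, doc: dict) -> Request:
    """POST <index>/_doc?pipeline=<pipeline> with the document as the JSON body."""
    url = f"{BASE_URL}/{quote(index)}/_doc?{urlencode({'pipeline': pipeline})}"
    return Request(
        url,
        data=json.dumps(doc).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def build_search_request(index: str) -> Request:
    """GET <index>/_search; without a body this matches all documents."""
    return Request(f"{BASE_URL}/{quote(index)}/_search", method="GET")


index_req = build_index_request(
    "clothing_index",
    "my_data_cleanser",
    {"env": "staging", "_tags": [], "null": None,
     "_category": "jackets", "_color": "white"},
)
search_req = build_search_request("clothing_index")
```

Passing each request to `urllib.request.urlopen` would send it to a reachable cluster. Keep in mind that a freshly indexed document may only appear in search results after the next index refresh.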
