Painless Pipelines
In Painless Enrichment, I used /_update_by_query to enrich documents in place without a full reindex.
This post takes that one step further: a repeatable enrichment pipeline for a products index using stored Painless scripts on a systemd timer.
I prefer systemd timers for this job for the same reasons laid out in Systemd Timers vs. Cron Jobs: dependency control, predictable scheduling, and centralized logs.
The key design choice is to split enrichment by concern (pricing, inventory) instead of building one giant script. Smaller scripts are easier to test, version, and roll back.
Add target fields to the mapping
Start by adding only the derived fields these jobs will write.
PUT /products/_mapping
{
"properties": {
"discount_pct": { "type": "integer" },
"price_band": { "type": "keyword" },
"stock_state": { "type": "keyword" },
"enriched_at": { "type": "date" }
}
}
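The mapping change is additive, so existing fields and documents are untouched. To confirm the new fields merged, you can fetch the mapping back:
GET /products/_mapping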
Store reusable scripts in cluster state
With the mapping in place, store each script with a versioned ID so the scheduler can call it reliably.
PUT /_scripts/products-pricing-signals-v1
{
  "script": {
    "lang": "painless",
    "source": """
      if (ctx._source.list_price != null &&
          ctx._source.list_price > 0 &&
          ctx._source.price != null) {
        // Work in doubles so integer-valued prices don't hit integer division.
        double listPrice = ((Number) ctx._source.list_price).doubleValue();
        double salePrice = ((Number) ctx._source.price).doubleValue();
        double discount = ((listPrice - salePrice) / listPrice) * 100.0;
        ctx._source.discount_pct = (int) Math.round(discount);
      }
      if (ctx._source.price != null) {
        if (ctx._source.price < 25) {
          ctx._source.price_band = "budget";
        } else if (ctx._source.price < 100) {
          ctx._source.price_band = "mid";
        } else {
          ctx._source.price_band = "premium";
        }
      }
      ctx._source.enriched_at = params.now;
    """
  }
}
PUT /_scripts/products-inventory-state-v1
{
  "script": {
    "lang": "painless",
    "source": """
      if (ctx._source.inventory_count != null) {
        if (ctx._source.inventory_count <= 0) {
          ctx._source.stock_state = "out_of_stock";
        } else if (ctx._source.inventory_count < 10) {
          ctx._source.stock_state = "low_stock";
        } else {
          ctx._source.stock_state = "in_stock";
        }
        ctx._source.enriched_at = params.now;
      }
    """
  }
}
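Stored scripts live in cluster state, so either one can be fetched back by ID to confirm it was saved:
GET /_scripts/products-pricing-signals-v1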
Execute targeted update jobs
When it’s time to run, call /_update_by_query asynchronously for each script, and target only documents with missing or stale fields so reruns stay cheap.
POST /products/_update_by_query?conflicts=proceed&wait_for_completion=false&slices=auto
{
  "query": {
    "bool": {
      "should": [
        { "bool": { "must_not": { "exists": { "field": "discount_pct" } } } },
        { "range": { "enriched_at": { "lt": "now-30m" } } }
      ],
      "minimum_should_match": 1
    }
  },
  "script": {
    "id": "products-pricing-signals-v1",
    "params": {
      "now": "{{current_timestamp}}"
    }
  }
}
Use the same pattern for products-inventory-state-v1 with a field-specific query (stock_state instead of discount_pct). The {{current_timestamp}} value is a placeholder for the run time; the runner script below passes the real timestamp as the now param. Each request returns a task ID you can monitor with GET /_tasks/<task_id>.
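To check on a run by hand, hit the tasks API with the ID from the response (the ID below is just an illustrative placeholder):
// hypothetical task ID copied from the update-by-query response
GET /_tasks/oTUltX4IQMOUUVeiohTt8A:12345
While the task is still running, the response includes progress counters; once it reports "completed": true, the job is done.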
Wrap the jobs in a runner script
Next, wrap both jobs in a small runner script. It submits both update jobs (they run concurrently on the Elasticsearch side), polls until each task completes, and prints job finished lines that land in the journal when it runs under systemd.
#!/usr/bin/env bash
# Submit both product-enrichment jobs as async update-by-query tasks and wait for them to finish.
set -euo pipefail
ES_URL="http://127.0.0.1:9200"
INDEX="products"
NOW_UTC="$(date -u +"%Y-%m-%dT%H:%M:%SZ")"
# Submit an async update-by-query for one stored script; prints the JSON response.
run_job() {
  local script_id="$1"
  local query_json="$2"
  curl -sS -X POST "$ES_URL/$INDEX/_update_by_query?conflicts=proceed&wait_for_completion=false&slices=auto" \
    -H "Content-Type: application/json" \
    -d "{
      \"query\": $query_json,
      \"script\": {
        \"id\": \"$script_id\",
        \"params\": { \"now\": \"$NOW_UTC\" }
      }
    }"
}
# Poll the tasks API until the given task reports completed, then log a finish line.
wait_for_task() {
  local task_id="$1"
  local job_name="$2"
  local completed
  while true; do
    completed=$(curl -sS "$ES_URL/_tasks/$task_id" | jq -r '.completed')
    if [ "$completed" = "true" ]; then
      echo "$(date -u +"%Y-%m-%dT%H:%M:%SZ") job finished: $job_name ($task_id)"
      break
    fi
    sleep 2
  done
}
# Kick off both jobs; each request returns immediately with a task id.
pricing_task_id=$(run_job "products-pricing-signals-v1" \
  '{
    "bool": {
      "should": [
        { "bool": { "must_not": { "exists": { "field": "discount_pct" } } } },
        { "range": { "enriched_at": { "lt": "now-30m" } } }
      ],
      "minimum_should_match": 1
    }
  }' | jq -r '.task')
inventory_task_id=$(run_job "products-inventory-state-v1" \
  '{
    "bool": {
      "should": [
        { "bool": { "must_not": { "exists": { "field": "stock_state" } } } },
        { "range": { "enriched_at": { "lt": "now-15m" } } }
      ],
      "minimum_should_match": 1
    }
  }' | jq -r '.task')
# Wait for both tasks in parallel so the slower one doesn't delay logging the faster one.
wait_for_task "$pricing_task_id" "products-pricing-signals-v1" &
wait_for_task "$inventory_task_id" "products-inventory-state-v1" &
wait
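Assuming you save this as run-products-enrichment.sh, one way to deploy it is to install it at the path the service unit below expects and run it once by hand to confirm the job finished lines show up:
sudo install -o search -g search -m 0755 run-products-enrichment.sh /opt/search/bin/run-products-enrichment.sh
sudo -u search /opt/search/bin/run-products-enrichment.sh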
Schedule with systemd
Service unit:
# /etc/systemd/system/products-enrichment.service
[Unit]
Description=Run product enrichment scripts
After=network-online.target
Wants=network-online.target
[Service]
Type=oneshot
User=search
Group=search
ExecStart=/opt/search/bin/run-products-enrichment.sh
Timer unit:
# /etc/systemd/system/products-enrichment.timer
[Unit]
Description=Run product enrichment every 15 minutes
[Timer]
OnCalendar=*:0/15
Persistent=true
RandomizedDelaySec=20
[Install]
WantedBy=timers.target
Enable it:
sudo systemctl daemon-reload
sudo systemctl enable --now products-enrichment.timer
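There's no need to wait for the first timer tick; trigger the service once by hand to confirm everything is wired up:
sudo systemctl start products-enrichment.service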
Verify and monitor
Check next/last run:
systemctl list-timers products-enrichment.timer
Check execution logs:
journalctl -u products-enrichment.service -n 100 --no-pager
Verify enriched fields are present:
GET /products/_search
{
  "size": 5,
  "_source": [
    "name",
    "price",
    "list_price",
    "discount_pct",
    "price_band",
    "inventory_count",
    "stock_state",
    "enriched_at"
  ],
  "query": {
    "match_all": {}
  }
}
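To watch the enrichment backlog shrink between runs, you can also count documents that still lack one of the derived fields; a quick sketch using stock_state:
GET /products/_count
{
  "query": {
    "bool": {
      "must_not": { "exists": { "field": "stock_state" } }
    }
  }
}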
Is this production-ready?
Yes, for the right scope. If you’re enriching one Elasticsearch index with a small set of idempotent scripts on a predictable schedule, this systemd-based approach is simple, debuggable, and cheap to run.
Once the workflow turns into cross-system orchestration (multi-step dependencies, larger backfills, stricter retry/SLA needs, multiple teams), orchestration platforms like Airflow or Dagster are usually a better fit.