The goal of espipe is to be a minimalist command-line utility to bulk ingest documents from a file or I/O stream into an Elasticsearch cluster. No enrichment, no transformation, no complication.
Have you ever had thousands of sample documents in an .ndjson or .csv file, and you just want to load them all into a local insecure Elasticsearch cluster?
espipe docs.ndjson http://localhost:9200/new_index
And you're done.
Add a my-cluster host entry with an API key to ~/.espipe/hosts.yml and you can reference the host by name:
espipe docs.ndjson my-cluster:/new_index
Being multi-threaded and unthrottled, espipe can fully saturate the CPU of the sending host and can overwhelm the target cluster, so use it with caution. It handles backpressure and HTTP 429 responses gracefully to ensure at-least-once delivery.
Documents are batched into _bulk requests of 5,000 documents and sent with the create action by default. Use --action to switch to index or update based on your needs. For --action=update, each source document must include an _id field for the update target. Use --batch-size and --max-requests to tune bulk request size and concurrency at runtime.
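To make the batching concrete, the sketch below builds the kind of request body the Elasticsearch _bulk API accepts for the create action: one action line followed by one document line. It illustrates the wire format only and is not espipe's implementation. The sample file and its contents are made up, and the sketch assumes the index name is carried in the request URL so the action line can stay empty.

```shell
# Hypothetical sample input: one JSON document per line.
tmp=$(mktemp -d)
printf '%s\n' '{"message":"hello"}' '{"message":"world"}' > "$tmp/docs.ndjson"

# Emit a create action line before every document line.
awk '{ print "{\"create\":{}}"; print }' "$tmp/docs.ndjson" > "$tmp/bulk_body.ndjson"

cat "$tmp/bulk_body.ndjson"
```

With this layout, a batch of 5,000 documents becomes a 10,000-line request body, which is one reason request body compression and --batch-size matter for large loads.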
Install the published crate with Cargo:
cargo install espipe
Run the published container image:
docker run --rm vimcommando/espipe --help
To build from source instead:
git clone https://github.com/VimCommando/espipe
cd espipe
cargo install --path .
To build the container image from source:
docker build -f docker/Dockerfile -t espipe:local .
To build and publish a multi-platform Docker Hub image:
./bin/buildx.sh
espipe reads records from:
- .ndjson files
- .ndjson.gz files
- .json files
- .csv files
- .csv.gz files
- stdin as NDJSON
It writes records to:
- Elasticsearch _bulk
- a local .ndjson or .ndjson.gz file
- stdout
When writing to Elasticsearch, espipe batches documents into groups of 5,000 records by default, enables request body gzip compression by default, and sends multiple bulk requests concurrently. Use --batch-size to change the number of documents per bulk request and --max-requests to change the number of in-flight bulk requests. File gzip compression is selected only for supported .csv.gz, .ndjson.gz, and output .ndjson.gz suffixes, and is separate from Elasticsearch request body compression.
Usage: espipe [OPTIONS] <INPUT> <OUTPUT>
Arguments:
<INPUT> The input URI to read docs from
<OUTPUT> The output URI to send docs to
Options:
-k, --insecure Ignore certificate validation
-a, --apikey <APIKEY> Apikey to authenticate via http header
-u, --username <USERNAME> Username for basic authentication
-p, --password <PASSWORD> Password for basic authentication
-q, --quiet Quiet mode, don't print runtime summary
-z, --uncompressed Disable request body gzip compression
--action <ACTION> Bulk action for Elasticsearch outputs [default: create] [possible values: create, index, update]
--batch-size <BATCH_SIZE> Documents per Elasticsearch bulk request [default: 5000]
--max-requests <MAX_REQUESTS> Maximum concurrent Elasticsearch bulk requests [default: 16]
-h, --help Print help
Both positional arguments are parsed as URI-like strings.
- - (a single dash): Reads NDJSON from stdin.
- path/to/file.ndjson: Reads NDJSON from a local file.
- path/to/file.ndjson.gz: Reads gzip-compressed NDJSON from a local file.
- path/to/file.json: Reads line-delimited JSON from a local file.
- path/to/file.csv: Reads CSV from a local file.
- path/to/file.csv.gz: Reads gzip-compressed CSV from a local file.
- file:///absolute/path/to/file.ndjson: Reads NDJSON from a file:// URI.
- file:///absolute/path/to/file.ndjson.gz: Reads gzip-compressed NDJSON from a file:// URI.
- file:///absolute/path/to/file.json: Reads line-delimited JSON from a file:// URI.
- file:///absolute/path/to/file.csv: Reads CSV from a file:// URI.
- file:///absolute/path/to/file.csv.gz: Reads gzip-compressed CSV from a file:// URI.
HTTPS input URIs are supported for unauthenticated remote .csv, .ndjson, and .json sources. URLs without a supported file extension can still be accepted when the response Content-Type maps to CSV or NDJSON-oriented JSON input.
- - (a single dash): Writes raw JSON lines to stdout.
- path/to/output.ndjson: Writes raw JSON lines to a local file, truncating any existing file.
- path/to/output.ndjson.gz: Writes gzip-compressed raw JSON lines to a local file, truncating any existing file.
- file:///absolute/path/to/output.ndjson: Writes raw JSON lines to a file:// URI target.
- file:///absolute/path/to/output.ndjson.gz: Writes gzip-compressed raw JSON lines to a file:// URI target.
- http://host:9200/index-name: Sends documents to Elasticsearch using the _bulk API.
- https://host:9200/index-name: Sends documents to Elasticsearch over TLS.
- known-host:index-name: Resolves known-host from a local hosts file and sends to the named index.
When writing to Elasticsearch, the output path must include an index name.
Remote .json inputs are treated as NDJSON. If the downloaded JSON payload does not match the required NDJSON shape, espipe exits with: JSON payload does not look like required NDJSON input format.
Each line of NDJSON input must be a complete, valid JSON document. For pass-through JSON inputs, espipe expects the first non-whitespace character on each line to be {.
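A quick pre-flight check for that rule can be sketched with grep. The file name and contents below are hypothetical, and the check only looks at the first non-whitespace character; it does not prove that a line is valid JSON. Blank lines are reported too.

```shell
# Hypothetical sample: the second line is a JSON array, not an object.
tmp=$(mktemp -d)
printf '%s\n' '{"a":1}' '[1,2]' '  {"b":2}' > "$tmp/check.ndjson"

# List (with line numbers) every line whose first non-whitespace
# character is not "{".
grep -n -v '^[[:space:]]*{' "$tmp/check.ndjson" || echo "all lines start with {"
```

For the sample above this reports line 2, the array.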
For CSV input, the first row must be a header row. Each subsequent row is converted into a JSON object using the CSV headers as field names.
CSV values are emitted as JSON strings. espipe does not infer numeric, boolean, or date types from CSV input.
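The mapping can be illustrated with a small awk sketch. This is not how espipe parses CSV: it assumes a made-up file with no quoted fields and no embedded commas, quotes, or newlines, and exists only to show that every value ends up as a JSON string.

```shell
# Hypothetical sample CSV with a header row.
tmp=$(mktemp -d)
printf '%s\n' 'id,name,active' '1,Ada,true' '2,Linus,false' > "$tmp/users.csv"

awk -F',' '
NR == 1 { for (i = 1; i <= NF; i++) header[i] = $i; next }  # remember field names
{
  out = "{"
  for (i = 1; i <= NF; i++) {
    # Every value is quoted, so 1 and true become "1" and "true".
    out = out "\"" header[i] "\":\"" $i "\""
    if (i < NF) out = out ","
  }
  print out "}"
}' "$tmp/users.csv" > "$tmp/users.ndjson"

cat "$tmp/users.ndjson"
```

If numeric, boolean, or date fields are needed in Elasticsearch, they have to come from the index mapping or an ingest step outside espipe, since the values arrive as strings.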
espipe supports three Elasticsearch bulk actions:
- create: Sends each document as a create operation.
- index: Sends each document as an index operation.
- update: Sends each document as an update operation with a { "doc": ... } payload.
For --action update, every input document must:
- be a JSON object
- include an _id field
- have _id as a string
The _id field is removed from the document body and used as the update target.
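Under those rules, each input document turns into two bulk lines: an update action line carrying the _id, and a { "doc": ... } line carrying the remaining fields. The awk sketch below imitates that split for illustration only. It assumes compact JSON, a top-level string _id with no escaped quotes, and no other "_id" text earlier on the line. It is not espipe's implementation.

```shell
# Hypothetical sample input with string _id values.
tmp=$(mktemp -d)
printf '%s\n' '{"_id":"1","message":"hello"}' '{"message":"world","_id":"2"}' > "$tmp/docs.ndjson"

awk '
{
  if (!match($0, /"_id":"[^"]*"/)) {
    print "line " NR ": no string _id, skipped" > "/dev/stderr"
    next
  }
  pair = substr($0, RSTART, RLENGTH)           # e.g. "_id":"1"
  id = substr(pair, 8, length(pair) - 8)       # text between the value quotes
  before = substr($0, 1, RSTART - 1)
  after = substr($0, RSTART + RLENGTH)
  # Remove the comma that joined the _id pair to its neighbour.
  if (after ~ /^,/) after = substr(after, 2); else sub(/,$/, "", before)
  print "{\"update\":{\"_id\":\"" id "\"}}"
  print "{\"doc\":" before after "}"
}' "$tmp/docs.ndjson" > "$tmp/update_body.ndjson"

cat "$tmp/update_body.ndjson"
```

The first sample document becomes an action line for _id 1 followed by a doc line that contains only the message field.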
For Elasticsearch targets:
- --batch-size: Sets the number of documents included in each _bulk request.
- --max-requests: Sets the maximum number of concurrent in-flight bulk requests.
The internal channel capacity always matches --batch-size.
For Elasticsearch targets, espipe:
- batches documents into 5,000-document _bulk requests by default
- keeps up to 16 bulk requests in flight by default
- enables gzip request body compression by default
- retries 429 Too Many Requests responses with exponential backoff
- logs bulk-item error counts when Elasticsearch reports partial failures
400 Bad Request bulk responses are logged and counted as zero successful documents for that batch.
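The retry behavior for 429 responses follows the general exponential backoff pattern, sketched below. The initial delay, the cap, and the send_bulk stand-in are all assumptions made for illustration; the source does not state the values espipe uses.

```shell
# Hypothetical stand-in for sending one bulk request: it reports 429 twice,
# then 200. A real client would perform the HTTP call here.
attempt=0
status=0
send_bulk() {
  attempt=$((attempt + 1))
  if [ "$attempt" -le 2 ]; then status=429; else status=200; fi
}

delay_ms=100       # assumed initial delay
max_delay_ms=1600  # assumed upper bound on the delay
waits=""

while :; do
  send_bulk
  if [ "$status" -ne 429 ]; then break; fi
  echo "attempt $attempt returned 429, waiting ${delay_ms}ms"
  waits="$waits $delay_ms"
  sleep "$(awk -v ms="$delay_ms" 'BEGIN { printf "%.3f", ms / 1000 }')"
  # Double the delay for the next retry, up to the cap.
  delay_ms=$((delay_ms * 2))
  if [ "$delay_ms" -gt "$max_delay_ms" ]; then delay_ms=$max_delay_ms; fi
done

echo "attempt $attempt returned $status"
```

Because the same batch is resent until it is accepted, this pattern gives at-least-once rather than exactly-once delivery.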
For file and stdout targets, espipe writes one raw JSON document per line. It does not emit Elasticsearch bulk action metadata lines for these outputs.
Authentication flags apply only to direct http:// and https:// Elasticsearch outputs:
- --apikey
- --username
- --password
- --insecure
Known hosts are loaded from:
- $ESPIPE_HOSTS, if set
- otherwise ~/.espipe/hosts.yml
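To see which file that lookup order points at on a given machine, the same precedence can be mirrored in the shell. This is a sketch of the rule above, not espipe's own code, and treating an empty $ESPIPE_HOSTS the same as an unset one is an assumption.

```shell
# $ESPIPE_HOSTS wins when set; otherwise fall back to ~/.espipe/hosts.yml.
# Assumption: an empty variable is treated like an unset one.
hosts_file="${ESPIPE_HOSTS:-$HOME/.espipe/hosts.yml}"

if [ -f "$hosts_file" ]; then
  echo "known hosts file: $hosts_file"
else
  echo "no hosts file at: $hosts_file"
fi
```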
Example:
localhost:
  auth: None
  url: http://localhost:9200/
secure-cluster:
  auth: Basic
  url: https://example.com:9200/
  username: elastic
  password: changeme
  insecure: false
ess-cluster:
  auth: ApiKey
  url: https://cluster.example.com/
  apikey: "base64-encoded-api-key"
Usage:
espipe docs.ndjson localhost:my-index
espipe docs.ndjson secure-cluster:my-index
espipe docs.ndjson ess-cluster:my-index
For known-host outputs, authentication and TLS settings come from the host entry. CLI auth flags are not applied on top of the known-host configuration.
espipe docs.ndjson http://localhost:9200/my-index
espipe users.csv http://localhost:9200/users
cat docs.ndjson | espipe - http://localhost:9200/my-index
espipe users.csv output.ndjson
espipe users.csv.gz output.ndjson.gz
espipe docs.ndjson.gz http://localhost:9200/my-index
espipe docs.ndjson https://example.com:9200/my-index \
  --username elastic \
  --password changeme
espipe docs.ndjson https://example.com:9200/my-index \
  --apikey "base64-encoded-api-key"
espipe docs.ndjson http://localhost:9200/my-index --uncompressed
espipe docs.ndjson http://localhost:9200/my-index \
  --batch-size 1000 \
  --max-requests 4
Input:
{"_id":"1","message":"hello"}
{"_id":"2","message":"world"}
Command:
espipe docs.ndjson http://localhost:9200/my-index --action update
espipe is optimized for straightforward ingestion, not for rich machine-readable error reporting.
Current behavior:
- invalid CLI argument combinations are rejected by clap
- invalid authentication combinations fail at startup
- invalid input or output targets fail at startup
- Elasticsearch transport failures during send or close terminate the process
- 429 bulk responses are retried automatically
- bulk item failures are logged, but successful items in the same batch are still counted
One current limitation is that input parsing errors and end-of-input are handled through the same loop boundary. In practice, malformed NDJSON or CSV input may stop ingestion early without a dedicated non-zero parsing exit code.
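Because a parsing problem may not surface as a non-zero exit code, one way to guard against a silently truncated load is to compare document counts. The sketch below does this for a file-to-file copy, using only the positional arguments and the --quiet flag documented above. The sample file is made up, and the espipe call is skipped when the binary is not on the PATH.

```shell
# Hypothetical sample input: one JSON document per line.
tmp=$(mktemp -d)
printf '%s\n' '{"message":"hello"}' '{"message":"world"}' > "$tmp/docs.ndjson"

# Count non-empty input lines; for NDJSON this is the expected document count.
expected=$(grep -c . "$tmp/docs.ndjson")

if command -v espipe >/dev/null 2>&1; then
  # Copy through espipe to a local file, then count what arrived.
  espipe "$tmp/docs.ndjson" "$tmp/copy.ndjson" --quiet
  actual=$(grep -c . "$tmp/copy.ndjson")
  if [ "$actual" -eq "$expected" ]; then
    echo "ok: $actual of $expected documents written"
  else
    echo "mismatch: $actual of $expected documents written" >&2
  fi
else
  echo "espipe not installed; expected document count is $expected"
fi
```

For an Elasticsearch target, the same idea applies by comparing the expected count with the document count reported by the target index after the load finishes.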
espipe is intentionally aggressive enough to saturate a local or small remote cluster.
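Current bulk worker settings (batch size and in-flight requests are defaults that can be changed with --batch-size and --max-requests):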
- batch size: 5,000 documents
- channel capacity: 5,000 documents
- max in-flight bulk requests: 16
- Tokio worker threads: 3
This is fast for local ingestion and test data loading, but it can overwhelm smaller clusters or shared environments.
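As a rough sizing aid, the number of documents that can sit in in-flight bulk requests at once is the batch size multiplied by the maximum number of concurrent requests. The arithmetic below compares the defaults with the gentler values used in the tuning example earlier in this document. It is an upper bound that ignores documents waiting in the internal channel.

```shell
# Defaults: 5,000 documents per request, 16 requests in flight.
default_in_flight=$((5000 * 16))
echo "default: up to $default_in_flight documents in flight"

# Gentler settings: --batch-size 1000 --max-requests 4
tuned_in_flight=$((1000 * 4))
echo "tuned:   up to $tuned_in_flight documents in flight"
```

That is 80,000 documents with the defaults versus 4,000 with the tuned values, a twentyfold reduction in peak load on the target cluster.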
Set LOG_LEVEL to inspect request and ingestion behavior:
LOG_LEVEL=debug espipe docs.ndjson http://localhost:9200/my-index
Useful checks:
- verify the target index name is present in the output URI
- verify CSV files have a header row
- verify NDJSON files contain one complete JSON object per line
- verify --action update inputs include string _id values
- verify known-host entries live in ~/.espipe/hosts.yml or $ESPIPE_HOSTS
espipe is a binary crate. It does not publish or support a public Rust library interface.