stdin Source
The stdin source reads newline-delimited JSON events from standard input and feeds them into a nanosync pipeline as if they came from a live database source. Every line must be a valid JSON object in the format the stdout sink produces. The pipeline processes events in order and exits when stdin is closed or the read timeout fires.
Primary use cases:
- Replay — pipe a previously captured .jsonl file back through a pipeline to re-process events or replay them into a different sink.
- CI testing — run a pipeline in automated tests without a live database. Feed synthetic events and assert sink output.
- Load testing a sink — generate high-volume event streams and measure sink throughput without putting pressure on a source database.
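For load testing, the input does not have to be a capture file; any process that writes one JSON object per line can drive the pipeline. A minimal sketch, assuming the replay-pipeline.yaml config used in the replay example below and the public.orders columns used throughout this page:
# Generate 100,000 synthetic INSERT events and stream them straight into the pipeline
awk 'BEGIN { for (i = 1; i <= 100000; i++) printf("{\"_ns_op\":\"INSERT\",\"_ns_table\":\"public.orders\",\"id\":%d,\"customer_id\":42,\"total\":99.95,\"status\":\"pending\"}\n", i) }' \
  | nanosync apply --file replay-pipeline.yaml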
Event format
Each line must be a complete JSON object. Required fields:
| Field | Description |
|---|---|
| _ns_op | Operation: INSERT, UPDATE, or DELETE |
| _ns_table | Fully qualified table name (e.g. public.orders) |
| (column names) | Column values matching the table schema |
Additional metadata fields from the stdout sink (_ns_lsn, _ns_committed_at, _ns_before) are accepted and passed through unchanged. Blank lines and lines beginning with # are skipped.
Example events:
{"_ns_op":"INSERT","_ns_table":"public.orders","_ns_lsn":"0/1A2B3C4","_ns_committed_at":"2025-01-15T10:32:00Z","id":1001,"customer_id":42,"total":99.95,"status":"pending"}
{"_ns_op":"UPDATE","_ns_table":"public.orders","_ns_lsn":"0/1A2B400","_ns_committed_at":"2025-01-15T10:33:12Z","_ns_before":{"id":1001,"status":"pending"},"id":1001,"customer_id":42,"total":99.95,"status":"shipped"}
{"_ns_op":"DELETE","_ns_table":"public.orders","_ns_lsn":"0/1A2B450","_ns_committed_at":"2025-01-15T10:40:00Z","_ns_before":{"id":1001,"customer_id":42,"total":99.95,"status":"shipped"},"id":1001}
Configuration
connections:
  - name: captured-events
    type: stdin
  - name: my-bigquery
    type: bigquery
    properties:
      project_id: my-gcp-project
      dataset_id: replication
pipelines:
  - name: replay
    source:
      connection: captured-events
    sink:
      connection: my-bigquery
Running a replay
Capture events from a live source to a file, then replay them later:
# Capture: stream from Postgres and write events to a file
nanosync stream --source "postgres://..." --tables "public.orders" > events.jsonl
# Replay: pipe the captured file into the replay pipeline
cat events.jsonl | nanosync apply --file replay-pipeline.yaml
Replay with transformation — filter to only INSERT events before feeding the pipeline:
jq -c 'select(._ns_op == "INSERT")' events.jsonl | nanosync apply --file replay-pipeline.yaml
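The same pattern works for other pre-replay transformations. For example, to replay only events committed after a given point in time, filter on _ns_committed_at (the cutoff timestamp is a placeholder; ISO 8601 timestamps in the same format compare correctly as plain strings):
jq -c 'select(._ns_committed_at > "2025-01-15T10:33:00Z")' events.jsonl | nanosync apply --file replay-pipeline.yaml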
Source properties
| Property | Default | Description |
|---|---|---|
| batch_size | 1000 | Events read and buffered per batch before flushing to the sink |
| read_timeout | 5s | Duration to wait for the next line of input before considering the stream complete and shutting down |
Set read_timeout higher if the input stream can have long gaps between events (for example, when events come from a slow upstream process), or lower if you want the pipeline to exit quickly after the input ends.
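Both properties are set on the stdin connection. A sketch extending the configuration example above; placing them under a properties block mirrors the bigquery connection, and the values are illustrative:
connections:
  - name: captured-events
    type: stdin
    properties:
      batch_size: 5000
      read_timeout: 30s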
Limitations
- No resume on interrupt. stdin is not seekable. If the pipeline is interrupted mid-replay, restart from the beginning of the file. There is no LSN-based checkpoint for stdin — the state store records that the pipeline ran but cannot resume from a mid-file position.
- Schema must match the sink. The stdin source does not negotiate schema with the sink. Events whose column set does not match the target table schema will be rejected by the sink. Ensure the event file was captured from a compatible schema version.
- Single table per event. The _ns_table field in each event determines which table the event targets. Mixed-table files (events from multiple tables) are supported — each event is routed by its _ns_table value.
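Before replaying a mixed-table file, a quick jq sketch shows which tables it touches and how many events each will receive:
# Count events per target table
jq -r '._ns_table' events.jsonl | sort | uniq -c | sort -rn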
Pair the stdin source with the stdout sink to build a fully local pipeline that reads from a file and writes to the terminal. No cloud credentials, no live database — useful for verifying transform logic and schema handling in CI.
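A sketch of such a local pipeline, assuming the stdout sink's connection type is stdout and saving the config as local-check.yaml (both names are placeholders; check the stdout sink page for its exact type and properties):
connections:
  - name: captured-events
    type: stdin
  - name: terminal
    type: stdout
pipelines:
  - name: local-check
    source:
      connection: captured-events
    sink:
      connection: terminal
# Read events from a file and print the processed events to the terminal
cat events.jsonl | nanosync apply --file local-check.yaml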