stdin Source

The stdin source reads newline-delimited JSON events from standard input and feeds them into a nanosync pipeline as if they came from a live database source. Every line must be a valid JSON object in the format the stdout sink produces. Events are processed in order, and the pipeline exits when stdin is closed or the read timeout fires.

Primary use cases:

- Replaying a captured event file into a pipeline
- Filtering or transforming events with standard tools (such as jq) before they reach the pipeline
- Testing transform logic and schema handling locally, with no live database or cloud credentials

Event format

Each line must be a complete JSON object. Required fields:

Field           Description
--------------  -----------------------------------------------
_ns_op          Operation: INSERT, UPDATE, or DELETE
_ns_table       Fully qualified table name (e.g. public.orders)
(column names)  Column values matching the table schema

Additional metadata fields from the stdout sink (_ns_lsn, _ns_committed_at, _ns_before) are accepted and passed through unchanged. Blank lines and lines beginning with # are skipped.
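The line-handling rules above (skip blanks and # comments, require _ns_op and _ns_table, allow extra metadata fields) can be sketched as a small validator. This is illustrative only; parse_event is a hypothetical helper, not part of nanosync:

```python
import json

REQUIRED_FIELDS = ("_ns_op", "_ns_table")
VALID_OPS = {"INSERT", "UPDATE", "DELETE"}

def parse_event(line):
    """Parse one line of input per the stdin source's rules.

    Returns None for skipped lines (blank or starting with #);
    raises ValueError for events missing required fields.
    """
    stripped = line.strip()
    if not stripped or stripped.startswith("#"):
        return None  # blank lines and # comments are skipped
    event = json.loads(stripped)
    for field in REQUIRED_FIELDS:
        if field not in event:
            raise ValueError(f"missing required field {field}")
    if event["_ns_op"] not in VALID_OPS:
        raise ValueError(f"unknown operation {event['_ns_op']}")
    # metadata fields like _ns_lsn and _ns_before pass through unchanged
    return event
```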

Example events:

{"_ns_op":"INSERT","_ns_table":"public.orders","_ns_lsn":"0/1A2B3C4","_ns_committed_at":"2025-01-15T10:32:00Z","id":1001,"customer_id":42,"total":99.95,"status":"pending"}
{"_ns_op":"UPDATE","_ns_table":"public.orders","_ns_lsn":"0/1A2B400","_ns_committed_at":"2025-01-15T10:33:12Z","_ns_before":{"id":1001,"status":"pending"},"id":1001,"customer_id":42,"total":99.95,"status":"shipped"}
{"_ns_op":"DELETE","_ns_table":"public.orders","_ns_lsn":"0/1A2B450","_ns_committed_at":"2025-01-15T10:40:00Z","_ns_before":{"id":1001,"customer_id":42,"total":99.95,"status":"shipped"},"id":1001}

Configuration

connections:
  - name: captured-events
    type: stdin

  - name: my-bigquery
    type: bigquery
    properties:
      project_id: my-gcp-project
      dataset_id: replication

pipelines:
  - name: replay
    source:
      connection: captured-events
    sink:
      connection: my-bigquery

Running a replay

Capture events from a live source to a file, then replay them later:

# Capture: stream from Postgres and write events to a file
nanosync stream --source "postgres://..." --tables "public.orders" > events.jsonl

# Replay: pipe the captured file into the replay pipeline
cat events.jsonl | nanosync apply --file replay-pipeline.yaml

Replay with transformation: filter to only INSERT events before feeding the pipeline:

jq -c 'select(._ns_op == "INSERT")' events.jsonl | nanosync apply --file replay-pipeline.yaml
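The same filter can be written in Python where jq is unavailable. This is a sketch; filter_inserts is a hypothetical helper, not part of nanosync:

```python
import json
import sys

def filter_inserts(lines):
    """Yield compact JSON for INSERT events only, mirroring the jq filter."""
    for line in lines:
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue  # same skip rules the stdin source applies
        event = json.loads(stripped)
        if event["_ns_op"] == "INSERT":
            yield json.dumps(event, separators=(",", ":"))

if __name__ == "__main__":
    # e.g. python filter_inserts.py < events.jsonl | nanosync apply --file replay-pipeline.yaml
    for out in filter_inserts(sys.stdin):
        print(out)
```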

Source properties

Property      Default  Description
------------  -------  --------------------------------------------------------------
batch_size    1000     Events read and buffered per batch before flushing to the sink
read_timeout  5s       Duration to wait for the next line of input before considering
                       the stream complete and shutting down

Set read_timeout higher if your event file has long gaps between events, or lower if you want the pipeline to exit quickly after the file ends.
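Assuming these properties are set under the connection's properties block, like the bigquery connection shown earlier, a tuned configuration might look like:

```yaml
connections:
  - name: captured-events
    type: stdin
    properties:
      batch_size: 500      # smaller batches flush to the sink sooner
      read_timeout: 30s    # tolerate long gaps between events
```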

Local testing

Pair the stdin source with the stdout sink to build a fully local pipeline that reads from a file and writes to the terminal. No cloud credentials, no live database — useful for verifying transform logic and schema handling in CI.
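A minimal configuration for that setup might look like the following sketch. The connection and pipeline names are placeholders, and it assumes the stdout sink's connection type is stdout:

```yaml
connections:
  - name: captured-events
    type: stdin

  - name: terminal
    type: stdout

pipelines:
  - name: local-test
    source:
      connection: captured-events
    sink:
      connection: terminal
```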