
A CLI for replaying historical Kinesis messages from an S3 archive (e.g. an S3 bucket used as a Firehose delivery destination).



Installing

s3-kinesis-replay currently ships binaries for 64-bit macOS and Linux systems. You can download the latest release from GitHub:

$ wget https://github.com/cludden/s3-kinesis-replay/releases/download/{version}/{artifact}

Building From Source

$ make

Getting Started

View usage information:

$ s3-kinesis-replay --help
a cli for replaying historical kinesis messages via an s3 archive

  s3-kinesis-replay [flags]

      --bucket string                         s3 archive bucket name
      --delimiter string                      optional delimiter regexp
      --format string                         parser format
  -h, --help                                  help for s3-kinesis-replay
      --json-concurrency int                  json parser concurrency (default 4)
      --json-schema string                    json parser schema path
      --kinesis-backoff-interval string       kinesis backoff interval
      --kinesis-backoff-max-interval string   kinesis max backoff interval
      --kinesis-buffer-window string          kinesis buffer window size
      --kinesis-endpoint string               kinesis endpoint override
      --kinesis-region string                 kinesis region override
      --log-level string                      log verbosity level
      --partition-key string                  json parser parition key path
      --prefix string                         s3 archive prefix
      --replace string                        optional replace regexp
      --replace-with string                   optional replacement string
      --s3-concurrency int                    s3 download concurrency (default 4)
      --s3-region string                      s3 archive region
      --start-after string                    s3 archive start-after key
      --stop-at string                        s3 archive stop-at key
      --stream-name string                    target kinesis stream name

Basic usage, including all required flags (assumes your AWS environment is already configured):

$ s3-kinesis-replay \
    --bucket my-bucket \
    --prefix 2018/01 \
    --stream-name my-stream \
    --format json \
    --partition-key path.to.partitionKey

Note: It is worthwhile to run this application from within the same region as your S3 bucket and Kinesis stream to avoid S3 egress charges.
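
To replay only a slice of the archive, the --start-after and --stop-at flags listed in the help output can bound the S3 key scan. The following is a minimal sketch wrapped in a shell function. The function name replay_window and the example keys are hypothetical, and it assumes both flags take full S3 object keys, which is not spelled out here.

```shell
# replay_window BUCKET STREAM START_AFTER STOP_AT
# Hypothetical helper: replays only the archive objects whose keys sort
# after START_AFTER, stopping at STOP_AT.
replay_window() {
  if [ "$#" -ne 4 ]; then
    echo "usage: replay_window BUCKET STREAM START_AFTER STOP_AT" >&2
    return 2
  fi
  # --format and --partition-key are required; the values below mirror
  # the basic usage example and should be adjusted for your payloads.
  s3-kinesis-replay \
    --bucket "$1" \
    --stream-name "$2" \
    --start-after "$3" \
    --stop-at "$4" \
    --format json \
    --partition-key path.to.partitionKey
}
```

For example, `replay_window my-bucket my-stream 2018/01/15/first-key 2018/01/20/last-key` would limit the replay to that key range (both keys are made up for illustration).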


Configuration

s3-kinesis-replay uses viper for configuration and accepts a configuration file, environment variables, CLI flags, or any combination of the three.

| Key | EnvVar | Flag | Description | Required | Default |
| --- | --- | --- | --- | --- | --- |
| json.concurrency | JSON_CONCURRENCY | --json-concurrency | number of parser goroutines | | 4 |
| json.partition_key | JSON_PARTITION_KEY | --partition-key | path to json field holding partition key | true | |
| json.schema | JSON_SCHEMA | --json-schema | path to json schema file | | |
| kinesis.backoff_interval | KINESIS_BACKOFF_INTERVAL | --kinesis-backoff-interval | duration string for initial backoff | | 1s |
| kinesis.backoff_max_interval | KINESIS_BACKOFF_MAX_INTERVAL | --kinesis-backoff-max-interval | duration string for max backoff | | 10s |
| kinesis.buffer_window | KINESIS_BUFFER_WINDOW | --kinesis-buffer-window | duration string for buffer window | | 10s |
| kinesis.endpoint | KINESIS_ENDPOINT | --kinesis-endpoint | optional endpoint override | | |
| kinesis.region | KINESIS_REGION | --kinesis-region | target stream aws region, will fall back to AWS_REGION environment variable | | |
| kinesis.stream_name | KINESIS_STREAM_NAME | --stream-name | target kinesis stream name | true | |
| log.format | LOG_FORMAT | --log-format | supports json or text | | json |
| log.level | LOG_LEVEL | --log-level | logging verbosity | | info |
| parser.delimiter | PARSER_DELIMITER | --delimiter | used to split s3 objects into multiple messages | | |
| parser.format | PARSER_FORMAT | --format | the parser to use. currently json is the only supported parser | true | |
| parser.replace | PARSER_REPLACE | --replace | an optional replace regex pattern | | |
| parser.replace_with | PARSER_REPLACE_WITH | --replace-with | an optional replacement string | | |
| s3.bucket | S3_BUCKET | --bucket | the s3 bucket name that contains the archive | true | |
| s3.concurrency | S3_CONCURRENCY | --s3-concurrency | the number of goroutines to use for downloading s3 objects | | 4 |
| s3.endpoint | S3_ENDPOINT | --s3-endpoint | an optional s3 endpoint override | | |
| s3.prefix | S3_PREFIX | --prefix | an optional s3 key prefix | | |
| s3.region | S3_REGION | --s3-region | the s3 bucket region, will fall back to AWS_REGION environment variable | | |
| s3.start_after | S3_START_AFTER | --start-after | start scanning after this s3 key | | |
| s3.stop_at | S3_STOP_AT | --stop-at | stop scanning at this s3 key | | |
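
The EnvVar column above means the basic invocation from Getting Started can also be driven by environment variables instead of flags. The following is a sketch, assuming the variables are read exactly as named in the table (no extra prefix) and reusing the same placeholder bucket, prefix, and stream names as the earlier example.

```shell
# Required settings, named per the EnvVar column
export S3_BUCKET=my-bucket
export KINESIS_STREAM_NAME=my-stream
export PARSER_FORMAT=json
export JSON_PARTITION_KEY=path.to.partitionKey

# Optional settings
export S3_PREFIX=2018/01
export LOG_LEVEL=info

# With the environment populated, no flags should be needed.
# The guard keeps this snippet harmless where the binary is not installed.
if command -v s3-kinesis-replay >/dev/null 2>&1; then
  s3-kinesis-replay
fi
```

Viper's documented precedence places flags above environment variables, and environment variables above configuration file values, so a flag passed on the command line would normally override these exports. Whether every key here follows that default order has not been verified against the source.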


Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes using conventional changelog standards (git commit -am 'feat: adds my new feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Ensure linting and tests are all passing
  6. Create a new pull request




Testing

# run short tests
$ docker-compose run test make test-short

# run all tests
$ docker-compose run test make test-verbose

# run tests with coverage
$ docker-compose run test make test-coverage


# run lint
$ docker-compose run test make lint


Licensed under the MIT License

Copyright (c) 2018 Chris Ludden