a cli for replaying historical kinesis messages via an s3 archive (e.g. s3 firehose delivery destination)
Download
Currently s3-kinesis-replay
ships binaries for OSX and Linux 64bit systems. You can download the latest release from GitHub
$ wget https://github.com/cludden/s3-kinesis-replay/releases/download/{version}/{artifact}
Building From Source
$ make
View usage information:
$ s3-kinesis-replay --help
a cli for replaying historical kinesis messages via an s3 archive
Usage:
s3-kinesis-replay [flags]
Flags:
--bucket string s3 archive bucket name
--delimiter string optional delimiter regexp
--format string parser format
-h, --help help for s3-kinesis-replay
--json-concurrency int json parser concurrency (default 4)
--json-schema string json parser schema path
--kinesis-backoff-interval string kinesis backoff interval
--kinesis-backoff-max-interval string kinesis max backoff interval
--kinesis-buffer-window string kinesis buffer window size
--kinesis-endpoint string kinesis endpoint override
--kinesis-region string kinesis region override
--log-level string log verbosity level
--partition-key string json parser parition key path
--prefix string s3 archive prefix
--replace string optional replace regexp
--replace-with string optional replacement string
--s3-concurrency int s3 download concurrency (default 4)
--s3-region string s3 archive region
--start-after string s3 archive start-after key
--stop-at string s3 archive stop-at key
--stream-name string target kinesis stream name
Basic usage with all required flags: (assumes aws environment is configured)
$ s3-kinesis-replay \
--bucket my-bucket \
--prefix 2018/01 \
--stream-name my-stream \
--format json \
--partition-key path.to.partitionKey
Note: It is worthwile to use this application from within the same region as your s3 bucket & kinesis stream to avoid s3 egress charges
s3-kinesis-replay
uses viper for configuration, and accepts a configuration file, environment variables, cli flags, or a combination of all three.
Key | EnvVar | Flag | Description | Required | Default |
---|---|---|---|---|---|
json.concurrency | JSON_CONCURRENCY | --json-concurrency | number of parser goroutines | 4 | |
json.partition_key | JSON_PARTITION_KEY | --partition-key | path to json field holding paritition key | true | |
json.schema | JSON_SCHEMA | --json-schema | path to json schema file | ||
kinesis.backoff_interval | KINESIS_BACKOFF_INTERVAL | --kinesis-backoff-interval | duration string for initial backoff | 1s | |
kinesis.backoff_max_interval | KINESIS_BACKOFF_MAX_INTERVAL | --kinesis-backoff-max-interval | duration string for max backoff | 10s | |
kinesis.buffer_window | KINESIS_BUFFER_WINDOW | --kinesis-buffer-window | duration string for buffer window | 10s | |
kinesis.endpoint | KINESIS_ENDPOINT | --kinesis-endpoint | optional endpoint override | ||
kinesis.region | KINESIS_REGION | --kinesis-region | target stream aws region, will fall back to AWS_REGION environment variable | ||
kinesis.stream_name | KINESIS_STREAM_NAME | --stream-name | target kinesis stream name | true | |
log.format | LOG_FORMAT | --log-format | supports json or text
|
json | |
log.level | LOG_LEVEL | --log-level | logging verbosity | info | |
parser.delimiter | PARSER_DELIMITER | --delimiter | used to split s3 objects into multiple messages | ||
parser.format | PARSER_FORMAT | --format | the parser to use. currently json is the only supported parser |
true | |
parser.replace | PARSER_REPLACE | --replace | an optional replace regex patter | ||
parser.replace_with | PARSER_REPLACE_WITH | --replace-wth | an optional replacement string | ||
s3.bucket | S3_BUCKET | --bucket | the s3 bucket name that contains the archive | true | |
s3.concurrency | S3_CONCURRENCY | --s3-concurrency | the number of goroutines to use for downloading s3 objects | 4 | |
s3.endpoint | S3_ENDPOINT | --s3-endpoint | an optional s3 endpoint override | ||
s3.prefix | S3_PREFIX | --prefix | an optional s3 key prefix | ||
s3.region | S3_REGION | --s3-region | the s3 bucket region, will fall back to AWS_REGION environment variable |
||
s3.start_after | S3_START_AFTER | --start-after | start scanning after this s3 key | ||
s3.stop_at | S3_STOP_AT | --stop-at | stop scanning at this s3 key |
git checkout -b my-new-feature
)git commit -am 'feat: adds my new feature'
)git push origin my-new-feature
)Prerequisities:
Testing:
# run short tests
$ docker-compose run test make test-short
# run all tests
$ docker-compose run test make test-verbose
# run tests with coverage
$ docker-compose run test make test-coverage
Linting:
# run lint
$ docker-compose run test make lint
Licensed under the MIT License
Copyright (c) 2018 Chris Ludden