StreamAlert is a serverless, realtime data analysis framework which empowers you to ingest, analyze, and alert on data from any environment, using datasources and alerting logic you define.
APACHE-2.0 License
Published by ryandeivert over 6 years ago
This is to be used as a rolling 'stable' tag. It allows the following command to clone the latest stable release without needing to update the documentation:
git clone --branch stable https://github.com/airbnb/streamalert.git
Updating the tag in the future requires the following (the first form moves the tag to the current HEAD, the second moves it to a specific commit):
git tag stable --force
git tag --force stable <commit_sha>
git push origin stable --force
Published by ryandeivert over 6 years ago
This release now supports GitHub as an alerting output. Sending alerts to a GitHub output creates an issue in the specified GitHub.com repository. A huge thanks to @patrickod for this contribution!
Also new to this release is support for Komand as an alerting output. This allows Komand to carry out specific actions when alerts are triggered and further expands StreamAlert’s integration with security orchestration tools. A huge thanks to @0xdabbad00 for this contribution!
The PagerDuty Incidents alerting output now supports adding notes to incidents created in PagerDuty. This is accomplished by adding a note to a record’s context within a rule.
Improved handling of S3 payloads, including skipping files of zero size and checking for IOError-related issues when downloading objects.
"Connection reset by peer" bug filed in #478
TypeError when deleting messages from SQS
Updated CloudTrail Events log schema and new schema for Carbon Black Audit logs.
Published by ryandeivert over 6 years ago
This release includes a new (beta) threat intelligence feature to enable analysis and identification of suspicious activity in your infrastructure based on IP address, domain and file hash indicators.
StreamAlert compares these indicators (stored in a DynamoDB table) to incoming data in real-time, and generates an alert if any matches are found.
To complement this feature, this release also includes a Threat Intel Downloader. It is a Lambda function that collects the latest IP addresses, domains and file hashes mentioned above and updates them in the DynamoDB table. Currently, the Threat Intel Downloader supports fetching data from Anomali’s ThreatStream API.
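The sketch below illustrates the matching step only. It is not StreamAlert's implementation. An in-memory dict stands in for the DynamoDB indicator table, and the names `IOC_TABLE` and `find_ioc_matches` are hypothetical. Each returned match is a point where an alert would be generated.

```python
from typing import Dict, Iterable, List, Tuple

# Hypothetical stand-in for the DynamoDB indicator table: indicator value -> IOC type
IOC_TABLE: Dict[str, str] = {
    "198.51.100.7": "ip",
    "evil.example.com": "domain",
    "44d88612fea8a8f36de82e1278abb02f": "md5",
}


def find_ioc_matches(values: Iterable[str]) -> List[Tuple[str, str]]:
    """Return (value, ioc_type) for every record value that is a known indicator."""
    matches = []
    for value in values:
        # Indicators are stored lowercase, so normalize before the lookup
        ioc_type = IOC_TABLE.get(value.lower())
        if ioc_type is not None:
            matches.append((value, ioc_type))
    return matches


record_values = ["10.0.0.1", "198.51.100.7", "Evil.Example.com"]
print(find_ioc_matches(record_values))
# [('198.51.100.7', 'ip'), ('Evil.Example.com', 'domain')]
```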
StreamAlert Apps enable you to easily retrieve data from any service with a RESTful API to send to StreamAlert for processing and alerting. The infrastructure is serverless, the configuration takes minutes, and the deployment is automated via Terraform.
Read more about this feature in our blog post, or learn how to get started with StreamAlert Apps in the documentation.
This release includes several apps, with more coming in future releases:
As announced in the last release (v1.5.0), StreamAlert can be configured to search generated alerts with AWS Athena.
This feature has been extended to deliver all incoming logs into Amazon S3 via AWS Firehose, where they can be searched with AWS Athena in the streamalert database. This allows users to query data over long periods of time and to perform statistics, joins, and other analysis.
The StreamAlert CLI also manages the setup, creation, and provisioning of data tables and required AWS infrastructure. To get started, check out our Athena setup instructions.
StreamAlert now includes support for two new PagerDuty API outputs:
Rule test events can now be configured to indicate which rules they will trigger, and to include the log schema that the event corresponds to. The CLI now also reports hard-to-diagnose errors related to rule tests. See the documentation for more information on the new test event structure.
StreamAlert now includes rule helper functions which help you recursively find key-values in records without worrying about the schema or nesting.
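As an illustration of the idea, here is a minimal recursive key-value search. The function name `find_values_for_key` is hypothetical. StreamAlert's own helpers may use different names and signatures.

```python
from typing import Any, List


def find_values_for_key(data: Any, search_key: str) -> List[Any]:
    """Recursively collect every value stored under search_key, at any nesting depth."""
    found = []
    if isinstance(data, dict):
        for key, value in data.items():
            if key == search_key:
                found.append(value)
            # Keep descending, because the same key may also appear deeper inside value
            found.extend(find_values_for_key(value, search_key))
    elif isinstance(data, list):
        for item in data:
            found.extend(find_values_for_key(item, search_key))
    return found


record = {
    "userIdentity": {"arn": "arn:aws:iam::123456789012:user/alice"},
    "resources": [{"arn": "arn:aws:s3:::bucket-a"}, {"arn": "arn:aws:s3:::bucket-b"}],
}
print(find_values_for_key(record, "arn"))
# ['arn:aws:iam::123456789012:user/alice', 'arn:aws:s3:::bucket-a', 'arn:aws:s3:::bucket-b']
```

Because the search walks both dicts and lists, a rule can ask for every `arn` without knowing where the schema nests it.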
Bandit is a Python scanner that checks for common security issues in Python source code. The project has been updated to run bandit on the StreamAlert source as part of the CI pipeline.
Kinesis shard-level metrics, via enhanced monitoring, will now be disabled by default with the optional ability to configure specific metrics to log. This will greatly reduce AWS costs for end-users.
Updates to various Carbon Black schemas, and schema fixes for Carbon Black and CloudTrail logs.
Special thanks for the following external contributions from @armtash and @javefang:
Published by jacknagz about 7 years ago
StreamAlert now supports historical searching of alerts! To enable this functionality, follow the steps outlined in the docs.
Once setup is complete, ensure your rules are sending alerts to the default S3
bucket created by StreamAlert:
Example conf/outputs.json config:
{
"aws-s3": {
"main": "<my-prefix>.streamalerts"
}
}
Example rule:
@rule(logs=['cloudtrail:events'],
      outputs=['aws-s3:main'])
def test_cloudtrail_rule(rec):
    return rec['region'] == 'us-west-2'
To search alerts, open AWS Athena and run the desired SELECT statements on the alerts table in the newly created streamalert database.
Optionally, a dt partition can be specified to limit results to the nearest hour.
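The helper below shows how a partition filter narrows a query. It is only a sketch. The helper name is hypothetical, and so is the `rule_name` column. The `dt` value format (`YYYY-MM-DD-HH`) is also an assumption, so check the table definition in your deployment before relying on it.

```python
from datetime import datetime
from typing import Optional


def build_alerts_query(rule_name: str, hour: Optional[datetime] = None, limit: int = 100) -> str:
    """Build a SELECT against streamalert.alerts, optionally pinned to one hourly dt partition."""
    # Double any single quotes so the rule name stays a valid SQL string literal
    safe_name = rule_name.replace("'", "''")
    query = f"SELECT * FROM streamalert.alerts WHERE rule_name = '{safe_name}'"
    if hour is not None:
        # Assumed partition format: dt = 'YYYY-MM-DD-HH'
        query += f" AND dt = '{hour:%Y-%m-%d-%H}'"
    return f"{query} LIMIT {limit:d}"


print(build_alerts_query("test_cloudtrail_rule", datetime(2017, 7, 1, 14)))
# SELECT * FROM streamalert.alerts WHERE rule_name = 'test_cloudtrail_rule' AND dt = '2017-07-01-14' LIMIT 100
```

You can paste the resulting string into the Athena console, or pass it as `QueryString` to the boto3 Athena client's `start_query_execution`.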
For more information on SQL syntax and options, see the Athena Language Reference.
To gain a better understanding of your StreamAlert deployment, detailed metrics have been added for failed log parsing (FailedParses), total records processed (TotalRecords), total triggered alerts (TriggeredAlerts), and more.
Custom metrics can be enabled or disabled with the python manage.py metrics command, for either aggregate or per-cluster metrics.
Alarms can also be configured with the python manage.py create-alarm command. For more information on metrics setup, see the metrics documentation.
Previously, in order to verify that a newly added schema was working as expected, a rule had to be created.
The new python manage.py validate-schemas command removes the need to create a rule to test a schema.
After you have created a schema and added a test event in tests/integration/rules, you can verify the schema by running:
$ python manage.py validate-schemas --test-files <rule_file_name.json>
It is common for multiple logs to have similar fields, but with different key names.
Examples include src_ip, source_ip, client_ip, remote_address, remote_ip, dst_ip, etc.
What if you wanted to write a single rule that analyzed all IP addresses found in your logs? With data normalization, you can!
By normalizing schema keys, rules can be simplified and consolidated.
Let’s walk through an example, using two example schemas:
{
"system:logs": {
"parser": "json",
"schema": {
"date": "string",
"client_ip": "string", # represents an ip address
"message": "string",
"name": "string"
}
},
"web:logs": {
"parser": "json",
"schema": {
"error_code": "string",
"filename": "string",
"src_ip": "string", # also represents an ip address
"name": "string"
}
}
}
The field names to be normalized are declared in conf/types.json. In this case, we will normalize the IP-related fields.
{
"system": {
"sourceAddress": ["client_ip"]
},
"web": {
"sourceAddress": ["src_ip"]
}
}
Note the use of CEF (Common Event Format) field names, such as sourceAddress. For more examples, see the conf/types.json file provided in the repository.
When writing rules, you can use the special keyword argument datatypes to ensure that the rule applies to all logs with this normalized field:
from helpers.base import fetch_values_by_datatype, in_network

@rule(datatypes=['sourceAddress'],
      outputs=['aws-s3:main'])
def trusted_ip_check(rec):
    # Alert if any normalized source IP falls outside the trusted CIDR set
    ip_addresses = fetch_values_by_datatype(rec, 'sourceAddress')
    trusted_cidrs = {'10.0.100.0/24', '10.1.200.0/24'}
    return not all(in_network(ip, trusted_cidrs) for ip in ip_addresses)
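The two helpers used above could work roughly as follows. These are hypothetical stand-ins, not the code in helpers/base.py. Here the log source is passed explicitly, and the mapping is copied from the conf/types.json example. The helper in the rule above takes only the record and the datatype.

```python
import ipaddress
from typing import Any, Dict, Iterable, List

# Mirrors the conf/types.json example above: log source -> datatype -> field names
NORMALIZED_TYPES: Dict[str, Dict[str, List[str]]] = {
    "system": {"sourceAddress": ["client_ip"]},
    "web": {"sourceAddress": ["src_ip"]},
}


def fetch_values_by_datatype(rec: Dict[str, Any], log_source: str, datatype: str) -> List[Any]:
    """Return the values of every field in rec that is normalized to datatype."""
    fields = NORMALIZED_TYPES.get(log_source, {}).get(datatype, [])
    return [rec[field] for field in fields if field in rec]


def in_network(ip: str, cidrs: Iterable[str]) -> bool:
    """True if ip falls inside any of the given CIDR ranges."""
    address = ipaddress.ip_address(ip)
    return any(address in ipaddress.ip_network(cidr) for cidr in cidrs)


trusted_cidrs = {"10.0.100.0/24", "10.1.200.0/24"}
system_rec = {"client_ip": "10.0.100.25", "name": "cron"}
web_rec = {"src_ip": "203.0.113.9", "name": "nginx"}
for source, rec in (("system", system_rec), ("web", web_rec)):
    ips = fetch_values_by_datatype(rec, source, "sourceAddress")
    print(source, not all(in_network(ip, trusted_cidrs) for ip in ips))
# system False
# web True
```

One rule body covers both schemas because it reads `sourceAddress` instead of `client_ip` or `src_ip`.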
Note: Rules can still be restricted to specific log types by using the logs constraint.
Two other large benefits of data normalization:
The removal of SNS has simplified inter-service communication and increased reliability in alert delivery between Lambda functions.
Alert delivery has been consolidated to a single S3 bucket to enable historical searching of alerts.
The stream_alert_cli.py command line tool has been renamed to manage.py.
To get started with the new CLI:
$ python manage.py --help
$ python manage.py <subcommand> --help
#223 - Fix nested rule directory import errors
#250 - Massive Pylint cleanup
#274 - Prevent the alert processor from running without a valid config
#284 - Raise exception if output credentials could not be encrypted
#300, #315 - VPC flow log, CarbonBlack, Osquery schema fixes and additional support
#297 - GitHub schema fixes
Published by jacknagz over 7 years ago
To encourage collaboration and contribution of StreamAlert rules from the community, the rules directory has been reorganized:
|------- rules/
| |------- community/
| |------- default/
When contributing public rules, place the rule files in a named subdirectory under the community folder. For example, the CloudTrail rules live in rules/community/cloudtrail.
For rules internal to your organization, the default folder is a great starting point. You can create any number of subdirectories under this directory. Remember to always place a blank __init__.py in each new subdirectory so that the rule processor imports pick it up.
Matchers and helpers have also been reorganized into their own respective directories:
|------- conf/
|------- docs/
|------- helpers/
|------- matchers/
|------- rules/
|------- stream_alert/
|------- stream_alert_cli/
|------- terraform/
|------- test/
Be sure to update rules and matchers referencing helpers based on this new structure.
StreamAlert’s supporting AWS infrastructure is managed by a set of Terraform modules. Each module controls a piece of StreamAlert. An example is the monitoring module, used to create metric alarms and alert administrators when Lambda errors or throttles occur.
To give users full control over which modules and settings they would like, clusters have been refactored into independent JSON files:
# conf/clusters/production.json
{
"id": "production",
"region": "us-west-2",
"modules": {
"stream_alert": {
"alert_processor": {
"timeout": 25,
"memory": 128,
"current_version": "$LATEST"
},
"rule_processor": {
"timeout": 10,
"memory": 256,
"current_version": "$LATEST"
}
},
"cloudwatch_monitoring": {
"enabled": true
},
"kinesis": {
"streams": {
"shards": 1,
"retention": 24
},
"firehose": {
"enabled": true,
"s3_bucket_suffix": "streamalert.results"
}
},
"kinesis_events": {
"enabled": true
}
},
"outputs": {
"kinesis": [
"username",
"access_key_id",
"secret_key"
]
}
}
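Because each cluster is now a standalone JSON file, tooling can load the clusters independently. Below is a sketch of such a loader. The functions `load_clusters` and `module_enabled` are hypothetical and are not part of the StreamAlert CLI. The rule that a module present without an `enabled` key counts as on is also an assumption.

```python
import json
import os
import tempfile
from typing import Any, Dict


def load_clusters(clusters_dir: str) -> Dict[str, Dict[str, Any]]:
    """Load every *.json file in clusters_dir, keyed by the cluster's "id" field."""
    clusters = {}
    for filename in sorted(os.listdir(clusters_dir)):
        if not filename.endswith(".json"):
            continue
        with open(os.path.join(clusters_dir, filename)) as config_file:
            cluster = json.load(config_file)
        clusters[cluster["id"]] = cluster
    return clusters


def module_enabled(cluster: Dict[str, Any], module: str) -> bool:
    """A module counts as enabled if it is configured and not explicitly disabled."""
    settings = cluster.get("modules", {}).get(module)
    return settings is not None and bool(settings.get("enabled", True))


# Usage: write a trimmed-down production.json to a scratch directory and load it
with tempfile.TemporaryDirectory() as conf_dir:
    with open(os.path.join(conf_dir, "production.json"), "w") as out:
        json.dump({"id": "production", "region": "us-west-2",
                   "modules": {"cloudwatch_monitoring": {"enabled": True},
                               "kinesis_events": {"enabled": False}}}, out)
    clusters = load_clusters(conf_dir)

print(sorted(clusters))  # ['production']
print(module_enabled(clusters["production"], "cloudwatch_monitoring"))  # True
print(module_enabled(clusters["production"], "kinesis_events"))  # False
```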
For more information on setup, check out https://www.streamalert.io/clusters.html
AWS VPC (Virtual Private Cloud) allows users or organizations to run virtual machines in a logically segmented environment. To support delivering alerts to internal resources (such as EC2 instances), the alert processor can now be configured to access resources inside a VPC:
# conf/clusters/<cluster-name>.json
{
"alert_processor": {
"vpc_config": {
"subnet_ids": ["subnet-id-1"],
"security_group_ids": ["security-group-id-1"]
}
}
}
Note: When making this change, you must explicitly destroy and then re-create the alert processor:
$ cd terraform
$ terraform destroy -target=module.stream_alert_<cluster-name>.aws_lambda_function.streamalert_alert_processor
Then, run:
$ python stream_alert_cli.py terraform build
To better validate StreamAlert’s end-to-end functionality, testing has been reworked to support sending alerts from a local StreamAlert repo. With a local set of valid AWS credentials, it is possible to use configured rule tests to dispatch alerts to configured outputs (such as Slack or PagerDuty).
This functionality is provided through the StreamAlertCLI tool, with the new command line argument live-test:
$ python stream_alert_cli.py live-test --cluster <cluster_name>
For normal use cases, you are unlikely to want (or need) to test the full ruleset, as this could send a high volume of alerts to outputs. To test specific rules, pass the --rules argument followed by a space-delimited list of rule names:
$ python stream_alert_cli.py live-test --cluster <cluster_name> --rules <rule_name_01> <rule_name_02>
#129 - Cluster aware SNS inputs
#166 - Apply optional top level keys to nested JSON records
#168 - Fix the handler import path for the alert_processor
#183 - Lambda traceback due to PagerDuty errors
#201 - Updated IAM permissions for streamalert user
#202 - Handle errors when Terraform is not installed
#206, #209 - Schema updates to osquery and carbonblack:watchlist.hit.binary
Published by jacknagz over 7 years ago
Log schemas now support list, boolean, and float types for more accurate schemas (#77). As the rule_processor parses records, fields are now cast to these new types so that rules can reference them.
Example Schema:
{
"carbonblack:feed.storage.hit.process": {
"schema": {
"sensor_id": "integer",
"report_score": "integer",
"from_feed_search": "boolean",
"feed_id": "integer",
"ioc_type": "string",
"ioc_attr": {},
"docs": [],
"group": "string",
"server_name": "string",
"hostname": "string",
"feed_name": "string",
"cb_server": "string",
"timestamp": "float",
"process_guid": "string",
"interface_ip": "string",
"type": "string"
},
"parser": "json"
}
}
Example rule:
@rule(logs=['carbonblack:feed.storage.hit.process'],
      matchers=[],
      outputs=['slack:soc', 'pagerduty:soc'])
def cb_storage_hit_process(rec):
    """This event occurs when an intelligence feed indicator matches a new process upon ingest. """
    return (
        rec['from_feed_search'] == True and
        len(rec['docs']) > 1
    )
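The rule above can compare `from_feed_search` with `True` only because the parser has already cast the field. Below is a sketch of that casting step. The `cast_record` helper is hypothetical, not the rule_processor's code. Its handling of boolean strings is an assumption.

```python
from typing import Any, Dict

# Hypothetical mapping from scalar schema type names to casting functions
_CASTS = {"string": str, "integer": int, "float": float}


def _to_bool(value: Any) -> bool:
    # bool("false") is True in Python, so string values are compared explicitly
    if isinstance(value, str):
        return value.strip().lower() == "true"
    return bool(value)


def cast_record(record: Dict[str, Any], schema: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of record with each top-level schema field cast to its declared type."""
    parsed = dict(record)
    for key, declared in schema.items():
        if key not in parsed or not isinstance(declared, str):
            # Missing keys and list ([]) / map ({}) declarations are left as-is here
            continue
        if declared == "boolean":
            parsed[key] = _to_bool(parsed[key])
        elif declared in _CASTS:
            parsed[key] = _CASTS[declared](parsed[key])
    return parsed


schema = {"sensor_id": "integer", "from_feed_search": "boolean", "timestamp": "float", "docs": []}
raw = {"sensor_id": "12", "from_feed_search": "true", "timestamp": "1499731520.47", "docs": [{}, {}]}
print(cast_record(raw, schema))
# {'sensor_id': 12, 'from_feed_search': True, 'timestamp': 1499731520.47, 'docs': [{}, {}]}
```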
Additionally, a new parser option, optional_top_level_keys, has been added to handle logs with optional keys (#95). At a minimum, an incoming record must contain the keys defined in the schema. If any of the defined optional_top_level_keys are missing, an empty default value (per the defined type) is added to the parsed record. This ensures that rules never reference a missing key, which would otherwise raise an exception.
Example Schema:
"github:enterprise": {
"schema": {
"@timestamp": "string",
"@version": "integer",
"host": "string",
"message": "string",
"port": "integer",
"received_at": "string",
"tags": []
},
"parser": "json",
"configuration": {
"optional_top_level_keys": {
"logsource": "string",
"pid": "integer",
"program": "string",
"timestamp": "string"
}
}
}
This schema supports the following logs:
[
{
"message": "github_audit message",
"@version": "1",
"@timestamp": "2015-05-20T20:00:36.731Z",
"host": "10.0.0.1",
"port": 59310,
"tags": [],
"received_at": "2015-05-20T20:00:36.731Z",
"timestamp": "May 20 20:00:36",
"logsource": "github",
"program": "github_audit"
},
{
"message": "github_audit message",
"@version": "1",
"@timestamp": "2015-05-20T20:00:36.731Z",
"host": "10.0.0.1",
"port": 59310,
"pid": 1599,
"tags": [],
"received_at": "2015-05-20T20:00:36.731Z",
"timestamp": "May 20 20:00:36",
"logsource": "github",
"program": "github_audit"
}
]
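The first log above has no `pid`, so it receives an empty default. The sketch below shows that step. The helper names are hypothetical. The specific "empty" values (`""`, `0`, `0.0`, `False`, `[]`, `{}`) are assumptions about what "empty default per the defined type" means.

```python
from typing import Any, Dict

# Assumed "empty" defaults for scalar types
_SCALAR_DEFAULTS = {"string": "", "integer": 0, "float": 0.0, "boolean": False}


def empty_default(declared: Any) -> Any:
    """Empty value for a declared type: [] for lists, {} for maps, else a scalar default."""
    if isinstance(declared, list):
        return []
    if isinstance(declared, dict):
        return {}
    return _SCALAR_DEFAULTS[declared]


def apply_optional_keys(record: Dict[str, Any], optional_keys: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of record in which every missing optional key holds an empty default."""
    filled = dict(record)
    for key, declared in optional_keys.items():
        filled.setdefault(key, empty_default(declared))
    return filled


optional = {"logsource": "string", "pid": "integer", "program": "string", "timestamp": "string"}
first_log = {"message": "github_audit message", "timestamp": "May 20 20:00:36",
             "logsource": "github", "program": "github_audit"}
print(apply_optional_keys(first_log, optional)["pid"])  # 0
```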
To quickly disable rules without deleting them, a new decorator (@disable) has been added (#75). Note: This decorator must be placed directly above the @rule decorator, with no blank lines in between:
Example rule:
rule = StreamRules.rule
disable = StreamRules.disable()

@disable
@rule(logs=['carbonblack:feed.storage.hit.process'],
      matchers=[],
      outputs=['slack:soc', 'pagerduty:soc'])
def cb_storage_hit_process(rec):
    """This event occurs when an intelligence feed indicator matches a new process upon ingest. """
    return (
        rec['from_feed_search'] == True and
        len(rec['docs']) > 1
    )
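Decorators apply bottom-up, so `@rule` runs first and `@disable` then acts on the rule it registered. The toy registry below shows why the order matters. It is a hypothetical model, not how `StreamRules` is implemented. In this sketch `disable` is a plain decorator, whereas the code above calls `StreamRules.disable()` to obtain one.

```python
from typing import Callable, Dict

# Hypothetical rule registry: rule name -> options plus a disabled flag
RULES: Dict[str, Dict] = {}


def rule(**options):
    """Register the decorated function as a rule under its own name."""
    def decorator(func: Callable) -> Callable:
        RULES[func.__name__] = {"func": func, "disabled": False, **options}
        return func
    return decorator


def disable(func: Callable) -> Callable:
    """Mark an already-registered rule as disabled.

    This only works when @disable is placed above @rule: decorators apply
    bottom-up, so @rule must run first to create the registry entry.
    """
    RULES[func.__name__]["disabled"] = True
    return func


@disable
@rule(logs=["carbonblack:feed.storage.hit.process"], outputs=["slack:soc"])
def cb_storage_hit_process(rec):
    return rec["from_feed_search"] and len(rec["docs"]) > 1


@rule(logs=["osquery"], outputs=["pagerduty"])
def invalid_subnet(rec):
    return True


active = [name for name, entry in RULES.items() if not entry["disabled"]]
print(active)  # ['invalid_subnet']
```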
When you use @disable, update the integration test so that it no longer expects an alert to trigger:
{
"records": [
{
"data": {...},
"description": "CB Feed Storage Hit Process should not trigger an alert",
"trigger": false,
"source": "my_s3_bucket",
"service": "s3"
}
]
}
Messages sent to Slack outputs are now formatted using mrkdwn styling, and sent as a series of attachments (#135).
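The sketch below builds a webhook payload in this style: a bold mrkdwn header followed by one attachment per section. The function, the header text and the section layout are hypothetical, not StreamAlert's exact format. The attachment keys used here (`fallback`, `title`, `text`, `color`, `mrkdwn_in`) are standard Slack message attachment fields.

```python
import json
from typing import Any, Dict


def format_slack_alert(rule_name: str, description: str, record: Dict[str, Any]) -> Dict[str, Any]:
    """Build a webhook payload: a bold mrkdwn header plus one attachment per section."""
    return {
        "text": f"*StreamAlert Rule Triggered: {rule_name}*",
        "attachments": [
            {
                "fallback": description,
                "title": "Rule Description",
                "text": description,
                "mrkdwn_in": ["text"],
            },
            {
                "fallback": "Record",
                "title": "Record",
                "text": json.dumps(record, indent=2, sort_keys=True),
                "color": "danger",
                "mrkdwn_in": ["text"],
            },
        ],
    }


payload = format_slack_alert(
    "cb_feed_storage_hit_binary_virustotal",
    "Identify binaries that match against the virustotal feed",
    {"feed_name": "virustotal", "type": "feed.storage.hit.binary"},
)
print(json.dumps(payload, indent=2))
```

The resulting dict would be sent as the JSON body of a POST to a Slack incoming-webhook URL.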
Adding new outputs for supported services is now as easy as running:
$ python stream_alert_cli.py output new --service slack
This will create a new Slack integration. Prompts then walk you through entering any information the service requires. As of this release, the supported services are AWS Lambda, AWS S3, PagerDuty, Phantom, and Slack.
As an added bonus, these changes allow rules to send alerts to multiple configured outputs per service. For example, a rule previously could send to only one 'destination' in Slack, but it can now send to multiple configured webhooks. To send to several Slack integrations, simply add them to the rule, like so:
@rule(logs=['carbonblack:feed.storage.hit.binary'],
      matchers=[],
      outputs=['slack:alerts_channel', 'slack:direct_message', 'pagerduty:corp_alerts'])
def cb_feed_storage_hit_binary_virustotal(rec):
    """Identify binaries that match against the virustotal feed"""
    return (
        rec['type'] == 'feed.storage.hit.binary' and
        rec['feed_name'] == 'virustotal'
    )
The StreamAlert output classes have also been refactored to easily enable the addition of new output services (#97). The documentation has been updated to demonstrate this new extensibility along with providing a walkthrough of how to implement a new service to send alerts to.
To promote Serverless Service Oriented Architectures, StreamAlert now has the ability to accept input from arbitrary AWS SNS topics (#118/#119) and invoke arbitrary AWS Lambda functions as an output (#110).
To enable StreamAlert to accept input from SNS topics, modify the conf/inputs.json file. Terraform will then automatically subscribe to the topic(s).
Example of adding an SNS input:
{
"aws-sns": {
"our_sns_input": "arn:aws:sns:us-east-1:012345678912:sns-topic-name"
}
}
As stated in the Modular Outputs section above, users can add AWS Lambda functions that they would like to utilize as outputs via the stream_alert_cli.py tool. This is accomplished by simply running the following command and following the prompts:
$ python stream_alert_cli.py output new --service aws-lambda
Example:
$ python stream_alert_cli.py output new --service aws-lambda
StreamAlertCLI [INFO]: Issues? Report here: https://github.com/airbnb/streamalert/issues
Please supply a short and unique descriptor for this Lambda function configuration
(ie: abbreviated name): external-lambda-function
Please supply the AWS arn, with the optional qualifier, that represents the Lambda function
to use for this configuration (ie: arn:aws:lambda:aws-region:acct-id:function:output_function:qualifier):
arn:aws:lambda:us-east-1:012345678912:function:my_function:Production
StreamAlertCLI [INFO]: Successfully saved 'external-lambda-function' output configuration
for service 'aws-lambda'
StreamAlertCLI [INFO]: Completed
#126, #137, #147, #161 - StreamAlert performance improvements thanks to @ryandeivert!
#100 - Check Slack message size before sending, and appropriately split long messages.
#79 - Does not upload the Lambda deployment package if pip fails to install dependencies.
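The long-message handling from #100 can be sketched as a splitter that prefers newline boundaries. The function name is hypothetical. The character limit is left as a parameter rather than hard-coded, because Slack's actual limit should be taken from its documentation.

```python
from typing import List


def split_message(text: str, max_chars: int) -> List[str]:
    """Split text into chunks of at most max_chars, preferring newline boundaries."""
    if max_chars <= 0:
        raise ValueError("max_chars must be positive")
    chunks = []
    while len(text) > max_chars:
        # Break at the last newline inside the window, else hard-cut at max_chars
        cut = text.rfind("\n", 0, max_chars)
        if cut <= 0:
            cut = max_chars
        chunks.append(text[:cut])
        # Drop the newline(s) at the break so the next chunk starts with content
        text = text[cut:].lstrip("\n")
    if text:
        chunks.append(text)
    return chunks


print(split_message("aaa\nbbb\nccc", 7))  # ['aaa', 'bbb\nccc']
print(split_message("abcdefghij", 4))     # ['abcd', 'efgh', 'ij']
```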
Published by jacknagz over 7 years ago
AWS VPC Flow Logs is a feature that captures information about the network traffic going to and from network interfaces in your VPC. Each network flow is identified by its srcaddr, dstaddr, srcport, dstport, and protocol. Potential use cases for these logs include network traffic analysis, ACL auditing, and more.
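For reference, the sketch below maps one flow log line to named fields. It uses the default version 2 flow log record format from the AWS documentation. It is a standalone illustration, not StreamAlert's flow_log parser: the schema you define in conf/logs.json is what actually drives parsing.

```python
from typing import Dict, Union

# Field order of the default (version 2) VPC Flow Logs record format
FLOW_LOG_FIELDS = [
    "version", "account_id", "interface_id", "srcaddr", "dstaddr", "srcport",
    "dstport", "protocol", "packets", "bytes", "start", "end", "action", "log_status",
]
INTEGER_FIELDS = {"version", "srcport", "dstport", "protocol", "packets", "bytes", "start", "end"}


def parse_flow_log(line: str) -> Dict[str, Union[str, int]]:
    """Parse one space-delimited flow log line; '-' (no data) values stay as strings."""
    values = line.split()
    if len(values) != len(FLOW_LOG_FIELDS):
        raise ValueError(f"expected {len(FLOW_LOG_FIELDS)} fields, got {len(values)}")
    record: Dict[str, Union[str, int]] = {}
    for field, value in zip(FLOW_LOG_FIELDS, values):
        record[field] = int(value) if field in INTEGER_FIELDS and value != "-" else value
    return record


line = ("2 123456789010 eni-abc123de 172.31.16.139 172.31.16.21 "
        "20641 22 6 20 4249 1418530010 1418530070 ACCEPT OK")
rec = parse_flow_log(line)
print(rec["srcaddr"], rec["dstport"], rec["protocol"], rec["action"])  # 172.31.16.139 22 6 ACCEPT
```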
StreamAlert now formally supports the setup, ingestion, and analysis of these logs. Follow the instructions below to get set up in minutes!
Add the following to your cluster(s) .tf file located in the terraform/ directory:
module "flow_logs_cluster_name_here" {
source = "modules/tf_stream_alert_flow_logs"
destination_stream_arn = "${module.kinesis_cluster_name_here.arn}"
targets = "${var.flow_log_settings["cluster_name_here"]}"
region = "${lookup(var.clusters, "cluster_name_here")}"
flow_log_group_name = "${var.prefix}_cluster_name_here_stream_alert_flow_logs"
}
In variables.json, define the specific VPC, Subnet, or ENI IDs to capture flow logs from:
{
"flow_log_settings": {
"vpcs": ["vpc-id"],
"subnets": ["public-subnet-id"],
"enis": ["eni-id"]
}
}
Apply these changes:
$ ./stream_alert_cli.py terraform build
To configure StreamAlert to process these logs, follow the instructions here to add the flow_log type in conf/logs.json and conf/sources.json.
Finally, deploy the new version of the AWS Lambda function:
$ ./stream_alert_cli.py lambda deploy --env staging --func alert
If no CloudWatch alarms are triggered, deploy to production:
$ ./stream_alert_cli.py lambda deploy --env production --func alert
Applications such as CloudWatch and Inspec commonly output a single-line JSON object that contains multiple nested records. Previously, StreamAlert treated each line as an individual payload, so all the nested records in an object were processed as one payload. With this release, StreamAlert detects nested records and parses each one as an individual payload for rules to process.
As an example, let's look at the following log (prettified for this example):
{
"Records": [
{
"eventVersion": "1",
"eventID": "1",
"eventTime": "10:45:35 PM UTC",
"eventType": "1",
"request": "aws lambda list-functions",
"awsRegion": "us-east-1"
},
{
"eventVersion": "1",
"eventID": "2",
"eventTime": "11:45:35 PM UTC",
"eventType": "2",
"request": "aws lambda delete-function",
"awsRegion": "us-east-1"
}
]
}
When defining a schema for a nested log type like this, a hint named records must be specified with a JSONPath-RW selector pointing to the nested records:
"nested_log_type": {
"parser": "json",
"schema": {
"eventVersion": "string",
"eventID": "string",
"eventTime": "string",
"eventType": "string",
"request": "string",
"awsRegion": "string"
},
"hints" : {
"records": "Records[*]"
}
}
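To show what the `Records[*]` hint selects, the sketch below resolves a simplified selector by hand. It is a deliberately minimal stand-in for JSONPath-RW that supports only dotted keys ending in `[*]`. The function name is hypothetical.

```python
import json
from typing import Any, Dict, List


def extract_records(payload: Dict[str, Any], selector: str) -> List[Any]:
    """Resolve a minimal 'Key[*]' or 'Outer.Inner[*]' selector to a list of records."""
    if not selector.endswith("[*]"):
        raise ValueError("this sketch only supports selectors ending in [*]")
    node: Any = payload
    # Walk the dotted path, then return every element of the final list
    for key in selector[:-3].split("."):
        node = node[key]
    return list(node)


log = json.loads('{"Records": [{"eventID": "1"}, {"eventID": "2"}]}')
for record in extract_records(log, "Records[*]"):
    print(record["eventID"])
```

With the jsonpath-rw library itself, the equivalent would be `[match.value for match in parse("Records[*]").find(log)]`, using `from jsonpath_rw import parse`.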
Rule testing is a crucial part of writing safe, effective rules. With the new integration testing framework, rule fixtures (example logs) are defined in test/integration/rules, and have the following structure:
{
"records": [
{
"data": "Jan 01 2017,1487095529,test-host-2,this is test data for rules,cluster 5",
"description": "host is test-host-2",
"trigger": true,
"source": "prefix_cluster1_stream_alert_kinesis",
"service": "kinesis"
}
]
}
Each record includes a log to test (the data key), along with metadata (description, source, service), and the desired outcome of the test (whether or not it should trigger an alert).
For this example, the following rule will be tested:
@rule(logs=['csv_log'],
      matchers=[],
      outputs=['s3'])
def sample_csv_rule(rec):
    return rec['host'] == 'test-host-2'
To run tests against this rule, use the following helper script:
$ ./test/scripts/integration_test_kinesis.sh
sample_csv_rule
test: host is test-host-2 [Pass]
For additional examples, check out Rules Testing.
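The core check an integration test performs can be approximated locally: parse each fixture's data, run the rule, and compare the result with `trigger`. This is not the helper script above. The `csv_log` field names in `CSV_LOG_FIELDS` are hypothetical, since the schema is not shown here.

```python
import csv
from typing import Any, Callable, Dict, List

# Hypothetical field order for the 'csv_log' schema used by the fixture above
CSV_LOG_FIELDS = ["date", "unixtime", "host", "data", "cluster"]


def sample_csv_rule(rec: Dict[str, Any]) -> bool:
    return rec["host"] == "test-host-2"


def run_fixture(records: List[Dict[str, Any]], rule: Callable[[Dict[str, Any]], bool]) -> List[str]:
    """Return one 'test: <description> [Pass|Fail]' line per fixture record."""
    results = []
    for fixture in records:
        # Parse the raw CSV line into a dict keyed by the schema fields
        row = next(csv.reader([fixture["data"]]))
        rec = dict(zip(CSV_LOG_FIELDS, row))
        passed = rule(rec) == fixture["trigger"]
        results.append(f"test: {fixture['description']} [{'Pass' if passed else 'Fail'}]")
    return results


fixture_records = [{
    "data": "Jan 01 2017,1487095529,test-host-2,this is test data for rules,cluster 5",
    "description": "host is test-host-2",
    "trigger": True,
    "source": "prefix_cluster1_stream_alert_kinesis",
    "service": "kinesis",
}]
print("\n".join(run_fixture(fixture_records, sample_csv_rule)))  # test: host is test-host-2 [Pass]
```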
Previously, rules and matchers required a name argument as well as a function name. This has been simplified, and now you only need to define the name in one place:
Before:
@matcher('prod')
def prod(rec):
    return rec['environment'] == 'prod'

@rule('invalid_subnet',
      logs=['osquery'],
      matchers=['prod'],
      outputs=['pagerduty'])
def invalid_subnet(rec):
    return True
After:
@matcher()
def prod(rec):  # matcher name `prod`
    return rec['environment'] == 'prod'

@rule(logs=['osquery'],
      matchers=['prod'],
      outputs=['pagerduty'])
def invalid_subnet(rec):  # rule name `invalid_subnet`
    return True
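The "After" form works because a decorator can read the function's own `__name__`. Below is a toy model of that registration, together with matchers gating a rule. It is hypothetical, meant to show the mechanism, not StreamAlert's rules engine.

```python
from typing import Callable, Dict, List

MATCHERS: Dict[str, Callable] = {}
RULES: Dict[str, dict] = {}


def matcher():
    """Register a matcher under its function name (no separate name argument)."""
    def decorator(func: Callable) -> Callable:
        MATCHERS[func.__name__] = func
        return func
    return decorator


def rule(logs: List[str], matchers: List[str], outputs: List[str]):
    """Register a rule under its function name, together with its options."""
    def decorator(func: Callable) -> Callable:
        RULES[func.__name__] = {"func": func, "logs": logs, "matchers": matchers, "outputs": outputs}
        return func
    return decorator


@matcher()
def prod(rec):
    return rec["environment"] == "prod"


@rule(logs=["osquery"], matchers=["prod"], outputs=["pagerduty"])
def invalid_subnet(rec):
    return True


def triggered_rules(log_type: str, rec: dict) -> List[str]:
    """Names of rules for log_type whose matchers all pass and whose logic returns True."""
    return [
        name for name, entry in RULES.items()
        if log_type in entry["logs"]
        and all(MATCHERS[m](rec) for m in entry["matchers"])
        and entry["func"](rec)
    ]


print(triggered_rules("osquery", {"environment": "prod"}))     # ['invalid_subnet']
print(triggered_rules("osquery", {"environment": "staging"}))  # []
```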
To accommodate users with existing incident management and alerting infrastructure, a new flag has been added to return a list of generated alerts (instead of handling them with StreamAlert Outputs).
This option is enabled by passing return_alerts=True to the StreamAlert initializer in the main.py function handler:
from stream_alert.handler import StreamAlert

def handler(event, context):
    """Main Lambda handler function"""
    alerts = StreamAlert(return_alerts=True).run(event, context)
    # custom workflow goes here
s3transfer package dependency in requirements.txt.
hints option bug
Published by jacknagz over 7 years ago
stream_alert/parsers.py with the following structure:
@parser
class NewParserName(ParserBase):
    # the name of the new parser to be called in the conf/logs.json
    __parserid__ = 'new-parser-name'

    def parser(self):
        # these attributes are automatically set on initialization
        data = self.data
        options = self.options
        schema = self.schema

        parsed_payload = {}
        # parser logic goes here: populate parsed_payload from data
        # optionally, you can define helper methods in this
        # class to make parsing easier/cleaner

        # return a parsed dictionary
        return parsed_payload
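The `@parser` decorator and `__parserid__` let a config value such as "parser": "json" select a class. A self-contained model of that lookup follows. The `PARSERS` registry, this `ParserBase` constructor signature and `JSONParser` are hypothetical stand-ins, not the classes in stream_alert/parsers.py.

```python
import json
from typing import Any, Dict, Type

PARSERS: Dict[str, Type["ParserBase"]] = {}


def parser(cls):
    """Class decorator: register a parser class under its __parserid__."""
    PARSERS[cls.__parserid__] = cls
    return cls


class ParserBase:
    __parserid__ = ""

    def __init__(self, data: str, schema: Dict[str, Any], options: Dict[str, Any]):
        # The attributes the template above reads inside parser()
        self.data = data
        self.schema = schema
        self.options = options

    def parser(self) -> Dict[str, Any]:
        raise NotImplementedError


@parser
class JSONParser(ParserBase):
    __parserid__ = "json"

    def parser(self) -> Dict[str, Any]:
        payload = json.loads(self.data)
        # Keep only the keys the schema declares
        return {key: payload[key] for key in self.schema if key in payload}


log_config = {"parser": "json", "schema": {"host": "string"}}
parser_cls = PARSERS[log_config["parser"]]
print(parser_cls('{"host": "web-1", "extra": 1}', log_config["schema"], {}).parser())  # {'host': 'web-1'}
```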
"csv_log": {
"schema": {
"date": "string",
...
},
"parser": "csv",
"delimiter": "|",
"hints": {}
}
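In the csv_log example above, the delimiter option switches the separator to `|`. The sketch below maps one delimited line onto schema keys in declaration order. The function name and the three-field schema are hypothetical, since the original schema is elided.

```python
import csv
from typing import Any, Dict, List


def parse_delimited(line: str, schema: Dict[str, Any], delimiter: str = ",") -> Dict[str, str]:
    """Map one delimited line onto the schema's keys, in declaration order."""
    values: List[str] = next(csv.reader([line], delimiter=delimiter))
    keys = list(schema)
    if len(values) != len(keys):
        raise ValueError(f"expected {len(keys)} columns, got {len(values)}")
    return dict(zip(keys, values))


schema = {"date": "string", "host": "string", "message": "string"}
print(parse_delimited("Jan 01 2017|test-host-2|disk full", schema, delimiter="|"))
# {'date': 'Jan 01 2017', 'host': 'test-host-2', 'message': 'disk full'}
```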
, for csv, k=v for kv), you can omit these settings from your config.
StreamPayload class to prevent marshalling of unnecessary attributes
terraform subcommand