This repository implements the control protocol client for the Elastic Agent. All processes implementing Elastic Agent inputs must implement a control protocol client.
The control protocol is implemented as gRPC service on localhost using port 6789 by default and secured using mTLS.
This repository currently contains three separate protobuf files:
The entrypoint for implementing a control protocol client in Go is to create an instance of the client by calling client.NewV2() or client.NewV2FromReader().
The development process is driven by Mage. Run mage -l
to see the list of targets.
When editing the the .proto files first run mage update:all
to run protoc followed by mage check:all
to run the checks and tests.
When ready to publish a release for consumption by client implementations, create a tag and a new release at https://github.com/elastic/elastic-agent-client/releases.
The design of the control protocol follows from the Elastic Agent architecture which defines the component and unit terminology used in the protocol. The connection sequence is as follows:
ConnInfo
message type describes the information// Connection information sent to the application on startup so it knows how to connect back to the Elastic Agent.
//
// This is normally sent through stdin and should never be sent across a network un-encrypted.
message ConnInfo {
// GRPC connection address.
string addr = 1;
// Server name to use when connecting over TLS.
string server_name = 2;
// Token that the application should send as the unique identifier when connecting over the GRPC.
string token = 3;
// CA certificate.
bytes ca_cert = 4;
// Peer certificate.
bytes peer_cert = 5;
// Peer private key.
bytes peer_key = 6;
// Allowed services that spawned process can use. (only used in V2)
repeated ConnInfoServices services = 7;
}
The client.NewV2FromReader() is a convenience method for directly initializing a client from Stdin at startup.
CheckinV2
and Actions
streaming RPC to allow them to receive their expected configuration and anyThe following example messages were captured from a running Elastic Agent with the system integration configured to collect logs but not metrics. The message exchange occurs between the agent and a new logfile component implemented by Filebeat. The messages were serialized to JSON using the protojson.Marshal. Control protocol message logging is currently not possible in production because the payloads contain complete integration input configurations, which can contain credentials for the systems being monitored.
logfile
component first sends the initial CheckinObserved
message:{
"@timestamp": "2023-08-15T19:41:39.987Z",
"observed": {
"token": "27e63445-dff2-48a7-927d-3113ab50769b",
"versionInfo": {
"name": "beat-v2-client",
"version": "8.10.0",
"meta": {
"build_time": "2023-08-14 23:40:19 +0000 UTC",
"commit": "ca76790f16b6f36e131cebd64b6b56d009f7a4dd"
}
}
}
}
CheckinExpected
message.logfile
input section of the agent policyoutput
section.{
"@timestamp": "2023-08-15T19:41:39.987Z",
"expected": {
"units": [
{
"id": "log-default-logfile-system-8ec9254c-0f1e-4166-965b-549c2818a0b0",
"state": "HEALTHY",
"configStateIdx": "1",
"config": {
"source": {
"data_stream": {
"namespace": "default"
},
"id": "logfile-system-8ec9254c-0f1e-4166-965b-549c2818a0b0",
"meta": {
"package": {
"name": "system",
"version": "1.38.2"
}
},
"name": "system-1",
"package_policy_id": "8ec9254c-0f1e-4166-965b-549c2818a0b0",
"policy": {
"revision": 3
},
"revision": 2,
"streams": [
{
"data_stream": {
"dataset": "system.auth",
"type": "logs"
},
"exclude_files": [
".gz$"
],
"id": "logfile-system.auth-8ec9254c-0f1e-4166-965b-549c2818a0b0",
"ignore_older": "72h",
"multiline": {
"match": "after",
"pattern": "^\\s"
},
"paths": [
"/var/log/auth.log*",
"/var/log/secure*"
],
"processors": [
{
"add_locale": null
}
],
"tags": [
"system-auth"
]
},
{
"data_stream": {
"dataset": "system.syslog",
"type": "logs"
},
"exclude_files": [
".gz$"
],
"id": "logfile-system.syslog-8ec9254c-0f1e-4166-965b-549c2818a0b0",
"ignore_older": "72h",
"multiline": {
"match": "after",
"pattern": "^\\s"
},
"paths": [
"/var/log/messages*",
"/var/log/syslog*",
"/var/log/system*"
],
"processors": [
{
"add_locale": null
}
]
}
],
"type": "log"
},
"id": "logfile-system-8ec9254c-0f1e-4166-965b-549c2818a0b0",
"type": "log",
"name": "system-1",
"revision": "2",
"meta": {
"source": {
"package": {
"name": "system",
"version": "1.38.2"
}
},
"package": {
"source": {
"name": "system",
"version": "1.38.2"
},
"name": "system",
"version": "1.38.2"
}
},
"dataStream": {
"source": {
"namespace": "default"
},
"namespace": "default"
},
"streams": [
{
"source": {
"data_stream": {
"dataset": "system.auth",
"type": "logs"
},
"exclude_files": [
".gz$"
],
"id": "logfile-system.auth-8ec9254c-0f1e-4166-965b-549c2818a0b0",
"ignore_older": "72h",
"multiline": {
"match": "after",
"pattern": "^\\s"
},
"paths": [
"/var/log/auth.log*",
"/var/log/secure*"
],
"processors": [
{
"add_locale": null
}
],
"tags": [
"system-auth"
]
},
"id": "logfile-system.auth-8ec9254c-0f1e-4166-965b-549c2818a0b0",
"dataStream": {
"source": {
"dataset": "system.auth",
"type": "logs"
},
"dataset": "system.auth",
"type": "logs"
}
},
{
"source": {
"data_stream": {
"dataset": "system.syslog",
"type": "logs"
},
"exclude_files": [
".gz$"
],
"id": "logfile-system.syslog-8ec9254c-0f1e-4166-965b-549c2818a0b0",
"ignore_older": "72h",
"multiline": {
"match": "after",
"pattern": "^\\s"
},
"paths": [
"/var/log/messages*",
"/var/log/syslog*",
"/var/log/system*"
],
"processors": [
{
"add_locale": null
}
]
},
"id": "logfile-system.syslog-8ec9254c-0f1e-4166-965b-549c2818a0b0",
"dataStream": {
"source": {
"dataset": "system.syslog",
"type": "logs"
},
"dataset": "system.syslog",
"type": "logs"
}
}
]
},
"logLevel": "INFO"
},
{
"id": "log-default",
"type": "OUTPUT",
"state": "HEALTHY",
"configStateIdx": "1",
"config": {
"source": {
"api_key": "fake:api-key",
"hosts": [
"https://localhost:9092"
],
"type": "elasticsearch"
},
"type": "elasticsearch"
},
"logLevel": "INFO"
}
],
"features": {
"source": {
"agent": {
"features": {
"fqdn": {
"enabled": false
}
}
}
},
"fqdn": {}
},
"featuresIdx": "2",
"component": {
"limits": {
"source": {
"go_max_procs": 0
}
}
},
"componentIdx": "2"
}
}
logfile
component checks in again reporting that it has succesfully applied the latest configuration.{
"@timestamp": "2023-08-15T19:41:41.009Z",
"observed": {
"token": "27e63445-dff2-48a7-927d-3113ab50769b",
"units": [
{
"id": "log-default-logfile-system-8ec9254c-0f1e-4166-965b-549c2818a0b0",
"configStateIdx": "1",
"message": "Starting"
},
{
"id": "log-default",
"type": "OUTPUT",
"configStateIdx": "1",
"state": "HEALTHY",
"message": "Healthy"
}
],
"featuresIdx": "2",
"componentIdx": "2"
}
}
CheckinExpected
message indicating the component is up to date.{
"@timestamp": "2023-08-15T19:41:41.009Z",
"expected": {
"units": [
{
"id": "log-default-logfile-system-8ec9254c-0f1e-4166-965b-549c2818a0b0",
"state": "HEALTHY",
"configStateIdx": "1",
"logLevel": "INFO"
},
{
"id": "log-default",
"type": "OUTPUT",
"state": "HEALTHY",
"configStateIdx": "1",
"logLevel": "INFO"
}
],
"features": {
"source": {
"agent": {
"features": {
"fqdn": {
"enabled": false
}
}
}
},
"fqdn": {}
},
"featuresIdx": "2",
"component": {
"limits": {
"source": {
"go_max_procs": 0
}
}
},
"componentIdx": "2"
},
}
To log control protocol messages in the format above in Go, code like the following can be used with the zap logger.
// When logging either the agent or client side of the exchange both an observed and expected response
// are likely be available at the same time.
var expected *proto.CheckinExpected,
var observed *proto.CheckinObserved,
obsJSON, _ := protojson.Marshal(observed)
expJSON, _ := protojson.Marshal(expected)
c.logger.Infow("Checkin",
zap.Any("observed", json.RawMessage(obsJSON)),
zap.Any("expected", json.RawMessage(expJSON)),
)