Translate values of subscribed MQTT topics, write values to InfluxDB database.
ISC License
MQTT::InfluxDB::Translator
Translate values of subscribed MQTT topics, write values to InfluxDB database.
InfluxDB
.InfluxDB
record (seelookup_translate
and lookup_topics
)Add this line to your application's Gemfile
:
gem "mqtt-influxdb-translator", git: "https://github.com/trombik/mqtt-influxdb-translator.git", branch: "master"
And then execute:
> bundle
---
influxdb:
auth_method: basic_auth
username: user
password: password
database: mydatabase
time_precision: ns
hosts:
- influxdb.example.org
paho-mqtt:
host: hab.i.trombik.org
ssl: false
reconnect_delay: 10
client_id: mqtt-influxdb-translator
persistent: true
log_level: DEBUG
log_file: /dev/null
topics:
- ["homie/+/esp/uptime", 1]
- ["homie/+/esp/freeheap", 1]
- ["homie/+/esp/rssi", 1]
- ["homie/+/esp/signal", 1]
lookup_topics:
- ["homie/+/esp/location", 1]
translate: |
# the translation logic, will be executed with ruby's `eval`.
#
# Available variables:
#
# * topic, MQTT topic string
# * value, value of the topic. note that the value is always String. if the
# value is number, use `value.to_i`, or `value.to_f`.
# * timestamp, nano sec when the value is fetched
#
# this ruby fragment must return a list of `name` and `data`.
# they are arguments for InfluxDB#write_point. See
# https://github.com/influxdata/influxdb-ruby
#
(_dummy, mac_addr, node_name, attribute_name) = topic.split("/")
data = {
values: { attribute_name => value.to_i },
tags: { mac_addr: mac_addr },
timestamp: timestamp
}
if @lookup.key?(mac_addr) && @lookup[mac_addr].key?("location")
data[:tags][:location] = @lookup[mac_addr]["location"]
end
return [node_name, data]
lookup_translate: |
(_dummy, mac_addr, node_name, attribute_name) = topic.split("/")
@lookup[mac_addr] = { attribute_name => value }
daemons
Key | Description | Default |
---|---|---|
daemonize |
bool . If true, daemonize. If false, run in foreground. |
true |
pid_dir |
Path to PID file directory |
current directory |
influxdb
The hash is passed to InfluxDB::Client
. See
https://github.com/influxdata/influxdb-ruby
for possible options.
paho-mqtt
The hash is passed to PahoMqtt::Client
. See
https://github.com/RubyDevInc/paho.mqtt.ruby
for possible options.
log_level
Log level. One of accepted values by Logger
class, such as DEBUG
, and INFO
.
translate
The ruby code to translate MQTT topic and value. Available variables are:
topic
, MQTT topic stringvalue
, value of the topic. Note that the value is always String
. If thevalue.to_i
, or value.to_f
.timestamp
, nano sec when the value is fetchedlookup_translate
, and lookup_topics
lookup_topics
is a list of optional MQTT topics to subscribe. The values of
the topics will not be sent to InfluxDB
.
lookup_translate
is ruby code to translate lookup_topics
and values for
lookup in translate
. When a value of the topics is published,
lookup_translate
will be executed. Available variables are same as in
translate
.
In lookup_translate
, use @lookup
instance variable to keep some values for
lookup in translate
.
An example: you want to monitor a value of an MQTT topic.
homie/aabbccddeeff/esp/signal: 75
Also, your device publishes another MQTT topic, location
.
homie/aabbccddeeff/esp/location: "somewhere"
You want to include a tag in InfluxDB
record, location=somewhere
, so that
you can see not only MAC address of the device, but also the location of the
device, which is more useful for humans. But, in translate
, what you have is
MQTT topic, its value, and time only. With lookup_topics
and
lookup_translate
, you can lookup the location of the device with MAC address
as a key. See the following example.
---
topics:
- ["homie/+/esp/uptime", 1]
- ["homie/+/esp/freeheap", 1]
- ["homie/+/esp/rssi", 1]
- ["homie/+/esp/signal", 1]
lookup_topics:
- ["homie/+/esp/location", 1]
translate: |
(_dummy, mac_addr, node_name, attribute_name) = topic.split("/")
data = {
values: { attribute_name => value.to_i },
tags: { mac_addr: mac_addr },
timestamp: timestamp
}
if @lookup.key?(mac_addr) && @lookup[mac_addr].key?("location")
data[:tags][:location] = @lookup[mac_addr]["location"]
end
return [node_name, data]
lookup_translate: |
(_dummy, mac_addr, node_name, attribute_name) = topic.split("/")
@lookup[mac_addr] = { attribute_name => value }
To start the daemon, run:
bundle exec ruby exe/mqitctl start -- /path/to/config.yml
To stop the daemon, run:
bundle exec ruby exe/mqitctl stop -- /path/to/config.yml
ruby
2.5.x and newerbundler
2.xnpm
After checking out the repo, run bin/setup
to install dependencies. Then,
run rake spec
to run the tests. You can also run bin/console
for an
interactive prompt that will allow you to experiment.
MQTT
topicsDuring the development, you might need to remove obsolete, retained MQTT
topics. Use mqtt-forget
to remove topics.
node_modules/mqtt-forget/index.js -u mqtt://mqtt.example.org -f -t 'homie/${MAC_ADDRESS}/#'
roomPing
device from InfluxDB
Run influx
influx -precision rfc3339 -database $MYDATABSE -username $USERNAME -password $PASSWORD -host influxdb.example.org
Run InfluxQL
command.
DELETE FROM esp WHERE mac_addr = '$MAC_ADDRESS'
Bug reports and pull requests are welcome on GitHub at https://github.com/trombik/mqtt-influxdb-translator.
The gem is available as open source under the terms of the ISC License.