The fully compliant, embeddable high-performance Go MQTT v5 server for IoT, smarthome, and pubsub
MIT License
**mochi-co/mqtt mochi-mqtt ** .
Mochi MQTT MQTT v5 / Go
MQTT MQ Telemetry Transport/Mochi MQTT MQTT 5.0.0
v5 MQTT v5 v3 v5 v3 v5 v3
MQTT v3.0.0 v3.1.1 v3 v5 - (retained messages)(inflight messages)QOS
Mochi MQTT cmd cmd/main.go tcp (:1883)websocket (:1882) (:8080)
cd cmd
go build -o mqtt && ./mqtt
docker pull mochimqtt/server
docker run -v $(pwd)/config.yaml:/config.yaml mochimqtt/server
yaml json Dockerfile cmd/main.go Websocket(:1882)TCP(:1883) (:8080) allow-all (Hook)
docker build -t mochi:latest .
docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 -v $(pwd)/config.yaml:/config.yaml mochi:latest
Docker --config=config.yaml
--config=config.json
(hooks)(listeners)(options)(compatibilities)
listeners:
- type: "tcp"
id: "tcp12"
address: ":1883"
- type: "ws"
id: "ws1"
address: ":1882"
- type: "sysinfo"
id: "stats"
address: ":1880"
hooks:
auth:
allow_all: true
options:
inline_client: true
(Hooks)(listeners) cmd/main.go
Mochi MQTT
import (
"log"
mqtt "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/auth"
"github.com/mochi-mqtt/server/v2/listeners"
)
func main() {
//
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigs
done <- true
}()
// MQTT
server := mqtt.New(nil)
// ()
_ = server.AddHook(new(auth.AllowHook), nil)
// 1883 TCP
tcp := listeners.NewTCP(listeners.Config{ID: "t1", Address: ":1883"})
err := server.AddListener(tcp)
if err != nil {
log.Fatal(err)
}
go func() {
err := server.Serve()
if err != nil {
log.Fatal(err)
}
}()
//
<-done
//
}
(Network Listeners)ListenersListeners
Listener | Usage |
---|---|
listeners.NewTCP | TCP TCP |
listeners.NewUnixSock | Unix |
listeners.NewNet | net.Listener |
listeners.NewWebsocket | Websocket |
listeners.NewHTTPStats | HTTP $SYS |
listeners.NewHTTPHealthCheck | HTTP |
*listeners.Config TLSListenerTLS cmd/main.go
(Options)
server := mqtt.New(&mqtt.Options{
Capabilities: mqtt.Capabilities{
MaximumSessionExpiryInterval: 3600,
MaximumClientWritesPending: 3,
Compatibilities: mqtt.Compatibilities{
ObscureNotAuthorized: true,
},
},
ClientNetWriteBufferSize: 4096,
ClientNetReadBufferSize: 4096,
SysTopicResendInterval: 10,
InlineClient: false,
})
mqtt.Optionsmqtt.Capabilities mqtt.Compatibilities ClientNetWriteBufferSize ClientNetReadBufferSize Capabilities.MaximumClientWritesPending IoT 1024*8 ,
(Event Hooks)Hook(authentication)(persistent storage)(debugging tools)
(Hook) - (Hook)(Hook)(Hook)
Mochi MQTT (DENY-ALL)(Hook)(DENY-ALL)(Hook) auth.AllowAll (Hook)(ALLOW-ALL)
server := mqtt.New(nil)
_ = server.AddHook(new(auth.AllowHook), nil)
(Auth Ledger hook) ACL
(Auth rules)
Client | ID |
Username | |
Password | |
Remote | IP |
Allow | true false |
ACL(ACL rules)
Client | ID |
Username | |
Remote | IP |
Filters |
0,1,2,3 hooks/auth/ledger.go
server := mqtt.New(nil)
err := server.AddHook(new(auth.Hook), &auth.Options{
Ledger: &auth.Ledger{
Auth: auth.AuthRules{ // Auth
{Username: "peach", Password: "password1", Allow: true},
{Username: "melon", Password: "password2", Allow: true},
{Remote: "127.0.0.1:*", Allow: true},
{Remote: "localhost:*", Allow: true},
},
ACL: auth.ACLRules{ // ACL
{Remote: "127.0.0.1:*"}, //
{
// melon
Username: "melon", Filters: auth.Filters{
"melon/#": auth.ReadWrite,
"updates/#": auth.WriteOnly, // updates updates
},
},
{
//
Filters: auth.Filters{
"#": auth.ReadOnly,
"updates/#": auth.Deny,
},
},
},
}
})
JSON YAML Data
err := server.AddHook(new(auth.Hook), &auth.Options{
Data: data, // yaml json
})
Redis (Hook)Redis(Hook)Redis(Hook) github.com/go-redis/redis/v8 Options
err := server.AddHook(new(redis.Hook), &redis.Options{
Options: &rv8.Options{
Addr: "localhost:6379", // Redis
Password: "", // Redis
DB: 0, // Redisindex
},
})
if err != nil {
log.Fatal(err)
}
Redis examples/persistence/redis/main.go hooks/storage/redis
PebbleDB (Hook)
err := server.AddHook(new(pebble.Hook), &pebble.Options{
Path: pebblePath,
Mode: pebble.NoSync,
})
if err != nil {
log.Fatal(err)
}
pebble (Hook) examples/persistence/pebble/main.go hooks/storage/pebble
BadgerDB (Hook)
err := server.AddHook(new(badger.Hook), &badger.Options{
Path: badgerPath,
})
if err != nil {
log.Fatal(err)
}
Badger (Hook) examples/persistence/badger/main.go hooks/storage/badger
BoltDB (Hook) Badger examples/persistence/bolt/main.go
(Hook) mqtt.Hook hooks.go (Hook)
OnPacketReadOnPacketEncode OnPacketSent -
OnStarted | |
OnStopped | |
OnConnectAuthenticate | hooks/auth/allow_all basicHook true |
OnACLCheck | ACL |
OnSysInfoTick | $SYS |
OnConnect | |
OnSessionEstablish | CONNACK |
OnSessionEstablished | OnConnect |
OnDisconnect | |
OnAuthPacket | MQTT v5 |
OnPacketRead | |
OnPacketEncode | |
OnPacketSent | |
OnPacketProcessed | |
OnSubscribe | |
OnSubscribed | |
OnSelectSubscribers | |
OnUnsubscribe | |
OnUnsubscribed | |
OnPublish | |
OnPublished | |
OnPublishDropped | |
OnRetainMessage | |
OnRetainPublished | |
OnQosPublish | QoS >= 1 |
OnQosComplete | QoS |
OnQosDropped | QoS |
OnPacketIDExhausted | packet idsid |
OnWill | |
OnWillSent | |
OnClientExpired | |
OnRetainedExpired | |
StoredClients | |
StoredSubscriptions | |
StoredInflightMessages | inflight messages |
StoredRetainedMessages | |
StoredSysInfo |
HookHookHookOnACLCheck OnConnectAuthenticate
server := mqtt.New(&mqtt.Options{
InlineClient: true,
})
server.Publishserver.Subscribe server.Unsubscribe
Publish server.Publish
err := server.Publish("direct/publish", []byte("packet scheduled message"), false, 0)
QoS MQTT v5
server.Subscribe
QoS0 MQTTv5 subscriptionId
callbackFn := func(cl *mqtt.Client, sub packets.Subscription, pk packets.Packet) {
server.Log.Info("inline client received message from subscription", "client", cl.ID, "subscriptionId", sub.Identifier, "topic", pk.TopicName, "payload", string(pk.Payload))
}
server.Subscribe("direct/#", 1, callbackFn)
server.Unsubscribe
server.Unsubscribe("direct/#", 1)
MQTT v5(publish packets)MQTT(packets)
(Packet Injection)MQTTping
(Inline Client)ACL$SYS
cl := server.NewClient(nil, "local", "inline", true)
server.InjectPacket(cl, packets.Packet{
FixedHeader: packets.FixedHeader{
Type: packets.Publish,
},
TopicName: "direct/publish",
Payload: []byte("scheduled message"),
})
MQTT MQTTv5
Mochi MQTT
go run --cover ./...
examples/paho/main.go
interoperability python3 client_test5.py
Paho MQTT v5 v3
paho
paho/main.go
Mochi MQTT mqtt MosquittoEMQX
MQTT-Stresser Apple Macbook Air M2 cmd/main.go
mqtt-stresser
mqtt-stresser -broker tcp://localhost:1883 -num-clients=2 -num-messages=10000
Broker | publish fastest | median | slowest | receive fastest | median | slowest |
---|---|---|---|---|---|---|
Mochi v2.2.10 | 124,772 | 125,456 | 124,614 | 314,461 | 313,186 | 311,910 |
Mosquitto v2.0.15 | 155,920 | 155,919 | 155,918 | 185,485 | 185,097 | 184,709 |
EMQX v5.0.11 | 156,945 | 156,257 | 155,568 | 17,918 | 17,783 | 17,649 |
Rumqtt v0.21.0 | 112,208 | 108,480 | 104,753 | 135,784 | 126,446 | 117,108 |
mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=10000
Broker | publish fastest | median | slowest | receive fastest | median | slowest |
---|---|---|---|---|---|---|
Mochi v2.2.10 | 41,825 | 31,663 | 23,008 | 144,058 | 65,903 | 37,618 |
Mosquitto v2.0.15 | 42,729 | 38,633 | 29,879 | 23,241 | 19,714 | 18,806 |
EMQX v5.0.11 | 21,553 | 17,418 | 14,356 | 4,257 | 3,980 | 3,756 |
Rumqtt v0.21.0 | 42,213 | 23,153 | 20,814 | 49,465 | 36,626 | 19,283 |
100:
mqtt-stresser -broker tcp://localhost:1883 -num-clients=100 -num-messages=10000
Broker | publish fastest | median | slowest | receive fastest | median | slowest |
---|---|---|---|---|---|---|
Mochi v2.2.10 | 13,532 | 4,425 | 2,344 | 52,120 | 7,274 | 2,701 |
Mosquitto v2.0.15 | 3,826 | 3,395 | 3,032 | 1,200 | 1,150 | 1,118 |
EMQX v5.0.11 | 4,086 | 2,432 | 2,274 | 434 | 333 | 311 |
Rumqtt v0.21.0 | 78,972 | 5,047 | 3,804 | 4,286 | 3,249 | 2,027 |
EMQX Docker
[SPDX ] (https://spdx.dev) SPDX
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2023 mochi-mqtt
// SPDX-FileContributor: Your name or alias <[email protected]>
package name
SPDX-FileContributor -