server

The fully compliant, embeddable high-performance Go MQTT v5 server for IoT, smarthome, and pubsub

MIT License

Stars
794
Committers
39

Mochi-MQTT Server

English | | | !

**mochi-co/mqtt mochi-mqtt ** .

Mochi-MQTT Go MQTT v5 v3.1.1/

Mochi MQTT MQTT v5 / Go

MQTT?

MQTT MQ Telemetry Transport/Mochi MQTT MQTT 5.0.0

Mochi-MQTT

  • MQTT v5 MQTT v3.1.1 v3.0.0
    • MQTT v5
    • (Topic Aliases)
    • (Shared Subscriptions)
    • (Identifiers)
    • (Message Expiry)
    • (Client Session Expiry)
    • QoS (Flow Control Quotas)
    • (Auth Packets)
    • (Will Delay Intervals)
    • Mochi MQTT v1 MQTT QoS0,1,2$SYS
    • (Hook)(plugin)
    • (inline client)
    • Trie -
    • PahoMQTT v5 MQTT v3
  • TCPWebsocket SSL/TLS$SYS
  • RedisBadgerPebble Bolt Hook
  • ACL Hook

(Compatibility Notes)

v5 MQTT v5 v3 v5 v3 v5 v3

MQTT v3.0.0 v3.1.1 v3 v5 - (retained messages)(inflight messages)QOS

(Roadmap)

  • hook

(Quick Start)

Go

Mochi MQTT cmd cmd/main.go tcp (:1883)websocket (:1882) (:8080)

cd cmd
go build -o mqtt && ./mqtt

Docker

Docker Hub Mochi MQTT

docker pull mochimqtt/server

docker run -v $(pwd)/config.yaml:/config.yaml mochimqtt/server

yaml json Dockerfile cmd/main.go Websocket(:1882)TCP(:1883) (:8080) allow-all (Hook)

docker build -t mochi:latest .
docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 -v $(pwd)/config.yaml:/config.yaml mochi:latest

Docker --config=config.yaml --config=config.json

(hooks)(listeners)(options)(compatibilities)

listeners:
  - type: "tcp"
    id: "tcp12"
    address: ":1883"
  - type: "ws"
    id: "ws1"
    address: ":1882"
  - type: "sysinfo"
    id: "stats"
    address: ":1880"
hooks:
  auth:
    allow_all: true
options:
  inline_client: true

examples/config

  1. hookauthstoragedebug
  2. (mochi-mqtthook)conf.toml
  3. (listeners)

(Hooks)(listeners) cmd/main.go

Mochi MQTT

Mochi MQTT

Mochi MQTT

import (
  "log"

  mqtt "github.com/mochi-mqtt/server/v2"
  "github.com/mochi-mqtt/server/v2/hooks/auth"
  "github.com/mochi-mqtt/server/v2/listeners"
)

func main() {
  // 
  sigs := make(chan os.Signal, 1)
  done := make(chan bool, 1)
  signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
  go func() {
    <-sigs
    done <- true
  }()

  //  MQTT 
  server := mqtt.New(nil)
  
  // ()
  _ = server.AddHook(new(auth.AllowHook), nil)
  
  // 1883 TCP 
  tcp := listeners.NewTCP(listeners.Config{ID: "t1", Address: ":1883"})
  err := server.AddListener(tcp)
  if err != nil {
    log.Fatal(err)
  }
  

  go func() {
    err := server.Serve()
    if err != nil {
      log.Fatal(err)
    }
  }()

  // 
  <-done

  // 
}

examples

(Network Listeners)

(Network Listeners)ListenersListeners

Listener Usage
listeners.NewTCP TCP TCP
listeners.NewUnixSock Unix
listeners.NewNet net.Listener
listeners.NewWebsocket Websocket
listeners.NewHTTPStats HTTP $SYS
listeners.NewHTTPHealthCheck HTTP

listeners.ListenerListener

*listeners.Config TLSListenerTLS cmd/main.go

(Server Options and Capabilities)

(Options)

server := mqtt.New(&mqtt.Options{
  Capabilities: mqtt.Capabilities{
    MaximumSessionExpiryInterval: 3600,
    MaximumClientWritesPending: 3,
    Compatibilities: mqtt.Compatibilities{
      ObscureNotAuthorized: true,
    },
  },
  ClientNetWriteBufferSize: 4096,
  ClientNetReadBufferSize: 4096,
  SysTopicResendInterval: 10,
  InlineClient: false,
})

mqtt.Optionsmqtt.Capabilities mqtt.Compatibilities ClientNetWriteBufferSize ClientNetReadBufferSize Capabilities.MaximumClientWritesPending IoT 1024*8 ,

(Default Configuration Notes)

  • server.Options.Capabilities.MaximumMessageExpiryInterval 8640024DOSretained/inflight0

(Event Hooks)

(Event Hooks)Hook(authentication)(persistent storage)(debugging tools)

(Hook) - (Hook)(Hook)(Hook)

mochi-mqtt/server/hooks/auth . AllowHook AllowHook
mochi-mqtt/server/hooks/auth . Auth
mochi-mqtt/server/hooks/storage/bolt BoltDB
mochi-mqtt/server/hooks/storage/badger BadgerDB
mochi-mqtt/server/hooks/storage/pebble PebbleDB
mochi-mqtt/server/hooks/storage/redis Redis
mochi-mqtt/server/hooks/debug

HookHook

(Access Control)

(Allow Hook)

Mochi MQTT (DENY-ALL)(Hook)(DENY-ALL)(Hook) auth.AllowAll (Hook)(ALLOW-ALL)

server := mqtt.New(nil)
_ = server.AddHook(new(auth.AllowHook), nil)

(Auth Ledger)

(Auth Ledger hook) ACL

(Auth rules)

Client ID
Username
Password
Remote IP
Allow true false

ACL(ACL rules)

Client ID
Username
Remote IP
Filters

0,1,2,3 hooks/auth/ledger.go

server := mqtt.New(nil)
err := server.AddHook(new(auth.Hook), &auth.Options{
    Ledger: &auth.Ledger{
    Auth: auth.AuthRules{ // Auth 
      {Username: "peach", Password: "password1", Allow: true},
      {Username: "melon", Password: "password2", Allow: true},
      {Remote: "127.0.0.1:*", Allow: true},
      {Remote: "localhost:*", Allow: true},
    },
    ACL: auth.ACLRules{ // ACL 
      {Remote: "127.0.0.1:*"}, // 
      {
        //  melon 
        Username: "melon", Filters: auth.Filters{
          "melon/#":   auth.ReadWrite,
          "updates/#": auth.WriteOnly, //  updates updates
        },
      },
      {
        // 
        Filters: auth.Filters{
          "#":         auth.ReadOnly,
          "updates/#": auth.Deny,
        },
      },
    },
  }
})

JSON YAML Data

err := server.AddHook(new(auth.Hook), &auth.Options{
    Data: data, // yaml  json
})

examples/auth/encoded/main.go

(Persistent Storage)

Redis

Redis (Hook)Redis(Hook)Redis(Hook) github.com/go-redis/redis/v8 Options

err := server.AddHook(new(redis.Hook), &redis.Options{
  Options: &rv8.Options{
    Addr:     "localhost:6379", // Redis
    Password: "",               // Redis
    DB:       0,                // Redisindex
  },
})
if err != nil {
  log.Fatal(err)
}

Redis examples/persistence/redis/main.go hooks/storage/redis

Pebble DB

PebbleDB (Hook)

err := server.AddHook(new(pebble.Hook), &pebble.Options{
  Path: pebblePath,
  Mode: pebble.NoSync,
})
if err != nil {
  log.Fatal(err)
}

pebble (Hook) examples/persistence/pebble/main.go hooks/storage/pebble

Badger DB

BadgerDB (Hook)

err := server.AddHook(new(badger.Hook), &badger.Options{
  Path: badgerPath,
})
if err != nil {
  log.Fatal(err)
}

Badger (Hook) examples/persistence/badger/main.go hooks/storage/badger

BoltDB (Hook) Badger examples/persistence/bolt/main.go

Event Hooks

(Hook) mqtt.Hook hooks.go (Hook)

OnPacketReadOnPacketEncode OnPacketSent -

OnStarted
OnStopped
OnConnectAuthenticate hooks/auth/allow_all basicHook true
OnACLCheck ACL
OnSysInfoTick $SYS
OnConnect
OnSessionEstablish CONNACK
OnSessionEstablished OnConnect
OnDisconnect
OnAuthPacket MQTT v5
OnPacketRead
OnPacketEncode
OnPacketSent
OnPacketProcessed
OnSubscribe
OnSubscribed
OnSelectSubscribers
OnUnsubscribe
OnUnsubscribed
OnPublish
OnPublished
OnPublishDropped
OnRetainMessage
OnRetainPublished
OnQosPublish QoS >= 1
OnQosComplete QoS
OnQosDropped QoS
OnPacketIDExhausted packet idsid
OnWill
OnWillSent
OnClientExpired
OnRetainedExpired
StoredClients
StoredSubscriptions
StoredInflightMessages inflight messages
StoredRetainedMessages
StoredSysInfo

HookHookHookOnACLCheck OnConnectAuthenticate

(Inline Client v2.4.0+)

server := mqtt.New(&mqtt.Options{
  InlineClient: true,
})

server.Publishserver.Subscribe server.Unsubscribe

direct examples

(Inline Publish)

Publish server.Publish

err := server.Publish("direct/publish", []byte("packet scheduled message"), false, 0)

QoS MQTT v5

(Inline Subscribe)

server.Subscribe QoS0 MQTTv5 subscriptionId

callbackFn := func(cl *mqtt.Client, sub packets.Subscription, pk packets.Packet) {
    server.Log.Info("inline client received message from subscription", "client", cl.ID, "subscriptionId", sub.Identifier, "topic", pk.TopicName, "payload", string(pk.Payload))
}
server.Subscribe("direct/#", 1, callbackFn)

(Inline Unsubscribe)

server.Unsubscribe

server.Unsubscribe("direct/#", 1)

(Packet Injection)

MQTT v5(publish packets)MQTT(packets)

(Packet Injection)MQTTping

(Inline Client)ACL$SYS

cl := server.NewClient(nil, "local", "inline", true)
server.InjectPacket(cl, packets.Packet{
  FixedHeader: packets.FixedHeader{
    Type: packets.Publish,
  },
  TopicName: "direct/publish",
  Payload: []byte("scheduled message"),
})

MQTT MQTTv5

hooks example

(Testing)

(Unit Tests)

Mochi MQTT

go run --cover ./...

Paho (Paho Interoperability Test)

examples/paho/main.go interoperability python3 client_test5.py Paho MQTT v5 v3

paho paho/main.go

(Performance Benchmarks)

Mochi MQTT mqtt MosquittoEMQX

MQTT-Stresser Apple Macbook Air M2 cmd/main.go

mqtt-stresser

mqtt-stresser -broker tcp://localhost:1883 -num-clients=2 -num-messages=10000

Broker publish fastest median slowest receive fastest median slowest
Mochi v2.2.10 124,772 125,456 124,614 314,461 313,186 311,910
Mosquitto v2.0.15 155,920 155,919 155,918 185,485 185,097 184,709
EMQX v5.0.11 156,945 156,257 155,568 17,918 17,783 17,649
Rumqtt v0.21.0 112,208 108,480 104,753 135,784 126,446 117,108

mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=10000

Broker publish fastest median slowest receive fastest median slowest
Mochi v2.2.10 41,825 31,663 23,008 144,058 65,903 37,618
Mosquitto v2.0.15 42,729 38,633 29,879 23,241 19,714 18,806
EMQX v5.0.11 21,553 17,418 14,356 4,257 3,980 3,756
Rumqtt v0.21.0 42,213 23,153 20,814 49,465 36,626 19,283

100:

mqtt-stresser -broker tcp://localhost:1883 -num-clients=100 -num-messages=10000

Broker publish fastest median slowest receive fastest median slowest
Mochi v2.2.10 13,532 4,425 2,344 52,120 7,274 2,701
Mosquitto v2.0.15 3,826 3,395 3,032 1,200 1,150 1,118
EMQX v5.0.11 4,086 2,432 2,274 434 333 311
Rumqtt v0.21.0 78,972 5,047 3,804 4,286 3,249 2,027

EMQX Docker

(Contribution Guidelines)

(bug)PR(pull request)

  • PR(pull request)
  • SPDX FileContributor

[SPDX ] (https://spdx.dev) SPDX

// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2023 mochi-mqtt
// SPDX-FileContributor: Your name or alias <[email protected]>

package name

SPDX-FileContributor -

Stargazers over time

Mochi MQTT