gmqtt: github.com/DrmagicE/gmqtt Index | Examples | Files | Directories

package gmqtt

import "github.com/DrmagicE/gmqtt"

Package gmqtt provides an MQTT v3.1.1 server library.

See /examples for more details.

Code:

ln, err := net.Listen("tcp", ":1883")
if err != nil {
    fmt.Println(err.Error())
    return
}

ws := &WsServer{
    Server: &http.Server{Addr: ":8080"},
    Path:   "/",
}
l, _ := zap.NewProduction()
srv := NewServer(
    WithTCPListener(ln),
    WithWebsocketServer(ws),

    // add config
    WithConfig(DefaultConfig),
    // add plugins
    // WithPlugin(prometheus.New(&http.Server{Addr: ":8082"}, "/metrics")),
    // add Hook
    WithHook(Hooks{
        OnConnect: func(ctx context.Context, client Client) (code uint8) {
            return packets.CodeAccepted
        },
        OnSubscribe: func(ctx context.Context, client Client, topic packets.Topic) (qos uint8) {
            fmt.Println("register onSubscribe callback")
            return packets.QOS_1
        },
    }),
    // add logger
    WithLogger(l),
)

srv.Run()
fmt.Println("started...")
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
<-signalCh
srv.Stop(context.Background())
fmt.Println("stopped")

Index

Examples

Package Files

client.go hook.go message.go options.go plugin.go publish_service.go server.go session.go stats.go

Constants

const (
    Connecting = iota
    Connected
    Switiching
    Disconnected
)

Client status

const (
    DefaultMsgRouterLen  = 4096
    DefaultRegisterLen   = 2048
    DefaultUnRegisterLen = 2048
)

Default configuration

Variables

var (
    ErrInvalStatus    = errors.New("invalid connection status")
    ErrConnectTimeOut = errors.New("connect time out")
)

Error

var DefaultConfig = Config{
    RetryInterval:              20 * time.Second,
    RetryCheckInterval:         20 * time.Second,
    SessionExpiryInterval:      0 * time.Second,
    SessionExpiryCheckInterval: 0 * time.Second,
    QueueQos0Messages:          true,
    MaxInflight:                32,
    MaxAwaitRel:                100,
    MaxMsgQueue:                1000,
    DeliveryMode:               OnlyOnce,
    MsgRouterLen:               DefaultMsgRouterLen,
    RegisterLen:                DefaultRegisterLen,
    UnregisterLen:              DefaultUnRegisterLen,
}

DefaultConfig is the default config used by NewServer().

var (
    // ErrInvalWsMsgType [MQTT-6.0.0-1]
    ErrInvalWsMsgType = errors.New("invalid websocket message type")
)

func LoggerWithField Uses

func LoggerWithField(fields ...zap.Field) *zap.Logger

LoggerWithField adds fields to a new logger. Plugins can use this method to add a plugin name field.

func NewMessage Uses

func NewMessage(topic string, payload []byte, qos uint8, opts ...msgOptions) packets.Message

NewMessage creates a message for publish service.

func NewServer Uses

func NewServer(opts ...Options) *server

NewServer returns a gmqtt server instance with the given options

func Retained Uses

func Retained(retained bool) msgOptions

Retained sets retained flag to the message

type Client Uses

type Client interface {
    // OptionsReader returns ClientOptionsReader for reading options data.
    OptionsReader() ClientOptionsReader
    // IsConnected returns whether the client is connected.
    IsConnected() bool
    // ConnectedAt returns the connected time
    ConnectedAt() time.Time
    // DisconnectedAt return the disconnected time
    DisconnectedAt() time.Time
    // Connection returns the raw net.Conn
    Connection() net.Conn
    // Close closes the client connection. The returned channel will be closed after unregister process has been done
    Close() <-chan struct{}

    GetSessionStatsManager() SessionStatsManager
}

Client represents an MQTT client.

type ClientOptionsReader Uses

type ClientOptionsReader interface {
    ClientID() string
    Username() string
    Password() string
    KeepAlive() uint16
    CleanSession() bool
    WillFlag() bool
    WillRetain() bool
    WillQos() uint8
    WillTopic() string
    WillPayload() []byte
    LocalAddr() net.Addr
    RemoteAddr() net.Addr
}

ClientOptionsReader is mainly used in callback functions.

type ClientStats Uses

type ClientStats struct {
    ConnectedTotal    uint64
    DisconnectedTotal uint64
    // ActiveCurrent is the number of current active session.
    ActiveCurrent uint64
    // InactiveCurrent is the number of current inactive session.
    InactiveCurrent uint64
    // ExpiredTotal is the number of expired session.
    ExpiredTotal uint64
}

ClientStats provides the statistics of client connections.

type Config Uses

type Config struct {
    RetryInterval              time.Duration
    RetryCheckInterval         time.Duration
    SessionExpiryInterval      time.Duration
    SessionExpiryCheckInterval time.Duration
    QueueQos0Messages          bool
    MaxInflight                int
    MaxAwaitRel                int
    MaxMsgQueue                int
    DeliveryMode               DeliveryMode
    MsgRouterLen               int
    RegisterLen                int
    UnregisterLen              int
}

type DeliveryMode Uses

type DeliveryMode int
const (
    Overlap  DeliveryMode = 0
    OnlyOnce DeliveryMode = 1
)

type HookWrapper Uses

type HookWrapper struct {
    OnConnectWrapper           OnConnectWrapper
    OnConnectedWrapper         OnConnectedWrapper
    OnSessionCreatedWrapper    OnSessionCreatedWrapper
    OnSessionResumedWrapper    OnSessionResumedWrapper
    OnSessionTerminatedWrapper OnSessionTerminatedWrapper
    OnSubscribeWrapper         OnSubscribeWrapper
    OnSubscribedWrapper        OnSubscribedWrapper
    OnUnsubscribeWrapper       OnUnsubscribeWrapper
    OnUnsubscribedWrapper      OnUnsubscribedWrapper
    OnMsgArrivedWrapper        OnMsgArrivedWrapper
    OnAckedWrapper             OnAckedWrapper
    OnMsgDroppedWrapper        OnMsgDroppedWrapper
    OnDeliverWrapper           OnDeliverWrapper
    OnCloseWrapper             OnCloseWrapper
    OnAcceptWrapper            OnAcceptWrapper
    OnStopWrapper              OnStopWrapper
}

HookWrapper groups all hook wrapper functions.

type Hooks Uses

type Hooks struct {
    OnAccept
    OnStop
    OnSubscribe
    OnSubscribed
    OnUnsubscribe
    OnUnsubscribed
    OnMsgArrived
    OnConnect
    OnConnected
    OnSessionCreated
    OnSessionResumed
    OnSessionTerminated
    OnDeliver
    OnAcked
    OnClose
    OnMsgDropped
}

type MessageStats Uses

type MessageStats struct {
    Qos0 struct {
        DroppedTotal  uint64
        ReceivedTotal uint64
        SentTotal     uint64
    }
    Qos1 struct {
        DroppedTotal  uint64
        ReceivedTotal uint64
        SentTotal     uint64
    }
    Qos2 struct {
        DroppedTotal  uint64
        ReceivedTotal uint64
        SentTotal     uint64
    }
    QueuedCurrent uint64
}

MessageStats represents the statistics of PUBLISH packet, separated by QOS.

type OnAccept Uses

type OnAccept func(ctx context.Context, conn net.Conn) bool

OnAccept 会在新连接建立的时候调用,只在TCP server中有效。如果返回false,则会直接关闭连接

OnAccept will be called after a new connection is established in the TCP server. If it returns false, the connection will be closed directly.

type OnAcceptWrapper Uses

type OnAcceptWrapper func(OnAccept) OnAccept

type OnAcked Uses

type OnAcked func(ctx context.Context, client Client, msg packets.Message)

OnAcked 当客户端对qos1或qos2返回确认的时候调用

OnAcked will be called when receiving the ack packet for a published qos1 or qos2 message.

type OnAckedWrapper Uses

type OnAckedWrapper func(OnAcked) OnAcked

type OnClose Uses

type OnClose func(ctx context.Context, client Client, err error)

OnClose tcp连接关闭之后触发

OnClose will be called after the tcp connection of the client has been closed

type OnCloseWrapper Uses

type OnCloseWrapper func(OnClose) OnClose

type OnConnect Uses

type OnConnect func(ctx context.Context, client Client) (code uint8)

OnConnect 当合法的connect报文到达的时候触发,返回connack中响应码

OnConnect will be called when a valid connect packet is received. It returns the code of the connack packet

type OnConnectWrapper Uses

type OnConnectWrapper func(OnConnect) OnConnect

type OnConnected Uses

type OnConnected func(ctx context.Context, client Client)

OnConnected 当客户端成功连接后触发

OnConnected will be called when an MQTT client connects successfully.

type OnConnectedWrapper Uses

type OnConnectedWrapper func(OnConnected) OnConnected

type OnDeliver Uses

type OnDeliver func(ctx context.Context, client Client, msg packets.Message)

OnDeliver 分发消息时触发

OnDeliver will be called when publishing a message to a client.

type OnDeliverWrapper Uses

type OnDeliverWrapper func(OnDeliver) OnDeliver

type OnMsgArrived Uses

type OnMsgArrived func(ctx context.Context, client Client, msg packets.Message) (valid bool)

OnMsgArrived 返回接收到的publish报文是否允许转发,返回false则该报文不会被继续转发

OnMsgArrived returns whether the publish packet will be delivered or not. If it returns false, the packet will not be delivered to any clients.

type OnMsgArrivedWrapper Uses

type OnMsgArrivedWrapper func(OnMsgArrived) OnMsgArrived

type OnMsgDropped Uses

type OnMsgDropped func(ctx context.Context, client Client, msg packets.Message)

OnMsgDropped 丢弃报文后触发

OnMsgDropped will be called after the message has been dropped.

type OnMsgDroppedWrapper Uses

type OnMsgDroppedWrapper func(OnMsgDropped) OnMsgDropped

type OnSessionCreated Uses

type OnSessionCreated func(ctx context.Context, client Client)

OnSessionCreated 新建session时触发

OnSessionCreated will be called when a session is created.

type OnSessionCreatedWrapper Uses

type OnSessionCreatedWrapper func(OnSessionCreated) OnSessionCreated

type OnSessionResumed Uses

type OnSessionResumed func(ctx context.Context, client Client)

OnSessionResumed 恢复session时触发

OnSessionResumed will be called when a session is resumed.

type OnSessionResumedWrapper Uses

type OnSessionResumedWrapper func(OnSessionResumed) OnSessionResumed

type OnSessionTerminated Uses

type OnSessionTerminated func(ctx context.Context, client Client, reason SessionTerminatedReason)

OnSessionTerminated session 下线时触发

OnSessionTerminated will be called when a session is terminated.

type OnSessionTerminatedWrapper Uses

type OnSessionTerminatedWrapper func(OnSessionTerminated) OnSessionTerminated

type OnStop Uses

type OnStop func(ctx context.Context)

OnStop will be called on server.Stop()

type OnStopWrapper Uses

type OnStopWrapper func(OnStop) OnStop

type OnSubscribe Uses

type OnSubscribe func(ctx context.Context, client Client, topic packets.Topic) (qos uint8)

OnSubscribe 返回topic允许订阅的最高QoS等级

OnSubscribe returns the maximum available QoS for the topic:

0x00 - Success - Maximum QoS 0
0x01 - Success - Maximum QoS 1
0x02 - Success - Maximum QoS 2
0x80 - Failure

type OnSubscribeWrapper Uses

type OnSubscribeWrapper func(OnSubscribe) OnSubscribe

type OnSubscribed Uses

type OnSubscribed func(ctx context.Context, client Client, topic packets.Topic)

OnSubscribed will be called after the topic has been subscribed successfully.

type OnSubscribedWrapper Uses

type OnSubscribedWrapper func(OnSubscribed) OnSubscribed

type OnUnsubscribe Uses

type OnUnsubscribe func(ctx context.Context, client Client, topicName string)

OnUnsubscribe will be called when the topic is being unsubscribed

type OnUnsubscribeWrapper Uses

type OnUnsubscribeWrapper func(OnUnsubscribe) OnUnsubscribe

type OnUnsubscribed Uses

type OnUnsubscribed func(ctx context.Context, client Client, topicName string)

OnUnsubscribed will be called after the topic has been unsubscribed

type OnUnsubscribedWrapper Uses

type OnUnsubscribedWrapper func(OnUnsubscribed) OnUnsubscribed

type Options Uses

type Options func(srv *server)

func WithConfig Uses

func WithConfig(config Config) Options

WithConfig sets the config of the server.

func WithHook Uses

func WithHook(hooks Hooks) Options

WithHook sets the hooks of the server. Notice: WithPlugin() will overwrite hooks.

func WithLogger Uses

func WithLogger(logger *zap.Logger) Options

func WithPlugin Uses

func WithPlugin(plugin ...Plugable) Options

WithPlugin sets the plugin(s) of the server.

func WithTCPListener Uses

func WithTCPListener(lns ...net.Listener) Options

WithTCPListener sets the TCP listener(s) of the server. Default listen on :1883.

func WithWebsocketServer Uses

func WithWebsocketServer(ws ...*WsServer) Options

WithWebsocketServer sets the websocket server(s) of the server.

type PacketBytes Uses

type PacketBytes struct {
    Connect     uint64
    Connack     uint64
    Disconnect  uint64
    Pingreq     uint64
    Pingresp    uint64
    Puback      uint64
    Pubcomp     uint64
    Publish     uint64
    Pubrec      uint64
    Pubrel      uint64
    Suback      uint64
    Subscribe   uint64
    Unsuback    uint64
    Unsubscribe uint64
}

PacketBytes represents the total bytes of each packet type that have been received or sent.

type PacketCount Uses

type PacketCount struct {
    Connect     uint64
    Connack     uint64
    Disconnect  uint64
    Pingreq     uint64
    Pingresp    uint64
    Puback      uint64
    Pubcomp     uint64
    Publish     uint64
    Pubrec      uint64
    Pubrel      uint64
    Suback      uint64
    Subscribe   uint64
    Unsuback    uint64
    Unsubscribe uint64
}

PacketCount represents the total number of packets of each type that have been received or sent.

type PacketStats Uses

type PacketStats struct {
    BytesReceived *PacketBytes
    ReceivedTotal *PacketCount
    BytesSent     *PacketBytes
    SentTotal     *PacketCount
}

PacketStats represents the statistics of MQTT Packet.

type Plugable Uses

type Plugable interface {
    // Load will be called in server.Run(). If return error, the server will panic.
    Load(service Server) error
    // Unload will be called when the server is shutdown, the return error is only for logging
    Unload() error
    // HookWrapper returns all hook wrappers that used by the plugin.
    // Return a empty wrapper  if the plugin does not need any hooks
    HookWrapper() HookWrapper
    // Name return the plugin name
    Name() string
}

Plugable is the interface that every plugin needs to implement.

type PublishService Uses

type PublishService interface {
    // Publish publish a message to broker.
    // Calling this method will not trigger OnMsgArrived hook.
    Publish(message packets.Message)
    // PublishToClient publish a message to a specific client.
    // If match sets to true, the message will send to the client
    // only if the client is subscribed to a topic that matches the message.
    // If match sets to false, the message will send to the client directly even
    // there are no matched subscriptions.
    // Calling this method will not trigger OnMsgArrived hook.
    PublishToClient(clientID string, message packets.Message, match bool)
}

PublishService provides the ability to publish messages to the broker.

type Server Uses

type Server interface {
    // SubscriptionStore returns the subscription.Store.
    SubscriptionStore() subscription.Store
    // RetainedStore returns the retained.Store.
    RetainedStore() retained.Store
    // PublishService returns the PublishService
    PublishService() PublishService
    // Client return the client specified by clientID.
    Client(clientID string) Client
    // GetConfig returns the config of the server
    GetConfig() Config
    // GetStatsManager returns StatsManager
    GetStatsManager() StatsManager
}

Server interface represents an MQTT server instance.

type ServerStats Uses

type ServerStats struct {
    PacketStats       *PacketStats
    ClientStats       *ClientStats
    MessageStats      *MessageStats
    SubscriptionStats *subscription.Stats
}

ServerStats is the collection of global statistics.

type SessionStats Uses

type SessionStats struct {
    // InflightCurrent, the current length of the inflight queue.
    InflightCurrent uint64
    // AwaitRelCurrent, the current length of the awaitRel queue.
    AwaitRelCurrent uint64

    MessageStats
}

SessionStats is the collection of statistics of each session.

type SessionStatsManager Uses

type SessionStatsManager interface {

    // GetStats return the session statistics
    GetStats() *SessionStats
    // contains filtered or unexported methods
}

SessionStatsManager interface provides the ability to access the statistics of the session

type SessionTerminatedReason Uses

type SessionTerminatedReason byte
const (
    NormalTermination SessionTerminatedReason = iota
    ConflictTermination
    ExpiredTermination
)

type StatsManager Uses

type StatsManager interface {

    // GetStats return the server statistics
    GetStats() *ServerStats
    // contains filtered or unexported methods
}

StatsManager interface provides the ability to access the statistics of the server

type WsServer Uses

type WsServer struct {
    Server   *http.Server
    Path     string // Url path
    CertFile string //TLS configration
    KeyFile  string //TLS configration
}

WsServer is used to build websocket server

Directories

Path | Synopsis
pkg/packets
plugin/management
plugin/prometheus
retained
retained/trie
subscription
subscription/trie

Package gmqtt imports 23 packages (graph) and is imported by 7 packages. Updated 2020-09-29. Refresh now. Tools for package owners.