MQTT Integration
Before integrating MQTT, we first need to understand what MQTT is:
MQTT (Message Queuing Telemetry Transport) is a lightweight publish-subscribe messaging protocol designed for resource-constrained devices and for low-bandwidth, high-latency, or unreliable networks. It is well suited to Internet of Things (IoT) and device-to-device communication, where it enables efficient messaging between sensors, actuators, and other devices.
A few concepts to know first:
MQTT client, MQTT broker, publish-subscribe pattern, topic, and QoS (Quality of Service)
Now for the main content:
1. System architecture:
+--------------------------+     +--------------------------+     +--------------------------+
| Device-side MQTT client  |<--->| MQTT broker              |<--->| Server-side MQTT client  |
| DeviceMqttClient         |     | (for example, Mosquitto) |     | ServerMqttClient         |
+--------------------------+     +--------------------------+     +--------------------------+
             ^                                                                 ^
             |                                                                 |
             v                                                                 v
+--------------------------+                                      +--------------------------+
| Device layer             |                                      | Application layer        |
+--------------------------+                                      +--------------------------+
2. Components:
- MQTT configuration (MqttConfig): Manages the MQTT connection settings
- Base MQTT client (BaseMqttClient): Provides the basic MQTT client functions
- Device-side MQTT client (DeviceMqttClient): Device-oriented MQTT client implementation
- Server-side MQTT client (ServerMqttClient): Server-oriented MQTT client implementation
- Device pool (DevicePool): Manages information about all connected devices
- MQTT service (MqttService): Integrates the MQTT functions as a hosted service
3. Topic design
- Device related:
  - device/{deviceId}/info: Device information
  - device/{deviceId}/status: Device status
  - device/{deviceId}/heartbeat: Device heartbeat
  - device/{deviceId}/command: Device command
  - device/{deviceId}/command/response: Device command response
- Meeting related:
  - meeting/broadcast: Meeting broadcast
- Announcement related:
  - announcement/broadcast: Announcement broadcast
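The topic scheme above can be kept in one place so that device and server code never build topic strings by hand. The following is a minimal sketch, not the article's implementation: the `MqttTopics` class and its member names are hypothetical. The only protocol fact it relies on is that `+` is the MQTT single-level wildcard, which lets the server subscribe to one topic pattern for all devices.

```csharp
using System;

// Hypothetical helper: class and member names are illustrative assumptions.
public static class MqttTopics
{
    public const string MeetingBroadcast = "meeting/broadcast";
    public const string AnnouncementBroadcast = "announcement/broadcast";

    // Server-side subscription filter; '+' matches exactly one topic level.
    public const string AllDeviceInfo = "device/+/info";

    public static string DeviceInfo(string deviceId) => $"device/{deviceId}/info";
    public static string DeviceStatus(string deviceId) => $"device/{deviceId}/status";
    public static string DeviceHeartbeat(string deviceId) => $"device/{deviceId}/heartbeat";
    public static string DeviceCommand(string deviceId) => $"device/{deviceId}/command";
    public static string DeviceCommandResponse(string deviceId) => $"device/{deviceId}/command/response";

    // Extracts the device ID from a concrete "device/{deviceId}/..." topic.
    public static bool TryGetDeviceId(string topic, out string deviceId)
    {
        deviceId = string.Empty;
        string[] parts = topic.Split('/');
        if (parts.Length < 3 || parts[0] != "device" || parts[1].Length == 0)
        {
            return false;
        }
        deviceId = parts[1];
        return true;
    }
}
```

On the server, `TryGetDeviceId` would typically run on every incoming message to find out which device it came from.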
4. Data Model
4.1 Device Information (DeviceInfo)
Basic information about the device, including:
- Device ID (DeviceId)
- Device Name (DeviceName)
- Device Type (DeviceType)
- IP Address (IpAddress)
- MAC Address (MacAddress)
- Software Version (SoftwareVersion)
- Hardware Version (HardwareVersion)
- Device Status (Status)
- Last Online Time (LastOnlineTime)
- Last Heartbeat Time (LastHeartbeatTime)
- Description
- Location
- Additional Properties
4.2 Device Status (DeviceStatus)
Possible statuses of the device include:
- Offline
- Online
- Busy
- Other custom statuses
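Sections 4.1 and 4.2 map naturally onto a class and an enum. The sketch below takes the property names from the lists above. The property types, the nullable timestamps, and the string dictionary for additional properties are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;

public enum DeviceStatus
{
    Offline,
    Online,
    Busy
    // Other custom statuses can be appended here.
}

public class DeviceInfo
{
    public string DeviceId { get; set; } = string.Empty;
    public string DeviceName { get; set; } = string.Empty;
    public string DeviceType { get; set; } = string.Empty;
    public string IpAddress { get; set; } = string.Empty;
    public string MacAddress { get; set; } = string.Empty;
    public string SoftwareVersion { get; set; } = string.Empty;
    public string HardwareVersion { get; set; } = string.Empty;

    // A device is treated as offline until it announces itself (assumption).
    public DeviceStatus Status { get; set; } = DeviceStatus.Offline;

    // Null until the device has been seen at least once (assumption).
    public DateTime? LastOnlineTime { get; set; }
    public DateTime? LastHeartbeatTime { get; set; }

    public string Description { get; set; } = string.Empty;
    public string Location { get; set; } = string.Empty;

    // Free-form key/value pairs; the value type is an assumption.
    public Dictionary<string, string> AdditionalProperties { get; set; } =
        new Dictionary<string, string>();
}
```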
4.3 Device Command
A command sent to a device includes:
- Command ID (CommandId)
- Device ID (DeviceId)
- Command Type (CommandType)
- Command Parameters
- Timeout
- Creation Time (CreateTime)
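A command has to travel over MQTT as a payload, so it needs a serialized form. The sketch below assumes JSON via `System.Text.Json`, which the article does not specify. The `Parameters` and `TimeoutSeconds` property names, their types, and the 30-second default are likewise assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public class DeviceCommand
{
    // A fresh ID per command lets the response be matched to the request.
    public string CommandId { get; set; } = Guid.NewGuid().ToString("N");
    public string DeviceId { get; set; } = string.Empty;
    public string CommandType { get; set; } = string.Empty;

    // Parameter shape is an assumption; a string map keeps the sketch simple.
    public Dictionary<string, string> Parameters { get; set; } =
        new Dictionary<string, string>();

    // Unit (seconds) and default value are assumptions.
    public int TimeoutSeconds { get; set; } = 30;
    public DateTime CreateTime { get; set; } = DateTime.UtcNow;

    // Serializes the command for publishing to device/{deviceId}/command.
    public string ToJson() => JsonSerializer.Serialize(this);

    // Parses a payload received on the command topic.
    public static DeviceCommand FromJson(string json) =>
        JsonSerializer.Deserialize<DeviceCommand>(json)
        ?? throw new JsonException("Empty command payload.");
}
```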
4.4 Meeting Information (MeetingInfo)
Meeting-related information, used for meeting notifications.
4.5 Announcement Information (AnnouncementInfo)
System announcement information, used for broadcast notifications.
5. Workflow:
5.1 Service startup process
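- When the program starts, the MQTT-related services are registered through dependency injection:
  // Register MQTT services (assumes `services` is the application's IServiceCollection)
  services.AddSingleton<MqttConfig>();
  services.AddSingleton<DevicePool>();
  services.AddSingleton<MqttService>();
  services.AddHostedService(provider => provider.GetRequiredService<MqttService>());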
- MqttService starts as an IHostedService and performs the following steps:
  - Loads the MQTT configuration
  - Creates the server-side MQTT client
  - Registers the event handlers
  - Connects to the MQTT broker
  - Starts the heartbeat check timer
  - Starts the device cleanup timer
5.2 Device connection process
- The device connects to the MQTT broker through DeviceMqttClient
- After connecting successfully, the device publishes its own information to the device/{deviceId}/info topic
- The server's ServerMqttClient receives the device information and adds the device to DevicePool
- The device-online event is triggered and the related business logic runs
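The last two steps of the connection process (add the device to the pool, then raise an online event) can be sketched with a thread-safe dictionary. This is only an illustration: `DevicePoolSketch`, `PooledDevice`, and the `DeviceOnline` event are hypothetical names, and the check-then-update logic is deliberately simple rather than fully race-free.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical, reduced device record used only by this sketch.
public class PooledDevice
{
    public string DeviceId { get; set; } = string.Empty;
    public bool IsOnline { get; set; }
    public DateTime LastOnlineTime { get; set; }
}

public class DevicePoolSketch
{
    private readonly ConcurrentDictionary<string, PooledDevice> _devices =
        new ConcurrentDictionary<string, PooledDevice>();

    // Raised when a device is first seen or comes back after being offline.
    public event Action<PooledDevice> DeviceOnline = delegate { };

    public int Count => _devices.Count;

    // Intended to be called when a message arrives on device/{deviceId}/info.
    public void AddOrUpdate(string deviceId, DateTime nowUtc)
    {
        bool wasOnline = _devices.TryGetValue(deviceId, out var existing)
                         && existing.IsOnline;

        PooledDevice device = _devices.AddOrUpdate(
            deviceId,
            id => new PooledDevice { DeviceId = id, IsOnline = true, LastOnlineTime = nowUtc },
            (id, d) => { d.IsOnline = true; d.LastOnlineTime = nowUtc; return d; });

        // Only fire the event on an offline-to-online transition.
        if (!wasOnline)
        {
            DeviceOnline(device);
        }
    }
}
```

Because the event only fires on a transition, a device that republishes its info while already online does not trigger the business logic again.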
5.3 Device heartbeat process
- The device periodically publishes heartbeat messages to the device/{deviceId}/heartbeat topic
- The server receives each heartbeat message and updates the device's last heartbeat time
- The server's heartbeat check timer periodically checks every device for a heartbeat timeout
- Devices whose heartbeat has timed out are marked offline, and the device-offline event is triggered
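The timeout check that the heartbeat timer performs is a pure function of the current time and each device's last heartbeat. The sketch below shows that logic in isolation. `HeartbeatEntry` and `HeartbeatChecker` are hypothetical names, and the timeout value is left to the caller because the article does not state one.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, reduced device record used only by this sketch.
public class HeartbeatEntry
{
    public string DeviceId { get; set; } = string.Empty;
    public bool IsOnline { get; set; } = true;
    public DateTime LastHeartbeatTime { get; set; }
}

public static class HeartbeatChecker
{
    // Marks online devices whose last heartbeat is older than the timeout as
    // offline and returns them so the caller can raise device-offline events.
    public static List<HeartbeatEntry> MarkTimedOut(
        IEnumerable<HeartbeatEntry> devices, DateTime nowUtc, TimeSpan timeout)
    {
        var timedOut = new List<HeartbeatEntry>();
        foreach (HeartbeatEntry device in devices)
        {
            if (device.IsOnline && nowUtc - device.LastHeartbeatTime > timeout)
            {
                device.IsOnline = false;
                timedOut.Add(device);
            }
        }
        return timedOut;
    }
}
```

Passing the current time in as a parameter, rather than reading the clock inside the method, keeps the check easy to unit test.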
5.4 Command sending process
- The application sends a command through the corresponding method call
- The command is published through ServerMqttClient to the device/{deviceId}/command topic
- The device receives the command and performs the corresponding operation
- The device publishes the execution result to the device/{deviceId}/command/response topic
- The server receives the command response and triggers a command-response event
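Because the command and its response travel on separate topics, the server needs some way to pair them up. The article describes an event for this. As an alternative illustration, the sketch below pairs them by command ID using `TaskCompletionSource`, so a caller can simply await the response. The `PendingCommands` class is hypothetical and is not taken from the article.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper: tracks commands that are waiting for a response.
public class PendingCommands
{
    private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _pending =
        new ConcurrentDictionary<string, TaskCompletionSource<string>>();

    // Call before publishing to device/{deviceId}/command.
    public Task<string> Register(string commandId)
    {
        var tcs = new TaskCompletionSource<string>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        if (!_pending.TryAdd(commandId, tcs))
        {
            throw new InvalidOperationException($"Command {commandId} is already pending.");
        }
        return tcs.Task;
    }

    // Call when a message arrives on device/{deviceId}/command/response.
    // Returns false for unknown or already completed command IDs.
    public bool Complete(string commandId, string responsePayload)
    {
        return _pending.TryRemove(commandId, out var tcs)
               && tcs.TrySetResult(responsePayload);
    }

    // Call when the command's timeout elapses without a response.
    public bool Expire(string commandId)
    {
        return _pending.TryRemove(commandId, out var tcs)
               && tcs.TrySetException(new TimeoutException($"Command {commandId} timed out."));
    }
}
```

A late or duplicate response is ignored, since `Complete` returns false once the entry has been removed.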
5.5 Meeting and announcement broadcast process
- The application publishes meeting or announcement information through MqttService
- The information is published through ServerMqttClient to the corresponding broadcast topic
- The device receives the broadcast and triggers the corresponding event to process it
5.6 Device cleanup process
- The device cleanup timer runs the cleanup task on a schedule (by default at 2 a.m.)
- Records of devices that have been offline for a long time (by default, more than 24 hours) are removed
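Both defaults above reduce to small date calculations: how long to wait until the next 2 a.m. run, and which offline devices have exceeded the 24-hour threshold. The sketch below shows one way to compute them. `DeviceCleanup` and its method names are hypothetical, and the input map is assumed to contain offline devices only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DeviceCleanup
{
    // Time to wait from 'now' until the next occurrence of the given hour
    // (2 = 2 a.m.). If that hour has already passed today, use tomorrow.
    public static TimeSpan DelayUntilNextRun(DateTime now, int hour = 2)
    {
        DateTime next = now.Date.AddHours(hour);
        if (next <= now)
        {
            next = next.AddDays(1);
        }
        return next - now;
    }

    // Returns the IDs of offline devices that were last online longer ago
    // than maxOffline. The map holds offline devices only (assumption).
    public static List<string> FindStale(
        IReadOnlyDictionary<string, DateTime> lastOnlineByDeviceId,
        DateTime now,
        TimeSpan maxOffline)
    {
        return lastOnlineByDeviceId
            .Where(entry => now - entry.Value > maxOffline)
            .Select(entry => entry.Key)
            .ToList();
    }
}
```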
6. Error handling and reconnection mechanism
- When the MQTT client is disconnected, the reconnect timer starts automatically
- Reconnection is attempted at the configured reconnect interval
- If a maximum number of reconnect attempts is configured, reconnection stops once that number is reached; if it is set to 0, reconnection is attempted indefinitely
- All communication operations include error handling and logging
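The rule "stop after N attempts, where 0 means never stop" is easy to get wrong by one, so it helps to isolate it. The following is a minimal sketch of that bookkeeping only. `ReconnectPolicy` is a hypothetical class, and the actual timer and connect call are left out.

```csharp
using System;

public class ReconnectPolicy
{
    public TimeSpan Interval { get; }

    // 0 means retry indefinitely, matching the rule described above.
    public int MaxAttempts { get; }

    public int Attempts { get; private set; }

    public ReconnectPolicy(TimeSpan interval, int maxAttempts)
    {
        if (maxAttempts < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        }
        Interval = interval;
        MaxAttempts = maxAttempts;
    }

    // Returns true and counts the attempt if another reconnect is allowed.
    public bool TryBeginAttempt()
    {
        if (MaxAttempts > 0 && Attempts >= MaxAttempts)
        {
            return false;
        }
        Attempts++;
        return true;
    }

    // Call after a successful connection so later outages start from zero.
    public void Reset() => Attempts = 0;
}
```

The reconnect timer would call `TryBeginAttempt` on each tick and stop itself when the method returns false.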
MQTT configuration options include:
- Server address (ServerAddress): Default is "localhost"
- Server port (Port): Default is 1883
- Client ID (ClientId): Automatically generated by default
- Username and Password: Used for authentication
- Whether to use TLS/SSL (UseTls): Default is false
- Connection timeout (ConnectionTimeout): Default is 10 seconds
- Keep-alive interval (KeepAliveInterval): Default is 60 seconds
- Reconnect interval (ReconnectInterval): Default is 5 seconds
- Maximum reconnect attempts (MaxReconnectAttempts): Default is 0 (reconnect indefinitely)
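These options translate directly into a plain configuration class with property initializers. The sketch below uses the option names and defaults listed above. The generated client ID format, the integer-seconds representation, and the empty-string defaults for the credentials are assumptions.

```csharp
using System;

public class MqttConfig
{
    public string ServerAddress { get; set; } = "localhost";
    public int Port { get; set; } = 1883;

    // Generated per instance; the "client-" prefix is an assumption.
    public string ClientId { get; set; } = $"client-{Guid.NewGuid():N}";

    // Empty means "no authentication" in this sketch (assumption).
    public string Username { get; set; } = string.Empty;
    public string Password { get; set; } = string.Empty;

    public bool UseTls { get; set; } = false;

    // All intervals are in seconds.
    public int ConnectionTimeout { get; set; } = 10;
    public int KeepAliveInterval { get; set; } = 60;
    public int ReconnectInterval { get; set; } = 5;

    // 0 means reconnect indefinitely.
    public int MaxReconnectAttempts { get; set; } = 0;
}
```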
That concludes the design. The next article covers the implementation, using C# as the example.