
MQTT Integration and Design

2025-03-18 10:35:05

MQTT Integration

To integrate MQTT, we first need to understand what it is:

MQTT (Message Queuing Telemetry Transport) is a lightweight publish-subscribe messaging protocol designed for resource-constrained devices and for low-bandwidth, high-latency, or unreliable networks. It is well suited to Internet of Things (IoT) and device communication scenarios, where it enables efficient communication between sensors, actuators, and other devices.

A few concepts to understand first:

MQTT client, MQTT broker, publish-subscribe pattern, topic, QoS (Quality of Service)

Now for the main content:

1. System architecture:

+--------------------+      +------------------+      +--------------------+
| Device-side MQTT   |<---->| MQTT broker      |<---->| Server-side MQTT   |
| client             |      | (for example,    |      | client             |
| DeviceMqttClient   |      | Mosquitto)       |      | ServerMqttClient   |
+--------------------+      +------------------+      +--------------------+
          ^                                                     ^
          |                                                     |
          v                                                     v
+--------------------+                                +--------------------+
| Device Layer       |                                | Application Layer  |
+--------------------+                                +--------------------+

2. Components:

  • MQTT configuration (MqttConfig): Manages the MQTT connection settings
  • Base MQTT client (BaseMqttClient): Provides the basic MQTT client functionality
  • Device-side MQTT client (DeviceMqttClient): MQTT client implementation for devices
  • Server-side MQTT client (ServerMqttClient): MQTT client implementation for the server
  • Device pool (DevicePool): Manages information about all connected devices
  • MQTT service (MqttService): Integrates the MQTT functionality as a hosted service

3. Topic design

  • Device-related

    • device/{deviceId}/info: Device information
    • device/{deviceId}/status: Device status
    • device/{deviceId}/heartbeat: Device heartbeat
    • device/{deviceId}/command: Device command
    • device/{deviceId}/command/response: Device command response
  • Meeting-related

    • meeting/broadcast: Meeting broadcast
  • Announcement-related

    • announcement/broadcast: Announcement broadcast
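
The topic layout above can be kept in one place so that topic strings are not scattered through the code. The following is only a sketch: the class DeviceTopics and all of its members are hypothetical names introduced for illustration, and only the topic strings themselves come from the design.

```csharp
using System;

// Hypothetical helper (not part of the original design): builds and parses
// the topic strings listed in the topic design.
public static class DeviceTopics
{
    public const string MeetingBroadcast = "meeting/broadcast";
    public const string AnnouncementBroadcast = "announcement/broadcast";

    public static string Info(string deviceId) => $"device/{deviceId}/info";
    public static string Status(string deviceId) => $"device/{deviceId}/status";
    public static string Heartbeat(string deviceId) => $"device/{deviceId}/heartbeat";
    public static string Command(string deviceId) => $"device/{deviceId}/command";
    public static string CommandResponse(string deviceId) => $"device/{deviceId}/command/response";

    // Subscription filter for one message kind across all devices.
    // "+" is the MQTT single-level wildcard.
    public static string AllDevices(string suffix) => $"device/+/{suffix}";

    // Extracts the device ID from a "device/{deviceId}/..." topic.
    // Returns false for topics that do not follow that layout.
    public static bool TryGetDeviceId(string topic, out string deviceId)
    {
        deviceId = string.Empty;
        string[] parts = topic.Split('/');
        if (parts.Length < 3 || parts[0] != "device" || parts[1].Length == 0)
        {
            return false;
        }
        deviceId = parts[1];
        return true;
    }
}
```

With a helper like this, the server side could subscribe to DeviceTopics.AllDevices("heartbeat") and use TryGetDeviceId to find out which device a received message belongs to.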

4. Data Model

4.1 Device Information (DeviceInfo)

Basic information about the device, including:

  • Device ID (DeviceId)
  • Device name (DeviceName)
  • Device Type (DeviceType)
  • IP Address (IpAddress)
  • MAC Address (MacAddress)
  • Software Version (SoftwareVersion)
  • Hardware Version (HardwareVersion)
  • Device Status (Status)
  • Last online time (LastOnlineTime)
  • Last heartbeat time (LastHeartbeatTime)
  • Description
  • Location
  • Additional Properties

4.2 Device Status (DeviceStatus)

Possible statuses of the device include:

  • Offline
  • Online
  • Busy
  • Other custom statuses

4.3 Device Command

A command sent to a device includes:

  • Command ID (CommandId)
  • Device ID (DeviceId)
  • Command type (CommandType)
  • Command Parameters
  • Timeout
  • Creation time (CreateTime)
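
Sections 4.1 to 4.3 could map to plain C# classes along the following lines. This is a sketch under assumptions: the design only lists the fields, so the property types, the names Parameters and AdditionalProperties, the generated CommandId format, and the 30-second default timeout are all illustrative choices rather than the original implementation.

```csharp
using System;
using System.Collections.Generic;

// Status values from section 4.2; custom statuses could be appended.
public enum DeviceStatus
{
    Offline,
    Online,
    Busy
}

// Fields from section 4.1. All types are assumptions.
public class DeviceInfo
{
    public string DeviceId { get; set; } = string.Empty;
    public string DeviceName { get; set; } = string.Empty;
    public string DeviceType { get; set; } = string.Empty;
    public string IpAddress { get; set; } = string.Empty;
    public string MacAddress { get; set; } = string.Empty;
    public string SoftwareVersion { get; set; } = string.Empty;
    public string HardwareVersion { get; set; } = string.Empty;
    public DeviceStatus Status { get; set; } = DeviceStatus.Offline;
    public DateTime LastOnlineTime { get; set; }
    public DateTime LastHeartbeatTime { get; set; }
    public string Description { get; set; } = string.Empty;
    public string Location { get; set; } = string.Empty;
    public Dictionary<string, string> AdditionalProperties { get; set; } = new();
}

// Fields from section 4.3. The defaults are assumptions.
public class DeviceCommand
{
    public string CommandId { get; set; } = Guid.NewGuid().ToString("N");
    public string DeviceId { get; set; } = string.Empty;
    public string CommandType { get; set; } = string.Empty;
    public Dictionary<string, string> Parameters { get; set; } = new();
    public TimeSpan Timeout { get; set; } = TimeSpan.FromSeconds(30);
    public DateTime CreateTime { get; set; } = DateTime.UtcNow;
}
```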

4.4 Meeting Information (MeetingInfo)

Meeting-related information, used for meeting notifications.

4.5 Announcement Information (AnnouncementInfo)

System announcement information, used for broadcast notifications.

5. Workflow:

5.1 Service startup process

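  1. When the program starts, register the MQTT-related services through dependency injection:

    // Register the MQTT services.
    // The receiver and method names are missing from the source; the calls below
    // assume the standard Microsoft.Extensions.DependencyInjection pattern.
    services.AddSingleton<MqttConfig>();
    services.AddSingleton<DevicePool>();
    services.AddSingleton<MqttService>();
    services.AddHostedService(provider => provider.GetRequiredService<MqttService>());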
  2. MqttService starts up as an IHostedService:

    • Load the MQTT configuration
    • Create the server-side MQTT client
    • Register the event handlers
    • Connect to the MQTT broker
    • Start the heartbeat check timer
    • Start the device cleanup timer

5.2 Device connection process

  1. The device connects to the MQTT broker through DeviceMqttClient
  2. Once connected, the device publishes its own information to the device/{deviceId}/info topic
  3. The server's ServerMqttClient receives the device information and adds the device to DevicePool
  4. The device online event is triggered and the related business logic runs

5.3 Device heartbeat process

  1. The device periodically publishes heartbeat messages to the device/{deviceId}/heartbeat topic
  2. The server receives a heartbeat message and updates the device's last heartbeat time
  3. The server's heartbeat check timer periodically checks each device for a heartbeat timeout
  4. For a device whose heartbeat has timed out, the server sets its status to offline and triggers the device offline event
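
The heartbeat steps above can be sketched without any MQTT library. The code below is an illustration only: HeartbeatTracker is a hypothetical, simplified stand-in for the heartbeat part of DevicePool, and the timeout value is supplied by the caller because the design does not state one.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical stand-in for the heartbeat-related state of the device pool.
public class HeartbeatTracker
{
    private readonly ConcurrentDictionary<string, DateTime> _lastHeartbeat = new();
    private readonly ConcurrentDictionary<string, bool> _online = new();
    private readonly TimeSpan _timeout;

    // Raised once each time a device goes from online to offline.
    public event Action<string> DeviceOffline = delegate { };

    public HeartbeatTracker(TimeSpan timeout) => _timeout = timeout;

    // Step 2: record the heartbeat time and mark the device online.
    public void OnHeartbeat(string deviceId, DateTime now)
    {
        _lastHeartbeat[deviceId] = now;
        _online[deviceId] = true;
    }

    // Steps 3 and 4: meant to be called from a periodic timer.
    // Returns the devices that were switched to offline by this call.
    public IReadOnlyList<string> CheckTimeouts(DateTime now)
    {
        var timedOut = new List<string>();
        foreach (var pair in _lastHeartbeat)
        {
            bool expired = now - pair.Value > _timeout;
            // TryUpdate succeeds only while the device is still marked online,
            // so the offline event fires once per transition.
            if (expired && _online.TryUpdate(pair.Key, false, true))
            {
                timedOut.Add(pair.Key);
                DeviceOffline(pair.Key);
            }
        }
        return timedOut;
    }

    public bool IsOnline(string deviceId) =>
        _online.TryGetValue(deviceId, out bool online) && online;
}
```

Passing the current time in as a parameter, instead of reading the clock inside the class, keeps the timeout logic easy to test.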

5.4 Command sending process

  1. The application sends a command
  2. ServerMqttClient publishes the command to the device/{deviceId}/command topic
  3. The device receives the command and performs the corresponding operation
  4. The device publishes the command execution result to the device/{deviceId}/command/response topic
  5. The server receives the command response and triggers the command response event
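
One common way to tie a response back to the command that caused it is to key pending commands by their command ID. The sketch below assumes that the response message carries the CommandId, which the design does not state explicitly; PendingCommands and its methods are hypothetical names, and the payload is treated as a plain string.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper: matches command responses to pending commands by ID.
public class PendingCommands
{
    private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _pending = new();

    // Call before publishing the command. The returned task completes with the
    // response payload, or fails with TimeoutException if none arrives in time.
    public async Task<string> WaitForResponseAsync(string commandId, TimeSpan timeout)
    {
        var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        if (!_pending.TryAdd(commandId, tcs))
        {
            throw new InvalidOperationException($"Command {commandId} is already pending.");
        }
        try
        {
            Task finished = await Task.WhenAny(tcs.Task, Task.Delay(timeout));
            if (finished != tcs.Task)
            {
                throw new TimeoutException($"No response for command {commandId}.");
            }
            return await tcs.Task;
        }
        finally
        {
            // Always forget the command, whether it completed or timed out.
            _pending.TryRemove(commandId, out _);
        }
    }

    // Call when a message arrives on device/{deviceId}/command/response.
    // Returns false if nothing is waiting for this command ID.
    public bool Complete(string commandId, string responsePayload)
    {
        return _pending.TryGetValue(commandId, out var tcs) && tcs.TrySetResult(responsePayload);
    }
}
```

The Timeout field of the device command would be a natural value to pass as the timeout argument.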

5.5 Meeting and announcement broadcast process

  1. The application publishes meeting or announcement information through MqttService
  2. ServerMqttClient publishes the information to the corresponding broadcast topic
  3. The device receives the broadcast and triggers the corresponding event to handle it

5.6 Device cleanup process

  1. The device cleanup timer runs the cleanup task on a schedule (by default, at 2 a.m.)
  2. The task removes the records of devices that have been offline for a long time (by default, more than 24 hours)
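
The two cleanup rules come down to a small amount of date arithmetic. The following sketch uses hypothetical names (DeviceCleanup, DelayUntilNextRun, FindStale) and assumes the caller passes in a map from device ID to last online time for the offline devices.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper for the cleanup schedule and the staleness rule.
public static class DeviceCleanup
{
    // Time to wait from "now" until the next occurrence of the given hour
    // (2 for the default 2 a.m. run). If that time has already passed today,
    // the run is scheduled for tomorrow.
    public static TimeSpan DelayUntilNextRun(DateTime now, int hour = 2)
    {
        DateTime next = now.Date.AddHours(hour);
        if (next <= now)
        {
            next = next.AddDays(1);
        }
        return next - now;
    }

    // Returns the IDs of devices whose last online time is older than
    // maxOffline (24 hours by default in the design).
    public static List<string> FindStale(
        IReadOnlyDictionary<string, DateTime> lastOnline,
        DateTime now,
        TimeSpan maxOffline)
    {
        return lastOnline
            .Where(pair => now - pair.Value > maxOffline)
            .Select(pair => pair.Key)
            .ToList();
    }
}
```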

6. Error handling and reconnection mechanism

  1. After the MQTT client disconnects, a reconnect timer starts automatically
  2. Reconnection is attempted at the configured reconnect interval
  3. If a maximum number of reconnection attempts is configured, reconnection stops once that number is reached; a value of 0 means unlimited attempts
  4. All communication operations include error handling and logging
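
The reconnection rules above can be written as a retry loop. This is a sketch, not the article's implementation: it uses an awaited delay instead of a timer, the tryConnectAsync delegate is a hypothetical placeholder for the real connect call, and logging goes to the console for brevity.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper illustrating the reconnect policy.
public static class Reconnector
{
    // tryConnectAsync: placeholder for the real connect call; returns true on success.
    // maxAttempts: 0 means retry without limit.
    // Returns true once connected, false when the attempt limit is reached.
    public static async Task<bool> ReconnectAsync(
        Func<Task<bool>> tryConnectAsync,
        TimeSpan interval,
        int maxAttempts,
        CancellationToken cancellationToken = default)
    {
        int attempt = 0;
        while (maxAttempts == 0 || attempt < maxAttempts)
        {
            // Wait for the configured reconnect interval before each attempt.
            await Task.Delay(interval, cancellationToken);
            attempt++;
            try
            {
                if (await tryConnectAsync())
                {
                    return true;
                }
            }
            catch (Exception ex)
            {
                // A failed attempt is logged and the loop continues.
                Console.Error.WriteLine($"Reconnect attempt {attempt} failed: {ex.Message}");
            }
        }
        return false;
    }
}
```

Cancelling the token ends the loop with an OperationCanceledException, which gives the hosted service a way to stop reconnecting on shutdown.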

MQTT configuration options include:

  • Server address (ServerAddress): Default is "localhost"
  • Server port (Port): Default is 1883
  • Client ID (ClientId): Automatically generated by default
  • Username and Password: Used for authentication
  • Whether to use TLS/SSL (UseTls): Default is false
  • Connection timeout (ConnectionTimeout): Default is 10 seconds
  • Keep-alive interval (KeepAliveInterval): Default is 60 seconds
  • Reconnect interval (ReconnectInterval): Default is 5 seconds
  • Maximum reconnect attempts (MaxReconnectAttempts): Default is 0 (unlimited reconnection)
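
As a configuration class, these options and defaults might look like the sketch below. The defaults are taken from the list above. The property types, the use of whole seconds for the intervals, the empty-string defaults for the credentials, and the generated ClientId format are assumptions.

```csharp
using System;

// Sketch of the configuration class; types and the ClientId format are assumptions.
public class MqttConfig
{
    public string ServerAddress { get; set; } = "localhost";
    public int Port { get; set; } = 1883;

    // "Automatically generated by default": a GUID-based ID is assumed here.
    public string ClientId { get; set; } = $"client-{Guid.NewGuid():N}";

    // Credentials for authentication; empty means none configured.
    public string Username { get; set; } = string.Empty;
    public string Password { get; set; } = string.Empty;

    public bool UseTls { get; set; } = false;

    // All intervals are in seconds.
    public int ConnectionTimeout { get; set; } = 10;
    public int KeepAliveInterval { get; set; } = 60;
    public int ReconnectInterval { get; set; } = 5;

    // 0 means reconnect without limit.
    public int MaxReconnectAttempts { get; set; } = 0;
}
```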

That concludes the design. The next article covers the implementation, using C# as the example.