· Zen HuiFer · Tutorial · 2 min read
Data Flow Link
Understand the core data structure and installation method of the data flow link in the Go IoT development platform, and optimize the data processing flow of IoT projects.
Core Data Structure
On-site installation methods for IoT projects:
- Physical devices can directly communicate over the network and report data
- Physical devices need to report data through gateway devices
To cover both installation modes, the system defines the following core data structure.
type DataRowList struct {
	Time               int64     `json:"time"`                // timestamp in seconds
	DeviceUid          string    `json:"device_uid"`          // unique code of the entity that performs network communication
	IdentificationCode string    `json:"identification_code"` // device identification code
	DataRows           []DataRow `json:"data"`
	Nc                 string    `json:"nc"`
}

type DataRow struct {
	Name  string `json:"name"`
	Value string `json:"value"`
}
In the first installation mode, DeviceUid and IdentificationCode are the same, while in the second installation mode, they are different.
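As a minimal sketch of the two installation modes, the snippet below builds one DataRowList for a directly connected device and one for a device behind a gateway, then prints both as JSON. It assumes that in the gateway case DeviceUid carries the gateway's identifier, since the gateway is the part that communicates over the network. The helper names newDirect and newViaGateway, the UIDs, and the readings are made up for illustration, and the struct definitions are repeated only so the example compiles on its own.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// DataRowList and DataRow repeat the definitions above so the sketch is self-contained.
type DataRowList struct {
	Time               int64     `json:"time"`
	DeviceUid          string    `json:"device_uid"`
	IdentificationCode string    `json:"identification_code"`
	DataRows           []DataRow `json:"data"`
	Nc                 string    `json:"nc"`
}

type DataRow struct {
	Name  string `json:"name"`
	Value string `json:"value"`
}

// newDirect (hypothetical helper) models a device that reports by itself:
// both identifiers hold the same value.
func newDirect(uid string, ts int64, rows []DataRow) DataRowList {
	return DataRowList{Time: ts, DeviceUid: uid, IdentificationCode: uid, DataRows: rows}
}

// newViaGateway (hypothetical helper) models a device behind a gateway.
// Assumption: DeviceUid is the gateway, IdentificationCode is the device.
func newViaGateway(gatewayUid, deviceCode string, ts int64, rows []DataRow) DataRowList {
	return DataRowList{Time: ts, DeviceUid: gatewayUid, IdentificationCode: deviceCode, DataRows: rows}
}

func main() {
	rows := []DataRow{{Name: "temperature", Value: "21.5"}}
	messages := []DataRowList{
		newDirect("dev-001", 1700000000, rows),
		newViaGateway("gw-001", "sensor-7", 1700000000, rows),
	}
	for _, m := range messages {
		b, err := json.Marshal(m)
		if err != nil {
			panic(err)
		}
		fmt.Println(string(b))
	}
}
```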
Note: This is not the structure that devices report. It is the structure used internally as data flows through the system. After a device reports, a parsing script must convert the raw message into this structure.
Take data reported by an MQTT client as an example. The DeviceUid field is the unique identifier of the MQTT client in this system. This identifier is determined when the MQTT client is created, so the user only needs to copy it. In addition, if the MQTT wildcard subscription mode is not used, IdentificationCode must be set to the same value as DeviceUid. Other protocols are handled the same way.
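The conversion from a raw report into the internal structure could look roughly like the sketch below. The platform's actual parsing scripts are not shown in this article, so everything here is an assumption for illustration: the raw payload format (rawReport), the function name parseReport, and the client UID are all hypothetical. The sketch covers the non-wildcard MQTT case, where IdentificationCode is set to the same value as DeviceUid.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
	"strconv"
)

type DataRow struct {
	Name  string `json:"name"`
	Value string `json:"value"`
}

type DataRowList struct {
	Time               int64     `json:"time"`
	DeviceUid          string    `json:"device_uid"`
	IdentificationCode string    `json:"identification_code"`
	DataRows           []DataRow `json:"data"`
	Nc                 string    `json:"nc"`
}

// rawReport is a hypothetical device payload format, not one defined by the platform.
type rawReport struct {
	Ts       int64              `json:"ts"`
	Readings map[string]float64 `json:"readings"`
}

// parseReport (hypothetical) converts a raw payload into a DataRowList.
// Without wildcard subscription, IdentificationCode equals DeviceUid.
func parseReport(clientUid string, payload []byte) (DataRowList, error) {
	var raw rawReport
	if err := json.Unmarshal(payload, &raw); err != nil {
		return DataRowList{}, fmt.Errorf("parse report: %w", err)
	}

	// Sort names so the row order is deterministic (Go map order is random).
	names := make([]string, 0, len(raw.Readings))
	for name := range raw.Readings {
		names = append(names, name)
	}
	sort.Strings(names)

	rows := make([]DataRow, 0, len(names))
	for _, name := range names {
		value := strconv.FormatFloat(raw.Readings[name], 'f', -1, 64)
		rows = append(rows, DataRow{Name: name, Value: value})
	}

	return DataRowList{
		Time:               raw.Ts,
		DeviceUid:          clientUid,
		IdentificationCode: clientUid,
		DataRows:           rows,
	}, nil
}

func main() {
	payload := []byte(`{"ts":1700000000,"readings":{"temperature":21.5,"humidity":40}}`)
	out, err := parseReport("mqtt-client-1", payload)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", out)
}
```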
Core Data Flow Link
The following diagram shows the complete process from device data reporting to data processing.
- Devices send data to the specified server through CoAP, MQTT, TCP/IP, WebSocket, or HTTP.
- After receiving the data, the server forwards it to a message queue chosen by the reporting protocol.
Reporting Protocol | Message Queue |
---|---|
CoAP | pre_coap_handler |
MQTT | pre_handler |
TCP/IP | pre_tcp_handler |
WebSocket | pre_ws_handler |
HTTP | pre_http_handler |
- The message queue will find the corresponding parsing script to convert the original message into DataRowList data. Data storage will be completed in pre_xxx_handler.
- After passing through the pre_xxx_handler queue, the data will be placed into the waring_handler, waring_delay_handler, and transmit_handler message queues for subsequent data processing. Among them, waring_handler will put the data judged to be an alarm into the notification queue waring_notice.
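The routing steps above can be sketched as follows. The queue names come from the table and list above. The publisher type is a hypothetical stand-in for whatever message-queue client the platform really uses, and routeRaw and fanOut are illustrative names, not functions from the project. Protocol names are matched case-insensitively as a convenience of this sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// preQueues mirrors the protocol-to-queue table; keys are lowercased protocol names.
var preQueues = map[string]string{
	"coap":      "pre_coap_handler",
	"mqtt":      "pre_handler",
	"tcp/ip":    "pre_tcp_handler",
	"websocket": "pre_ws_handler",
	"http":      "pre_http_handler",
}

// downstreamQueues lists the queues fed after the pre_xxx_handler stage.
var downstreamQueues = []string{"waring_handler", "waring_delay_handler", "transmit_handler"}

// publisher is a hypothetical abstraction over the real message-queue client.
type publisher func(queue string, body []byte) error

// routeRaw sends a raw device message to the queue for its reporting protocol.
func routeRaw(protocol string, body []byte, pub publisher) error {
	queue, ok := preQueues[strings.ToLower(protocol)]
	if !ok {
		return fmt.Errorf("unsupported protocol %q", protocol)
	}
	return pub(queue, body)
}

// fanOut sends parsed data to every downstream queue, stopping at the first error.
func fanOut(body []byte, pub publisher) error {
	for _, queue := range downstreamQueues {
		if err := pub(queue, body); err != nil {
			return fmt.Errorf("publish to %s: %w", queue, err)
		}
	}
	return nil
}

func main() {
	// A printing publisher keeps the sketch runnable without a broker.
	pub := func(queue string, body []byte) error {
		fmt.Printf("%s <- %s\n", queue, body)
		return nil
	}
	if err := routeRaw("MQTT", []byte(`{"raw":"payload"}`), pub); err != nil {
		panic(err)
	}
	if err := fanOut([]byte(`{"device_uid":"dev-001"}`), pub); err != nil {
		panic(err)
	}
}
```

Keeping the protocol mapping in one table-like map makes it easy to see that MQTT is the only protocol whose queue name has no protocol infix.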
Data Storage
This project uses InfluxDB 2 as the data storage tool. The overall design scheme is as follows.
- bucket: the value set in the configuration file
- measurement calculation rule: protocol_${DeviceUid}_${IdentificationCode}
- field: details are as follows
Data Field | Data Value |
---|---|
storage_time | System storage time, generated by the system immediately before writing to InfluxDB. |
push_time | Data reporting time, taken from ${DataRowList.Time}. |
DataRowList.DataRows.Name | DataRowList.DataRows.Value |