阿里云IOT设备数据接入-从阿里云IOT云平台获取设备数据
阿里云IOT设备数据接入 (从阿里云IOT云平台获取设备数据)
从 IOT云平台获取设备数据
前言:
这篇文档主要讲述的就是当设备的数据发送到物联网套件之后,用户的服务端如何获取设备
的数据。
通过阅读 IoT文档,我们了解到队列中消息结构体如下:
{
“payload”: “Base64 Encode的数据”,
“messagetype”: “status”,
“messageid”: 996000000000000001,
“topic”: “具体的设备Topic”,
“timestamp”: 1526450324
}
- messageid:IoT套件生成的消息ID
- messagetype:指的是消息类型:设备状态status和设备上报消息upload
- topic:表示该消息源自套件中的哪个topic,当messageType=status时,topic为null,当messageType=upload时,topic为具体的设备Topic
- payload:数据为Base64 Encode的数据。当messageType=status时,数据是设备状态数据;当messageType=upload时,data即为设备发布到Topic中的原始数据。
- timestamp:队列中消息生成时间戳,并非业务的时间戳
一,基于HTTP/2通道的服务端订阅-设备状态和数据
1.服务端订阅
1.1 服务端订阅流程
在IoT场景,有时候我们期望业务服务器能接收到设备状态和设备采集的数据,而不是通过云产品中转。这时我们可以开启 服务端订阅 ,IoT平台会把设备产生的消息通过HTTP/2通道推送到业务服务器,以便根据自身业务场景消费。
服务端可以直接订阅产品下配置的所有类型的消息。
目前,新版物联网平台通过HTTP/2通道进行消息流转。配置HTTP/2服务端订阅后,物联网平台会将消息通过HTTP/2通道推送至服务端。通过接入HTTP/2 SDK,企业服务器可以直接从物联网平台接收消息。HTTP/2 SDK提供身份认证、Topic订阅、消息发送和消息接收能力,并支持设备接入和云端接入能力。HTTP/2 SDK适用于物联网平台与企业服务器之间的大量消息流转,也支持设备与物联网平台之间的消息收发。
注:HTTP/2是方便、快捷、低延时,小堆积的消息流转通道,不是队列。目前是只有java8客户端sdk。 如果业务数据流非常大, 需要支持海量消息堆积,建议通过规则引擎流转队列方式处理。
不论那个方式,消费端还是需要提升自身消费速率才能高效处理业务消息。
目前服务端订阅是基于uid的,所有产品的消息都会被HTTP/2的client端消费,这时需要我们自己根据topic包含的productKey/deviceName区分
2.服务端订阅SDK使用
目前仅提供Java8版SDK
2.1 java sdk依赖
<!-- Aliyun core --><dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-core</artifactId> <version>3.7.1</version></dependency><!-- iot message client --><dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>iot-client-message</artifactId> <version>1.1.2</version></dependency>
2.2 消费IoT平台设备消息
import java.net.UnknownHostException;import java.util.concurrent.ExecutionException;import com.aliyun.openservices.iot.api.Profile;import com.aliyun.openservices.iot.api.message.MessageClientFactory;import com.aliyun.openservices.iot.api.message.api.MessageClient;import com.aliyun.openservices.iot.api.message.callback.MessageCallback;import com.aliyun.openservices.iot.api.message.entity.Message; public class H2Client { public static void main(String[] args) throws UnknownHostException, ExecutionException, InterruptedException { // 身份 String accessKey = "[阿里云](https://l.gushuji.site/aliyun)accessKey"; String accessSecret = "[阿里云](https://l.gushuji.site/aliyun)accessSecret"; String regionId = "cn-shanghai"; String uid = "[阿里云](https://l.gushuji.site/aliyun)账号 uid"; String endPoint = "https://" + uid + ".iot-as-http2." + regionId + ".aliyuncs.com"; // 连接配置 Profile profile = Profile.getAccessKeyProfile(endPoint, regionId, accessKey, accessSecret); // 构造客户端 MessageClient client = MessageClientFactory.messageClient(profile); // 数据接收 client.connect(messageToken -> { Message m = messageToken.getMessage(); System.out.println("\ntopic="+m.getTopic()); System.out.println("payload=" + new String(m.getPayload())); System.out.println("generateTime=" + m.getGenerateTime()); // 此处标记CommitSuccess已消费,IoT平台会删除当前Message, // 否则会保留到过期时间 return MessageCallback.Action.CommitSuccess; }); }}
3,数据解析
3.1关于数据解析
由于低配置且资源受限或者对网络流量有要求的设备,不适合直接构造JSON数据和云端通信,因此选择将数据透传到云端,由云端运行转换脚本将透传的数据转换成Alink JSON格式的数据。您可以在创建产品时,选择数据格式为 透传/自定义格式 ,目前转换脚本通过JavaScript语言开发,需要开发者自行开发转换脚本。物联网平台为开发者提供了用于数据解析的在线脚本编辑器,方便您进行在线的编辑和模拟调试。
3.2数据解析流程:
4,使用限制
当您使用服务端订阅时,请注意以下限制。
限制
描述
JDK版本
仅支持JDK8。
认证超时
连接建立之后,需要立刻发送认证请求。如果15秒内没有认证成功,服务器将主动关闭连接。
数据超时
连接建立之后,客户端需要定期发送PING包来维持连接。发送PING包的时间间隔可以在客户端设置,默认为30秒,最大60秒。
若超过60秒发送PING包或数据,服务端会关闭连接。
若超过设定的时间,客户端没有收到PONG包或数据应答,SDK将主动断开重连,默认时间间隔为60秒。
推送超时
推送失败重试消息时,每次批量推送10条。若该批次消息在10秒后,仍未收到客户端回复的ACK,则认为推送超时。
失败推送重试策略
每60秒重新推送一次因客户端离线、消息消费慢等原因导致的堆积消息。
消息保存时长
QoS0的消息保存1天,QoS1的消息保存7天。
SDK实例个数
每个 账号最多可以启动64个SDK实例。
单租户限流限制
默认单租户的限流限制为1,000 QPS。如果您有特殊需求,请提交工单。
二,规则引擎 - 设备数据流转
1,什么是规则引擎
当设备基于 进行通信时,您可以使用规则引擎,编写SQL对Topic中的数据进行处理,并配置转发规则将处理后的数据转发到 其他服务。例如:
- 可以转发到 、 、 中进行存储。
- 可以转发到 中,使用 进行流计算,使用 进行大规模离线计算。
- 可以转发到 进行事件计算。
- 可以转发到另一个Topic中实现M2M通信。
- 可以转发到 实现高可靠消费数据·。
注意: 如果数据格式为二进制的数据,不支持转发至表格存储、时序时空数据库和云数据库RDS版。
说明:二进制数据可使用 to_base64(*)将原始数据转换成base64String,同时支持内置函数和条件筛选。
2,如何使用
具体使用细节参考: 非常详细,在页面一顿配置即可,在此不再长篇大论。
3, 使用限制
- 规则引擎基于Topic对数据进行处理。只有通过Topic进行通信时,才能使用规则引擎。
- 规则引擎通过SQL对Topic中的数据进行处理。
- SQL语法目前不支持子查询。
- 支持部分函数,比如
deviceName()
获取当前设备名称,具体请参考函数列表。
描述
限制
单账号最多可以设置1000条规则。
1000
一条规则中转发数据的操作不能超过10个。
10
数据转发性能依赖所转发的云产品实例,在云产品实例性能足够的情况下,规则引擎为单个 账号提供1000QPS的数据转发能力。即,子账号共享主账号配额;消息并发处理,每秒最多允许1000条消息经过规则引擎流转到其他云产品。 如果请求量超出该限制或云产品写入耗时超过1s,数据转发会被限流。
1000QPS
数据转发依赖其他云产品,使用时需确保目标云产品实例正常。目标云产品的实例宕机、欠费、参数错误(如授权变更、值非法)、配置错误等异常状况将会导致消息流转失败。
规则引擎不保证消息只到达一次,在分布式环境下,某些rebalance短暂不一致可能导致一条消息发送多次情况,应用方收到消息后需要去重。
三,阿里收费规则
1,阿里收费规则
案例6:高级版
高级版的收费项目包括消息传输费用和设备管理费用。
有一个设备每秒向物联网平台发送0.4 KB大小的消息,然后物联网平台将该消息传输到5个设备,1个应用程序,即5个设备和1个应用程序接收该消息。当月有30天。
计费项:
- 消息传输费用
每条消息大小0.4 KB,小于每条消息的大小限制(0.5 KB),每条消息计为1条消息。
每月发送消息数:1×60×60×24×30=259.2万。
每月接收消息数:6×1×60×60×24×30=1555.2万。
总消息数:259.2万+1555.2万=1814.4万。
前100万条消息免费,所以,当月需付费的消息数:1814.4万–100万=1714.4万。
按照3.6元/100万条消息计费,当月消息传输费用:1714.4万x(3.6元/100万)=61.72元。
- 设备管理费用
日活设备数量为6个(1个消息发送设备和5个消息接收设备),小于每天每个账号10个免费日活设备赠额,所以设备管理费用为0元。
当月应付费用:61.72元+0元=61.72元。
注意:
您可以免费使用规则引擎转发消息,但是将数据转发至其他云产品中时,其他云产品正常计费。