目录

InfluxDB写入测试

目录

InfluxDB写入测试

早几年测试时序库时,采集数据到kafka,然后用不同数据进行存储验证。Influxdb是花时间比较多的,它的数据建模方法、读写方法都需要使用特殊的API。时间久了自己也经常忘记,把当时的测试关键代码记录下来,也方便日后查找。

代码基于java编写。

1、接口数据定义,clientid+tag组合必须唯一

public class KafkaInfo{
        //客户端id
        public String clientid;

        //测点名称
        public String tag ;

        //数据时戳
        public String ts ;

        //测点数据类型
        public String vt;

        //测点值
        public String value;

        //本次数据更新次数
        public long updatacount;

        //测点说明
        public String desc;
}

2、数据写入

    static Logger logger = Logger.getLogger(influxdbApplicationTest.class);

    // You can generate a Token from the "Tokens Tab" in the UI
    static String token = "web界面创建的token";
    static String bucket = "数据分库名";
    static String org = "初始化时创建的org";

    public static void main(String[] args) {
        logger.info("-------start ,ApplicationTest--------");

        InfluxDBClient client = InfluxDBClientFactory.create("http://10.126.12.113:8086", token.toCharArray());

        //
        // Write data
        //
        try (WriteApi writeApi = client.getWriteApi()) {

            //
            // Write by Data Point
            //
            for ( int i = 0;i < 10; i ++ ) {
                Point point = Point.measurement("Line123")
                        .addTag("tag", "Tags001")
                        .addTag("L1", "real")
                        .addField("value", 20*i)
                        .addField("update", 7000+i)
                        .time(Instant.now().toEpochMilli(), WritePrecision.MS);

                //writeApi.writePoint(bucket, org, point);
            }

        }

        //
        // Query data
        //
        String query = String.format("from(bucket: \"%s\") |> range(start: -1h) |> filter(fn: (r) =>r._measurement==\"Line123\"", bucket);

        QueryApi queryApi = client.getQueryApi();

        List<FluxTable> tables = queryApi.query(query,org);
        for (FluxTable fluxTable : tables) {
            List<FluxRecord> records = fluxTable.getRecords();
            for (FluxRecord fluxRecord : records) {
                System.out.println(fluxRecord.getTime() + ": " + fluxRecord.getMeasurement()+","+ fluxRecord.getValueByKey("tag") +","+ fluxRecord.getValueByKey("_value"));
            }
        }

        client.close();

        logger.info("-------finish ,ApplicationTest--------");

    }