Flink, 大数据

flink-44 代码中使用flinksql

引入依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-loader</artifactId>
    <version>${flink.version}</version>
</dependency>
  • flink-table-planner-loader:新版本 Flink 中引入 planner 的写法,在 IDE 中本地运行 Table API / SQL 程序时需要引入(与 flink-table-runtime 配合使用)。
  • flink-connector-files:文件连接器现在是独立的依赖包,需要单独引入。

创建表执行环境

使用 Table API 和 SQL 需要一个特别的运行时环境,这就是所谓的表环境(TableEnvironment)。

它主要负责:

  • 注册Catalog和表
  • 执行SQL查询
  • 注册用户自定义函数UDF
  • DataStream和表之间的转换

每个表和SQL的执行,都必须绑定在一个表执行环境中。

package com.learn.flink.sql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Minimal example of running Flink SQL from Java code.
 *
 * <p>The program registers a {@code datagen} source table and a {@code print} sink table,
 * aggregates {@code vc} per {@code id} for rows with {@code id > 5}, exposes the result as the
 * temporary view {@code tmp}, and inserts the contents of that view into the sink.
 */
public class SqlDemo {

    /**
     * Builds the table environment, declares the tables and submits the INSERT job.
     *
     * @param args command-line arguments; not used
     * @throws Exception declared so that failures from the Flink calls propagate to the caller
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Step 1: create the table environment.
        // Option A: build it from explicit settings.
        //     EnvironmentSettings settings = EnvironmentSettings.newInstance()
        //             .inStreamingMode()
        //             .build();
        //     StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        // Option B: rely on the default settings.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Step 2: create the tables.
        // Source: generated rows, one per second; id and vc are random within the given
        // bounds, ts is a sequence from 1 to 1000000.
        tableEnv.executeSql("CREATE TABLE source (\n" +
                "    id INT,\n" +
                "    ts BIGINT,\n" +
                "    vc INT\n" +
                ") WITH (\n" +
                "    'connector' = 'datagen',\n" +
                "    'rows-per-second' = '1',\n" +
                "    'fields.id.kind' = 'random',\n" +
                "    'fields.id.min' = '1',\n" +
                "    'fields.id.max' = '10',\n" +
                "    'fields.ts.kind' = 'sequence',\n" +
                "    'fields.ts.start' = '1',\n" +
                "    'fields.ts.end' = '1000000',\n" +
                "    'fields.vc.kind' = 'random',\n" +
                "    'fields.vc.min' = '1',\n" +
                "    'fields.vc.max' = '100'\n" +
                ");");

        // Sink: uses the 'print' connector.
        tableEnv.executeSql("CREATE TABLE sink (\n" +
                "    id INT,\n" +
                "    sumVc BIGINT\n" +
                ") WITH (\n" +
                "    'connector' = 'print'\n" +
                ");");

        // Step 3: run the query.
        // Using SQL:
        Table table = tableEnv.sqlQuery("SELECT id, sum(vc) as sumVc FROM source WHERE id > 5 GROUP BY id ;");
        // Register the Table object under a name so later SQL statements can refer to it.
        tableEnv.createTemporaryView("tmp", table);
        // The view can now be queried like any other table, for example
        //     Table filtered = tableEnv.sqlQuery("SELECT * FROM tmp where id > 7");
        // sqlQuery only returns a Table; nothing is emitted until that Table is written to a
        // sink, so a call whose result is thrown away has no visible effect.

        // Using the Table API instead:
        //     Table source = tableEnv.from("source");
        //     Table result = source
        //             .where($("id").isGreater(5))
        //             .groupBy($("id"))
        //             .aggregate($("vc").sum().as("sumVc"))
        //             .select($("id"), $("sumVc"));

        // Step 4: write the result to the sink.
        // 4.1 SQL variant.
        // NOTE(review): executeSql submits the INSERT job without waiting for it to finish; if
        // the process exits before any output appears when run locally, call await() on the
        // returned TableResult - confirm for the Flink version in use.
        tableEnv.executeSql("INSERT INTO sink SELECT * FROM tmp");
        // 4.2 Table API variant:
        //     result.executeInsert("sink");
    }
}

表和流的转换

package com.learn.flink.source;

import com.learn.flink.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Demonstrates converting between a DataStream and a Table in both directions.
 *
 * <p>A small in-memory stream of {@code WaterSensor} elements is registered as the view
 * {@code sensor}, queried twice with SQL, and both query results are converted back into
 * streams and printed.
 */
public class TableStreamDemo {

    /**
     * Builds the pipeline and executes it.
     *
     * @param args command-line arguments; not used
     * @throws Exception declared so that failures from the Flink calls propagate to the caller
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Input: five fixed sensor elements.
        DataStreamSource<WaterSensor> readings = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 2L, 2),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3),
                new WaterSensor("s3", 4L, 4)
        );

        // Stream -> table: convert the stream and register it under the name "sensor".
        tableEnv.createTemporaryView("sensor", tableEnv.fromDataStream(readings));

        // A plain filter query and a grouped aggregation over the same view.
        Table lateReadings = tableEnv.sqlQuery("SELECT id, ts, vc FROM sensor WHERE ts > 2");
        Table vcTotals = tableEnv.sqlQuery("SELECT id, sum(vc) FROM sensor GROUP BY id");

        // Table -> stream.
        // The filter result is converted with toDataStream (append stream).
        tableEnv
                .toDataStream(lateReadings, WaterSensor.class)
                .print("filter");
        // The aggregation result is converted with toChangelogStream (retract stream).
        tableEnv
                .toChangelogStream(vcTotals)
                .print("sum");

        // execute() is required whenever the DataStream API is used in the program;
        // otherwise it is not needed.
        env.execute();
    }
}