Blog
flink-44 在代码中使用 Flink SQL
引入依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
</dependency>
- flink-table-planner-loader：新版本中 planner 的引入方式，必须添加
- flink-connector-files：文件连接器现在是独立的包，需要单独引入
创建表执行环境
使用 Table API 和 SQL 需要一个专门的运行时环境，即所谓的表环境（TableEnvironment）。
它主要负责：
- 注册 Catalog 和表
- 执行 SQL 查询
- 注册用户自定义函数（UDF）
- DataStream 和表之间的转换
每个表以及 SQL 的执行，都必须绑定在一个表环境中。
package com.learn.flink.sql;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
/**
 * Demonstrates running Flink SQL from Java code: a {@code datagen} source table is
 * filtered and aggregated with SQL, and the result is written to a {@code print} sink table.
 */
public class SqlDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. Create the table environment.
        // Option 1: pass explicit settings.
        // EnvironmentSettings settings = EnvironmentSettings.newInstance()
        //         .inStreamingMode()
        //         .build();
        // StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        // Option 2: build it directly from the stream execution environment.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Create the tables.
        // datagen source, one row per second: id random in [1, 10],
        // ts a sequence from 1 to 1000000, vc random in [1, 100].
        tableEnv.executeSql("CREATE TABLE source (\n" +
                " id INT,\n" +
                " ts BIGINT,\n" +
                " vc INT\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second' = '1',\n" +
                " 'fields.id.kind' = 'random',\n" +
                " 'fields.id.min' = '1',\n" +
                " 'fields.id.max' = '10',\n" +
                " 'fields.ts.kind' = 'sequence',\n" +
                " 'fields.ts.start' = '1',\n" +
                " 'fields.ts.end' = '1000000',\n" +
                " 'fields.vc.kind' = 'random',\n" +
                " 'fields.vc.min' = '1',\n" +
                " 'fields.vc.max' = '100'\n" +
                ");");
        // print sink: writes every incoming row to stdout.
        tableEnv.executeSql("CREATE TABLE sink (\n" +
                " id INT,\n" +
                " sumVc BIGINT\n" +
                ") WITH (\n" +
                " 'connector' = 'print'\n" +
                ");");

        // 3. Run the query with SQL.
        Table table = tableEnv.sqlQuery("SELECT id, sum(vc) as sumVc FROM source WHERE id > 5 GROUP BY id ;");
        // Register the Table object under a name, so that later SQL (the INSERT below)
        // can refer to it as "tmp". Note that sqlQuery only builds a Table; its result
        // must be used (inserted, converted, printed), otherwise the call has no effect.
        tableEnv.createTemporaryView("tmp", table);

        // Equivalent query with the Table API:
        // Table result = tableEnv.from("source")
        //         .where($("id").isGreater(5))
        //         .groupBy($("id"))
        //         .select($("id"), $("vc").sum().as("sumVc"));

        // 4. Write the result to the sink.
        // 4.1 SQL: executeSql submits the INSERT job and returns immediately; await()
        // waits for the job to finish and rethrows a job failure here instead of losing it.
        tableEnv.executeSql("INSERT INTO sink SELECT * FROM tmp").await();
        // 4.2 Table API equivalent. insertInto(...) alone only builds a pipeline, so use
        // executeInsert, which submits the job:
        // result.executeInsert("sink").await();
    }
}
表和流的转换
package com.learn.flink.source;
import com.learn.flink.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Demonstrates converting between DataStreams and Tables.
 *
 * <p>A small stream of {@code WaterSensor} records is registered as the view
 * {@code sensor} and queried with SQL. Both query results are then turned back
 * into DataStreams and printed.
 */
public class TableStreamDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Fixed sample readings, passed to fromElements as its varargs array.
        WaterSensor[] readings = {
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 2L, 2),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3),
                new WaterSensor("s3", 4L, 4)
        };
        DataStreamSource<WaterSensor> sensorStream = env.fromElements(readings);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Stream -> Table: expose the stream to SQL under the view name "sensor".
        Table sensors = tableEnv.fromDataStream(sensorStream);
        tableEnv.createTemporaryView("sensor", sensors);

        Table filtered = tableEnv.sqlQuery("SELECT id, ts, vc FROM sensor WHERE ts > 2");
        Table sumsById = tableEnv.sqlQuery("SELECT id, sum(vc) FROM sensor GROUP BY id");

        // Table -> Stream.
        // The filter result is converted with toDataStream and mapped back onto WaterSensor.
        tableEnv.toDataStream(filtered, WaterSensor.class).print("filter");
        // The per-id sum is a GROUP BY aggregate, so it is converted with toChangelogStream.
        tableEnv.toChangelogStream(sumsById).print("sum");

        // This job is built with the DataStream API (print sinks on streams), so it runs
        // only when execute() is called; pure executeSql jobs do not need this call.
        env.execute();
    }
}