flink-41 Reading and Writing with Common Connectors
Note that the dependencies available to the Flink client on the server can differ from the dependencies you compile against in IDEA.
Whenever you change dependencies or configuration, restart the yarn-session and the sql-client for the changes to take effect.
Regular Kafka table
Create the Kafka mapping table:
CREATE TABLE t1 (
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  id INT,
  ts BIGINT,
  vc INT
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'hadoop001:9092,hadoop002:9092,hadoop003:9092',
  'properties.group.id' = 't1',
  'scan.startup.mode' = 'earliest-offset',
  'sink.partitioner' = 'fixed',
  'topic' = 't1',
  'format' = 'json'
);
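A quick way to verify the mapping (assuming topic t1 already holds JSON records such as {"id":1,"ts":1000,"vc":10}) is to select the metadata columns alongside the payload:
-- `partition` and `offset` are read-only (VIRTUAL) metadata columns
SELECT `partition`, `offset`, event_time, id, ts, vc FROM t1;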
- scan.startup.mode (where to start reading; 'timestamp' mode is sketched below)
  - earliest-offset: start from the earliest offset of each partition
  - latest-offset: start from the latest offset
  - group-offsets: resume from the offsets committed by the consumer group (the default)
  - timestamp: start from a user-supplied timestamp
  - specific-offsets: start from user-supplied per-partition offsets
- sink.partitioner
  - fixed: each Flink sink subtask writes to exactly one Kafka partition. With 2 Flink subtasks and 1 Kafka partition, both subtasks write to that single partition; with 4 Kafka partitions, only the first two partitions receive data.
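As a sketch, a 'timestamp' startup would look like this (the table name t1_from_ts and the millisecond value are placeholders; 'specific-offsets' works the same way via the 'scan.startup.specific-offsets' option):
CREATE TABLE t1_from_ts (
  id INT,
  ts BIGINT,
  vc INT
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'hadoop001:9092,hadoop002:9092,hadoop003:9092',
  'topic' = 't1',
  'format' = 'json',
  -- start reading at a fixed epoch timestamp in milliseconds (placeholder value)
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1690000000000'
);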
Upsert-Kafka table
If the table is subject to update operations, the regular Kafka connector cannot handle them; use the upsert-kafka connector instead.
- For each key, only the latest value is kept.
- A record whose value is null is treated as a delete (a tombstone).
- Flink partitions the data by the value of the primary key columns, which guarantees per-key message ordering, so a primary key must be defined.
Create the upsert-kafka mapping table (a primary key is mandatory):
CREATE TABLE t2 (
  id INT,
  ts BIGINT,
  sumVC DOUBLE,
  PRIMARY KEY (id, ts) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'hadoop001:9092,hadoop002:9092',
  'topic' = 't2',
  'key.format' = 'json',
  'value.format' = 'json'
);
INSERT INTO t2 SELECT id, ts, SUM(vc) FROM source GROUP BY id, ts;
upsert-kafka cannot start reading from a specified offset; it always reads the topic from the beginning. Only by replaying the whole topic can it reconstruct the complete update history of the data, and it exposes each change with changelog flags such as +I, -U, and +U.
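To watch that changelog in the sql-client, one option (a sketch; switching the result mode is a client setting, not a connector option) is:
-- show every change with its op flag (+I, -U, +U) instead of the materialized result
SET 'sql-client.execution.result-mode' = 'changelog';
SELECT * FROM t2;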
File
Create the filesystem mapping table:
CREATE TABLE t3 (
  id INT,
  ts BIGINT,
  vc INT
) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://hadoop001:8020/data/t3',
  'format' = 'csv'
);
Querying this table fails with a class-not-found error.
Fix 1: remove the Hive connector jar, then restart the Flink cluster and the sql-client.
Fix 2: swap flink-table-planner-loader-1.17.1.jar for flink-table-planner_2.12-1.17.1.jar:
mv /data/flink/opt/flink-table-planner_2.12-1.17.1.jar /data/flink/lib/
mv /data/flink/lib/flink-table-planner-loader-1.17.1.jar /data/flink/opt/
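After restarting, the table should work end to end; a minimal smoke test (the values are made up):
INSERT INTO t3 VALUES (1, 1000, 10), (2, 2000, 20);
SELECT * FROM t3;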
JDBC (MySQL)
When writing data to an external database, Flink uses the primary key defined in the DDL: if a primary key is defined, the connector operates in upsert mode; otherwise it operates in append mode.
In upsert mode, Flink inserts a new row or updates the existing row according to the primary key.
Create the table in MySQL:
CREATE TABLE `ws2` (
  `id` INT(11) NOT NULL,
  `ts` BIGINT(20) DEFAULT NULL,
  `vc` INT(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Create the Flink mapping table:
CREATE TABLE ws2 (
  id INT,
  ts BIGINT,
  vc INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://xxxx',
  'username' = 'xxx',
  'password' = 'xxxx',
  'connection.max-retry-timeout' = '60s',
  'table-name' = 'ws2',
  'sink.buffer-flush.max-rows' = '500',
  'sink.buffer-flush.interval' = '5s',
  'sink.max-retries' = '3',
  'sink.parallelism' = '1'
);
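With the primary key in place, a grouped insert behaves as an upsert on the MySQL side; a sketch, reusing the source table from the upsert-kafka example:
-- the query's key (id) matches the sink's PRIMARY KEY, so repeated ids become UPDATEs
INSERT INTO ws2 SELECT id, MAX(ts), SUM(vc) FROM source GROUP BY id;
-- the same mapping table can also be scanned as a bounded source
SELECT * FROM ws2;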