Flink, Big Data

flink-41 Reading and Writing with Common Connectors

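Note: the dependencies of the Flink client on the server differ from the dependencies used when coding in IDEA.

Whenever you change a dependency or a configuration setting, restart both the yarn-session and the sql-client.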

Regular Kafka table

Create a table mapped to Kafka

CREATE TABLE t1 (
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
    `partition` BIGINT METADATA VIRTUAL,
    `offset` BIGINT METADATA VIRTUAL,
    id INT,
    ts BIGINT,
    vc INT
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'hadoop001:9092,hadoop002:9092,hadoop003:9092',
    'properties.group.id' = 't1',
    'scan.startup.mode' = 'earliest-offset',
    'sink.partitioner' = 'fixed',
    'topic' = 't1',
    'format' = 'json'
);
  • scan.startup.mode
    • earliest-offset
    • latest-offset
    • group-offsets
    • timestamp
    • specific-offsets
  • sink.partitioner
    • fixed
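      • Each Flink sink subtask writes to a single Kafka partition. With a Flink parallelism of 2 and 1 Kafka partition, both subtasks write to that one partition; with 4 Kafka partitions, only the first two partitions receive data.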
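
The timestamp and specific-offsets startup modes each need one additional option. A minimal sketch, assuming the same brokers and topic as t1; the table name t1_from_ts, its group id, and the timestamp and offset values are made up for illustration:

```sql
-- Hypothetical variant of t1 that starts reading from a point in time.
CREATE TABLE t1_from_ts (
    id INT,
    ts BIGINT,
    vc INT
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'hadoop001:9092,hadoop002:9092,hadoop003:9092',
    'properties.group.id' = 't1_from_ts',
    'topic' = 't1',
    'format' = 'json',
    'scan.startup.mode' = 'timestamp',
    -- Epoch milliseconds; illustrative value only.
    'scan.startup.timestamp-millis' = '1700000000000'
);

-- For 'specific-offsets', replace the two scan options with, for example:
--   'scan.startup.mode' = 'specific-offsets',
--   'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
```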

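Upsert-Kafka table

If the table receives update operations, the regular Kafka connector cannot handle them. Use the upsert-kafka connector instead.

  • For each key, the latest value always wins.
  • A record whose value is null is treated as a delete; when writing, a delete is emitted as a record with a null value.
  • Flink partitions the data by the values of the primary key columns, which keeps messages with the same key in order, so a primary key must be defined.

Create a table mapped to upsert-kafka (a primary key is required)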

CREATE TABLE t2 (
    id INT,
    ts BIGINT,
    sumVC DOUBLE,
    PRIMARY KEY (id, ts) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = 'hadoop001:9092,hadoop002:9092',
    'topic' = 't2',
    'key.format' = 'json',
    'value.format' = 'json'
);
INSERT INTO t2 SELECT id, ts, sum(vc) a FROM source GROUP BY id, ts;

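An upsert-kafka table cannot be read from a specified offset. It always reads from the beginning of the topic, because that is the only way to know the complete update history of the data. The changes are displayed with markers such as +I, -U, and +U.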
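
To see those change markers in the sql-client, one option is to switch the result display to changelog mode before querying. A small sketch, assuming the t2 table defined above already exists in the current session:

```sql
-- Show every change as its own row, with an op column (+I, -U, +U, -D).
SET 'sql-client.execution.result-mode' = 'changelog';

-- Reads t2 from the beginning of the topic.
SELECT * FROM t2;
```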

File

Create a table mapped to the filesystem

CREATE TABLE t3(id INT, ts BIGINT, vc INT)
WITH (
    'connector' = 'filesystem',
    'path' = 'hdfs://hadoop001:8020/data/t3',
    'format' = 'csv'
);

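In this setup, this fails with a class-not-found error.

Solution 1: remove the Hive connector jar, then restart the Flink cluster and the sql-client.

Solution 2: swap flink-table-planner-loader-1.17.1.jar for flink-table-planner_2.12-1.17.1.jar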

mv /data/flink/opt/flink-table-planner_2.12-1.17.1.jar /data/flink/lib/
mv /data/flink/lib/flink-table-planner-loader-1.17.1.jar /data/flink/opt/
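
After applying one of the fixes and restarting, a quick read/write check against t3 might look like the sketch below. The sample rows are made up for illustration. Note that in streaming mode the filesystem sink finalizes files on checkpoints, so continuous inserts may additionally need checkpointing enabled.

```sql
-- Write two sample rows (values are illustrative).
INSERT INTO t3 VALUES (1, 1001, 10), (2, 1002, 20);

-- Read back whatever has been written under the table path.
SELECT * FROM t3;
```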

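JDBC (MySQL)

When writing to an external database, Flink uses the primary key defined in the DDL. If a primary key is defined, the connector operates in upsert mode; otherwise it operates in append mode.

In upsert mode, Flink inserts a new row or updates the existing row according to the primary key.

Create the table in MySQL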

CREATE TABLE `ws2` (
    `id` INT(11) NOT NULL,
    `ts` BIGINT(20) DEFAULT NULL,
    `vc` INT(11) DEFAULT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Create the mapped table in Flink

CREATE TABLE ws2 (
    id INT,
    ts BIGINT,
    vc INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://xxxx',
    'username' = 'xxx',
    'password' = 'xxxx',
    'connection.max-retry-timeout' = '60s',
    'table-name' = 'ws2',
    'sink.buffer-flush.max-rows' = '500',
    'sink.buffer-flush.interval' = '5s',
    'sink.max-retries' = '3',
    'sink.parallelism' = '1'
);
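
A minimal sketch of an upsert write into ws2, assuming the Kafka table t1 from the first section is used as the source. The aggregation is only an illustration. Because ws2 declares a primary key, each updated result for an id overwrites the existing MySQL row instead of appending a new one; with the MySQL dialect this is done with INSERT ... ON DUPLICATE KEY UPDATE.

```sql
-- One MySQL row per id; later results for the same id overwrite it.
INSERT INTO ws2
SELECT id, MAX(ts) AS ts, SUM(vc) AS vc
FROM t1
GROUP BY id;
```

Without the PRIMARY KEY clause in the Flink DDL, the connector would run in append mode, which cannot consume the updating result of a GROUP BY query.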