flink-18 Sink Operators
Starting from Flink 1.12, the Sink architecture was refactored as well; sinks are now attached with:
stream.sinkTo(...)
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/overview/
package com.learn.flink.sink;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.time.Duration;
import java.time.ZoneId;
public class SinkFile {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// each bucket directory has as many part files being written as the parallelism
env.setParallelism(2);
// checkpointing must be enabled, otherwise the files stay in the .inprogress state forever
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
new GeneratorFunction<Long, String>() {
@Override
public String map(Long value) throws Exception {
return "Number:" + value;
}
},
Long.MAX_VALUE,
RateLimiterStrategy.perSecond(10),
Types.STRING
);
DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "datagenSource");
// sink to the file system
FileSink<String> fileSink = FileSink // row-encoded output: specify the path and the string encoder
.<String>forRowFormat(new Path("output/tmp/"), new SimpleStringEncoder<>("UTF-8"))
// output file config: part-file name prefix, suffix, etc.
.withOutputFileConfig(
OutputFileConfig.builder()
.withPartPrefix("fs-")
.withPartSuffix(".log")
.build()
)
// assign buckets (directories) by date/time
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
// rolling policy: roll every 10 s or when a part file reaches 1 MB
.withRollingPolicy(DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofSeconds(10))
.withMaxPartSize(new MemorySize(1024 * 1024))
.build()
).build();
dataGen.sinkTo(fileSink);
env.execute();
}
}
Sink to Kafka
package com.learn.flink.sink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;
public class SinkKafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// for exactly-once, checkpointing must be enabled, otherwise nothing can be written to Kafka under the exactly-once guarantee
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("hadoop003", 7777);
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("hadoop001:9092,hadoop002:9092,hadoop003:9092")
// record serializer: target topic and how values are serialized
.setRecordSerializer(
KafkaRecordSerializationSchema.<String>builder()
.setTopic("ws")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
// delivery guarantee when writing to Kafka: exactly-once or at-least-once
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
// for exactly-once a transactional id prefix must be set
.setTransactionalIdPrefix("fs")
// for exactly-once a transaction timeout must be set: larger than the checkpoint interval, smaller than the broker maximum of 15 minutes
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
.build();
sensorDS.sinkTo(kafkaSink);
env.execute();
}
}
Caused by: org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
The fix is to configure the following:
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
KafkaSink.<String>builder()
.XXXXXX
// delivery guarantee when writing to Kafka: exactly-once or at-least-once
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
// for exactly-once a transactional id prefix must be set
.setTransactionalIdPrefix("fs")
// for exactly-once a transaction timeout must be set: larger than the checkpoint interval, smaller than the broker maximum of 15 minutes
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
.build();
Note: to write to Kafka with exactly-once semantics, all of the following conditions must be met:
- enable checkpointing
- set a transactional id prefix
- set a transaction timeout such that: checkpoint interval < transaction timeout < the broker's transaction.max.timeout.ms (15 minutes by default)
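On the reading side, records written with EXACTLY_ONCE only become visible once the transaction that wrote them is committed at checkpoint completion. Kafka consumers read uncommitted data by default, so a downstream reader should set isolation.level to read_committed. A minimal sketch, assuming the topic is read back with a KafkaSource (the group id is a made-up example):
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
KafkaSource<String> committedOnlySource = KafkaSource.<String>builder()
.setBootstrapServers("hadoop001:9092,hadoop002:9092,hadoop003:9092")
.setTopics("ws")
.setGroupId("ws-reader") // hypothetical group id
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
// consumers default to read_uncommitted; read_committed hides aborted and not-yet-committed records
.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
.build();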
Custom serializer
package com.learn.flink.sink;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
public class SinkKafkaWithKey {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
SingleOutputStreamOperator<String> sensorDS = env.socketTextStream("hadoop003", 7777);
/*
To specify the key written to Kafka, implement a custom serializer:
1. implement the KafkaRecordSerializationSchema interface and override serialize()
2. turn the key into a byte array
3. turn the value into a byte array
4. return a ProducerRecord holding the key and value
*/
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("hadoop001:9092,hadoop002:9092,hadoop003:9092")
.setRecordSerializer(
new KafkaRecordSerializationSchema<String>() {
@Nullable
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
String[] datas = element.split(",");
byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
byte[] value = datas[1].getBytes(StandardCharsets.UTF_8);
return new ProducerRecord<>("ws", key, value);
}
}
)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("fs")
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
.build();
sensorDS.sinkTo(kafkaSink);
env.execute();
}
}
Sink to MySQL (JDBC)
Add the dependencies:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.1.0-1.17</version>
</dependency>
Create the ws table in the test database:
CREATE TABLE `ws` (
`id` VARCHAR(100) NOT NULL,
`ts` BIGINT(20) DEFAULT NULL,
`vc` INT(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;
At the moment, the JDBC connector only offers the legacy sink API, addSink():
package com.learn.flink.sink;
import com.learn.flink.bean.WaterSensor;
import com.learn.flink.functions.WaterSensorMapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public class SinkMysql {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop003", 7777)
.map(new WaterSensorMapFunction());
SinkFunction<WaterSensor> sink = JdbcSink.sink(
"INSERT INTO ws VALUES (?, ?, ?)",
new JdbcStatementBuilder<WaterSensor>() {
@Override
public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
preparedStatement.setString(1, waterSensor.getId());
preparedStatement.setLong(2, waterSensor.getTs());
preparedStatement.setInt(3, waterSensor.getVc());
}
},
JdbcExecutionOptions.builder()
.withMaxRetries(3) // number of retries
.withBatchSize(100) // flush when 100 records are buffered or every 3 s, whichever comes first
.withBatchIntervalMs(3000)
.build()
,
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://xxxxxxxx:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8&useSSL=false")
.withUsername("xxx")
.withPassword("xxx")
.withConnectionCheckTimeoutSeconds(60) // connection check timeout, 60 s by default
.build()
);
sensorDS.addSink(sink);
env.execute();
}
}
The four parameters of JdbcSink.sink():
- first parameter: the SQL to execute, usually an INSERT INTO statement
- second parameter: the statement builder that fills the placeholders of the prepared SQL
- third parameter: execution options: batching and retries
- fourth parameter: connection options: URL, username, password
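For completeness: the WaterSensor bean and WaterSensorMapFunction imported above come from earlier posts in this series. A minimal sketch of what they look like, assuming the field names from the ws table (id, ts, vc) and comma-separated socket input:
package com.learn.flink.bean;
// plain POJO: public no-arg constructor and getters so Flink can treat it as a POJO type
public class WaterSensor {
public String id;
public Long ts;
public Integer vc;
public WaterSensor() {
}
public WaterSensor(String id, Long ts, Integer vc) {
this.id = id;
this.ts = ts;
this.vc = vc;
}
public String getId() { return id; }
public Long getTs() { return ts; }
public Integer getVc() { return vc; }
}
package com.learn.flink.functions;
import com.learn.flink.bean.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
// parses an "id,ts,vc" line from the socket into a WaterSensor
public class WaterSensorMapFunction implements MapFunction<String, WaterSensor> {
@Override
public WaterSensor map(String value) throws Exception {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
}
}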
Custom sink
The official connectors are recommended; if none of them fits, a SinkFunction can be written by hand, as sketched below.
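A minimal sketch of a hand-written sink based on the legacy SinkFunction API, attached with addSink() (MySinkFunction is a hypothetical name; the connection handling depends entirely on the target system):
package com.learn.flink.sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
public class MySinkFunction extends RichSinkFunction<String> {
@Override
public void open(Configuration parameters) throws Exception {
// create the client/connection to the external system, once per subtask
}
@Override
public void invoke(String value, Context context) throws Exception {
// write one record to the external system
}
@Override
public void close() throws Exception {
// release the client/connection
}
}
// usage: stream.addSink(new MySinkFunction());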