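flink-11 Source Operators
Starting with Flink 1.13, the main way to read data is the new Source architecture, which unifies stream and batch processing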
DataStreamSource<String> stream = env.fromSource(...)
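Preparation
To make the exercises easier, WaterSensor is used as the data model here
Field | Data type | Description |
id | String | Water level sensor ID |
ts | Long | Timestamp at which the sensor generated the record |
vc | Integer | Water level reading |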
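The post does not show the WaterSensor class itself, so the following is only a minimal sketch of what it might look like, with field names and types taken from the table above. The getters, equals, hashCode, and toString are assumptions added for convenience. The class is public, has a public no-argument constructor, and exposes public fields, which is what Flink expects of a POJO type.

```java
import java.util.Objects;

// Hypothetical WaterSensor model; only the three fields come from the table above.
public class WaterSensor {
    public String id;   // water level sensor ID
    public Long ts;     // timestamp at which the sensor generated the record
    public Integer vc;  // water level reading

    // Flink treats a class as a POJO only if it has a public no-argument constructor.
    public WaterSensor() {
    }

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public String getId() {
        return id;
    }

    public Long getTs() {
        return ts;
    }

    public Integer getVc() {
        return vc;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof WaterSensor)) {
            return false;
        }
        WaterSensor other = (WaterSensor) o;
        return Objects.equals(id, other.id)
                && Objects.equals(ts, other.ts)
                && Objects.equals(vc, other.vc);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, ts, vc);
    }

    @Override
    public String toString() {
        return "WaterSensor{id='" + id + "', ts=" + ts + ", vc=" + vc + "}";
    }
}
```

A record could then be created with `new WaterSensor("s1", 1L, 10)`.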
Reading from a collection
package com.learn.flink.source;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class CollectionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Build a bounded stream from in-memory elements.
        DataStreamSource<Integer> source = env
                // Alternative: .fromCollection(Arrays.asList(1, 2, 3));
                .fromElements(1, 2, 3);

        source.print();
        env.execute();
    }
}
Reading from a file
- Requires the flink-connector-files dependency
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
package com.learn.flink.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Reading from a file requires the flink-connector-files dependency.
        // The source reads input/words.txt line by line as strings.
        FileSource<String> fileSource = FileSource
                .forRecordStreamFormat(
                        new TextLineInputFormat(),
                        new Path("input/words.txt")
                ).build();

        DataStreamSource<String> streamSource =
                env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");

        streamSource.print();
        env.execute();
    }
}
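With the new Source API, a source is attached with env.fromSource(Source implementation, WatermarkStrategy, source name)
- The Ctrl + H shortcut (type hierarchy in IntelliJ IDEA) lists the implementations of an interface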
Reading from Kafka
- Requires the flink-connector-kafka dependency
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
package com.learn.flink.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop001:9092,hadoop002:9092,hadoop003:9092")
                .setGroupId("test")
                .setTopics("topic_1")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.earliest())
                .build();

        env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource").print();
        env.execute();
    }
}
Reading from a data generator
- Requires the flink-connector-datagen dependency
package com.learn.flink.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataGenDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                new GeneratorFunction<Long, String>() {
                    @Override
                    public String map(Long value) throws Exception {
                        return "Number:" + value;
                    }
                },
                10,
                RateLimiterStrategy.perSecond(1),
                Types.STRING
        );

        env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "datagenSource").print();
        env.execute();
    }
}
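The data generator Source takes four arguments:
- First: an implementation of the GeneratorFunction interface. Override its map method; the input type is always Long.
- Second: a long count, the number of records to generate. The input sequence is produced automatically, starting at 0 and incrementing by 1, and the source stops once count records have been emitted.
- Third: the rate-limiting strategy, for example how many records to generate per second.
- Fourth: the type information of the returned values.
- With parallelism n and a count of a, the sequence is split evenly into n parts of a/n records each. For example, with a count of 100 and parallelism 2, each parallel subtask generates 50 numbers.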
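The even split across parallel subtasks can be illustrated with plain Java. This is only a sketch of the arithmetic, not Flink's own code: the class and method names are hypothetical, it assumes the index sequence runs from 0 to count - 1, and the way a remainder is distributed when count is not divisible by the parallelism is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: mimics an even split of a generated index sequence.
public class SequenceSplitSketch {

    // Splits the index range [0, count - 1] into `parallelism` contiguous
    // ranges. Each returned array holds {firstIndex, lastIndex}, inclusive.
    // Assumption: any remainder goes to the first ranges, one extra each.
    public static List<long[]> splitRange(long count, int parallelism) {
        List<long[]> ranges = new ArrayList<>();
        long base = count / parallelism;
        long remainder = count % parallelism;
        long from = 0;
        for (int i = 0; i < parallelism; i++) {
            long size = base + (i < remainder ? 1 : 0);
            ranges.add(new long[] {from, from + size - 1});
            from += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // A count of 100 with parallelism 2, as in the example in the text.
        for (long[] range : splitRange(100, 2)) {
            long size = range[1] - range[0] + 1;
            System.out.println(range[0] + ".." + range[1] + " (" + size + " values)");
        }
    }
}
```

Running main prints `0..49 (50 values)` and `50..99 (50 values)`, matching the 50 numbers per subtask described above.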