Flink, Big Data

flink-11 Source Operator (Source)

Starting with Flink 1.13, the new unified stream/batch Source architecture is the primary way to define sources:

DataStreamSource<String> stream = env.fromSource(...)

Preparation

For convenience in the exercises, WaterSensor is used as the data model:

Field  Data Type  Description
id     String     water level sensor id
ts     Long       timestamp when the sensor generated the record
vc     Integer    water level reading
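
A minimal sketch of the corresponding POJO, derived from the table above (the package name com.learn.flink.bean and the constructors/toString are assumptions; Flink's POJO rules require a public class with a public no-argument constructor and accessible fields):

package com.learn.flink.bean;

public class WaterSensor {
    public String id;  // water level sensor id
    public Long ts;    // timestamp when the sensor generated the record
    public Integer vc; // water level reading

    // a public no-argument constructor is required for Flink's POJO serializer
    public WaterSensor() {
    }

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    @Override
    public String toString() {
        return "WaterSensor{id='" + id + "', ts=" + ts + ", vc=" + vc + "}";
    }
}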

Reading from a Collection

package com.learn.flink.source;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class CollectionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> source = env
//                .fromCollection(Arrays.asList(1, 2, 3)); // read from a java.util.Collection
                .fromElements(1, 2, 3); // read directly from the given elements
        source.print();
        env.execute();
    }
}
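
The same pattern works for the WaterSensor model from the preparation section. A hypothetical variant (the class name WaterSensorCollectionDemo is made up for this sketch, and the import assumes the bean package used above):

package com.learn.flink.source;

import com.learn.flink.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class WaterSensorCollectionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // fromCollection accepts any java.util.Collection of POJOs
        DataStreamSource<WaterSensor> source = env.fromCollection(Arrays.asList(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s2", 2L, 2)
        ));
        source.print();
        env.execute();
    }
}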

Reading from a File

  • Requires the flink-connector-files dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
package com.learn.flink.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Reading from a file requires the flink-connector-files dependency
        FileSource<String> fileSource = FileSource
                .forRecordStreamFormat(
                        new TextLineInputFormat(), // read the file line by line as Strings
                        new Path("input/words.txt")
                ).build();
        DataStreamSource<String> streamSource = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");
        streamSource.print();
        env.execute();
    }
}

The new Source API is invoked as env.fromSource(Source implementation, WatermarkStrategy, source name).

  • The Ctrl + H shortcut (in IntelliJ IDEA) lists the implementation classes of an interface

Reading from Kafka

  • Requires the flink-connector-kafka dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
package com.learn.flink.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class KafkaDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop001:9092,hadoop002:9092,hadoop003:9092") // Kafka broker addresses
                .setGroupId("test") // consumer group id
                .setTopics("topic_1") // topic(s) to subscribe to
                .setValueOnlyDeserializer(new SimpleStringSchema()) // deserialize only the record value, as a String
                .setStartingOffsets(OffsetsInitializer.earliest()) // start from the earliest available offset
                .build();
        env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource").print();
        env.execute();
    }
}

Reading from a Data Generator

  • Requires the flink-connector-datagen dependency, sketched below
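Mirroring the file and Kafka connectors above, the Maven snippet would presumably be (assuming the same ${flink.version} property and provided scope):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-datagen</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>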
package com.learn.flink.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataGenDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
                // GeneratorFunction: maps each generated Long to an output record
                new GeneratorFunction<Long, String>() {
                    @Override
                    public String map(Long value) throws Exception {
                        return "Number:" + value;
                    }
                },
                10, // total number of records to generate
                RateLimiterStrategy.perSecond(1), // emit at most 1 record per second
                Types.STRING // output type information
        );
        env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "datagenSource").print();
        env.execute();
    }
}

The data generator Source takes four parameters:

  • First: a GeneratorFunction to implement; override its map method. The input type is fixed as Long.
  • Second: a Long giving the maximum of the auto-incrementing number sequence; generation stops once this value is reached.
  • Third: a rate-limiting strategy, e.g. how many records to generate per second.
  • Fourth: the return type.
  • With n parallel subtasks and a maximum value of a, the data is split evenly into n shares of a/n each; e.g. with a maximum of 100 and parallelism 2, each subtask generates 50 numbers, as sketched below.
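
A minimal sketch of that split (the class name DataGenParallelismDemo is made up here; with a count of 100 and parallelism 2, each subtask should emit 50 values):

package com.learn.flink.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataGenParallelismDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2); // two parallel subtasks share the work
        DataGeneratorSource<String> source = new DataGeneratorSource<>(
                value -> "Number:" + value, // GeneratorFunction written as a lambda
                100, // total count: split as 100 / 2 = 50 values per subtask
                RateLimiterStrategy.perSecond(10), // rate-limit generation to 10 records per second
                Types.STRING
        );
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "datagenSource").print();
        env.execute();
    }
}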