Flink, Big Data

flink-04 YARN Deployment Mode

Requires Hadoop 2.2 or later.

Configure environment variables

#HADOOP_HOME
export HADOOP_HOME=/data/hadoop
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin

export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
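After sourcing the profile, it is worth sanity-checking that the variables resolve before starting anything (a quick check, assuming `hadoop` is now on the PATH):

```shell
# Reload the profile so the exports above take effect in this shell
source /etc/profile

# Should print /data/hadoop/etc/hadoop
echo "$HADOOP_CONF_DIR"

# Lists the first few jars Flink will pick up via HADOOP_CLASSPATH
hadoop classpath | tr ':' '\n' | head -5
```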

Start the Hadoop cluster

Session Mode Deployment

YARN session mode differs slightly from a standalone cluster: you must first request a YARN session to start the Flink cluster.

1. Start the cluster

  • Start the Hadoop cluster (HDFS, YARN)
  • Run the script command below to request resources from the YARN cluster, open a YARN session, and start the Flink cluster
/data/flink/bin/yarn-session.sh -nm test

Available parameters

  • -d — detached mode. Use this if you do not want the Flink YARN client to keep running in the foreground; the YARN session keeps running in the background even after the current terminal session is closed
  • -jm (--jobManagerMemory) — memory for the JobManager, in MB by default
  • -nm (--name) — name of the application shown in the YARN UI
  • -qu (--queue) — YARN queue to submit to
  • -tm (--taskManagerMemory) — memory per TaskManager
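The parameters above can be combined in one invocation. A sketch with illustrative values (memory sizes and queue name are assumptions, not requirements):

```shell
# Detached session: 1 GB JobManager, 2 GB per TaskManager,
# submitted to the "default" queue, shown as "test" in the YARN UI
/data/flink/bin/yarn-session.sh -d -jm 1024 -tm 2048 -qu default -nm test
```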

Note: since Flink 1.11.0, the -n and -s parameters, which specified the number of TaskManagers and slots respectively, are no longer used.

YARN allocates TaskManagers and slots dynamically on demand, so in this sense YARN session mode does not pin cluster resources either; they are allocated dynamically as well.

Once the YARN session starts, it prints a Web UI address and a YARN application id. Jobs can then be submitted either through the Web UI or from the command line:

bin/flink run -c com.learn.flink.wc.WordCountUnboundedDemo /root/FlinkLearn1.17-1.0-SNAPSHOT.jar
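To tear the session down later, one option is to kill its YARN application directly (a sketch; the application id is a placeholder for the one printed at session startup):

```shell
# Stop the YARN session by killing its application
# (replace the placeholder with the real application id)
yarn application -kill application_XXXXXXXXXXXXX_YYYY
```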

Per-Job Mode Deployment

bin/flink run -t yarn-per-job -c com.learn.flink.wc.WordCountUnboundedDemo /root/FlinkLearn1.17-1.0-SNAPSHOT.jar

List or cancel jobs

bin/flink list -t yarn-per-job -Dyarn.application.id=application_1688864706187_0003
bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_1688864706187_0003 3e6f8cbe5e1f8af0e89cf6e430498c37
  • When cancelling a job, both the application id and the job id must be specified

If running with the -d parameter raises the following classloader.check-leaked-classloader error:

Exception in thread "Thread-5" java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
    at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:184)
    at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResource(FlinkUserCodeClassLoaders.java:208)
    at org.apache.hadoop.conf.Configuration.getResource(Configuration.java:2780)
    at org.apache.hadoop.conf.Configuration.getStreamReader(Configuration.java:3036)
    at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2995)
    at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2968)
    at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2848)
    at org.apache.hadoop.conf.Configuration.get(Configuration.java:1200)
    at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1812)
    at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1789)
    at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
    at org.apache.hadoop.util.ShutdownHookManager.shutdownExecutor(ShutdownHookManager.java:145)
    at org.apache.hadoop.util.ShutdownHookManager.access$300(ShutdownHookManager.java:65)
    at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:102)

Solution: edit flink-conf.yaml and add

classloader.check-leaked-classloader: false
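Alternatively, the check can be disabled per submission without editing the config file, by passing the same option as a -D flag (a sketch, reusing the job class and jar from the commands above):

```shell
# Disable the leaked-classloader check for this submission only
bin/flink run -t yarn-per-job -Dclassloader.check-leaked-classloader=false \
  -c com.learn.flink.wc.WordCountUnboundedDemo /root/FlinkLearn1.17-1.0-SNAPSHOT.jar
```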

Application Mode Deployment

Similar to per-job mode; simply run the flink run-application command instead:

bin/flink run-application -t yarn-application -d -c com.learn.flink.wc.WordCountUnboundedDemo /root/FlinkLearn1.17-1.0-SNAPSHOT.jar

List or cancel jobs

bin/flink list -t yarn-application -Dyarn.application.id=application_1688864706187_0006
bin/flink cancel -t yarn-application -Dyarn.application.id=application_1688864706187_0006 112f4afa9a5e031707b2eb711434cd6f
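Besides cancel, flink stop shuts the job down gracefully after taking a savepoint, which is preferable when you intend to restart the job later. A sketch using the same application and job ids as above; the savepoint path is an assumption:

```shell
# Stop gracefully, writing a savepoint first (savepoint path is illustrative)
bin/flink stop -t yarn-application \
  -Dyarn.application.id=application_1688864706187_0006 \
  --savepointPath hdfs://hadoop600:8020/flink-savepoints \
  112f4afa9a5e031707b2eb711434cd6f
```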

Submitting with Dependencies on HDFS

The yarn.provided.lib.dirs configuration option lets you upload Flink's dependencies to a remote location (HDFS) and point submissions at it.

# Upload Flink's lib and plugins directories
hadoop fs -mkdir /flink-dist
hadoop fs -put lib/ /flink-dist
hadoop fs -put plugins/ /flink-dist

# Upload the Flink application jar
hadoop fs -mkdir /flink-jars
hadoop fs -put FlinkLearn1.17-1.0-SNAPSHOT.jar /flink-jars

Run

bin/flink run-application -t yarn-application -d \
  -Dyarn.provided.lib.dirs="hdfs://hadoop600:8020/flink-dist" \
  -c com.learn.flink.wc.WordCountUnboundedDemo \
  hdfs://hadoop600:8020/flink-jars/FlinkLearn1.17-1.0-SNAPSHOT.jar