flink-04 YARN Deployment Mode
Hadoop 2.2 or later is required.
Configure the environment variables
#HADOOP_HOME
export HADOOP_HOME=/data/hadoop
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
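To confirm the variables above took effect after reloading the profile file that holds them, a small check like the following can help. It is a sketch: the function name check_hadoop_env is hypothetical, and it only assumes that HADOOP_CONF_DIR should contain Hadoop's yarn-site.xml.

```shell
#!/usr/bin/env bash
# Hypothetical helper: verify the Hadoop variables Flink's YARN client relies on.
check_hadoop_env() {
  status=0
  [ -n "${HADOOP_HOME:-}" ] || { echo "HADOOP_HOME is not set" >&2; status=1; }
  [ -n "${HADOOP_CLASSPATH:-}" ] || { echo "HADOOP_CLASSPATH is empty" >&2; status=1; }
  if [ -z "${HADOOP_CONF_DIR:-}" ]; then
    echo "HADOOP_CONF_DIR is not set" >&2
    status=1
  elif [ ! -f "$HADOOP_CONF_DIR/yarn-site.xml" ]; then
    # The YARN client configuration is expected in the Hadoop config directory.
    echo "yarn-site.xml not found in $HADOOP_CONF_DIR" >&2
    status=1
  fi
  return "$status"
}
```

Run check_hadoop_env in the shell where the variables were loaded; a non-zero return status comes with a message naming what is missing.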
Start the Hadoop cluster
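With $HADOOP_HOME/sbin on the PATH as configured above, HDFS and YARN are typically started with Hadoop's bundled start-dfs.sh and start-yarn.sh scripts. A minimal sketch, assuming both scripts can be run from the current host (on many multi-node setups start-yarn.sh is run on the ResourceManager host); the wrapper name is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical wrapper: start HDFS first, then YARN, stopping at the first failure.
# Assumes start-dfs.sh / start-yarn.sh are on PATH (via $HADOOP_HOME/sbin).
start_hadoop_cluster() {
  start-dfs.sh || return 1   # NameNode, DataNodes, SecondaryNameNode
  start-yarn.sh || return 1  # ResourceManager, NodeManagers
}
```

Afterwards, running the JDK's jps tool on each node should list the daemons that node is expected to host.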
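Session mode deployment
Session mode on YARN differs slightly from a standalone cluster: you first request a YARN session, which then starts the Flink cluster.
1. Start the cluster
- Start the Hadoop cluster (HDFS and YARN)
- Run the script below to request resources from YARN, open a YARN session, and start the Flink cluster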
/data/flink/bin/yarn-session.sh -nm test
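Available options:
- -d (--detached): detached mode. Use this if you don't want the Flink YARN client to keep running in the foreground; the YARN session keeps running in the background even after you close the current terminal.
- -jm (--jobManagerMemory): memory for the JobManager, in MB by default
- -nm (--name): the application name shown in the YARN UI
- -qu (--queue): the YARN queue to use
- -tm (--taskManagerMemory): memory for each TaskManager, in MB by default
Note: as of Flink 1.11.0, the number of TaskManagers (the old -n option) and the total number of slots are no longer fixed up front; -s, if given, only sets the number of slots per TaskManager.
YARN allocates TaskManagers and slots dynamically on demand, so in this sense session mode on YARN does not fix the cluster's resources either; they are allocated dynamically as well.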
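Putting the options above together, a detached session with an explicit name, queue, and memory sizes might be started as follows. The queue name and the 1024 MB sizes are placeholders, FLINK_HOME defaults to the /data/flink install used in the command above, and start_yarn_session is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Hypothetical helper: start a detached YARN session (placeholder sizes, in MB).
FLINK_HOME="${FLINK_HOME:-/data/flink}"

# Usage: start_yarn_session <name shown in the YARN UI> <yarn queue>
start_yarn_session() {
  "$FLINK_HOME/bin/yarn-session.sh" -d \
    -nm "$1" \
    -qu "$2" \
    -jm 1024 \
    -tm 1024
}
```

For example, start_yarn_session test default starts a session named test in the YARN queue called default.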
Once the YARN session is up, it prints a Web UI address and a YARN application id. Jobs can then be submitted either through the Web UI or from the command line:
bin/flink run -c com.learn.flink.wc.WordCountUnboundedDemo /root/FlinkLearn1.17-1.0-SNAPSHOT.jar
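If several sessions are running, the target session can be named explicitly with the yarn-session target and its application id, following the same -t / -Dyarn.application.id pattern used by the list and cancel commands for the other modes. A sketch: submit_to_session is hypothetical, and the application id must be the one printed by your own session.

```shell
#!/usr/bin/env bash
# Hypothetical helper: submit a jar to one specific YARN session.
FLINK_HOME="${FLINK_HOME:-/data/flink}"

# Usage: submit_to_session <application_id> <main class> <jar path>
submit_to_session() {
  "$FLINK_HOME/bin/flink" run -t yarn-session \
    -Dyarn.application.id="$1" \
    -c "$2" "$3"
}
```

For example: submit_to_session application_1688864706187_0001 com.learn.flink.wc.WordCountUnboundedDemo /root/FlinkLearn1.17-1.0-SNAPSHOT.jar (the application id here is only an illustration).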
Per-job mode deployment
bin/flink run -t yarn-per-job -c com.learn.flink.wc.WordCountUnboundedDemo /root/FlinkLearn1.17-1.0-SNAPSHOT.jar
View or cancel jobs
bin/flink list -t yarn-per-job -Dyarn.application.id=application_1688864706187_0003
bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_1688864706187_0003 3e6f8cbe5e1f8af0e89cf6e430498c37
- Cancelling a job requires both the application id and the job id
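Since cancel needs the job id, it can be taken from the output of flink list. The sketch below assumes each job line printed by flink list contains the 32-character hexadecimal job id (for example "date : job_id : name (RUNNING)"); the helper names are hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helpers: cancel every job listed for one YARN application.
FLINK_HOME="${FLINK_HOME:-/data/flink}"

# Print the job ids (32 hex chars) found in `flink list` output on stdin.
extract_job_ids() {
  grep -oE '[0-9a-f]{32}' | sort -u
}

# Usage: cancel_all_jobs <yarn-per-job|yarn-application> <application_id>
cancel_all_jobs() {
  target="$1"; app_id="$2"
  "$FLINK_HOME/bin/flink" list -t "$target" -Dyarn.application.id="$app_id" \
    | extract_job_ids \
    | while read -r job_id; do
        "$FLINK_HOME/bin/flink" cancel -t "$target" \
          -Dyarn.application.id="$app_id" "$job_id"
      done
}
```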
If submitting with -d fails with the following error, which points to classloader.check-leaked-classloader:
Exception in thread "Thread-5" java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:184)
at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResource(FlinkUserCodeClassLoaders.java:208)
at org.apache.hadoop.conf.Configuration.getResource(Configuration.java:2780)
at org.apache.hadoop.conf.Configuration.getStreamReader(Configuration.java:3036)
at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2995)
at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2968)
at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2848)
at org.apache.hadoop.conf.Configuration.get(Configuration.java:1200)
at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1812)
at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1789)
at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
at org.apache.hadoop.util.ShutdownHookManager.shutdownExecutor(ShutdownHookManager.java:145)
at org.apache.hadoop.util.ShutdownHookManager.access$300(ShutdownHookManager.java:65)
at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:102)
Fix: edit flink-conf.yaml (e.g. vim flink-conf.yaml) and set:
classloader.check-leaked-classloader: false
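To apply the same setting from a script (for example on several machines) without duplicating the key, something like the following can be used. It is a sketch: the conf/flink-conf.yaml location under FLINK_HOME is assumed, the helper name is hypothetical, and sed -i.bak leaves a backup copy next to the file.

```shell
#!/usr/bin/env bash
# Hypothetical helper: set classloader.check-leaked-classloader to false,
# updating an existing entry instead of appending a duplicate.
FLINK_HOME="${FLINK_HOME:-/data/flink}"

disable_leaked_classloader_check() {
  conf="${1:-$FLINK_HOME/conf/flink-conf.yaml}"
  key='classloader.check-leaked-classloader'
  if grep -q "^[[:space:]]*$key[[:space:]]*:" "$conf"; then
    # Key already present (uncommented): rewrite its value in place.
    sed -i.bak "s/^[[:space:]]*$key[[:space:]]*:.*/$key: false/" "$conf"
  else
    printf '%s: false\n' "$key" >> "$conf"
  fi
}
```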
Application mode deployment
Similar to per-job mode: just run the flink run-application command:
bin/flink run-application -t yarn-application -d -c com.learn.flink.wc.WordCountUnboundedDemo /root/FlinkLearn1.17-1.0-SNAPSHOT.jar
View or cancel jobs
bin/flink list -t yarn-application -Dyarn.application.id=application_1688864706187_0006
bin/flink cancel -t yarn-application -Dyarn.application.id=application_1688864706187_0006 112f4afa9a5e031707b2eb711434cd6f
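In application mode the cluster exists only for this application, so once its job is cancelled the YARN application is expected to finish as well. If the Flink CLI cannot reach the cluster, the YARN CLI's yarn application -kill stops the whole application instead, without Flink's own cancellation. A sketch with a hypothetical helper name:

```shell
#!/usr/bin/env bash
# Hypothetical helper: cancel the job through Flink; if that fails, kill the
# YARN application (this skips Flink's own cancellation).
FLINK_HOME="${FLINK_HOME:-/data/flink}"

# Usage: stop_application <application_id> <job_id>
stop_application() {
  if ! "$FLINK_HOME/bin/flink" cancel -t yarn-application \
      -Dyarn.application.id="$1" "$2"; then
    echo "flink cancel failed; killing YARN application $1" >&2
    yarn application -kill "$1"
  fi
}
```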
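Submitting from HDFS
Flink's dependencies can be uploaded to remote storage such as HDFS and their location passed with the yarn.provided.lib.dirs option, so they do not have to be shipped from the client on every submission.
# Upload Flink's lib and plugins directories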
hadoop fs -mkdir /flink-dist
hadoop fs -put lib/ /flink-dist
hadoop fs -put plugins/ /flink-dist
# Upload the Flink application jar
hadoop fs -mkdir /flink-jars
hadoop fs -put FlinkLearn1.17-1.0-SNAPSHOT.jar /flink-jars
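Before submitting, it can help to confirm the uploads landed where the run command expects them; hadoop fs -test -e returns 0 when a path exists. The paths below mirror the upload commands above, and the helper name is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: check that the uploaded Flink dirs and jar exist on HDFS.
check_hdfs_uploads() {
  for hdfs_path in /flink-dist/lib /flink-dist/plugins \
                   /flink-jars/FlinkLearn1.17-1.0-SNAPSHOT.jar; do
    if ! hadoop fs -test -e "$hdfs_path"; then
      echo "missing on HDFS: $hdfs_path" >&2
      return 1
    fi
  done
  echo "all uploads present"
}
```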
Run
bin/flink run-application -t yarn-application -d -Dyarn.provided.lib.dirs="hdfs://hadoop600:8020/flink-dist" -c com.learn.flink.wc.WordCountUnboundedDemo hdfs://hadoop600:8020/flink-jars/FlinkLearn1.17-1.0-SNAPSHOT.jar
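The HDFS address appears twice in this command, so a small wrapper can keep it in one place. A sketch: NAMENODE defaults to the hdfs://hadoop600:8020 address used above, FLINK_HOME to /data/flink, and submit_from_hdfs is a hypothetical name.

```shell
#!/usr/bin/env bash
# Hypothetical wrapper around the run-application command above.
FLINK_HOME="${FLINK_HOME:-/data/flink}"
NAMENODE="${NAMENODE:-hdfs://hadoop600:8020}"

# Usage: submit_from_hdfs <main class> <jar file name under /flink-jars>
submit_from_hdfs() {
  "$FLINK_HOME/bin/flink" run-application -t yarn-application -d \
    -Dyarn.provided.lib.dirs="$NAMENODE/flink-dist" \
    -c "$1" "$NAMENODE/flink-jars/$2"
}
```

For example: submit_from_hdfs com.learn.flink.wc.WordCountUnboundedDemo FlinkLearn1.17-1.0-SNAPSHOT.jar reproduces the command above.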