Flink 获取最新的 Checkpoint 路径

Flink On Yarn 运行时,作业的 ApplicationId 是随机生成的

Checkpoint 保存的位置是使用 ApplicationId 生成的路径(我们保存在 HDFS)

以下 Shell 脚本可以自动获取最新的 Checkpoint 保存路径


lastCheckpointExternalPath.sh

#! /bin/sh
# Print the full hdfs:// URI of the latest externalized checkpoint for a
# Flink-on-YARN job. Checkpoints live under
# /opt/user/flink/checkpoint/<jobname>/<random-app-dir>/chk-NNN.
#
# Usage:   lastCheckpointExternalPath.sh <jobname>
# Output:  hdfs://hmdservice/<path-to-latest-checkpoint> on stdout
# Returns: non-zero (with a message on stderr) when the job name is missing
#          or no checkpoint can be found, so callers capturing stdout never
#          receive a truncated/bogus path.

jobname=$1

if [ -z "$jobname" ]; then
    echo "Usage: $0 <jobname>" >&2
    exit 1
fi

## Locate the newest checkpoint directory for this job.
# -t         sort by modification time, newest first
# -C         print only the path column
# awk NR==1  keep the first (newest) entry
checkpointid_path=$(hadoop fs -ls -t -C /opt/user/flink/checkpoint/"$jobname" | awk 'NR==1')

if [ -z "$checkpointid_path" ]; then
    echo "ERROR: no checkpoint directory found for job '$jobname'" >&2
    exit 1
fi

## Inside that directory, take the newest entry: the latest external checkpoint.
lastCheckpointExternalPath=$(hadoop fs -ls -t -C "$checkpointid_path" | awk 'NR==1')

if [ -z "$lastCheckpointExternalPath" ]; then
    echo "ERROR: no checkpoint data found under '$checkpointid_path'" >&2
    exit 1
fi

echo "hdfs://hmdservice$lastCheckpointExternalPath"

启动脚本 demo

#!/bin/bash
# Restart the Flink-on-YARN jobs on the remote host, resuming each stateful
# job from its latest externalized checkpoint (resolved locally via
# lastCheckpointExternalPath.sh before the ssh session starts).

flinkUbtOdsCheckpointPath=$(sh /opt/sync/sync_script/flink/lastCheckpointExternalPath.sh flink-ubt-ods)
flinkdimaccountCheckpointPath=$(sh /opt/sync/sync_script/flink/lastCheckpointExternalPath.sh dim_account_info)
flinkfbehaviourcdpCheckpointPath=$(sh /opt/sync/sync_script/flink/lastCheckpointExternalPath.sh flink-ubt-m4-flow)
flink_homedo_realdata_chk=$(sh /opt/sync/sync_script/flink/lastCheckpointExternalPath.sh flow_order_item_detail_real)

# Fail fast if any lookup came back empty: inside the heredoc below an empty
# variable would make "-s $var" swallow the NEXT token (the jar path) as the
# savepoint argument, silently launching a broken submission.
for var in flinkUbtOdsCheckpointPath flinkdimaccountCheckpointPath flinkfbehaviourcdpCheckpointPath flink_homedo_realdata_chk; do
    if [ -z "${!var}" ]; then
        echo "ERROR: $var is empty - aborting restart (no checkpoint path resolved)" >&2
        exit 1
    fi
done

# NOTE: the unquoted heredoc is expanded LOCALLY before being sent over ssh,
# so the checkpoint paths and the $(date ...) log-file timestamps are filled
# in on this host; the remote shell only sees the final literal commands.
ssh root@10.0.**.*** > /dev/null 2>&1 << eeooff

nohup /opt/cloudera/parcels/FLINK/lib/flink/bin/flink run -m yarn-cluster -ynm flink-ubt-ods -c com.homedo.flink.app.Main -s $flinkUbtOdsCheckpointPath /opt/flink_job/flink-ubt-ods-1.0.0.jar > /dev/null 2>&1 &

nohup /opt/cloudera/parcels/FLINK/lib/flink/bin/flink run -m yarn-cluster -ynm flink-dwd -c com.homedo.flink.app.Main /opt/flink_job/flink-dwd-1.0.0.jar > /dev/null 2>&1 &

sh /opt/flink_job/flink-realdata-run.sh

sh /opt/flink_job/flink-realdata-run2.sh

nohup /opt/cloudera/parcels/FLINK/lib/flink/bin/flink run -m yarn-cluster -ynm dim_account_info -s $flinkdimaccountCheckpointPath -c com.homedo.user.app.AsyncJoin /opt/flink_job/dim_account_info-1.0-SNAPSHOT-jar-with-dependencies.jar > /dev/null 2>&1 &

nohup /opt/cloudera/parcels/FLINK/lib/flink/bin/flink run -m yarn-cluster -ynm flink-ubt-m4-flow -s $flinkfbehaviourcdpCheckpointPath -c com.homedo.flink.app.Main /opt/flink_job/flink-ubt-ods-m4-flow-1.0.0-jar-with-dependencies.jar > /opt/flink_job/log/ubt-m4-flow_$(date +%Y%m%d_%H%M%S).log 2>&1 &

nohup /opt/cloudera/parcels/FLINK/lib/flink/bin/flink run -m yarn-cluster -ynm flow_order_item_detail_real -yqu users.flink -s $flink_homedo_realdata_chk -c com.homedo.HmdApplication /opt/flink_job/homedo_realdata-1.0.jar > /opt/flink_job/log/orderitem_detailreal_$(date +%Y%m%d_%H%M%S).log 2>&1 &

exit

eeooff
kill 所有 Flink Yarn 作业的脚本 demo
#!/bin/bash
# Kill every running YARN application whose listing line contains
# "Apache Flink" (the application type Flink-on-YARN jobs register with).

job_list=/opt/sync/sync_script/flink/flink-job.txt

/opt/cloudera/parcels/CDH/bin/yarn application --list > "$job_list"

# Read the listing line by line; -r keeps backslashes literal.
# (The original ${line//'\t'/ } replaced the literal two characters
# backslash-t, not a tab, and only worked by accidental IFS splitting.)
while IFS= read -r line; do
    if [[ $line == *"Apache Flink"* ]]; then
        # First whitespace-separated column is the application id.
        application_id=$(printf '%s\n' "$line" | awk '{print $1}')
        /opt/cloudera/parcels/CDH/bin/yarn application --kill "$application_id"
    fi
done < "$job_list"

# Clean up the temp listing once, after the loop - not inside it, where the
# original deleted the file while it was still being read.
rm -f "$job_list"