Flink Side Output

Flink Side Output 代码示例

示例背景:在用户行为分析的数据清洗过程中,使用 Side Output 将 event 数据与 profile 数据分离


代码片段

1
2
3
// Split the stream with a side output: ProfileSideoutputProcessFunction emits profile
// records under the "PROFILE" tag and forwards every other record to the main output.
SingleOutputStreamOperator<JSONObject> dataMap = ds.process(new ProfileSideoutputProcessFunction());
// getSideOutput(...) returns a DataStream, not a SingleOutputStreamOperator, so the
// variable must be declared as DataStream (fully qualified here so the snippet needs
// no additional import). The tag must use the same id ("PROFILE") and element type as
// the tag used inside the process function.
org.apache.flink.streaming.api.datastream.DataStream<JSONObject> profiledata =
        dataMap.getSideOutput(new OutputTag<JSONObject>("PROFILE") {});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.xxx.flink.etl.sideoutput_function;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * Separates profile records from all other records.
 *
 * <p>Each incoming record is inspected by its {@code xwhat} field. If the value is one of
 * the {@code $profile_*} identifiers, the record is emitted to the side output identified
 * by {@link #PROFILE_TAG}; otherwise it is forwarded unchanged to the main output.
 */
public class ProfileSideoutputProcessFunction extends ProcessFunction<JSONObject, JSONObject> {

    /**
     * Side-output tag for profile records (id {@code "PROFILE"}).
     *
     * <p>Declared once as a constant instead of being re-created for every record, and
     * exposed so callers can pass the same tag to {@code getSideOutput(...)}. It is an
     * anonymous subclass so that the generic element type can be captured.
     */
    public static final OutputTag<JSONObject> PROFILE_TAG = new OutputTag<JSONObject>("PROFILE") {
    };

    /**
     * Routes a single record to either the profile side output or the main output.
     *
     * @param value the incoming record; its {@code xwhat} field decides the routing
     * @param ctx   context used to emit to the side output
     * @param out   collector for the main output
     * @throws Exception declared by the overridden method
     */
    @Override
    public void processElement(JSONObject value, Context ctx, Collector<JSONObject> out) throws Exception {
        if (isProfileEvent(value.getString("xwhat"))) {
            ctx.output(PROFILE_TAG, value);
        } else {
            out.collect(value);
        }
    }

    /**
     * Returns whether the given {@code xwhat} value names a profile operation.
     * A {@code null} value is not a profile event and therefore stays on the main output.
     */
    private static boolean isProfileEvent(String xwhat) {
        return "$profile_set".equals(xwhat)
                || "$profile_set_once".equals(xwhat)
                || "$profile_unset".equals(xwhat)
                || "$profile_increment".equals(xwhat)
                || "$profile_append".equals(xwhat)
                || "$profile_delete".equals(xwhat);
    }
}