Flink Side Output 代码示例
示例背景:在用户行为分析的数据清洗过程中,使用 Side Output 将 event 数据与 profile 数据分离
代码片段
1 2 3
| // Run every record through the splitting function. Records that the function
// does not divert to the "PROFILE" side output remain on this main output.
SingleOutputStreamOperator<JSONObject> dataMap = ds.process(new ProfileSideoutputProcessFunction());

// getSideOutput(...) yields a DataStream (org.apache.flink.streaming.api.datastream.DataStream),
// not a SingleOutputStreamOperator, so the variable must be declared as DataStream.
// The tag id "PROFILE" has to match the id used inside ProfileSideoutputProcessFunction.
DataStream<JSONObject> profiledata = dataMap.getSideOutput(new OutputTag<JSONObject>("PROFILE") {
});
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| package com.xxx.flink.etl.sideoutput_function;
import com.alibaba.fastjson.JSONObject; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag;
/**
 * Splits a stream of JSON records into two outputs based on the record's
 * {@code "xwhat"} field.
 *
 * <p>Records whose {@code "xwhat"} value is one of the {@code $profile_*}
 * operations listed in {@link #isProfileOperation(String)} are emitted to the
 * {@link #PROFILE_TAG} side output; every other record (including records with
 * no {@code "xwhat"} value) is forwarded unchanged on the main output.
 */
public class ProfileSideoutputProcessFunction extends ProcessFunction<JSONObject, JSONObject> {

    /**
     * Side-output tag (id {@code "PROFILE"}) that receives profile records.
     *
     * <p>Created once and shared instead of being re-instantiated for every
     * element. It is declared as an anonymous subclass (the trailing
     * {@code {}}), as the Flink side-output documentation prescribes, so the
     * element type is available to Flink. Callers can pass this same constant
     * to {@code getSideOutput(...)} to read the profile stream.
     */
    public static final OutputTag<JSONObject> PROFILE_TAG = new OutputTag<JSONObject>("PROFILE") {
    };

    /**
     * Routes one record to either the profile side output or the main output.
     *
     * @param value the incoming JSON record; its {@code "xwhat"} field selects the route
     * @param ctx   Flink context used to emit to the side output
     * @param out   collector for the main output
     * @throws Exception declared by the overridden Flink method
     */
    @Override
    public void processElement(JSONObject value, Context ctx, Collector<JSONObject> out) throws Exception {
        if (isProfileOperation(value.getString("xwhat"))) {
            ctx.output(PROFILE_TAG, value);
        } else {
            out.collect(value);
        }
    }

    /**
     * Tells whether the given {@code "xwhat"} value names a profile operation.
     *
     * @param xwhat the value of the record's {@code "xwhat"} field, may be {@code null}
     * @return {@code true} for the six {@code $profile_*} operations, {@code false} otherwise
     */
    private static boolean isProfileOperation(String xwhat) {
        // A switch on a null String throws, so treat a missing field as "not a profile record",
        // which is what the original constant-first equals() chain did.
        if (xwhat == null) {
            return false;
        }
        switch (xwhat) {
            case "$profile_set":
            case "$profile_set_once":
            case "$profile_unset":
            case "$profile_increment":
            case "$profile_append":
            case "$profile_delete":
                return true;
            default:
                return false;
        }
    }
}
|