Flink version: 1.13.0
Scala version: 2.12.7
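In Flink jobs we often need to join against dimension tables to enrich the current data. The usual approach is a lookup through JDBC or Redis, but throughput is generally poor. Taking Redis as an example, without optimization it is hard to get past 10,000 records per second, while my current project handles tens of billions of records per day, a lookup volume that Redis can hardly sustain. My earlier solution was a second-level cache to reduce the number of Redis queries. Later I learned about broadcast streams and felt they fit this job well: the configuration can be distributed dynamically to the operators that need it.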
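The second-level cache mentioned above can be sketched roughly as follows. This is a minimal illustration, not the project's actual code: `LocalTtlCache`, its parameters, and the `load` function standing in for the Redis lookup are all hypothetical names. It is neither thread-safe nor bounded in size, which is acceptable only if each operator subtask owns its own instance and the key space is small.

```scala
import scala.collection.mutable

// Hypothetical helper (not from the original project): a small TTL cache
// placed in front of a slow lookup such as a Redis query.
final class LocalTtlCache[K, V](
    ttlMillis: Long,
    load: K => Option[V],                              // stands in for the Redis lookup
    now: () => Long = () => System.currentTimeMillis() // injectable clock, handy for tests
) {
  // Cached value plus the time it was loaded. Misses (None) are cached too,
  // so absent keys do not hit the backend on every record.
  private val entries = mutable.HashMap.empty[K, (Option[V], Long)]

  def get(key: K): Option[V] = {
    val t = now()
    entries.get(key) match {
      case Some((value, loadedAt)) if t - loadedAt < ttlMillis =>
        value // still fresh: serve from local memory
      case _ =>
        val value = load(key) // expired or never loaded: go to the backend
        entries.update(key, (value, t))
        value
    }
  }
}

object LocalTtlCacheDemo {
  def main(args: Array[String]): Unit = {
    var backendCalls = 0
    var clock = 0L
    val backend = Map("order-1" -> "vip")
    val cache = new LocalTtlCache[String, String](
      ttlMillis = 1000L,
      load = k => { backendCalls += 1; backend.get(k) },
      now = () => clock)

    cache.get("order-1")
    cache.get("order-1")
    println(backendCalls) // 1: the second call was served locally

    clock = 1500L         // move past the TTL
    cache.get("order-1")
    println(backendCalls) // 2: the entry expired and was reloaded
  }
}
```

Even with such a cache, every subtask still queries the backend once per key per TTL window, which is the load a broadcast stream avoids by pushing the config to the operators instead.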
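I am still new to broadcast. Because the data volume is small, I simply keep the broadcast config in a member variable. This will probably need optimizing later so that it does not use too much memory.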
The broadcast source:
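import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

// Periodically reloads the dimension config and emits it so it can be broadcast downstream.
class OrderConfSource extends RichSourceFunction[Map[String, AnyRef]] {
  // cancel() is called from a different thread than run(), so the flag must be volatile.
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Map[String, AnyRef]]): Unit = {
    while (running) {
      // loadConf() is the project's own loader (not shown here).
      val conf = loadConf()
      ctx.collect(conf)
      // Refresh every 2 minutes.
      Thread.sleep(120000)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}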
Connecting the operator to the broadcast stream:
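// The descriptor names the broadcast state.
val orderConfBroadcast = env.addSource(new OrderConfSource())
  // name/uid must be unique per operator, so the config source gets its own suffix
  // instead of reusing the uid of the process operator below.
  .name(nodeCfg.nodeName + "_conf")
  .uid(nodeCfg.nodeName + "_conf")
  .broadcast(new MapStateDescriptor[String, Map[String, AnyRef]](
    "order_conf_broadcast",
    createTypeInformation[String],
    createTypeInformation[Map[String, AnyRef]]))

// Key the main stream, then connect it to the broadcast config stream.
val orderStream = stayStream
  .keyBy(_.num)
  .connect(orderConfBroadcast)
  .process(new OrderFilterProcess())
  .name(nodeCfg.nodeName)
  .uid(nodeCfg.nodeName)
  .setParallelism(nodeCfg.workerCount)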
To use the broadcast data in process, extend the abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>. The code is as follows:
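import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

class OrderFilterProcess extends KeyedBroadcastProcessFunction[String, In, Map[String, AnyRef], Out] {
  // Latest config, held in a plain field. It is not checkpointed, so after a
  // restart it stays empty until the next broadcast element arrives.
  var orderInfo: Map[String, AnyRef] = _

  override def processElement(value: In, ctx: KeyedBroadcastProcessFunction[String, In, Map[String, AnyRef], Out]#ReadOnlyContext, out: Collector[Out]): Unit = {
    // orderInfo may still be null here if no config has been broadcast yet.
    // some code .....
  }

  override def processBroadcastElement(value: Map[String, AnyRef], ctx: KeyedBroadcastProcessFunction[String, In, Map[String, AnyRef], Out]#Context, out: Collector[Out]): Unit = {
    // Replace the whole config whenever a new snapshot arrives.
    if (value != null) {
      orderInfo = value
    }
  }
}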
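As a possible later optimization, the config could be kept in Flink's broadcast state instead of a member variable, so that it is checkpointed and restored together with the job. The following is only a sketch under assumptions: the `In` and `Out` case classes, the `OrderConfState` object, the single `"current"` key, and the filtering rule inside `processElement` are all hypothetical stand-ins, since the original business logic is not shown. Only `getBroadcastState`, `put`, and `get` are the actual Flink calls.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical record types standing in for the article's In and Out.
case class In(num: String, orderId: String)
case class Out(num: String, orderId: String, conf: AnyRef)

object OrderConfState {
  // Same name and types as the descriptor passed to .broadcast(...);
  // sharing one instance in both places avoids mismatches.
  val descriptor = new MapStateDescriptor[String, Map[String, AnyRef]](
    "order_conf_broadcast",
    createTypeInformation[String],
    createTypeInformation[Map[String, AnyRef]])

  // Assumption: the whole config snapshot is stored under one fixed key.
  val Key = "current"
}

class OrderFilterStateProcess
    extends KeyedBroadcastProcessFunction[String, In, Map[String, AnyRef], Out] {

  override def processElement(
      value: In,
      ctx: KeyedBroadcastProcessFunction[String, In, Map[String, AnyRef], Out]#ReadOnlyContext,
      out: Collector[Out]): Unit = {
    // Read-only view of the broadcast state; get() returns null until a
    // config snapshot has been broadcast.
    val conf = ctx.getBroadcastState(OrderConfState.descriptor).get(OrderConfState.Key)
    if (conf != null) {
      // Hypothetical rule: emit only records whose orderId has a config entry.
      conf.get(value.orderId).foreach(c => out.collect(Out(value.num, value.orderId, c)))
    }
  }

  override def processBroadcastElement(
      value: Map[String, AnyRef],
      ctx: KeyedBroadcastProcessFunction[String, In, Map[String, AnyRef], Out]#Context,
      out: Collector[Out]): Unit = {
    // Writable view: overwrite the stored snapshot with the newest one.
    if (value != null) {
      ctx.getBroadcastState(OrderConfState.descriptor).put(OrderConfState.Key, value)
    }
  }
}
```

Broadcast state is still held in memory on every parallel instance, so this does not reduce memory use by itself. What it adds is that the config survives restarts without waiting for the next refresh from the source.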