Flink version: 1.13.0; Scala version: 2.12.7
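When processing data in Flink, we often need to join against dimension tables to enrich ("color") the current records. The usual approaches are lookups through JDBC or Redis, but their throughput is generally poor. Taking Redis as an example, without optimization it is hard to process more than 10,000 records per second. My current project handles tens of billions of records per day, a volume that Redis lookups can hardly sustain. Our previous approach was a second-level cache to reduce the number of Redis queries. Later I learned about broadcast streams and felt that this feature fits the problem: configuration can be distributed dynamically to the operators that need it.
I am still new to broadcast. Because the configuration data is small, I keep it in plain variables; this should be optimized later to avoid using too much memory.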
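The post does not show how the earlier second-level cache was built, so the following is only a minimal sketch of the general idea: a small in-memory cache in front of a slower remote lookup. The class `TwoLevelCache`, the `remoteLookup` function (standing in for the real Redis or JDBC query), the TTL policy, and the `remoteCalls` counter are all assumptions made for illustration. The sketch is not thread-safe and never evicts entries.

```scala
import scala.collection.mutable

// Hypothetical local cache in front of a slower remote store such as Redis.
// `remoteLookup` stands in for the real query; `now` is injectable so the
// expiry logic can be exercised without waiting.
class TwoLevelCache[K, V](remoteLookup: K => Option[V],
                          ttlMillis: Long,
                          now: () => Long = () => System.currentTimeMillis()) {

  // key -> (cached value, timestamp at which the entry expires)
  private val local = mutable.Map.empty[K, (Option[V], Long)]

  // number of lookups that actually reached the remote store
  var remoteCalls = 0

  def get(key: K): Option[V] = {
    val t = now()
    local.get(key) match {
      case Some((value, expiresAt)) if expiresAt > t =>
        value // local hit: the remote store is not touched
      case _ =>
        remoteCalls += 1
        val value = remoteLookup(key)
        // misses (None) are cached too, so unknown keys do not hit the remote store every time
        local.update(key, (value, t + ttlMillis))
        value
    }
  }
}
```

Each distinct key reaches the remote store at most once per TTL window. With a broadcast stream the direction is reversed: the configuration is pushed to the operators, so no per-record lookup is needed at all.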
Define the case class:
case class ConfBean(conf1: List[Int], conf2: Map[Int, String])
Broadcast source code:
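import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

class ConfSource extends RichSourceFunction[ConfBean] {
  // ConfDao is the project's own DAO; it is created in open() on the task manager
  private var confDao: ConfDao = _
  // @volatile because cancel() is called from a different thread than run()
  @volatile private var running = true

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    if (confDao == null) {
      confDao = new ConfDao()
    }
  }

  override def run(ctx: SourceFunction.SourceContext[ConfBean]): Unit = {
    while (running) {
      val conf1 = confDao.loadConf1()
      val conf2 = confDao.loadConf2()
      ctx.collect(ConfBean(conf1, conf2))
      // reload the configuration every 2 minutes
      Thread.sleep(120000)
    }
  }

  override def cancel(): Unit = {
    running = false
  }

  // release the DAO in close(), not in cancel(), because run() may still be using it
  override def close(): Unit = {
    if (confDao != null) confDao.close()
    super.close()
  }
}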
Connect the operator to the broadcast stream:
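import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.scala._

// the descriptor names the broadcast state
val confDescriptor = new MapStateDescriptor[String, ConfBean](
  "conf_broadcast", createTypeInformation[String], createTypeInformation[ConfBean])

val confBroadcast = env.addSource(new ConfSource())
  .name(nodeCfg.nodeName)
  .uid(nodeCfg.nodeName)
  .broadcast(confDescriptor)

stream
  .keyBy(_.num)
  .connect(confBroadcast)
  // start with an empty configuration until the first broadcast element arrives
  .process(new ValueProcess(Nil, Map.empty))
  // operator uids must be unique within a job, so this nodeCfg must differ from the source's
  .name(nodeCfg.nodeName)
  .uid(nodeCfg.nodeName)
  .setParallelism(nodeCfg.workerCount)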
To use the broadcast data in the process function, extend the abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>. The code is as follows:
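import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

// conf1/conf2 hold the latest broadcast configuration as plain fields (not Flink state)
class ValueProcess(var conf1: List[Int] = Nil,
                   var conf2: Map[Int, String] = Map.empty)
  extends KeyedBroadcastProcessFunction[String, Int, ConfBean, Unit] {

  // called for every element of the keyed data stream
  override def processElement(value: Int, ctx: KeyedBroadcastProcessFunction[String, Int, ConfBean, Unit]#ReadOnlyContext, out: Collector[Unit]): Unit = {
    val b = conf1.toSet.contains(value)
    val v = conf2.getOrElse(value, "-1")
    // some code .....
  }

  // called on every parallel instance whenever a new ConfBean is broadcast
  override def processBroadcastElement(value: ConfBean, ctx: KeyedBroadcastProcessFunction[String, Int, ConfBean, Unit]#Context, out: Collector[Unit]): Unit = {
    conf1 = value.conf1
    conf2 = value.conf2
  }
}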
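One small optimization for the lookup path: `conf1.toSet.contains(value)` builds a new Set for every element, while the configuration only changes when a broadcast arrives. A possible sketch, assuming a hypothetical helper class `ConfSnapshot` (not part of the original code), is to build the Set once per broadcast update and reuse it for every element. `ConfBean` is repeated here only to keep the snippet self-contained.

```scala
// Same shape as the ConfBean defined earlier in the post.
case class ConfBean(conf1: List[Int], conf2: Map[Int, String])

// Hypothetical helper: constructed once per broadcast update, read once per element.
// Serializable so that it can be held as a field of a Flink function.
final class ConfSnapshot(bean: ConfBean) extends Serializable {
  // the List is converted to a Set once, when the snapshot is created
  private val conf1Set: Set[Int] = bean.conf1.toSet

  // membership test against the precomputed Set
  def contains(value: Int): Boolean = conf1Set.contains(value)

  // same fallback as in the post: "-1" when the key is missing
  def label(value: Int): String = bean.conf2.getOrElse(value, "-1")
}
```

With this helper, processBroadcastElement would assign something like `snapshot = new ConfSnapshot(value)`, and processElement would call `snapshot.contains(value)` and `snapshot.label(value)`. This only changes the cost per element. The configuration is still kept in a plain field rather than in Flink's broadcast state, so it is not part of a checkpoint.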