Flink version: 1.13.0; Scala version: 2.12.7

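When processing data in Flink, we often need to join against dimension tables to enrich (or "tag") the records passing through. The usual approaches are lookups over JDBC or against Redis, but their throughput is generally poor. Taking Redis as an example, without any optimization it is hard to process more than 10,000 records per second. My current project handles tens of billions of records per day, and Redis can hardly sustain lookups at that scale. The previous solution was a two-level cache that reduced the number of Redis queries. Later I learned about broadcast streams, and it seemed to me that this job could be done with one: configuration is distributed dynamically to the operators that need it.

I am still a beginner with broadcast. Because the configuration data is small, I simply keep it in member variables of the function. This will probably need to be optimized later so that it does not take up too much memory.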
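The two-level cache mentioned above can be sketched roughly as follows. This is not the project's actual code, only a minimal illustration of the idea: an in-process map answers repeated lookups, and the remote store is queried only on a miss or after an entry expires. The names `TwoLevelCache`, `remoteLookup`, `ttlMillis` and `remoteCalls` are hypothetical, and a plain `Map` stands in for Redis.

```scala
import scala.collection.mutable

// Hypothetical two-level lookup: a local map (level 1) in front of a slow
// remote lookup (level 2, standing in for Redis).
class TwoLevelCache[K, V](remoteLookup: K => Option[V],
                          ttlMillis: Long,
                          now: () => Long = () => System.currentTimeMillis()) {
  // key -> (cached result, time it was loaded); misses are cached as None
  private val local = mutable.Map.empty[K, (Option[V], Long)]

  // Counts how often the remote store was actually queried
  var remoteCalls = 0

  def get(key: K): Option[V] = {
    val t = now()
    local.get(key) match {
      // Fresh local entry: answer without touching the remote store
      case Some((value, loadedAt)) if t - loadedAt < ttlMillis => value
      // Missing or expired: query the remote store and remember the result
      case _ =>
        remoteCalls += 1
        val value = remoteLookup(key)
        local.update(key, (value, t))
        value
    }
  }
}

object TwoLevelCacheDemo {
  def main(args: Array[String]): Unit = {
    val fakeRedis = Map(1 -> "a", 2 -> "b") // stand-in for the remote store
    val cache = new TwoLevelCache[Int, String](fakeRedis.get, ttlMillis = 60000L)
    println(cache.get(1))      // first call goes to the remote store
    println(cache.get(1))      // second call is served locally
    println(cache.remoteCalls)
  }
}
```

Note that the local map in this sketch is unbounded; a real version would need some size limit or eviction policy. Even with such a cache, every new key still costs one remote round trip, which is the cost the broadcast approach avoids when the whole configuration fits in memory.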

Define the case class:

case class ConfBean(conf1: List[Int], conf2: Map[Int, String])
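To make the roles of the two fields concrete: in the process function later in this post, `conf1` is used as a membership list and `conf2` as a lookup table with `"-1"` as the fallback. The snippet below reproduces just that lookup logic outside Flink. The helper `enrich` and the sample values are made up for illustration.

```scala
case class ConfBean(conf1: List[Int], conf2: Map[Int, String])

object ConfLookup {
  // Hypothetical helper: returns (is the value listed in conf1?, its conf2 label or "-1")
  def enrich(value: Int, conf: ConfBean): (Boolean, String) =
    (conf.conf1.contains(value), conf.conf2.getOrElse(value, "-1"))

  def main(args: Array[String]): Unit = {
    val conf = ConfBean(conf1 = List(1, 2, 3), conf2 = Map(1 -> "one")) // sample data
    println(enrich(1, conf)) // (true,one)
    println(enrich(7, conf)) // (false,-1)
  }
}
```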

Code for the broadcast source:

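import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

class ConfSource extends RichSourceFunction[ConfBean] {
  // ConfDao is the project's own DAO class; `= _` needs an explicit type to compile
  private var confDao: ConfDao = _
  // cancel() is called from a different thread than run(), so the flag must be volatile
  @volatile private var running = true

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    confDao = new ConfDao()
  }

  override def run(ctx: SourceFunction.SourceContext[ConfBean]): Unit = {
    while (running) {
      // Reload the full configuration and emit it as one element
      val conf1 = confDao.loadConf1()
      val conf2 = confDao.loadConf2()
      ctx.collect(ConfBean(conf1, conf2))
      // Wait 2 minutes before the next reload
      Thread.sleep(120000)
    }
  }

  override def cancel(): Unit = {
    // Only stop the loop here; closing the DAO while run() may still be using it is unsafe
    running = false
  }

  override def close(): Unit = {
    if (confDao != null) confDao.close()
    super.close()
  }
}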

Connect the operator to the broadcast stream:

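import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.scala._ // also provides createTypeInformation

val confBroadcast = env.addSource(new ConfSource())
  // name and uid must not collide with the process operator below;
  // the "-conf" suffix is an arbitrary choice
  .name(nodeCfg.nodeName + "-conf")
  .uid(nodeCfg.nodeName + "-conf")
  .broadcast(new MapStateDescriptor[String, ConfBean]("conf_broadcast", createTypeInformation[String], createTypeInformation[ConfBean]))

stream
  .keyBy(_.num)
  .connect(confBroadcast)
  .process(new ValueProcess())
  .name(nodeCfg.nodeName)
  .uid(nodeCfg.nodeName)
  .setParallelism(nodeCfg.workerCount)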

To use the broadcast data in process, the function must extend the abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>. The code is as follows:

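import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

class ValueProcess extends KeyedBroadcastProcessFunction[String, Int, ConfBean, Unit] {

  // Latest configuration, kept in plain member variables (not in Flink state).
  // Both are empty until the first broadcast element arrives.
  private var conf1: Set[Int] = Set.empty
  private var conf2: Map[Int, String] = Map.empty

  override def processElement(value: Int, ctx: KeyedBroadcastProcessFunction[String, Int, ConfBean, Unit]#ReadOnlyContext, out: Collector[Unit]): Unit = {
    val b = conf1.contains(value)
    val v = conf2.getOrElse(value, "-1")
    // some code .....
  }

  override def processBroadcastElement(value: ConfBean, ctx: KeyedBroadcastProcessFunction[String, Int, ConfBean, Unit]#Context, out: Collector[Unit]): Unit = {
    // Convert to a Set once per update instead of once per element
    conf1 = value.conf1.toSet
    conf2 = value.conf2
  }
}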
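As a possible follow-up, the configuration could be stored in Flink's broadcast state instead of member variables, using the same kind of `MapStateDescriptor` that is passed to `.broadcast(...)`. The sketch below is only one way this might look, not a tested replacement for the code above. The names `ConfState`, `Key` and `StatefulValueProcess`, and the choice of `String` as the output type, are assumptions made for illustration. It needs the Flink Scala streaming dependency on the classpath.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

case class ConfBean(conf1: List[Int], conf2: Map[Int, String])

object ConfState {
  // Must match the descriptor (name and types) used in .broadcast(...)
  val descriptor = new MapStateDescriptor[String, ConfBean](
    "conf_broadcast", createTypeInformation[String], createTypeInformation[ConfBean])

  // Hypothetical fixed key: the whole configuration is stored as a single entry
  val Key = "current"
}

class StatefulValueProcess
    extends KeyedBroadcastProcessFunction[String, Int, ConfBean, String] {

  override def processElement(
      value: Int,
      ctx: KeyedBroadcastProcessFunction[String, Int, ConfBean, String]#ReadOnlyContext,
      out: Collector[String]): Unit = {
    // Read-only view of the broadcast state; get returns null before the first broadcast
    val conf = ctx.getBroadcastState(ConfState.descriptor).get(ConfState.Key)
    if (conf != null) {
      out.collect(conf.conf2.getOrElse(value, "-1"))
    }
  }

  override def processBroadcastElement(
      value: ConfBean,
      ctx: KeyedBroadcastProcessFunction[String, Int, ConfBean, String]#Context,
      out: Collector[String]): Unit = {
    // Overwrite the single entry with the newest configuration
    ctx.getBroadcastState(ConfState.descriptor).put(ConfState.Key, value)
  }
}
```

This change does not reduce memory use by itself, since broadcast state is also held in memory at runtime. What it adds is that the configuration becomes part of checkpoints, so after a restore the function does not start with empty configuration. In this sketch, elements that arrive before the first broadcast are silently dropped; whether to drop, buffer or pass them through depends on the job.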