Flink version: 1.13.0
Scala version: 2.12.7

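When processing data in Flink, we often need to join against dimension tables to enrich ("color") the current records. The usual approaches are lookups through JDBC or Redis, but their throughput is generally poor. With Redis, for example, an unoptimized setup rarely exceeds 10,000 records per second. My current project handles tens of billions of records per day, and Redis struggles to serve queries at that volume. Our earlier solution put a two-level cache in front of Redis to reduce the number of Redis queries. Later I learned about broadcast streams and realized they fit this problem: the configuration can be distributed dynamically to every operator that needs it.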
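For context, the two-level cache mentioned above can be sketched as a small process-local cache with a time-to-live in front of the remote lookup. This is a hypothetical illustration, not the author's code: `LocalCache`, `ttlMillis` and the `remote` function (standing in for a Redis `GET`) are assumptions. It shows why the hit rate matters: only misses and expired entries reach Redis.

```scala
import scala.collection.mutable

// Hypothetical sketch of a local TTL cache in front of a slow remote lookup.
// `remote` stands in for a Redis query; `now` is injectable so expiry can be tested.
class LocalCache[K, V](
    ttlMillis: Long,
    remote: K => Option[V],
    now: () => Long = () => System.currentTimeMillis()) {

  // key -> (cached result, time it was loaded)
  private val entries = mutable.HashMap.empty[K, (Option[V], Long)]

  def get(key: K): Option[V] = {
    val t = now()
    entries.get(key) match {
      case Some((v, loadedAt)) if t - loadedAt < ttlMillis =>
        v // fresh hit: no remote call
      case _ =>
        val v = remote(key) // miss or expired entry: one remote round trip
        entries.update(key, (v, t))
        v
    }
  }
}
```

This cache is neither size-bounded nor thread-safe. That is acceptable inside a single Flink subtask, whose records are processed on one thread, but a production version would cap its size. Broadcasting goes further: it removes the per-record remote call entirely by pushing the whole dataset to every instance.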
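I am also new to broadcast. Because the configuration data is small, I keep it in a plain variable; this will probably need optimizing later so that it does not take up too much memory.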

The broadcast source:

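import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

// Reloads the order configuration every 2 minutes and emits the full snapshot downstream
class OrderConfSource extends RichSourceFunction[Map[String, AnyRef]] {
  // Set by cancel() from another thread and read by run(), so it must be @volatile
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Map[String, AnyRef]]): Unit = {
    while (running) {
      val conf = loadConf() // the author's loader; its implementation is not shown
      ctx.collect(conf)
      Thread.sleep(120000)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}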

Connecting the operator to the broadcast stream:

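import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.scala._

val orderConfBroadcast = env.addSource(new OrderConfSource())
  .name(nodeCfg.nodeName)
  // uids must be unique per operator within a job; the process operator below already uses nodeCfg.nodeName
  .uid(s"${nodeCfg.nodeName}-conf-source")
  .broadcast(new MapStateDescriptor[String, Map[String, AnyRef]]("order_conf_broadcast", createTypeInformation[String],
    createTypeInformation[Map[String, AnyRef]]))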

val orderStream = stayStream
  .keyBy(_.num)
  .connect(orderConfBroadcast)
  .process(new OrderFilterProcess())
  .name(nodeCfg.nodeName)
  .uid(nodeCfg.nodeName)
  .setParallelism(nodeCfg.workerCount)

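To use the broadcast data in the process step, extend the abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>, where KS is the key type, IN1 the element type of the keyed stream, IN2 the element type of the broadcast stream, and OUT the output type. The keyed variant is required here because the main stream goes through keyBy; a non-keyed stream would use BroadcastProcessFunction instead. The code: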

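import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

// In / Out are the job's own input and output types (not shown in the original)
class OrderFilterProcess extends KeyedBroadcastProcessFunction[String, In, Map[String, AnyRef], Out] {

  // Latest config snapshot, kept as a plain field in each parallel instance.
  // It is null until the first broadcast element arrives and is not part of checkpoints.
  var orderInfo: Map[String, AnyRef] = _

  override def processElement(value: In, ctx: KeyedBroadcastProcessFunction[String, In, Map[String, AnyRef], Out]#ReadOnlyContext, out: Collector[Out]): Unit = {
    // ... enrichment logic using orderInfo (omitted in the original)
  }

  override def processBroadcastElement(value: Map[String, AnyRef], ctx: KeyedBroadcastProcessFunction[String, In, Map[String, AnyRef], Out]#Context, out: Collector[Out]): Unit = {
    // Every parallel instance receives each broadcast element, so all copies stay in sync
    if (value != null) {
      orderInfo = value
    }
  }
}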
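Since keeping the data in a plain variable is noted above as something to revisit, one option is Flink's broadcast state, read and written through the same MapStateDescriptor that was passed to `.broadcast(...)`. Unlike a field, broadcast state is included in checkpoints and restored after a failure. The sketch below is a hypothetical variant rather than the author's code: `Order`, `EnrichedOrder`, the `confKey` field and the fixed key `"latest"` are assumptions, and it needs the Flink 1.13 Scala streaming dependency on the classpath.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

// Hypothetical input/output types standing in for the article's In and Out.
case class Order(num: String, confKey: String)
case class EnrichedOrder(order: Order, conf: Option[AnyRef])

object OrderConfState {
  // Must match the descriptor (name and types) passed to .broadcast(...)
  val descriptor = new MapStateDescriptor[String, Map[String, AnyRef]](
    "order_conf_broadcast",
    createTypeInformation[String],
    createTypeInformation[Map[String, AnyRef]])

  // Assumed fixed key under which the latest full snapshot is stored
  val LatestKey = "latest"
}

class OrderFilterWithState
    extends KeyedBroadcastProcessFunction[String, Order, Map[String, AnyRef], EnrichedOrder] {

  override def processElement(
      value: Order,
      ctx: KeyedBroadcastProcessFunction[String, Order, Map[String, AnyRef], EnrichedOrder]#ReadOnlyContext,
      out: Collector[EnrichedOrder]): Unit = {
    // The keyed side only gets read-only access to broadcast state
    val snapshot = Option(ctx.getBroadcastState(OrderConfState.descriptor).get(OrderConfState.LatestKey))
    // None if no config has arrived yet or the key is missing from it
    out.collect(EnrichedOrder(value, snapshot.flatMap(_.get(value.confKey))))
  }

  override def processBroadcastElement(
      value: Map[String, AnyRef],
      ctx: KeyedBroadcastProcessFunction[String, Order, Map[String, AnyRef], EnrichedOrder]#Context,
      out: Collector[EnrichedOrder]): Unit = {
    // Replace the whole snapshot; every instance applies the same update
    if (value != null) {
      ctx.getBroadcastState(OrderConfState.descriptor).put(OrderConfState.LatestKey, value)
    }
  }
}
```

Broadcast state is still kept in memory in every parallel instance, so this addresses fault tolerance rather than the memory concern; the data set needs to stay small either way.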
