flink版本: 1.13.0
scala版本: 2.12.7

开发的flink项目使用Streaming File Sink写入hdfs,最近接到了文件需要每五分钟分一个桶的需求,而flink默认的分桶策略无法支持这种需求,无奈之下只好自己写分桶策略。
该策略使用当前日期和小时,再拼上向下取整到整五分钟的分钟数作为桶名;每个桶保存对应五分钟窗口内的数据。
BucketAssigner代码如下:

class HdfsBucketAssigner extends BucketAssigner[String, String]{
  /**
   * Assigns each element to a five-minute bucket based on processing time.
   *
   * The bucket id is the "yyyyMMddHH" part of the formatted timestamp plus
   * the minute rounded down to the nearest multiple of five
   * (e.g. 12:07 -> "...1205", so each bucket covers a five-minute window).
   *
   * @param element the record being written (unused; bucketing is purely time-based)
   * @param context provides the current processing time
   * @return bucket id of the form yyyyMMddHHmm with mm in {00, 05, ..., 55}
   */
  override def getBucketId(element: String, context: BucketAssigner.Context): String = {
    val instant = Instant.ofEpochMilli(context.currentProcessingTime)
    val fullDate = HdfsBucketAssigner.getFormatter.format(instant)

    // Pattern is "yyyyMMddHHmm": chars 0-9 are date+hour, chars 10-11 the minute.
    val minute = fullDate.substring(10).toInt
    val datePart = fullDate.substring(0, 10)
    // Round the minute down to its five-minute boundary and zero-pad to 2 digits.
    // Replaces a 12-arm match that was non-exhaustive by construction (it would
    // have thrown MatchError for any minute outside 0-59).
    val bucketMinute = f"${(minute / 5) * 5}%02d"
    datePart + bucketMinute
  }

  /** Serializer for the String bucket id, required by Flink for checkpointing. */
  override def getSerializer: SimpleVersionedSerializer[String] = {
    SimpleVersionedStringSerializer.INSTANCE
  }
}

object HdfsBucketAssigner {
  // DateTimeFormatter is immutable and thread-safe, so a single shared val is
  // safe across parallel subtasks. The previous `var` + null-check lazy init
  // was a data race (two threads could observe null and build concurrently),
  // and @transient has no effect on a Scala object's fields.
  private val formatter: DateTimeFormatter =
    DateTimeFormatter.ofPattern("yyyyMMddHHmm").withZone(ZoneId.of("Asia/Shanghai"))

  /** Shared formatter producing "yyyyMMddHHmm" strings in Asia/Shanghai time. */
  def getFormatter: DateTimeFormatter = formatter
}

使用方法:

// Rolling policy: decides when an in-progress part file is closed and a new one started.
val defaultRollingPolicy: DefaultRollingPolicy[String, String] = DefaultRollingPolicy.builder()
  // Roll after 30 seconds without any writes.
  .withInactivityInterval(TimeUnit.SECONDS.toMillis(30))
  // Roll after the part file has been open for 5 minutes.
  .withRolloverInterval(TimeUnit.MINUTES.toMillis(5))
  // Roll when the part file exceeds this size.
  // NOTE(review): the original comment said 1 GB, but 1024*1024*128 = 128 MB — confirm intended size.
  .withMaxPartSize(1024L * 1024L * 128L)
  .build()
// Naming of the generated part files.
val outputConfig: OutputFileConfig = OutputFileConfig.builder()
  // Prefix for generated part files.
  .withPartPrefix(hdfsConfig.hdfsFilePrefix)
  // Suffix (extension) for generated part files.
  .withPartSuffix(hdfsConfig.hdfsFileExtension)
  .build()

// Row-format StreamingFileSink writing UTF-8 strings to HDFS, bucketed every five minutes.
val sink: StreamingFileSink[String] = StreamingFileSink
  // Target path and the per-row encoding (UTF-8 plain strings).
  .forRowFormat(new Path(hdfsConfig.hdfsUrl + hdfsConfig.hdfsPath), new SimpleStringEncoder[String]("UTF-8"))
  // Custom five-minute bucket assigner (HdfsBucketAssigner).
  .withBucketAssigner(new HdfsBucketAssigner)
  // When to roll part files within a bucket.
  .withRollingPolicy(defaultRollingPolicy)
  // How often (ms) the sink checks open buckets against the rolling policy.
  .withBucketCheckInterval(30L * 1000L)
  .withOutputFileConfig(outputConfig)
  .build()

// Attach the sink; name and uid come from node config so operator state stays
// addressable across job restarts/upgrades.
resultStream.addSink(sink)
  .name(nodeCfg.nodeName)
  .uid(nodeCfg.nodeName)
  .setParallelism(nodeCfg.workerCount)