flink版本: 1.13.0
scala版本: 2.12.7
开发的flink项目使用Streaming File Sink写入hdfs,最近接到了文件需要每五分钟分一个桶的需求,而flink默认的分桶策略无法支持这种需求,无奈之下只好自己写分桶策略。
该策略使用当前日期和小时,再拼上向下取整到整五分钟的分钟数作为桶名,每个桶保存对应五分钟窗口内的数据。
BucketAssigner代码如下:
class HdfsBucketAssigner extends BucketAssigner[String, String]{
/**
 * Assigns each record to a bucket based on processing time, one bucket
 * per 5-minute window. The bucket id is "yyyyMMddHH" (Asia/Shanghai)
 * followed by the minute-of-hour rounded down to a multiple of 5,
 * e.g. "202101011230" .. "202101011234" all map to "2021010112" + "30".
 *
 * @param element the record being written (unused — bucketing is purely time-based)
 * @param context provides the current processing time
 * @return bucket id string "yyyyMMddHH" + zero-padded 5-minute slot
 */
override def getBucketId(element: String, context: BucketAssigner.Context): String = {
val instant = Instant.ofEpochMilli(context.currentProcessingTime)
// Formatter pattern is "yyyyMMddHHmm": chars 0-9 = date+hour, chars 10-11 = minute.
val fullDate = HdfsBucketAssigner.getFormatter.format(instant)
val datePart = fullDate.substring(0, 10)
val minute = fullDate.substring(10).toInt
// Round the minute down to its 5-minute slot arithmetically. This replaces
// the original 12-arm match, which was verbose and would have thrown a
// MatchError for any value outside 0-59.
val slot = f"${minute / 5 * 5}%02d"
datePart + slot
}

/** Bucket ids are plain strings; use Flink's built-in string serializer. */
override def getSerializer: SimpleVersionedSerializer[String] = {
SimpleVersionedStringSerializer.INSTANCE
}
}
object HdfsBucketAssigner {
// DateTimeFormatter is immutable and thread-safe, so a plain val is the
// correct way to share one instance. The original null-checked var was a
// racy hand-rolled lazy init (worst case: redundant construction), and
// @transient on an object field is a no-op — Scala objects are not
// serialized with the task.
private val formatter: DateTimeFormatter =
DateTimeFormatter.ofPattern("yyyyMMddHHmm").withZone(ZoneId.of("Asia/Shanghai"))

/** Shared formatter producing "yyyyMMddHHmm" strings in Asia/Shanghai time. */
def getFormatter: DateTimeFormatter = formatter
}
使用方法:
// Rolling policy: decides when an in-progress part file is finalized.
val defaultRollingPolicy: DefaultRollingPolicy[String, String] = DefaultRollingPolicy.builder()
//roll when no records have been written for 30 seconds
.withInactivityInterval(TimeUnit.SECONDS.toMillis(30))
//roll once the part file has been open for 5 minutes
.withRolloverInterval(TimeUnit.MINUTES.toMillis(5))
//roll once the part file exceeds 128 MB (1024*1024*128 bytes) —
//NOTE(review): the original comment claimed 1 GB, but the value is 128 MB
.withMaxPartSize(1024L * 1024L * 128L)
.build()
val outputConfig: OutputFileConfig = OutputFileConfig.builder()
//prefix of generated part files
.withPartPrefix(hdfsConfig.hdfsFilePrefix)
//suffix of generated part files
.withPartSuffix(hdfsConfig.hdfsFileExtension)
.build()
val sink: StreamingFileSink[String] = StreamingFileSink
//output path and row encoding (UTF-8 plain text)
.forRowFormat(new Path(hdfsConfig.hdfsUrl + hdfsConfig.hdfsPath), new SimpleStringEncoder[String]("UTF-8"))
//custom 5-minute bucketing strategy defined above
.withBucketAssigner(new HdfsBucketAssigner)
//when to roll part files
.withRollingPolicy(defaultRollingPolicy)
//how often to check rolling conditions (every 30 s)
.withBucketCheckInterval(30L * 1000L)
.withOutputFileConfig(outputConfig)
.build()
resultStream.addSink(sink)
.name(nodeCfg.nodeName)
.uid(nodeCfg.nodeName)
.setParallelism(nodeCfg.workerCount)