flink版本:1.14.4
scala版本:2.12.7
前段时间将flink由1.13.0版本升级到了1.14.4版本,注意到kafka连接器发生了变化,新增了KafkaSource和KafkaSink两个新版API,于是打算写一篇文章,简单记录一下这两个新版API的使用方法。首先是常用的序列化方式,分别是:SimpleStringSchema、TypeInformationSerializationSchema以及自定义序列化方式。其中自定义序列化方式以protostuff格式消息为例。
序列化类(SerializationSchema)
1. SimpleStringSchema
SimpleStringSchema是非常简单的字符串序列化模式,可以解析String类型的kafka消息,没有什么可以配置的地方。默认情况下,它使用UTF-8进行字符串/字节转换,也可以通过构造方法传入Charset,使用其他类型的字符集。
// 默认UTF-8字符集
val ss = new SimpleStringSchema()
// 使用GBK字符集
val ssWithGbk = new SimpleStringSchema(Charset.forName("GBK"))
2. TypeInformationSerializationSchema
TypeInformation是flink内置的类型,也是flink类型系统的核心,有着很高效的序列化、反序列化效率。在flink之间传输数据的场景下
推荐使用这种序列化方式。但是要注意,不是所有的对象都可以正确序列化,对于不支持的类型,将会退化为Kryo序列化,可以参考官方文档:flinks-typeinformation-class
使用方式:
case class MyBean(id: Long, name: String)
// createTypeInformation方法依赖scala的隐式方法,需要导入flink的scala包
import org.apache.flink.streaming.api.scala._
// 创建Schema需要flink Environment配置
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 对于scala,因为保留了运行时的泛型信息,所以可以很简单的获取类型信息。
val tss = new TypeInformationSerializationSchema(createTypeInformation[MyBean], env.getConfig)
// 对于java,因为泛型擦除的存在,所以flink提供了TypeHint类,用于获取类型信息。
TypeInformationSerializationSchema tss = new TypeInformationSerializationSchema(TypeInformation.of(new TypeHint<MyBean>(){}), env.getConfig)
3. 自定义序列化方式(以protostuff为例)
当消息格式不是String、json等简单类型,且flink或kafka官方没有提供合适的序列化方式时,可以考虑自定义序列化类。只需要继承KafkaRecordDeserializationSchema或者KafkaRecordSerializationSchema,就可以很简单的创建一个自定义的序列化、反序列化类。代码如下:
// 反序列化
class MessageDeserializer extends KafkaRecordDeserializationSchema[MyBean]{
//创建protostuff的Schema,因为protostuff需要空参构造方法,这里用的是javabean,不是上面的case class
private var schema: Schema[MyBean] = RuntimeSchema.getSchema(classOf[MyBean])
//获取类型信息
override def getProducedType: TypeInformation[MyBean] = createTypeInformation[MyBean]
//反序列化
override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[MyBean]): Unit = {
val value = schema.newMessage()
ProtobufIOUtil.mergeFrom(record.value(), value, schema)
//输出结果
out.collect(value)
}
}
// 序列化
class MessageDeserializer extends KafkaRecordSerializationSchema[MyBean]{
//创建protostuff的Schema,因为protostuff需要空参构造方法,这里用的是javabean,不是上面例子的case class
private var schema: Schema[MyBean] = RuntimeSchema.getSchema(classOf[MyBean])
override def serialize(element: MyBean, context: KafkaRecordSerializationSchema.KafkaSinkContext,
timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
val linkedBuffer = LinkedBuffer.allocate(1024 * 4)
ProtobufIOUtil.toByteArray(element, schema, linkedBuffer)
}
}
KafkaSource
以代码为例
setTopics:设置要读取的topic名称可以支持单个topic,也支持多个或正则,如:setTopics(“topic-a”, “topic-b”), setTopicPattern("topic.* ")
setValueOnlyDeserializer: 设置value的序列化方式,可以使用上面的三种或其他的序列化类。
setProperties:传入Properties文件,传入自定义配置信息,如安全认证信息等
setStartingOffsets:设置起始偏移量,支持最早、最新、指定时间戳、指定偏移量等。
val source: KafkaSource[MyBean] = KafkaSource.builder()
.setBootstrapServers(bootstrapServer)
.setTopics(topic)
.setGroupId(groupId)
.setValueOnlyDeserializer(new TypeInformationSerializationSchema(createTypeInformation[MyBean], env.getConfig))
.setProperties(kafkaProperties)
.setStartingOffsets(OffsetsInitializer.latest())
.build()
KafkaSink
KafkaSink与KafkaSource基本一致,需要注意的就是多了一个setDeliverGuarantee配置项,用于设置精准一次、至少一次等,其使用kafka事务实现,使用前需确认平台上的kafka是否支持事务,以及消费者是否开启事物的READ_COMMITED配置。
val sink = KafkaSink.builder()
.setBootstrapServers(serverConfig.bootstrapServer)
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setKafkaProducerConfig(kafkaProperties)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(serverConfig.userDurTopic)
.setValueSerializationSchema(new TypeInformationSerializationSchema(createTypeInformation[MyBean], env.getConfig))
.build())
.build()