flink版本:1.14.4
scala版本:2.12.7

前段时间将flink由1.13.0版本升级到了1.14.4版本,注意到kafka连接器发生了变化,新增了KafkaSource和KafkaSink两个新版API,于是打算写一篇文章,简单记录一下这两个新版API的使用方法。首先是常用的序列化方式,分别是:SimpleStringSchemaTypeInformationSerializationSchema以及自定义序列化方式。其中自定义序列化方式以protostuff格式消息为例。

序列化类(SerializationSchema)

1. SimpleStringSchema

SimpleStringSchema是非常简单的字符串序列化模式,可以解析String类型的kafka消息,没有什么可以配置的地方。默认情况下,它使用UTF-8进行字符串/字节转换,也可以通过构造方法传入Charset,使用其他类型的字符集。

// 默认UTF-8字符集
val ss = new SimpleStringSchema()
// 使用GBK字符集
val ssWithGbk = new SimpleStringSchema(Charset.forName("GBK"))

2. TypeInformationSerializationSchema

TypeInformation是flink内置的类型,也是flink类型系统的核心,有着很高效的序列化、反序列化效率。在flink之间传输数据的场景下
推荐使用这种序列化方式。但是要注意,不是所有的对象都可以正确序列化,对于不支持的类型,将会退化为Kryo序列化,可以参考官方文档:flinks-typeinformation-class
使用方式:

case class MyBean(id: Long, name: String)

// createTypeInformation方法依赖scala的隐式方法,需要导入flink的scala包
import org.apache.flink.streaming.api.scala._
// 创建Schema需要flink Environment配置
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 对于scala,因为保留了运行时的泛型信息,所以可以很简单的获取类型信息。
val tss = new TypeInformationSerializationSchema(createTypeInformation[MyBean], env.getConfig)

// 对于java,因为泛型擦除的存在,所以flink提供了TypeHint类,用于获取类型信息。
TypeInformationSerializationSchema tss = new TypeInformationSerializationSchema(TypeInformation.of(new TypeHint<MyBean>(){}), env.getConfig)

3. 自定义序列化方式(以protostuff为例)

当消息格式不是String、json等简单类型,且flink或kafka官方没有提供合适的序列化方式时,可以考虑自定义序列化类。只需要继承KafkaRecordDeserializationSchema或者KafkaRecordSerializationSchema,就可以很简单的创建一个自定义的序列化、反序列化类。代码如下:

// 反序列化
class MessageDeserializer extends KafkaRecordDeserializationSchema[MyBean]{
  //创建protostuff的Schema,因为protostuff需要空参构造方法,这里用的是javabean,不是上面的case class
  private var schema: Schema[MyBean] = RuntimeSchema.getSchema(classOf[MyBean])
  //获取类型信息
  override def getProducedType: TypeInformation[MyBean] = createTypeInformation[MyBean]

  //反序列化
  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]], out: Collector[MyBean]): Unit = {
    val value = schema.newMessage()
    ProtobufIOUtil.mergeFrom(record.value(), value, schema)
    //输出结果
    out.collect(value)
  }
}

// 序列化
class MessageDeserializer extends KafkaRecordSerializationSchema[MyBean]{
  //创建protostuff的Schema,因为protostuff需要空参构造方法,这里用的是javabean,不是上面例子的case class
  private var schema: Schema[MyBean] = RuntimeSchema.getSchema(classOf[MyBean])
  
    override def serialize(element: MyBean, context: KafkaRecordSerializationSchema.KafkaSinkContext,
                         timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
    val linkedBuffer = LinkedBuffer.allocate(1024 * 4)
    ProtobufIOUtil.toByteArray(element, schema, linkedBuffer)
  }
}

KafkaSource

以代码为例
setTopics:设置要读取的topic名称可以支持单个topic,也支持多个或正则,如:setTopics(“topic-a”, “topic-b”), setTopicPattern("topic.* ")
setValueOnlyDeserializer: 设置value的序列化方式,可以使用上面的三种或其他的序列化类。
setProperties:传入Properties文件,传入自定义配置信息,如安全认证信息等
setStartingOffsets:设置起始偏移量,支持最早、最新、指定时间戳、指定偏移量等。

val source: KafkaSource[MyBean] = KafkaSource.builder()
  .setBootstrapServers(bootstrapServer)
  .setTopics(topic)
  .setGroupId(groupId)
  .setValueOnlyDeserializer(new TypeInformationSerializationSchema(createTypeInformation[MyBean], env.getConfig))
  .setProperties(kafkaProperties)
  .setStartingOffsets(OffsetsInitializer.latest())
  .build()

KafkaSink

KafkaSink与KafkaSource基本一致,需要注意的就是多了一个setDeliverGuarantee配置项,用于设置精准一次、至少一次等,其使用kafka事务实现,使用前需确认平台上的kafka是否支持事务,以及消费者是否开启事物的READ_COMMITED配置。

val sink = KafkaSink.builder()
  .setBootstrapServers(serverConfig.bootstrapServer)
  .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
  .setKafkaProducerConfig(kafkaProperties)
  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
    .setTopic(serverConfig.userDurTopic)
    .setValueSerializationSchema(new TypeInformationSerializationSchema(createTypeInformation[MyBean], env.getConfig))
    .build())
  .build()