目录
- 思路与环境
- 首先配置Kafka信息
- Flink Kafka连接器
- 自定义序列化器
- 完整代码
- 总结
- Flink 1.13.2
- Kafka 2.6.2
思路与环境
从kafka中读取数据 根据逻辑判断分配到不同的topic中去
需要重写Flink Kafka的Key序列化器,并通过加入自己的逻辑主动往指定的topic发送消息。
首先配置Kafka信息
Properties props = new Properties(); props.put("bootstrap.servers","10.116.0.16:9092"); props.put("acks", "all"); props.put("retries", 1); props.put("BATch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put(Producewww.devze.comrConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
注意这里key序列化器和value序列化器都为StringSerializer
Flink Kafka连接器
FlinkKafkaProducer<FlinkJobBO> fkProducer = new FlinkKafkaProducer<>("", new MyKeySerialization(), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
其中MyKeySerialization便是重写的key序列化器
自定义序列化器
public class MyKeySerialization implements KafkaSerializationSchema<FlinkJobBO> { String topic; public MyKeySerialization(String topic){ this.topic = topic; } public MyKeySerialization(){ } // 注意:都是byte[]类型,所以我们要重新指定新的序列化器 @Override public ProducerRecord<byte[], byte[]> serialize(FlinkJobBO flinkJobBO, @Nullable Long aLong) { // 根据自身的逻辑条件 jsonUtils.setObjectMapper(new ObjectMapper()); if("1".equals(flinkJobBO.getApiModel())){ // 动态生成topic return new ProducerRecord<>("topic-"+flinkJobBO.getGroupId(), JsonUtils.toJson(flinkJobBO).getBytes(StandardCharsets.UTF_8)); } return http://www.devze.comnew ProducerRecord<>("", "".getBytes(StandardCharsets.UTF_8)); } }
需要把
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
替换为
props.put(ProducerCo编程nfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
完整代码
// 创建Flink Stream执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 1、设置默认topic String TOPIC = "TEST"; // 2. 从kafka获取流数据 Properties props = new Properties(); props.put("bootstrap.servers","10.116.0.16:9092"); props.put("acks", "all"); props.put("retries", 1); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33js554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIhttp://www.devze.comG, "org.apache.kafka.common.serialization.StringSerializer"); // 从kafka中消费数据 DataStreamSource<String> kafkaDataStream = env.addSource(new FlinkKafkaConsumer<>(TOPIC, new SimpleStringSchema(), props)); // 3. 针对流做处理 把string转成bo 主流 DataStream<FlinkJobBO> ds = kafkaDataStream .map((MapFunction<String, FlinkJobBO>) s -> JsonUtils.toBean(s, FlinkJobBO.class)); // 修改value序列化器 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); // 4.1.1 自定义序列化器 分配topic ***** FlinkKafkaProducer<FlinkJobBO> fkProducer = new FlinkKafkaProducer<>("", new MyKeySerialization(), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE); fkProducer.setLogFailuresOnly(false); ds.addSink(fkProducer); env.execute();
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。
精彩评论