开发者

kafka-console-consumer.sh使用2次grep管道无法提取消息的解决

开发者 https://www.devze.com 2023-03-08 10:23 出处:网络 作者: Geoffrey Turing
目录问题怀疑分析验证生产消息消费时两次grep则无消息消费时一次grep则都出来结论解php决最后问题
目录
  • 问题
  • 怀疑
  • 分析
  • 验证
    • 生产消息
    • 消费时两次grep则无消息
    • 消费时一次grep则都出来
  • 结论
    • php
      • 最后

        问题

        在使用kafka自带脚本消费时,想用grep关键词过滤出来想要的信息

        ./kafka-console-consumer.sh \
        --bootstrap-server node1:9092,node2:9092,node3:9092 \
        --topic test \
        --from-beginning \
        --consumer-property group.id=gaofeng_test\
         | grep  "b786aba6f3a6" | grep 388aabd
        

        但是发现grep命令在这里有问题,无法取到想要消息

        kafka-console-consumer.sh使用2次grep管道无法提取消息的解决

        已经消费到latest了,都没有提取到想要的msg

        这里确认了producer端已经发送成功了,回调函数里都没有任何异常

        怀疑

        怀疑是否kafka存在丢数据的情况

        排查问题后发现kafka集群一切正常,性能也很强劲

        并且在存储消息的日志文件中也已经查询到了我想要的消息了,为什么就是消费不出来呢

        分析

        将kafk编程a的这个主题 --from-beginning 消费到文件topic.log中去

        对topic.log这个静态文件进行grep “b786aba6f3a6” | grep 388aabd抽取,发现取到了自己想要的这条消息

        kafka消费时结果是动态的,grep这里用法不对,只有对静态的文件可以抽取到数据

        --line-buffered

                 Force output to be line buffered.  By defaulwww.devze.comt, output is line buffered when standard output is

                 a terminal and block buffered otherwise

        上面的意思是

        • 强制输出结果使用行缓冲
        • 默认情况下,如果标准输入时终端,则使用line bufferred
        • 否则,使用块缓冲,(默认的大小为4096 bytes,因系统和配置而异)

        所以,这也就解释了为什么双重grep过滤没有内容,因为没有达到块缓冲限制。

        验证

        生产消息

        a,b,c

        kafka-console-consumer.sh使用2次grep管道无法提取消息的解决

        消费时两次grep则无消息

        kafka-console-consumer.sh使用2次grep管道无法提取消息的解决

        消费时一次grep则都出来

        kafka-console-consumer.sh使用2次grep管道无法提取消息的解决

        结论

        • 刚刚/kafka-console-consumer.sh第一次grep过滤出来的的消息小于4k了,所以传给第二次grehttp://www.devze.comp是不可以直接输出出来的,因为在缓冲区
        • 如果第一次grep出来的数据大于4k,传给第二个grep时则不会缓冲着,第二个grep收到就立刻传入到terminal输出了
        • 反之第一次grep过滤出来的的消息大于还是小于4k,一旦后面没有管道符号,直接输出给terminal的话,则遵循line bufferred规则立马输出

        解决

        使用如下命令即可

        ./ka开发者_Python教程fka-console-consumer.sh \
        --bootstrap-server node1:9092,node2:9092,node3:9092 \
        --topic test \
        --from-beginning \
        --consumer-property group.id=gaofeng_test\
         | grphpep --line-buffered "b786aba6f3a6" | grep 388aabd

        【注意】

        这里用了grep 的 --line-buffered参数

        具体用法

        查阅https://www.jb51.net/article/277299.htm

        最后

        以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

        0

        精彩评论

        暂无评论...
        验证码 换一张
        取 消