Kafka

    介绍

    官网:http://kafka.apache.org/

    Apache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。

    Apache Kafka与传统消息系统相比,有以下不同:

    • 它被设计为一个分布式系统,易于向外扩展;
    • 它同时为发布和订阅提供高吞吐量;
    • 它支持多订阅者,当失败时能自动平衡消费者;
    • 它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。

    Kafka 安装

    下载

    https://kafka.apache.org/downloads

    mkdir -p /storage/app/kafka && cd storage/app/kafka
    wget https://mirrors.gigenet.com/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz
    tar xvf kafka_2.13-2.7.0.tgz
    

    修改配置

    vim /storage/app/kafka/kafka_2.13-2.7.0/config/server.properties

    broker.id=1
    log.dirs=/storage/app/kafka/kafka_2.13-2.7.0/logs
    

    需要注意的是 kafka 依赖 zookeeper, 建议使用外部 zk, 但也支持使用内置的 zk

    启动

    使用安装包中的脚本启动单节点 Zookeeper 实例:

    /storage/app/kafka/kafka_2.13-2.7.0/bin/zookeeper-server-start.sh\
    -daemon /storage/app/kafka/kafka_2.13-2.7.0/config/zookeeper.properties
    

    启动 kafka

    /storage/app/kafka/kafka_2.13-2.7.0/bin/kafka-server-start.sh\
     /storage/app/kafka/kafka_2.13-2.7.0/config/server.properties
    

    配置 kafka 到 systemd

    新建配置 vim /etc/systemd/system/kafka.service

    [Unit]
    Description=kafka
    Documentation=kafka
    After=network-online.target
    Wants=network-online.target
    
    [Service]
    Environment=" "
    EnvironmentFile=-/etc/default/%p
    ExecStart=/storage/app/kafka/kafka_2.13-2.7.0/bin/kafka-server-start.sh /storage/app/kafka/kafka_2.13-2.7.0/config/server.properties
    ExecStop=/bin/kill -HUP $MAINPID
    Restart=on-failure
    ExecReload=/bin/kill -HUP $MAINPID
    Restart=on-failure
    
    KillSignal=SIGINT
    
    [Install]
    WantedBy=multi-user.target
    
    

    重载 systemd

    systemctl daemon-reload
    

    启动 kafka

    systemctl start kafka.service 
    

    查看 kafka 状态

    systemctl status kafka.service 
    
    root@jansora-Vostro-3669:/storage/app/kafka/kafka_2.13-2.7.0/config# systemctl status kafka.service 
    ● kafka.service - kafka
         Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: enabled)
         Active: active (running) since Tue 2021-01-19 12:27:40 CST; 4s ago
       Main PID: 1061890 (java)
          Tasks: 74 (limit: 19037)
         Memory: 323.4M
         CGroup: /system.slice/kafka.service
                 └─1061890 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineL>
    
    119 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,115] INFO [ExpirationReaper-1-AlterAcls]: Starting (kafka.server.DelayedOperationPu>
    119 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,144] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChan>
    119 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,172] INFO [SocketServer brokerId=1] Starting socket server acceptors and processors>
    119 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,199] INFO [SocketServer brokerId=1] Started data-plane acceptor and processor(s) fo>
    119 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,199] INFO [SocketServer brokerId=1] Started socket server acceptors and processors >
    119 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,202] INFO Kafka version: 2.7.0 (org.apache.kafka.common.utils.AppInfoParser)
    119 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,202] INFO Kafka commitId: 448719dc99a19793 (org.apache.kafka.common.utils.AppInfoPa>
    119 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,202] INFO Kafka startTimeMs: 1611030463200 (org.apache.kafka.common.utils.AppInfoPa>
    119 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,209] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
    119 12:27:43 jansora-Vostro-3669 kafka-server-start.sh[1061890]: [2021-01-19 12:27:43,236] INFO [broker-1-to-controller-send-thread]: Recorded new controller, from now o>
    
    

    演示

    发送消息

    --broker-list 当前 kafka 服务

    --topic kafka 主题

    /storage/app/kafka/kafka_2.13-2.7.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    

    发送消息

    >1
    >2
    >3
    >4
    >5
    >6
    >7
    >8
    >9
    

    接收消息

    --bootstrap-server 当前 kafka 服务

    --topic kafka 主题

    --from-beginning 从开始记录

    /storage/app/kafka/kafka_2.13-2.7.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    

    评论栏