
A First Taste of Apache Kafka


1. Environment Preparation:

       Download the official Kafka package from a mirror: http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.9.0.1/kafka_2.10-0.9.0.1.tgz

        The latest releases can be found at: http://kafka.apache.org/downloads.html

        Server: CentOS 7, with JDK 1.8 or JDK 1.7

        Kafka version: kafka_2.10-0.9.0.1.tgz

2. Install the JDK

       Unpack the JDK and configure the environment variables (if OpenJDK is already installed, uninstall it first).

       For RPM-based installation, see: http://wkm.iteye.com/blog/1249553
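"Configure the environment variables" usually means appending a few lines to /etc/profile. A minimal sketch, assuming the JDK tarball was unpacked to /usr/local/jdk1.8.0 (an example path; point JAVA_HOME at your actual install directory):

```shell
# Append to /etc/profile, then run `source /etc/profile` to apply.
# JAVA_HOME below is an example path; adjust it to where you unpacked the JDK.
export JAVA_HOME=/usr/local/jdk1.8.0
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib
```

Verify with `java -version` afterwards; it should report the Oracle JDK rather than OpenJDK.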

3. Install Kafka:

       # tar -xvf kafka_2.10-0.9.0.1.tgz

       Kafka is ready to use as soon as it is extracted; no further installation step is needed.

4. Start Kafka

       Kafka depends on a ZooKeeper service, so ZooKeeper must be started first.

       Kafka ships with a bundled ZooKeeper, which can be started as follows:

       # cd kafka_2.10-0.9.0.1

       # ./bin/zookeeper-server-start.sh config/zookeeper.properties

       The ZooKeeper service listens on port 2181 by default.
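The bundled config/zookeeper.properties is very short; to the best of my recollection the 0.9.x defaults are roughly the following (verify against your copy), so this is where to change the port if 2181 is already taken:

```properties
# config/zookeeper.properties (0.9.x defaults, from memory -- verify against your copy)
dataDir=/tmp/zookeeper    # where ZooKeeper stores its snapshots
clientPort=2181           # the port clients (including Kafka) connect to
maxClientCnxns=0          # 0 = no per-IP connection limit (fine for a test box)
```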

 

       Next, start the Kafka service:

       # ./bin/kafka-server-start.sh config/server.properties

       The Kafka service listens on port 9092 by default.
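A few config/server.properties settings worth knowing about at this point; the values shown are, to the best of my recollection, the 0.9.x defaults (verify against your copy):

```properties
# config/server.properties (selected 0.9.x defaults -- verify against your copy)
broker.id=0                        # must be unique per broker in a cluster
port=9092                          # the port the broker listens on
log.dirs=/tmp/kafka-logs           # where message data is persisted
zookeeper.connect=localhost:2181   # the ZooKeeper ensemble the broker registers with
```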

 

5. Test Kafka

       1. Create a topic named my-topic:

       # ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic

       2. List the existing topics:

       # ./bin/kafka-topics.sh --list --zookeeper localhost:2181

          my-topic

       3. Start a console producer and publish a message:

       # ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic

          message1

       4. Start a console consumer, subscribe, and receive the message:

       # ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic my-topic --from-beginning

          message1

  

       At this point the single-node Kafka service is up and running, and can send and receive messages.

6. Stop Kafka

      1. Stop the Kafka service:

       # ./bin/kafka-server-stop.sh

       2. Stop the ZooKeeper service:

       # ./bin/zookeeper-server-stop.sh

 

 ------- The single-node Kafka service is now fully installed and configured; next we use the kafka-clients library to publish and subscribe to messages -------

   Producer sample code is in the official Javadoc: http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

   Consumer sample code is in the official Javadoc: http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

   Note: since the Kafka service above is configured as a single-machine setup, the client code below must run on the same machine as the Kafka service.

   If your kafka-client and the Kafka service are on different machines, modify the broker configuration file kafka_2.10-0.9.0.1/config/server.properties as follows:

    1. Change

         #advertised.host.name=<hostname routable by clients>

         to

          advertised.host.name=<IP address of the Kafka machine>

    2. Change

         #host.name=localhost

         to

         host.name=<IP address of the Kafka machine>
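Putting the two edits together, the relevant section of server.properties would end up looking like this (192.168.1.100 is a placeholder; substitute the Kafka machine's real IP). Restart the broker afterwards for the change to take effect:

```properties
# kafka_2.10-0.9.0.1/config/server.properties
# 192.168.1.100 is an example address -- use your Kafka machine's IP
host.name=192.168.1.100              # interface the broker binds to
advertised.host.name=192.168.1.100   # address published to clients via ZooKeeper
```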

   Add the kafka-clients dependency:

   Gradle project:

  

compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.9.0.1'

 

   Maven project:
  

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>

 

 

    Producer code, adapted from the official sample:

 

        // Imports needed by this snippet:
        //   import java.util.Properties;
        //   import org.apache.kafka.clients.producer.*;
        // LOG is an ordinary logger, e.g. an org.slf4j.Logger from LoggerFactory.getLogger(...)
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");            // wait for the full in-sync replica set to acknowledge each write
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        for (Object key : props.keySet()) {
            LOG.info("key:" + key + ";value:" + props.get(key));
        }
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 10000; i++) {
            LOG.info("send ..." + i);
            // Parameterize the record type; the raw ProducerRecord in the original
            // only compiles with an unchecked warning.
            ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i));
            // send() is asynchronous; the callback fires once the broker responds
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        LOG.error("send failed: " + exception.getMessage(), exception);
                    } else {
                        LOG.info("sent: topic=" + metadata.topic()
                                + ", partition=" + metadata.partition()
                                + ", offset=" + metadata.offset());
                    }
                }
            });
        }
        producer.close();   // blocks until buffered records have been flushed
     Consumer code, adapted from the official sample:

 

   
        // Imports needed by this snippet:
        //   import java.util.Arrays;
        //   import java.util.Properties;
        //   import org.apache.kafka.clients.consumer.*;
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test1");                 // consumers sharing a group.id split the partitions
        props.put("enable.auto.commit", "true");        // offsets are committed automatically...
        props.put("auto.commit.interval.ms", "1000");   // ...once per second
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        for (Object key : props.keySet()) {
            LOG.info("key:" + key + ";value:" + props.get(key));
        }
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));
        int count = 0;
        while (count < 1000) {                          // bounded loop so the demo eventually exits
            ConsumerRecords<String, String> records = consumer.poll(1000);   // poll timeout in ms
            for (ConsumerRecord<String, String> record : records) {
                LOG.info("offset = " + record.offset() + ", key = " + record.key() + ", value = " + record.value());
            }
            count++;
        }
        consumer.close();
     The above has been tested end to end; I hope this post helps you.