1. Prerequisites
Download the Kafka release used in this article: http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.9.0.1/kafka_2.10-0.9.0.1.tgz
The latest releases are listed at: http://kafka.apache.org/downloads.html
Server: CentOS 7, with JDK 1.8 or JDK 1.7
Kafka version: kafka_2.10-0.9.0.1.tgz
2. Install the JDK
Unpack the JDK and configure the environment variables (if OpenJDK is preinstalled, uninstall it first).
For RPM-based installation, see: http://wkm.iteye.com/blog/1249553
3. Install Kafka
# tar -xvf kafka_2.10-0.9.0.1.tgz
Kafka is ready to use once extracted; no further installation step is required.
4. Start Kafka
Kafka depends on ZooKeeper, so the ZooKeeper service must be started first.
The Kafka distribution ships with a bundled ZooKeeper, which can be started as follows:
# cd kafka_2.10-0.9.0.1
# ./bin/zookeeper-server-start.sh config/zookeeper.properties
ZooKeeper listens on port 2181 by default.
Next, start the Kafka broker:
# ./bin/kafka-server-start.sh config/server.properties
Kafka listens on port 9092 by default.
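If you need different ports, both defaults come from the two config files used above. A minimal sketch of the relevant lines (property names as they appear in the Kafka 0.9 default config files; adjust values as needed):

```properties
# config/zookeeper.properties -- ZooKeeper client port
clientPort=2181

# config/server.properties -- broker listening port
port=9092
```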
5. Test Kafka
1. Create a topic named my-topic:
# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic
2. List the existing topics:
# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
my-topic
3. Start a console producer and publish a message:
# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic
message1
4. Start a console consumer and receive the message:
# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic my-topic --from-beginning
message1
At this point the single-node Kafka service is up and can send and receive messages.
6. Shut down Kafka
1. Stop the Kafka broker:
# ./bin/kafka-server-stop.sh
2. Stop the ZooKeeper service:
# ./bin/zookeeper-server-stop.sh
------- The single-node Kafka service is now installed and configured. Next, we use the kafka-clients library to publish and subscribe to messages. -------
Producer sample code (official Javadoc): http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
Consumer sample code (official Javadoc): http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
Note: because the Kafka service above uses the default single-node configuration, the code below must run on the same machine as the broker.
If your kafka-clients code runs on a different machine, edit the broker configuration file:
kafka_2.10-0.9.0.1/config/server.properties
1. Change
#advertised.host.name=<hostname routable by clients>
to
advertised.host.name=<the Kafka machine's IP address>
2. Change
#host.name=localhost
to
host.name=<the Kafka machine's IP address>
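Taken together, the edited section of config/server.properties would look roughly like this (192.168.1.100 is a placeholder; substitute the broker machine's actual IP):

```properties
# config/server.properties -- sketch for remote clients; the IP below is a placeholder
host.name=192.168.1.100
advertised.host.name=192.168.1.100
```

After saving the change, restart the broker and point the client's bootstrap.servers at that IP instead of localhost.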
Add the kafka-clients dependency.
For a Gradle project:
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.9.0.1'
For a Maven project:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>
Producer example (adapted from the official sample; LOG is assumed to be an already-configured logger, e.g. SLF4J or Log4j):
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
for (Object key : props.keySet()) {
    LOG.info("key:" + key + ";value:" + props.get(key));
}
Producer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 10000; i++) {
    LOG.info("send ..." + i);
    ProducerRecord<String, String> record =
            new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i));
    LOG.info("topic:" + record.topic());
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            // Exactly one of metadata/exception is non-null.
            if (exception != null) {
                LOG.error("send failed: " + exception.getMessage(), exception);
            } else {
                LOG.info("metadata: topic=" + metadata.topic()
                        + ", partition=" + metadata.partition()
                        + ", offset=" + metadata.offset());
            }
        }
    });
}
producer.close();
Consumer example (adapted from the official sample; LOG is assumed to be an already-configured logger):
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
for (Object key : props.keySet()) {
    LOG.info("key:" + key + ";value:" + props.get(key));
}
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
int count = 0;
while (count < 1000) {  // poll a bounded number of times for this demo
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        LOG.info("offset = " + record.offset() + ", key = " + record.key() + ", value = " + record.value());
    }
    count++;
}
consumer.close();
The steps above have been tested end to end; I hope this article helps you.