介绍 http://kafka.apache.org kafka是一种高吞吐量的分布式发布订阅消息系统 kafka是linkedin用于日志处理的分布式消息队列,linkedin的日志数据容
介绍
http://kafka.apache.orgkafka是一种高吞吐量的分布式发布订阅消息系统
kafka是linkedin用于日志处理的分布式消息队列,linkedin的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录、浏览、点击、分享、喜欢)以及系统运行日志(CPU、内存、磁盘、网络、系统及进程状态)
当前很多的消息队列服务提供可靠交付保证,并默认是即时消费(不适合离线)。
高可靠交付对linkedin的日志不是必须的,故可通过降低可靠性来提高性能,同时通过构建分布式的集群,允许消息在系统中累积,使得kafka同时支持离线和在线日志处理
测试环境
kafka_2.10-0.8.1.1 3个节点做的集群
zookeeper-3.4.5 一个实例节点
代码示例
消息生产者代码示例
import java.util.Collections;import java.util.Date;import java.util.Properties;import java.util.Random;import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;/** * 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example * @author Fung * */public class ProducerDemo { public static void main(String[] args) { Random rnd = new Random(); int events=100; // 设置配置属性 Properties props = new Properties(); props.put("metadata.broker.list","172.168.63.221:9092,172.168.63.233:9092,172.168.63.234:9092"); props.put("serializer.class", "kafka.serializer.StringEncoder"); // key.serializer.class默认为serializer.class props.put("key.serializer.class", "kafka.serializer.StringEncoder"); // 可选配置,如果不配置,则使用默认的partitioner props.put("partitioner.class", "com.catt.kafka.demo.PartitionerDemo"); // 触发acknowledgement机制,否则是fire and forget,可能会引起数据丢失 // 值为0,1,-1,可以参考 // http://kafka.apache.org/08/configuration.html props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props); // 创建producer Producer<String, String> producer = new Producer<String, String>(config); // 产生并发送消息 long start=System.currentTimeMillis(); for (long i = 0; i < events; i++) { long runtime = new Date().getTime(); String ip = "192.168.2." + i;//rnd.nextInt(255); String msg = runtime + ",www.example.com," + ip; //如果topic不存在,则会自动创建,默认replication-factor为1,partitions为0 KeyedMessage<String, String> data = new KeyedMessage<String, String>( "page_visits", ip, msg); producer.send(data); } System.out.println("耗时:" + (System.currentTimeMillis() - start)); // 关闭producer producer.close(); }}
消息消费者代码示例
import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Properties;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import kafka.consumer.Consumer;import kafka.consumer.ConsumerConfig;import kafka.consumer.KafkaStream;import kafka.javaapi.consumer.ConsumerConnector;/** * 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example * * @author Fung * */public class ConsumerDemo { private final ConsumerConnector consumer; private final String topic; private ExecutorService executor; public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) { consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId)); this.topic = a_topic; } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); } public void run(int numThreads) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(numThreads)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer .createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // now launch all the threads executor = Executors.newFixedThreadPool(numThreads); // now create an object to consume the messages // int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerMsgTask(stream, threadNumber)); threadNumber++; } } private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { Properties props = new Properties(); props.put("zookeeper.connect", a_zookeeper); props.put("group.id", a_groupId); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); return new ConsumerConfig(props); } public static void main(String[] arg) { String[] args = { "172.168.63.221:2188", "group-1", "page_visits", "12" }; String zooKeeper = args[0]; String groupId = args[1]; String topic = args[2]; int threads = Integer.parseInt(args[3]); ConsumerDemo demo = new ConsumerDemo(zooKeeper, groupId, topic); demo.run(threads); try { Thread.sleep(10000); } catch (InterruptedException ie) { } demo.shutdown(); }}
消息处理类
import kafka.consumer.ConsumerIterator;import kafka.consumer.KafkaStream;public class ConsumerMsgTask implements Runnable { private KafkaStream m_stream; private int m_threadNumber; public ConsumerMsgTask(KafkaStream stream, int threadNumber) { m_threadNumber = threadNumber; m_stream = stream; } public void run() { ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); while (it.hasNext()) System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message())); System.out.println("Shutting down Thread: " + m_threadNumber); }}
Partitioner类示例
import kafka.producer.Partitioner;import kafka.utils.VerifiableProperties;public class PartitionerDemo implements Partitioner { public PartitionerDemo(VerifiableProperties props) { } @Override public int partition(Object obj, int numPartitions) { int partition = 0; if (obj instanceof String) { String key=(String)obj; int offset = key.lastIndexOf('.'); if (offset > 0) { partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions; } }else{ partition = obj.toString().length() % numPartitions; } return partition; }}
pom.xml文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.xxx</groupId> <artifactId>kafka-demo</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>kafka-demo</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.1.1</version> <exclusions> <exclusion> <artifactId>jmxtools</artifactId> <groupId>com.sun.jdmk</groupId> </exclusion> <exclusion> <artifactId>jmxri</artifactId> <groupId>com.sun.jmx</groupId> </exclusion> <exclusion> <artifactId>jms</artifactId> <groupId>javax.jms</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.15</version> <exclusions> <exclusion> <artifactId>jmxtools</artifactId> <groupId>com.sun.jdmk</groupId> </exclusion> <exclusion> <artifactId>jmxri</artifactId> <groupId>com.sun.jmx</groupId> </exclusion> <exclusion> <artifactId>jms</artifactId> <groupId>javax.jms</groupId> </exclusion> <exclusion> <artifactId>mail</artifactId> <groupId>javax.mail</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> </dependencies></project>
参考
https://cwiki.apache.org/confluence/display/KAFKA/Index
https://kafka.apache.org/