侧边栏壁纸
博主头像
落叶人生博主等级

走进秋风,寻找秋天的落叶

  • 累计撰写 130562 篇文章
  • 累计创建 28 个标签
  • 累计收到 9 条评论
标签搜索

目 录CONTENT

文章目录

Kafka JAVA客户端代码示例

2022-07-06 星期三 / 0 评论 / 0 点赞 / 59 阅读 / 9273 字

介绍:Kafka(http://kafka.apache.org)是一种高吞吐量的分布式发布订阅消息系统,也是 LinkedIn 用于日志处理的分布式消息队列。LinkedIn 的日志数据容量大……

介绍

     http://kafka.apache.org
    kafka是一种高吞吐量的分布式发布订阅消息系统
    Kafka 是 LinkedIn 用于日志处理的分布式消息队列。LinkedIn 的日志数据容量大,但对可靠性要求不高,其日志数据主要包括用户行为(登录、浏览、点击、分享、喜欢)以及系统运行日志(CPU、内存、磁盘、网络、系统及进程状态)。

    当前很多消息队列服务都提供可靠交付保证,并且默认采用即时消费的方式(不适合离线处理)。

高可靠交付对 LinkedIn 的日志来说并不是必须的,因此可以通过降低可靠性来提高性能;同时通过构建分布式集群,允许消息在系统中累积,使得 Kafka 能够同时支持离线和在线日志处理。

测试环境

    Kafka:kafka_2.10-0.8.1.1,由 3 个节点组成的集群

    ZooKeeper:zookeeper-3.4.5,单实例节点

代码示例

消息生产者代码示例

import java.util.Collections;import java.util.Date;import java.util.Properties;import java.util.Random;import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;/** * 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example * @author Fung * */public class ProducerDemo {	public static void main(String[] args) {		Random rnd = new Random();		int events=100;		// 设置配置属性		Properties props = new Properties();		props.put("metadata.broker.list","172.168.63.221:9092,172.168.63.233:9092,172.168.63.234:9092");		props.put("serializer.class", "kafka.serializer.StringEncoder");		// key.serializer.class默认为serializer.class		props.put("key.serializer.class", "kafka.serializer.StringEncoder");		// 可选配置,如果不配置,则使用默认的partitioner		props.put("partitioner.class", "com.catt.kafka.demo.PartitionerDemo");		// 触发acknowledgement机制,否则是fire and forget,可能会引起数据丢失		// 值为0,1,-1,可以参考		// http://kafka.apache.org/08/configuration.html		props.put("request.required.acks", "1");		ProducerConfig config = new ProducerConfig(props);		// 创建producer		Producer<String, String> producer = new Producer<String, String>(config);		// 产生并发送消息		long start=System.currentTimeMillis();		for (long i = 0; i < events; i++) {			long runtime = new Date().getTime();			String ip = "192.168.2." + i;//rnd.nextInt(255);			String msg = runtime + ",www.example.com," + ip;			//如果topic不存在,则会自动创建,默认replication-factor为1,partitions为0			KeyedMessage<String, String> data = new KeyedMessage<String, String>(					"page_visits", ip, msg);			producer.send(data);		}		System.out.println("耗时:" + (System.currentTimeMillis() - start));		// 关闭producer		producer.close();	}}

消息消费者代码示例

import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Properties;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import kafka.consumer.Consumer;import kafka.consumer.ConsumerConfig;import kafka.consumer.KafkaStream;import kafka.javaapi.consumer.ConsumerConnector;/** * 详细可以参考:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example *  * @author Fung * */public class ConsumerDemo {	private final ConsumerConnector consumer;	private final String topic;	private ExecutorService executor;	public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {		consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));		this.topic = a_topic;	}	public void shutdown() {		if (consumer != null)			consumer.shutdown();		if (executor != null)			executor.shutdown();	}	public void run(int numThreads) {		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();		topicCountMap.put(topic, new Integer(numThreads));		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer				.createMessageStreams(topicCountMap);		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);		// now launch all the threads		executor = Executors.newFixedThreadPool(numThreads);		// now create an object to consume the messages		//		int threadNumber = 0;		for (final KafkaStream stream : streams) {			executor.submit(new ConsumerMsgTask(stream, threadNumber));			threadNumber++;		}	}	private static ConsumerConfig createConsumerConfig(String a_zookeeper,			String a_groupId) {		Properties props = new Properties();		props.put("zookeeper.connect", a_zookeeper);		props.put("group.id", a_groupId);		props.put("zookeeper.session.timeout.ms", "400");		props.put("zookeeper.sync.time.ms", "200");		props.put("auto.commit.interval.ms", "1000");		return new ConsumerConfig(props);	}	public static void main(String[] arg) {		String[] args = { "172.168.63.221:2188", "group-1", 
"page_visits", "12" };		String zooKeeper = args[0];		String groupId = args[1];		String topic = args[2];		int threads = Integer.parseInt(args[3]);		ConsumerDemo demo = new ConsumerDemo(zooKeeper, groupId, topic);		demo.run(threads);		try {			Thread.sleep(10000);		} catch (InterruptedException ie) {		}		demo.shutdown();	}}

消息处理类

import java.nio.charset.Charset;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

/**
 * Worker that drains one Kafka stream and prints every message to standard
 * output, prefixed with the number of the thread that handled it.
 */
public class ConsumerMsgTask implements Runnable {

    /**
     * Charset used to decode message payloads. Decoding explicitly avoids
     * depending on the JVM's platform default charset.
     * NOTE(review): assumes the producer encodes strings as UTF-8 (the default
     * of kafka.serializer.StringEncoder) - confirm if a different
     * serializer.encoding is configured.
     */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    private final KafkaStream<byte[], byte[]> stream;
    private final int threadNumber;

    /**
     * @param stream       the stream this task consumes
     * @param threadNumber number used to label this task's output
     */
    public ConsumerMsgTask(KafkaStream<byte[], byte[]> stream, int threadNumber) {
        this.threadNumber = threadNumber;
        this.stream = stream;
    }

    /**
     * Prints each message until the stream's iterator reports no more
     * elements, then prints a shutdown notice.
     */
    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("Thread " + threadNumber + ": "
                    + new String(it.next().message(), UTF_8));
        }
        System.out.println("Shutting down Thread: " + threadNumber);
    }
}

Partitioner类示例

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

/**
 * Partitioner that routes dotted string keys (for example an IPv4 address) by
 * the integer after the last '.', and any other key by the length of its
 * string form.
 */
public class PartitionerDemo implements Partitioner {

    /**
     * Constructor taking the producer properties; the properties are not used.
     * NOTE(review): presumably Kafka instantiates the configured
     * {@code partitioner.class} through this signature - confirm against the
     * Kafka 0.8 producer documentation before removing it.
     *
     * @param props producer properties (unused)
     */
    public PartitionerDemo(VerifiableProperties props) {
    }

    /**
     * Chooses a partition for the given key.
     *
     * <ul>
     *   <li>{@code null} key: partition 0.</li>
     *   <li>String key with a '.' after its first character: the integer after
     *       the last '.' modulo {@code numPartitions}, normalised to be
     *       non-negative; partition 0 if that suffix is not an integer.</li>
     *   <li>String key without such a '.': partition 0.</li>
     *   <li>Any other key: {@code toString().length() % numPartitions}.</li>
     * </ul>
     *
     * @param obj           the message key, may be {@code null}
     * @param numPartitions number of partitions of the topic
     * @return the selected partition index
     */
    @Override
    public int partition(Object obj, int numPartitions) {
        if (obj == null) {
            return 0;
        }
        if (!(obj instanceof String)) {
            return obj.toString().length() % numPartitions;
        }

        String key = (String) obj;
        int offset = key.lastIndexOf('.');
        if (offset <= 0) {
            return 0;
        }

        try {
            int remainder = Integer.parseInt(key.substring(offset + 1)) % numPartitions;
            // Java's % keeps the sign of the dividend; shift a negative remainder
            // into range so the result is always a usable partition index.
            return remainder < 0 ? remainder + numPartitions : remainder;
        } catch (NumberFormatException e) {
            // Suffix is empty or not an integer: use the same default partition
            // as keys without a '.' instead of failing the send.
            return 0;
        }
    }
}

pom.xml文件

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates -->
    <groupId>com.xxx</groupId>
    <artifactId>kafka-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>kafka-demo</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Kafka 0.8.1.1 client built for Scala 2.10; the JMX and JMS artifacts are excluded. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>com.sun.jdmk</groupId>
                    <artifactId>jmxtools</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jmx</groupId>
                    <artifactId>jmxri</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.jms</groupId>
                    <artifactId>jms</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- log4j 1.2.15 with the same exclusions plus javax.mail. -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.15</version>
            <exclusions>
                <exclusion>
                    <groupId>com.sun.jdmk</groupId>
                    <artifactId>jmxtools</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jmx</groupId>
                    <artifactId>jmxri</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.jms</groupId>
                    <artifactId>jms</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.mail</groupId>
                    <artifactId>mail</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Test-only dependency. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

参考

https://cwiki.apache.org/confluence/display/KAFKA/Index

https://kafka.apache.org/

广告 广告

评论区