首先是安装redis,因为主要讲解收集处理,所以redis的安装就略过;有需要的可以参考文章:https://my.oschina.net/xwzj/blog/1476074 logstash的输出(
首先是安装redis,因为主要讲解收集处理,所以redis的安装就略过;有需要的可以参考文章:https://my.oschina.net/xwzj/blog/1476074
logstash的输出(转储)
修改入门时安装时的配置文件:
vi /home/user/elk/logstash-2.1.1/conf/simple.conf
在output的位置加上
redis {
data_type =>"channel"
key=>"logstash"
host=>"192.168.1.157"
port=>6379
# password => "1234567"
}
如下图:
保存退出后重启ELK;
进入redis的客户端:
[root@localhost bin]# cd /usr/local/redis/bin
[root@localhost bin]# redis-cli
进去之后执行
127.0.0.1:6379>
127.0.0.1:6379> SUBSCRIBE logstash
回车之后如下图:
然后请求接口之后出现:
就代表运行转储成功了。
使用Jedis的Subscribe功能对redis的数据进行获取和存储
Jedis定义了抽象类JedisPubSub
,在这个类中,定义publish/subsribe的回调方法。通过继承JedisPubSub
类并重新实现这些回调方法,当publish/subsribe事件发生时,我们可以定制自己的处理逻辑。
在以下例子中,我们定义了Subscriber
类,这个类继承了JedisPubSub
类,并重新实现了其中的回调方法。
Subscriber.java
import redis.clients.jedis.JedisPubSub;
public class Subscriber extends JedisPubSub {
public Subscriber() {
}
public void onMessage(String channel, String message) {
System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message));
}
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
channel, subscribedChannels));
}
public void onUnsubscribe(String channel, int subscribedChannels) {
System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
channel, subscribedChannels));
}
}
定义SubThread线程类
由于Jedis的subscribe
操作是阻塞的,因此,我们另起一个线程来进行subscribe操作。
SubThread.java
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
public class SubThread extends Thread {
private final JedisPool jedisPool;
private final Subscriber subscriber = new Subscriber();
private final String channel = "mychannel";
public SubThread(JedisPool jedisPool) {
super("SubThread");
this.jedisPool = jedisPool;
}
public static void main(String[] args) {
System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.subscribe(subscriber, channel);
} catch (Exception e) {
System.out.println(String.format("subsrcibe channel error, %s", e));
} finally {
if (jedis != null) {
jedis.close();
}
}
}
}
以下是得到的运行结果:
有疑问的麻烦留言,谢谢;
参考文章:
logstash 操作redis:http://blog.csdn.net/zhaoyangjian724/article/details/52252166
Jedis实现Subscribe功能:http://blog.csdn.net/lihao21/article/details/48370687