侧边栏壁纸
博主头像
落叶人生博主等级

走进秋风,寻找秋天的落叶

  • 累计撰写 130562 篇文章
  • 累计创建 28 个标签
  • 累计收到 9 条评论
标签搜索

目 录CONTENT

文章目录

ELK 进阶之收集处理(一)

2023-10-02 星期一 / 0 评论 / 0 点赞 / 50 阅读 / 5630 字

首先是安装redis,因为主要讲解收集处理,所以redis的安装就略过;有需要的可以参考文章:https://my.oschina.net/xwzj/blog/1476074 logstash的输出(

首先是安装redis,因为主要讲解收集处理,所以redis的安装就略过;有需要的可以参考文章:https://my.oschina.net/xwzj/blog/1476074

logstash的输出(转储)

修改入门时安装时的配置文件:

    vi /home/user/elk/logstash-2.1.1/conf/simple.conf

在output的位置加上

   redis {
          data_type =>"channel"
          key=>"logstash"
          host=>"192.168.1.157"
          port=>6379
         # password => "1234567"
    }

如下图:

保存退出后重启ELK;

进入redis的客户端:

  [root@localhost bin]#  cd /usr/local/redis/bin

  [root@localhost bin]#  redis-cli

进去之后执行

127.0.0.1:6379>

127.0.0.1:6379> SUBSCRIBE logstash

回车之后如下图:

然后请求接口之后出现:

就代表运行转储成功了。

使用Jedis的Subscribe功能对redis的数据进行获取和存储

Jedis定义了抽象类JedisPubSub,在这个类中,定义publish/subsribe的回调方法。通过继承JedisPubSub类并重新实现这些回调方法,当publish/subsribe事件发生时,我们可以定制自己的处理逻辑。

在以下例子中,我们定义了Subscriber类,这个类继承了JedisPubSub类,并重新实现了其中的回调方法。

Subscriber.java

 

import redis.clients.jedis.JedisPubSub;


public class Subscriber extends JedisPubSub {
    public Subscriber() {
    }

    public void onMessage(String channel, String message) {
        System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message));
    }

    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d", 
                channel, subscribedChannels));
    }

    public void onUnsubscribe(String channel, int subscribedChannels) {
        System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d", 
                channel, subscribedChannels));

    }
}

 

定义SubThread线程类

由于Jedis的subscribe操作是阻塞的,因此,我们另起一个线程来进行subscribe操作。

SubThread.java

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;


public class SubThread extends Thread {
    private final JedisPool jedisPool;
    private final Subscriber subscriber = new Subscriber();

    private final String channel = "mychannel";

    public SubThread(JedisPool jedisPool) {
        super("SubThread");
        this.jedisPool = jedisPool;
    }

    public static void main(String[] args) {
        System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.subscribe(subscriber, channel);
        } catch (Exception e) {
            System.out.println(String.format("subsrcibe channel error, %s", e));
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }
    }
}

以下是得到的运行结果:

 

有疑问的麻烦留言,谢谢;

参考文章:

logstash 操作redis:http://blog.csdn.net/zhaoyangjian724/article/details/52252166

Jedis实现Subscribe功能:http://blog.csdn.net/lihao21/article/details/48370687

 

广告 广告

评论区