侧边栏壁纸
博主头像
落叶人生博主等级

走进秋风,寻找秋天的落叶

  • 累计撰写 130562 篇文章
  • 累计创建 28 个标签
  • 累计收到 9 条评论
标签搜索

目 录CONTENT

文章目录

python多进程处理大量日志

2023-12-11 星期一 / 0 评论 / 0 点赞 / 115 阅读 / 8012 字

nginx日志格式如下{"serverType":"nginx","meta":{"clientIp":"36.163.27.71","accessTime":"10/Nov/2016:23:01:3

nginx日志格式如下

{"serverType":"nginx","meta":{"clientIp":"36.163.27.71","accessTime":"10/Nov/2016:23:01:32 +0800","accessTime_unix":"1478790092.299","request":"POST /api/v250/index.api?a=1&b=2 HTTP/1.1","status":403,"bodySizes":578,"referer":"-","userAgent":"qt-android/2.5.0","host":"www.youpiaole.com","xForwardedFor":"36.163.27.71, 223.245.176.31","reqTime":"0.000","upRespTime":"-","httpVersion":"2.5.0","devType":"t_android","devId":"353591913759","channel":"t_android","appVer":"250","iosVer":"-","userId":"-","userType":"-","devToken":"-","srcType":"client","cookie":"JSESSIONID=D2D7E71BBCA2461A48B4D5F7015A2B3A; route=8bf44254c3316f9c4f0c454d3","upstreamIp":"192.168.1.101","scheme":"http"}}

原来使用cut、sed提取所需字段

zcat n_log/www.youpiaole.com/20161027/access/* | cut -d , -f 2,5,6,13,14,26 | sed 's/"//g;s/meta:{//;s/clientIp://;s/,request:/,/;s/?.*status:/,/;s/reqTime://;s/,upRespTime:/,/;s/,upstreamIp:/,/;s/GET //;s/POST //' > /tmp/20161027.log

再使用awk做统计

awk -F"," '{a[$1]+=1} END {for(i in a) {if(a[i] > 10) print a[i],i}}' /tmp/20161027.log

但由于日志是json格式,使用cut和sed提取所需字段过于繁琐,也不便于进一步的分析,后面改为使用python做统计分析。

python的多线程由于存在全局锁,效率不高,使用多进程的方式处理大量的日志数据

# -*- coding: utf-8 -*-import string,gzip,json,sys,os,re,threadingimport multiprocessingfrom multiprocessing import Process,Queue,Lock,Managerfrom Queue import Emptyimport multiprocessing.poolimport sys reload(sys) # 设置字符编码格式为utf-8sys.setdefaultencoding('utf-8')test = len(sys.argv)# 字符串转为数字,float类型def time2float(t) :  if t == '-':    tmp = 0  else :    tmp = float(t)        return tmplogfiles = []destDir = sys.argv[1]listfile = os.listdir(destDir)# 遍历日志目录,获取每一个日志文件全路径for path in listfile:  path = destDir + path  logfiles.append(path)def parseLine (line, lock, urlQueue, clientIpQueue, upstreamIpQueue) :  line = re.sub(r'//x','',line)  meta = json.loads(line)["meta"]  # 提取所需字段  clientIp = meta["clientIp"]  # 提取请求url,并去掉无关紧要的信息  request = meta["request"].replace("HTTP/1.1","").replace("POST ","").replace("GET ","")  # 去掉url后面跟的参数  request = re.sub(r'/?.*$',"",request)  status = meta["status"]  reqTime = time2float(meta["reqTime"])  upRespTime = time2float(meta["upRespTime"])  upstreamIp = meta["upstreamIp"]  # 分别放入队列中  urlQueue.put((request, status, reqTime, upRespTime))  clientIpQueue.put(clientIp)  upstreamIpQueue.put(upstreamIp) def parse (path, lock, urlQueue, clientIpQueue, upstreamIpQueue) :  fileContent = gzip.open(path, 'rt')  a = 1  for line in fileContent:    a = a + 1    # 如果是测试用的,到达指定行数(命令行最后一个参数)就退出    if test == 3:      if a > int(sys.argv[2]):        return 1     parseLine(line, lock, urlQueue, clientIpQueue, upstreamIpQueue)def readUrlQueue (urlQueue, urlsResultQueue, lock) :  urls = {}  while True:    try:      request,status,reqTime,upRespTime = urlQueue.get(True, 2)      # 存入字典中         urlCount,urlReqTime,urlUpRespTime,urlStatus = urls.get(request,(1,reqTime,upRespTime,{}))      urlStatus[status] = urlStatus.get(status,1) + 1      urls[request] = (urlCount + 1, urlReqTime + reqTime, urlUpRespTime + upRespTime, urlStatus)    except Empty :      break  urlsResultQueue.put(urls)def readClientIpQueue (clientIpQueue, clientIpsResultQueue, lock) :  clientIps = {}  while True:    try:      clientIp = clientIpQueue.get(True, 2)      clientIps[clientIp] = clientIps.get(clientIp,1) + 1    except Empty :      break  # 把最终结果放入队列,返回给主线程  clientIpsResultQueue.put(clientIps)def readUpstreamIpQueue (upstreamIpQueue, upstreamIpsResultQueue, lock) :  upstreamIps = {}  while True:    try:      upstreamIp = upstreamIpQueue.get(True, 2)      upstreamIps[upstreamIp] = upstreamIps.get(upstreamIp,1) + 1    except Empty :      break  # 把最终结果放入队列,返回给主线程  upstreamIpsResultQueue.put(upstreamIps)# 队列读取进程readThreads = []manager = Manager()lock = manager.Lock()urlsResultQueue = manager.Queue(1)clientIpsResultQueue = manager.Queue(1)upstreamIpsResultQueue = manager.Queue(1)# 每个收集项一个队列urlQueue = manager.Queue(500)clientIpQueue = manager.Queue(500)upstreamIpQueue = manager.Queue(500)# 使用进程池,读取日志文件,每个文件一个进程pool = Pool(24)for f in logfiles :  pool.apply_async(parse, (f, lock, urlQueue, clientIpQueue, upstreamIpQueue))# 分别启动队列读取进程urlP = Process(target=readUrlQueue, args=(urlQueue, urlsResultQueue, lock))urlP.start()readThreads.append(urlP)clientIpP = Process(target=readClientIpQueue, args=(clientIpQueue, clientIpsResultQueue, lock))clientIpP.start()readThreads.append(clientIpP)upstreamIpP = Process(target=readUpstreamIpQueue, args=(upstreamIpQueue, upstreamIpsResultQueue, lock))upstreamIpP.start()readThreads.append(upstreamIpP)# 由于队列读取进程中是死循环(while True),并设置了读取超时时间是2s,所以等待读取进程结束即可确保所有数据被处理完for t in readThreads :   t.join()urls = urlsResultQueue.get(False, 1)clientIps = clientIpsResultQueue.get(False, 1)upstreamIps = upstreamIpsResultQueue.get(False, 1)# 排序,根据访问量从高到底排序finalUrls = sorted(urls.iteritems(), key=lambda d:d[1][0], reverse = True)finalClientIps = sorted(clientIps.iteritems(), key=lambda d:d[1], reverse = True)finalUpstreamIps = sorted(upstreamIps.iteritems(), key=lambda d:d[1], reverse = True)# 打印最终的统计数据print "upstreamIp","count"for key,value in finalUpstreamIps:  print key, valueprintprint "count", "status", "reqTime", "upRespTime", "url"for key,value in finalUrls :  urlCount,urlReqTime, urlUpRespTime, status = value   print urlCount, round(urlReqTime / urlCount,2), round(urlUpRespTime / urlCount,2), str.join(',', map(lambda x: '{0}:{1}'.format(x,status[x]),status)), keyprint print "clientIp","count"for key,value in finalClientIps:  if value > 10 :    print key,value

统计结果如下

upstreamIp count- 385192.168.1.101:8080 26192.168.1.102:8080 25192.168.1.103:8080 21count status reqTime upRespTime url410 403:410,200:102 0.03 0.0 /api/v250/index.api67 200:42 0.2 0.2 /api/wap1.1.0/login/wap.apiclientIp count36.163.27.71 33
  • reqTime指总的接口耗时时间

  • upRespTime指从nginx向后端服务器建立连接,到后端服务器返回完数据,并关闭连接为止的时间

  • 第一段是通过统计每个应用服务器处理的请求数,了解应用服务器的压力分布情况

  • 第二段是统计每个url的请求次数,http code状态分布,掌握应用服务器的处理能力

  • 第三段是统计客户端ip的访问次数,由于ip较多,只打印请求次数大于10的ip,可以找到恶意请求的ip

广告 广告

评论区