nginx日志格式如下{"serverType":"nginx","meta":{"clientIp":"36.163.27.71","accessTime":"10/Nov/2016:23:01:3
The nginx log format is as follows:
{"serverType":"nginx","meta":{"clientIp":"36.163.27.71","accessTime":"10/Nov/2016:23:01:32 +0800","accessTime_unix":"1478790092.299","request":"POST /api/v250/index.api?a=1&b=2 HTTP/1.1","status":403,"bodySizes":578,"referer":"-","userAgent":"qt-android/2.5.0","host":"www.youpiaole.com","xForwardedFor":"36.163.27.71, 223.245.176.31","reqTime":"0.000","upRespTime":"-","httpVersion":"2.5.0","devType":"t_android","devId":"353591913759","channel":"t_android","appVer":"250","iosVer":"-","userId":"-","userType":"-","devToken":"-","srcType":"client","cookie":"JSESSIONID=D2D7E71BBCA2461A48B4D5F7015A2B3A; route=8bf44254c3316f9c4f0c454d3","upstreamIp":"192.168.1.101","scheme":"http"}}
Originally, cut and sed were used to extract the required fields:
zcat n_log/www.youpiaole.com/20161027/access/* | cut -d , -f 2,5,6,13,14,26 | sed 's/"//g;s/meta:{//;s/clientIp://;s/,request:/,/;s/?.*status:/,/;s/reqTime://;s/,upRespTime:/,/;s/,upstreamIp:/,/;s/GET //;s/POST //' > /tmp/20161027.log
Then awk did the counting, in this case tallying requests per client IP and printing only the IPs with more than 10 requests:
awk -F"," '{a[$1]+=1} END {for(i in a) {if(a[i] > 10) print a[i],i}}' /tmp/20161027.log
However, because the logs are JSON, extracting fields with cut and sed is cumbersome and awkward for any further analysis, so the statistics were later moved to Python.
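The core idea is that json.loads gives structured access to every field without any text surgery. A minimal sketch (Python 2, using a trimmed-down version of the sample log line above):

import json

line = '{"serverType":"nginx","meta":{"clientIp":"36.163.27.71","status":403,"reqTime":"0.000"}}'
meta = json.loads(line)["meta"]
# Fields are addressed by name instead of by comma position
print meta["clientIp"], meta["status"], meta["reqTime"]
# -> 36.163.27.71 403 0.000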
Python's threads are not efficient here because of the global interpreter lock (GIL), so multiple processes are used to work through the large volume of log data. The full script:
# -*- coding: utf-8 -*-
import gzip
import json
import os
import re
import sys
from multiprocessing import Process, Pool, Manager
from Queue import Empty

reload(sys)
# Set the default character encoding to utf-8
sys.setdefaultencoding('utf-8')

# If a second argument is given (len(sys.argv) == 3), it is a per-file line limit used for test runs
test = len(sys.argv)

# Convert a string to a float; '-' means missing and counts as 0
def time2float(t):
    if t == '-':
        return 0
    return float(t)

# Walk the log directory and collect the full path of every log file
logfiles = []
destDir = sys.argv[1]
for path in os.listdir(destDir):
    logfiles.append(destDir + path)

def parseLine(line, lock, urlQueue, clientIpQueue, upstreamIpQueue):
    # Strip \x escape sequences that would break json.loads
    line = re.sub(r'\\x', '', line)
    meta = json.loads(line)["meta"]
    # Extract the fields we need
    clientIp = meta["clientIp"]
    # Extract the request URL and drop the parts we do not care about
    request = meta["request"].replace("HTTP/1.1", "").replace("POST ", "").replace("GET ", "")
    # Strip the query string from the URL
    request = re.sub(r'\?.*$', "", request)
    status = meta["status"]
    reqTime = time2float(meta["reqTime"])
    upRespTime = time2float(meta["upRespTime"])
    upstreamIp = meta["upstreamIp"]
    # Push each item onto its own queue
    urlQueue.put((request, status, reqTime, upRespTime))
    clientIpQueue.put(clientIp)
    upstreamIpQueue.put(upstreamIp)

def parse(path, lock, urlQueue, clientIpQueue, upstreamIpQueue):
    fileContent = gzip.open(path, 'rb')
    a = 1
    for line in fileContent:
        a = a + 1
        # In test mode, stop once the line limit (last command-line argument) is reached
        if test == 3:
            if a > int(sys.argv[2]):
                return 1
        parseLine(line, lock, urlQueue, clientIpQueue, upstreamIpQueue)

def readUrlQueue(urlQueue, urlsResultQueue, lock):
    urls = {}
    while True:
        try:
            request, status, reqTime, upRespTime = urlQueue.get(True, 2)
            # Accumulate per-URL count, total times and status-code distribution
            urlCount, urlReqTime, urlUpRespTime, urlStatus = urls.get(request, (0, 0.0, 0.0, {}))
            urlStatus[status] = urlStatus.get(status, 0) + 1
            urls[request] = (urlCount + 1, urlReqTime + reqTime, urlUpRespTime + upRespTime, urlStatus)
        except Empty:
            break
    urlsResultQueue.put(urls)

def readClientIpQueue(clientIpQueue, clientIpsResultQueue, lock):
    clientIps = {}
    while True:
        try:
            clientIp = clientIpQueue.get(True, 2)
            clientIps[clientIp] = clientIps.get(clientIp, 0) + 1
        except Empty:
            break
    # Put the final result on the queue so the main process can read it
    clientIpsResultQueue.put(clientIps)

def readUpstreamIpQueue(upstreamIpQueue, upstreamIpsResultQueue, lock):
    upstreamIps = {}
    while True:
        try:
            upstreamIp = upstreamIpQueue.get(True, 2)
            upstreamIps[upstreamIp] = upstreamIps.get(upstreamIp, 0) + 1
        except Empty:
            break
    # Put the final result on the queue so the main process can read it
    upstreamIpsResultQueue.put(upstreamIps)

# Queue-reader processes
readThreads = []
manager = Manager()
lock = manager.Lock()
urlsResultQueue = manager.Queue(1)
clientIpsResultQueue = manager.Queue(1)
upstreamIpsResultQueue = manager.Queue(1)
# One queue per collected item
urlQueue = manager.Queue(500)
clientIpQueue = manager.Queue(500)
upstreamIpQueue = manager.Queue(500)

# Use a process pool to read the log files, one file per task
pool = Pool(24)
for f in logfiles:
    pool.apply_async(parse, (f, lock, urlQueue, clientIpQueue, upstreamIpQueue))

# Start one reader process per queue
urlP = Process(target=readUrlQueue, args=(urlQueue, urlsResultQueue, lock))
urlP.start()
readThreads.append(urlP)
clientIpP = Process(target=readClientIpQueue, args=(clientIpQueue, clientIpsResultQueue, lock))
clientIpP.start()
readThreads.append(clientIpP)
upstreamIpP = Process(target=readUpstreamIpQueue, args=(upstreamIpQueue, upstreamIpsResultQueue, lock))
upstreamIpP.start()
readThreads.append(upstreamIpP)

# The reader processes loop forever (while True) with a 2s read timeout, so once they
# exit we know every queued item has been consumed; just wait for them to finish
for t in readThreads:
    t.join()
urls = urlsResultQueue.get(False, 1)
clientIps = clientIpsResultQueue.get(False, 1)
upstreamIps = upstreamIpsResultQueue.get(False, 1)

# Sort by request count, highest first
finalUrls = sorted(urls.iteritems(), key=lambda d: d[1][0], reverse=True)
finalClientIps = sorted(clientIps.iteritems(), key=lambda d: d[1], reverse=True)
finalUpstreamIps = sorted(upstreamIps.iteritems(), key=lambda d: d[1], reverse=True)

# Print the final statistics
print "upstreamIp", "count"
for key, value in finalUpstreamIps:
    print key, value
print

print "count", "status", "reqTime", "upRespTime", "url"
for key, value in finalUrls:
    urlCount, urlReqTime, urlUpRespTime, status = value
    statusStr = ','.join(map(lambda x: '{0}:{1}'.format(x, status[x]), status))
    print urlCount, statusStr, round(urlReqTime / urlCount, 2), round(urlUpRespTime / urlCount, 2), key
print

print "clientIp", "count"
for key, value in finalClientIps:
    if value > 10:
        print key, value
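The script takes the log directory as its first argument (with a trailing slash, since each file path is built as destDir + filename) and, optionally, a per-file line limit as a second argument for quick test runs. Assuming it is saved as nginx_stats.py (the name is arbitrary), a full run would look like:

python nginx_stats.py n_log/www.youpiaole.com/20161027/access/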
The resulting statistics look like this:
upstreamIp count
- 385
192.168.1.101:8080 26
192.168.1.102:8080 25
192.168.1.103:8080 21

count status reqTime upRespTime url
410 403:410,200:102 0.03 0.0 /api/v250/index.api
67 200:42 0.2 0.2 /api/wap1.1.0/login/wap.api

clientIp count
36.163.27.71 33
reqTime is the total time the request took end to end.
upRespTime is the time from when nginx opens a connection to the upstream server until the upstream has finished returning its data and the connection is closed.
The first section counts the requests handled by each application server, showing how the load is distributed across them.
The second section counts the requests and the HTTP status code distribution per URL, giving a picture of the application servers' processing capacity.
The third section counts requests per client IP; since there are many IPs, only those with more than 10 requests are printed, which helps spot IPs making malicious requests.