前言这两个关于NameNode的问题其实非常地经典,不仅有很多细节可询,而且也是面试的一个高频问题,所以特意独立出来一篇。元数据管理会结合源码来讲,而双缓冲虽然暂时没去翻源码,但是我们可以借由一个简单
前言
这两个关于NameNode的问题其实非常地经典,不仅有很多细节可询,而且也是面试的一个高频问题,所以特意独立出来一篇。元数据管理会结合源码来讲,而双缓冲虽然暂时没去翻源码,但是我们可以借由一个简单的实现去向大家好好地说明。后面也会对这段源码进行一些修改操作来让它更为高效。那话不多说咱们就开始吧
因为直接看源码大家可能接受不了,所以我们先来聊聊双缓冲机制。
一、Namenode的双缓冲机制
1.1 问题场景
Namenode里面的元数据是以两种状态进行存储的:
第一状态即是存储在内存里面,也就是刚刚所提到的目录树,它就是一个list,在内存里面更新元数据速度是很快的。但是如果仅仅只在内存里存放元数据,数据是不太安全的。
所以我们在磁盘上也会存储一份元数据,可是此时问题就出现了,我们需要把数据写进磁盘,这个性能肯定是不太好的呀。可NameNode作为整个集群的老大,在hadoop上进行hive,HBASE,spark,flink等计算,这些数据都会不停给NameNode施压写元数据,一天下来一亿条元数据都是可能的,所以NameNode的设计肯定是要支持超高并发的,可是写磁盘这操作是非常非常慢的,一秒几十或者最多几百都已经封顶了,那现在咋办?
1.2 元数据的双缓冲机制
而且在此基础上,hadoop会给每一个元数据信息的修改赋予一个事务ID号,保证操作都是有序的。这也是出于数据的安全考虑。这样整个系统要求的内存会非常大,所以这关乎一个hadoop的优化问题,在之后将会提及。
1.3 双缓冲机制的代码实现
这个对理解起来其实十分有帮助,希望大家能跟着思路走
1.3.1 模拟一个元数据信息类
我们先设计一条元数据信息出来
public class EditLog{ //事务的ID public long taxid; public String log; public EditLog(long taxid, String log) { this.taxid = taxid; this.log = log; } @Override public String toString() { return "EditLog [taxid=" + taxid + ", log=" + log + "]"; } }
1.3.2 双缓冲区
代码其实不难,分为5块
① 定义了两个缓冲区currentBuffer(有序队列)和syncBuffer
② 一个write方法负责写入元数据
③ 一个flush方法把元数据写入到磁盘上,这里我只用了一个打印语句模拟了一下写入磁盘,写入完成后清空syncBuffer的数据
④ 一个exchange方法来交换currentBuffer和syncBuffer
⑤ 还有一个getMaxTaxid方法获取到正在同步数据的内存里面事务ID的最大ID,这个方法的作用稍后说明
这5块除了最后的获取ID,应该大家都知道是干嘛用的了吧,那行,之后就会揭晓
public class DoubleBuffer{ //写数据,有序队列 LinkedList<EditLog> currentBuffer = new LinkedList<EditLog>(); //用来把数据持久化到磁盘上面的内存 LinkedList<EditLog> syncBuffer = new LinkedList<EditLog>(); /** * 写元数据信息 * @param editLog */ public void write(EditLog editLog){ currentBuffer.add(editLog); } /** * 把数据写到磁盘 */ public void flush() { for(EditLog editLog:syncBuffer) { //模拟将数据写到磁盘 System.out.println(editLog); } syncBuffer.clear(); } /** * 交换currentBuffer和syncBuffer */ public void exchange() { LinkedList<EditLog> tmp=currentBuffer; currentBuffer=syncBuffer; syncBuffer=tmp; } /** * 获取到正在同步数据的内存里面事务ID的最大ID */ public long getMaxTaxid() { return syncBuffer.getLast().taxid; } }
1.3.3 写元数据日志的核心方法
那我现在要保证这个写操作(这里的写操作是客户端向bufCurrent写)的顺序,所以我们在这里会使用synchronized来加锁,然后通过taxid++顺序处理。然后new出一个元数据对象,把对象写进磁盘
long taxid=0L;//DoubleBuffer doubleBuffer=new DoubleBuffer();//每个线程自己拥有的副本ThreadLocal<Long> threadLocal=new ThreadLocal<Long>();private void logEdit(String log) { synchronized (this) { taxid++; // 让每个线程里面都拥有自己的事务ID号,作用后面会解释 threadLocal.set(taxid); EditLog editLog=new EditLog(taxid,log); //往内存里面写东西 doubleBuffer.write(editLog); } // 此时锁释放 //将数据持久化到硬盘的方法 logFlush(); }
那有小伙伴就会有疑问了,都加了锁了这运行的性能能好?可是你要知道这把锁里面doubleBuffer.write(editLog)这是往内存里面写东西的呀。所以这是没有问题的,也能完美支持高并发
事先提及一下,这里将会用到分段加锁,比如此时我们有3个线程,线程1进来logEdit,执行完write之后,立刻锁就会被释放,然后线程2立刻又能紧随其后write,写完又到线程3。
因为写内存的速度是极快的,所以此时在还没轮到**logFlush()方法(将数据持久化到硬盘的方法)**执行,我们可能都已经都已经完成了3个数据往bufCurrent写入的操作。
温馨提示:此时这边的线程1将要进入到logFlush了,可是此时bufCurrent可能已经夹带了线程1,2,3的数据了,现在我先做个假设,线程1,2,3写入的元数据分别就是1,2,3,这句话非常重要!!!这句话非常重要!!!这句话非常重要!!!非常重要的事情说三遍,然后请看logFlush的解释
1.3.4 logFlush --- 将数据持久化到硬盘的方法
//判断此时后台正在把数据同步到磁盘上public boolean isSyncRunning =false;//正在同步磁盘的内存块里面最大的一个ID号。long maxtaxid=0L;boolean isWait=false;private void logFlush() { synchronized(this) { if(isSyncRunning) { //获取当前线程的是事务ID long localTaxid=threadLocal.get(); if(localTaxid <= maxtaxid) { return; } if(isWait) { return; } isWait=true; while(isSyncRunning) { try { //一直等待 //wait这个操作是释放锁 this.wait(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } isWait=false; // } //代码就走到这儿 //syncBuffer(1,2,3) doubleBuffer.exchange(); //maxtaxid = 3 if(doubleBuffer.syncBuffer.size() > 0) { maxtaxid=doubleBuffer.getMaxTaxid(); } isSyncRunning=true; } //释放锁 //把数据持久化到磁盘,比较耗费性能的。 doubleBuffer.flush(); //分段加锁 synchronized (this) { //修改isSyncRunning的状态 isSyncRunning=false; //唤醒wait; this.notifyAll(); }
思路非常清晰,现在我们就来整理一下。
这个logFlush的方法的流程就是,我先使用一个boolean值 isSyncRunning 判断此时后台正在把数据同步到磁盘上,这个东西在客户端没有足够数据写进来之前一开始肯定是false的,但是如果写进bufCurrent的数据已经差不多了,那我就要把bufCurrent和syncBuffer交换,把 isSyncRunning 改成true。此时记录一下正在同步磁盘的内存块里面最大的一个ID号maxtaxid(后面需要使用)。然后让原本的bufCurrent往磁盘上写数据。在写入完成后,isSyncRunning的值修改回false
如果现在第二个线程夹带着它的数据2进来了logFlush,此时写入磁盘的操作还没有执行完成,那它就会先获取当前线程的事务ID---localTaxid,如果当前的这个localTaxid小于我现在进行同步的事务ID的最大值(2<3),那就说明现在的这个线程2所夹带的数据我已经在上一个线程操作中了。那我就直接无视(如果不理解为啥无视,直接看下一段话)
这里就要使用到我刚刚说了三遍很重要的事情了,上一个线程1进来的时候,bufCurrent已经夹带了数据1,2,3,此时我的maxtaxid=3,线程2所夹带的2,已经是在处理中的数据了
但是如果localTaxid大于我现在进行同步的事务ID的最大值,但是此时又还有线程在同步元数据,那我就让它等wait,此时我这边一wait,那边客户端又可以继续往bufCurrent写入元数据了。这里代码的逻辑是等待1s后,又重新去查看是否有线程正在同步元数据。然后在我同步元数据的操作后面,添加上唤醒这个wait的操作,因为在这一瞬间我同步结束后,如果这个线程仍然在wait,那岂不是在白等了,所以我这边处理完了,立刻就唤醒它来继续同步,不浪费大家时间。
1.3.5 编写一个main方法跑跑看
public static void main(String[] args) { FSEdit fs=new FSEdit(); for (int i = 0; i < 1000; i++) { new Thread(new Runnable() { @Override public void run() { for (int j = 0; j < 100; j++) { fs.logEdit("日志"); } } }).start(); }}
我们随便上个10W条跑跑看,似乎是3到4秒就搞定了,而且生成的EditLog都是有序的
这个套路其实完全是模仿了hadoop的源码写了一个大概的,后面我们也会对这个地方的源码进行修改。但是也是非常地接近了。
在这里我也可以说明有哪些地方的不足,比如我们这样操作内存频繁地交换,那肯定是会对性能产生一定影响的,所以我们会在这块设置一个合理的大小再进行交换。
二、NameNode是如何管理元数据的
分析NameNode对元数据的管理这个问题我们的做法很简单,先通过命令创建一个目录,然后看HDFS的元数据是否随之发生了变化
hadoop fs -mkdir /user/soft
按照这个思路,那我们打开hadoop-src吧
2.1 通过Java代码先模拟创建目录的操作
现在通过一段Java代码来创建目录
public static void main(String[] args) throws IOException { Configuration configuration=new Configuration(); FileSystem fileSystem=FileSystem.newInstance(configuration); //创建目录(分析的是元数据的管理流程) fileSystem.mkdirs(new Path("")); /** * TODO 分析HDFS上传文件的流程 * TODO 做一些重要的初始化工作 */ FSDataOutputStream fsous=fileSystem.create(new Path("/user.txt")); //TODO 完成上传文件的流程 fsous.write("showMeYourDream".getBytes()); }
2.2 mkdirs的操作
此时我们就懂了,这个mkdirs方法是被这个DistributedFileSystem(分布式文件系统)类给实现了,那我们就过去它那里去找找看吧
2.2.1 DistributedFileSystem的mkdirs
2.2.2 DFSClient的mkdirs
所以现在我们可以直接找出NameNode,好好看看它的mkdirs方法了
2.2.3 NameNode的mkdirs
继续点进去FSDirMkdirOp.mkdirs
2.3 FSDirectory和FSNameSystem的关系
首先是第一个关键词FSDirectory,这个是管理元数据的目录树(fsimage),元数据就是在NameNode的内存里面
而且这里还要注意,我们的元数据是有两份的,一份是在磁盘中的 fsimage + edit log,由FSNameSystem来进行管理,还有一份是内存里面的fsimage,由这个FSDirectory来管理
那这种东西我口说无凭啊,那我是怎么知道的,那当然还是hadoop的开发者告诉我的啦,点进去FSDirectory
而这个FSDirectory的对象是如何管理一个目录的呢,我们要通过FSDirectory的代码说明
2.4 FSDirectory的结构及目录树的生成流程
回到FSDirMkdirOp中的mkdirs
到现在为止,目录树fsimage就完成了更新。
流程总结
DistributedFileSystem实现了mkdirs方法,跳转到DFSClient,在DFSClient中看到了这个mkdirs其实是调用了Namenode的mkdirs,所以跳到Namenode的mkdirs,
fsimage其实就是一个目录树,我们的元数据是有两份的,一份是在磁盘中的 fsimage + edit log,由FSNameSystem来进行管理,还有一份是内存里面的fsimage,由这个FSDirectory来管理,还有就是目录树的每一个节点都为INode,这个INode有两种形态,INodeFIle代表文件,INodeDirectory代表文件夹,创建目录其实就是,先获取到集群原本已存在的目录的最后一个INode,我们称之为lastNode,然后通过一个for循环来将目录拼接到这个lastNode的末尾。
finally
字数算是较多,不过其实总得来说都不难理解。重要的位置基本都已经加粗了,也是希望对大家有所帮助。以后再被问到这俩问题,在明白这个套路的前提下,按照我们的总结过程简短地说出来即可。希望大家都能装的一手好B,hhh。
.
- 0