侧边栏壁纸
博主头像
落叶人生博主等级

走进秋风,寻找秋天的落叶

  • 累计撰写 130562 篇文章
  • 累计创建 28 个标签
  • 累计收到 9 条评论
标签搜索

目 录CONTENT

文章目录

自定义 hadoop MapReduce InputFormat 切分输入文件

2022-06-21 星期二 / 0 评论 / 0 点赞 / 83 阅读 / 17396 字

在上一篇中,我们实现了按 cookieId 和 time 进行二次排序,现在又有新问题:假如我需要按 cookieId 和cookieId&time 的组合进行分析呢?此时最好的办法是自定义Input

在上一篇中,我们实现了按 cookieId 和 time 进行二次排序,现在又有新问题:假如我需要按 cookieId 和 cookieId&time 的组合进行分析呢?此时最好的办法是自定义 InputFormat,让 mapreduce 一次读取一个 cookieId 下的所有记录,然后再按 time 进行切分 session,逻辑伪码如下:

for OneSplit in MyInputFormat.getSplit() // OneSplit 是某个 cookieId 下的所有记录

    for session in OneSplit // session 是按 time 把 OneSplit 进行了二次分割

        for line in session // line 是 session 中的每条记录,对应原始日志的某条记录

1、原理:

. InputFormat是MapReduce中一个很常用的概念,它在程序的运行中到底起到了什么作用呢? . . InputFormat其实是一个接口,包含了两个方法: .

public interface InputFormat<K, V> {
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

  RecordReader<K, V> createRecordReader(InputSplit split, 

                                  TaskAttemptContext context)  throws IOException;

}

. 这两个方法有分别完成着以下工作: . . .       方法 getSplits 将输入数据切分成splits,splits的个数即为map tasks的个数,splits的大小默认为块大小,即64M . .      方法 getRecordReader 将每个 split  解析成records, 再依次将record解析成<K,V>对 . 也就是说 InputFormat完成以下工作: . .  InputFile -->  splits  -->  <K,V> . .
. . 系统常用的  InputFormat 又有哪些呢? . .                        . . 其中Text InputFormat便是最常用的,它的 <K,V>就代表 <行偏移,该行内容> . .
. . 然而系统所提供的这几种固定的将  InputFile转换为 <K,V>的方式有时候并不能满足我们的需求: . . 此时需要我们自定义   InputFormat ,从而使Hadoop框架按照我们预设的方式来将 . . InputFile解析为<K,V> . . 在领会自定义   InputFormat 之前,需要弄懂一下几个抽象类、接口及其之间的关系: . .
. . InputFormat(interface), FileInputFormat(abstract class), TextInputFormat(class), . RecordReader (interface), Line RecordReader(class)的关系 . .       FileInputFormat implements  InputFormat . .       TextInputFormat extends  FileInputFormat . .       TextInputFormat.get RecordReader calls  Line RecordReader . .       Line RecordReader  implements  RecordReader . . .
. . 对于InputFormat接口,上面已经有详细的描述 . . 再看看 FileInputFormat,它实现了 InputFormat接口中的 getSplits方法,而将 getRecordReader与isSplitable留给具体类(如 TextInputFormat )实现, isSplitable方法通常不用修改,所以只需要在自定义的 InputFormat中实现 . . . getRecordReader方法即可,而该方法的核心是调用 Line RecordReader(即由LineRecorderReader类来实现 " 将每个s plit解析成records, 再依次将record解析成<K,V>对" ),该方法实现了接口RecordReader . .
. .   public interface RecordReader<K, V> { .   boolean   next(K key, V value) throws IOException;
  K   createKey();
  V   createValue();
  long   getPos() throws IOException;
  public void   close() throws IOException;
  float   getProgress() throws IOException;
} . .
. .      因此自定义InputFormat的核心是自定义一个实现接口RecordReader类似于LineRecordReader的类,该类的核心也正是重写接口RecordReader中的几大方法, . .      定义一个InputFormat的核心是定义一个类似于LineRecordReader的,自己的RecordReader .


2、代码:

package MyInputFormat;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.io.compress.CompressionCodecFactory;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.JobContext;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class TrackInputFormat extends FileInputFormat<LongWritable, Text> {	@SuppressWarnings("deprecation")	@Override	public RecordReader<LongWritable, Text> createRecordReader(			InputSplit split, TaskAttemptContext context) {		return new TrackRecordReader();	}	@Override	protected boolean isSplitable(JobContext context, Path file) {		CompressionCodec codec = new CompressionCodecFactory(				context.getConfiguration()).getCodec(file);		return codec == null;	}}

package MyInputFormat;import java.io.IOException;import java.io.InputStream;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.io.compress.CompressionCodecFactory;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileSplit;/** * Treats keys as offset in file and value as line. *  * @deprecated Use *             {@link org.apache.hadoop.mapreduce.lib.input.LineRecordReader} *             instead. */public class TrackRecordReader extends RecordReader<LongWritable, Text> {	private static final Log LOG = LogFactory.getLog(TrackRecordReader.class);	private CompressionCodecFactory compressionCodecs = null;	private long start;	private long pos;	private long end;	private NewLineReader in;	private int maxLineLength;	private LongWritable key = null;	private Text value = null;	// ----------------------	// 行分隔符,即一条记录的分隔符	private byte[] separator = "END/n".getBytes();	// --------------------	public void initialize(InputSplit genericSplit, TaskAttemptContext context)			throws IOException {		FileSplit split = (FileSplit) genericSplit;		Configuration job = context.getConfiguration();		this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",				Integer.MAX_VALUE);		start = split.getStart();		end = start + split.getLength();		final Path file = split.getPath();		compressionCodecs = new CompressionCodecFactory(job);		final CompressionCodec codec = compressionCodecs.getCodec(file);		FileSystem fs = file.getFileSystem(job);		FSDataInputStream fileIn = fs.open(split.getPath());		boolean skipFirstLine = false;		if (codec != null) {			in = new NewLineReader(codec.createInputStream(fileIn), job);			end = Long.MAX_VALUE;		} else {			if (start != 0) {				skipFirstLine = true;				this.start -= separator.length;//				// --start;				fileIn.seek(start);			}			in = new NewLineReader(fileIn, job);		}		if (skipFirstLine) { // skip first line and re-establish "start".			start += in.readLine(new Text(), 0,					(int) Math.min((long) Integer.MAX_VALUE, end - start));		}		this.pos = start;	}	public boolean nextKeyValue() throws IOException {		if (key == null) {			key = new LongWritable();		}		key.set(pos);		if (value == null) {			value = new Text();		}		int newSize = 0;		while (pos < end) {			newSize = in.readLine(value, maxLineLength,					Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),							maxLineLength));			if (newSize == 0) {				break;			}			pos += newSize;			if (newSize < maxLineLength) {				break;			}			LOG.info("Skipped line of size " + newSize + " at pos "					+ (pos - newSize));		}		if (newSize == 0) {			key = null;			value = null;			return false;		} else {			return true;		}	}	@Override	public LongWritable getCurrentKey() {		return key;	}	@Override	public Text getCurrentValue() {		return value;	}	/**	 * Get the progress within the split	 */	public float getProgress() {		if (start == end) {			return 0.0f;		} else {			return Math.min(1.0f, (pos - start) / (float) (end - start));		}	}	public synchronized void close() throws IOException {		if (in != null) {			in.close();		}	}	public class NewLineReader {		private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;		private int bufferSize = DEFAULT_BUFFER_SIZE;		private InputStream in;		private byte[] buffer;		private int bufferLength = 0;		private int bufferPosn = 0;		public NewLineReader(InputStream in) {			this(in, DEFAULT_BUFFER_SIZE);		}		public NewLineReader(InputStream in, int bufferSize) {			this.in = in;			this.bufferSize = bufferSize;			this.buffer = new byte[this.bufferSize];		}		public NewLineReader(InputStream in, Configuration conf)				throws IOException {			this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));		}		public void close() throws IOException {			in.close();		}		public int readLine(Text str, int maxLineLength, int maxBytesToConsume)				throws IOException {			str.clear();			Text record = new Text();			int txtLength = 0;			long bytesConsumed = 0L;			boolean newline = false;			int sepPosn = 0;			do {				// 已经读到buffer的末尾了,读下一个buffer				if (this.bufferPosn >= this.bufferLength) {					bufferPosn = 0;					bufferLength = in.read(buffer);					// 读到文件末尾了,则跳出,进行下一个文件的读取					if (bufferLength <= 0) {						break;					}				}				int startPosn = this.bufferPosn;				for (; bufferPosn < bufferLength; bufferPosn++) {					// 处理上一个buffer的尾巴被切成了两半的分隔符(如果分隔符中重复字符过多在这里会有问题)					if (sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]) {						sepPosn = 0;					}					// 遇到行分隔符的第一个字符					if (buffer[bufferPosn] == separator[sepPosn]) {						bufferPosn++;						int i = 0;						// 判断接下来的字符是否也是行分隔符中的字符						for (++sepPosn; sepPosn < separator.length; i++, sepPosn++) {							// buffer的最后刚好是分隔符,且分隔符被不幸地切成了两半							if (bufferPosn + i >= bufferLength) {								bufferPosn += i - 1;								break;							}							// 一旦其中有一个字符不相同,就判定为不是分隔符							if (this.buffer[this.bufferPosn + i] != separator[sepPosn]) {								sepPosn = 0;								break;							}						}						// 的确遇到了行分隔符						if (sepPosn == separator.length) {							bufferPosn += i;							newline = true;							sepPosn = 0;							break;						}					}				}				int readLength = this.bufferPosn - startPosn;				bytesConsumed += readLength;				// 行分隔符不放入块中				if (readLength > maxLineLength - txtLength) {					readLength = maxLineLength - txtLength;				}				if (readLength > 0) {					record.append(this.buffer, startPosn, readLength);					txtLength += readLength;					// 去掉记录的分隔符					if (newline) {						str.set(record.getBytes(), 0, record.getLength()								- separator.length);					}				}			} while (!newline && (bytesConsumed < maxBytesToConsume));			if (bytesConsumed > (long) Integer.MAX_VALUE) {				throw new IOException("Too many bytes before newline: "						+ bytesConsumed);			}			return (int) bytesConsumed;		}		public int readLine(Text str, int maxLineLength) throws IOException {			return readLine(str, maxLineLength, Integer.MAX_VALUE);		}		public int readLine(Text str) throws IOException {			return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);		}	}}

package MyInputFormat;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class TestMyInputFormat {	public static class MapperClass extends Mapper<LongWritable, Text, Text, Text> {		public void map(LongWritable key, Text value, Context context) throws IOException,				InterruptedException {			System.out.println("key:/t " + key);			System.out.println("value:/t " + value);			System.out.println("-------------------------");		}	}	public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {		Configuration conf = new Configuration();		 Path outPath = new Path("/hive/11");		 FileSystem.get(conf).delete(outPath, true);		Job job = new Job(conf, "TestMyInputFormat");		job.setInputFormatClass(TrackInputFormat.class);		job.setJarByClass(TestMyInputFormat.class);		job.setMapperClass(TestMyInputFormat.MapperClass.class);		job.setNumReduceTasks(0);		job.setMapOutputKeyClass(Text.class);		job.setMapOutputValueClass(Text.class);		FileInputFormat.addInputPath(job, new Path(args[0]));		org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, outPath);		System.exit(job.waitForCompletion(true) ? 0 : 1);	}}

3、测试数据:

  cookieId    time     url                 cookieOverFlag

1       a        1_hao1231       a        1_baidu1       b        1_google       2END2       c        2_google2       c        2_hao1232       c        2_google       1END3       a        3_baidu3       a        3_sougou3       b        3_soso         2END

4、结果:

key:	 0value:	 1	a	1_hao123	1	a	 1_baidu	1	b	 1_google	2-------------------------key:	 47value:	 2	c	 2_google	2	c	 2_hao123	2	c	 2_google	1-------------------------key:	 96value:	 3	a	 3_baidu	3	a	 3_sougou	3	b	 3_soso	2-------------------------

REF:

自定义hadoop map/reduce输入文件切割InputFormat

http://hi.baidu.com/lzpsky/item/0d9d84c05afb43ba0c0a7b27

MapReduce高级编程之自定义InputFormat

http://datamining.xmu.edu.cn/bbs/home.php?mod=space&uid=91&do=blog&id=190

http://irwenqiang.iteye.com/blog/1448164

广告 广告

评论区