
創(chuàng)新互聯(lián)專注于企業(yè)成都全網(wǎng)營銷推廣、網(wǎng)站重做改版、硚口網(wǎng)站定制設(shè)計(jì)、自適應(yīng)品牌網(wǎng)站建設(shè)、H5高端網(wǎng)站建設(shè)、商城網(wǎng)站建設(shè)、集團(tuán)公司官網(wǎng)建設(shè)、成都外貿(mào)網(wǎng)站建設(shè)公司、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁設(shè)計(jì)等建站業(yè)務(wù),價格優(yōu)惠性價比高,為硚口等各大城市提供網(wǎng)站開發(fā)制作服務(wù)。
MapReduce ships with a number of output formats, such as TextOutputFormat and SequenceFileOutputFormat. The number of output files equals the number of reduce tasks: by default there is a single reduce, so there is a single output file named part-r-00000, and (for a reducer that emits one record per key, as in word count) its number of lines equals the number of distinct keys in the map output. With two reduces there are two output files, part-r-00000 and part-r-00001, and so on.
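For example, both the number of part files and the separator written between key and value can be set from the driver. A minimal sketch (the job name and reducer count are arbitrary, chosen only for illustration):

Configuration conf = new Configuration();
// Use a comma instead of the default tab between key and value in each output line.
conf.set("mapreduce.output.textoutputformat.separator", ",");
Job job = Job.getInstance(conf, "WordCount");
// Two reduce tasks produce two output files: part-r-00000 and part-r-00001.
job.setNumReduceTasks(2);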
The class that implements output by default in MapReduce is TextOutputFormat, whose main job is to write text data to HDFS.
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
    public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";

    // Inner class that does the actual writing; the record terminator is \n and the
    // key/value separator defaults to \t (it can be changed via the property above).
    protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> {
        public LineRecordWriter(DataOutputStream out) {    // in practice an FSDataOutputStream
            this(out, "\t");
        }

        /** The essential structure is just two methods: write and close. **/
        public synchronized void write(K key, V value) throws IOException {
            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            if (nullKey && nullValue) {
                return;
            }
            if (!nullKey) {
                writeObject(key);          // turns the Text (or other) key into raw bytes
            }
            if (!(nullKey || nullValue)) {
                out.write(keyValueSeparator);
            }
            if (!nullValue) {
                writeObject(value);
            }
            out.write(newline);            // line terminator (newline = "\n".getBytes(utf8))
        }

        public synchronized void close(TaskAttemptContext context) throws IOException {
            out.close();
        }
    }

    // End of the inner class; what follows is the only key method of TextOutputFormat itself.
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        // 1. Check the Configuration to see whether compression is required; if so,
        //    obtain the compression codec and the corresponding file extension.
        // 2. Work out the file to write to via getDefaultWorkFile(job, extension).
        // 3. Create an FSDataOutputStream for that file and return a new LineRecordWriter.
        Configuration conf = job.getConfiguration();
        boolean isCompressed = getCompressOutput(job);
        String keyValueSeparator = conf.get(SEPERATOR, "\t");
        CompressionCodec codec = null;
        String extension = "";
        if (isCompressed) {    // when compressing, the extension comes from the codec
            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
            codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
            extension = codec.getDefaultExtension();
        }
        // getDefaultWorkFile, implemented in FileOutputFormat, supplies the name of the output file.
        Path file = getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        // Build the writer object.
        if (!isCompressed) {
            FSDataOutputStream fileOut = fs.create(file, false);
            return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
        } else {
            FSDataOutputStream fileOut = fs.create(file, false);
            DataOutputStream dataOut = new DataOutputStream(codec.createOutputStream(fileOut));
            return new LineRecordWriter<K, V>(dataOut, keyValueSeparator);
        }
    }
}
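To exercise the compressed branch above, output compression can be switched on from the driver. A minimal sketch (job construction omitted; gzip is chosen only for illustration):

// Enables "mapreduce.output.fileoutputformat.compress" and picks the codec, so that
// getRecordWriter wraps the FSDataOutputStream in a compressed stream and
// getDefaultWorkFile appends the codec's extension (e.g. part-r-00000.gz).
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);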
Which location on HDFS TextOutputFormat writes its data to is determined by FileOutputFormat's getDefaultWorkFile method. In fact, every MapReduce output format must extend OutputFormat, so first look at the definition of OutputFormat.

/**
 * OutputFormat describes the output specification of a Map-Reduce job, e.g.:
 * 1. Validation, such as whether the configured output directory already exists
 *    and whether there is enough output space;
 * 2. Providing the RecordWriter that writes the job's output to the FileSystem
 *    (normally HDFS).
 */
public abstract class OutputFormat<K, V> {
    // Get the RecordWriter associated with the current task.
    public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException;

    // Check the output specification when the job is submitted,
    // e.g. whether the output directory already exists.
    public abstract void checkOutputSpecs(JobContext context)
            throws IOException, InterruptedException;

    // Get the output committer for this output format.
    // This is responsible for ensuring the output is committed correctly.
    public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context)
            throws IOException, InterruptedException;
}
TextOutputFormat implements getRecordWriter; TextOutputFormat is a subclass of FileOutputFormat, which in turn is a subclass of OutputFormat.

/** Base class for OutputFormats that write their data to the file system (HDFS). **/
public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> {
    /** With several partitions there are several output files; NUMBER_FORMAT formats the
        file number, i.e. the 00000, 00001 in part-r-00000, part-r-00001, ... **/
    private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();

    /** Output files start with "part" by default; this property can supply a different base name. **/
    protected static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
    protected static final String PART = "part";

    static {
        NUMBER_FORMAT.setMinimumIntegerDigits(5);
        NUMBER_FORMAT.setGroupingUsed(false);
    }

    // Whether and how the job output is compressed can be configured, e.g. in mapred-site.xml.
    // Default: false
    public static final String COMPRESS = "mapreduce.output.fileoutputformat.compress";
    // Default: org.apache.hadoop.io.compress.DefaultCodec
    public static final String COMPRESS_CODEC = "mapreduce.output.fileoutputformat.compress.codec";
    // Default: RECORD, i.e. compress each record; BLOCK compresses groups of records.
    public static final String COMPRESS_TYPE = "mapreduce.output.fileoutputformat.compress.type";

    // Cached committer instance used by getOutputCommitter below.
    private FileOutputCommitter committer = null;

    // Set the output directory of the map-reduce job.
    public static void setOutputPath(Job job, Path outputDir) {
        try {
            outputDir = outputDir.getFileSystem(job.getConfiguration()).makeQualified(outputDir);
        } catch (IOException e) {
            // Throw the IOException as a RuntimeException to be compatible with MR1
            throw new RuntimeException(e);
        }
        job.getConfiguration().set(FileOutputFormat.OUTDIR, outputDir.toString());
    }

    // Validate the output specification:
    public void checkOutputSpecs(JobContext job) throws FileAlreadyExistsException, IOException {
        // 1. Verify that an output directory has been set (FileOutputFormat.setOutputPath);
        // 2. Verify that the output directory does not already exist (a fresh path is required).
    }

    // Get the output committer; since MRv2 users may plug in their own OutputCommitter implementation.
    public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
        if (committer == null) {
            Path output = getOutputPath(context);
            committer = new FileOutputCommitter(output, context);
        }
        return committer;
    }

    // Get the default output path and file name for this output format.
    public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
        FileOutputCommitter committer = (FileOutputCommitter) getOutputCommitter(context);
        return new Path(committer.getWorkPath(), getUniqueFile(context, getOutputName(context), extension));
    }

    /**
     * Generate a unique filename, based on the task id, name, and extension,
     * e.g. part-r-00000, part-r-00001, ...
     * @param context the task that is calling this
     * @param name the base filename
     * @param extension the filename extension
     * @return a string like $name-[mrsct]-$id$extension
     */
    public synchronized static String getUniqueFile(TaskAttemptContext context, String name, String extension) {
        TaskID taskId = context.getTaskAttemptID().getTaskID();
        int partition = taskId.getId();
        StringBuilder result = new StringBuilder();
        result.append(name);
        result.append('-');
        result.append(TaskID.getRepresentingCharacter(taskId.getTaskType()));
        result.append('-');
        result.append(NUMBER_FORMAT.format(partition));
        result.append(extension);
        return result.toString();
    }
}
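Since getUniqueFile builds the file name from getOutputName(context), which defaults to the BASE_OUTPUT_NAME property shown above, the "part" prefix can be swapped out from the driver. A one-line sketch (the value "wordcount" is just an example):

// Yields wordcount-r-00000, wordcount-r-00001, ... instead of part-r-*.
job.getConfiguration().set("mapreduce.output.basename", "wordcount");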
The task-type character is obtained from the class org.apache.hadoop.mapreduce.TaskID$CharTaskTypeMaps:

static String allTaskTypes = "(m|r|s|c|t)";

static {
    setupTaskTypeToCharMapping();
    setupCharToTaskTypeMapping();
}

private static void setupTaskTypeToCharMapping() {
    typeToCharMap.put(TaskType.MAP, 'm');
    typeToCharMap.put(TaskType.REDUCE, 'r');
    typeToCharMap.put(TaskType.JOB_SETUP, 's');
    typeToCharMap.put(TaskType.JOB_CLEANUP, 'c');
    typeToCharMap.put(TaskType.TASK_CLEANUP, 't');
}

private static void setupCharToTaskTypeMapping() {
    charToTypeMap.put('m', TaskType.MAP);
    charToTypeMap.put('r', TaskType.REDUCE);
    charToTypeMap.put('s', TaskType.JOB_SETUP);
    charToTypeMap.put('c', TaskType.JOB_CLEANUP);
    charToTypeMap.put('t', TaskType.TASK_CLEANUP);
}

// Returns the middle character of part-r-00000, i.e. the 'r'.
static char getRepresentingCharacter(TaskType type) {
    return typeToCharMap.get(type);
}

Example application: write all words that share the same first letter into the same output file.
Input file contents:
[hadoop@nnode code]$ hdfs dfs -ls /data
Found 2 items
-rw-r--r--   1 hadoop hadoop         47 2015-06-09 17:59 /data/file1.txt
-rw-r--r--   2 hadoop hadoop         36 2015-06-09 17:59 /data/file2.txt
[hadoop@nnode code]$ hdfs dfs -text /data/file1.txt
hello world
hello markhuang
hello hadoop
[hadoop@nnode code]$ hdfs dfs -text /data/file2.txt
hadoop ok
hadoop fail
hadoop 2.3
[hadoop@nnode code]$
The custom OutputFormat:
package com.lucl.hadoop.mapreduce.multiple;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

/**
 * @author luchunli
 * @description Custom OutputFormat. It extends TextOutputFormat so that we do not have to
 *              implement our own OutputCommitter. In MapReduce the key must be a
 *              WritableComparable and the value a Writable.
 */
public class MultipleOutputFormat<K extends WritableComparable<?>, V extends Writable>
        extends TextOutputFormat<K, V> {
    /**
     * The OutputFormat obtains a writer object and uses it to write the data into files with
     * specific names under the given directory.
     */
    private MultipleRecordWriter writer = null;

    // In TextOutputFormat every map or reduce task has a unique identifier (its TaskID) and a
    // fixed output file name; each output file corresponds to one LineRecordWriter, whose
    // output stream (an FSDataOutputStream) does the actual writing.
    //
    // Here, however, a single task has to spread its output over several files whose names are
    // not fixed in advance. So on every write we check whether a writer already exists for the
    // target file: if it does, we reuse it; otherwise we create a new one.
    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        if (null == writer) {
            writer = new MultipleRecordWriter(context, this.getTaskOutputPath(context));
        }
        return writer;
    }

    // Get the task's output path, still taken from the committer. TaskAttemptContext wraps the
    // task's context and is analysed in a later note.
    // TextOutputFormat does this by calling getDefaultWorkFile of its parent (FileOutputFormat),
    // but getDefaultWorkFile returns MapReduce's default file name; to use custom file names
    // the paths have to be built by hand.
    private Path getTaskOutputPath(TaskAttemptContext context) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(context);
        if (committer instanceof FileOutputCommitter) {
            // Get the directory that the task should write results into.
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            // Get the {@link Path} to the output directory for the map-reduce job.
            // context.getConfiguration().get(FileOutputFormat.OUTDIR);
            Path outputPath = super.getOutputPath(context);
            if (null == outputPath) {
                throw new IOException("Undefined job output-path.");
            }
            workPath = outputPath;
        }
        return workPath;
    }

    /**
     * @author luchunli
     * @description Custom RecordWriter. TextOutputFormat's LineRecordWriter is also an inner
     *              class, and this implementation follows the same pattern.
     */
    public class MultipleRecordWriter extends RecordWriter<K, V> {
        /** Cache of RecordWriters, one per output file. **/
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext context;
        /** Output directory. **/
        private Path workPath = null;

        public MultipleRecordWriter() {}

        public MultipleRecordWriter(TaskAttemptContext context, Path path) {
            super();
            this.context = context;
            this.workPath = path;
            this.recordWriters = new HashMap<String, RecordWriter<K, V>>();
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            String baseName = generateFileNameForKeyValue(key, value, this.context.getConfiguration());
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (null == rw) {
                rw = this.getBaseRecordWriter(context, baseName);
                this.recordWriters.put(baseName, rw);
            }
            // The actual writing is still done by a LineRecordWriter.
            rw.write(key, value);
        }

        // MultipleRecordWriter wraps LineRecordWriter and splits the output of a single task.
        // By default MapReduce runs a single reduce (the reduce count is covered in the notes on
        // partitioning), so in the earlier examples everything ended up in part-r-00000. The work
        // done here replaces that single part-r-00000 file with one file per base name.
        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext context, String baseName) throws IOException {
            Configuration conf = context.getConfiguration();
            boolean isCompressed = getCompressOutput(context);
            // LineRecordWriter reads its separator like this:
            //   public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
            //   String keyValueSeparator = conf.get(SEPERATOR, "\t");
            // Here a comma is used as the separator instead.
            String keyValueSeparator = ",";
            RecordWriter<K, V> rw = null;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(context, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                // Wrap the raw stream in the codec's stream so the data really is compressed.
                DataOutputStream out = new DataOutputStream(codec.createOutputStream(fileOut));
                rw = new LineRecordWriter<>(out, keyValueSeparator);
            } else {
                Path file = new Path(workPath, baseName);
                FSDataOutputStream out = file.getFileSystem(conf).create(file, false);
                rw = new LineRecordWriter<>(out, keyValueSeparator);
            }
            return rw;
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            Iterator<RecordWriter<K, V>> it = this.recordWriters.values().iterator();
            while (it.hasNext()) {
                RecordWriter<K, V> rw = it.next();
                rw.close(context);
            }
            this.recordWriters.clear();
        }

        /** Build the name of the file a key/value pair should go to. **/
        private String generateFileNameForKeyValue(K key, V value, Configuration configuration) {
            char c = key.toString().toLowerCase().charAt(0);
            if (c >= 'a' && c <= 'z') {
                return c + ".txt";
            }
            return "other.txt";
        }
    }
}

Implementing the Mapper:
package com.lucl.hadoop.mapreduce.multiple;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author luchunli
 * @description Custom Mapper: emits (word, 1) for every token of the input line.
 */
public class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);
    private Text text = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer token = new StringTokenizer(value.toString());
        while (token.hasMoreTokens()) {
            String word = token.nextToken();
            text.set(word);
            context.write(text, one);
        }
    }
}

Implementing the Reducer:
package com.lucl.hadoop.mapreduce.multiple;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author luchunli
 * @description Custom Reducer: sums the counts for each word.
 */
public class TokenizerReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> value, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable intWritable : value) {
            sum += intWritable.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

Implementing the Driver:
package com.lucl.hadoop.mapreduce.multiple;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author luchunli
 * @description Driver class.
 */
public class MultipleWorkCount extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            ToolRunner.run(new MultipleWorkCount(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(MultipleWorkCount.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));

        job.setMapperClass(TokenizerMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(TokenizerReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);    // the value class must be IntWritable (the original called setOutputKeyClass twice)

        job.setOutputFormatClass(MultipleOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}

Running the job:
[hadoop@nnode code]$ hadoop jar MultipleMR.jar /data /2015120500010
15/12/05 16:45:54 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/12/05 16:45:55 INFO input.FileInputFormat: Total input paths to process : 2
15/12/05 16:45:55 INFO mapreduce.JobSubmitter: number of splits:2
15/12/05 16:45:55 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1449302623953_0004
15/12/05 16:45:56 INFO impl.YarnClientImpl: Submitted application application_1449302623953_0004
15/12/05 16:45:56 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1449302623953_0004/
15/12/05 16:45:56 INFO mapreduce.Job: Running job: job_1449302623953_0004
15/12/05 16:46:27 INFO mapreduce.Job: Job job_1449302623953_0004 running in uber mode : false
15/12/05 16:46:27 INFO mapreduce.Job: map 0% reduce 0%
15/12/05 16:46:56 INFO mapreduce.Job: map 50% reduce 0%
15/12/05 16:46:58 INFO mapreduce.Job: map 100% reduce 0%
15/12/05 16:47:16 INFO mapreduce.Job: map 100% reduce 100%
15/12/05 16:47:18 INFO mapreduce.Job: Job job_1449302623953_0004 completed successfully
15/12/05 16:47:18 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=152
                FILE: Number of bytes written=323517
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=271
                HDFS: Number of bytes written=55
                HDFS: Number of read operations=9
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=7
        Job Counters
                Launched map tasks=2
                Launched reduce tasks=1
                Data-local map tasks=2
                Total time spent by all maps in occupied slots (ms)=58249
                Total time spent by all reduces in occupied slots (ms)=17197
                Total time spent by all map tasks (ms)=58249
                Total time spent by all reduce tasks (ms)=17197
                Total vcore-seconds taken by all map tasks=58249
                Total vcore-seconds taken by all reduce tasks=17197
                Total megabyte-seconds taken by all map tasks=59646976
                Total megabyte-seconds taken by all reduce tasks=17609728
        Map-Reduce Framework
                Map input records=6
                Map output records=12
                Map output bytes=122
                Map output materialized bytes=158
                Input split bytes=188
                Combine input records=0
                Combine output records=0
                Reduce input groups=7
                Reduce shuffle bytes=158
                Reduce input records=12
                Reduce output records=7
                Spilled Records=24
                Shuffled Maps =2
                Failed Shuffles=0
                Merged Map outputs=2
                GC time elapsed (ms)=313
                CPU time spent (ms)=4770
                Physical memory (bytes) snapshot=511684608
                Virtual memory (bytes) snapshot=2545770496
                Total committed heap usage (bytes)=257171456
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=83
        File Output Format Counters
                Bytes Written=55
[hadoop@nnode code]$
Check the output:
[hadoop@nnode code]$ hdfs dfs -ls /2015120500010
Found 7 items
-rw-r--r--   2 hadoop hadoop          0 2015-12-05 16:47 /2015120500010/_SUCCESS
-rw-r--r--   2 hadoop hadoop          7 2015-12-05 16:47 /2015120500010/f.txt
-rw-r--r--   2 hadoop hadoop         17 2015-12-05 16:47 /2015120500010/h.txt
-rw-r--r--   2 hadoop hadoop         12 2015-12-05 16:47 /2015120500010/m.txt
-rw-r--r--   2 hadoop hadoop          5 2015-12-05 16:47 /2015120500010/o.txt
-rw-r--r--   2 hadoop hadoop          6 2015-12-05 16:47 /2015120500010/other.txt
-rw-r--r--   2 hadoop hadoop          8 2015-12-05 16:47 /2015120500010/w.txt
[hadoop@nnode code]$ hdfs dfs -text /2015120500010/h.txt
hadoop,4
hello,3
[hadoop@nnode code]$ hdfs dfs -text /2015120500010/o.txt
ok,1
[hadoop@nnode code]$ hdfs dfs -text /2015120500010/other.txt
2.3,1
[hadoop@nnode code]$
Errors encountered:
1. java.lang.RuntimeException: java.lang.InstantiationException
[hadoop@nnode code]$ hadoop jar MultipleMR.jar /data /2015120500001
15/12/05 16:18:19 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
java.lang.RuntimeException: java.lang.InstantiationException
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:131)
        at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:559)
        at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:432)
        at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1296)
        at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1293)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
        at org.apache.hadoop.mapreduce.Job.submit(Job.java:1293)
        at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1314)
        at com.lucl.hadoop.mapreduce.multiple.MultipleWorkCount.run(MultipleWorkCount.java:49)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
        at com.lucl.hadoop.mapreduce.multiple.MultipleWorkCount.main(MultipleWorkCount.java:22)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: java.lang.InstantiationException
        at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:129)
        ... 19 more
[hadoop@nnode code]$
Cause:
There used to be a subclass of MultipleOutputFormat, and the Driver registered that subclass as the output format. The subclass later seemed unnecessary and was removed, but MultipleOutputFormat itself was still declared abstract (the abstract keyword was never removed), so the framework could not instantiate it.
2. Error: java.io.IOException: Unable to initialize any output collector
[hadoop@nnode code]$ hadoop jar MultipleMR.jar /data /2015120500005
15/12/05 16:26:06 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/12/05 16:26:07 INFO input.FileInputFormat: Total input paths to process : 2
15/12/05 16:26:07 INFO mapreduce.JobSubmitter: number of splits:2
15/12/05 16:26:08 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1449302623953_0003
15/12/05 16:26:08 INFO impl.YarnClientImpl: Submitted application application_1449302623953_0003
15/12/05 16:26:08 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1449302623953_0003/
15/12/05 16:26:08 INFO mapreduce.Job: Running job: job_1449302623953_0003
15/12/05 16:26:43 INFO mapreduce.Job: Job job_1449302623953_0003 running in uber mode : false
15/12/05 16:26:43 INFO mapreduce.Job: map 0% reduce 0%
15/12/05 16:27:13 INFO mapreduce.Job: Task Id : attempt_1449302623953_0003_m_000000_0, Status : FAILED
Error: java.io.IOException: Unable to initialize any output collector
        at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:412)
        at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:695)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:767)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
15/12/05 16:27:13 INFO mapreduce.Job: Task Id : attempt_1449302623953_0003_m_000001_0, Status : FAILED
Error: java.io.IOException: Unable to initialize any output collector
        at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:412)
        at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:695)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:767)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
^C[hadoop@nnode code]$
Cause:
The wrong Text class had been imported: com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text.
The correct import is org.apache.hadoop.io.Text.
Note:
attempt_1449302623953_0003_m_000000_0
The second error message above also reveals the naming convention for map task attempts:
// TaskAttemptID represents the immutable and unique identifier for a task attempt.
// Each task attempt is one particular instance of a Map or Reduce Task identified by TaskID.
// An example TaskAttemptID is : attempt_200707121733_0003_m_000005_0
// zeroth task attempt for the fifth map task in the third job running at the jobtracker started at 200707121733
public class TaskAttemptID extends org.apache.hadoop.mapred.ID {
    protected static final String ATTEMPT = "attempt";
    private TaskID taskId;
    // ......
}
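To relate such an attempt string back to its parts, the identifier can also be parsed programmatically. A small sketch using the failed attempt from the log above (classes from org.apache.hadoop.mapreduce):

// Parses "attempt_1449302623953_0003_m_000000_0" back into its components.
TaskAttemptID attemptId = TaskAttemptID.forName("attempt_1449302623953_0003_m_000000_0");
TaskID taskId = attemptId.getTaskID();
System.out.println(taskId.getTaskType());   // MAP  (the 'm' in the name)
System.out.println(taskId.getId());         // 0    (the first map task of the job)
System.out.println(attemptId.getId());      // 0    (the first attempt of that task)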