這篇文章主要介紹“MapReduce Map Join怎么使用”,在日常操作中,相信很多人在MapReduce Map Join怎么使用問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”MapReduce Map Join怎么使用”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
創新互聯是專業的綿陽網站建設公司,綿陽接單;提供網站建設、成都做網站,網頁設計,網站設計,建網站,PHP網站建設等專業做網站服務;采用PHP框架,可快速的進行綿陽網站開發網頁制作和功能擴展;專業做搜索引擎喜愛的網站,專業的做網站團隊,希望更多企業前來合作!
1. 樣例數據
011990-99999 SIHCCAJAVRI 012650-99999 TYNSET-HANSMOEN
012650-99999 194903241200 111 012650-99999 194903241800 78 011990-99999 195005150700 0 011990-99999 195005151200 22 011990-99999 195005151800 -11
2. 需求
3. 思路、代碼
將足夠小的關聯文件(即氣象臺信息)添加到分布式緩存,然后在每個 Mapper 端讀取被緩存到本地的全量氣象臺信息,再與天氣信息相關聯。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class MapJoin {
static class RecordMapper extends Mapper<LongWritable, Text, Text, Text> {
private Map<String, String> stationMap = new HashMap<String, String>();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//預處理,把要關聯的文件加載到緩存中
Path[] paths = context.getLocalCacheFiles();
//新的檢索緩存文件的API是 context.getCacheFiles() ,而 context.getLocalCacheFiles() 被棄用
//然而 context.getCacheFiles() 返回的是 HDFS 路徑; context.getLocalCacheFiles() 返回的才是本地路徑
//這里只緩存了一個文件,所以取第一個即可
BufferedReader reader = new BufferedReader(new FileReader(paths[0].toString()));
String line = null;
try {
while ((line = reader.readLine()) != null) {
String[] vals = line.split("\\t");
if (vals.length == 2) {
stationMap.put(vals[0], vals[1]);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
reader.close();
}
super.setup(context);
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] vals = value.toString().split("\\t");
if (vals.length == 3) {
String stationName = stationMap.get(vals[0]); //Join
stationName = stationName == null ? "" : stationName;
context.write(new Text(vals[0]),
new Text(stationName + "\t" + vals[1] + "\t" + vals[2]));
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 3) {
System.err.println("Parameter number is wrong, please enter three parameters:<ncdc input> <station input> <output>");
System.exit(-1);
}
Path inputPath = new Path(otherArgs[0]);
Path stationPath = new Path(otherArgs[1]);
Path outputPath = new Path(otherArgs[2]);
Job job = Job.getInstance(conf, "MapJoin");
job.setJarByClass(MapJoin.class);
FileInputFormat.addInputPath(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
job.addCacheFile(stationPath.toUri()); //添加緩存文件,可添加多個
job.setMapperClass(RecordMapper.class);
job.setMapOutputKeyClass(Text.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}4. 運行結果
到此,關于“MapReduce Map Join怎么使用”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注創新互聯網站,小編會繼續努力為大家帶來更多實用的文章!
當前題目:MapReduceMapJoin怎么使用
URL標題:http://www.yijiale78.com/article46/pjddhg.html
成都網站建設公司_創新互聯,為您提供域名注冊、網站設計公司、關鍵詞優化、虛擬主機、全網營銷推廣、用戶體驗
聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯