fileinputformat不同mapreduce程序可以连续运行吗?比如说多个这样的程序,用上一个的输出作为下一个的输入,求

fileinputformat  时间:2021-06-08  阅读:()

mapreduce中reducenum数量对程序有什么影响

mapreduce中的reduce数量是由什么来进行控制的呢? 1、numReduceTasks 如下是用来进行测试的一段wordcount的代码 import java.io.IOException; import java.util.StringTokenizer; .apache.hadoop.fs.Path; .apache.hadoop.io.IntWritable; .apache.hadoop.io.LongWritable; .apache.hadoop.io.Text; .apache.hadoop.mapred.JobConf; .apache.hadoop.mapreduce.Job; .apache.hadoop.mapreduce.Mapper; .apache.hadoop.mapreduce.Partitioner; .apache.hadoop.mapreduce.Reducer; .apache.hadoop.mapreduce.lib.input.FileInputFormat; .apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class PartTest { public static void main(String[] args){ Path inFile = new Path(args[0]); Path outFile = new Path(args[1]); Job job; try { job = Job.getInstance(); job.setJarByClass(PartTest.class); FileInputFormat.addInputPath(job , inFile); FileOutputFormat.setOutputPath(job, outFile); job.setReducerClass(PartTestreducer.class); job.setMapperClass(PartTestmapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); try { job.waitForCompletion(true); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } /** * InputFormat描述map-reduce中对job的输入定义 * setInputPaths():为map-reduce job设置路径数组作为输入列表 * setInputPath():为map-reduce job设置路径数组作为输出列表 */ } } class PartTestmapper extends Mapper{ private final IntWritable one = new IntWritable(1); //private Text word = new Text(); @Override protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub /* String line = value.toString(); for(String s : line.split("\s+")){ //if(s.length() > 0){ context.write(new Text(s), one); //} } */ String[] line = value.toString().split("\W+"); for(int i = 0 ; i<= line.length-1 ;i++){ String s = line[i]; context.write(new Text(s), new IntWritable(1)); } /* String line = value.toString(); Text word = new Text(); StringTokenizer token = new StringTokenizer(line); while (token.hasMoreTokens()) { word.set(token.nextToken()); context.write(word, one); } */ } } class PartTestreducer extends Reducer{ @Override protected void reduce(Text arg0, Iterable arg1, Reducer.Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub int sum = 0; for(IntWritable i : arg1){ sum += i.get(); } context.write(arg0, new IntWritable(sum)); } } 将上述代码打包成 parttest.jar,并上传到服务器的opt目录 创建文件/opt/test.txt,并上传到hdfs的/tmp目录下 文本内容如下: hello world hello test test hadoop hadoop hdfs hive sql sqoop 在服务器上执行: hadoop jar parttest.jar "PartTest" "/tmp/test.txt" "/tmp/part/out1" 我们可以看到日志输出文件: 在这里可以看到只启动了一个reduce任务 然后使用 hadoop fs -ls /tmp/part/out1 可以看到只生成了一个分区文件part-r-00000: 如果我们把上述代码进行修改: job = Job.getInstance(); job.setJarByClass(PartTest.class); FileInputFormat.addInputPath(job , inFile); FileOutputFormat.setOutputPath(job, outFile); job.setNumReduceTasks(3); job.setReducerClass(PartTestreducer.class); job.setMapperClass(PartTestmapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); 我们在代码里新加了一行 :job.setNumReduceTasks(3); 将代码重新打包上传,执行: hadoop jar parttest.jar "PartTest" "/tmp/test.txt" "/tmp/part/out2" 将结果输出到/tmp/part/out2目录 可以看到启动了3个reduce任务。

然后使用 hadoop fs -ls /tmp/part/out2 可以看到/tmp/part/out2文件夹中生成了3个part文件: 所以可以使用 setNumReduceTasks 来设置reduce的数量 2、mapreduce的分区 我们在原来的代码的最后一段加上如下代码: class PartTestPartitioner extends Partitioner{ @Override //参数含义:第一个参数为map任务的outputkey。

class,第二个参数为map任务的alue。

class,第三个参数为分区的数量,默认为1 public int getPartition(Text key, IntWritable value, int numPartitions) { // TODO Auto-generated method stub if(key.toString().startsWith("h")){ return 0%numPartitions; } else if(key.toString().startsWith("s")){ return 1%numPartitions; } else{ return 2%numPartitions; } } } 这段代码的含义是: 将以h开头的统计结果输出到part-r-00000 将以s开头的统计结果输出到part-r-00001 将以其他字母开头的统计结果输出到part-r-00002 对原有代码进行如下修改: job = Job.getInstance(); job.setJarByClass(PartTest.class); FileInputFormat.addInputPath(job , inFile); FileOutputFormat.setOutputPath(job, outFile); job.setNumReduceTasks(3); job.setPartitionerClass(PartTestPartitioner.class); job.setReducerClass(PartTestreducer.class); job.setMapperClass(PartTestmapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); 新加了一行代码:job.setPartitionerClass(PartTestPartitioner.class); 将代码重新打包上传,执行: hadoop jar parttest.jar "PartTest" "/tmp/test.txt" "/tmp/part/out3" 将结果输出到/tmp/part/out3目录 可以看到启动了3个reduce任务。

然后使用 hadoop fs -ls /tmp/part/out3 可以看到/tmp/part/out3文件夹中生成了3个part文件: 分别查看三个文件: 可以看到输出结果已经分别输出到对应的分区文件。

注意: job.setNumReduceTasks(3); job.setPartitionerClass(PartTestPartitioner.class); NumReduceTasks的数量不能小于partitioner的数量,否则结果会写到part-r-00000中 分类: HADOOP

mapreduce 怎么查看每个reducer处理的数据量

您好,第一种方法是用Mapper读取文本文件用StringTokenizer对读取文件内的每一行的数字(Hadoop处理文本文件时,处理时是一行一行记取的)进行分隔,获取每一个数字,然后求和,再将求得的值按Key/Value格式写入Context,最后用Reducer对求得中间值进行汇总求和,得出整个文件所有数字的和。

第二种方法是用Mapper读取文本文件用StringTokenizer对文件内的数字进行分隔,获取每一个数字,并救出文件中该数字有多少个,在合并过程中,求出每个数字在文件中的和,最后用Reducer对求得每个数字求得的和进行汇总求和,得出整个文件所有数字的和。

.hadoop; import java.io.IOException; import java.util.StringTokenizer; .apache.hadoop.conf.Configuration; .apache.hadoop.fs.Path; .apache.hadoop.io.LongWritable; .apache.hadoop.io.Text; .apache.hadoop.mapreduce.Job; .apache.hadoop.mapreduce.Mapper; .apache.hadoop.mapreduce.Reducer; .apache.hadoop.mapreduce.lib.input.FileInputFormat; .apache.hadoop.mapreduce.lib.output.FileOutputFormat; .apache.hadoop.util.GenericOptionsParser; public class NumberSum { //对每一行数据进行分隔,并求和 public static class SumMapper extends Mapper<Object, Text, Text, LongWritable> { private Text word = new Text("sum"); private static LongWritable numValue = new LongWritable(1); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); long sum = 0; while (itr.hasMoreTokens()) { String s = itr.nextToken(); long val = Long.parseLong(s); sum += val; } numValue.set(sum); context.write(word, numValue); } } // 汇总求和,输出 public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> { private LongWritable result = new LongWritable(); private Text k = new Text("sum"); public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long sum = 0; for (LongWritable val : values) { long v = val.get(); sum += v; } result.set(sum); context.write(k, result); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: numbersum <in> <out>"); System.exit(2); } Job job = new Job(conf, "number sum"); job.setJarByClass(NumberSum.class); job.setMapperClass(SumMapper.class); job.setReducerClass(SumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); System.out.println("ok"); } } 第一种实现的方法相对简单,第二种实现方法用到了Combiner类,Hadoop 在 对中间求的的数值进行Combiner时,是通过递归的方式不停地对 Combiner方法进行调用,进而合并数据的。

从两种方法中,我们可以看出Map/Reduce的核心是在怎样对输入的数据按照何种方式是进行Key/Value分对的,分的合理对整个结果输出及算法实现有很大的影响。

spark 读取多个文件支持通配符吗?为什么用通配符提示找不到文件

val wc = sc.textFile("/user/boco/yy/_*").flatMap(_.split()).map((_,1)).groupByKey 直接用*代替,不用加“/”,刚我试过了。

而且就算加,怎么会加到*后面啊,加到后面就是找"_*"文件夹了

Hadoop中mapred包和mapreduce包的区别

今天写了段代码突然发现,很多类在mapred和mapreduce中分别都有定义,下面是小菜写的一段代码: public class MyJob extends Configured implements Tool { public static class MapClass extends MapReduceBase implements Mapper<Text, Text, Text, Text> {// public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { output.collect(value, key); } } public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> { @Override public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { String csv = ""; while (values.hasNext()) { csv += csv.length() > 0 ? "," : ""; csv += values.next().toString(); } output.collect(key, new Text(csv)); } } @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); JobConf job = new JobConf(conf, MyJob.class); //JobConf job = new JobConf(conf, MyJob.class); Path in = new Path(args[0]); Path out = new Path(args[1]); FileInputFormat.setInputPaths(job, in); FileOutputFormat.setOutputPath(job, out); job.setJobName("MyJob"); job.setMapperClass(MapClass.class); job.setReducerClass(Reduce.class); job.setInputFormat(KeyValueTextInputFormat.class); job.setOutputFormat(TextOutputFormat.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.set("key.value.separator.in.input.line", ","); JobClient.runJob(job); return 0; } public static void main(String[] args) throws Exception { // TODO Auto-generated method stub int res = ToolRunner.run(new Configuration(), new MyJob(), args); System.exit(res); } } 主要看run方法: 上面代码中的Jobconf无可厚非,只有在mapred包中有定义,这个没问题。

但是FileInputFormat和FileOutputFormat在mapred和mapreduce中都有定义,刚开始脑海里对这些都没有概念,就引用了mapreduce中的FileInputFormat和FIleOutputFormat。

这样操作就带来了后面的问题 FileInputFormat.setInputPaths(job, in); FileOutputFormat.setOutputPath(job, out); 这两条语句不能通过编译,为什么呢,因为FileInputFormat.setInputPaths和FileOutputFormat.setOutputPath的第一个参数都是Job,而不是JobConf,找了很多资料,由于对Hadoop了解少,所以找资料没有方向感,这也是学习新东西效率低下的原因,如果有哪位大牛,知道怎么克服效率低下的问题,请不吝赐教! 后来,无意中,看到mapred包中也有这两个类的定义,于是火箭速度修改为mapred下的包,OK,顺利通过编译! 下面还有 job.setOutputFormat(TextOutputFormat.class);语句编译不同通过,提示参数需要扩展。





的参数;于是小菜也去mapred下面查找是否存在此类,正如期望,也存在此类,当即立段,修改为此包下的类,顺利编译通过,此时,颇有成就感! 可是现在小菜发现,mapred包下和mapreduce包下同时都存在又相应的类,不知道是为什么,那么下面就有目标的请教搜索引擎啦,呵呵,比刚才有很大进步。

结果令小菜很失望,就找到了一个符合理想的帖子。

但是通过这个帖子,小菜知道了,mapred代表的是hadoop旧API,而mapreduce代表的是hadoop新的API。

OK,小菜在google输入框中输入“hadoop新旧API的区别”,结果很多。

看了之后,又结合权威指南归结如下: 1. 首先第一条,也是小菜今天碰到这些问题的原因,新旧API不兼容。

所以,以前用旧API写的hadoop程序,如果旧API不可用之后需要重写,也就是上面我的程序需要重写,如果旧API不能用的话,如果真不能用,这个有点儿小遗憾! 2. 新的API倾向于使用抽象类,而不是接口,使用抽象类更容易扩展。

例如,我们可以向一个抽象类中添加一个方法(用默认的实现)而不用修改类之前的实现方法。

因此,在新的API中,Mapper和Reducer是抽象类。

3. 新的API广泛使用context object(上下文对象),并允许用户代码与MapReduce系统进行通信。

例如,在新的API中,MapContext基本上充当着JobConf的OutputCollector和Reporter的角色。

4. 新的API同时支持"推"和"拉"式的迭代。

在这两个新老API中,键/值记录对被推mapper中,但除此之外,新的API允许把记录从map()方法中拉出,这也适用于reducer。

分批处理记录是应用"拉"式的一个例子。

5. 新的API统一了配置。

旧的API有一个特殊的JobConf对象用于作业配置,这是一个对于Hadoop通常的Configuration对象的扩展。

在新的API中,这种区别没有了,所以作业配置通过Configuration来完成。

作业控制的执行由Job类来负责,而不是JobClient,并且JobConf和JobClient在新的API中已经荡然无存。

这就是上面提到的,为什么只有在mapred中才有Jobconf的原因。

6. 输出文件的命名也略有不同,map的输出命名为part-m-nnnnn,而reduce的输出命名为part-r-nnnnn,这里nnnnn指的是从0开始的部分编号。

这样了解了二者的区别就可以通过程序的引用包来判别新旧API编写的程序了。

小菜建议最好用新的API编写hadoop程序,以防旧的API被抛弃!!!

fileinputformat是抽象类吗

不是抽象类,你看看API啊,继承于InputStream是抽象类, FileInputStream不是,它有3个构造器 FileInputStream(File file) Creates a FileInputStream by opening a connection to an actual file, the file named by the File object file in the file system. FileInputStream(FileDescriptor fdObj) Creates a FileInputStream by using the file descriptor fdObj, which represents an existing connection to an actual file in the file system. FileInputStream(String name) Creates a FileInputStream by opening a connection to an actual file, the file named by the path name name in the file system.

不同mapreduce程序可以连续运行吗?比如说多个这样的程序,用上一个的输出作为下一个的输入,求

可以的。

最简单的方式就是在一个类里定义 map1 reduce1 map2 reduce2 map3 reduce3 map2 的输入设为map1的输出 map3 的输入可以设为map2和map3的输入。

百星数据(60元/月,600元/年)日本/韩国/香港cn2 gia云服务器,2核2G/40G/5M带宽

百星数据(baixidc),2012年开始运作至今,主要提供境外自营云服务器和独立服务器出租业务,根据网络线路的不同划分为:美国cera 9929、美国cn2 gia、香港cn2 gia、韩国cn2 gia、日本cn2 gia等云服务器及物理服务器业务。目前,百星数据 推出的日本、韩国、香港cn2 gia云服务器,2核2G/40G/5M带宽低至60元/月,600元/年。百星数据优惠码:优惠码:30...

PIGYun中秋特惠:香港/韩国VPS月付14元起

PIGYun发布了九月份及中秋节特惠活动,提供8折优惠码,本月商家主推中国香港和韩国机房,优惠后最低韩国每月14元/中国香港每月19元起。这是一家成立于2019年的国人商家,提供中国香港、韩国和美国等地区机房VPS主机,基于KVM架构,采用SSD硬盘,CN2+BGP线路(美国为CUVIP-AS9929、GIA等)。下面列出两款主机配置信息。机房:中国香港CPU:1core内存:1GB硬盘:10GB...

Pia云服务商春节6.66折 美国洛杉矶/中国香港/俄罗斯和深圳机房

Pia云这个商家的云服务器在前面也有介绍过几次,从价格上确实比较便宜。我们可以看到最低云服务器低至月付20元,服务器均采用KVM虚拟架构技术,数据中心包括美国洛杉矶、中国香港、俄罗斯和深圳地区,这次春节活动商家的活动力度比较大推出出全场6.66折,如果我们有需要可以体验。初次体验的记得月付方案,如果合适再续约。pia云春节活动优惠券:piayun-2022 Pia云服务商官方网站我们一起看看这次活...

fileinputformat为你推荐
知识分享平台全国知识共享的平台有哪些?vga接口定义VGA接口的15针分别接什么?awv如何把普通电影转换成AWV腾讯技术腾讯简介----币众筹收益权众筹为什么有吸引力监控插件观看远程监控时,提示需要一个插件来显示什么是网络地址手机IP地址是什么?红牛下架红牛下架事件怎么回事?美宜佳最近怎么买不到红牛了?embed函数datedif是什么函数哈?value又是什么函数哈smo优化vivo手机一直反复优化要怎么弄?
高防服务器租用选锐一 vps是什么意思 香港bgp机房 vultr美国与日本 paypal认证 payoneer 本网站在美国维护 天互数据 anylink hkg 跟踪路由命令 双线asp空间 阿里云邮箱登陆地址 阿里dns umax 葫芦机 studentmain 在线tracert 电脑显示屏不亮但是主机已开机 新浪轻博客 更多