fileinputformatmapreduce 键值对怎么定义的

fileinputformat  时间:2021-06-08  阅读:()

hadoop怎么重新启动tasktracker

如果是某个TaskTracker挂掉了想要重启,比较稳妥的方式是在master直接运行一次start-mapred.sh(或者start-all.sh也可以)脚本。

脚本会忽略正在正常运行的hadoop进程,并在没有运行TaskTracker的节点上启动TaskTracker。

如果是想停止某个节点上的Taskracker再启动,直接登录到该节点,kill掉TaskTracker进程,再返回master运行start-mapred.sh。

Hadoop,怎么实现多个输入路径的输入?

1.多路径输入 1)FileInputFormat.addInputPath 多次调用加载不同路径 .apache.hadoop.mapreduce.lib.input.FileInputFormat; .apache.hadoop.mapreduce.lib.output.FileOutputFormat; String?in0?=?args[0]; String?in1?=?args[1]; String?out?=?args[2]; FileInputFormat.addInputPath(job,new?Path(in0)); FileInputFormat.addInputPath(job,new?Path(in1)); FileOutputFormat.setOutputPath(job,new?Path(out));2)FileInputFormat.addInputPaths一次调用加载 多路径字符串用逗号隔开 FileInputFormat.addInputPaths(job, "hdfs://RS5-112:9000/cs/path1,hdfs://RS5-112:9000/cs/path2"); 2.多种输入 MultipleInputs可以加载不同路径的输入文件,并且每个路径可用不同的maper MultipleInputs.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path1"), TextInputFormat.class,MultiTypeFileInput1Mapper.class); MultipleInputs.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path3"), TextInputFormat.class,MultiTypeFileInput3Mapper.class);

Hadoop是怎么分块的

hadoop的分块有两部分,其中第一部分更为人熟知一点。

第一部分就是数据的划分(即把File划分成Block),这个是物理上真真实实的进行了划分,数据文件上传到HDFS里的时候,需要划分成一块一块,每块的大小由hadoop-default.xml里配置选项进行划分。

<property> <name>dfs.block.size</name> <value>67108864</value> <description>The default block size for new files.</description> </property> 这个就是默认的每个块64MB。

数据划分的时候有冗余,个数是由 <property> <name>dfs.replication</name> <value>3</value> <description>Default block replication. The actual number of replications can be specified when the file is created. The default is used if replication is not specified in create time. </description> </property> 指定的。

具体的物理划分步骤要看Namenode,这里要说的是更有意思的hadoop中的第二种划分。

在hadoop中第二种划分是由InputFormat这个接口来定义的,其中有个getSplits方法。

这里就有了一个新的不为人熟知的概念:Split。

Split的作用是什么,Split和Block是什么关系,下面就可以说明清楚。

在Hadoop0.1中,split划分是在JobTracker端完成的,发生在JobInitThread对JobInProgress调用inittasks()的时候;而在0.18.3中是由JobClient完成的,JobClient划分好后,把split.file写入hdfs里,到时候jobtracker端只需要读这个文件,就知道Split是怎么划分的了。

第二种划分只是一种逻辑上划分,目的是为了让Map Task更好的获取数据输入,仔细分析如下这个场景: File 1 : Block11, Block 12, Block 13, Block 14, Block 15 File 2 : Block21, Block 22, Block 23 File1有5个Block,最后一个Block当然可能小于64MB;File2有3个Block 如果用户在程序中指定map tasks的个数,比如说是2(如果不指定的话maptasks个数默认是1),那么在 FileInputFormat(最常见的InputFormat实现)的getSplits方法中,首先会计算totalSize=8(可以对照源码看看,注意getSplits这个函数里的计量单位是Block个数,而不是Byte个数,后面有个变量叫bytesremaining仍然表示剩余的Block个数,有些变量名让人无语),然后会计算goalSize=totalSize/numSplits=4,对于File1,计算一个Split有多少个Block是这样计算的 long splitSize =puteSplitSize(goalSize, minSize, blockSize); protected puteSplitSize(long goalSize, long minSize, long blockSize) { return Math.max(minSize, Math.min(goalSize, blockSize)); } 这里minSize是1(说明了一个Split至少包含一个Block,不会出现一个Split包含零点几个Block的情况),计算得出splitSize=4,所以接下来Split划分是这样分的: Split 1: Block11, Block12, Block13,Block14 Split 2: Block15 Split 3: Block21, Block22, Block23 那用户指定的map个数是2,出现了三个split怎么办?在JobInProgress里其实maptasks的个数是根据Splits的长度来指定的,所以用户指定的map个数只是个参考。

可以参看JobInProgress: initTasks() 里的代码: try { splits = JobClient.readSplitFile(splitFile); } finally { splitFile.close(); } numMapTasks = splits.length; maps = new TaskInProgress[numMapTasks]; 所以问题就很清晰了,还如果用户指定了20个map作业,那么最后会有8个Split(每个Split一个Block),所以最后实际上就有8个MapTasks,也就是说maptask的个数是由splits的长度决定的。

几个简单的结论: 1. 一个split不会包含零点几或者几点几个Block,一定是包含大于等于1个整数个Block 2. 一个split不会包含两个File的Block,不会跨越File边界 3. split和Block的关系是一对多的关系 4. maptasks的个数最终决定于splits的长度 还有一点需要说明,在FileSplit类中,有一项是private String[] hosts; 看上去是说明这个FileSplit是放在哪些机器上的,实际上hosts里只是存储了一个Block的冗余机器列表。

比如上面例子中的Split 1: Block11, Block12, Block13,Block14,这个FileSplit中的hosts里最终存储的是Block11本身和其冗余所在的机器列表,也就是说Block12,Block13,Block14存在哪些机器上没有在FileSplit中记录。

FileSplit中的这个属性有利于调度作业时候的数据本地性问题。

如果一个tasktracker前来索取task,jobtracker就会找个task给他,找到一个maptask,得先看这个task的输入的FileSplit里hosts是否包含tasktracker所在机器,也就是判断和该tasktracker同时存在一个机器上的datanode是否拥有FileSplit中某个Block的备份。

但总之,只能牵就一个Block,其他Block就从网络上传罢。

mapreduce怎么导入hbace

1、先看一个标准的hbase作为数据读取源和输出源的样例: [java] view plaincopy在CODE上查看代码片派生到我的代码片 Configuration conf = HBaseConfiguration.create(); Job job = new Job(conf, "job name "); job.setJarByClass(test.class); Scan scan = new Scan(); TableMapReduceUtil.initTableMapperJob(inputTable, scan, mapper.class, Writable.class, Writable.class, job); TableMapReduceUtil.initTableReducerJob(outputTable, reducer.class, job); job.waitForCompletion(true); 首先创建配置信息和作业对象,设置作业的类。

这些和正常的mapreduce一样, [java] view plaincopy在CODE上查看代码片派生到我的代码片 Configuration conf = HBaseConfiguration.create(); Job job = new Job(conf, "job name "); job.setJarByClass(test.class); Scan scan = new Scan(); TableMapReduceUtil.initTableMapperJob(inputTable, scan, mapper.class, Writable.class, Writable.class, job); TableMapReduceUtil.initTableReducerJob(outputTable, reducer.class, job); job.waitForCompletion(true); 唯一不一样的就是数据源的说明部分,TableMapReduceUtil的initTableMapperJob和initTableReducerJob方法来实现。

用如上代码: 数据输入源是hbase的inputTable表,执行mapper.class进行map过程,输出的key/value类型是ImmutableBytesWritable和Put类型,最后一个参数是作业对象。

需要指出的是需要声明一个扫描读入对象scan,进行表扫描读取数据用,其中scan可以配置参数,这里为了例子简单不再详述。

数据输出目标是hbase的outputTable表,输出执行的reduce过程是reducer.class类,操作的作业目标是job。

与map比缺少输出类型的标注,因为他们不是必要的,看过源代码就知道mapreduce的TableRecordWriter中write(key,value)方法中,key值是没有用到的。

value只能是Put或者Delete两种类型,write方法会自行判断并不用用户指明。

接下来就是mapper类: [java] view plaincopy在CODE上查看代码片派生到我的代码片 public class mapper extends TableMapper { public void map(Writable key, Writable value, Context context) throws IOException, InterruptedException { //mapper逻辑 context.write(key, value); } } } 继承的是hbase中提供的TableMapper类,其实这个类也是继承的MapReduce类。

后边跟的两个泛型参数指定类型是mapper输出的数据类型,该类型必须继承自Writable类,例如可能用到的put和delete就可以。

需要注意的是要和initTableMapperJob方法指定的数据类型一直。

该过程会自动从指定hbase表内一行一行读取数据进行处理。

然后reducer类: [java] view plaincopy在CODE上查看代码片派生到我的代码片 public class countUniteRedcuer extends TableReducer { public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { //reducer逻辑 context.write(null, put or delete); } } reducer继承的是TableReducer类。

后边指定三个泛型参数,前两个必须对应map过程的输出key/value类型,第三个必须是put或者delete。

write的时候可以把key写null,它是不必要的。

这样reducer输出的数据会自动插入outputTable指定的表内。

2、有时候我们需要数据源是hdfs的文本,输出对象是hbase。

这时候变化也很简单: [java] view plaincopy 你会发现只需要像平常的mapreduce的作业声明过程一样,指定mapper的执行类和输出key/value类型,指定FileInputFormat.setInputPaths的数据源路径,输出声明不变。

便完成了从hdfs文本读取数据输出到hbase的命令声明过程。

mapper和reducer如下: [java] view plaincopy在CODE上查看代码片派生到我的代码片 Configuration conf = HBaseConfiguration.create(); Job job = new Job(conf, "job name "); job.setJarByClass(test.class); job.setMapperClass(mapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); FileInputFormat.setInputPaths(job, path); TableMapReduceUtil.initTableReducerJob(tableName, reducer.class, job); [java] view plaincopy在CODE上查看代码片派生到我的代码片 public class mapper extends Mapper { public void map(LongWritable key, Text line, Context context) { //mapper逻辑 context.write(k, one); } } public class redcuer extends TableReducer { public void reduce(Writable key, Iterable values, Context context) throws IOException, InterruptedException { //reducer逻辑 context.write(null, put or delete); } } [java] view plaincopy mapper还依旧继承原来的MapReduce类中的Mapper即可。

同样注意这前后数据类型的key/value一直性。

[java] view plaincopy 3、最后就是从hbase中的表作为数据源读取,hdfs作为数据输出,简单的如下: [java] view plaincopy在CODE上查看代码片派生到我的代码片 Configuration conf = HBaseConfiguration.create(); Job job = new Job(conf, "job name "); job.setJarByClass(test.class); Scan scan = new Scan(); TableMapReduceUtil.initTableMapperJob(inputTable, scan, mapper.class, Writable.class, Writable.class, job); job.setOutputKeyClass(Writable.class); job.setOutputValueClass(Writable.class); FileOutputFormat.setOutputPath(job, Path); job.waitForCompletion(true); mapper和reducer简单如下: [java] view plaincopy在CODE上查看代码片派生到我的代码片Configuration conf = HBaseConfiguration.create(); Job job = new Job(conf, "job name "); job.setJarByCl ass(test.class); Scan scan = new Scan(); TableMapReduceUtil.initTableMapperJob(inputTable, scan, mapper.class, Writable.class, Writable.class, job); TableMapReduceUtil.initTableReducerJob(outputTable, reducer.class, job); job.waitForCompletion(true); Configuration conf = HBaseConfiguration.create(); Job job = new Job(conf, "job name "); job.setJarByClass(test.class); Scan scan = new Scan(); TableMapReduceUtil.initTableMapperJob(inputTable, scan, mapper.class, Writable.class, Writable.class, job); TableMapReduceUtil.initTableReducerJob(outputTable, reducer.class, job); job.waitForCompletion(true); 唯一不一样的就是数据源的说明部分,TableMapReduceUtil的initTableMapperJob和initTableReducerJob方法来实现。

public class mapper extends TableMapper { public void map(Writable key, Writable value, Context context) throws IOException, InterruptedException { //mapper逻辑 context.write(key, value); } } } public class countUniteRedcuer extends TableReducer { public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { //reducer逻辑 context.write(null, put or delete); } }

mapreduce 键值对怎么定义的

一般情况下Mapreduce输出的键值对是以制表符 为分隔符的,如下图所示: 但有时候我们像将其设置为其它的分隔符输出,比如",",如下图所示: 此时可以在Mapreduce的主函数中添加如下的两行代码: [java] view plain copy print? conf.set("mapred.textoutputformat.ignoreseparator","true"); conf.set("mapred.textoutputformat.separator",","); 具体如下的WordCount程序: [java] view plain copy print? import java.io.IOException; import java.util.StringTokenizer; .apache.hadoop.conf.Configuration; .apache.hadoop.fs.Path; .apache.hadoop.io.IntWritable; .apache.hadoop.io.Text; .apache.hadoop.mapreduce.Job; .apache.hadoop.mapreduce.Mapper; .apache.hadoop.mapreduce.Reducer; .apache.hadoop.mapreduce.lib.input.FileInputFormat; .apache.hadoop.mapreduce.lib.output.FileOutputFormat; .apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); //设置MapReduce的输出的分隔符为逗号 conf.set("mapred.textoutputformat.ignoreseparator", "true"); conf.set("mapred.textoutputformat.separator", ","); String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

创梦网络-四川一手资源高防大带宽云服务器,物理机租用,机柜资源,自建防火墙,雅安最高单机700G防护,四川联通1G大带宽8.3W/年,无视UDP攻击,免费防CC

? ? ? ?创梦网络怎么样,创梦网络公司位于四川省达州市,属于四川本地企业,资质齐全,IDC/ISP均有,从创梦网络这边租的服务器均可以****,属于一手资源,高防机柜、大带宽、高防IP业务,另外创梦网络近期还会上线四川联通大带宽,四川联通高防IP,一手整CIP段,四川电信,联通高防机柜,CN2专线相关业务。成都优化线路,机柜租用、服务器云服务器租用,适合建站做游戏,不须要在套CDN,全国访问快...

ZJI:韩国BGP+CN2线路服务器,国内三网访问速度优秀,8折优惠码每月实付440元起

zji怎么样?zji最近新上韩国BGP+CN2线路服务器,国内三网访问速度优秀,适用8折优惠码zji,优惠后韩国服务器最低每月440元起。zji主机支持安装Linux或者Windows操作系统,会员中心集成电源管理功能,8折优惠码为终身折扣,续费同价,全场适用。ZJI是原Wordpress圈知名主机商:维翔主机,成立于2011年,2018年9月启用新域名ZJI,提供中国香港、台湾、日本、美国独立服...

香港 E5-2650 16G 10M 900元首月 美国 E5-2660 V2 16G 100M 688元/月 华纳云

华纳云双11钜惠出海:CN2海外物理服务器终身价688元/月,香港/美国机房,免费送20G DDos防御,50M CN2或100M国际带宽可选,(文内附带测评)华纳云作为一家专业的全球数据中心基础服务提供商,总部在香港,拥有香港政府颁发的商业登记证明,APNIC 和 ARIN 会员单位。主营香港服务器、美国服务器、香港/美国OpenStack云服务器、香港高防物理服务器、美国高防服务器、香港高防I...

fileinputformat为你推荐
ostringstreamstringbuf类是干什么用的解决方法股价图简单易懂的股票图外加说明eagleeye电脑进程中出现Eaglesvr这种程序,据说是一种蠕虫病毒。。。怎样杀掉?知识分享平台微信看到一些文章,可以分享到知识付费的平台吗?怎么操作呀?网络审计网络审计和传统审计的范围有什么变化模式识别算法算法是编程么?考研学模式识别方向,编程多么?云盘网谁知道免费的网盘?labelforhtml中label是什么意思啊?腾讯贴吧QQ贴吧图标灭了后该怎样再点亮?软件开发技术文档软件开发项目中,过程管理文档都包括什么?
域名解析 域名备案信息查询 openv 主机 kddi godaddy 贵州电信宽带测速 ev证书 申请空间 商务主机 湖南服务器托管 本网站在美国维护 e蜗 静态空间 免费美国空间 免费高速空间 网站在线扫描 空间登入 智能dns解析 美国迈阿密 更多