mapreduce源码解析之mapper-凯发k873.com

易观 2018-11-21
详解mapreduce中map(映射)的实现者mapper。

导语:



说起mapreduce,只要是大数据领域的小伙伴,相信都不陌生。它作为hadoop生态系统中的一部分,最早是由google公司研究提出的一种面向大规模数据处理的并行计算模型。

 

mapreduce主要由"map(映射)"和"reduce(归约)"组成,主要思想是用map函数用来把一组键值对映射成一组新的键值对,然后指定并发的reduce函数进行合并输出。然而就是这个简单的分布式思想,却敲开了大数据时代的大门,很多大数据工作者都把学习mapreduce作为基础的一课。 不过对于大多数小伙伴来说,虽然学习或写了很久的mapreduce,却不一定真正研究过它的源码。

 

这一期,我就先给大家介绍一下mapreduce中map(映射)的实现者mapper。更深入的理解mapreduce,在应用到易观千帆技术数据处理方面时,可以更好的提升易观大数据处理能力。



类图



如图为mapper以及它的一些子类的类图mapper一共有九个子类。我们挑了其中的4个子类来做分析)



 

mapper源码  



从源码中我们可以看出,mapper类里总共包含四个方法,一个抽象类。



1.setup()方法—一般作为map()方法的准备工作,如进行相关配置文件的读取、参数的传递;

2.cleanup()方法—是用来做一些收尾工作,如关闭文件、key-value的分发;

3.map()方法—是真正的程序逻辑部分,如对一行文本的split、filter处理之后,将数据以key-value的形式写入context;

4.run()方法—是驱动整个mapper执行的一个方法,按照run()>>setup()>>map()>>cleanup()顺序执行;

5.context抽象类—是mapper里的一个内部抽象类,主要是为了在map任务或者reduce任务中跟踪task的相关状态和数据的存放。如context可以存储一些jobconf有关的信息,在setup()方法中,就可以用context读取相关的配置信息,以及作为key-value数据的载体。(context比较复杂,以后可以单独介绍)

 

mapper子类



inversemapper

 


这个类很简单,只是将key-value进行反转,然后直接分发,如面包-生产商,我们既可以统计某一种面包来自多少个生产商,也可以统计一个生产商生产多少种面包。不同的需求,利用inversemapper可以达到不同的效果

 

tokencountermapper

 


这个类使用stringtokenizer来获取value中的tokens(在stringtokenizer的构造函数中将value按照“\t\n\t\f”进行分割),然后对每一个token,分发出一个对,这将在reduce端被收集,同一个token对应的key-value对都会被收集到同一个reducer上,计算出所有mapper分发出来的以某个token为key的的数量,然后加起来,就得到了token的计数。在我们学习的wordcount程序中,其实只需要在main方法中将job.setmapperclass(tokencountermapper.class)进行设置,便可以统计单词的个数。



regexmapper



这个类其实就是将wordcount进行了正则化,匹配自己需要格式的word进行统计。



multithreadedmapper

 



这个类使用多线程来执行mapper(它由mapreduce.mapper.multithreadedmapper.mapclass设置)。该类重写了run()方法,首先设置运行上下文context和workmapper,然后启动多个maprunner(内部类)子线程(由mapred.map.multithreadedrunner.threads 设置 ),最后使用join()等待子线程执行完毕。maprunner(内部类)继承了thread,拥有独立的context去执行mapper,并进行异常处理。从maprunner的constructor中我们看见,它使用了独立的submaprecordreader、submaprecordwriter和submapstatusreporter。submaprecordreader在读key-value和submaprecordwriter在写key-value的时候都要同步。这是通过互斥访问multithreadedmapper的上下文outer来实现的。multithreadedmapper可以充分利用cpu,采用多个线程处理后,一个线程可以同时在另一个线程执行的时候读取数据并执行,这样就使用了更多的cpu周期来执行任务,从而提高吞吐率(注意读写操作都是线程安全的)。但对于io密集型的作业,采用multithreadedmapper会适得其反,因为会有多个线程等待io,io便成为限制吞吐率的关键,这时候我们可以通过增多task数量的方法来解决,因为这样在io上就是并行的。



以上即是对mapreduce中mapper以及部分子类源码的解析,后续将继续讨论mapreduce中的其他关键类源码。

 


热门文章

  • 关注微信公众号

  • 市场合作电话

    4006-010-231

    会员服务邮箱

    [email protected]

  • 关注微信公众号

  • 市场合作电话

    4006-010-231

    会员服务邮箱

    [email protected]