mapreduce源码解析之inputformat-凯发k873.com

易观 2019-02-13
mapreduce中的输入文件读取者inputformat

导读

上一篇文章介绍了mapreduce中map(映射)的实现者mapper,本章将会介绍mapreduce中的输入文件读取者inputformat。mapreduce程序获取的数据类型多种多样,当程序把数据输入给mapper时,需要格式化读取,例如读取普通文本文件需要设置job.setinputformatclass(textinputformat.class)。所有的输入格式类都继承于inputformat,它的主要作用是将输入数据切分成分片(比如多少行为一个分片),以及如何读取分片中的数据(比如按行读取)。前者由getsplits()完成,后者由recordreader完成。下面我就详细介绍一下inputformat和它的一些相关类是如何实现数据的输入和读取的。

 

类图

 

 

inputformat源码

 

 

inputformat是一个抽象类,有两个抽象方法:

1.  getsplits() 负责将文件切分成多个分片(inputsplit),但并没有实际切分文件,而只是说明了如何切分数据,inputsplit只是逻辑上的切分。例如,可以拆分为元组。

2.  createrecordreader() 则创建了recordreader,用来从inputsplit读取记录。例如,linerecordreader以偏移值为key,一行的数据为value,它将以k-v对从inputsplit中正确读出来。



inputsplit源码


inputsplit也是一个抽象类,inputformat通过getsplits()切分成的分片就存储在这个类中,它在逻辑上包含了提供给处理这个inputsplit的mapper的所有k-v对。



1.getlength() 用来获取inputsplit的大小,以便对inputsplit进行排序。


2.getlocations() 则用来获取存储分片的位置列表,这些位置是本地的,不需要序列化。


3.getlocationinfo() 获取有关输入拆分存储在哪些节点,以及如何在每个位置存储信息。



recordreader源码



recordreader是用来从一个输入分片中读取一个一个的k-v对的抽象类,我们可以将其看作是inputsplit上的迭代器。



initialize() 初始化recordreader。


1. nextkeyvalue()      recordreader中最主要的方法,由它获取分片上的下一个k-v对。


2. getcurrentkey() 获取当前的key。


3.getcurrentvalue() 获取当前的value。


4.getprogress() 记录recordreader当前通过其数据的进展。


5.close() 关闭recordreader。



具体实现 (按行读取数据)

 

sept1:通过getsplits()获取文件切分



fileinputformat(inputformat的子类)中的getsplits()



主要功能:

1.获取inputsplit的splitsize。可以通过设置mapred.min.split.size和mapred.max.split.size来设置,默认情况下min为1,max为block的大小。


2.判断文件是否可以切分。比如,密码文件、压缩文件只能按照一个分片进行处理。


3.通过computesplitsize计算出splitsize。计算方法是:math.max(minsize,      math.min(maxsize, blocksize))。也就是保证在minsize和maxsize之间,且如果minsize<=blocksize<=maxsize,则设为blocksize。


4.通过add()将分片加入列表,该方法中通过makesplit()实现逻辑块的切分。


5.分析文件长度不为0程序如何执行。

      

样例:

      文件大小300,length=300,bytesremaining=300

  • 执行第一次makesplit(0,128)  按splitsize=128切分

  • bytesremaining=300-128=172

  • 执行第二次makesplit(300-172=128,128)

  • bytesremaining=172-128=44

  • 执行第三次makesplit(300-44=256,128)



sept2:通过createrecordreader()处理map任务



textinputformat(fileinputformat的子类)中的createrecordreader()


 

主要功能:

1.设置终止符。textinputformat.record.delimiter 指的是读取一行的数据的终止符,即遇到终止符时,这一行的读取结束。可以通过configuration的set()方法来设置自定义的终止符,如果没有设置,那么hadoop就采用以cr,lf或者crlf作为终止符。


2.获取一个linerecordreader对象,读取inputsplit。



linerecordreader(recordreader的子类)nextkeyvalue()

 

主要功能是通过nextkeyvalue()方法给key和value赋值。由readline()方法从输入流中读取给定文本,返回值为被读取字节的数量 newsize = in.readline(value, maxlinelength, maxbytestoconsume(pos))。读取一行数据,将数据放入value中,返回值为被读取字节的长度,还包括新行(换行)。



样例:

hello analysys 

hello  me

  • 上述文件会被切分成一个split

  • 第一次调用nextkeyvalue()的时候start=0,value=hello analysys,end=24, pos=0,key=0,newsize=15

  • 第二次调用nextkeyvalue()的时候key=15,value=hello  me,newsize=10          



sept3:通过run()调用nextkeyvalue()



mapper中的run()



主要功能是通过context中nextkeyvalue()读取recordreader中的数据。



mapcontextimp(mapcontext的子类,mapcontext在context中构造)中的run()



主要功能是使用了一个recordreader进行构造。



以上就是mapreduce中inputformat以及相关类源码的解析,后续将继续讨论mapreduce中的其他关键类源码。

 

热门文章

  • 关注微信公众号

  • 市场合作电话

    4006-010-231

    会员服务邮箱

    [email protected]

  • 关注微信公众号

  • 市场合作电话

    4006-010-231

    会员服务邮箱

    [email protected]