大数据人|大数据第一社区

 找回密码
 注册会员

扫一扫,访问微社区

Spark系列之四(完):MapReduce 转换到 Spark

2015-11-2 18:39| 发布者: admin| 查看: 12381| 评论: 0|来自: DB2中国实验室

摘要: Spark 是类似于 MapReduce 的计算引擎,它提出的内存方式解决了 MapReduce 存在的读取磁盘速度较慢的困难,此外,它基于 Scala 的函数式编程风格和 API,进行并行计算时效率很高。由 于 Spark 采用的是 RDD(弹性分布 ...

Spark 是类似于 MapReduce 的计算引擎,它提出的内存方式解决了 MapReduce 存在的读取磁盘速度较慢的困难,此外,它基于 Scala 的函数式编程风格和 API,进行并行计算时效率很高。

由 于 Spark 采用的是 RDD(弹性分布式结果集) 方式对数据进行计算,这种方式与 MapReduce 的 Map()、Reduce() 方式差距较大,所以很难直接使用 Mapper、Reducer 的 API,这也是阻碍 MapReduce 转为 Spark 的绊脚石。

Scala 或者 Spark 里面的 map() 和 reduce() 方法与 Hadoop MapReduce 里面的 map()、reduce() 方法相比,Hadoop MapReduce 的 API 更加灵活和复杂,下面列出了 Hadoop MapReduce 的一些特性:

  1. Mappers 和 Reducers 通常使用 key-value 键值对作为输入和输出;

  2. 一个 key 对应一个 Reducer 的 reduce;

  3. 每一个 Mapper 或者 Reducer 可能发出类似于 0,1 这样的键值对作为每一次输出;

  4. Mappers 和 Reducers 可能发出任意的 key 或者 value,而不是标准数据集方式;

  5. Mapper 和 Reducer 对象对每一次 map() 和 reduce() 的调用都存在生命周期。它们支持一个 setup() 方法和 cleanup() 方法,这些方法可以被用来在处理批量数据之前的操作。

试想这么一个场景,我们需要计算一个文本文件里每一行的字符数量。在 Hadoop MapReduce 里,我们需要为 Mapper 方法准备一个键值对,key 用作行的行数,value 的值是这一行的字符数量。

清单 9. MapReduce 方式 Map 函数
public class LineLengthCountMapper
 extends Mapper<LongWritable,Text,IntWritable,IntWritable> {
 @Override
 protected void map(LongWritable lineNumber, Text line, Context context)
 throws IOException, InterruptedException {
 context.write(new IntWritable(line.getLength()), new IntWritable(1));
 }
}

清单 9 所示代码,由于 Mappers 和 Reducers 只处理键值对,所以对于类 LineLengthCountMapper 而言,输入是 TextInputFormat 对象,它的 key 由行数提供,value 就是该行所有字符。换成 Spark 之后的代码如清单 10 所示。

清单 10. Spark 方式 Map 函数
lines.map(line => (line.length, 1))

在 Spark 里,输入是弹性分布式数据集 (Resilient Distributed Dataset),Spark 不需要 key-value 键值对,代之的是 Scala 元祖 (tuple),它是通过 (line.length, 1) 这样的 (a,b) 语法创建的。以上代码中 map() 操作是一个 RDD,(line.length, 1) 元祖。当一个 RDD 包含元祖时,它依赖于其他方法,例如 reduceByKey(),该方法对于重新生成 MapReduce 特性具有重要意义。

清单 11 所示代码是 Hadoop MapReduce 统计每一行的字符数,然后以 Reduce 方式输出。

清单 11. MapReduce 方式 Reduce 函数
public class LineLengthReducer
 extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> {
 @Override
 protected void reduce(IntWritable length, Iterable<IntWritable> counts, Context context)
 throws IOException, InterruptedException {
 int sum = 0;
 for (IntWritable count : counts) {
 sum += count.get();
 }
 context.write(length, new IntWritable(sum));
 }
}

Spark 里面的对应代码如清单 12 所示。

清单 12. Spark 方式 Reduce 函数
val lengthCounts = lines.map(line => (line.length, 1)).reduceByKey(_ + _)

Spark 的 RDD API 有一个 reduce() 方法,它会 reduce 所有的 key-value 键值对到一个独立的 value。

我们现在需要统计大写字母开头的单词数量,对于文本的每一行而言,一个 Mapper 可能需要统计很多个键值对,代码如清单 13 所示。

清单 13. MapReduce 方式计算字符数量
public class CountUppercaseMapper
 extends Mapper<LongWritable,Text,Text,IntWritable> {
 @Override
 protected void map(LongWritable lineNumber, Text line, Context context)
 throws IOException, InterruptedException {
 for (String word : line.toString().split(" ")) {
 if (Character.isUpperCase(word.charAt(0))) {
 context.write(new Text(word), new IntWritable(1));
 }
 }
 }
}

在 Spark 里面,对应的代码如清单 14 所示。

清单 14. Spark 方式计算字符数量
lines.flatMap(
_.split(" ").filter(word => Character.isUpperCase(word(0))).map(word => (word,1))
)

MapReduce 依赖的 Map 方法这里并不适用,因为每一个输入必须对应一个输出,这样的话,每一行可能占用到很多的输出。相反的,Spark 里面的 Map 方法比较简单。Spark 里面的方法是,首先对每一行数据进行汇总后存入一个输出结果物数组,这个数组可能是空的,也可能包含了很多的值,最终这个数组会作为一个 RDD 作为输出物。这就是 flatMap() 方法的功能,它对每一行文本里的单词转换成函数内部的元组后进行了过滤。

在 Spark 里面,reduceByKey() 方法可以被用来统计每篇文章里面出现的字母数量。如果我们想统计每一篇文章里面出现的大写字母数量,在 MapReduce 里程序可以如清单 15 所示。

清单 15. MapReduce 方式
public class CountUppercaseReducer
 extends Reducer<Text,IntWritable,Text,IntWritable> {
 @Override
 protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
 throws IOException, InterruptedException {
 int sum = 0;
 for (IntWritable count : counts) {
 sum += count.get();
 }
 context.write(new Text(word.toString().toUpperCase()), new IntWritable(sum));
 }
}

在 Spark 里,代码如清单 16 所示。

清单 16. Spark 方式
groupByKey().map { case (word,ones) => (word.toUpperCase, ones.sum) }

groupByKey() 方法负责收集一个 key 的所有值,不应用于一个 reduce 方法。本例中,key 被转换成大写字母,值被直接相加算出总和。但这里需要注意,如果一个 key 与很多个 value 相关联,可能会出现 Out Of Memory 错误。

Spark 提供了一个简单的方法可以转换 key 对应的值,这个方法把 reduce 方法过程移交给了 Spark,可以避免出现 OOM 异常。

reduceByKey(_ + _).map { case (word,total) => (word.toUpperCase,total) }

setup() 方法在 MapReduce 里面主要的作用是在 map 方法开始前对输入进行处理,常用的场景是连接数据库,可以在 cleanup() 方法中释放在 setup() 方法里面占用的资源。

清单 17. MapReduce 方式
public class SetupCleanupMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
 private Connection dbConnection;
 @Override
 protected void setup(Context context) {
 dbConnection = ...;
 }
 ...
 @Override
 protected void cleanup(Context context) {
 dbConnection.close();
 }
}

在 Spark 里面没有这样的方法。

结束语

通过本文的学习,读者了解了 MapReduce 和 Spark 之间的差异及切换成本。本文针对的是对 Spark 完全没有了解的用户,后续文章会从实际应用出发,从安装、应用程序的角度给出更加实际的教程。


鲜花

握手

雷人

路过

鸡蛋

最新评论

关闭

站长推荐上一条 /2 下一条


id="mn_portal" >首页Portalid="mn_P18" onmouseover="navShow('P18')">应用id="mn_P15" onmouseover="navShow('P15')">技术id="mn_P37" onmouseover="showMenu({'ctrlid':this.id,'ctrlclass':'hover','duration':2})">前沿id="mn_P36" onmouseover="navShow('P36')">宝箱id="mn_P61" onmouseover="showMenu({'ctrlid':this.id,'ctrlclass':'hover','duration':2})">专栏id="mn_P65" >企业id="mn_Nd633" >导航 折叠导航 关注微信 关注微博 关注我们

QQ|广告服务|关于我们|Archiver|手机版|小黑屋|大数据人 ( 鄂ICP备14012176号-2  

GMT+8, 2024-4-26 21:08 , Processed in 0.189267 second(s), 21 queries .

Powered by 小雄! X3.2

© 2014-2020 bigdataer Inc.

返回顶部