大数据人|大数据第一社区

 找回密码
 注册会员

扫一扫,访问微社区

Spark系列之一:Spark,一种快速数据分析替代方案

2015-11-2 18:29| 发布者: admin| 查看: 12782| 评论: 0|来自: DB2中国实验室

摘要: Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,除了能够提供交互式查询 ...
Spark系列之一:Spark,一种快速数据分析替代方案(二)

安装 Scala 和 Spark

第一步是下载和配置 Scala。清单 4 中显示的命令阐述了 Scala 安装的下载和准备工作。使用 Scala v2.8,因为这是经过证实的 Spark 所需的版本。

清单 4. 安装 Scala
$ wget http://www.scala-lang.org/downloads/distrib/files/scala-2.8.1.final.tgz$ sudo tar xvfz scala-2.8.1.final.tgz --directory /opt/

要使 Scala 可视化,请将下列行添加至您的 .bashrc 中(如果您正使用 Bash 作为 shell):

export SCALA_HOME=/opt/scala-2.8.1.final
export PATH=$SCALA_HOME/bin:$PATH

接着可以对您的安装进行测试,如 清单 5 所示。这组命令会将更改加载至 bashrc 文件中,接着快速测试 Scala 解释器 shell。

清单 5. 配置和运行交互式 Scala
$ scalaWelcome to Scala version 2.8.1.final (OpenJDK Client VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.

scala> println("Scala is installed!")Scala is installed!

scala> :quit$

如清单中所示,现在应该看到一个 Scala 提示。您可以通过输入 :quit 执行退出。注意,Scala 要在 JVM 的上下文中执行操作,所以您会需要 JVM。我使用的是 Ubuntu,它在默认情况下会提供 OpenJDK。

接下来,请获取最新的 Spark 框架副本。为此,请使用 清单 6 中的脚本。

清单 6. 下载和安装 Spark 框架
wget https://github.com/mesos/spark/tarball/0.3-scala-2.8/
mesos-spark-0.3-scala-2.8-0-gc86af80.tar.gz
$ sudo tar xvfz mesos-spark-0.3-scala-2.8-0-gc86af80.tar.gz

接下来,使用下列行将 spark 配置设置在 Scala 的根目录 ./conf/spar-env.sh 中:

export SCALA_HOME=/opt/scala-2.8.1.final

设置的最后一步是使用简单的构建工具 (sbt) 更新您的分布。sbt 是一款针对 Scala 的构建工具,用于 Spark 分布中。您可以在 mesos-spark-c86af80 子目录中执行更新和变异步骤,如下所示:

$ sbt/sbt update compile

注意,在执行此步骤时,需要连接至 Internet。当完成此操作后,请执行 Spark 快速检测,如 清单 7 所示。在该测试中,需要运行 SparkPi 示例,它会计算 pi 的估值(通过单位平方中的任意点采样)。所显示的格式需要样例程序 (spark.examples.SparkPi) 和主机参数,该参数定义了 Mesos 主机(在此例中,是您的本地主机,因为它是一个单节点集群)和要使用的线程数量。注意,在 清单 7 中,执行了两个任务,而且这两个任务被序列化(任务 0 开始和结束之后,任务 1 再开始)。

清单 7. 对 Spark 执行快速检测
$ ./run spark.examples.SparkPi local[1]11/08/26 19:52:33 INFO spark.CacheTrackerActor: Registered actor on port 50501
11/08/26 19:52:33 INFO spark.MapOutputTrackerActor: Registered actor on port 50501
11/08/26 19:52:33 INFO spark.SparkContext: Starting job...
11/08/26 19:52:33 INFO spark.CacheTracker: Registering RDD ID 0 with cache
11/08/26 19:52:33 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
11/08/26 19:52:33 INFO spark.CacheTrackerActor: Asked for current cache locations
11/08/26 19:52:33 INFO spark.LocalScheduler: Final stage: Stage 0
11/08/26 19:52:33 INFO spark.LocalScheduler: Parents of final stage: List()
11/08/26 19:52:33 INFO spark.LocalScheduler: Missing parents: List()
11/08/26 19:52:33 INFO spark.LocalScheduler: Submitting Stage 0, which has no missing ...
11/08/26 19:52:33 INFO spark.LocalScheduler: Running task 0
11/08/26 19:52:33 INFO spark.LocalScheduler: Size of task 0 is 1385 bytes
11/08/26 19:52:33 INFO spark.LocalScheduler: Finished task 0
11/08/26 19:52:33 INFO spark.LocalScheduler: Running task 1
11/08/26 19:52:33 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
11/08/26 19:52:33 INFO spark.LocalScheduler: Size of task 1 is 1385 bytes
11/08/26 19:52:33 INFO spark.LocalScheduler: Finished task 1
11/08/26 19:52:33 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
11/08/26 19:52:33 INFO spark.SparkContext: Job finished in 0.145892763 s
Pi is roughly 3.14952
$

通过增加线程数量,您不仅可以增加线程执行的并行化,还可以用更少的时间执行作业(如 清单 8 所示)。

清单 8. 对包含两个线程的 Spark 执行另一个快速检测
$ ./run spark.examples.SparkPi local[2]11/08/26 20:04:30 INFO spark.MapOutputTrackerActor: Registered actor on port 50501
11/08/26 20:04:30 INFO spark.CacheTrackerActor: Registered actor on port 50501
11/08/26 20:04:30 INFO spark.SparkContext: Starting job...
11/08/26 20:04:30 INFO spark.CacheTracker: Registering RDD ID 0 with cache
11/08/26 20:04:30 INFO spark.CacheTrackerActor: Registering RDD 0 with 2 partitions
11/08/26 20:04:30 INFO spark.CacheTrackerActor: Asked for current cache locations
11/08/26 20:04:30 INFO spark.LocalScheduler: Final stage: Stage 0
11/08/26 20:04:30 INFO spark.LocalScheduler: Parents of final stage: List()
11/08/26 20:04:30 INFO spark.LocalScheduler: Missing parents: List()
11/08/26 20:04:30 INFO spark.LocalScheduler: Submitting Stage 0, which has no missing ...
11/08/26 20:04:30 INFO spark.LocalScheduler: Running task 0
11/08/26 20:04:30 INFO spark.LocalScheduler: Running task 1
11/08/26 20:04:30 INFO spark.LocalScheduler: Size of task 1 is 1385 bytes
11/08/26 20:04:30 INFO spark.LocalScheduler: Size of task 0 is 1385 bytes
11/08/26 20:04:30 INFO spark.LocalScheduler: Finished task 0
11/08/26 20:04:30 INFO spark.LocalScheduler: Finished task 1
11/08/26 20:04:30 INFO spark.LocalScheduler: Completed ResultTask(0, 1)
11/08/26 20:04:30 INFO spark.LocalScheduler: Completed ResultTask(0, 0)
11/08/26 20:04:30 INFO spark.SparkContext: Job finished in 0.101287331 s
Pi is roughly 3.14052
$

使用 Scala 构建一个简单的 Spark 应用程序

要构建 Spark 应用程序,您需要单一 Java 归档 (JAR) 文件形式的 Spark 及其依赖关系。使用sbt 在 Spark 的顶级目录中创建该 JAR 文件,如下所示:

$ sbt/sbt assembly

结果产生一个文件 ./core/target/scala_2.8.1/"Spark Core-assembly-0.3.jar"。将该文件添加至您的 CLASSPATH 中,以便可以访问它。在本示例中,不会用到此 JAR 文件,因为您将会使用 Scala 解释器运行它,而不是对其进行编译。

在本示例中,使用了标准的 MapReduce 转换(如 清单 9 所示)。该示例从执行必要的 Spark 类导入开始。接着,需要定义您的类 (SparkTest) 及其主方法,用它解析稍后使用的参数。这些参数定义了执行 Spark 的环境(在本例中,该环境是一个单节点集群)。接下来,要创建SparkContext 对象,它会告知 Spark 如何对您的集群进行访问。该对象需要两个参数:Mesos 主机名称(已传入)以及您分配给作业的名称 (SparkTest)。解析命令行中的切片数量,它会告知 Spark 用于作业的线程数量。要设置的最后一项是指定用于 MapReduce 操作的文本文件。

最后,您将了解 Spark 示例的实质,它是由一组转换组成。使用您的文件时,可调用 flatMap方法返回一个 RDD(通过指定的函数将文本行分解为标记)。然后通过 map 方法(该方法创建了键值对)传递此 RDD ,最终通过 ReduceByKey 方法合并键值对。合并操作是通过将键值对传递给 _ + _ 匿名函数来完成的。该函数只采用两个参数(密钥和值),并返回将两者合并所产生的结果(一个 String 和一个 Int)。接着以文本文件的形式发送该值(到输出目录)。

清单 9. Scala/Spark 中的 MapReduce (SparkTest.scala)
import spark.SparkContext
import SparkContext._
 
object SparkTest {
 
  def main( args: Array[String]) {
 
    if (args.length == 0) {
      System.err.println("Usage: SparkTest  []")
      System.exit(1)
    }
 
    val spark = new SparkContext(args(0), "SparkTest")
    val slices = if (args.length > 1) args(1).toInt else 2
 
    val myFile = spark.textFile("test.txt")
    val counts = myFile.flatMap(line => line.split(" "))
                        .map(word => (word, 1))
                        .reduceByKey(_ + _)
 
    counts.saveAsTextFile("out.txt")
 
  }
 
}
 
SparkTest.main(args)

要执行您的脚本,只需要执行以下命令:

$ scala SparkTest.scala local[1]

您可以在输出目录中找到 MapReduce 测试文件(如 output/part-00000)。

其他的大数据分析框架

自从开发了 Hadoop 后,市场上推出了许多值得关注的其他大数据分析平台。这些平台范围广阔,从简单的基于脚本的产品到与 Hadoop 类似的生产环境。

名为 bashreduce 的平台是这些平台中最简单的平台之一,顾名思义,它充许您在 Bash 环境中的多个机器上执行 MapReduce 类型的操作。bashreduce 依赖于您计划使用的机器集群的 Secure Shell(无密码),并以脚本的形式存在,通过它,您可以使用 UNIX®-style 工具(sortawknetcat 等)请求作业。

GraphLab 是另一个受人关注的 MapReduce 抽象实现,它侧重于机器学习算法的并行实现。在 GraphLab 中,Map 阶段会定义一些可单独(在独立主机上)执行的计算指令,而 Reduce 阶段会对结果进行合并。

最后,大数据场景的一个新成员是来自 Twitter 的 Storm(通过收购 BackType 获得)。Storm 被定义为 “实时处理的 Hadoop”,它主要侧重于流处理和持续计算(流处理可以得出计算的结果)。Storm 是用 Clojure 语言(Lisp 语言的一种方言)编写的,但它支持用任何语言(比如 Ruby 和 Python)编写的应用程序。Twitter 于 2011 年 9 月以开源形式发布 Storm。

12

鲜花

握手

雷人

路过

鸡蛋

最新评论

关闭

站长推荐上一条 /2 下一条


id="mn_portal" >首页Portalid="mn_P18" onmouseover="navShow('P18')">应用id="mn_P15" onmouseover="navShow('P15')">技术id="mn_P37" onmouseover="showMenu({'ctrlid':this.id,'ctrlclass':'hover','duration':2})">前沿id="mn_P36" onmouseover="navShow('P36')">宝箱id="mn_P61" onmouseover="showMenu({'ctrlid':this.id,'ctrlclass':'hover','duration':2})">专栏id="mn_P65" >企业id="mn_Nd633" >导航 折叠导航 关注微信 关注微博 关注我们

QQ|广告服务|关于我们|Archiver|手机版|小黑屋|大数据人 ( 鄂ICP备14012176号-2  

GMT+8, 2024-4-25 22:18 , Processed in 0.191773 second(s), 21 queries .

Powered by 小雄! X3.2

© 2014-2020 bigdataer Inc.

返回顶部