时至今日,Spark 已成为大数据领域最火的一个开源项目,具备高性能、易于使用等特性。然而作为一个年轻的开源项目,其使用上存在的挑战亦不可为不大,这里为大家分享 SciSpike 软件架构师 Ashwini Kuntamukkala 在 Dzone 上进行的 Spark 入门总结(虽然有些地方基于的是 Spark 1.0 版本,但仍然值得阅读)—— Apache Spark:An Engine for Large-Scale Data Processing。
本文聚焦 Apache Spark 入门,了解其在大数据领域的地位,覆盖 Apache Spark 的安装及应用程序的建立,并解释一些常见的行为和操作。
时下,我们正处在一个“大数据”的时代,每时每刻,都有各种类型的数据被生产。而在此紫外,数据增幅的速度也在显著增加。从广义上看,这些数据包含交易数据、社交媒体内容(比如文本、图像和视频)以及传感器数据。那么,为什么要在这些内容上投入如此多精力,其原因无非就是从海量数据中提取洞见可以对生活和生产实践进行很好的指导。
在几年前,只有少部分公司拥有足够的技术力量和资金去储存和挖掘大量数据,并对其挖掘从而获得洞见。然而,被雅虎 2009 年开源的 Apache Hadoop 对这一状况产生了颠覆性的冲击——通过使用商用服务器组成的集群大幅度地降低了海量数据处理的门槛。因此,许多行业(比如 Health care、Infrastructure、Finance、Insurance、Telematics、Consumer、Retail、Marketing、E-commerce、Media、 Manufacturing 和 Entertainment)开始了 Hadoop 的征程,走上了海量数据提取价值的道路。着眼 Hadoop ,其主要提供了两个方面的功能:
下图展示了 MapReduce 的数据处理流程,其中一个 Map-Reduce step 的输出将作为下一个典型 Hadoop job 的输入结果。
在整个过程中,中间结果会借助磁盘传递,因此对比计算,大量的 Map-Reduced 作业都受限于 IO 。然而对于 ETL 、数据整合和清理这样的用例来说,IO 约束并不会产生很大的影响,因为这些场景对数据处理时间往往不会有较高的需求。然而,在现实世界中,同样存在许多对延时要求较为苛刻的用例,比如:
毫无疑问,历经数年发展,Hadoop 生态圈中的丰富工具已深受用户喜爱,然而这里仍然存在众多问题给使用带来了挑战:
每个用例都需要多个不同的技术堆栈来支撑,在不同使用场景下,大量的解决方案往往捉襟见肘。
在生产环境中机构往往需要精通数门技术。
许多技术存在版本兼容性问题。
无法在并行 job 中更快地共享数据。
而通过 Apache Spark,上述问题迎刃而解!Apache Spark 是一个轻量级的内存集群计算平台,通过不同的组件来支撑批、流和交互式用例,如下图。
Apache Spark 是个开源和兼容 Hadoop 的集群计算平台。由加州大学伯克利分校的 AMPLabs 开发,作为 Berkeley Data Analytics Stack(BDAS) 的一部分,当下由大数据公司 Databricks 保驾护航,更是 Apache 旗下的顶级项目,下图显示了 Apache Spark 堆栈中的不同组件。
Apache Spark 的5大优势:
更高的性能,因为数据被加载到集群主机的分布式内存中。数据可以被快速的转换迭代,并缓存用以后续的频繁访问需求。很多对 Spark 感兴趣的朋友可能也会听过这样一句话——在数据全部加载到内存的情况下, Spark 可以比 Hadoop 快 100 倍,在内存不够存放所有数据的情况下快 Hadoop 10 倍。
通过建立在 Java、Scala、Python、SQL (应对交互式查询)的标准 API 以方便各行各业使用,同时还含有大量开箱即用的机器学习库。
与现有 Hadoop v1 ( SIMR ) 和 2.x (YARN) 生态兼容,因此机构可以进行无缝迁移。
方便下载和安装。方便的 shell(REPL: Read-Eval-Print-Loop)可以对 API 进行交互式的学习。
借助高等级的架构提高生产力,从而可以讲精力放到计算上。
同时, Apache Spark 由 Scala 实现,代码非常简洁。
下表列出了一些重要链接和先决条件:
Current Release | 1.0.1 @http://d3kbcqa49mib13.cloudfront.net/spark-1.0.1.tgz |
---|---|
Downloads Page | https://spark.apache.org/downloads.html |
JDK Version (Required) | 1.6 or higher |
Scala Version (Required) | 2.10 or higher |
Python (Optional) | [2.6, 3.0) |
Simple Build Tool (Required) | http://www.scala-sbt.org |
Development Version | git clone git://github.com/apache/spark.git |
Building Instructions | https://spark.apache.org/docs/latest/building-with-maven.html |
Maven | 3.0 or higher |
如上图所示,Apache Spark 的部署方式包括 standalone、Hadoop V1 SIMR、Hadoop 2 YARN/Mesos 。Apache Spark 需求一定的 Java、Scala 或 Python 知识。这里,我们将专注 standalone 配置下的安装和运行。
安装 JDK 1.6+、Scala 2.10+、Python [2.6,3] 和 sbt
下载 Apache Spark 1.0.1 Release
在指定目录下 Untar 和 Unzip spark-1.0.1.tgz
akuntamukkala@localhost~/Downloads$ pwd /Users/akuntamukkala/Downloads akuntamukkala@localhost~/Downloads$ tar -zxvf spark- 1.0.1.tgz -C /Users/akuntamukkala/spark
运行 sbt 建立 Apache Spark
akuntamukkala@localhost~/spark/spark-1.0.1$ pwd /Users/akuntamukkala/spark/spark-1.0.1 akuntamukkala@localhost~/spark/spark-1.0.1$ sbt/sbt assembly
发布 Scala 的 Apache Spark standalone REPL
/Users/akuntamukkala/spark/spark-1.0.1/bin/spark-shell
如果是 Python
/Users/akuntamukkala/spark/spark-1.0.1/bin/ pyspark
查看 SparkUI @ http://localhost:4040
Spark 引擎提供了在集群中所有主机上进行分布式内存数据处理的能力,下图显示了一个典型 Spark job 的处理流程。
下图显示了 Apache Spark 如何在集群中执行一个作业。
Master 控制数据如何被分割,利用了数据本地性,并在 Slaves 上跟踪所有分布式计算。在某个Slave不可用时,其存储的数据会分配给其他可用的 Slaves 。虽然当下( 1.0.1 版本) Master 还存在单点故障,但后期必然会被修复。
弹性分布式数据集(RDD,从 Spark 1.3 版本开始已被 DataFrame 替代)是 Apache Spark 的核心理念。它是由数据组成的不可变分布式集合,其主要进行两个操作:transformation 和 action 。Transformation 是类似在 RDD 上做 filter()、map() 或 union() 以生成另一个 RDD 的操作,而 action 则是 count()、first()、take(n)、collect() 等促发一个计算并返回值到 Master 或者稳定存储系统的操作。Transformations 一般都是 lazy 的,直到 action 执行后才会被执行。Spark Master/Driver 会保存 RDD 上的 Transformations 。这样一来,如果某个 RDD 丢失(也就是 salves 宕掉),它可以快速和便捷地转换到集群中存活的主机上。这也就是 RDD 的弹性所在。
下图展示了 Transformation 的 lazy :
我们可以通过下面示例来理解这个概念:从文本中发现 5 个最常用的 word 。下图显示了一个可能的解决方案。
在上面命令中,我们对文本进行读取并且建立字符串的 RDD 。每个条目代表了文本中的 1 行。
scala> val hamlet = sc.textFile(“/Users/akuntamukkala/temp/gutenburg.txt”)
hamlet: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12
scala> val topWordCount = hamlet.flatMap(str=>str.split(“ “)). filter(!_.isEmpty).map(word=>(word,1)).reduceByKey(_+_).map{case (word, count) => (count, word)}.sortByKey(false)
topWordCount: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[10] at sortByKey at <console>:14
通过上述命令我们可以发现这个操作非常简单——通过简单的 Scala API 来连接 transformations 和 actions 。
可能存在某些 words 被 1 个以上空格分隔的情况,导致有些 words 是空字符串,因此需要使用 filter(!_.isEmpty) 将它们过滤掉。
每个 word 都被映射成一个键值对:map(word=>(word,1))。
为了合计所有计数,这里需要调用一个 reduce 步骤—— reduceByKey(+) 。 + 可以非常便捷地为每个 key 赋值。
我们得到了 words 以及各自的 counts,下一步需要做的是根据 counts 排序。在 Apache Spark ,用户只能根据 key 排序,而不是值。因此,这里需要使用 map{case (word, count) => (count, word)} 将 (word, count) 流转到 (count, word)。
需要计算最常用的 5 个 words ,因此需要使用 sortByKey(false) 做一个计数的递减排序。
上述命令包含了一个 .take(5) (an action operation, which triggers computation) 和在 /Users/akuntamukkala/temp/gutenburg.txt 文本中输出 10 个最常用的 words 。在 Python shell 中用户可以实现同样的功能。
RDD lineage 可以通过 toDebugString (一个值得记住的操作)来跟踪。
scala> topWordCount.take(5).foreach(x=>println(x))
(1044,the)
(730,and)
(679,of)
(648,to)
(511,I)
常用的 Transformations:
Transformation & Purpose | Example & Result |
---|---|
filter(func) Purpose: new RDD by selecting those data elements on which func returns true | scala> val rdd = sc.parallelize(List(“ABC”,”BCD”,”DEF”)) scala> val filtered = rdd.filter(_.contains(“C”)) scala> filtered.collect() Result:<br> Array[String] = Array(ABC, BCD) |
map(func) Purpose: return new RDD by applying func on each data element | scala> val rdd=sc.parallelize(List(1,2,3,4,5)) scala> val times2 = rdd.map(_2) scala> times2.collect() **Result*:<br> Array[Int] = Array(2, 4, 6, 8, 10) |
flatMap(func) Purpose: Similar to map but func returns a Seq instead of a value. For example, mapping a sentence into a Seq of words | scala> val rdd=sc.parallelize(List(“Spark is awesome”,”It is fun”)) <br> scala> val fm=rdd.flatMap(str=>str.split(“ “)) scala> fm.collect() Result:<br> Array[String] = Array(Spark, is, awesome, It, is, fun) |
reduceByKey(func,[numTasks]) Purpose: To aggregate values of a key using a function. “numTasks” is an optional parameter to specify number of reduce tasks scala> val word1=fm.map(word=>(word,1)) | scala> val wrdCnt=word1.reduceByKey(+) scala> wrdCnt.collect() <br>Result: <br>Array[(String, Int)] = Array((is,2), (It,1), (awesome,1), (Spark,1), (fun,1)) |
groupByKey([numTasks]) Purpose: To convert (K,V) to (K,Iterable<V>) | scala> val cntWrd = wrdCnt.map { case ( word, count ) => (count, word ) } scala> cntWrd.groupByKey().collect() <br>Result:<br>Array[(Int, Iterable[String])] = Array((1,ArrayBuffer(It, awesome, Spark, fun)), (2,ArrayBuffer(is))) |
distinct([numTasks]) Purpose: Eliminate duplicates from RDD | scala> fm.distinct().collect()<br> Result:<br> Array[String] = Array(is, It, awesome, Spark, fun) |
常用的集合操作:
Transformation and Purpose | Example and Result |
---|---|
union() Purpose: new RDD containing all elements from source RDD and argument. | Scala> val rdd1=sc.parallelize(List(‘A’,’B’)) scala> val rdd2=sc.parallelize(List(‘B’,’C’)) scala> rdd1.union(rdd2).collect() <br>Result: Array[Char] = Array(A, B, B, C) |
intersection() Purpose: new RDD containing only common elements from source RDD and argument. | Scala> rdd1.intersection(rdd2).collect() <br>Result: Array[Char] = Array(B) |
cartesian() Purpose: new RDD cross product of all elements from source RDD and argument | Scala> rdd1.cartesian(rdd2).collect()<br>Result: Array[(Char, Char)] = Array((A,B), (A,C), (B,B), (B,C)) |
subtract() Purpose: new RDD created by removing data elements in source RDD in common with argument | scala> rdd1.subtract(rdd2).collect() <br>Result: Array[Char] = Array(A) |
join(RDD,[numTasks]) Purpose: When invoked on (K,V) and (K,W), this operation creates a new RDD of (K, (V,W)) | scala> val personFruit = sc.parallelize(Seq((“Andy”, “Apple”), (“Bob”, “Banana”), (“Charlie”, “Cherry”), (“Andy”,”Apricot”))) scala> val personSE = sc.parallelize(Seq((“Andy”, “Google”), (“Bob”, “Bing”), (“Charlie”, “Yahoo”), (“Bob”,”AltaVista”))) scala> personFruit.join(personSE).collect()<br> Result:<br>Array[(String, (String, String))] = Array((Andy,(Apple,Google)), (Andy,(Apricot,Google)), (Charlie,(Cherry,Yahoo)), (Bob,(Banana,Bing)), (Bob,(Banana,AltaVista))) |
cogroup(RDD,[numTasks])Purpose: To convert (K,V) to (K,Iterable<V>) | scala> personFruit.cogroup(personSe).collect() Result:<br> Array[(String, (Iterable[String], Iterable[String]))] = Array((Andy,(ArrayBuffer(Apple, Apricot),ArrayBuffer(google))), (Charlie,(ArrayBuffer(Cherry),ArrayBuffer(Yahoo))), (Bob,(ArrayBuffer(Banana),ArrayBuffer(Bing, AltaVista)))) |
更多 transformations 信息,请查看 http://spark.apache.org/docs/latest/programming-guide.html#transformations
常用的 actions
Action & Purpose | Example & Result |
---|---|
count() Purpose: get the number of data elements in the RDD | scala> val rdd = sc.parallelize(list(‘A’,’B’,’c’)) scala> rdd.count() <br>Result:<br> long = 3 |
collect()Purpose: get all the data elements in an RDD as an array | scala> val rdd = sc.parallelize(list(‘A’,’B’,’c’)) scala> rdd.collect() <br>Result: Array[char] = Array(A, B, c) |
reduce(func) Purpose: Aggregate the data elements in an RDD using this function which takes two arguments and returns one | scala> val rdd = sc.parallelize(list(1,2,3,4)) scala> rdd.reduce(+) <br>Result: Int = 10 |
take (n) Purpose: fetch first n data elements in an RDD. computed by driver program. | Scala> val rdd = sc.parallelize(list(1,2,3,4)) scala> rdd.take(2)<br> Result: Array[Int] = Array(1, 2) |
foreach(func) Purpose: execute function for each data element in RDD. usually used to update an accumulator(discussed later) or interacting with external systems. | Scala> val rdd = sc.parallelize(list(1,2,3,4)) scala> rdd.foreach(x=>println(“%s10=%s”. format(x,x*10))) **Result:*<br>1*10=10 4*10=40 3*10=30 2*10=20 |
first() Purpose: retrieves the first data element in RDD. | Similar to take(1) scala> val rdd = sc.parallelize(list(1,2,3,4)) scala> rdd.first() Result: Int = 1 |
saveAsTextFile(path) Purpose: Writes the content of RDD to a text file or a set of text files to local file system/ HDFS | scala> val hamlet = sc.textFile(“/users/akuntamukkala/ temp/gutenburg.txt”) scala>hamlet.filter(_.contains(“Shakespeare”)). saveAsTextFile(“/users/akuntamukkala/temp/ filtered”) <br>Result: akuntamukkala@localhost~/temp/filtered$ ls _SUCCESS part-00000 part-00001 |
更多 actions 参见 http://spark.apache.org/docs/latest/programming-guide.html#actions
新手福利: Apache Spark 入门攻略 - Part 2 链接: http://v2ex.com/t/205852
本文系 OneAPM 工程师编译整理。OneAPM 是中国基础软件领域的新兴领军企业,能帮助企业用户和开发者轻松实现:缓慢的程序代码和 SQL 语句的实时抓取。想阅读更多技术文章,请访问 OneAPM 新闻。