标签:: Spark

Spark-DataSet学习

1.DataSet相关概念Dataset是一个分布式的数据集。Dataset是Spark 1.6开始新引入的一个接口,它结合了RDD API的很多优点(包括强类型,支持lambda表达式等),以及Spark SQL的优点(优化后的执行引擎)。Dataset可以通过JVM对象来构造,然后通过transformation类算子(map,flatMap,filter等)来进行操作。Scala和Java的API中支持Dataset,但是Python不支持Dataset API。不过因为Python语言本身的天然动态特性,Dataset API的不少feature本身就已经具备了(比如可以通过row.columnName来直接获取某一行的某个字段)。R语言的情况跟Python也很类似。 Dataframe就是按列组织的Dataset。在逻辑概念上,可以大概认为Dataframe等同于关系型数据库中的表,或者是Python/R语言中的data frame,但是在底层做了大量的优化。Dataframe可以通过很多方式来构造:比如结构化的数据文件,Hive表,数据库,已有的RDD。Scala,Java,Python,R等语言都支持Dataframe。在Scala API中,Dataframe就是Dataset[Row]的类型别名。在Java中,需要使用Dataset来代表一个Dataframe。

Spark实例-自定义聚合函数

Spark自定义聚合函数时,需要实现UserDefinedAggregateFunction中8个方法: inputSchema:输入的数据类型 bufferSchema:中间聚合处理时,需要处理的数据类型 dataType:函数的返回类型 deterministic:是否是确定的 initialize:为每个分组的数据初始化 update:每个分组,有新的值进来时,如何进行分组的聚合计算 merge:由于Spark是分布式的,所以一个分组的数据,可能会在不同的节点上进行局部聚合,就是update,但是最后一个分组,在各节点上的聚合值,要进行Merge,也就是合并 evaluate:一个分组的聚合值,如何通过中间的聚合值,最后返回一个最终的聚合值实例代码:

Parquet元数据合并

当文件使用Parquet格式时,如果多次生成的文件列不同,可以进行元数据的合并,不用再像关系型数据库那样多个表关联。关键点sqlContext.read.option("mergeSchema",true)

Spark实例-DataFrame加载和保存数据

Spark加载不同格式文件时,调用sqlContext.read.format(“”).load方法 1val peopleDF=sqlContext.read.format("json").load("E:\\spark\\src\\main\\resources\\people.json") Spark将DataFrame写入到文件中时,调用DF.write.format(“”).save方法 12345peopleDF.select("name") .write.format("parquet") //.mode(SaveMode.ErrorIfExists) .mode(SaveMode.Append) .save("E:\\spark\\src\\main\\resources\\people")