文章目录
- 数据源
- 数据格式
- 保存
- JSON
- csv
- SequenceFile
- 对象文件
- 非文件系统数据源
- protocol buffer
- 文件压缩
- 文件系统
- Spark SQL
- Apache Hive
- 数据库
读取
数据源
- 本地或分布式文件系统(NFS、HDFS等)
- Spark中的结构化数据源
- Cassandra、HBase、Elasticsearch、JDBC源
数据格式
文本文件、JSON、CSV、SequenceFiles、Protocol Buffers、对象文件
文本文件
val input = sc.textFile("file:///home/spark/README.md")
这里"///"是指文件根目录
如果要读取一个目录有两种方式:
- textFile,传递一个目录作为参数,它会把各部分都读取到RDD中
- wholeTextFiles 返回一个pairRDD,键是输入文件的文件名。用于有必要知道数据的各部分来自哪个文件
通过wholeTextFile
读取并求每个文件的平均值
val input = sc.wholeTextFiles("file:///home/spark/saleFiles")
val result = input.mapValues{y =>
val nums = y.split(" ").map(x=>x.toDouble)
nums.sum / nums.size.toDouble
}
此外,Spark支持通配字符如part-*.txt
来用于大规模数据集
保存
result.saveAsTextFile(outputFile)
JSON
三种方式:一是文本读取,二是JSON序列化,三是使用自定义的Hadoop格式来操作JSON数据。
import org.apache.spark._
import scala.util.parsing.json.JSON
object JSONApp {
def main(args:Array[String]): Unit ={
val conf = new SparkConf().setMaster("local").setAppName("JSONApp");
val sc = new SparkContext(conf);
val inputFile = "file:///usr/local/spark/examples/src/main/resources/people.json"
val jsonStr = sc.textFile(inputFile);
val result = jsonStr.map(s => JSON.parseFull(s));
result.foreach(
{
r => r match {
case Some(map:Map[String,Any]) => println(map)
case None => println("parsing failed!")
case other => println("unknown data structure" + other)
}
}
);
}
}
使用第三方Jackson
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature
...
case class Person(name: String, lovesPandas: Boolean) //Must be a top-level class
...
// Parse it into a specific case class. We use flatMap to handle errors
// by returning an empty list (None) if we encounter an issue and a
// list with one element if everything is ok (Some(_)).
val result = input.flatMap(record => {
try {
Some(mapper.readValue(record, classOf[Person]))
} catch {
case e: Exception => None
}
})
对于大规模数据集而言,格式错误是常见的现象,如果选择跳过格式不正确的数据,应该尝试通过累加器跟踪错误的个数。
保存JSON
result.filter(p => P.lovesPandas).map(mapper.writeValueAsString(_))
.saveAsTextFile(outputFile)
csv
如果所有数据没有包含换行符,也可以使用textFile()读取并解析数据
import Java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
val input = sc.textFile(inputFile)
val result = input.map{ line=>
val reader = new CSVReader(new StringReader(line));
reader.readNext();
}
如果字段中嵌有换行符,就需要完整读取每个文件,然后解析各段。
case class Person(name:String,favoriteAnimal:String)
val input = sc.wholeTextFile(inputFile)
val result = input.flatMap{ case (_,txt) =>
val reader = new CSVReader(new StringReader(txt));
reader.readAll().map(x => Person(x(0),x(1)))
}
保存CSV,使用StringWriter或StringIO将结果放到RDD中
pandaLovers.map(person=>List(person.name,person.favoriteAnimal).toArray).mapPartitions{people =>
val stringWriter = new StringWriter();
val csvWriter = new CSVWriter(stringWriter);
csvWriter.writeAll(people.tolist)
Iterator(stringWriter.toString)
}.saveAsTextFile(outFile)
SequenceFile
是由没有相对关系结构的键值对文件组成的常用Hadoop格式。SequenceFile是由实现Hadoop的Writable接口元素组成。标准的经验法是尝试在类名的后面加上Writable这个词,检查它是否是org.apache.hadoop.io.Writable
,如果找不到对应的writable类型,可以通过overrideorg.apache.hadoop.io.Writable
的readfields和write类来实现自己的Writable类。
在SparkContext可以调用sequenceFile(path,keyClass,valueClass,minPartitions)
,keyClass和valueClass参数都必须使用正确的Writable类.
val data = sc.sequenceFile(inFile,classOf[Text],classOf[IntWritable]).map{case (x,y) => (x.toString,y.get())}
在Scala中保存SequenceFile
val data = sc.parallelize(List(("Panda",3),("Kay",6),("Snail",2)))
data.saveAsSequenceFile(outputFile)
对象文件
对象文件就像对SequenceFile的简单封装,它允许存储只包含值的RDD,对象文件是使用Java序列化写出来的。对象文件的输出和Hadoop的输出不一样,对象文件通常用于Spark作业间的通信。objeceFile()
读取对象文件返回RDD,savaAsObject
保存对象文件。
在Scala使用老式API读取KeyValueTextInputFormat
val input = sc.hadoopFile[Text,Text,KeyValueTextInputFormat](inputFile).map{
case (x,y) => (x.toString,y.toString)
}
我们也可以使用自定义Hadoop输入格式读取JSON数据。下面展示如何在Spark使用新式Hadoop API。
val input = sc.newAPIHadoopFile(inputFile,classOF[LzoJsonInputFormat],classOf[LongWritable],classOf[MapWritable],conf)
非文件系统数据源
hadoopFile/saveAsHadoopFile、hadoopDataset/saveAsHadoopDataset、newAPIHadoopDataset/saveAsNewAPIHadoopDataset来访问Hadoop所支持的费文件系统的存储格式,像Hbase和MongoDB这样的键值对存储都提供了用来直接读取Hadoop输入格式的接口
protocol buffer
PB是结构化数据,要求字段和类型都确定,是经过优化的编解码速度快,而且占用空间很小。
PB由可选字段、必须字段、重复字段三种字段组成。
下面是一个如何从简单的PB格式中读取许多VenueResponse对象。VenueResponse是一个只包含重复字段的简单格式,这个字段包含一条带有必须字段、可选字段以及枚举类型字段的PB消息.
PB的定义例子
message Venue{
required int32 id=1;
required string name=2;
required VenueType typr=3;
optional string address=4;
enum VenueType{
COFFEESHOP=0;
WORKPLACE=1;
CLUB=2;
OMNOMNOM=3;
OTHER=4;
}
}
message VenueResponse{
repeated Venue results=1;
}
使用Elephant Bird写出protocol buffer
val job = new Job()
val conf = job.getConfiguration
LzoProtobufBlockOutputFormat.setClassConf(classOf[Places.Venue],conf);
val dnaLounge = Places.Venue.newBuilder()
dnaLounge.setId(1);
dnaLounge.setName("DNA Lounge")
dnaLounge.setType(Places.Venue.VenueType.CLUB)
val data = sc.parallelize(List(dnaLounge.build()))
val outputData = data.map{pb=>
val protoWritable = ProtobufWritable.newInstance(classOf[Places.Venue]);
protoWritable.set(pb)
(null,protoWritable)
}
outputData.saveAsNewAPIHadoopFile(outputFile,classOf[Text],classOf[ProtobufWritable[Places.Venue]],classOf[LzoProtobufBlockOutputFormat[ProtobufWritable[Places.Venue]]],conf)
文件压缩
可用的压缩选项有:gzip、lzo、bzip2、zlib、Snappy
读取压缩的输入,推荐newAPIHadoopFile或者hadoopFile
文件系统
本地/常规文件系统
Spark支持从本地系统读取文件,不过他要求文件在集群所有节点的相同路径下都可以找到。
推荐先将文件放到hdfs、NFS、S3等共享文件系统上
以下是一个从本地读取压缩的文本文件
val rdd = sc.textFile("file:///home/spark/happypandas.gz")
在HDFS读取文件
val rdd = sc.textFile("hdfs://master:port/path")
Spark SQL
我们把一条SQL查询给Spark SQL,让它对一个数据源执行查询,然后得到由Row对象组成的RDD,每个Row对象表示一条记录。每个Row都有一个get()方法,会返回一个一般类型让我们进行类型转换,针对常见类型的专用get()方法(getFloat()、getInt()、getLong()、getString()、getShort()、getBoolean()).
Apache Hive
- 1.提供Hive的配置文件,将hive-sit.xml文件复制到Spark的./conf/目录
- 2.创建HiveContext对象
import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
val rows = hiveCtx.sql("SELECT name,age FROM users")
val firstRow = rows.first()
println(firstRow.getString(0))
如果有记录间结构一致的JSON数据,Spark SQL可以自动腿短结构信息,并将数据读取为记录。要读取JSON数据,首先创建HiveContext,HiveContext.jsonFile方法来从整个文件获取由Row对象组成的RDD。
下列是一个JSON的示例
{"users":{"name":"Holden","location":"San Francisco"},"text":"Nice Dat out today"}
{"user":{"name":"Matei","location":"Berkeley"},"text":"Even nicer here :)"}
在Scala中使用Spark SQL读取JSON数据
val tweets = hiveCtx.jsonFile("tweets.json")
tweets.registerTempTable("tweets")
val results = hiveCtx.sql("SELECT user.name,text FROM tweets")
数据库
Spark可以从任何支持Java数据库连接(JDBC)的关系型数据库中读取数据,包括MySQL、Postgres、Cassandra、HBase、Elasticsearch,访问这些数据,需要构建一个org.apache.spark.rdd.JdbcRDD
def createConnection()={
Class.forName("com.mysql.jdbc.Driver").newInstance();
DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden");
}
def extractValues(r:ResultSet) = {
(r.getInt(1),r.getString(2))
}
val data = new JdbcRDD(sc,createConnection,"SELECT * FROM panda WHERE ? <= id and id <= ?",lowerbound=1,upperBound=3,numPartition=2,mapRow=extractValues)
println(data.collect().toList)
JdbcRDD接受这样几个参数:
- 提供一个用于对数据库创建连接的函数,这个函数让每个节点在连接必要的配置后创建自己读取数据的连接
- 提供一个可以读取一定范围数据的查询,以及查询参数中lowerBound和upperBound的值,可以让Spark在不同机器上查询不同范围的数据。
- 最后一个参数是将输出结果从java.sql.Result转为对操作数据有用的格式的函数,我们会得到(Int,String)对。
HBase:由于org.apache.hadoop.hbase.mapreduce.TableInputFormat
类的实现,Spark可以通过Hadoop输入格式访问HBase。返回键值对数据。
从HBase读取数据
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result //值的类型
import org.apache.hadoop.hbase.io.ImmutableBytesWritable //键的类型
import org.apache.hadoop.hbase.mapreduce.TableInputFormat //优化HBase的读取的设置项
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE,"tablename")
val rdd = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])