Quick-Start Guide
本指南通过使用spark-shell简要介绍了Hudi功能。使用Spark数据源,我们将通过代码段展示如何插入和更新的Hudi默认存储类型数据集: 写时复制。每次写操作之后,我们还将展示如何读取快照和增量读取数据。
设置spark-shell
Hudi适用于Spark-2.x版本。您可以按照此处的说明设置spark。 在提取的目录中,使用spark-shell运 行Hudi:
bin/spark-shell --packages org.apache.hudi:hudi-spark-bundle:0.5.0-incubating --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
设置表名、基本路径和数据生成器来为本指南生成记录。
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_cow_table"
val basePath = "file:///tmp/hudi_cow_table"
val dataGen = new DataGenerator
插入数据
生成一些新的行程样本,将其加载到DataFrame中,然后将DataFrame写入Hudi数据集中,如下所示。
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath);
mode(Overwrite)
覆盖并重新创建数据集(如果已经存在)。
您可以检查在/tmp/hudi_cow_table/<region>/<country>/<city>/
下生成的数据。我们提供了一个记录键
(schema中的uuid
),分区字段(region/county/city
)和组合逻辑(schema中的ts
)
以确保行程记录在每个分区中都是唯一的。更多信息请参阅
对Hudi中的数据进行建模,
有关将数据提取到Hudi中的方法的信息,请参阅写入Hudi数据集。
这里我们使用默认的写操作:插入更新
。 如果您的工作负载没有更新
,也可以使用更快的插入
或批量插入
操作。
想了解更多信息,请参阅写操作
查询数据
将数据文件加载到DataFrame中。
val roViewDF = spark.
read.
format("org.apache.hudi").
load(basePath + "/*/*/*/*")
roViewDF.registerTempTable("hudi_ro_table")
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_ro_table where fare > 20.0").show()
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_ro_table").show()
该查询提供已提取数据的读取优化视图。由于我们的分区路径(region/country/city
)是嵌套的3个级别
从基本路径开始,我们使用了load(basePath + "/*/*/*/*")
。
有关支持的所有存储类型和视图的更多信息,请参考存储类型和视图。