Docker Demo
一个使用 Docker 容器的 Demo
我们来使用一个真实世界的案例,来看看 Hudi 是如何闭环运转的。 为了这个目的,在你的计算机中的本地 Docker 集群中组建了一个自包含的数据基础设施。
以下步骤已经在一台 Mac 笔记本电脑上测试过了。
前提条件
- Docker 安装 : 对于 Mac ,请依照 [https://docs.docker.com/v17.12/docker-for-mac/install/] 当中定义的步骤。 为了运行 Spark-SQL 查询,请确保至少分配给 Docker 6 GB 和 4 个 CPU 。(参见 Docker -> Preferences -> Advanced)。否则,Spark-SQL 查询可能被因为内存问题而被杀停。
- kafkacat : 一个用于发布/消费 Kafka Topic 的命令行工具集。使用
brew install kafkacat
来安装 kafkacat 。 - /etc/hosts : Demo 通过主机名引用了多个运行在容器中的服务。将下列设置添加到 /etc/hosts :
127.0.0.1 adhoc-1
127.0.0.1 adhoc-2
127.0.0.1 namenode
127.0.0.1 datanode1
127.0.0.1 hiveserver
127.0.0.1 hivemetastore
127.0.0.1 kafkabroker
127.0.0.1 sparkmaster
127.0.0.1 zookeeper
此外,这未在其它一些环境中进行测试,例如 Windows 上的 Docker 。
设置 Docker 集群
构建 Hudi
构建 Hudi 的第一步:
cd <HUDI_WORKSPACE>
mvn package -DskipTests
组建 Demo 集群
下一步是运行 Docker 安装脚本并设置配置项以便组建集群。 这需要从 Docker 镜像库拉取 Docker 镜像,并设置 Docker 集群。
cd docker
./setup_demo.sh
....
....
....
Stopping spark-worker-1 ... done
Stopping hiveserver ... done
Stopping hivemetastore ... done
Stopping historyserver ... done
.......
......
Creating network "hudi_demo" with the default driver
Creating hive-metastore-postgresql ... done
Creating namenode ... done
Creating zookeeper ... done
Creating kafkabroker ... done
Creating hivemetastore ... done
Creating historyserver ... done
Creating hiveserver ... done
Creating datanode1 ... done
Creating presto-coordinator-1 ... done
Creating sparkmaster ... done
Creating presto-worker-1 ... done
Creating adhoc-1 ... done
Creating adhoc-2 ... done
Creating spark-worker-1 ... done
Copying spark default config and setting up configs
Copying spark default config and setting up configs
Copying spark default config and setting up configs
$ docker ps
至此, Docker 集群将会启动并运行。 Demo 集群提供了下列服务:
- HDFS 服务( NameNode, DataNode )
- Spark Master 和 Worker
- Hive 服务( Metastore, HiveServer2 以及 PostgresDB )
- Kafka Broker 和一个 Zookeeper Node ( Kafka 将被用来当做 Demo 的上游数据源 )
- 用来运行 Hudi/Hive CLI 命令的 Adhoc 容器
Demo
Stock Tracker 数据将用来展示不同的 Hudi 视图以及压缩带来的影响。
看一下 docker/demo/data
目录。那里有 2 批股票数据——都是 1 分钟粒度的。
第 1 批数据包含一些股票代码在交易窗口(9:30 a.m 至 10:30 a.m)的第一个小时里的行情数据数据。第 2 批包含接下来 30 分钟(10:30 - 11 a.m)的交易数据。 Hudi 将被用来将两个批次的数据采集到一个数据集中,这个数据集将会包含最新的小时级股票行情数据。
两个批次被有意地按窗口切分,这样在第 2 批数据中包含了一些针对第 1 批数据条目的更新数据。
Step 1 : 将第 1 批数据发布到 Kafka
将第 1 批数据上传到 Kafka 的 Topic “stock ticks” 中 cat docker/demo/data/batch_1.json | kafkacat -b kafkabroker -t stock_ticks -P
为了检查新的 Topic 是否出现,使用
kafkacat -b kafkabroker -L -J | jq .
{
"originating_broker": {
"id": 1001,
"name": "kafkabroker:9092/1001"
},
"query": {
"topic": "*"
},
"brokers": [
{
"id": 1001,
"name": "kafkabroker:9092"
}
],
"topics": [
{
"topic": "stock_ticks",
"partitions": [
{
"partition": 0,
"leader": 1001,
"replicas": [
{
"id": 1001
}
],
"isrs": [
{
"id": 1001
}
]
}
]
}
]
}
Step 2: 从 Kafka Topic 中增量采集数据
Hudi 自带一个名为 DeltaStreamer 的工具。 这个工具能连接多种数据源(包括 Kafka),以便拉取变更,并通过 upsert/insert 操作应用到 Hudi 数据集。此处,我们将使用这个工具从 Kafka Topic 下载 JSON 数据,并采集到前面步骤中初始化的 COW 和 MOR 表中。如果数据集不存在,这个工具将自动初始化数据集到文件系统中。
docker exec -it adhoc-2 /bin/bash
# Run the following spark-submit command to execute the delta-streamer and ingest to stock_ticks_cow dataset in HDFS
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE --storage-type COPY_ON_WRITE --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts --target-base-path /user/hive/warehouse/stock_ticks_cow --target-table stock_ticks_cow --props /var/demo/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
# Run the following spark-submit command to execute the delta-streamer and ingest to stock_ticks_mor dataset in HDFS
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE --storage-type MERGE_ON_READ --source-class org.apache.hudi.utilities.sources.JsonKafkaSource --source-ordering-field ts --target-base-path /user/hive/warehouse/stock_ticks_mor --target-table stock_ticks_mor --props /var/demo/config/kafka-source.properties --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider --disable-compaction
# As part of the setup (Look at setup_demo.sh), the configs needed for DeltaStreamer is uploaded to HDFS. The configs
# contain mostly Kafa connectivity settings, the avro-schema to be used for ingesting along with key and partitioning fields.
exit
你可以使用 HDFS 的 Web 浏览器来查看数据集
http://namenode:50070/explorer#/user/hive/warehouse/stock_ticks_cow
.
你可以浏览在数据集中新创建的分区文件夹,同时还有一个在 .hoodie 目录下的 deltacommit 文件 。
在 MOR 数据集中也有类似的设置
http://namenode:50070/explorer#/user/hive/warehouse/stock_ticks_mor
Step 3: 与 Hive 同步
到了这一步,数据集在 HDFS 中可用。我们需要与 Hive 同步来创建新 Hive 表并添加分区,以便在那些数据集上执行 Hive 查询。
docker exec -it adhoc-2 /bin/bash
# THis command takes in HIveServer URL and COW Hudi Dataset location in HDFS and sync the HDFS state to Hive
/var/hoodie/ws/hudi-sync/hudi-hive-sync/run_sync_tool.sh --jdbc-url jdbc:hive2://hiveserver:10000 --user hive --pass hive --partitioned-by dt --base-path /user/hive/warehouse/stock_ticks_cow --database default --table stock_ticks_cow
.....
2018-09-24 22:22:45,568 INFO [main] hive.HiveSyncTool (HiveSyncTool.java:syncHoodieTable(112)) - Sync complete for stock_ticks_cow
.....
# Now run hive-sync for the second data-set in HDFS using Merge-On-Read (MOR storage)
/var/hoodie/ws/hudi-sync/hudi-hive-sync/run_sync_tool.sh --jdbc-url jdbc:hive2://hiveserver:10000 --user hive --pass hive --partitioned-by dt --base-path /user/hive/warehouse/stock_ticks_mor --database default --table stock_ticks_mor
...
2018-09-24 22:23:09,171 INFO [main] hive.HiveSyncTool (HiveSyncTool.java:syncHoodieTable(112)) - Sync complete for stock_ticks_mor
...
2018-09-24 22:23:09,559 INFO [main] hive.HiveSyncTool (HiveSyncTool.java:syncHoodieTable(112)) - Sync complete for stock_ticks_mor_rt
....
exit
执行了以上命令后,你会发现:
- 一个名为
stock_ticks_cow
的 Hive 表被创建,它为写时复制数据集提供了读优化视图。 - 两个新表
stock_ticks_mor
和stock_ticks_mor_rt
被创建用于读时合并数据集。 前者为 Hudi 数据集提供了读优化视图,而后者为数据集提供了实时视图。