One of the core use-cases for Apache Hudi is enabling seamless, efficient database ingestion to your data lake. Even though this model has been widely discussed and adopted, content on how to actually go about it is sparse.
In this blog, we will build an end-to-end solution for capturing changes from a MySQL instance running on AWS RDS to a Hudi table on S3, using capabilities in the Hudi 0.5.1 release.
We can break up the problem into two pieces.
1. Extracting change logs from MySQL: Surprisingly, this is still a pretty tricky problem to solve, and Hudi users often get stuck here. Thankfully, at least for AWS users, there is the Database Migration Service (DMS for short), which performs this change capture and uploads the changes as parquet files to S3.
2. Applying these change logs to your data lake table: Once change logs are available in some form, the next step is to apply them incrementally to your table. This mundane task can be fully automated using the Hudi DeltaStreamer tool.
The actual end-to-end architecture looks something like this.
Let’s now illustrate how one can accomplish this using a simple orders table stored in MySQL (these instructions should broadly apply to other database engines like Postgres or Aurora as well, though the SQL/syntax may vary).
In the table, order_id is the primary key, which will be enforced on the Hudi table as well. Since a batch of change records can contain changes to the same primary key, we also include updated_at and created_at fields, which are kept up to date as writes happen to the table.
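For reference, such a table could look like the following (a minimal sketch; order_qty is an illustrative column, while order_id, customer_name, created_at and updated_at are the fields used throughout this post):

```sql
CREATE DATABASE hudi_dms;

CREATE TABLE hudi_dms.orders(
   order_id INTEGER,
   order_qty INTEGER,
   customer_name VARCHAR(256),
   -- refreshed automatically by MySQL on every update to the row
   updated_at TIMESTAMP DEFAULT NOW() ON UPDATE NOW(),
   created_at TIMESTAMP DEFAULT NOW(),
   CONSTRAINT orders_pk PRIMARY KEY(order_id)
);
```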
Extracting Change Logs from MySQL
Before we can configure DMS, we first need to prepare the MySQL instance for change capture, by ensuring backups are enabled and binlog is turned on.
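On RDS, this typically means enabling automated backups and setting binlog_format to ROW via the instance’s parameter group. A quick sanity check from any MySQL session:

```sql
SHOW VARIABLES LIKE 'log_bin';        -- should report ON
SHOW VARIABLES LIKE 'binlog_format';  -- DMS requires ROW
```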
Next, create two endpoints in DMS:

- A source hudi-source-db endpoint, which points to the DB server and provides basic authentication details.
- A target parquet-s3 endpoint, which points to the bucket and folder on S3 where the change log records will be stored as parquet files.
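If you prefer the AWS CLI over the console, the two endpoints can be created roughly as follows (a sketch only; the host, credentials, bucket and service access role ARN are placeholders, and the S3 settings assume a DMS version that supports parquet output):

```bash
# Source endpoint, pointing at the MySQL instance on RDS
aws dms create-endpoint \
  --endpoint-identifier hudi-source-db \
  --endpoint-type source --engine-name mysql \
  --server-name <rds-host> --port 3306 \
  --username admin --password <password>

# Target endpoint, writing change records as parquet files to S3
aws dms create-endpoint \
  --endpoint-identifier parquet-s3 \
  --endpoint-type target --engine-name s3 \
  --s3-settings ServiceAccessRoleArn=<role-arn>,BucketName=hudi-dms-demo,BucketFolder=orders,DataFormat=parquet
```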
Then proceed to create a migration task. Give it a name, connect the source endpoint to the target endpoint, and be sure to pick the right Migration type ("Migrate existing data and replicate ongoing changes"), to ensure ongoing changes are continuously replicated to S3. Also make sure to specify the selection rules that DMS uses to decide which MySQL schemas/tables to replicate. In this example, we simply whitelist the orders table under the hudi_dms schema, as specified in the table SQL above.
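In JSON form, that whitelist corresponds to a standard DMS selection rule, roughly:

```json
{
  "rules": [
    {
      "rule-type": "selection",
      "rule-id": "1",
      "rule-name": "include-orders",
      "object-locator": {
        "schema-name": "hudi_dms",
        "table-name": "orders"
      },
      "rule-action": "include"
    }
  ]
}
```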
Starting the DMS task should result in an initial load of the table landing on S3 as parquet files under the target folder.
Simply reading the raw initial load file should give the same values as the upstream table.
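For example, from a spark-shell (the path below follows from the bucket/folder configured on the parquet-s3 endpoint and is illustrative):

```scala
// Raw parquet files written by DMS for the orders table (initial load)
val rawDf = spark.read.parquet("s3://hudi-dms-demo/orders/hudi_dms/orders/*")
rawDf.sort("order_id").show()
```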
Now, we are ready to start consuming the change logs. Hudi DeltaStreamer runs as a Spark job on your favorite workflow scheduler (it also supports a continuous mode using the --continuous flag, where it runs as a long-running Spark job), tailing a given path on S3 (or any DFS implementation) for new files and issuing upserts to a target Hudi dataset. The tool automatically checkpoints itself; thus, to repeatedly ingest, all one needs to do is keep executing the DeltaStreamer periodically.
With an initial load already on S3, we then run the following command (the deltastreamer command, from here on) to ingest the full load first and create a Hudi dataset on S3.
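A sketch of that command, assuming Hudi 0.5.1 and Spark 2.4 (the bundle jar, master settings and S3 paths are illustrative and need adapting to your environment):

```bash
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  --packages org.apache.spark:spark-avro_2.11:2.4.4 \
  --master yarn --deploy-mode client \
  hudi-utilities-bundle_2.11-0.5.1.jar \
  --table-type COPY_ON_WRITE \
  --source-ordering-field updated_at \
  --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
  --target-base-path s3://hudi-dms-demo/hudi_orders \
  --target-table hudi_orders \
  --payload-class org.apache.hudi.payload.AWSDmsAvroPayload \
  --transformer-class org.apache.hudi.utilities.transform.AWSDmsTransformer \
  --hoodie-conf hoodie.datasource.write.recordkey.field=order_id \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=customer_name \
  --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://hudi-dms-demo/orders/hudi_dms/orders
```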
First, we specify the --table-type as COPY_ON_WRITE. Hudi also supports a MERGE_ON_READ table type, which you can choose instead.
To handle cases where the input parquet files contain multiple updates/deletes or insert/updates to the same record, we use updated_at as the ordering field. This ensures that the change record which has the latest timestamp will be reflected in Hudi.
We specify a target base path and a table name, both needed for creating and writing to the Hudi table.
We use a special payload class, AWSDmsAvroPayload, to handle the different change operations correctly. The parquet files generated by DMS have an Op field that indicates whether a given change record is an insert (I), delete (D) or update (U), and the payload implementation uses this field to decide how to handle a given change record.
You may also notice a special transformer class, AWSDmsTransformer, being specified. The reason here is tactical, but important: the initial load files do not contain an Op field, so this transformer adds one to the incoming records, keeping the schema consistent with that of the later change logs.
Finally, we specify the record key for the Hudi table, which is the same as the upstream table's primary key. We also specify partitioning by customer_name and point the source at the root of the DMS output.
Once the command is run, the Hudi table should be created and contain the same records as the upstream table (with all the _hoodie metadata fields as well).
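To verify, one can read the table back via the Hudi datasource, from a spark-shell launched with the Hudi bundle on the classpath (a sketch; the glob reaches through the single customer_name partition level):

```scala
// Snapshot read of the Hudi table written by DeltaStreamer
val hudiDf = spark.read
  .format("org.apache.hudi")
  .load("s3://hudi-dms-demo/hudi_orders/*/*.parquet")
hudiDf.show()
```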
A nice debugging aid would be to read all of the DMS output now and sort it by updated_at, which should show the sequence of changes that happened on the upstream table. The Hudi table above is, in effect, a compacted snapshot of this raw change log.
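Concretely (reusing the illustrative raw path from earlier):

```scala
// All DMS output so far (initial load + ongoing changes), ordered by source timestamp
spark.read.parquet("s3://hudi-dms-demo/orders/hudi_dms/orders/*")
  .sort("updated_at")
  .show()
```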
Note that the delete and the update carry the same updated_at value, and thus can very well order differently here. In short, this way of looking at the changelog has its caveats. For a true changelog of the Hudi table itself, you can issue an incremental query.
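A sketch of such a query, using the datasource read options as named in Hudi 0.5.1 (the begin instant below is a placeholder for an actual commit time from the table's timeline):

```scala
import org.apache.hudi.DataSourceReadOptions._

// Fetch only the records committed to the Hudi table after the given instant
val incrementalDf = spark.read
  .format("org.apache.hudi")
  .option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(BEGIN_INSTANTTIME_OPT_KEY, "20200120000000") // illustrative commit time
  .load("s3://hudi-dms-demo/hudi_orders")
incrementalDf.show()
```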
And Life goes on ….. Hope this was useful to all the data engineers out there!