Installation
This package can be installed from the main Git repository:
remotes::install_gitlab("zero323/dlt")
or its GitHub mirror:
remotes::install_github("zero323/dlt")
and requires the following R packages:
- SparkR (>= 3.3.0)
- magrittr
Additionally, you’ll have to ensure that a compatible Delta Lake jar is available, for example by adding delta-core to spark.jars.packages:
spark.jars.packages io.delta:delta-core_2.12:2.1.0
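If you start Spark from R, the same setting can be passed when creating the session. A minimal sketch, assuming the standard Delta Lake extension and catalog configuration (documented by Delta Lake itself, not specific to this package):
library(SparkR)

sparkR.session(sparkConfig = list(
  # makes the Delta jar available to the session
  spark.jars.packages = "io.delta:delta-core_2.12:2.1.0",
  # standard Delta Lake SQL extension and catalog settings
  spark.sql.extensions = "io.delta.sql.DeltaSparkSessionExtension",
  spark.sql.catalog.spark_catalog = "org.apache.spark.sql.delta.catalog.DeltaCatalog"
))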
Usage
This package provides:
- Readers and writers for Delta format.
- DeltaTable merge API.
- Delta table builder API.
Batch reads and writes
dlt_read and dlt_write can be used to read and write data in Delta format.
library(dlt)
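The target and source SparkDataFrames used throughout this section are not defined here. A minimal sketch that builds inputs with a matching schema (make_example is a hypothetical helper, and the generated values are illustrative, not the ones behind the outputs shown below):
set.seed(42)

# hypothetical helper producing a SparkDataFrame with the schema used below
make_example <- function(n) {
  createDataFrame(data.frame(
    id = seq_len(n),
    key = sample(letters[1:4], n, replace = TRUE),
    val = sample(1:10, n, replace = TRUE),
    ind = sample(c(-1L, 1L), n, replace = TRUE),
    category = replicate(n, paste(sample(LETTERS, 3), collapse = "")),
    lat = runif(n, -90, 90),
    long = runif(n, -180, 180)
  ))
}

target <- make_example(12)
source <- make_example(12)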
target %>%
printSchema()
# root
# |-- id: integer (nullable = true)
# |-- key: string (nullable = true)
# |-- val: integer (nullable = true)
# |-- ind: integer (nullable = true)
# |-- category: string (nullable = true)
# |-- lat: double (nullable = true)
# |-- long: double (nullable = true)
target %>%
dlt_write("/tmp/target")
dlt_read("/tmp/target") %>%
showDF(5)
# +---+---+---+---+--------+-------------------+-------------------+
# | id|key|val|ind|category| lat| long|
# +---+---+---+---+--------+-------------------+-------------------+
# | 1| a| 4| -1| KBQ| -56.28354165237397|-108.74080670066178|
# | 2| a| 10| 1| ROB| 50.546925463713706|-104.60825988091528|
# | 3| a| 7| -1| SLX|-13.985343240201473|-114.89280310459435|
# | 4| a| 5| 1| ACP| -47.15050248429179|-168.96175763569772|
# | 5| b| 3| -1| EEK|-49.020595396868885|-105.57821027934551|
# +---+---+---+---+--------+-------------------+-------------------+
# only showing top 5 rows
These also come with aliases following SparkR conventions: read.delta and write.delta.
source %>%
printSchema()
# root
# |-- id: integer (nullable = true)
# |-- key: string (nullable = true)
# |-- val: integer (nullable = true)
# |-- ind: integer (nullable = true)
# |-- category: string (nullable = true)
# |-- lat: double (nullable = true)
# |-- long: double (nullable = true)
source %>%
write.delta("/tmp/source")
read.delta("/tmp/source") %>%
showDF(5)
# +---+---+---+---+--------+------------------+-------------------+
# | id|key|val|ind|category| lat| long|
# +---+---+---+---+--------+------------------+-------------------+
# | 1| a| 1| 1| NTD| 72.72564971353859| 5.116242365911603|
# | 3| b| 5| 1| RSL|-65.03216980956495| -39.52675184234977|
# | 5| b| 1| -1| SYG| 88.00051120575517| 146.06572712771595|
# | 14| c| 9| -1| MYZ| 80.40028186049312|-19.090933883562684|
# | 16| d| 10| -1| DMO|-75.16123954206705| 120.96153359860182|
# +---+---+---+---+--------+------------------+-------------------+
Loading DeltaTable objects
DeltaTable objects can be created for a file system path:
dlt_for_path("/tmp/target/") %>%
dlt_to_df()
# SparkDataFrame[id:int, key:string, val:int, ind:int, category:string, lat:double, long:double]
or for a table name:
source %>%
saveAsTable("source", source="delta")
dlt_for_name("source") %>%
dlt_to_df()
# SparkDataFrame[id:int, key:string, val:int, ind:int, category:string, lat:double, long:double]
Streaming reads and writes
dlt_read_stream and dlt_write_stream can be used for streaming reads and writes, respectively.
query <- dlt_read_stream("/tmp/target") %>%
dlt_write_stream(
path = "/tmp/target-stream", queryName = "test", trigger.once = TRUE,
checkpointLocation = "/tmp/target-stream/_checkpoints/test"
)
awaitTermination(query, 10000)
# [1] TRUE
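dlt_write_stream returns a SparkR StreamingQuery (which is why awaitTermination works on it), so the usual SparkR streaming helpers apply as well. For example:
# inspect the current status and the most recent progress of the query
status(query)
lastProgress(query)

# stop the query explicitly (a no-op here, since trigger.once already terminated it)
stopQuery(query)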
Getting table details
dlt_detail can be used to retrieve detailed information about the format, location, and other important properties of a table.
dlt_for_path("/tmp/target/") %>%
dlt_detail() %>%
drop(c("id", "location", "createdAt", "lastModified")) %>%
showDF()
# +------+----+-----------+----------------+--------+-----------+----------+----------------+----------------+
# |format|name|description|partitionColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|
# +------+----+-----------+----------------+--------+-----------+----------+----------------+----------------+
# | delta|null| null| []| 1| 2290| {}| 1| 2|
# +------+----+-----------+----------------+--------+-----------+----------+----------------+----------------+
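Since dlt_detail returns a regular SparkDataFrame, the result can also be collected into a local data.frame for programmatic checks:
dlt_for_path("/tmp/target/") %>%
  dlt_detail() %>%
  select("format", "numFiles", "sizeInBytes") %>%
  collect()
#   format numFiles sizeInBytes
# 1  delta        1        2290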
Updates
Updates are supported both without a condition:
dlt_for_path("/tmp/target") %>%
dlt_update(list(ind = "-ind")) %>%
dlt_show(5)
# +---+---+---+---+--------+-------------------+-------------------+
# | id|key|val|ind|category| lat| long|
# +---+---+---+---+--------+-------------------+-------------------+
# | 1| a| 4| 1| KBQ| -56.28354165237397|-108.74080670066178|
# | 2| a| 10| -1| ROB| 50.546925463713706|-104.60825988091528|
# | 3| a| 7| 1| SLX|-13.985343240201473|-114.89280310459435|
# | 4| a| 5| -1| ACP| -47.15050248429179|-168.96175763569772|
# | 5| b| 3| 1| EEK|-49.020595396868885|-105.57821027934551|
# +---+---+---+---+--------+-------------------+-------------------+
# only showing top 5 rows
and with one:
dlt_for_path("/tmp/target") %>%
dlt_update(list(
lat = lit(39.08067389467202),
long = lit(-89.73335678516888)
), column("id") %in% c(2, 4)) %>%
dlt_show(5)
# +---+---+---+---+--------+-------------------+-------------------+
# | id|key|val|ind|category| lat| long|
# +---+---+---+---+--------+-------------------+-------------------+
# | 1| a| 4| 1| KBQ| -56.28354165237397|-108.74080670066178|
# | 2| a| 10| -1| ROB| 39.08067389467202| -89.73335678516888|
# | 3| a| 7| 1| SLX|-13.985343240201473|-114.89280310459435|
# | 4| a| 5| -1| ACP| 39.08067389467202| -89.73335678516888|
# | 5| b| 3| 1| EEK|-49.020595396868885|-105.57821027934551|
# +---+---+---+---+--------+-------------------+-------------------+
# only showing top 5 rows
Deletes
Deletes are supported both with a condition:
dlt_for_path("/tmp/target") %>%
dlt_delete(column("category") %in% c("ROB", "ACP")) %>%
dlt_show(5)
# +---+---+---+---+--------+-------------------+-------------------+
# | id|key|val|ind|category| lat| long|
# +---+---+---+---+--------+-------------------+-------------------+
# | 1| a| 4| 1| KBQ| -56.28354165237397|-108.74080670066178|
# | 3| a| 7| 1| SLX|-13.985343240201473|-114.89280310459435|
# | 5| b| 3| 1| EEK|-49.020595396868885|-105.57821027934551|
# | 6| b| 2| -1| SMT| 80.79935231711715| -46.80488987825811|
# | 7| b| 9| 1| LBC| 51.65884342510253| 97.16074059717357|
# +---+---+---+---+--------+-------------------+-------------------+
# only showing top 5 rows
and without one:
dlt_for_path("/tmp/target") %>%
dlt_delete() %>%
dlt_to_df() %>%
count()
# [1] 0
Merging SparkDataFrame with DeltaTable
dlt supports a complete set of DeltaMergeBuilder methods (dlt_when_matched_delete, dlt_when_matched_update, dlt_when_matched_update_all, dlt_when_not_matched_insert, dlt_when_not_matched_insert_all).
target %>%
dlt_write("/tmp/target", mode="overwrite")
dlt_for_path("/tmp/target") %>%
dlt_alias("target") %>%
dlt_merge(alias(source, "source"), expr("source.id = target.id")) %>%
dlt_when_matched_update_all() %>%
dlt_when_not_matched_insert_all() %>%
dlt_execute() %>%
dlt_to_df() %>%
arrange(desc(column("id"))) %>%
showDF(10)
# +---+---+---+---+--------+------------------+-------------------+
# | id|key|val|ind|category| lat| long|
# +---+---+---+---+--------+------------------+-------------------+
# | 16| d| 10| -1| DMO|-75.16123954206705| 120.96153359860182|
# | 14| c| 9| -1| MYZ| 80.40028186049312|-19.090933883562684|
# | 12| c| 10| 1| TBL| 6.229456798173487| 55.28501939959824|
# | 11| c| 5| -1| ZSH| 89.73377700895071| 61.67111137881875|
# | 10| c| 1| 1| GKP| 58.43853528611362| 100.64806896261871|
# | 9| c| 10| -1| LCN| 76.90145746339113| -138.4841349441558|
# | 8| b| 8| 1| BOB| 47.12074530310929| -91.43876885063946|
# | 7| b| 9| -1| LBC| 51.65884342510253| 97.16074059717357|
# | 6| b| 2| 1| SMT| 80.79935231711715| -46.80488987825811|
# | 5| b| 1| -1| SYG| 88.00051120575517| 146.06572712771595|
# +---+---+---+---+--------+------------------+-------------------+
# only showing top 10 rows
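The conditional variants follow the same builder pattern. A hedged sketch: the argument shapes below (a condition for dlt_when_matched_delete, a named list of assignments for dlt_when_matched_update) are assumptions extrapolated from the dlt_delete and dlt_update calls above, so consult the package reference before relying on them.
dlt_for_path("/tmp/target") %>%
  dlt_alias("target") %>%
  dlt_merge(alias(source, "source"), expr("source.id = target.id")) %>%
  dlt_when_matched_delete(expr("source.val < 5")) %>%   # assumed: optional condition argument
  dlt_when_matched_update(list(val = "source.val")) %>% # assumed: named list of assignments
  dlt_when_not_matched_insert_all() %>%
  dlt_execute()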
Querying the Delta log
dlt_for_path("/tmp/target-stream/") %>%
dlt_history() %>%
printSchema()
# root
# |-- version: long (nullable = true)
# |-- timestamp: timestamp (nullable = true)
# |-- userId: string (nullable = true)
# |-- userName: string (nullable = true)
# |-- operation: string (nullable = true)
# |-- operationParameters: map (nullable = true)
# | |-- key: string
# | |-- value: string (valueContainsNull = true)
# |-- job: struct (nullable = true)
# | |-- jobId: string (nullable = true)
# | |-- jobName: string (nullable = true)
# | |-- runId: string (nullable = true)
# | |-- jobOwnerId: string (nullable = true)
# | |-- triggerType: string (nullable = true)
# |-- notebook: struct (nullable = true)
# | |-- notebookId: string (nullable = true)
# |-- clusterId: string (nullable = true)
# |-- readVersion: long (nullable = true)
# |-- isolationLevel: string (nullable = true)
# |-- isBlindAppend: boolean (nullable = true)
# |-- operationMetrics: map (nullable = true)
# | |-- key: string
# | |-- value: string (valueContainsNull = true)
# |-- userMetadata: string (nullable = true)
dlt_for_path("/tmp/target-stream/") %>%
dlt_history() %>%
select("version", "operation", "operationParameters") %>%
showDF(truncate = FALSE)
# +-------+----------------+-------------------------------------------------------------------------------------+
# |version|operation |operationParameters |
# +-------+----------------+-------------------------------------------------------------------------------------+
# |0 |STREAMING UPDATE|{outputMode -> Append, queryId -> bb30dba2-3327-44f1-b6b3-b4bb99fdf57b, epochId -> 0}|
# +-------+----------------+-------------------------------------------------------------------------------------+
dlt_for_path("/tmp/target-stream/") %>%
dlt_delete("id IN (1, 3, 5)")
dlt_for_path("/tmp/target-stream/") %>%
dlt_history() %>%
select("version", "operation", "operationParameters") %>%
showDF(truncate = FALSE)
# +-------+----------------+-------------------------------------------------------------------------------------+
# |version|operation |operationParameters |
# +-------+----------------+-------------------------------------------------------------------------------------+
# |1 |DELETE |{predicate -> ["(id IN (1, 3, 5))"]} |
# |0 |STREAMING UPDATE|{outputMode -> Append, queryId -> bb30dba2-3327-44f1-b6b3-b4bb99fdf57b, epochId -> 0}|
# +-------+----------------+-------------------------------------------------------------------------------------+
Time travel
Time travel is possible by setting the versionAsOf option:
dlt_read("/tmp/target-stream/") %>%
count()
# [1] 9
dlt_read("/tmp/target-stream/", versionAsOf=0) %>%
count()
# [1] 12
or the timestampAsOf option:
timestamps <- dlt_for_path("/tmp/target-stream/") %>%
dlt_history() %>%
where(column("version") %in% c(0, 1)) %>%
arrange("version") %>%
select(alias(date_format(column("timestamp") + expr("INTERVAL 1 second"), "yyyy-MM-dd HH:mm:ss"), "timestamp")) %>%
collect()
dlt_read("/tmp/target-stream/", timestampAsOf = timestamps$timestamp[1]) %>%
count()
# [1] 12
It is also possible to restore a table to a specific version:
dlt_for_path("/tmp/target-stream/") %>%
dlt_restore_to_version(0) %>%
showDF()
# +------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
# |table_size_after_restore|num_of_files_after_restore|num_removed_files|num_restored_files|removed_files_size|restored_files_size|
# +------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
# | 2295| 1| 1| 1| 2175| 2295|
# +------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
dlt_read("/tmp/target-stream/") %>%
count()
# [1] 12
dlt_for_path("/tmp/target-stream/") %>%
dlt_restore_to_version(1) %>%
showDF()
# +------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
# |table_size_after_restore|num_of_files_after_restore|num_removed_files|num_restored_files|removed_files_size|restored_files_size|
# +------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
# | 2175| 1| 1| 1| 2295| 2175|
# +------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
dlt_read("/tmp/target-stream/") %>%
count()
# [1] 9
or to a specific timestamp:
dlt_for_path("/tmp/target-stream/") %>%
dlt_restore_to_timestamp(timestamp = timestamps$timestamp[1]) %>%
showDF()
# +------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
# |table_size_after_restore|num_of_files_after_restore|num_removed_files|num_restored_files|removed_files_size|restored_files_size|
# +------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
# | 2295| 1| 1| 1| 2175| 2295|
# +------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
dlt_read("/tmp/target-stream/") %>%
count()
# [1] 12
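Each restore is itself recorded as a commit, so the operations above can be inspected in the table history:
dlt_for_path("/tmp/target-stream/") %>%
  dlt_history() %>%
  select("version", "operation") %>%
  showDF()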
Building DeltaTables
New tables can be created (dlt_create), created if they don't already exist (dlt_create_if_not_exists), replaced (dlt_replace), or created or replaced (dlt_create_or_replace). All DeltaTableBuilder methods are supported.
dlt_create() %>%
dlt_location("/tmp/key-val") %>%
dlt_add_column("id", "integer", nullable = FALSE) %>%
dlt_add_columns(structType("key string, value double")) %>%
dlt_partitioned_by("key") %>%
dlt_comment("Key-value table") %>%
dlt_property("creation-time", as.character(Sys.time())) %>%
dlt_execute() %>%
dlt_to_df() %>%
printSchema()
# root
# |-- id: integer (nullable = false)
# |-- key: string (nullable = true)
# |-- value: double (nullable = true)
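The other entry points take the same chain of builder methods. For example, the definition can be re-applied idempotently with dlt_create_if_not_exists (a sketch; it assumes the definition matches the existing table, since Delta raises an error on a schema mismatch):
dlt_create_if_not_exists() %>%
  dlt_location("/tmp/key-val") %>%
  dlt_add_column("id", "integer", nullable = FALSE) %>%
  dlt_add_columns(structType("key string, value double")) %>%
  dlt_partitioned_by("key") %>%
  dlt_execute()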
Maintenance and conversions
You can use dlt to convert Parquet directories to DeltaTables:
target %>%
write.parquet("/tmp/target-parquet")
dlt_is_delta_table("/tmp/target-parquet/")
# [1] FALSE
tbl <- dlt_convert_to_delta("parquet.`/tmp/target-parquet/`")
dlt_is_delta_table("/tmp/target-parquet/")
# [1] TRUE
tbl %>%
dlt_show(5)
# +---+---+---+---+--------+-------------------+-------------------+
# | id|key|val|ind|category| lat| long|
# +---+---+---+---+--------+-------------------+-------------------+
# | 1| a| 4| -1| KBQ| -56.28354165237397|-108.74080670066178|
# | 2| a| 10| 1| ROB| 50.546925463713706|-104.60825988091528|
# | 3| a| 7| -1| SLX|-13.985343240201473|-114.89280310459435|
# | 4| a| 5| 1| ACP| -47.15050248429179|-168.96175763569772|
# | 5| b| 3| -1| EEK|-49.020595396868885|-105.57821027934551|
# +---+---+---+---+--------+-------------------+-------------------+
# only showing top 5 rows
vacuum DeltaTables:
dlt_for_path("/tmp/target") %>%
dlt_vacuum()
# Deleted 0 files and directories in a total of 1 directories.
generate manifests:
dlt_for_path("/tmp/target") %>%
dlt_generate_manifest("symlink_format_manifest")
and upgrade Delta protocols:
key_val_log_path <- "/tmp/key-val/_delta_log/*json"
read.json(key_val_log_path) %>%
select("metaData.id", "commitInfo.operation", "protocol") %>%
showDF()
# +--------------------+------------+--------+
# | id| operation|protocol|
# +--------------------+------------+--------+
# | null|CREATE TABLE| null|
# | null| null| {1, 2}|
# |72d4784b-e656-44a...| null| null|
# +--------------------+------------+--------+
dlt_for_path("/tmp/key-val") %>%
dlt_upgrade_table_protocol(1, 3)
read.json(key_val_log_path) %>%
select("metaData.id", "commitInfo.operation", "protocol") %>%
showDF()
# +--------------------+----------------+--------+
# | id| operation|protocol|
# +--------------------+----------------+--------+
# | null| CREATE TABLE| null|
# | null| null| {1, 2}|
# |72d4784b-e656-44a...| null| null|
# | null|UPGRADE PROTOCOL| null|
# | null| null| {1, 3}|
# +--------------------+----------------+--------+
This package also provides an interface for layout optimizations. One can compact files with a partition filter:
target %>%
repartition(10) %>%
dlt_write("/tmp/target-optimize-compact", mode = "overwrite", partitionBy = "key")
dlt_for_path("/tmp/target-optimize-compact") %>%
dlt_optimize() %>%
dlt_where("key = 'a'") %>%
dlt_execute_compaction() %>%
select("metrics") %>%
showDF(truncate = FALSE)
# +---------------------------------------------------------------------------------------------------------------------+
# |metrics |
# +---------------------------------------------------------------------------------------------------------------------+
# |{1, 4, {1750, 1750, 1750.0, 1, 1750}, {1633, 1633, 1633.0, 4, 6532}, 1, null, 1, 4, 0, false, 0, 0, 1669291200256, 0}|
# +---------------------------------------------------------------------------------------------------------------------+
or without one:
dlt_for_path("/tmp/target-optimize-compact") %>%
dlt_optimize() %>%
dlt_execute_compaction() %>%
select("metrics") %>%
showDF(truncate = FALSE)
# +-----------------------------------------------------------------------------------------------------------------------+
# |metrics |
# +-----------------------------------------------------------------------------------------------------------------------+
# |{2, 8, {1753, 1780, 1766.5, 2, 3533}, {1632, 1633, 1632.75, 8, 13062}, 2, null, 2, 9, 1, false, 0, 0, 1669291227707, 0}|
# +-----------------------------------------------------------------------------------------------------------------------+
Similarly, one can Z-Order files with a partition filter:
target %>%
repartition(10) %>%
dlt_write("/tmp/target-optimize-zorderby", mode="overwrite", partitionBy="key")
dlt_for_path("/tmp/target-optimize-compact") %>%
dlt_optimize() %>%
dlt_where("key = 'a'") %>%
dlt_execute_z_order_by("lat", "long") %>%
select("metrics") %>%
showDF(truncate = FALSE)
# +----------------------------------------------------------------------------------------------------------------------------------------------------------------+
# |metrics |
# +----------------------------------------------------------------------------------------------------------------------------------------------------------------+
# |{1, 1, {1750, 1750, 1750.0, 1, 1750}, {1750, 1750, 1750.0, 1, 1750}, 1, {all, {0, 0}, {1, 1750}, 0, {1, 1750}, 1, null}, 1, 1, 0, false, 0, 0, 1669291309308, 0}|
# +----------------------------------------------------------------------------------------------------------------------------------------------------------------+
or without one:
dlt_for_path("/tmp/target-optimize-compact") %>%
dlt_optimize() %>%
dlt_execute_z_order_by("lat", "long") %>%
select("metrics") %>%
showDF(truncate = FALSE)
# +----------------------------------------------------------------------------------------------------------------------------------------------------------------+
# |metrics |
# +----------------------------------------------------------------------------------------------------------------------------------------------------------------+
# |{3, 3, {1750, 1780, 1761.0, 3, 5283}, {1750, 1780, 1761.0, 3, 5283}, 3, {all, {0, 0}, {3, 5283}, 0, {3, 5283}, 3, null}, 3, 3, 0, false, 0, 0, 1669291365663, 0}|
# +----------------------------------------------------------------------------------------------------------------------------------------------------------------+
Acknowledgments
Logo based on "Yellow wasp, m, left, Kruger National Park, South Africa" by USGS Bee Inventory and Monitoring Lab.