Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 31 additions & 13 deletions examples/tpch/q01_pricing_summary_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,30 @@

The above problem statement text is copyrighted by the Transaction Processing Performance Council
as part of their TPC Benchmark H Specification revision 2.18.0.

Reference SQL (from TPC-H specification, used by the benchmark suite)::

select
l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
lineitem
where
l_shipdate <= date '1998-12-01' - interval '90 days'
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus;
"""

import pyarrow as pa
Expand Down Expand Up @@ -58,31 +82,25 @@

# Aggregate the results

disc_price = col("l_extendedprice") * (lit(1) - col("l_discount"))

df = df.aggregate(
[col("l_returnflag"), col("l_linestatus")],
["l_returnflag", "l_linestatus"],
[
F.sum(col("l_quantity")).alias("sum_qty"),
F.sum(col("l_extendedprice")).alias("sum_base_price"),
F.sum(col("l_extendedprice") * (lit(1) - col("l_discount"))).alias(
"sum_disc_price"
),
F.sum(
col("l_extendedprice")
* (lit(1) - col("l_discount"))
* (lit(1) + col("l_tax"))
).alias("sum_charge"),
F.sum(disc_price).alias("sum_disc_price"),
F.sum(disc_price * (lit(1) + col("l_tax"))).alias("sum_charge"),
F.avg(col("l_quantity")).alias("avg_qty"),
F.avg(col("l_extendedprice")).alias("avg_price"),
F.avg(col("l_discount")).alias("avg_disc"),
F.count(col("l_returnflag")).alias(
"count_order"
), # Counting any column should return same result
F.count_star().alias("count_order"),
],
)

# Sort per the expected result

df = df.sort(col("l_returnflag").sort(), col("l_linestatus").sort())
df = df.sort_by("l_returnflag", "l_linestatus")

# Note: There appears to be a discrepancy between what is returned here and what is in the generated
# answers file for the case of return flag N and line status O, but I did not investigate further.
Expand Down
87 changes: 63 additions & 24 deletions examples/tpch/q02_minimum_cost_supplier.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,52 @@

The above problem statement text is copyrighted by the Transaction Processing Performance Council
as part of their TPC Benchmark H Specification revision 2.18.0.

Reference SQL (from TPC-H specification, used by the benchmark suite)::

select
s_acctbal,
s_name,
n_name,
p_partkey,
p_mfgr,
s_address,
s_phone,
s_comment
from
part,
supplier,
partsupp,
nation,
region
where
p_partkey = ps_partkey
and s_suppkey = ps_suppkey
and p_size = 15
and p_type like '%BRASS'
and s_nationkey = n_nationkey
and n_regionkey = r_regionkey
and r_name = 'EUROPE'
and ps_supplycost = (
select
min(ps_supplycost)
from
partsupp,
supplier,
nation,
region
where
p_partkey = ps_partkey
and s_suppkey = ps_suppkey
and s_nationkey = n_nationkey
and n_regionkey = r_regionkey
and r_name = 'EUROPE'
)
order by
s_acctbal desc,
n_name,
s_name,
p_partkey limit 100;
"""

import datafusion
Expand Down Expand Up @@ -67,35 +113,30 @@
"r_regionkey", "r_name"
)

# Filter down parts. Part names contain the type of interest, so we can use strpos to find where
# in the p_type column the word is. `strpos` will return 0 if not found, otherwise the position
# in the string where it is located.
# Filter down parts. The reference SQL uses ``p_type like '%BRASS'`` which
# is an ``ends_with`` check; use the dedicated string function rather than
# a manual substring match.

df_part = df_part.filter(
F.strpos(col("p_type"), lit(TYPE_OF_INTEREST)) > lit(0)
).filter(col("p_size") == lit(SIZE_OF_INTEREST))
F.ends_with(col("p_type"), lit(TYPE_OF_INTEREST)),
col("p_size") == SIZE_OF_INTEREST,
)

# Filter regions down to the one of interest

df_region = df_region.filter(col("r_name") == lit(REGION_OF_INTEREST))
df_region = df_region.filter(col("r_name") == REGION_OF_INTEREST)

# Now that we have the region, find suppliers in that region. Suppliers are tied to their nation
# and nations are tied to the region.

df_nation = df_nation.join(
df_region, left_on=["n_regionkey"], right_on=["r_regionkey"], how="inner"
)
df_supplier = df_supplier.join(
df_nation, left_on=["s_nationkey"], right_on=["n_nationkey"], how="inner"
)
df_nation = df_nation.join(df_region, left_on="n_regionkey", right_on="r_regionkey")
df_supplier = df_supplier.join(df_nation, left_on="s_nationkey", right_on="n_nationkey")

# Now that we know who the potential suppliers are for the part, we can limit our part
# supplies table down. We can further join down to the specific parts we've identified
# as matching the request

df = df_partsupp.join(
df_supplier, left_on=["ps_suppkey"], right_on=["s_suppkey"], how="inner"
)
df = df_partsupp.join(df_supplier, left_on="ps_suppkey", right_on="s_suppkey")

# Locate the minimum cost across all suppliers. There are multiple ways you could do this,
# but one way is to create a window function across all suppliers, find the minimum, and
Expand All @@ -112,9 +153,9 @@
),
)

df = df.filter(col("min_cost") == col("ps_supplycost"))

df = df.join(df_part, left_on=["ps_partkey"], right_on=["p_partkey"], how="inner")
df = df.filter(col("min_cost") == col("ps_supplycost")).join(
df_part, left_on="ps_partkey", right_on="p_partkey"
)

# From the problem statement, these are the values we wish to output

Expand All @@ -132,12 +173,10 @@
# Sort and display 100 entries
df = df.sort(
col("s_acctbal").sort(ascending=False),
col("n_name").sort(),
col("s_name").sort(),
col("p_partkey").sort(),
)

df = df.limit(100)
"n_name",
"s_name",
"p_partkey",
).limit(100)

# Show results

Expand Down
51 changes: 36 additions & 15 deletions examples/tpch/q03_shipping_priority.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,31 @@

The above problem statement text is copyrighted by the Transaction Processing Performance Council
as part of their TPC Benchmark H Specification revision 2.18.0.

Reference SQL (from TPC-H specification, used by the benchmark suite)::

select
l_orderkey,
sum(l_extendedprice * (1 - l_discount)) as revenue,
o_orderdate,
o_shippriority
from
customer,
orders,
lineitem
where
c_mktsegment = 'BUILDING'
and c_custkey = o_custkey
and l_orderkey = o_orderkey
and o_orderdate < date '1995-03-15'
and l_shipdate > date '1995-03-15'
group by
l_orderkey,
o_orderdate,
o_shippriority
order by
revenue desc,
o_orderdate limit 10;
"""

from datafusion import SessionContext, col, lit
Expand All @@ -50,38 +75,34 @@

# Limit dataframes to the rows of interest

df_customer = df_customer.filter(col("c_mktsegment") == lit(SEGMENT_OF_INTEREST))
df_customer = df_customer.filter(col("c_mktsegment") == SEGMENT_OF_INTEREST)
df_orders = df_orders.filter(col("o_orderdate") < lit(DATE_OF_INTEREST))
df_lineitem = df_lineitem.filter(col("l_shipdate") > lit(DATE_OF_INTEREST))

# Join all 3 dataframes

df = df_customer.join(
df_orders, left_on=["c_custkey"], right_on=["o_custkey"], how="inner"
).join(df_lineitem, left_on=["o_orderkey"], right_on=["l_orderkey"], how="inner")
df = df_customer.join(df_orders, left_on="c_custkey", right_on="o_custkey").join(
df_lineitem, left_on="o_orderkey", right_on="l_orderkey"
)

# Compute the revenue

df = df.aggregate(
[col("l_orderkey")],
["l_orderkey"],
[
F.first_value(col("o_orderdate")).alias("o_orderdate"),
F.first_value(col("o_shippriority")).alias("o_shippriority"),
F.sum(col("l_extendedprice") * (lit(1.0) - col("l_discount"))).alias("revenue"),
],
)

# Sort by priority

df = df.sort(col("revenue").sort(ascending=False), col("o_orderdate").sort())

# Only return 10 results
# Sort by priority, take 10, and project in the order expected by the spec.

df = df.limit(10)

# Change the order that the columns are reported in just to match the spec

df = df.select("l_orderkey", "revenue", "o_orderdate", "o_shippriority")
df = (
df.sort(col("revenue").sort(ascending=False), "o_orderdate")
.limit(10)
.select("l_orderkey", "revenue", "o_orderdate", "o_shippriority")
)

# Show result

Expand Down
67 changes: 38 additions & 29 deletions examples/tpch/q04_order_priority_checking.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,40 @@

The above problem statement text is copyrighted by the Transaction Processing Performance Council
as part of their TPC Benchmark H Specification revision 2.18.0.

Reference SQL (from TPC-H specification, used by the benchmark suite)::

select
o_orderpriority,
count(*) as order_count
from
orders
where
o_orderdate >= date '1993-07-01'
and o_orderdate < date '1993-07-01' + interval '3' month
and exists (
select
*
from
lineitem
where
l_orderkey = o_orderkey
and l_commitdate < l_receiptdate
)
group by
o_orderpriority
order by
o_orderpriority;
"""

from datetime import datetime
from datetime import date

import pyarrow as pa
from datafusion import SessionContext, col, lit
from datafusion import functions as F
from util import get_data_path

# Ideally we could put 3 months into the interval. See note below.
INTERVAL_DAYS = 92
DATE_OF_INTEREST = "1993-07-01"
QUARTER_START = date(1993, 7, 1)
QUARTER_END = date(1993, 10, 1)

# Load the dataframes we need

Expand All @@ -48,36 +70,23 @@
"l_orderkey", "l_commitdate", "l_receiptdate"
)

# Create a date object from the string
date = datetime.strptime(DATE_OF_INTEREST, "%Y-%m-%d").date()

interval = pa.scalar((0, INTERVAL_DAYS, 0), type=pa.month_day_nano_interval())

# Limit results to cases where commitment date before receipt date
# Aggregate the results so we only get one row to join with the order table.
# Alternatively — and likely more idiomatic — instead of `.aggregate` you could
# use `.select("l_orderkey").distinct()`. The goal here is to show
# multiple examples of how to use DataFusion.
df_lineitem = df_lineitem.filter(col("l_commitdate") < col("l_receiptdate")).aggregate(
[col("l_orderkey")], []
# Keep only orders in the quarter of interest, then restrict to those that
# have at least one late lineitem via a semi join (the DataFrame form of
# ``EXISTS`` from the reference SQL).
df_orders = df_orders.filter(
col("o_orderdate") >= lit(QUARTER_START),
col("o_orderdate") < lit(QUARTER_END),
)

# Limit orders to date range of interest
df_orders = df_orders.filter(col("o_orderdate") >= lit(date)).filter(
col("o_orderdate") < lit(date) + lit(interval)
)
late_lineitems = df_lineitem.filter(col("l_commitdate") < col("l_receiptdate"))

# Perform the join to find only orders for which there are lineitems outside of expected range
df = df_orders.join(
df_lineitem, left_on=["o_orderkey"], right_on=["l_orderkey"], how="inner"
late_lineitems, left_on="o_orderkey", right_on="l_orderkey", how="semi"
)

# Based on priority, find the number of entries
df = df.aggregate(
[col("o_orderpriority")], [F.count(col("o_orderpriority")).alias("order_count")]
# Count the number of orders in each priority group and sort.
df = df.aggregate(["o_orderpriority"], [F.count_star().alias("order_count")]).sort_by(
"o_orderpriority"
)

# Sort the results
df = df.sort(col("o_orderpriority").sort())

df.show()
Loading
Loading