Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,20 @@ class ExpressionTypeCheckingSuite extends SparkFunSuite with SQLHelper with Quer
}
}

test("SPARK-57103: max_by/min_by accept a nanosecond ordering and preserve the value type") {
// MaxBy/MinBy gate only on the ordering expression's orderability; a nanosecond ordering is an
// orderable AtomicType (SPARK-57103). The value expression is unrestricted, and the result type
// is the value's type, so a nanosecond value is returned with its precision preserved.
Seq(TimestampNTZNanosType(9), TimestampLTZNanosType(7)).foreach { dt =>
val v = AttributeReference("v", dt)()
val ord = AttributeReference("ord", dt)()
Seq(MaxBy(v, ord), MinBy(v, ord)).foreach { agg =>
assert(agg.checkInputDataTypes() == TypeCheckResult.TypeCheckSuccess)
assert(agg.dataType == dt)
}
}
Comment thread
stevomitric marked this conversation as resolved.
}

test("check types for aggregates") {
// We use AggregateFunction directly at here because the error will be thrown from it
// instead of from AggregateExpression, which is the wrapper of an AggregateFunction.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,18 @@ Project [unix_timestamp(cast(null as timestamp_ltz(9)), yyyy-MM-dd HH:mm:ss, Som
+- OneRowRelation


-- !query
SELECT max_by(v, k), min_by(v, k) FROM VALUES
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000001 UTC', 1),
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000999 UTC', 3),
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000500 UTC', 2),
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000007 UTC', CAST(NULL AS INT)) AS t(v, k)
-- !query analysis
Aggregate [max_by(v#x, k#x) AS max_by(v, k)#x, min_by(v#x, k#x) AS min_by(v, k)#x]
+- SubqueryAlias t
+- LocalRelation [v#x, k#x]


-- !query
SELECT unix_nanos(TIMESTAMP_LTZ '2020-01-01 13:24:35.123456789 UTC')
-- !query analysis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,18 @@ Project [unix_timestamp(cast(null as timestamp_ntz(9)), yyyy-MM-dd HH:mm:ss, Som
+- OneRowRelation


-- !query
SELECT max_by(v, k), min_by(v, k) FROM VALUES
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000001', 1),
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000999', 3),
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000500', 2),
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000007', CAST(NULL AS INT)) AS t(v, k)
-- !query analysis
Aggregate [max_by(v#x, k#x) AS max_by(v, k)#x, min_by(v#x, k#x) AS min_by(v, k)#x]
+- SubqueryAlias t
+- LocalRelation [v#x, k#x]


-- !query
SELECT unix_nanos(TIMESTAMP_NTZ '2020-01-01 13:24:35.123456789')
-- !query analysis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,15 @@ SELECT unix_timestamp(TIMESTAMP_LTZ '1969-12-31 23:59:59.500000000 UTC');
-- NULL nanosecond timestamp.
SELECT unix_timestamp(NULL :: timestamp_ltz(9)), to_unix_timestamp(NULL :: timestamp_ltz(9));

-- SPARK-57103: max_by / min_by return the nanosecond-precision TIMESTAMP_LTZ value at the extreme
-- ordering key, preserving the nanosecond type. The ordering keys are distinct so the result is
-- deterministic; a NULL-ordering row is ignored.
SELECT max_by(v, k), min_by(v, k) FROM VALUES
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000001 UTC', 1),
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000999 UTC', 3),
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000500 UTC', 2),
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000007 UTC', CAST(NULL AS INT)) AS t(v, k);

-- SPARK-57527: unix_nanos over nanosecond-precision values returns DECIMAL(21, 0) nanoseconds since
-- the epoch. The explicit-zone literals below fix the instant directly, independent of the session
-- time zone. The sub-microsecond digits are kept, truncated to the type's precision.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,15 @@ SELECT unix_timestamp(TIMESTAMP_NTZ '1969-12-31 23:59:59.500000000');
-- NULL nanosecond timestamp.
SELECT unix_timestamp(NULL :: timestamp_ntz(9)), to_unix_timestamp(NULL :: timestamp_ntz(9));

-- SPARK-57103: max_by / min_by return the nanosecond-precision TIMESTAMP_NTZ value at the extreme
-- ordering key, preserving the nanosecond type. The ordering keys are distinct so the result is
-- deterministic; a NULL-ordering row is ignored.
SELECT max_by(v, k), min_by(v, k) FROM VALUES
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000001', 1),
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000999', 3),
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000500', 2),
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000007', CAST(NULL AS INT)) AS t(v, k);

-- SPARK-57527: unix_nanos over nanosecond-precision values returns DECIMAL(21, 0) nanoseconds since
-- the epoch; NTZ applies no zone shift, so the wall-clock value is read as the epoch instant. The
-- sub-microsecond digits are kept, truncated to the type's precision.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,18 @@ struct<unix_timestamp(CAST(NULL AS TIMESTAMP_LTZ(9)), yyyy-MM-dd HH:mm:ss):bigin
NULL NULL


-- !query
SELECT max_by(v, k), min_by(v, k) FROM VALUES
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000001 UTC', 1),
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000999 UTC', 3),
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000500 UTC', 2),
(TIMESTAMP_LTZ '2020-01-01 00:00:00.000000007 UTC', CAST(NULL AS INT)) AS t(v, k)
-- !query schema
struct<max_by(v, k):timestamp_ltz(9),min_by(v, k):timestamp_ltz(9)>
-- !query output
2019-12-31 16:00:00.000000999 2019-12-31 16:00:00.000000001


-- !query
SELECT unix_nanos(TIMESTAMP_LTZ '2020-01-01 13:24:35.123456789 UTC')
-- !query schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,18 @@ struct<unix_timestamp(CAST(NULL AS TIMESTAMP_NTZ(9)), yyyy-MM-dd HH:mm:ss):bigin
NULL NULL


-- !query
SELECT max_by(v, k), min_by(v, k) FROM VALUES
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000001', 1),
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000999', 3),
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000500', 2),
(TIMESTAMP_NTZ '2020-01-01 00:00:00.000000007', CAST(NULL AS INT)) AS t(v, k)
-- !query schema
struct<max_by(v, k):timestamp_ntz(9),min_by(v, k):timestamp_ntz(9)>
-- !query output
2020-01-01 00:00:00.000000999 2020-01-01 00:00:00.000000001


-- !query
SELECT unix_nanos(TIMESTAMP_NTZ '2020-01-01 13:24:35.123456789')
-- !query schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,97 @@ abstract class TimestampNanosFunctionsSuiteBase extends SharedSparkSession {
}
}

// ===== max_by / min_by over nanosecond-precision timestamps (SPARK-56822) =====
// `MaxBy`/`MinBy` gate only on the ordering expression's orderability
// (`MaxMinBy.checkInputDataTypes` -> `TypeUtils.checkForOrderingExpr`), which the nanosecond
// types pass (SPARK-57103); the value expression is unrestricted and `dataType = valueExpr
// .dataType`, so a nanosecond *value* is returned with its precision preserved. No change to the
// aggregates is needed -- these tests lock in both the nanos-as-value and nanos-as-ordering
// paths.

test("SPARK-57103: max_by/min_by return a nanosecond value and preserve its precision") {
Seq(7, 8, 9).foreach { p =>
// Value columns are nanos; the ordering column is a plain int key (max at k=3, min at k=1).
// The sub-microsecond parts are multiples of 100ns, so they are exact at every p in [7, 9]
// (no flooring) yet still non-zero -- proving the nanos value survives, not truncated to
// micros.
val schema = new StructType()
.add("ntz", TimestampNTZNanosType(p))
.add("ltz", TimestampLTZNanosType(p))
.add("k", IntegerType)
val data = Seq(
Row(LocalDateTime.parse("2020-01-01T00:00:00.000000100"),
Instant.parse("2020-01-01T00:00:00.000000100Z"), 1),
Row(LocalDateTime.parse("2020-01-01T00:00:00.000000900"),
Instant.parse("2020-01-01T00:00:00.000000900Z"), 3),
Row(LocalDateTime.parse("2020-01-01T00:00:00.000000500"),
Instant.parse("2020-01-01T00:00:00.000000500Z"), 2))
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
val res = df.select(
max_by(col("ntz"), col("k")), min_by(col("ntz"), col("k")),
max_by(col("ltz"), col("k")), min_by(col("ltz"), col("k")))
checkAnswer(res, Row(
LocalDateTime.parse("2020-01-01T00:00:00.000000900"),
LocalDateTime.parse("2020-01-01T00:00:00.000000100"),
Instant.parse("2020-01-01T00:00:00.000000900Z"),
Instant.parse("2020-01-01T00:00:00.000000100Z")))
// The returned value keeps the family (NTZ/LTZ) and precision of the value column.
assert(res.schema.map(_.dataType) === Seq(
TimestampNTZNanosType(p), TimestampNTZNanosType(p),
TimestampLTZNanosType(p), TimestampLTZNanosType(p)))
}
}

test("SPARK-57103: max_by/min_by order by a nanosecond key down to the sub-microsecond") {
// The ordering values share epochMicros and differ only within the microsecond, so picking the
// extreme requires the full TimestampNanosVal comparison; a NULL-ordering row is ignored.
// Run on both the codegen and the interpreted paths.
Seq(
Seq(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true",
SQLConf.CODEGEN_FACTORY_MODE.key -> "CODEGEN_ONLY"),
Seq(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.CODEGEN_FACTORY_MODE.key -> "NO_CODEGEN")
).foreach { conf =>
withSQLConf(conf: _*) {
val df = spark.createDataFrame(
spark.sparkContext.parallelize(Seq(
Row("lo", Instant.parse("2020-01-01T00:00:00.000000001Z")),
Row("hi", Instant.parse("2020-01-01T00:00:00.000000999Z")),
Row("skip", null))),
new StructType().add("label", StringType).add("ts", TimestampLTZNanosType(9)))
checkAnswer(
df.select(max_by(col("label"), col("ts")), min_by(col("label"), col("ts"))),
Row("hi", "lo"))
}
}
}

test("SPARK-57103: max_by/min_by over nanos handle all-NULL ordering and GROUP BY") {
Seq(7, 8, 9).foreach { p =>
// All ordering values NULL -> result NULL.
val allNull = spark.createDataFrame(
spark.sparkContext.parallelize(Seq(Row("a", null), Row("b", null))),
new StructType().add("label", StringType).add("ts", TimestampNTZNanosType(p)))
checkAnswer(
allNull.select(max_by(col("label"), col("ts")), min_by(col("label"), col("ts"))),
Row(null, null))

// GROUP BY: per group, pick the label at the extreme nanosecond ordering key.
val grouped = spark.createDataFrame(
spark.sparkContext.parallelize(Seq(
Row("g1", "g1-lo", LocalDateTime.parse("2020-01-01T00:00:00.000000001")),
Row("g1", "g1-hi", LocalDateTime.parse("2020-01-01T00:00:00.000000999")),
Row("g2", "g2-only", LocalDateTime.parse("2020-01-01T00:00:00.000000005")))),
new StructType().add("g", StringType).add("label", StringType)
.add("ts", TimestampNTZNanosType(p)))
checkAnswer(
grouped.groupBy("g").agg(
max_by(col("label"), col("ts")).as("mx"),
min_by(col("label"), col("ts")).as("mn")).orderBy("g"),
Seq(Row("g1", "g1-hi", "g1-lo"), Row("g2", "g2-only", "g2-only")))
}
}

test("SPARK-57527: unix_nanos over nanosecond-precision timestamps") {
// unix_nanos returns DECIMAL(21, 0) nanoseconds since the epoch and applies no zone shift to a
// timestamp argument. The chosen fractions have zeros beyond the 7th digit, so truncating to
Expand Down