Conversation


@danielhumanmod danielhumanmod commented Nov 30, 2025

Which issue does this PR close?

Closes #2708.

Rationale for this change

Spark’s RegExpExtract and RegExpExtractAll expressions were previously not accelerated through Comet.
This PR adds full Rust-native implementations and the corresponding Spark–Comet bridging logic, enabling these two UDFs to run on the native engine while preserving Spark-compatible semantics.

What changes are included in this PR?

  1. Spark-side serde / bridge
    • Added CometRegExpExtract and CometRegExpExtractAll in strings.scala
  2. Rust-native UDF implementation
    • Introduced SparkRegExpExtract and SparkRegExpExtractAll as ScalarUDFImpl.
  3. QueryPlanSerde integration
    • Wired both expressions into the Comet proto conversion pipeline.

How are these changes tested?

  • Added new test cases in CometStringExpressionSuite
  • Added Rust-side unit tests for core extract and extract_all semantics.
  • All existing string expression tests continue to pass.

@danielhumanmod danielhumanmod changed the title feat: Add support for RegExpExtract/RegExpExtractAll (WIP) feat: Add support for RegExpExtract/RegExpExtractAll Dec 1, 2025
@danielhumanmod danielhumanmod marked this pull request as ready for review December 1, 2025 06:21
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
// regexp_extract always returns String
Ok(DataType::Utf8)
}
Contributor

@danielhumanmod, could we verify we don't return a LargeUtf8 as well?
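One way this could be handled, sketched with a stand-in `DataType` enum rather than Arrow's actual type (so the snippet is self-contained, not the PR's real code): derive the output type from the subject argument instead of hard-coding `Utf8`, so a LargeUtf8 input is either preserved or rejected explicitly.

```rust
// Stand-in for Arrow's DataType; only the variants needed for this sketch.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Utf8,
    LargeUtf8,
    Int32,
}

// Derive the return type from the subject column so LargeUtf8 inputs are
// never silently narrowed to Utf8.
fn return_type(arg_types: &[DataType]) -> Result<DataType, String> {
    match arg_types.first() {
        Some(DataType::Utf8) => Ok(DataType::Utf8),
        Some(DataType::LargeUtf8) => Ok(DataType::LargeUtf8), // keep 64-bit offsets
        other => Err(format!("regexp_extract expects a string subject, got {other:?}")),
    }
}
```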

ColumnarValue::Scalar(ScalarValue::Int32(Some(i))) => *i as usize,
_ => {
return exec_err!("regexp_extract idx must be an integer literal");
}
Contributor

Are we sure that this is only Int32 and not wider/narrower ints like Int64, Int8, etc.?

Author

Good point. I checked Spark's implementation; the index there is cast to i32 only. I was wondering if we should follow their behavior:

https://github.com/apache/spark/blob/branch-3.5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala#L811C21-L811C33

"parquet.enable.dictionary" -> "true") {
// Use repeated values to trigger dictionary encoding
val data = (0 until 1000).map(i => {
val text = if (i % 3 == 0) "a1b2c3" else if (i % 3 == 1) "x5y6" else "no-match"
Contributor

I believe the tests should be a little more exhaustive and cover very large strings, mixed inputs, and multiple complex regex patterns.

// Check if the pattern is compatible with Spark or allow incompatible patterns
expr.regexp match {
case Literal(pattern, DataTypes.StringType) =>
if (!RegExp.isSupportedPattern(pattern.toString) &&
Contributor

What are the limitations of RegExp.isSupportedPattern(pattern.toString) here? Could we make sure that we are in absolute coherence with Spark, perhaps?

Author

For now it is a placeholder introduced in this PR.

@andygrove when you have a chance, could you share a bit more context on its purpose? I’d really appreciate it. Thank you!

case Literal(_, DataTypes.IntegerType) =>
Compatible()
case _ =>
Unsupported(Some("Only literal group index is supported"))
Contributor

Could the input for group idx be a different numeric type (byte / short / int / long, etc.)?

Author

Good catch, will add support for these data types.
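One shape that support could take on the native side, sketched with a simplified stand-in for DataFusion's `ScalarValue` so it runs standalone: widen every integer literal to `i64` first, which also makes the null and negative-index checks uniform across types.

```rust
// Simplified stand-in for DataFusion's ScalarValue integer variants.
enum ScalarValue {
    Int8(Option<i8>),
    Int16(Option<i16>),
    Int32(Option<i32>),
    Int64(Option<i64>),
}

// Normalize any non-null integer literal to a usize group index,
// rejecting nulls and negative values with a user-facing error.
fn idx_as_usize(v: &ScalarValue) -> Result<usize, String> {
    let i: i64 = match v {
        ScalarValue::Int8(Some(i)) => i64::from(*i),
        ScalarValue::Int16(Some(i)) => i64::from(*i),
        ScalarValue::Int32(Some(i)) => i64::from(*i),
        ScalarValue::Int64(Some(i)) => *i,
        _ => return Err("regexp_extract idx must be a non-null integer literal".into()),
    };
    usize::try_from(i).map_err(|_| format!("group index must be non-negative, got {i}"))
}
```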

// Pattern must be a literal string
let pattern_str = match pattern {
ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) => s.clone(),
_ => {
Contributor

Might have to check if the string input is LargeUtf8 as well or ensure we are only sending Utf8 from spark side

match &args.args[2] {
ColumnarValue::Scalar(ScalarValue::Int32(Some(i))) => *i as usize,
_ => {
return exec_err!("regexp_extract_all idx must be an integer literal");
Contributor

We might want to have one common DataFusion::InternalError to make sure we throw the right exception all along.

internal_datafusion_err!("Invalid regex pattern '{}': {}", pattern_str, e)
})?;

match subject {
Contributor

We could probably remove some duplication in the code with regex parsing here

Author

Will extract some shared code, thanks for the suggestion!
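One shape the shared code could take (stand-in types for DataFusion's `ColumnarValue`/`ScalarValue`; the real helper would likely return a compiled `Regex`): a single function that extracts the literal pattern string, so both UDFs reject arrays, nulls, and non-Utf8 scalars with identical wording.

```rust
// Simplified stand-ins for DataFusion's ColumnarValue / ScalarValue.
enum ScalarValue {
    Utf8(Option<String>),
}
enum ColumnarValue {
    Scalar(ScalarValue),
    Array,
}

// Shared by regexp_extract and regexp_extract_all: pull out the literal
// pattern string, rejecting anything else uniformly.
fn literal_pattern(v: &ColumnarValue, fn_name: &str) -> Result<String, String> {
    match v {
        ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) => Ok(s.clone()),
        _ => Err(format!("{fn_name} pattern must be a literal string")),
    }
}
```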

fn extract_group(text: &str, regex: &Regex, idx: usize) -> Result<String> {
match regex.captures(text) {
Some(caps) => {
// Spark behavior: throw error if group index is out of bounds
Member

Author

Thanks for the reference! After checking the Spark code:

  1. If the index is within the valid range
    • Spark returns the matched content if it exists
    • otherwise (i.e., the group is null), Spark returns an empty string
  2. If the index is out of range, Spark throws an exception (code)

The implementation should be aligned with this behavior.
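These Spark semantics can be modeled standalone (capture groups represented as a slice of `Option<&str>` rather than the regex crate's `Captures`, so no external dependency is assumed):

```rust
// Spark-compatible group extraction semantics:
//  - pattern does not match        -> empty string
//  - group matched                 -> its content
//  - group did not participate     -> empty string
//  - group index out of range      -> error
fn extract_group(caps: Option<&[Option<&str>]>, idx: usize) -> Result<String, String> {
    match caps {
        None => Ok(String::new()), // pattern did not match the subject
        Some(groups) => {
            if idx >= groups.len() {
                return Err(format!(
                    "regexp_extract: group index {idx} exceeds group count {}",
                    groups.len().saturating_sub(1)
                ));
            }
            Ok(groups[idx].unwrap_or("").to_string())
        }
    }
}
```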


// idx must be a literal int
let idx_val = match idx {
ColumnarValue::Scalar(ScalarValue::Int32(Some(i))) => *i as usize,
Member

Negative i will lead to a big idx_val here. Does it need some kind of validation?

Author

Good call, will fix this, thanks!
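A minimal sketch of the missing validation (plain function, name is an assumption): check the sign before the `as usize` cast, since `-1 as usize` silently wraps to a huge value.

```rust
// Validate a literal group index before casting: on a 64-bit target,
// `-1i32 as usize` wraps to 18446744073709551615 and would masquerade
// as an out-of-bounds group instead of a clear user error.
fn validate_group_idx(i: i32) -> Result<usize, String> {
    if i < 0 {
        Err(format!("regexp_extract: group index must be non-negative, got {i}"))
    } else {
        Ok(i as usize)
    }
}
```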


// Compile regex once
let regex = Regex::new(&pattern_str).map_err(|e| {
internal_datafusion_err!("Invalid regex pattern '{}': {}", pattern_str, e)
Member

Suggested change
internal_datafusion_err!("Invalid regex pattern '{}': {}", pattern_str, e)
exec_err!("Invalid regex pattern '{}': {}", pattern_str, e)

Invalid regex is a user error.


// Compile regex once
let regex = Regex::new(&pattern_str).map_err(|e| {
internal_datafusion_err!("Invalid regex pattern '{}': {}", pattern_str, e)
Member

Suggested change
internal_datafusion_err!("Invalid regex pattern '{}': {}", pattern_str, e)
exec_err!("Invalid regex pattern '{}': {}", pattern_str, e)

user error


codecov-commenter commented Dec 1, 2025

Codecov Report

❌ Patch coverage is 9.09091% with 40 lines in your changes missing coverage. Please review.
✅ Project coverage is 54.24%. Comparing base (f09f8af) to head (ff1ebd6).
⚠️ Report is 730 commits behind head on main.

Files with missing lines Patch % Lines
...rc/main/scala/org/apache/comet/serde/strings.scala 4.76% 40 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #2831      +/-   ##
============================================
- Coverage     56.12%   54.24%   -1.88%     
- Complexity      976     1435     +459     
============================================
  Files           119      167      +48     
  Lines         11743    15232    +3489     
  Branches       2251     2531     +280     
============================================
+ Hits           6591     8263    +1672     
- Misses         4012     5752    +1740     
- Partials       1140     1217      +77     


1. More data type support on the Scala side
2. Unify errors as execution ones
3. Reduce code duplication
4. Negative index check