feat: Add support for RegExpExtract/RegExpExtractAll
#2831
Conversation
```rust
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
    // regexp_extract always returns String
    Ok(DataType::Utf8)
}
```
@danielhumanmod, could we verify we don't return a LargeUtf8 as well?
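One way to address this would be to mirror the subject's string type instead of hard-coding `Utf8`. The sketch below uses a hypothetical simplified `DType` enum in place of Arrow's `DataType` (not the real DataFusion API), just to illustrate the idea:

```rust
// Hypothetical sketch (stand-in enum, not the DataFusion API): mirror the
// subject's string type so a LargeUtf8 input is not silently narrowed to Utf8.
#[derive(Clone, Copy, PartialEq, Debug)]
enum DType {
    Utf8,
    LargeUtf8,
    Other,
}

fn return_type(arg_types: &[DType]) -> Result<DType, String> {
    match arg_types.first() {
        Some(DType::Utf8) => Ok(DType::Utf8),
        Some(DType::LargeUtf8) => Ok(DType::LargeUtf8),
        _ => Err("regexp_extract expects a string subject".to_string()),
    }
}
```

The alternative is to guarantee on the Spark side that only `Utf8` ever reaches the kernel; either way the contract should be explicit.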
```rust
ColumnarValue::Scalar(ScalarValue::Int32(Some(i))) => *i as usize,
_ => {
    return exec_err!("regexp_extract idx must be an integer literal");
}
```
Are we sure that this is only Int32 and not wider / narrower ints like Int64, Int8, etc.?
Good point. I checked Spark's implementation; the index there is cast to i32 only. I was wondering if we should follow their behavior.
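If the native side does end up accepting all integer widths, one option is to funnel them into i32 (matching Spark's cast-to-Int behavior), erroring on Int64 overflow. The `IntScalar` enum below is a hypothetical stand-in for `ScalarValue`'s integer variants, not the real DataFusion type:

```rust
// Hypothetical stand-in for ScalarValue's integer variants (not the real
// DataFusion type). Spark casts the group index to Int, so every width
// normalizes to i32; an Int64 that overflows i32 is reported as an error.
enum IntScalar {
    Int8(i8),
    Int16(i16),
    Int32(i32),
    Int64(i64),
}

fn idx_to_i32(v: &IntScalar) -> Result<i32, String> {
    match v {
        IntScalar::Int8(i) => Ok(i32::from(*i)),
        IntScalar::Int16(i) => Ok(i32::from(*i)),
        IntScalar::Int32(i) => Ok(*i),
        IntScalar::Int64(i) => {
            i32::try_from(*i).map_err(|_| format!("group index {i} overflows i32"))
        }
    }
}
```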
```scala
"parquet.enable.dictionary" -> "true") {
  // Use repeated values to trigger dictionary encoding
  val data = (0 until 1000).map(i => {
    val text = if (i % 3 == 0) "a1b2c3" else if (i % 3 == 1) "x5y6" else "no-match"
```
I believe the tests should be a little more exhaustive and cover very large strings, mixed inputs, and multiple complex regex patterns.
```scala
// Check if the pattern is compatible with Spark or allow incompatible patterns
expr.regexp match {
  case Literal(pattern, DataTypes.StringType) =>
    if (!RegExp.isSupportedPattern(pattern.toString) &&
```
What are the limitations of RegExp.isSupportedPattern(pattern.toString) here? Could we make sure that we are fully consistent with Spark?
For now it is a placeholder introduced in this PR.
@andygrove, when you have a chance, could you share a bit more context on its purpose? I'd really appreciate it. Thank you!
```scala
case Literal(_, DataTypes.IntegerType) =>
  Compatible()
case _ =>
  Unsupported(Some("Only literal group index is supported"))
```
Could the input for group idx be a different numeric type (byte / short / int / long, etc.)?
Good catch, will add support for these data types
```rust
// Pattern must be a literal string
let pattern_str = match pattern {
    ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) => s.clone(),
    _ => {
```
Might have to check whether the string input is LargeUtf8 as well, or ensure we only send Utf8 from the Spark side.
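If both offset widths should be tolerated, the match could accept either variant. The sketch below uses a hypothetical simplified `StrScalar` enum standing in for `ScalarValue`'s string variants (not the real DataFusion API):

```rust
// Hypothetical simplified scalar type (stand-in for DataFusion's ScalarValue):
// accepting both offset widths keeps the kernel working even if the planner
// ever hands it a LargeUtf8 literal.
enum StrScalar {
    Utf8(Option<String>),
    LargeUtf8(Option<String>),
    Other,
}

fn literal_pattern(arg: &StrScalar, fn_name: &str) -> Result<String, String> {
    match arg {
        StrScalar::Utf8(Some(s)) | StrScalar::LargeUtf8(Some(s)) => Ok(s.clone()),
        _ => Err(format!("{fn_name} pattern must be a non-null string literal")),
    }
}
```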
```rust
match &args.args[2] {
    ColumnarValue::Scalar(ScalarValue::Int32(Some(i))) => *i as usize,
    _ => {
        return exec_err!("regexp_extract_all idx must be an integer literal");
```
We might want to have one common DataFusion::InternalError to make sure we throw the right exception throughout.
```rust
    internal_datafusion_err!("Invalid regex pattern '{}': {}", pattern_str, e)
})?;

match subject {
```
We could probably remove some code duplication in the regex parsing here.
Will extract some shared code, thanks for the suggestion!
```rust
fn extract_group(text: &str, regex: &Regex, idx: usize) -> Result<String> {
    match regex.captures(text) {
        Some(caps) => {
            // Spark behavior: throw error if group index is out of bounds
```
AFAIS Spark returns an empty string when there is no such group - https://github.com/apache/spark/blob/branch-3.5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala#L809-L821
One of the tests below confirms that empty string is expected - https://github.com/apache/datafusion-comet/pull/2831/files#diff-a5bdde2aa2b633f697d86aa0e609cd72610de14e20e3d05c2e97763335c9e791R402
Thanks for the reference! After checking the Spark code:
- If the index is within the valid range:
  - returns the matched content if it exists
  - otherwise (i.e., the group is null), Spark returns an empty string
- If the index is out of range, Spark throws an exception (code)

The implementation should be aligned with this behavior.
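The semantics agreed above can be sketched with `Option<&str>` standing in for regex capture groups (the real code would go through `regex::Captures`; this is an illustrative stand-in, not the PR's implementation):

```rust
// Sketch of the discussed semantics. `caps` models the capture groups of one
// match: caps[0] is the whole match, so caps.len() - 1 is the group count.
fn extract_group(caps: &[Option<&str>], idx: usize) -> Result<String, String> {
    if idx >= caps.len() {
        // Out-of-range group index: error, matching Spark
        return Err(format!(
            "Regex group count is {}, but the specified group index is {}",
            caps.len().saturating_sub(1),
            idx
        ));
    }
    // Null (non-participating) group: empty string, matching Spark
    Ok(caps[idx].unwrap_or("").to_string())
}
```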
```rust
// idx must be a literal int
let idx_val = match idx {
    ColumnarValue::Scalar(ScalarValue::Int32(Some(i))) => *i as usize,
```
A negative i will lead to a huge idx_val here. Does it need some kind of validation?
Good call, will fix this, thanks!
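A minimal sketch of the guard discussed above (hypothetical helper name, not from the PR): reject negative indices before the `i32 -> usize` cast turns them into enormous values.

```rust
// Validate the literal group index before casting to usize; a bare
// `i as usize` would wrap a negative i32 into a huge index.
fn checked_idx(i: i32, fn_name: &str) -> Result<usize, String> {
    if i < 0 {
        Err(format!("{fn_name} group index must be non-negative, got {i}"))
    } else {
        Ok(i as usize)
    }
}
```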
```rust
// Compile regex once
let regex = Regex::new(&pattern_str).map_err(|e| {
    internal_datafusion_err!("Invalid regex pattern '{}': {}", pattern_str, e)
```
Suggested change:
```diff
-    internal_datafusion_err!("Invalid regex pattern '{}': {}", pattern_str, e)
+    exec_err!("Invalid regex pattern '{}': {}", pattern_str, e)
```
An invalid regex is a user error.
```rust
// Compile regex once
let regex = Regex::new(&pattern_str).map_err(|e| {
    internal_datafusion_err!("Invalid regex pattern '{}': {}", pattern_str, e)
```
Suggested change:
```diff
-    internal_datafusion_err!("Invalid regex pattern '{}': {}", pattern_str, e)
+    exec_err!("Invalid regex pattern '{}': {}", pattern_str, e)
```
User error.
Codecov Report

❌ Patch coverage is

Additional details and impacted files:

```
@@             Coverage Diff              @@
##               main    #2831      +/-   ##
============================================
- Coverage     56.12%   54.24%    -1.88%
- Complexity      976     1435     +459
============================================
  Files           119      167      +48
  Lines         11743    15232    +3489
  Branches       2251     2531     +280
============================================
+ Hits           6591     8263    +1672
- Misses         4012     5752    +1740
- Partials       1140     1217      +77
```
Addressed review feedback:
1. More data type support on the Scala side
2. Unify errors as execution errors
3. Reduce code duplication
4. Negative index check
Which issue does this PR close?
Closes #2708.
Rationale for this change
Spark’s RegExpExtract and RegExpExtractAll expressions were previously not accelerated through Comet.
This PR adds full Rust-native implementations and the corresponding Spark–Comet bridging logic, enabling these two UDFs to run on the native engine while preserving Spark-compatible semantics.
What changes are included in this PR?
• Added CometRegExpExtract and CometRegExpExtractAll in strings.scala
• Introduced SparkRegExpExtract and SparkRegExpExtractAll as ScalarUDFImpl.
• Wired both expressions into the Comet proto conversion pipeline.
How are these changes tested?