Skip to content

Commit 98246e2

Browse files
godaridillibabubibith4
authored andcommitted
Remove redundant metastore call from IcebergPageSourceProvider
1 parent a57845b commit 98246e2

File tree

6 files changed

+39
-22
lines changed

6 files changed

+39
-22
lines changed

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,7 @@ public ConnectorTableLayoutResult getTableLayoutForConstraint(
372372
.setPartitionColumnPredicate(partitionColumnPredicate.simplify())
373373
.setPartitions(Optional.ofNullable(partitions.size() == 0 ? null : partitions))
374374
.setTable(handle)
375+
.setTableLocation(Optional.of(icebergTable.location()))
375376
.build());
376377
return new ConnectorTableLayoutResult(layout, constraint.getSummary());
377378
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@
7979
import com.facebook.presto.spi.FixedPageSource;
8080
import com.facebook.presto.spi.PageIndexerFactory;
8181
import com.facebook.presto.spi.PrestoException;
82-
import com.facebook.presto.spi.SchemaTableName;
8382
import com.facebook.presto.spi.SplitContext;
8483
import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
8584
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
@@ -753,21 +752,18 @@ public ConnectorPageSource createPageSource(
753752
IcebergSplit split = (IcebergSplit) connectorSplit;
754753
IcebergTableHandle table = icebergLayout.getTable();
755754
if (similaritySearchEnabled && split.isAnn()) {
756-
SchemaTableName schemaTableName = table.getSchemaTableName();
757-
Table icebergTable = IcebergUtil.getHiveIcebergTable(
758-
metastore,
759-
hdfsEnvironment,
760-
tableOperationsConfig,
761-
manifestFileCache,
762-
session,
763-
catalogName,
764-
schemaTableName);
755+
String tableLocation = icebergLayout.getTableLocation()
756+
.orElseThrow(() -> new IllegalStateException("Table location is required for ANN queries"));
757+
758+
HdfsContext hdfsContext = new HdfsContext(session, table.getSchemaName(), table.getTableName(), split.getPath(), false);
759+
HdfsFileIO hdfsFileIO = new HdfsFileIO(manifestFileCache, hdfsEnvironment, hdfsContext);
765760

766761
return new ANNPageSource(
767762
new FixedPageSource(ImmutableList.of()),
768763
split.getQueryVector(),
769764
split.getTopN(),
770-
icebergTable);
765+
tableLocation,
766+
hdfsFileIO);
771767
}
772768

773769
List<ColumnHandle> columns = desiredColumns;

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableLayoutHandle.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public class IcebergTableLayoutHandle
4242
private final Map<String, IcebergColumnHandle> predicateColumns;
4343
private final Optional<Set<IcebergColumnHandle>> requestedColumns;
4444
private final IcebergTableHandle table;
45+
private final Optional<String> tableLocation;
4546

4647
@JsonCreator
4748
public IcebergTableLayoutHandle(
@@ -53,7 +54,8 @@ public IcebergTableLayoutHandle(
5354
@JsonProperty("requestedColumns") Optional<Set<IcebergColumnHandle>> requestedColumns,
5455
@JsonProperty("pushdownFilterEnabled") boolean pushdownFilterEnabled,
5556
@JsonProperty("partitionColumnPredicate") TupleDomain<ColumnHandle> partitionColumnPredicate,
56-
@JsonProperty("table") IcebergTableHandle table)
57+
@JsonProperty("table") IcebergTableHandle table,
58+
@JsonProperty("tableLocation") Optional<String> tableLocation)
5759
{
5860
this(
5961
partitionColumns.stream().map(BaseHiveColumnHandle.class::cast).collect(toList()),
@@ -65,7 +67,8 @@ public IcebergTableLayoutHandle(
6567
pushdownFilterEnabled,
6668
partitionColumnPredicate,
6769
Optional.empty(),
68-
table);
70+
table,
71+
tableLocation);
6972
}
7073

7174
public IcebergTableLayoutHandle(
@@ -78,7 +81,8 @@ public IcebergTableLayoutHandle(
7881
boolean pushdownFilterEnabled,
7982
TupleDomain<ColumnHandle> partitionColumnPredicate,
8083
Optional<List<HivePartition>> partitions,
81-
IcebergTableHandle table)
84+
IcebergTableHandle table,
85+
Optional<String> tableLocation)
8286
{
8387
super(
8488
partitionColumns,
@@ -92,6 +96,7 @@ public IcebergTableLayoutHandle(
9296
this.predicateColumns = requireNonNull(predicateColumns, "predicateColumns is null");
9397
this.requestedColumns = requireNonNull(requestedColumns, "requestedColumns is null");
9498
this.table = requireNonNull(table, "table is null");
99+
this.tableLocation = requireNonNull(tableLocation, "tableLocation is null");
95100
}
96101

97102
@JsonProperty
@@ -118,6 +123,12 @@ public IcebergTableHandle getTable()
118123
return table;
119124
}
120125

126+
@JsonProperty
127+
public Optional<String> getTableLocation()
128+
{
129+
return tableLocation;
130+
}
131+
121132
public TupleDomain<IcebergColumnHandle> getValidPredicate()
122133
{
123134
TupleDomain<IcebergColumnHandle> predicate = getDomainPredicate()
@@ -173,6 +184,7 @@ public static class Builder
173184
private TupleDomain<ColumnHandle> partitionColumnPredicate;
174185
private Optional<List<HivePartition>> partitions;
175186
private IcebergTableHandle table;
187+
private Optional<String> tableLocation = Optional.empty();
176188

177189
public Builder setPartitionColumns(List<BaseHiveColumnHandle> partitionColumns)
178190
{
@@ -234,6 +246,12 @@ public Builder setTable(IcebergTableHandle table)
234246
return this;
235247
}
236248

249+
public Builder setTableLocation(Optional<String> tableLocation)
250+
{
251+
this.tableLocation = tableLocation;
252+
return this;
253+
}
254+
237255
public IcebergTableLayoutHandle build()
238256
{
239257
return new IcebergTableLayoutHandle(
@@ -246,7 +264,8 @@ public IcebergTableLayoutHandle build()
246264
pushdownFilterEnabled,
247265
partitionColumnPredicate,
248266
partitions,
249-
table);
267+
table,
268+
tableLocation);
250269
}
251270
}
252271
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergFilterPushdown.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ protected ConnectorPushdownFilterResult getConnectorPushdownFilterResult(
178178
.setPartitionColumnPredicate(partitionColumnPredicate)
179179
.setPartitions(Optional.ofNullable(partitions.size() == 0 ? null : partitions))
180180
.setTable((IcebergTableHandle) tableHandle)
181+
.setTableLocation(Optional.of(icebergTable.location()))
181182
.build()),
182183
remainingExpressions.getDynamicFilterExpression());
183184
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergPlanOptimizer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,8 @@ public PlanNode visitFilter(FilterNode filter, RewriteContext<Void> context)
227227
identityPartitionColumnPredicate.simplify()
228228
.intersect(icebergTableLayoutHandle.getPartitionColumnPredicate()),
229229
icebergTableLayoutHandle.getPartitions(),
230-
icebergTableLayoutHandle.getTable()));
230+
icebergTableLayoutHandle.getTable(),
231+
icebergTableLayoutHandle.getTableLocation()));
231232
TableScanNode newTableScan = new TableScanNode(
232233
tableScan.getSourceLocation(),
233234
tableScan.getId(),

presto-iceberg/src/main/java/com/facebook/presto/iceberg/tvf/ANNPageSource.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import io.github.jbellis.jvector.vector.VectorizationProvider;
3333
import io.github.jbellis.jvector.vector.types.VectorFloat;
3434
import io.github.jbellis.jvector.vector.types.VectorTypeSupport;
35-
import org.apache.iceberg.Table;
3635

3736
import java.io.IOException;
3837
import java.io.InputStream;
@@ -49,17 +48,19 @@ public class ANNPageSource
4948
private final List<Float> queryVector;
5049
private final int topN;
5150
private boolean finished;
52-
private Table icebergTable;
51+
private final String tableLocation;
52+
private final HdfsFileIO hdfsFileIO;
5353
private static final VectorTypeSupport vts = VectorizationProvider.getInstance().getVectorTypeSupport();
5454
private static final String VECTOR_INDEX_DIR = ".vector_index";
5555
private static final Logger log = Logger.get(ANNPageSource.class);
5656

57-
public ANNPageSource(ConnectorPageSource delegate, List<Float> queryVector, int topN, Table icebergTable)
57+
public ANNPageSource(ConnectorPageSource delegate, List<Float> queryVector, int topN, String tableLocation, HdfsFileIO hdfsFileIO)
5858
{
5959
this.delegate = delegate;
6060
this.queryVector = queryVector;
6161
this.topN = topN;
62-
this.icebergTable = icebergTable;
62+
this.tableLocation = tableLocation;
63+
this.hdfsFileIO = hdfsFileIO;
6364
}
6465

6566
@Override
@@ -92,7 +93,6 @@ public Page getNextPage()
9293
if (finished) {
9394
return null;
9495
}
95-
String tableLocation = icebergTable.location();
9696
log.info("tableLocation: %s", tableLocation);
9797

9898
String indexDirPath = tableLocation + "/" + VECTOR_INDEX_DIR;
@@ -102,7 +102,6 @@ public Page getNextPage()
102102
log.info("Loading node-to-row mapping from: %s", mappingPath);
103103
java.nio.file.Path tempIndexFile = null;
104104
try {
105-
HdfsFileIO hdfsFileIO = (HdfsFileIO) icebergTable.io();
106105
// Load mapping directly from stream
107106
NodeRowIdMapping mapping;
108107
try {

0 commit comments

Comments
 (0)