Skip to content
12 changes: 10 additions & 2 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2046,9 +2046,17 @@ def write_to_offline_store(
source_columns = [column for column, _ in column_names_and_types]
input_columns = df.columns.values.tolist()

if set(input_columns) != set(source_columns):
input_columns_set = set(input_columns)
source_columns_set = set(source_columns)

if input_columns_set != source_columns_set:
missing_expected_columns = sorted(source_columns_set - input_columns_set)
extra_unexpected_columns = sorted(input_columns_set - source_columns_set)

raise ValueError(
f"The input dataframe has columns {set(input_columns)} but the batch source has columns {set(source_columns)}."
"The input dataframe columns do not match the batch source columns. "
f"missing_expected_columns: {missing_expected_columns}, "
f"extra_unexpected_columns: {extra_unexpected_columns}."
)

if reorder_columns:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,18 @@ def test_writing_incorrect_schema_fails(environment, universal_data_sources):
"created": [ts, ts],
},
)
with pytest.raises(ValueError):
expected_missing = ["acc_rate", "avg_daily_trips"]
expected_extra = ["incorrect_schema"]

with pytest.raises(ValueError, match="missing_expected_columns") as excinfo:
store.write_to_offline_store(
driver_fv.name, expected_df, allow_registry_cache=False
)

error_message = str(excinfo.value)
assert f"missing_expected_columns: {expected_missing}" in error_message
assert f"extra_unexpected_columns: {expected_extra}" in error_message


@pytest.mark.integration
@pytest.mark.universal_offline_stores
Expand Down
Loading