Merged

Changes from all 25 commits
325aaba  Add ReactFlow visualization for Feast registry metadata (devin-ai-integration[bot], Apr 26, 2025)
cc765e0  Merge master: Integrate ReactFlow visualization with RegistrySearch (devin-ai-integration[bot], Apr 27, 2025)
16ba51a  Fix code formatting (devin-ai-integration[bot], Apr 27, 2025)
0c129f7  Fix ReactFlow imports and UI styling (devin-ai-integration[bot], Apr 27, 2025)
2220af0  Fix formatting issues in RegistryVisualization component (devin-ai-integration[bot], Apr 27, 2025)
1e5bf79  Update dependencies for ReactFlow visualization (devin-ai-integration[bot], Apr 27, 2025)
106a0fe  feat: Add directed graph arrows to ReactFlow visualization (devin-ai-integration[bot], Apr 27, 2025)
0dfc260  Refactor visualization to use left-to-right layout with dashed lines (devin-ai-integration[bot], Apr 28, 2025)
2907624  Increase node dimensions and rename visualization to Lineage (devin-ai-integration[bot], Apr 28, 2025)
2e3bff9  Add color coding for different object types (devin-ai-integration[bot], Apr 28, 2025)
ce93241  Replace colored backgrounds with color-coded icons (devin-ai-integration[bot], Apr 28, 2025)
d09b909  Add background colors to icons for better visibility (devin-ai-integration[bot], Apr 28, 2025)
3ab382f  Update node design with vertical division and light gray outline (devin-ai-integration[bot], Apr 28, 2025)
0ffb2cd  Update box color to match icon color (devin-ai-integration[bot], Apr 28, 2025)
730b62d  Space out DAG and add animated green overlay to edges (devin-ai-integration[bot], Apr 28, 2025)
e3bc3f5  Add dummy NPM token for local development to fix CI yarn install issue (devin-ai-integration[bot], Apr 28, 2025)
a01cbfc  Fix syntax error in yarn.lock file by regenerating it (devin-ai-integration[bot], Apr 28, 2025)
cc6787d  Fix formatting issues in RegistryVisualization.tsx (devin-ai-integration[bot], Apr 28, 2025)
68ad2e8  Fix protobuf import resolution by adding --path parameter to pbjs com… (devin-ai-integration[bot], Apr 28, 2025)
f4ee41f  chore: Refactor transform on write (#5300) (franciscojavierarceo, Apr 28, 2025)
9005a3b  removing ui/.npmrc.ci (franciscojavierarceo, Apr 28, 2025)
1f1a5d0  updated (franciscojavierarceo, Apr 29, 2025)
7c2fa07  linter (franciscojavierarceo, Apr 29, 2025)
2d6ce2e  making items more clickable (franciscojavierarceo, Apr 29, 2025)
1719fb6  Delete ui/.npmrc.local (franciscojavierarceo, Apr 29, 2025)
247 changes: 152 additions & 95 deletions sdk/python/feast/feature_store.py
@@ -1518,134 +1518,191 @@ def _offline_write():

await run_in_threadpool(_offline_write)

def _get_feature_view_and_df_for_online_write(
def _validate_and_convert_input_data(
self,
feature_view_name: str,
df: Optional[pd.DataFrame] = None,
inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None,
allow_registry_cache: bool = True,
transform_on_write: bool = True,
):
feature_view_dict = {
fv_proto.name: fv_proto
for fv_proto in self.list_all_feature_views(allow_registry_cache)
}
try:
feature_view = feature_view_dict[feature_view_name]
except FeatureViewNotFoundException:
raise FeatureViewNotFoundException(feature_view_name, self.project)
df: Optional[pd.DataFrame],
inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]],
) -> Optional[pd.DataFrame]:
"""
Validates input parameters and converts them to a pandas DataFrame.

Args:
df: Optional DataFrame input
inputs: Optional dictionary or DataFrame input

Returns:
Validated pandas DataFrame or None

Raises:
ValueError: If both df and inputs are provided
DataFrameSerializationError: If input data cannot be converted to DataFrame
"""
if df is not None and inputs is not None:
raise ValueError("Both df and inputs cannot be provided at the same time.")

if df is None and inputs is not None:
if isinstance(inputs, dict) or isinstance(inputs, List):
try:
df = pd.DataFrame(inputs)
return pd.DataFrame(inputs)
except Exception as _:
raise DataFrameSerializationError(inputs)
elif isinstance(inputs, pd.DataFrame):
pass
return inputs
else:
raise ValueError("inputs must be a dictionary or a pandas DataFrame.")

if df is not None and inputs is None:
if isinstance(df, dict) or isinstance(df, List):
try:
df = pd.DataFrame(df)
return pd.DataFrame(df)
except Exception as _:
raise DataFrameSerializationError(df)

if feature_view.features[0].vector_index and df is not None:
return df

def _transform_on_demand_feature_view_df(
self, feature_view: OnDemandFeatureView, df: pd.DataFrame
) -> pd.DataFrame:
"""
Apply transformations for an OnDemandFeatureView to the input dataframe.

Args:
feature_view: The OnDemandFeatureView containing the transformation
df: The input dataframe to transform

Returns:
Transformed dataframe

Raises:
Exception: For unsupported OnDemandFeatureView modes
"""
if feature_view.mode == "python" and isinstance(
feature_view.feature_transformation, PythonTransformation
):
input_dict = (
df.to_dict(orient="records")[0]
if feature_view.singleton
else df.to_dict(orient="list")
)

if feature_view.singleton:
transformed_rows = []

for i, row in df.iterrows():
output = feature_view.feature_transformation.udf(row.to_dict())
if i == 0:
transformed_rows = output
else:
for k in output:
if isinstance(output[k], list):
transformed_rows[k].extend(output[k])
else:
transformed_rows[k].append(output[k])

transformed_data = pd.DataFrame(transformed_rows)
else:
transformed_data = feature_view.feature_transformation.udf(input_dict)

if feature_view.write_to_online_store:
entities = [
self.get_entity(entity) for entity in (feature_view.entities or [])
]
join_keys = [entity.join_key for entity in entities if entity]
join_keys = [k for k in join_keys if k in input_dict.keys()]
transformed_df = (
pd.DataFrame(transformed_data)
if not isinstance(transformed_data, pd.DataFrame)
else transformed_data
)
input_df = pd.DataFrame(
[input_dict] if feature_view.singleton else input_dict
)
if input_df.shape[0] == transformed_df.shape[0]:
for k in input_dict:
if k not in transformed_data:
transformed_data[k] = input_dict[k]
transformed_df = pd.DataFrame(transformed_data)
else:
transformed_df = pd.merge(
transformed_df,
input_df,
how="left",
on=join_keys,
)
else:
# overwrite any transformed features and update the dictionary
for k in input_dict:
if k not in transformed_data:
transformed_data[k] = input_dict[k]

return pd.DataFrame(transformed_data)

elif feature_view.mode == "pandas" and isinstance(
feature_view.feature_transformation, PandasTransformation
):
transformed_df = feature_view.feature_transformation.udf(df)
for col in df.columns:
transformed_df[col] = df[col]
return transformed_df
else:
raise Exception("Unsupported OnDemandFeatureView mode")

def _validate_vector_features(self, feature_view, df: pd.DataFrame) -> None:
"""
Validates vector features in the DataFrame against the feature view specifications.

Args:
feature_view: The feature view containing vector feature specifications
df: The DataFrame to validate

Raises:
ValueError: If vector dimension constraints are violated
"""
if feature_view.features and feature_view.features[0].vector_index:
fv_vector_feature_name = feature_view.features[0].name
df_vector_feature_index = df.columns.get_loc(fv_vector_feature_name)

if feature_view.features[0].vector_length != 0:
if (
df.shape[df_vector_feature_index]
> feature_view.features[0].vector_length
):
raise ValueError(
f"The dataframe for {fv_vector_feature_name} column has {df.shape[1]} vectors which is greater than expected (i.e {feature_view.features[0].vector_length}) by feature view {feature_view.name}."
f"The dataframe for {fv_vector_feature_name} column has {df.shape[1]} vectors "
f"which is greater than expected (i.e {feature_view.features[0].vector_length}) "
f"by feature view {feature_view.name}."
)

def _get_feature_view_and_df_for_online_write(
self,
feature_view_name: str,
df: Optional[pd.DataFrame] = None,
inputs: Optional[Union[Dict[str, List[Any]], pd.DataFrame]] = None,
allow_registry_cache: bool = True,
transform_on_write: bool = True,
):
feature_view_dict = {
fv_proto.name: fv_proto
for fv_proto in self.list_all_feature_views(allow_registry_cache)
}
try:
feature_view = feature_view_dict[feature_view_name]
except FeatureViewNotFoundException:
raise FeatureViewNotFoundException(feature_view_name, self.project)

# Convert inputs/df to a consistent DataFrame format
df = self._validate_and_convert_input_data(df, inputs)

if df is not None:
self._validate_vector_features(feature_view, df)

# # Apply transformations if this is an OnDemandFeatureView with write_to_online_store=True
if (
isinstance(feature_view, OnDemandFeatureView)
and feature_view.write_to_online_store
and transform_on_write
):
if (
feature_view.mode == "python"
and isinstance(
feature_view.feature_transformation, PythonTransformation
)
and df is not None
):
input_dict = (
df.to_dict(orient="records")[0]
if feature_view.singleton
else df.to_dict(orient="list")
)
if feature_view.singleton:
transformed_rows = []

for i, row in df.iterrows():
output = feature_view.feature_transformation.udf(row.to_dict())
if i == 0:
transformed_rows = output
else:
for k in output:
if isinstance(output[k], list):
transformed_rows[k].extend(output[k])
else:
transformed_rows[k].append(output[k])

transformed_data = pd.DataFrame(transformed_rows)
else:
transformed_data = feature_view.feature_transformation.udf(
input_dict
)
if feature_view.write_to_online_store:
entities = [
self.get_entity(entity)
for entity in (feature_view.entities or [])
]
join_keys = [entity.join_key for entity in entities if entity]
join_keys = [k for k in join_keys if k in input_dict.keys()]
transformed_df = (
pd.DataFrame(transformed_data)
if not isinstance(transformed_data, pd.DataFrame)
else transformed_data
)
input_df = pd.DataFrame(
[input_dict] if feature_view.singleton else input_dict
)
if input_df.shape[0] == transformed_df.shape[0]:
for k in input_dict:
if k not in transformed_data:
transformed_data[k] = input_dict[k]
transformed_df = pd.DataFrame(transformed_data)
else:
transformed_df = pd.merge(
transformed_df,
input_df,
how="left",
on=join_keys,
)
else:
# overwrite any transformed features and update the dictionary
for k in input_dict:
if k not in transformed_data:
transformed_data[k] = input_dict[k]
df = pd.DataFrame(transformed_data)
elif feature_view.mode == "pandas" and isinstance(
feature_view.feature_transformation, PandasTransformation
):
transformed_df = feature_view.feature_transformation.udf(df)
if df is not None:
for col in df.columns:
transformed_df[col] = df[col]
df = transformed_df

else:
raise Exception("Unsupported OnDemandFeatureView mode")
df = self._transform_on_demand_feature_view_df(feature_view, df)

return feature_view, df

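For context, a minimal sketch (not part of the diff) of how the refactored write path might be exercised from user code. It assumes the public FeatureStore.write_to_online_store API accepts the same df/inputs arguments and forwards the new transform_on_write flag down to _get_feature_view_and_df_for_online_write; the feature view and column names are placeholders.

import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path=".")  # hypothetical local feature repo

# Raw rows keyed by the entity join key; column names are illustrative only.
raw = pd.DataFrame({"driver_id": [1001, 1002], "conv_rate": [0.42, 0.51]})

# For an OnDemandFeatureView defined with write_to_online_store=True, the view's
# UDF is applied before writing; transform_on_write=False (assumed to be forwarded
# by the public API) would skip the transformation and write the raw values.
store.write_to_online_store(
    feature_view_name="driver_stats_fresh",  # hypothetical on-demand feature view
    df=raw,
    transform_on_write=True,
)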
1 change: 1 addition & 0 deletions ui/jest.config.js
@@ -31,6 +31,7 @@ module.exports = {
"^react-native$": "react-native-web",
"^.+\\.module\\.(css|sass|scss)$": "identity-obj-proxy",
"chroma-js": "<rootDir>/node_modules/chroma-js/dist/chroma.min.cjs",
"^reactflow/dist/style\\.css$": "identity-obj-proxy",
},
moduleFileExtensions: [
"web.js",
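The new moduleNameMapper entry stubs reactflow's bundled stylesheet during tests: when the stylesheet is imported (presumably by the new RegistryVisualization component via reactflow/dist/style.css), Jest resolves it to identity-obj-proxy instead of attempting to parse CSS, mirroring the existing handling of CSS module imports in the mapping above it.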