119 changes: 64 additions & 55 deletions sdk/python/feast/infra/compute_engines/aws_lambda/app.py
@@ -1,84 +1,93 @@
 import base64
 import json
-import sys
+import logging
 import tempfile
-import traceback
 from pathlib import Path
 
 import pyarrow.parquet as pq
 
 from feast import FeatureStore
 from feast.constants import FEATURE_STORE_YAML_ENV_NAME
-from feast.infra.materialization.local_engine import DEFAULT_BATCH_SIZE
+from feast.infra.compute_engines.aws_lambda.lambda_engine import DEFAULT_BATCH_SIZE
 from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping
 
+logger = logging.getLogger()
+logger.setLevel("INFO")
 
-def handler(event, context):
-    """Provide an event that contains the following keys:
-
-      - operation: one of the operations in the operations dict below
-      - tableName: required for operations that interact with DynamoDB
-      - payload: a parameter to pass to the operation being performed
+def handler(event, context):
+    """Load a parquet file and write the feature values to the online store.
+
+    Args:
+        event (dict): payload containing the following keys:
+            FEATURE_STORE_YAML_ENV_NAME: Base64 encoded feature store config
+            view_name: Name of FeatureView to be materialized
+            view_type: Type of FeatureView
+            path: Path to parquet batch file on S3 bucket
+        context (dict): Lambda runtime context, not used.
     """
-    print("Received event: " + json.dumps(event, indent=2), flush=True)
+    logger.info(f"Received event: {event}")
 
     try:
         config_base64 = event[FEATURE_STORE_YAML_ENV_NAME]
 
         config_bytes = base64.b64decode(config_base64)
 
-        # Create a new unique directory for writing feature_store.yaml
-        repo_path = Path(tempfile.mkdtemp())
-
-        with open(repo_path / "feature_store.yaml", "wb") as f:
-            f.write(config_bytes)
+        with tempfile.TemporaryDirectory() as repo_posix_path:
+            repo_path = Path(repo_posix_path)
 
-        # Initialize the feature store
-        store = FeatureStore(repo_path=str(repo_path.resolve()))
+            with open(repo_path / "feature_store.yaml", "wb") as f:
+                f.write(config_bytes)
 
-        view_name = event["view_name"]
-        view_type = event["view_type"]
-        path = event["path"]
+            # Initialize the feature store
+            store = FeatureStore(repo_path=str(repo_path.resolve()))
 
-        bucket = path[len("s3://") :].split("/", 1)[0]
-        key = path[len("s3://") :].split("/", 1)[1]
-        print(f"Inferred Bucket: `{bucket}` Key: `{key}`", flush=True)
+            view_name = event["view_name"]
+            view_type = event["view_type"]
+            path = event["path"]
 
-        if view_type == "batch":
-            # TODO: This probably needs to be become `store.get_batch_feature_view` at some point.
-            feature_view = store.get_feature_view(view_name)
-        else:
-            feature_view = store.get_stream_feature_view(view_name)
+            bucket, key = path[len("s3://") :].split("/", 1)
+            logger.info(f"Inferred Bucket: `{bucket}` Key: `{key}`")
 
-        print(f"Got Feature View: `{feature_view}`", flush=True)
+            if view_type == "batch":
+                # TODO: This probably needs to be become `store.get_batch_feature_view` at some point. # noqa: E501,W505
+                feature_view = store.get_feature_view(view_name)
+            else:
+                feature_view = store.get_stream_feature_view(view_name)
 
-        table = pq.read_table(path)
-        if feature_view.batch_source.field_mapping is not None:
-            table = _run_pyarrow_field_mapping(
-                table, feature_view.batch_source.field_mapping
-            )
-
-        join_key_to_value_type = {
-            entity.name: entity.dtype.to_value_type()
-            for entity in feature_view.entity_columns
-        }
-
-        written_rows = 0
-
-        for batch in table.to_batches(DEFAULT_BATCH_SIZE):
-            rows_to_write = _convert_arrow_to_proto(
-                batch, feature_view, join_key_to_value_type
-            )
-            store._provider.online_write_batch(
-                store.config,
-                feature_view,
-                rows_to_write,
-                lambda x: None,
-            )
-            written_rows += len(rows_to_write)
+            logger.info(
+                f"Got Feature View: `{feature_view.name}`, \
+                last updated: {feature_view.last_updated_timestamp}"
+            )
+
+            table = pq.read_table(path)
+            if feature_view.batch_source.field_mapping is not None:
+                table = _run_pyarrow_field_mapping(
+                    table, feature_view.batch_source.field_mapping
+                )
+
+            join_key_to_value_type = {
+                entity.name: entity.dtype.to_value_type()
+                for entity in feature_view.entity_columns
+            }
+
+            written_rows = 0
+
+            for batch in table.to_batches(DEFAULT_BATCH_SIZE):
+                rows_to_write = _convert_arrow_to_proto(
+                    batch, feature_view, join_key_to_value_type
+                )
+                store._provider.online_write_batch(
+                    store.config,
+                    feature_view,
+                    rows_to_write,
+                    lambda x: None,
+                )
+                written_rows += len(rows_to_write)
+            logger.info(
+                f"Successfully updated {written_rows} rows.",
+                extra={"num_updated_rows": written_rows, "feature_view": view_name},
+            )
         return {"written_rows": written_rows}
-    except Exception as e:
-        print(f"Exception: {e}", flush=True)
-        print("Traceback:", flush=True)
-        print(traceback.format_exc(), flush=True)
-        sys.exit(1)
+    except Exception:
+        logger.exception("Error in processing materialization.")
+        raise
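For reference, the new handler's event contract (documented in the added docstring) can be exercised locally. The sketch below is illustrative only: the FeatureView name, bucket, and parquet path are made-up placeholders, and it assumes a feature_store.yaml sitting next to the script; only the import path and the FEATURE_STORE_YAML_ENV_NAME constant come from this PR.

# Hypothetical local smoke test for the handler above; not part of the PR.
import base64
from pathlib import Path

from feast.constants import FEATURE_STORE_YAML_ENV_NAME
from feast.infra.compute_engines.aws_lambda.app import handler

# Base64-encode the repo config exactly the way the handler expects it.
config_b64 = base64.b64encode(Path("feature_store.yaml").read_bytes()).decode()

event = {
    FEATURE_STORE_YAML_ENV_NAME: config_b64,
    "view_name": "driver_hourly_stats",  # hypothetical FeatureView name
    "view_type": "batch",                # or "stream" for a StreamFeatureView
    "path": "s3://example-bucket/materialization/driver_hourly_stats.parquet",
}

# The Lambda runtime context is unused, so None is fine for a local call.
print(handler(event, context=None))  # expected shape: {"written_rows": <int>}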
Second file changed in this pull request (path not shown in this capture):
@@ -108,9 +108,14 @@ def update(
         r = self.lambda_client.create_function(
             FunctionName=self.lambda_name,
             PackageType="Image",
-            Role=self.repo_config.batch_engine.lambda_role,
-            Code={"ImageUri": self.repo_config.batch_engine.materialization_image},
+            Role=self.repo_config.batch_engine_config.lambda_role,
+            Code={
+                "ImageUri": self.repo_config.batch_engine_config.materialization_image
+            },
             Timeout=DEFAULT_TIMEOUT,
+            LoggingConfig={
+                "LogFormat": "JSON",
+            },
             Tags={
                 "feast-owned": "True",
                 "project": project,
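The added LoggingConfig block asks Lambda to emit JSON-structured logs, which is what lets the structured fields passed via the handler's logging `extra` (for example num_updated_rows) be queried in CloudWatch Logs. As a rough illustration, not part of this PR, an already-deployed function could be switched to the same format with boto3; the function name below is a placeholder.

# Illustrative only: enable JSON-formatted logs on an existing Lambda function.
import boto3

lambda_client = boto3.client("lambda")
lambda_client.update_function_configuration(
    FunctionName="feast-materialization-lambda",  # hypothetical function name
    LoggingConfig={
        "LogFormat": "JSON",
        "ApplicationLogLevel": "INFO",  # optional: drop DEBUG-level records
    },
)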