Basic Usage
Create a data pipeline
from seeknal.project import Project
from seeknal.flow import (
    Flow,
    FlowInput,
    FlowOutput,
    FlowInputEnum,
    FlowOutputEnum,
)
from seeknal.tasks.sparkengine import SparkEngineTask
from seeknal.tasks.duckdb import DuckDBTask

# Create (or load) the project that owns the pipeline.
project = Project(name="my_project", description="My project")
project.get_or_create()

# Pipeline input: a Hive table. Pipeline output: a Spark DataFrame.
flow_input = FlowInput(kind=FlowInputEnum.HIVE_TABLE, value="my_df")
flow_output = FlowOutput(kind=FlowOutputEnum.SPARK_DATAFRAME)

# Develop a pipeline that mixes Spark and DuckDB.
task_on_spark = SparkEngineTask().add_sql(
    "SELECT * FROM __THIS__ WHERE day = date_format(current_date(), 'yyyy-MM-dd')"
)
task_on_duckdb = DuckDBTask().add_sql(
    "SELECT id, lat, lon, movement_type, day FROM __THIS__"
)

flow = Flow(
    name="my_flow",
    input=flow_input,
    tasks=[task_on_spark, task_on_duckdb],
    # Use the output defined above; the original example built `flow_output`
    # but then passed a fresh default FlowOutput(), leaving it unused.
    output=flow_output,
)

# Save the data pipeline, then execute it.
flow.get_or_create()
res = flow.run()
Load the saved data pipeline
# Re-open the project, then fetch and run the pipeline saved under "my_flow".
project = Project(name="my_project", description="My project")
project.get_or_create()

flow = Flow(name="my_flow").get_or_create()
res = flow.run()
Save the results to a feature group
from datetime import datetime

from seeknal.entity import Entity
from seeknal.featurestore.feature_group import (
    FeatureGroup,
    Materialization,
    OfflineMaterialization,
    OfflineStore,
    OfflineStoreEnum,
    FeatureStoreFileOutput,
    OnlineStore,
    OnlineStoreEnum,
    HistoricalFeatures,
    FeatureLookup,
    FillNull,
    GetLatestTimeStrategy,
    OnlineFeatures,
)

# Materialization target: a file-based offline store backed by object storage.
offline_store = OfflineStore(
    kind=OfflineStoreEnum.FILE,
    name="object_storage",
    value=FeatureStoreFileOutput(path="s3a://warehouse/feature_store"),
)
materialization = Materialization(
    event_time_col="day",
    offline_materialization=OfflineMaterialization(
        store=offline_store,
        mode="overwrite",
        ttl=None,
    ),
    offline=True,
)

# The feature group is keyed by the user_movement entity.
loc_feature_group = FeatureGroup(
    name="location_feature_group",
    entity=Entity(
        name="user_movement", join_keys=["msisdn", "movement_type"]
    ).get_or_create(),
    materialization=materialization,
)

# Attach the transformation that produces the feature group's data.
loc_feature_group.set_flow(flow)

# Register every output column as a feature.
loc_feature_group.set_features()

# Persist the feature group definition.
loc_feature_group.get_or_create()

# Materialize to the offline feature store, starting from a given date.
loc_feature_group.write(feature_start_time=datetime(2019, 3, 5))
Load feature group from offline feature store
# Re-open the saved feature group.
loc_feature_group = FeatureGroup(name="location_feature_group").get_or_create()

# Look up every feature in the group.
fs = FeatureLookup(source=loc_feature_group)

# Impute nulls with 0.0.
fillnull = FillNull(value="0.0", dataType="double")

# Read the features back from the offline feature store.
hist = HistoricalFeatures(lookups=[fs], fill_nulls=[fillnull])
df = hist.to_dataframe(feature_start_time=datetime(2019, 3, 5))
Serve features to online feature store
# Push the most recent feature values to the online store for serving.
latest_features = hist.using_latest.serve()

# Look up a single entity by its key value and fetch its features.
user_one = Entity(name="user_movement").get_or_create().set_key_values("05X5wBWKN3")
user_one_features = latest_features.get_features(keys=[user_one])
Last updated