Basic Usage

  1. Create a data pipeline

    from seeknal.project import Project
    from seeknal.flow import (
        Flow,
        FlowInput,
        FlowOutput,
        FlowInputEnum,
        FlowOutputEnum,
    )
    from seeknal.tasks.sparkengine import SparkEngineTask
    from seeknal.tasks.duckdb import DuckDBTask
    
    project = Project(name="my_project", description="My project")
    project.get_or_create()
    
    flow_input = FlowInput(kind=FlowInputEnum.HIVE_TABLE, value="my_df")
    flow_output = FlowOutput(kind=FlowOutputEnum.SPARK_DATAFRAME)
    
    # Develop a pipeline that mixes Spark and DuckDB.
    task_on_spark = SparkEngineTask().add_sql("SELECT * FROM __THIS__ WHERE day = date_format(current_date(), 'yyyy-MM-dd')")
    task_on_duckdb = DuckDBTask().add_sql("SELECT id, lat, lon, movement_type, day FROM __THIS__")
    flow = Flow(
        name="my_flow",
        input=flow_input,
        tasks=[task_on_spark, task_on_duckdb],
        output=FlowOutput(),
    )
    # save the data pipeline
    flow.get_or_create()
    res = flow.run()
  2. Load the saved data pipeline

    project = Project(name="my_project", description="My project")
    project.get_or_create()
    
    flow = Flow(name="my_flow").get_or_create()
    res = flow.run()
  3. Save the results to a feature group

    from datetime import datetime
    from seeknal.entity import Entity
    from seeknal.featurestore.feature_group import (
        FeatureGroup,
        Materialization,
        OfflineMaterialization,
        OfflineStore,
        OfflineStoreEnum,
        FeatureStoreFileOutput,
        OnlineStore,
        OnlineStoreEnum,
        HistoricalFeatures,
        FeatureLookup,
        FillNull,
        GetLatestTimeStrategy,
        OnlineFeatures,
    )
    
    # Define a materialization for the offline feature store
    materialization = Materialization(
        event_time_col="day",
        offline_materialization=OfflineMaterialization(
            store=OfflineStore(
                kind=OfflineStoreEnum.FILE,
                name="object_storage",
                value=FeatureStoreFileOutput(path="s3a://warehouse/feature_store"),
            ),
            mode="overwrite",
            ttl=None,
        ),
        offline=True,
    )
    # Define feature group
    loc_feature_group = FeatureGroup(
        name="location_feature_group",
        entity=Entity(name="user_movement", join_keys=["msisdn", "movement_type"]).get_or_create(),
        materialization=materialization,
    )
    # Attach the transformation that creates the feature group
    loc_feature_group.set_flow(flow)
    
    # Register all columns as features
    loc_feature_group.set_features()
    
    # Save feature group
    loc_feature_group.get_or_create()
    
    # materialize the feature group to offline feature store
    loc_feature_group.write(
        # store features from a specific date up to the latest
        feature_start_time=datetime(2019, 3, 5)
    )
  4. Load feature group from offline feature store

    loc_feature_group = FeatureGroup(name="location_feature_group").get_or_create()
    # look up all features of loc_feature_group
    fs = FeatureLookup(source=loc_feature_group)
    # impute null values with 0.0
    fillnull = FillNull(value="0.0", dataType="double")
    # load the features from offline feature store
    hist = HistoricalFeatures(lookups=[fs], fill_nulls=[fillnull])
    df = hist.to_dataframe(feature_start_time=datetime(2019, 3, 5))
  5. Serve features to online feature store

    latest_features = hist.using_latest.serve()
    user_one = Entity(name="user_movement").get_or_create().set_key_values("05X5wBWKN3")
    user_one_features = latest_features.get_features(keys=[user_one])

Last updated