The earlier vignettes introduced the main workflow: define features with fd_define(), compute them in R with fd_compute(), and write the results to a database feature table with fd_run().
This vignette focuses on patterns that become important once the workflow is used repeatedly. In practice, a production feature pipeline needs more than a single successful run. You need a way to preview changes, inspect failures, refresh old rows when definitions change, control schema evolution, and monitor what the pipeline did.
featdelta is not a scheduler, an orchestration platform,
or a replacement for database permissions. It is the R
feature-engineering component inside that larger workflow. This vignette
shows how to use the package in a way that is easier to maintain over
time.
We will use a small device-monitoring example. Imagine a database table that receives equipment readings over time. Analysts want to derive features in R and store them in a separate table for dashboards, alerts, or predictive-maintenance models.
library(DBI)
library(RSQLite)
library(featdelta)
con <- dbConnect(SQLite(), ":memory:")
readings <- data.frame(
reading_id = 1:10,
device_id = c("A", "A", "B", "C", "A", "B", "C", "A", "B", "C"),
temperature = c(68, 71, 75, 70, 83, 78, 72, 88, 81, 74),
vibration = c(0.12, 0.15, 0.31, 0.18, 0.44, 0.39, 0.22, 0.55, 0.41, 0.24),
pressure = c(31, 33, 36, 32, 39, 37, 34, 42, 38, 35),
runtime_hours = c(120, 125, 210, 88, 130, 216, 93, 136, 223, 97),
stringsAsFactors = FALSE
)
day_one <- 1:6
day_two <- 7:10
dbWriteTable(
con,
"raw_readings",
readings[day_one, ],
overwrite = TRUE
)
source_sql <- "
SELECT
reading_id,
device_id,
temperature,
vibration,
pressure,
runtime_hours
FROM raw_readings
ORDER BY reading_id
"
key <- "reading_id"
dbGetQuery(con, source_sql)
#> reading_id device_id temperature vibration pressure runtime_hours
#> 1 1 A 68 0.12 31 120
#> 2 2 A 71 0.15 33 125
#> 3 3 B 75 0.31 36 210
#> 4 4 C 70 0.18 32 88
#> 5 5 A 83 0.44 39 130
#> 6 6 B 78 0.39 37 216

This is a small table, but the operating pattern is realistic: raw readings arrive, R creates transformed features, and the database stores the results.
Before writing to a production database table, test the feature definitions on a small local extract. This lets you check names, row counts, and basic values without changing the database feature table.
defs <- fd_define(
temp_above_80 = temperature > 80,
vibration_score = vibration * runtime_hours,
pressure_per_hour = pressure / runtime_hours,
maintenance_flag = temp_above_80 | vibration_score > 80
)
raw_preview <- dbGetQuery(con, source_sql)
features_preview <- fd_compute(
data = raw_preview,
defs = defs,
key = key
)
features_preview
#> reading_id temp_above_80 vibration_score pressure_per_hour maintenance_flag
#> 1 1 FALSE 14.40 0.2583333 FALSE
#> 2 2 FALSE 18.75 0.2640000 FALSE
#> 3 3 FALSE 65.10 0.1714286 FALSE
#> 4 4 FALSE 15.84 0.3636364 FALSE
#> 5 5 TRUE 57.20 0.3000000 TRUE
#> 6 6 FALSE 84.24 0.1712963 TRUE

This is the first production habit: separate definition development from database writes. When the output table has the expected shape, use the same definitions in fd_run().
run_initial <- fd_run(
con = con,
sql = source_sql,
defs = defs,
key = key,
feat_table_name = "reading_features",
verbose = FALSE
)
dbGetQuery(con, "SELECT * FROM reading_features ORDER BY reading_id")
#> reading_id temp_above_80 vibration_score pressure_per_hour maintenance_flag
#> 1 1 0 14.40 0.2583333 0
#> 2 2 0 18.75 0.2640000 0
#> 3 3 0 65.10 0.1714286 0
#> 4 4 0 15.84 0.3636364 0
#> 5 5 1 57.20 0.3000000 1
#> 6 6 0 84.24 0.1712963 1

fetch_limit, preview_n, and return_data are useful during development, but remember that fd_run() is still a database-writing function. If you want to test a pipeline run without touching the production feature table, use a development database or a separate feature table.
dev_report <- fd_run(
con = con,
sql = source_sql,
defs = defs,
key = key,
feat_table_name = "reading_features_dev",
fetch_limit = 3,
return_data = "both",
preview_n = 2,
verbose = FALSE
)
dev_report$preview$raw
#> reading_id device_id temperature vibration pressure runtime_hours
#> 1 1 A 68 0.12 31 120
#> 2 2 A 71 0.15 33 125
dev_report$preview$features
#> reading_id temp_above_80 vibration_score pressure_per_hour maintenance_flag
#> 1 1 FALSE 14.40 0.2583333 FALSE
#> 2 2 FALSE 18.75 0.2640000 FALSE
dev_report$data$features
#> reading_id temp_above_80 vibration_score pressure_per_hour maintenance_flag
#> 1 1 FALSE 14.40 0.2583333 FALSE
#> 2 2 FALSE 18.75 0.2640000 FALSE
#> 3 3 FALSE 65.10 0.1714286 FALSE

This pattern is useful when changing definitions. You can inspect a small run against a scratch table, then run the same definitions against the real feature table once the output looks right, as sketched below.
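As a sketch of that promotion step, a script can gate the production run on simple checks against the scratch-table report. The expected_features vector and the gating logic below are our own convention, not part of featdelta; only report fields shown in this vignette are used.

expected_features <- c(
  "temp_above_80", "vibration_score",
  "pressure_per_hour", "maintenance_flag"
)

# Only write to the production feature table once the scratch run
# succeeded and produced exactly the feature columns we expect.
if (dev_report$success &&
    setequal(dev_report$compute$feature_names, expected_features)) {
  run_prod <- fd_run(
    con = con,
    sql = source_sql,
    defs = defs,
    key = key,
    feat_table_name = "reading_features",
    verbose = FALSE
  )
}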
Every fd_run() returns a structured report. In
production, this object is more useful than the printed table because it
can be logged, inspected, or checked by monitoring code.
run_initial$success
#> [1] TRUE
run_initial$stage
#> [1] "complete"
run_initial$fetch$n_rows
#> [1] 6
run_initial$compute$feature_names
#> [1] "temp_above_80" "vibration_score" "pressure_per_hour"
#> [4] "maintenance_flag"
run_initial$upsert$counts
#> $would_insert
#> [1] 6
#>
#> $would_update
#> [1] 0

A typical monitoring check can be simple: did the run succeed, which stage finished, how many rows were fetched, and how many rows were inserted or updated?
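For example, a small helper along the following lines (our own code, not part of featdelta) can summarize those fields for a log file or an alerting check. It only relies on the report fields shown above.

# Summarize the parts of an fd_run() report that monitoring usually cares about.
check_run <- function(report) {
  list(
    ok       = isTRUE(report$success),
    stage    = report$stage,
    fetched  = report$fetch$n_rows,
    inserted = report$upsert$counts$would_insert,
    updated  = report$upsert$counts$would_update
  )
}

check_run(run_initial)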
During development, a broken feature definition should be easy to diagnose. In a production job, you may also want a failed run to return a report instead of immediately stopping the whole script.
Set fail_fast = FALSE to capture stage failures in the
returned fd_run_report.
In the example below we define a feature named
broken_feature that will not compute, because its definition
references a column that does not exist.
bad_defs <- fd_define(
temp_above_80 = temperature > 80,
broken_feature = missing_sensor_column / 10
)
failed_report <- fd_run(
con = con,
sql = source_sql,
defs = bad_defs,
key = key,
feat_table_name = "reading_features_broken",
fail_fast = FALSE,
return_data = "both",
verbose = FALSE
)
failed_report$success
#> [1] FALSE
failed_report$stage
#> [1] "compute"
failed_report$error
#> $stage
#> [1] "compute"
#>
#> $message
#> [1] "fd_compute(): one or more definition steps failed (see report)."
#>
#> $class
#> [1] "featdelta_compute_error" "simpleError"
#> [3] "error" "condition"
failed_report$compute$report
#> step_name type mode ok
#> 1 temp_above_80 column expression TRUE
#> 2 broken_feature column expression FALSE
#> error
#> 1 <NA>
#> 2 object 'missing_sensor_column' not found Possible reasons: the raw input column is missing; an earlier definition step was removed; the order of definition steps was changed; or an earlier block no longer returns the expected column.
#> error_raw error_class
#> 1 <NA> <NA>
#> 2 object 'missing_sensor_column' not found missing_object
#> hint
#> 1 <NA>
#> 2 Possible reasons: the raw input column is missing; an earlier definition step was removed; the order of definition steps was changed; or an earlier block no longer returns the expected column.
#> time_sec text output_names
#> 1 4.696846e-05 temperature > 80 temp_above_80
#> 2 2.250671e-04 missing_sensor_column/10 broken_feature

The returned report keeps the failure attached to the compute stage. When possible, it also keeps partial data and the compute report, which makes it easier to see which step failed.
In contrast, fail_fast = TRUE is useful when the
surrounding job scheduler should treat any stage failure as an immediate
error.
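A minimal sketch of that style, assuming the scheduler treats any uncaught R error as a failed job; the message() call is illustrative logging, not a featdelta feature.

# With fail_fast = TRUE, any stage failure signals an error immediately.
result <- tryCatch(
  fd_run(
    con = con,
    sql = source_sql,
    defs = bad_defs,
    key = key,
    feat_table_name = "reading_features_broken",
    fail_fast = TRUE,
    verbose = FALSE
  ),
  error = function(e) {
    message("featdelta run failed: ", conditionMessage(e))
    stop(e)  # re-signal so the scheduler marks the job as failed
  }
)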
For regular daily processing, use the default
fetch_mode = "new_only". This mode processes only raw keys
that are not yet present in the feature table.
dbAppendTable(con, "raw_readings", readings[day_two, ])
#> [1] 4
run_incremental <- fd_run(
con = con,
sql = source_sql,
defs = defs,
key = key,
feat_table_name = "reading_features",
verbose = FALSE
)
# The default fetch mode is "new_only".
run_incremental$fetch$mode
#> [1] "new_only"
# Only the four new day-two readings were fetched.
run_incremental$fetch$n_rows
#> [1] 4
# Four rows were inserted, and no existing feature rows were updated.
run_incremental$upsert$counts
#> $would_insert
#> [1] 4
#>
#> $would_update
#> [1] 0

Use fetch_mode = "all" when feature definitions have changed and existing rows should be recomputed.
defs_v2 <- fd_define(
temp_above_80 = temperature > 80,
vibration_score = vibration * runtime_hours,
pressure_per_hour = pressure / runtime_hours,
maintenance_flag = temp_above_80 | vibration_score > 80,
high_pressure = pressure >= 38
)
run_refresh <- fd_run(
con = con,
sql = source_sql,
defs = defs_v2,
key = key,
feat_table_name = "reading_features",
fetch_mode = "all",
verbose = FALSE
)
# Refresh mode fetches all rows returned by the source query.
run_refresh$fetch$mode
#> [1] "all"
run_refresh$fetch$n_rows
#> [1] 10
# All rows already existed in the feature table, so they were updated.
# No new rows were inserted.
run_refresh$upsert$counts
#> $would_insert
#> [1] 0
#>
#> $would_update
#> [1] 10

This is one of the most important production distinctions. new_only is for incremental append-style processing. all is for refreshes, backfills, and definition changes.
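One way to keep that distinction visible in a scheduled script is to make the refresh decision a single explicit switch. The definitions_changed flag below is our own convention, not a package feature.

# Decide once, at the top of the job, whether this run is a refresh.
definitions_changed <- FALSE

run_job <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs_v2,
  key = key,
  feat_table_name = "reading_features",
  fetch_mode = if (definitions_changed) "all" else "new_only",
  verbose = FALSE
)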
With alter_table = TRUE, missing feature columns are
added to the target table. This is convenient when a new feature is
introduced.
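For example, the refresh that introduced high_pressure above could state that intent explicitly; this is the same call as before, with the schema-change permission spelled out rather than left implicit.

# The same refresh as above, with the schema change made explicit.
run_refresh_explicit <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs_v2,
  key = key,
  feat_table_name = "reading_features",
  fetch_mode = "all",
  alter_table = TRUE,
  verbose = FALSE
)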
The package deliberately does not drop, rename, or retire old columns automatically. If a column exists in the database feature table but is no longer produced by the current definitions, it is reported as an extra column and left untouched.
In the example below, defs_v3 no longer produces the
previously created pressure_per_hour feature. We also use
fetch_mode = "all" so that all existing rows in the feature
table are refreshed with the current definitions.
defs_v3 <- fd_define(
temp_above_80 = temperature > 80,
vibration_score = vibration * runtime_hours,
maintenance_flag = temp_above_80 | vibration_score > 80,
high_pressure = pressure >= 38
)
run_removed_column <- fd_run(
con = con,
sql = source_sql,
defs = defs_v3,
key = key,
feat_table_name = "reading_features",
fetch_mode = "all",
verbose = FALSE
)
# All ten source rows already existed in the feature table, so they were
# counted as updates rather than inserts.
run_removed_column$upsert$counts
#> $would_insert
#> [1] 0
#>
#> $would_update
#> [1] 10
# The database table still contains the old `pressure_per_hour` column.
dbGetQuery(con, "pragma table_info(reading_features)")
#> cid name type notnull dflt_value pk
#> 1 0 reading_id INTEGER 0 NA 1
#> 2 1 temp_above_80 INTEGER 0 NA 0
#> 3 2 vibration_score REAL 0 NA 0
#> 4 3 pressure_per_hour REAL 0 NA 0
#> 5 4 maintenance_flag INTEGER 0 NA 0
#> 6 5 high_pressure INTEGER 0 NA 0

This behavior is intentional. Removing a feature column from a shared database table can affect dashboards, models, and other users. In production, column removal should usually be handled as an explicit database migration.
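When a column really should be retired, a deliberate migration outside the package is the safer route. A minimal sketch for this SQLite example (DROP COLUMN requires SQLite 3.35 or later; other databases use their own ALTER TABLE dialects, and a review or approval step may be required first):

# An explicit, reviewed schema migration -- featdelta never does this for you.
dbExecute(
  con,
  "ALTER TABLE reading_features DROP COLUMN pressure_per_hour"
)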
If your organization requires approval before any schema change, set
alter_table = FALSE. Then a new feature column will cause
the run to fail instead of altering the table. To demonstrate this, we
define defs_v4 with a new feature,
thermal_load, which does not yet exist in
reading_features.
defs_v4 <- fd_define(
temp_above_80 = temperature > 80,
vibration_score = vibration * runtime_hours,
maintenance_flag = temp_above_80 | vibration_score > 80,
high_pressure = pressure >= 38,
thermal_load = temperature * runtime_hours
)
alter_error <- tryCatch(
fd_run(
con = con,
sql = source_sql,
defs = defs_v4,
key = key,
feat_table_name = "reading_features",
fetch_mode = "all",
alter_table = FALSE,
verbose = FALSE
),
error = function(e) conditionMessage(e)
)
alter_error
#> [1] "Target table `reading_features` is missing columns: thermal_load. Set alter_table=TRUE to add them."Some pipelines should only append new feature rows. In those workflows, an incoming key that already exists in the feature table may indicate a logic or scheduling problem.
Set update_table = FALSE to make existing-key conflicts
fail. In the example below, fetch_mode = "all" sends rows
whose keys already exist in reading_features, so
insert-only mode reports a conflict.
insert_only_error <- tryCatch(
fd_run(
con = con,
sql = source_sql,
defs = defs_v3,
key = key,
feat_table_name = "reading_features",
fetch_mode = "all",
update_table = FALSE,
verbose = FALSE
),
error = function(e) conditionMessage(e)
)
insert_only_error
#> [1] "Conflicts detected (update_table=FALSE). Example key values: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10"This is stricter than the default upsert behavior. Use it when accidental updates should be blocked.
For large feature tables, writing everything in one batch may be
inconvenient. Use chunk_size to stage and merge rows in
batches.
fd_run(
con = con,
sql = source_sql,
defs = defs,
key = key,
feat_table_name = "reading_features",
chunk_size = 10000
)

Chunking does not change which features are computed. It changes how computed rows are written to the database.
The key column is the contract between the raw query and the feature table, so in production make sure it uniquely identifies each raw row.
Tables created by fd_upsert() include a primary key
automatically. If you are using a pre-existing table, check the database
schema before running the pipeline.
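A quick way to do that check from R, reusing the schema query shown earlier (this assumes SQLite and its pragma table_info() output; adapt the query for other databases):

# Confirm the key column exists and is part of the primary key before
# pointing the pipeline at a pre-existing feature table.
schema <- dbGetQuery(con, "pragma table_info(reading_features)")
key_row <- schema[schema$name == key, ]
stopifnot(nrow(key_row) == 1, key_row$pk > 0)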
featdelta does not currently coordinate concurrent
writers. Avoid running multiple fd_run() or
fd_upsert() calls against the same feature table at the
same time.
Concurrent writes to different feature tables are independent, but concurrent writes to the same target table should be handled by your scheduler, database permissions, or orchestration layer.
Before using a feature pipeline repeatedly, it is worth checking a few things: that the feature definitions have been previewed locally with fd_compute(), that fd_run() has been tested against a development table, that each scheduled run uses the intended fetch mode (new_only or all), and that schema changes are deliberately allowed or blocked with alter_table = TRUE or FALSE. The package handles the repeated database mechanics, but these operational decisions still belong to the user or the surrounding production system.
Production use is mostly about making the feature workflow predictable. Develop definitions locally, use a scratch table for small end-to-end tests, inspect the run report, choose the correct refresh mode, and be intentional about schema changes.
For regular repeated jobs, fd_run() is the main entry
point. The supporting functions, especially fd_compute(),
fd_fetch(), and fd_upsert(), are there when
you need to inspect or test individual stages.