---
title: "Production patterns"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Production patterns}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

```{r setup, include = FALSE}
knitr::opts_chunk$set(
  collapse = TRUE,
  comment = "#>"
)
```

## Purpose of this workshop

The earlier vignettes introduced the main workflow:

1. define feature logic in R;
2. compute features from raw rows;
3. store those features in a database table;
4. rerun the pipeline as new raw rows arrive.

This vignette focuses on patterns that become important once the workflow is
used repeatedly. In practice, a production feature pipeline needs more than a
single successful run. You need a way to preview changes, inspect failures,
refresh old rows when definitions change, control schema evolution, and monitor
what the pipeline did.

`featdelta` is not a scheduler, an orchestration platform, or a replacement for
database permissions. It is the R feature-engineering component inside that
larger workflow. This workshop shows how to use the package in ways that stay
maintainable over time.

## Workshop database

We will use a small device-monitoring example. Imagine a database table that
receives equipment readings over time. Analysts want to derive features in R and
store them in a separate table for dashboards, alerts, or predictive-maintenance
models.

```{r}
library(DBI)
library(RSQLite)
library(featdelta)

con <- dbConnect(SQLite(), ":memory:")

readings <- data.frame(
  reading_id = 1:10,
  device_id = c("A", "A", "B", "C", "A", "B", "C", "A", "B", "C"),
  temperature = c(68, 71, 75, 70, 83, 78, 72, 88, 81, 74),
  vibration = c(0.12, 0.15, 0.31, 0.18, 0.44, 0.39, 0.22, 0.55, 0.41, 0.24),
  pressure = c(31, 33, 36, 32, 39, 37, 34, 42, 38, 35),
  runtime_hours = c(120, 125, 210, 88, 130, 216, 93, 136, 223, 97),
  stringsAsFactors = FALSE
)

day_one <- 1:6
day_two <- 7:10

dbWriteTable(
  con,
  "raw_readings",
  readings[day_one, ],
  overwrite = TRUE
)

source_sql <- "
  SELECT
    reading_id,
    device_id,
    temperature,
    vibration,
    pressure,
    runtime_hours
  FROM raw_readings
  ORDER BY reading_id
"

key <- "reading_id"

dbGetQuery(con, source_sql)
```

This is a small table, but the operating pattern is realistic: raw readings
arrive, R creates transformed features, and the database stores the results.

## Start by testing definitions locally

Before writing to a production database table, test the feature definitions on a
small local extract. This lets you check names, row counts, and basic values
without changing the database feature table.

```{r}
defs <- fd_define(
  temp_above_80 = temperature > 80,
  vibration_score = vibration * runtime_hours,
  pressure_per_hour = pressure / runtime_hours,
  maintenance_flag = temp_above_80 | vibration_score > 80
)

raw_preview <- dbGetQuery(con, source_sql)

features_preview <- fd_compute(
  data = raw_preview,
  defs = defs,
  key = key
)

features_preview
```

This is the first production habit: separate definition development from
database writes. When the output table has the expected shape, use the same
definitions in `fd_run()`.

```{r}
run_initial <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "reading_features",
  verbose = FALSE
)

dbGetQuery(con, "SELECT * FROM reading_features ORDER BY reading_id")
```

## Use a development feature table for dry runs

`fetch_limit`, `preview_n`, and `return_data` are useful during development, but
remember that `fd_run()` is still a database-writing function. If you want to
test a pipeline run without touching the production feature table, use a
development database or a separate feature table.

```{r}
dev_report <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "reading_features_dev",
  fetch_limit = 3,
  return_data = "both",
  preview_n = 2,
  verbose = FALSE
)

dev_report$preview$raw
dev_report$preview$features
dev_report$data$features
```

This pattern is useful when changing definitions. You can inspect a small run
against a scratch table, then run the same definitions against the real feature
table once the output looks right.

## Let the report answer operational questions

Every `fd_run()` returns a structured report. In production, this object is more
useful than the printed table because it can be logged, inspected, or checked by
monitoring code.

```{r}
run_initial$success
run_initial$stage
run_initial$fetch$n_rows
run_initial$compute$feature_names
run_initial$upsert$counts
```

A typical monitoring check can be simple: did the run succeed, which stage
finished, how many rows were fetched, and how many rows were inserted or
updated?
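
Those questions can be wired into a small helper that runs after every job. The sketch below uses only the report fields shown above; the helper name, message format, and failure handling are placeholders to adapt to your own logging setup.

```{r, eval = FALSE}
# Minimal post-run check: stop on failure, otherwise log a one-line summary.
check_run <- function(report) {
  if (!isTRUE(report$success)) {
    stop("feature run failed at stage: ", report$stage)
  }
  message(
    "fetched ", report$fetch$n_rows, " rows; ",
    "upsert counts: ", paste(report$upsert$counts, collapse = " / ")
  )
  invisible(report)
}

check_run(run_initial)
```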

## Handle compute failures without losing context

During development, a broken feature definition should be easy to diagnose. In a
production job, you may also want a failed run to return a report instead of
immediately stopping the whole script.

Set `fail_fast = FALSE` to capture stage failures in the returned
`fd_run_report`.

In the example below we define a feature, `broken_feature`, that cannot be
computed because its definition references a column that does not exist.

```{r}
bad_defs <- fd_define(
  temp_above_80 = temperature > 80,
  broken_feature = missing_sensor_column / 10
)

failed_report <- fd_run(
  con = con,
  sql = source_sql,
  defs = bad_defs,
  key = key,
  feat_table_name = "reading_features_broken",
  fail_fast = FALSE,
  return_data = "both",
  verbose = FALSE
)

failed_report$success
failed_report$stage
failed_report$error
failed_report$compute$report
```

The returned report keeps the failure attached to the compute stage. When
possible, it also keeps partial data and the compute report, which makes it
easier to see which step failed.

In contrast, `fail_fast = TRUE` is useful when the surrounding job scheduler
should treat any stage failure as an immediate error.

```{r, eval = FALSE}
fd_run(
  con = con,
  sql = source_sql,
  defs = bad_defs,
  key = key,
  feat_table_name = "reading_features",
  fail_fast = TRUE
)
```

## Choose the right refresh mode

For regular daily processing, use the default `fetch_mode = "new_only"`. This
mode processes only raw keys that are not yet present in the feature table.

```{r}
dbAppendTable(con, "raw_readings", readings[day_two, ])

run_incremental <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "reading_features",
  verbose = FALSE
)

# The default fetch mode is "new_only".
run_incremental$fetch$mode

# Only the four new day-two readings were fetched.
run_incremental$fetch$n_rows

# Four rows were inserted, and no existing feature rows were updated.
run_incremental$upsert$counts
```

Use `fetch_mode = "all"` when feature definitions changed and existing rows
should be recomputed.

```{r}
defs_v2 <- fd_define(
  temp_above_80 = temperature > 80,
  vibration_score = vibration * runtime_hours,
  pressure_per_hour = pressure / runtime_hours,
  maintenance_flag = temp_above_80 | vibration_score > 80,
  high_pressure = pressure >= 38
)

run_refresh <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs_v2,
  key = key,
  feat_table_name = "reading_features",
  fetch_mode = "all",
  verbose = FALSE
)

# Refresh mode fetches all rows returned by the source query.
run_refresh$fetch$mode
run_refresh$fetch$n_rows

# All rows already existed in the feature table, so they were updated.
# No new rows were inserted.
run_refresh$upsert$counts
```

This is one of the most important production distinctions. `new_only` is for
incremental append-style processing. `all` is for refreshes, backfills, and
definition changes.
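
In a scheduled job, that distinction can be made explicit with a small wrapper that takes the decision as an argument. The sketch below is illustrative; `run_features()` is a hypothetical helper name, not part of the package.

```{r, eval = FALSE}
# Hypothetical wrapper: incremental by default, full refresh on demand.
run_features <- function(full_refresh = FALSE) {
  fd_run(
    con = con,
    sql = source_sql,
    defs = defs,
    key = key,
    feat_table_name = "reading_features",
    fetch_mode = if (full_refresh) "all" else "new_only"
  )
}

# Daily incremental job:
# run_features()
# After a definition change:
# run_features(full_refresh = TRUE)
```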

## Treat schema evolution as additive

With `alter_table = TRUE`, missing feature columns are added to the target
table. This is convenient when a new feature is introduced.

The package deliberately does not drop, rename, or retire old columns
automatically. If a column exists in the database feature table but is no longer
produced by the current definitions, it is reported as an extra column and left
untouched.

In the example below, `defs_v3` no longer produces the previously created
`pressure_per_hour` feature. We also use `fetch_mode = "all"` so that all
existing rows in the feature table are refreshed with the current definitions.

```{r}
defs_v3 <- fd_define(
  temp_above_80 = temperature > 80,
  vibration_score = vibration * runtime_hours,
  maintenance_flag = temp_above_80 | vibration_score > 80,
  high_pressure = pressure >= 38
)

run_removed_column <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs_v3,
  key = key,
  feat_table_name = "reading_features",
  fetch_mode = "all",
  verbose = FALSE
)

# All ten source rows already existed in the feature table, so they were
# counted as updates rather than inserts.
run_removed_column$upsert$counts

# The database table still contains the old `pressure_per_hour` column.
dbGetQuery(con, "PRAGMA table_info(reading_features)")
```

This behavior is intentional. Removing a feature column from a shared database
table can affect dashboards, models, and other users. In production, column
removal should usually be handled as an explicit database migration.
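
If a column really should be retired, do it as a deliberate, reviewed step outside the pipeline. For the SQLite backend used in this vignette, that can be a single `ALTER TABLE` statement (SQLite supports `DROP COLUMN` from version 3.35; other databases have their own syntax).

```{r, eval = FALSE}
# Explicit migration: drop the retired feature column on purpose,
# after confirming that no dashboard or model still reads it.
dbExecute(con, "ALTER TABLE reading_features DROP COLUMN pressure_per_hour")
```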

If your organization requires approval before any schema change, set
`alter_table = FALSE`. Then a new feature column will cause the run to fail
instead of altering the table. To demonstrate this, we define `defs_v4` with a
new feature, `thermal_load`, which does not yet exist in `reading_features`.

```{r}
defs_v4 <- fd_define(
  temp_above_80 = temperature > 80,
  vibration_score = vibration * runtime_hours,
  maintenance_flag = temp_above_80 | vibration_score > 80,
  high_pressure = pressure >= 38,
  thermal_load = temperature * runtime_hours
)

alter_error <- tryCatch(
  fd_run(
    con = con,
    sql = source_sql,
    defs = defs_v4,
    key = key,
    feat_table_name = "reading_features",
    fetch_mode = "all",
    alter_table = FALSE,
    verbose = FALSE
  ),
  error = function(e) conditionMessage(e)
)

alter_error
```

## Use insert-only mode when duplicates should fail

Some pipelines should only append new feature rows. In those workflows, an
incoming key that already exists in the feature table may indicate a logic or
scheduling problem.

Set `update_table = FALSE` to make existing-key conflicts fail. In the example
below, `fetch_mode = "all"` sends rows whose keys already exist in
`reading_features`, so insert-only mode reports a conflict.

```{r}
insert_only_error <- tryCatch(
  fd_run(
    con = con,
    sql = source_sql,
    defs = defs_v3,
    key = key,
    feat_table_name = "reading_features",
    fetch_mode = "all",
    update_table = FALSE,
    verbose = FALSE
  ),
  error = function(e) conditionMessage(e)
)

insert_only_error
```

This is stricter than the default upsert behavior. Use it when accidental
updates should be blocked.

## Use chunking for large writes

For large feature tables, writing all computed rows in a single batch can be
slow or memory-intensive. Use `chunk_size` to stage and merge rows in batches.

```{r, eval = FALSE}
fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "reading_features",
  chunk_size = 10000
)
```

Chunking does not change which features are computed. It changes how computed
rows are written to the database.

## Be explicit about keys and constraints

The key column is the contract between the raw query and the feature table. In
production, make sure that:

1. the source query returns the key column;
2. the key values are unique and non-missing in the source data;
3. the feature table has the same key column;
4. existing feature tables used with upsert behavior have a primary key or
   unique constraint on that key.

Tables created by `fd_upsert()` include a primary key automatically. If you are
using a pre-existing table, check the database schema before running the
pipeline.
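
The first three checks are cheap to run before every write. A sketch, using only base R on the source extract:

```{r, eval = FALSE}
# Pre-flight checks on the source extract before computing features.
raw <- dbGetQuery(con, source_sql)

stopifnot(
  key %in% names(raw),        # the source query returns the key column
  !anyNA(raw[[key]]),         # key values are non-missing
  !anyDuplicated(raw[[key]])  # key values are unique
)
```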

## Avoid concurrent writes to the same feature table

`featdelta` does not currently coordinate concurrent writers. Avoid running
multiple `fd_run()` or `fd_upsert()` calls against the same feature table at the
same time.

Concurrent writes to different feature tables are independent, but concurrent
writes to the same target table should be handled by your scheduler, database
permissions, or orchestration layer.
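
If you cannot guarantee this at the scheduler level, a crude guard is a file-system lock. The sketch below relies on `dir.create()` returning `FALSE` when the directory already exists, so at most one writer proceeds; treat it as a fallback, not a substitute for proper orchestration.

```{r, eval = FALSE}
run_with_lock <- function() {
  lock <- "reading_features.lock"
  # dir.create() fails if the lock directory already exists.
  if (!dir.create(lock, showWarnings = FALSE)) {
    stop("another run appears to be writing to reading_features")
  }
  # Remove the lock when this function exits, even on error.
  on.exit(unlink(lock, recursive = TRUE), add = TRUE)

  fd_run(
    con = con,
    sql = source_sql,
    defs = defs,
    key = key,
    feat_table_name = "reading_features"
  )
}
```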

## A practical production checklist

Before using a feature pipeline repeatedly, it is worth checking a few things.

1. The source SQL returns exactly the population you intend to process.
2. The source SQL returns the key column.
3. The key is unique and has no missing values.
4. Feature definitions have been tested with `fd_compute()`.
5. A small `fd_run()` has been tested against a development table.
6. The intended refresh mode is clear: `new_only` or `all`.
7. Schema evolution is intentional: `alter_table = TRUE` or `FALSE`.
8. The feature table has a primary key or unique constraint on the key.
9. The run report is logged or inspected.
10. The scheduler prevents overlapping writes to the same feature table.

The package handles the repeated database mechanics, but these operational
decisions still belong to the user or the surrounding production system.

## What to remember

Production use is mostly about making the feature workflow predictable. Develop
definitions locally, use a scratch table for small end-to-end tests, inspect the
run report, choose the correct refresh mode, and be intentional about schema
changes.

For regular repeated jobs, `fd_run()` is the main entry point. The supporting
functions, especially `fd_compute()`, `fd_fetch()`, and `fd_upsert()`, are there
when you need to inspect or test individual stages.

```{r cleanup, include = FALSE}
dbDisconnect(con)
```
