A normal route handler builds the entire response body in one shot and returns it. Sometimes you need the opposite — start sending bytes before the work is done, and keep emitting more over time. Common cases: a Server-Sent Events feed, an LLM token stream, an NDJSON log tail, anything where the client cares about time to first byte more than time to last byte.
drogonR supports this through dr_stream() and the SSE
convenience wrapper dr_stream_sse().
A streaming handler returns a drogon_stream value
(instead of a normal response list). The dispatcher recognises the
class, opens an HTTP chunked-transfer response on Drogon’s side, and from
then on calls your next_chunk() function on the main R
thread, one chunk at a time. Each call returns the bytes to send and a
done flag.
```
R handler returns drogon_stream
          |
          v
Drogon sends 200 + chunked headers ────► client
          |
          v
pump: next_chunk(state, cancelled = FALSE)
          |  returns list(chunk, state, done = FALSE)
          v
Drogon sends one chunk ────► client
          |
          v
pump: next_chunk(state, ...)
          ...
          |  returns list(..., done = TRUE)
          v
Drogon closes the chunked response ────► client
```
If the client disconnects mid-stream, the dispatcher catches the
close event from Drogon and runs your next_chunk() one
final time with cancelled = TRUE, so you can free state.
The stream is then torn down regardless of what that final call
returns.
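The pump protocol above can also be driven by hand, which is a convenient way to unit-test a generator in plain R. A sketch of the loop in spirit (the real pump lives in C++ and interleaves with Drogon’s event loop; `pump_all` is illustrative, not part of the package):

```r
# Illustrative pump loop: call next_chunk() repeatedly, sending each
# chunk, threading state through, until done = TRUE ends the stream.
pump_all <- function(state, next_chunk, send) {
  repeat {
    res <- next_chunk(state, cancelled = FALSE)
    if (nzchar(res$chunk)) send(res$chunk)  # bytes go out as one chunk
    state <- res$state                      # updated state is carried forward
    if (isTRUE(res$done)) break             # done = TRUE closes the stream
  }
  invisible(state)
}

# Driving a simple counting generator by hand:
numbers <- function(state, cancelled) {
  if (cancelled || state$i >= state$n) {
    return(list(chunk = "", state = state, done = TRUE))
  }
  state$i <- state$i + 1L
  list(chunk = sprintf("%d\n", state$i), state = state, done = FALSE)
}
sent <- character()
pump_all(list(i = 0L, n = 3L), numbers, function(ch) sent <<- c(sent, ch))
# sent is now c("1\n", "2\n", "3\n")
```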
## dr_stream() — the base API

```r
library(drogonR)

app <- dr_app() |>
  dr_get("/numbers", function(req) {
    dr_stream(
      state = list(i = 0L, n = 10L),
      next_chunk = function(state, cancelled) {
        if (cancelled || state$i >= state$n) {
          return(list(chunk = "", state = state, done = TRUE))
        }
        state$i <- state$i + 1L
        list(
          chunk = sprintf("%d\n", state$i),
          state = state,
          done = FALSE)
      },
      content_type = "text/plain")
  })

dr_serve(app, port = 8080L)
```

next_chunk() always returns
list(chunk = , state = , done = ). chunk is
sent verbatim — format SSE / NDJSON / whatever yourself, or use one of
the helpers built on top.
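NDJSON, for instance, is just one JSON document per line. A minimal hand-rolled framing helper (illustrative only — `ndjson_line` is not a package function, and real code would more likely use jsonlite::toJSON):

```r
# Hypothetical helper: frame one record as an NDJSON line.
# Hand-built JSON for illustration; use a JSON package in real code.
ndjson_line <- function(id, msg) {
  sprintf('{"id":%d,"msg":"%s"}\n', id, msg)
}

ndjson_line(1L, "started")  # '{"id":1,"msg":"started"}\n'
```

Inside a dr_stream() handler you would return such a string as chunk, typically with content_type set to the commonly used "application/x-ndjson".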
## dr_stream_sse() — Server-Sent Events

90% of streaming endpoints just emit data: frames. dr_stream_sse() takes care of:

- framing each payload as data: <line>\n\n (with multi-line data automatically split per the spec),
- setting Content-Type: text/event-stream,
- setting Cache-Control: no-cache and X-Accel-Buffering: no (so reverse proxies don’t buffer).

```r
app <- dr_app() |>
  dr_get("/sse", function(req) {
    dr_stream_sse(
      state = list(i = 0L, n = 5L),
      generator = function(state, cancelled) {
        if (cancelled || state$i >= state$n) {
          return(list(data = "", state = state, done = TRUE))
        }
        state$i <- state$i + 1L
        list(
          data = sprintf("tick %d", state$i),
          state = state,
          done = FALSE)
      })
  })

dr_serve(app, port = 8080L)
```

Test it with curl -N http://localhost:8080/sse (the -N flag turns off curl’s output buffering, so frames print as they arrive).
Need event:, id:, or retry:?
Use dr_stream() directly and build the frame yourself — the
helper deliberately keeps to just data:.
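For reference, the data:-framing rule the helper applies can be written in a few lines of base R (an illustrative re-implementation, not the helper’s actual code):

```r
# Illustrative SSE framing: each line of the payload becomes its own
# "data:" field, and a blank line terminates the event.
sse_frame <- function(data) {
  lines <- strsplit(data, "\n", fixed = TRUE)[[1]]
  paste0(paste0("data: ", lines, "\n", collapse = ""), "\n")
}

sse_frame("tick 1")          # "data: tick 1\n\n"
sse_frame("line 1\nline 2")  # "data: line 1\ndata: line 2\n\n"
```

Building event: or id: fields on top of this with dr_stream() is the same idea with extra prefixed lines before the blank terminator.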
next_chunk() always runs on the main R
thread. R is single-threaded, so this is the only place it
could safely run. Heavy work inside one pump blocks every other request
and every other stream until it returns.
This includes a pump that calls Sys.sleep(1) or runs a long loop in pure R. If you have CPU-bound work, split it across many pumps (carry progress in state). If you have blocking I/O, read it on a worker process and pass results in via state updates from outside, not from inside the pump.
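A sketch of the CPU-bound pattern — each pump handles one small slice of the work and carries its progress in state (names here are illustrative):

```r
# Sum a long vector 100 elements per pump so no single pump blocks
# the R thread for long. Progress lives entirely in `state`.
make_next_chunk <- function(x, step = 100L) {
  function(state, cancelled) {
    if (cancelled || state$pos > length(x)) {
      return(list(chunk = "", state = state, done = TRUE))
    }
    idx <- state$pos:min(state$pos + step - 1L, length(x))
    state$total <- state$total + sum(x[idx])  # one slice of the work
    state$pos <- state$pos + step
    list(chunk = sprintf("processed %d/%d\n",
                         min(state$pos - 1L, length(x)), length(x)),
         state = state, done = FALSE)
  }
}
```

You would pass state = list(pos = 1L, total = 0) and this function as next_chunk to dr_stream(); each pump returns quickly, so other requests and streams keep flowing between slices.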
When the client goes away, next_chunk() is called
exactly once with cancelled = TRUE. Use it
to release per-stream resources (file handles, DB cursors, accumulated
buffers). The return value is ignored — the stream is torn down either
way.
```r
generator <- function(state, cancelled) {
  if (cancelled) {
    if (!is.null(state$conn)) close(state$conn)
    return(list(data = "", state = state, done = TRUE))
  }
  # ... normal path ...
}
```

The notification arrives as soon as Drogon’s I/O thread sees the TCP
close (epoll on Linux, kqueue on BSD/macOS), not on the next attempted
send — drogonR carries a small Drogon patch
(setUserCloseCallback) that wires this up. Without it,
small SSE chunks would keep ticking long after the kernel had silently
absorbed the writes for a closed connection.
## Errors inside next_chunk()

If your generator raises an R error mid-stream, the dispatcher
catches it (R_tryEval), prints
drogonR stream: next_chunk() raised an error; closing stream
to stderr, and tears the session down. Headers are already on the wire
by then, so the client sees a truncated chunked response — there is no
way to send a 500 once streaming has started. The server stays up and
other requests are unaffected. If you want custom on-error behaviour, wrap
the generator body in tryCatch() yourself and return a final
error frame plus done = TRUE.
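For instance, a wrapper that turns any error from an inner generator into one last frame (a sketch; `safe_generator` is not part of the package API):

```r
# Wrap a generator so an R error becomes a final frame instead of
# a truncated chunked response.
safe_generator <- function(inner) {
  function(state, cancelled) {
    tryCatch(
      inner(state, cancelled),
      error = function(e) {
        # One last frame carrying the message, then end the stream cleanly.
        list(data = paste0("error: ", conditionMessage(e)),
             state = state, done = TRUE)
      })
  }
}
```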
[dr_use()] middleware runs once, when the request
enters R and the handler returns the drogon_stream object.
After that, the dispatcher pumps next_chunk() directly —
middleware is not called per chunk, and [dr_on_error()] is
not invoked for errors raised inside next_chunk()
(those are handled as described above). If you need per-chunk hooks
(metrics, mutation), wrap your generator yourself.
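Such a per-chunk hook is plain function composition — for example, a chunk counter for metrics (illustrative; `with_chunk_counter` is not a package function):

```r
# Wrap a next_chunk function so every emitted chunk is counted.
# The counter lives in an environment so the closure can mutate it.
with_chunk_counter <- function(next_chunk) {
  counter <- new.env()
  counter$chunks <- 0L
  pump <- function(state, cancelled) {
    out <- next_chunk(state, cancelled)
    if (!isTRUE(out$done)) counter$chunks <- counter$chunks + 1L  # metrics hook
    out
  }
  list(pump = pump, counter = counter)
}
```

Pass the wrapped pump to dr_stream() in place of the original, and read counter$chunks wherever you collect metrics.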
For LLM token streams or any other case where R-side overhead is the bottleneck, register a streaming handler that bypasses R entirely:
```r
app <- dr_app() |>
  dr_get_cpp_stream("/v1/generate",
    package = "myllmbackend",
    callable = "generate")
```

The backend implements drogonr_stream_handler_t from
<drogonR.h> (shipped under
inst/include/) and pushes chunks via
dr_send_chunk() / dr_close_chunk() on a
drogonR worker thread. R-side middleware and the [dr_on_error()] hook do
not apply — the request never enters R. See
?dr_routes_cpp_stream.
A few final gotchas:

- Clients that send Connection: close won’t receive any chunks — Drogon shortcuts to a single non-chunked response in that case.
- Don’t call dr_serve() and immediately exit a script — the call is non-blocking. Standalone Rscript files that host a server need a loop pump, e.g. repeat later::run_now(timeoutSecs = 3600), or the process will exit before the stream produces anything. Interactive R sessions don’t need this.
- See inst/examples/bench-stream-*.R if you want to gauge throughput on your hardware.