Streaming responses (chunked HTTP, SSE)

A normal route handler builds the entire response body in one shot and returns it. Sometimes you need the opposite: start sending bytes before the work is done, and keep emitting more over time. Common cases: a Server-Sent Events feed, an LLM token stream, an NDJSON log tail, anything where time to first byte matters more to the client than time to last byte.

drogonR supports this through dr_stream() and the SSE convenience wrapper dr_stream_sse().

How it works

A streaming handler returns a drogon_stream value (instead of a normal response list). The dispatcher recognises the class, opens an HTTP chunked-transfer response on Drogon’s side, and from then on calls your next_chunk() function on the main R thread, one chunk at a time. Each call returns the bytes to send and a done flag.

   R handler returns drogon_stream
                |
                v
   Drogon sends 200 + chunked headers ────► client
                |
                v
   pump: next_chunk(state, cancelled = FALSE)
                |  returns list(chunk, state, done = FALSE)
                v
   Drogon sends one chunk             ────► client
                |
                v
   pump: next_chunk(state, …)
                ...
                |  returns list(..., done = TRUE)
                v
   Drogon closes the chunked response ────► client

If the client disconnects mid-stream, the dispatcher catches the close event from Drogon and runs your next_chunk() one final time with cancelled = TRUE, so you can free state. The stream is then torn down regardless of what that final call returns.
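
To make the pump contract concrete, here is a minimal sketch that imitates the loop in plain R, without a server. pump_stream() is a hypothetical stand-in for the dispatcher, not a drogonR function, and disconnect_after is an invented knob for simulating a client that goes away; the real dispatcher's internals may differ.

```r
# Hypothetical stand-in for the dispatcher's pump loop; not part of drogonR.
# It calls next_chunk() until done = TRUE, or simulates a client disconnect
# after `disconnect_after` chunks by making one final cancelled = TRUE call.
pump_stream <- function(state, next_chunk, disconnect_after = Inf) {
  sent <- character(0)
  repeat {
    if (length(sent) >= disconnect_after) {
      # Client went away: one last call so the handler can free state.
      # The return value is ignored, as described above.
      next_chunk(state, cancelled = TRUE)
      break
    }
    res <- next_chunk(state, cancelled = FALSE)
    if (nzchar(res$chunk)) sent <- c(sent, res$chunk)
    state <- res$state
    if (isTRUE(res$done)) break
  }
  sent
}

# A simple counting generator in the shape dr_stream() expects.
count_to_n <- function(state, cancelled) {
  if (cancelled || state$i >= state$n) {
    return(list(chunk = "", state = state, done = TRUE))
  }
  state$i <- state$i + 1L
  list(chunk = sprintf("%d\n", state$i), state = state, done = FALSE)
}

# Stream that runs to completion, and one cut short after two chunks.
full    <- pump_stream(list(i = 0L, n = 3L), count_to_n)
partial <- pump_stream(list(i = 0L, n = 3L), count_to_n, disconnect_after = 2)
```

Driving a generator this way is also a convenient way to unit-test it before wiring it into a route.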

dr_stream() — the base API

library(drogonR)

app <- dr_app() |>
  dr_get("/numbers", function(req) {
    dr_stream(
      state = list(i = 0L, n = 10L),
      next_chunk = function(state, cancelled) {
        if (cancelled || state$i >= state$n) {
          return(list(chunk = "", state = state, done = TRUE))
        }
        state$i <- state$i + 1L
        list(
          chunk = sprintf("%d\n", state$i),
          state = state,
          done  = FALSE)
      },
      content_type = "text/plain")
  })

dr_serve(app, port = 8080L)

next_chunk() always returns list(chunk = , state = , done = ). chunk is sent verbatim — format SSE / NDJSON / whatever yourself, or use one of the helpers built on top.
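
As an illustration of formatting the chunk yourself, the sketch below emits one NDJSON line per pump. ndjson_line() and squares_ndjson() are hypothetical names, and the hand-rolled JSON only handles flat numeric fields; a real handler would more likely use a JSON package for proper escaping.

```r
# Hypothetical helper: format one flat record of numeric fields as a single
# NDJSON line (one JSON object followed by a newline). No string escaping.
ndjson_line <- function(fields) {
  values <- vapply(fields, format, character(1))
  pairs  <- sprintf('"%s":%s', names(fields), values)
  paste0("{", paste(pairs, collapse = ","), "}\n")
}

# A next_chunk() function that streams i and i^2 for i = 1..n.
squares_ndjson <- function(state, cancelled) {
  if (cancelled || state$i >= state$n) {
    return(list(chunk = "", state = state, done = TRUE))
  }
  state$i <- state$i + 1L
  list(
    chunk = ndjson_line(list(i = state$i, square = state$i^2)),
    state = state,
    done  = FALSE)
}

# Calling it once by hand shows the exact bytes that would be sent.
first <- squares_ndjson(list(i = 0L, n = 3L), cancelled = FALSE)
```

Passed to dr_stream() with state = list(i = 0L, n = 3L), this would send three lines and then close. A content_type such as "application/x-ndjson" is a common convention for this format, not something drogonR prescribes.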

dr_stream_sse() — Server-Sent Events

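90% of streaming endpoints just emit data: frames. dr_stream_sse() takes care of that framing for you: the generator returns data instead of chunk, and the helper wraps it in a data: frame.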

app <- dr_app() |>
  dr_get("/sse", function(req) {
    dr_stream_sse(
      state = list(i = 0L, n = 5L),
      generator = function(state, cancelled) {
        if (cancelled || state$i >= state$n) {
          return(list(data = "", state = state, done = TRUE))
        }
        state$i <- state$i + 1L
        list(
          data  = sprintf("tick %d", state$i),
          state = state,
          done  = FALSE)
      })
  })

dr_serve(app, port = 8080L)

Test it:

curl -N http://127.0.0.1:8080/sse
# data: tick 1
#
# data: tick 2
# ...

Need event:, id:, or retry:? Use dr_stream() directly and build the frame yourself — the helper deliberately keeps to just data:.
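
One way to build such a frame by hand is sketched here. sse_frame() is a hypothetical helper, not part of drogonR; it follows the standard SSE wire format, where each field sits on its own line, multi-line payloads repeat the data: prefix, and a blank line ends the frame.

```r
# Hypothetical helper: build one Server-Sent Events frame as a string.
sse_frame <- function(data, event = NULL, id = NULL, retry = NULL) {
  lines <- character(0)
  if (!is.null(event)) lines <- c(lines, paste0("event: ", event))
  if (!is.null(id))    lines <- c(lines, paste0("id: ", id))
  if (!is.null(retry)) lines <- c(lines, paste0("retry: ", as.integer(retry)))
  # Each line of a multi-line payload needs its own data: field.
  payload <- strsplit(as.character(data), "\n", fixed = TRUE)[[1]]
  if (length(payload) == 0L) payload <- ""
  lines <- c(lines, paste0("data: ", payload))
  # A blank line terminates the frame.
  paste0(paste(lines, collapse = "\n"), "\n\n")
}

frame <- sse_frame("tick 1", event = "tick", id = 1L)
multi <- sse_frame("a\nb")
```

Inside a next_chunk() function you would return list(chunk = sse_frame(...), state = state, done = FALSE) and pass content_type = "text/event-stream" to dr_stream().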

Threading: keep each pump short

next_chunk() always runs on the main R thread. R is single-threaded, so this is the only place it could safely run. Heavy work inside one pump blocks every other request and every other stream until it returns.

If you have CPU-bound work, split it across many pumps and carry progress in state. If you have blocking I/O, do it in a worker process and pass the results in via state updates from outside, rather than blocking inside the pump.
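
A minimal sketch of the "split it across many pumps" advice, assuming the work can be cut into fixed-size slices. sum_in_slices() and the state fields x, pos, batch and total are hypothetical names chosen for this example; the loop at the end only imitates the dispatcher so the behaviour can be checked without a server.

```r
# Sum a large vector a slice at a time, one slice per pump, so no single
# call holds the main R thread for long. Progress lives in `state`.
sum_in_slices <- function(state, cancelled) {
  if (cancelled) {
    return(list(chunk = "", state = state, done = TRUE))
  }
  n <- length(state$x)
  if (state$pos >= n) {
    # All slices consumed: send the result as the final chunk.
    return(list(
      chunk = sprintf("total=%s\n", format(state$total)),
      state = state,
      done  = TRUE))
  }
  end <- min(state$pos + state$batch, n)
  state$total <- state$total + sum(state$x[(state$pos + 1L):end])
  state$pos   <- end
  list(
    chunk = sprintf("progress %d/%d\n", state$pos, n),
    state = state,
    done  = FALSE)
}

# Imitate the pump loop by hand to see what would go over the wire.
st  <- list(x = as.numeric(1:10), pos = 0L, batch = 4L, total = 0)
out <- character(0)
repeat {
  res <- sum_in_slices(st, cancelled = FALSE)
  out <- c(out, res$chunk)
  st  <- res$state
  if (res$done) break
}
```

The slice size is the tuning knob: smaller slices keep other requests responsive at the cost of more pumps per stream.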

Cancellation contract

When the client goes away, next_chunk() is called exactly once with cancelled = TRUE. Use it to release per-stream resources (file handles, DB cursors, accumulated buffers). The return value is ignored — the stream is torn down either way.

generator <- function(state, cancelled) {
  if (cancelled) {
    if (!is.null(state$conn)) close(state$conn)
    return(list(data = "", state = state, done = TRUE))
  }
  # ... normal path ...
}

The notification arrives as soon as Drogon’s I/O thread sees the TCP close (epoll on Linux, kqueue on BSD/macOS), not on the next attempted send — drogonR carries a small Drogon patch (setUserCloseCallback) that wires this up. Without it, small SSE chunks would keep ticking long after the kernel had silently absorbed the writes for a closed connection.

Errors inside next_chunk()

If your generator raises an R error mid-stream, the dispatcher catches it (R_tryEval), prints "drogonR stream: next_chunk() raised an error; closing stream" to stderr, and tears the session down. Headers are already on the wire by then, so the client sees a truncated chunked response; there is no way to send a 500 once streaming has started. The server stays up and other requests are unaffected. If you want custom on-error behaviour, call tryCatch() inside the generator yourself and return a final error frame plus done = TRUE.
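
The tryCatch() approach can be sketched as a small wrapper around a dr_stream_sse()-style generator. with_error_frame() and flaky() are hypothetical names, and the "error: ..." payload is just one possible convention for telling the client what happened.

```r
# Hypothetical wrapper: turn an R error raised inside the generator into one
# last data frame followed by a clean close, instead of a truncated response.
with_error_frame <- function(generator) {
  function(state, cancelled) {
    tryCatch(
      generator(state, cancelled),
      error = function(e) {
        list(
          data  = paste("error:", conditionMessage(e)),
          state = state,
          done  = TRUE)
      })
  }
}

# A generator that fails on its third tick, for demonstration only.
flaky <- function(state, cancelled) {
  if (cancelled) return(list(data = "", state = state, done = TRUE))
  state$i <- state$i + 1L
  if (state$i == 3L) stop("upstream went away")
  list(data = sprintf("tick %d", state$i), state = state, done = FALSE)
}

safe  <- with_error_frame(flaky)
ok    <- safe(list(i = 0L), cancelled = FALSE)
fails <- safe(list(i = 2L), cancelled = FALSE)
```

You would then pass generator = with_error_frame(flaky) to dr_stream_sse(). Because the wrapper also covers the cancelled = TRUE call, an error in cleanup code is caught the same way.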

Middleware does not wrap individual chunks

dr_use() middleware runs once, when the request enters R and the handler returns the drogon_stream object. After that, the dispatcher pumps next_chunk() directly: middleware is not called per chunk, and dr_on_error() is not invoked for errors raised inside next_chunk() (those are handled as described above). If you need per-chunk hooks (metrics, mutation), wrap your generator yourself.
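
A per-chunk hook can be as small as a closure around next_chunk(). In this sketch with_chunk_hook(), the metrics environment and count_bytes() are all hypothetical; the hook receives each result and returns it, so it can observe the chunk or hand back a modified one.

```r
# Hypothetical wrapper: run `hook` on every result next_chunk() produces.
# The hook returns the (possibly modified) result list.
with_chunk_hook <- function(next_chunk, hook) {
  function(state, cancelled) {
    hook(next_chunk(state, cancelled), cancelled)
  }
}

# Example hook: count non-empty chunks and bytes in a shared environment.
metrics <- new.env()
metrics$chunks <- 0L
metrics$bytes  <- 0L
count_bytes <- function(res, cancelled) {
  n <- nchar(res$chunk, type = "bytes")
  if (n > 0L) {
    metrics$chunks <- metrics$chunks + 1L
    metrics$bytes  <- metrics$bytes + n
  }
  res  # unchanged; a mutating hook would return an edited copy
}

# A plain counting generator, wrapped and pumped by hand.
count_to_n <- function(state, cancelled) {
  if (cancelled || state$i >= state$n) {
    return(list(chunk = "", state = state, done = TRUE))
  }
  state$i <- state$i + 1L
  list(chunk = sprintf("%d\n", state$i), state = state, done = FALSE)
}

wrapped <- with_chunk_hook(count_to_n, count_bytes)
st <- list(i = 0L, n = 3L)
repeat {
  res <- wrapped(st, cancelled = FALSE)
  st  <- res$state
  if (res$done) break
}
```

In a handler you would pass next_chunk = wrapped to dr_stream(). A single shared environment aggregates across all streams; create one per request inside the handler if you want per-stream numbers.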

Native (C / C++) streaming

For LLM token streams or any other case where R-side overhead is the bottleneck, register a streaming handler that bypasses R entirely:

app <- dr_app() |>
  dr_get_cpp_stream("/v1/generate",
                    package  = "myllmbackend",
                    callable = "generate")

The backend implements drogonr_stream_handler_t from <drogonR.h> (shipped under inst/include/) and pushes chunks via dr_send_chunk() / dr_close_chunk() on a drogonR worker thread. R-side middleware and the dr_on_error() hook do not apply, because the request never enters R. See ?dr_routes_cpp_stream.

Caveats