Refs and Streams 2: Updating with an Endpoint

2023-02-09

Oh, hello. Let’s continue with our ref and stream experiment by writing and serving a minimal API with http4s.

The Problem, A Recap

From the first post:

We have some data, rows of recommendations. The data is stored on disk, and we want to write an API that will return rows based on some query. The data, for now, is small enough that it can be read into memory when we start the server. A different process will update the data on disk however, and we need that update to propagate to the server without disrupting running requests. The update is the piece I’ve been working on.

I’m continuing from last time by focusing now on the API:

  • how to use the Ref to construct a response?
  • how to update the Ref via a request?
  • how to link the http4s Server with the scheduled updates?

The Pieces

The method that updates the Ref needed a few changes. The prog that does the actual updating is fine; however, instead of running multiple streams as in the previous example, we’ll be combining multiple resources. So I’ve decided to compile the stream to a resource and return a Resource[IO, Unit].

def scheduledUpdater(ref: Ref[IO, Int]): Resource[IO, Unit] = {
  // Increment the ref and log the new value
  val prog = ref
    .updateAndGet(i => i + 1)
    .flatMap(i => IO.println(s"scheduled update: added 1, ref value is now $i"))
  // Run prog once a minute, compiled to a Resource so it can be combined with the server
  Stream.repeatEval(prog).metered(1.minute).compile.resource.drain
}
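Waiting a minute per tick makes the updater tedious to poke at, so here is a minimal sketch of a bounded variant for experimenting. The name boundedUpdater and its every and ticks parameters are my assumptions and are not part of the code above; the stream is built the same way, but it stops after a fixed number of updates so the program terminates.

```scala
//> using lib "co.fs2::fs2-core:3.4.0"
//> using lib "org.typelevel::cats-effect:3.4.4"

import cats.effect.{IO, IOApp, Ref, Resource}
import fs2.Stream
import scala.concurrent.duration._

object UpdaterSketch extends IOApp.Simple {

  // Hypothetical variant of scheduledUpdater: the interval is a parameter and
  // the stream ends after `ticks` updates instead of running forever.
  def boundedUpdater(ref: Ref[IO, Int], every: FiniteDuration, ticks: Long): Resource[IO, Unit] =
    Stream
      .repeatEval(ref.updateAndGet(_ + 1))
      .metered(every)
      .take(ticks)
      .compile
      .resource
      .drain

  val run: IO[Unit] =
    for {
      ref   <- Ref[IO].of(1)
      // The stream runs while the resource is being acquired, so by the time
      // use_ returns all three updates have been applied.
      _     <- boundedUpdater(ref, 10.millis, 3).use_
      value <- ref.get
      _     <- IO.println(s"ref value after 3 ticks: $value")
    } yield ()
}
```

Starting from 1 and applying three increments leaves the ref at 4. The unbounded version in the post never finishes acquiring, which is fine there because it is only ever combined with the server and left running.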

Instead of just printing the Ref value on a schedule, I wanted to make a curl request for the current value. I also wanted to be able to set the Ref to an arbitrary value via a request. For that I wrote a two-endpoint service:

def refService(ref: Ref[IO, Int]): HttpRoutes[IO] = HttpRoutes.of[IO] {
  // GET /ref returns the current value
  case GET -> Root / "ref" =>
    for {
      i    <- ref.get
      resp <- Ok(s"Current ref value is: $i")
    } yield resp
  // PUT /ref/<int> overwrites the value
  case PUT -> Root / "ref" / IntVar(value) =>
    for {
      _    <- ref.set(value)
      resp <- Ok(s"Updated the ref value to: $value")
    } yield resp
}
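The routes can also be exercised without binding a port, by turning them into an HttpApp and feeding Request values in directly. This is a minimal sketch rather than how the post tests anything: refService mirrors the routes above (written slightly more compactly), and the object name RefServiceSketch is an assumption of mine.

```scala
//> using lib "org.typelevel::cats-effect:3.4.4"
//> using lib "org.http4s::http4s-core:0.23.17"
//> using lib "org.http4s::http4s-dsl:0.23.17"

import cats.effect.{IO, IOApp, Ref}
import org.http4s.{HttpRoutes, Method, Request}
import org.http4s.dsl.io._
import org.http4s.implicits._

object RefServiceSketch extends IOApp.Simple {

  // Same behaviour as the routes in the post.
  def refService(ref: Ref[IO, Int]): HttpRoutes[IO] = HttpRoutes.of[IO] {
    case GET -> Root / "ref" =>
      ref.get.flatMap(i => Ok(s"Current ref value is: $i"))
    case PUT -> Root / "ref" / IntVar(value) =>
      ref.set(value) *> Ok(s"Updated the ref value to: $value")
  }

  val run: IO[Unit] =
    for {
      ref     <- Ref[IO].of(1)
      app      = refService(ref).orNotFound
      // Call the app as a plain function from Request to IO[Response]
      putResp <- app.run(Request[IO](Method.PUT, uri"/ref/42"))
      putBody <- putResp.as[String]
      _       <- IO.println(putBody)
      getResp <- app.run(Request[IO](Method.GET, uri"/ref"))
      getBody <- getResp.as[String]
      _       <- IO.println(getBody)
    } yield ()
}
```

Because both requests share the same Ref, the GET after the PUT reports 42.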

Finally, similar to last time, the tricky part was figuring out how to stitch the scheduled update together with something else (in this case the Server resource). The real MVP here is the call to Resource.both, which “allocates two resources concurrently, and combines their results in a tuple.” [1] Then I can call .useForever on the combined resource to have a forever-running server and updater.

def run(args: List[String]): IO[ExitCode] = {
  val serverAndUpdater: Resource[IO, (Server, Unit)] = for {
    // One Ref, shared by the routes and the scheduled updater
    ref <- Resource.eval(Ref[IO].of(1))
    httpApp = Router("/" -> refService(ref)).orNotFound
    updater = scheduledUpdater(ref)
    server = EmberServerBuilder
      .default[IO]
      .withHost(ipv4"0.0.0.0")
      .withPort(port"8080")
      .withHttpApp(httpApp)
      .build
    // Allocate the server and the updater concurrently
    serverAndUpdater <- Resource.both(server, updater)
  } yield serverAndUpdater

  // Keep both running until the process is interrupted
  serverAndUpdater.useForever.as(ExitCode.Success)
}
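To see Resource.both on its own, away from the server, here is a minimal sketch with two stand-in resources. The logged helper and the values "server-handle" and 0 are hypothetical and exist only for illustration; the point is that the combined resource hands back a tuple, just like the (Server, Unit) above.

```scala
//> using lib "org.typelevel::cats-effect:3.4.4"

import cats.effect.{IO, IOApp, Resource}

object BothSketch extends IOApp.Simple {

  // Hypothetical stand-in resource that logs when it is acquired and released.
  def logged[A](name: String, value: A): Resource[IO, A] =
    Resource.make(IO.println(s"acquire $name").as(value))(_ => IO.println(s"release $name"))

  // Both resources are allocated concurrently; the results come back as a tuple.
  val combined: Resource[IO, (String, Int)] =
    Resource.both(logged("server", "server-handle"), logged("updater", 0))

  val run: IO[Unit] =
    combined.use { case (server, updater) =>
      IO.println(s"using ($server, $updater)")
    }
}
```

Since allocation is concurrent, the order of the two acquire lines is not something to rely on. The post uses useForever instead of use, because neither the server nor the updater is ever meant to finish.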

All The Code Together

All the pieces come together like so: [2]

//> using lib "co.fs2::fs2-core:3.4.0"
//> using lib "org.typelevel::cats-effect:3.4.4"
//
//> using lib "org.http4s::http4s-core:0.23.17"
//> using lib "org.http4s::http4s-dsl:0.23.17"
//> using lib "org.http4s::http4s-ember-server:0.23.17"
//> using lib "org.http4s::http4s-server:0.23.17"

import fs2.Stream
import cats.effect.IO
import cats.effect.IOApp
import scala.concurrent.duration._
import cats.effect.kernel.Ref

import org.http4s.HttpRoutes
import org.http4s.dsl.io._
import org.http4s.implicits._
import org.http4s.server.Router
import org.http4s.ember.server.EmberServerBuilder

import com.comcast.ip4s._
import cats.effect.ExitCode
import cats.effect.kernel.Resource
import org.http4s.server.Server

object ScheduledEndpoint extends IOApp {

  def refService(ref: Ref[IO, Int]) = HttpRoutes.of[IO] {
    case GET -> Root / "ref" =>
      for {
        i    <- ref.get
        resp <- Ok(s"Current ref value is: $i")
      } yield resp
    case PUT -> Root / "ref" / IntVar(value) =>
      for {
        _    <- ref.set(value)
        resp <- Ok(s"Updated the ref value to: $value")
      } yield resp
  }

  def scheduledUpdater(ref: Ref[IO, Int]): Resource[IO, Unit] = {
    val prog = ref
      .updateAndGet(i => i + 1)
      .flatMap(i => IO.println(s"scheduled update: added 1, ref value is now $i"))
    Stream.repeatEval(prog).metered(1.minute).compile.resource.drain
  }

  def run(args: List[String]): IO[ExitCode] = {
    val serverAndUpdater: Resource[IO, (Server, Unit)] = for {
      ref <- Resource.eval(Ref[IO].of(1))
      httpApp = Router("/" -> refService(ref)).orNotFound
      updater = scheduledUpdater(ref)
      server = EmberServerBuilder
        .default[IO]
        .withHost(ipv4"0.0.0.0")
        .withPort(port"8080")
        .withHttpApp(httpApp)
        .build
      serverAndUpdater <- Resource.both(server, updater)
    } yield serverAndUpdater

    serverAndUpdater.useForever.as(ExitCode.Success)
  }

}

And with that running, I can curl for the current value of the ref:

❯ curl 0.0.0.0:8080/ref
Current ref value is: 1%

Letting it sit and run for a while, I can see the ref getting updated:

scheduled update: added 1, ref value is now 2
scheduled update: added 1, ref value is now 3

I can get the updated ref value, and PUT a brand new value too:

❯ curl 0.0.0.0:8080/ref
Current ref value is: 3%

❯ curl -X PUT 0.0.0.0:8080/ref/42
Updated the ref value to: 42%

❯ curl 0.0.0.0:8080/ref
Current ref value is: 42%

Neat! At $work, the update function was a lot more complicated because it was a streaming request to read from a GCS bucket (and a lot of error handling to go with it), and the lookup was more complicated because we have more than just an integer. BUT the bones of this solution are what I’ve shipped and it’s going pretty well (touch wood, throw some salt over your left shoulder, etc. etc.).
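On the error-handling point: with a stream like the one above, a failure inside prog would end the whole stream, so each update step wants to recover on its own. This is a minimal sketch of that idea and not the code I shipped; flakyLoad is a hypothetical stand-in for reading new data, and updateOnce is a name I made up for illustration.

```scala
//> using lib "co.fs2::fs2-core:3.4.0"
//> using lib "org.typelevel::cats-effect:3.4.4"

import cats.effect.{IO, IOApp, Ref}
import fs2.Stream
import scala.concurrent.duration._

object ResilientUpdaterSketch extends IOApp.Simple {

  // Hypothetical loader: fails on every second call to simulate a flaky source.
  def flakyLoad(calls: Ref[IO, Int]): IO[Int] =
    calls.updateAndGet(_ + 1).flatMap { n =>
      if (n % 2 == 0) IO.raiseError(new RuntimeException(s"load $n failed"))
      else IO.pure(n * 100)
    }

  // One update step: on failure, log the error and keep the previous value.
  def updateOnce(ref: Ref[IO, Int], load: IO[Int]): IO[Unit] =
    load.flatMap(ref.set).handleErrorWith { e =>
      IO.println(s"update failed, keeping old value: ${e.getMessage}")
    }

  val run: IO[Unit] =
    for {
      calls <- Ref[IO].of(0)
      ref   <- Ref[IO].of(1)
      // Four attempts: loads 1 and 3 succeed, loads 2 and 4 fail.
      _     <- Stream
                 .repeatEval(updateOnce(ref, flakyLoad(calls)))
                 .metered(10.millis)
                 .take(4)
                 .compile
                 .drain
      value <- ref.get
      _     <- IO.println(s"ref value after 4 attempts: $value")
    } yield ()
}
```

The ref ends up holding 300, the result of the last successful load, and requests that read the ref in between only ever see a complete value.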


  1. Check out the API docs (which I’m quoting) for more details.

  2. As before, this code is available as a gist. You can run it directly using scala-cli.