Refs and Streams 2: Updating with an Endpoint
2023-02-09
Oh, hello. Let’s continue with our ref and stream experiment by writing and serving a minimal API with http4s.
The Problem, A Recap
From the first post:
We have some data, rows of recommendations. The data is stored on disk, and we want to write an API that will return rows based on some query. The data, for now, is small enough that it can be read into memory when we start the server. A different process will update the data on disk however, and we need that update to propagate to the server without disrupting running requests. The update is the piece I’ve been working on.
I’m continuing from last time by focusing now on the API:
- how to use the Ref to construct a response?
- how to update the Ref via a request?
- how to link the http4s Server with the scheduled updates?
The Pieces
The method to update the Ref needed a few changes. The prog to do the actual updating is good; however, instead of running multiple streams like in the previous example, we’ll be using multiple resources. So now I’ve decided to compile the stream to a resource and return a Resource[IO, Unit].
    def scheduledUpdater(ref: Ref[IO, Int]): Resource[IO, Unit] = {
      val prog = ref
        .updateAndGet(i => i + 1)
        .flatMap(i => IO.println(s"scheduled update: added 1, ref value is now $i"))
      Stream.repeatEval(prog).metered(1.minute).compile.resource.drain
    }
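To see what compiling a stream to a resource does, here is a minimal sketch with a bounded variant of the updater. The names boundedUpdater and valueSeenInUse are mine, not from the real code, and the short period is only there to make it quick to run. As far as I can tell from fs2, the stream is evaluated while the resource is being acquired, so by the time use runs, every tick has already happened.

```scala
//> using lib "co.fs2::fs2-core:3.4.0"
//> using lib "org.typelevel::cats-effect:3.4.4"

import cats.effect.IO
import cats.effect.kernel.{Ref, Resource}
import fs2.Stream
import scala.concurrent.duration._

object UpdaterSketch {

  // Hypothetical bounded variant of scheduledUpdater: stops after `ticks` updates.
  def boundedUpdater(ref: Ref[IO, Int], ticks: Int, period: FiniteDuration): Resource[IO, Unit] =
    Stream
      .repeatEval(ref.update(_ + 1))
      .metered(period)
      .take(ticks.toLong)
      .compile
      .resource
      .drain

  // Start the ref at 1, acquire the resource, then read the ref inside `use`.
  def valueSeenInUse(ticks: Int, period: FiniteDuration): IO[Int] =
    for {
      ref  <- Ref[IO].of(1)
      seen <- boundedUpdater(ref, ticks, period).use(_ => ref.get)
    } yield seen
}
```

Running UpdaterSketch.valueSeenInUse(3, 10.millis) gives 4: the initial 1 plus three ticks, all applied before use gets a turn. If that reading is right, the unbounded version never finishes acquiring, which is worth keeping in mind when combining it with other resources.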
Instead of just printing the Ref value on a schedule, I wanted to make a curl request for the current value. I also wanted to be able to set the value of the Ref arbitrarily via a request. For that I wrote a 2-endpoint service:
    def refService(ref: Ref[IO, Int]) = HttpRoutes.of[IO] {
      case GET -> Root / "ref" =>
        for {
          i <- ref.get
          resp <- Ok(s"Current ref value is: $i")
        } yield resp
      case PUT -> Root / "ref" / IntVar(value) =>
        for {
          _ <- ref.set(value)
          resp <- Ok(s"Updated the ref value to: $value")
        } yield resp
    }
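The routes can also be poked without starting a server, by running a Request straight through them. This is a sketch rather than what I shipped: the call helper is a made-up name, and the routes are repeated in a slightly shorter form so the snippet stands alone.

```scala
//> using lib "org.typelevel::cats-effect:3.4.4"
//> using lib "org.http4s::http4s-core:0.23.17"
//> using lib "org.http4s::http4s-dsl:0.23.17"

import cats.effect.IO
import cats.effect.kernel.Ref
import org.http4s.{HttpRoutes, Method, Request, Status, Uri}
import org.http4s.dsl.io._
import org.http4s.implicits._

object RefServiceCheck {

  // Same two endpoints as above, repeated so the sketch is self-contained.
  def refService(ref: Ref[IO, Int]): HttpRoutes[IO] = HttpRoutes.of[IO] {
    case GET -> Root / "ref" =>
      ref.get.flatMap(i => Ok(s"Current ref value is: $i"))
    case PUT -> Root / "ref" / IntVar(value) =>
      ref.set(value) *> Ok(s"Updated the ref value to: $value")
  }

  // Hypothetical helper: send one request through the routes, return (status, body).
  def call(ref: Ref[IO, Int], method: Method, path: String): IO[(Status, String)] = {
    val request = Request[IO](method = method, uri = Uri.unsafeFromString(path))
    for {
      resp <- refService(ref).orNotFound.run(request)
      body <- resp.as[String]
    } yield (resp.status, body)
  }
}
```

With a Ref starting at 1, a GET on /ref should come back as Ok with "Current ref value is: 1". A PUT on /ref/abc falls through IntVar and ends up as a 404 from orNotFound.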
Finally, similar to last time, the tricky part was figuring out how to stitch the scheduled update together with something else (in this case the Server resource). The real MVP here is the call to Resource.both, which “allocates two resources concurrently, and combines their results in a tuple.”[1] Then I can call .useForever on the tuple to have a forever-running server and updater.
    def run(args: List[String]): IO[ExitCode] = {
      val serverAndUpdater: Resource[IO, (Server, Unit)] = for {
        ref <- Resource.eval(Ref[IO].of(1))
        httpApp = Router("/" -> refService(ref)).orNotFound
        updater = scheduledUpdater(ref)
        server = EmberServerBuilder
          .default[IO]
          .withHost(ipv4"0.0.0.0")
          .withPort(port"8080")
          .withHttpApp(httpApp)
          .build
        serverAndUpdater <- Resource.both(server, updater)
      } yield serverAndUpdater

      serverAndUpdater.useForever.as(ExitCode.Success)
    }
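To get a feel for Resource.both on its own, here is a small sketch with two toy resources that write their acquire and release events into a shared log. The logged helper and the resource names are invented for illustration; only Resource.make, Resource.both and use come from cats-effect.

```scala
//> using lib "org.typelevel::cats-effect:3.4.4"

import cats.effect.IO
import cats.effect.kernel.{Ref, Resource}

object BothSketch {

  // Hypothetical helper: a resource that records its acquire/release in a shared log.
  def logged[A](name: String, value: A, log: Ref[IO, Vector[String]]): Resource[IO, A] =
    Resource.make(log.update(_ :+ s"acquire $name").as(value))(_ =>
      log.update(_ :+ s"release $name")
    )

  // Allocate both resources, use the tuple, then return the result and the event log.
  val demo: IO[(String, Vector[String])] =
    for {
      log <- Ref[IO].of(Vector.empty[String])
      out <- Resource.both(logged("a", "server", log), logged("b", 42, log)).use {
        case (a, b) => IO.pure(s"$a-$b")
      }
      events <- log.get
    } yield (out, events)
}
```

Both acquisitions land in the log before either release, and the use block sees the two values as a tuple. The order of the two acquires relative to each other is not something I would rely on, since they run concurrently.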
All The Code Together
All the pieces come together like so:[2]
    //> using lib "co.fs2::fs2-core:3.4.0"
    //> using lib "org.typelevel::cats-effect:3.4.4"
    //
    //> using lib "org.http4s::http4s-core:0.23.17"
    //> using lib "org.http4s::http4s-dsl:0.23.17"
    //> using lib "org.http4s::http4s-ember-server:0.23.17"
    //> using lib "org.http4s::http4s-server:0.23.17"

    import fs2.Stream
    import cats.effect.IO
    import cats.effect.IOApp
    import scala.concurrent.duration._
    import cats.effect.kernel.Ref

    import org.http4s.HttpRoutes
    import org.http4s.dsl.io._
    import org.http4s.implicits._
    import org.http4s.server.Router
    import org.http4s.ember.server.EmberServerBuilder

    import com.comcast.ip4s._
    import cats.effect.ExitCode
    import cats.effect.kernel.Resource
    import org.http4s.server.Server

    object ScheduledEndpoint extends IOApp {

      def refService(ref: Ref[IO, Int]) = HttpRoutes.of[IO] {
        case GET -> Root / "ref" =>
          for {
            i <- ref.get
            resp <- Ok(s"Current ref value is: $i")
          } yield resp
        case PUT -> Root / "ref" / IntVar(value) =>
          for {
            _ <- ref.set(value)
            resp <- Ok(s"Updated the ref value to: $value")
          } yield resp
      }

      def scheduledUpdater(ref: Ref[IO, Int]): Resource[IO, Unit] = {
        val prog = ref
          .updateAndGet(i => i + 1)
          .flatMap(i => IO.println(s"scheduled update: added 1, ref value is now $i"))
        Stream.repeatEval(prog).metered(1.minute).compile.resource.drain
      }

      def run(args: List[String]): IO[ExitCode] = {
        val serverAndUpdater: Resource[IO, (Server, Unit)] = for {
          ref <- Resource.eval(Ref[IO].of(1))
          httpApp = Router("/" -> refService(ref)).orNotFound
          updater = scheduledUpdater(ref)
          server = EmberServerBuilder
            .default[IO]
            .withHost(ipv4"0.0.0.0")
            .withPort(port"8080")
            .withHttpApp(httpApp)
            .build
          serverAndUpdater <- Resource.both(server, updater)
        } yield serverAndUpdater

        serverAndUpdater.useForever.as(ExitCode.Success)
      }

    }
And with that running, I can curl for the current value of the ref:
    ❯ curl 0.0.0.0:8080/ref
    Current ref value is: 1%
Letting it sit and run for a while, I can see the ref getting updated:
    scheduled update: added 1, ref value is now 2
    scheduled update: added 1, ref value is now 3
I can get the updated ref value, and PUT a brand new value too:
    ❯ curl 0.0.0.0:8080/ref
    Current ref value is: 3%

    ❯ curl -X PUT 0.0.0.0:8080/ref/42
    Updated the ref value to: 42%

    ❯ curl 0.0.0.0:8080/ref
    Current ref value is: 42%
Neat! At $work, the update function was a lot more complicated because it was a streaming request to read from a GCS bucket (and a lot of error handling to go with it), and the lookup was more complicated because we have more than just an integer. BUT the bones of this solution are what I’ve shipped and it’s going pretty well (touch wood, throw some salt over your left shoulder, etc. etc.).
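For a rough idea of how the same bones carry over to a richer payload, here is a sketch where the Ref holds a map of rows instead of an Int. Everything in it is an assumption for illustration: the Recs shape, the lookup and refresh names, and the choice to keep the old data when a load fails. It is not the code from $work.

```scala
//> using lib "org.typelevel::cats-effect:3.4.4"

import cats.effect.IO
import cats.effect.kernel.Ref

object RecsSketch {

  // Hypothetical shape: query key -> rows of recommendations.
  type Recs = Map[String, List[String]]

  // Lookup reads whatever snapshot is currently in the Ref.
  def lookup(ref: Ref[IO, Recs], key: String): IO[List[String]] =
    ref.get.map(_.getOrElse(key, Nil))

  // Load the new snapshot first, then swap it in with a single set.
  // If loading fails, log it and leave the existing data in place.
  def refresh(ref: Ref[IO, Recs], load: IO[Recs]): IO[Unit] =
    load
      .flatMap(ref.set)
      .handleErrorWith(e => IO.println(s"refresh failed, keeping old data: ${e.getMessage}"))
}
```

Because the swap is one ref.set, a request reading the Ref sees either the old snapshot or the new one, never a half-updated mix.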