Batch 'em up, Move 'em on

2024-08-28

I really like working with fs2. I like saving little code examples for myself. I want to use the blog more and get out of the habit of reaching for gists. Also I need somewhere to put my intrusive-thought song parodies1.

The Problem

We have a stream of “things”. We want to batch those things up, and do something to each batch. We need to count which batch we’re on. Oh and also, if the stream of “things” is empty, we want to fallback to some other operation.

The Code

 1import cats.effect.IO
 2import fs2.Stream
 3
 4def getAndProcessThings(batchSize: Int): IO[Unit] = {
 5  val things: Stream[IO, String] = Stream("a", "b", "c", "d", "e", "f", "g", "h", "i", "j").covary[IO]
 6  val fallback: Stream[IO, Unit] = Stream.eval(IO.println("falling back"))
 7
 8  things
 9    .chunkN(batchSize) // split stream into chunks of batchSize
10    .zipWithIndex // zip the chunks together with an index
11    .evalMap { case (chunk, batchNumber) =>
12      IO.println(s"Processing batch $batchNumber: $chunk")
13    }
14    .ifEmpty(fallback)
15    .compile
16    .drain
17}

If that function is run with a batch size of 3, we’ll get the following output

1processing batch number 0; Chunk(a, b, c)
2processing batch number 1; Chunk(d, e, f)
3processing batch number 2; Chunk(g, h, i)
4processing batch number 3; Chunk(j)

If things was instead set to a value of Stream.empty, we’ll instead see falling back printed.


  1. Rawhidddeeeee ↩︎