
Introduction to Akka Streams

Yiğitcan Aydoğmuş
Jul 29, 2022

At Intenseye, we frequently encounter the need to perform bulk operations. These tasks may involve changing the structure of stored data, a migration, or a cleanup. Whatever the case may be, we need to make API calls, DB calls, or both. The APIs may or may not have rate limits, and making a huge number of DB calls in the production environment always risks disrupting the flow of the systems. Another problem arises when the required operations contain time-consuming actions such as complex calculations; then the job might take forever to complete even though it doesn’t increase the DB load or make extensive API calls.

Thus, the problems mentioned above necessitate the ability to configure the operation’s throughput. That’s where Akka streams come in. Akka streams offer a safe way to do asynchronous, non-blocking stream processing with backpressure, and they have both Scala and Java APIs. At Intenseye, we are using Scala, so it is a perfect fit for us. You can find more detailed info about Akka streams here.

To put it simply, you have a Source that is the entry point of your stream. You need at least one Source to use Akka streams. You can create a simple source with the following:

val source = Source(1 to 10)

Then, you have a Sink, which is the exit point of your stream. A Sink is also a must-have to be able to use Akka streams. You can create a simple sink with the following:

val sink = Sink.foreach[Int](println)

And lastly, you have a Flow, which represents the operations that will be applied to your stream. You can create a simple flow with the following:

val multiply = Flow[Int].map(x => x * 2)

Now, you can process your stream. Here is the “hello world” of Akka streams:

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

object AkkaStreams {
  def main(args: Array[String]): Unit = {
    // An ActorSystem is required to materialize (run) the stream
    implicit val system: ActorSystem = ActorSystem()

    val source = Source(1 to 10)             // entry point: emits 1..10
    val sink = Sink.foreach[Int](println)    // exit point: prints each element
    val multiply = Flow[Int].map(x => x * 2) // transformation applied to each element

    source
      .via(multiply)
      .runWith(sink)
  }
}

And it will print out:

2
4
6
8
10
12
14
16
18
20
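One practical note on the example above: runWith(Sink.foreach(...)) materializes a Future[Done], so if you want the JVM to exit once the stream finishes, you could (as a sketch) terminate the ActorSystem on completion:

import scala.concurrent.ExecutionContext.Implicits.global

// Shut the ActorSystem down once the stream has finished
source
  .via(multiply)
  .runWith(sink)
  .onComplete(_ => system.terminate())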

(Note: if you are using a version earlier than Akka 2.6, you may also need a Materializer.)
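For completeness, here is a rough sketch of what that looks like on Akka versions before 2.6, where an implicit Materializer had to be created explicitly (on 2.6+ the ActorSystem provides one for you):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

implicit val system: ActorSystem = ActorSystem()
// Required by run/runWith on Akka versions before 2.6
implicit val materializer: ActorMaterializer = ActorMaterializer()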

Now that we know the basics of Akka streams, let’s use them in real-life scenarios that we faced.

At Intenseye, we are using reactive programming. Thus, every API call or DB call we make returns a Future. Since the responses are Futures, we will use the Source.future() method.

Source.future(service.findAll)
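As a minimal sketch of how Source.future behaves (using a hypothetical findAll stub instead of a real service), the Future’s value is emitted as a single element once it completes:

import akka.NotUsed
import akka.stream.scaladsl.Source
import scala.concurrent.Future

// Hypothetical stand-in for a reactive service call that returns a Future
def findAll: Future[Seq[Int]] = Future.successful(Seq(1, 2, 3))

// The whole Seq is emitted as one element when the Future completes
val source: Source[Seq[Int], NotUsed] = Source.future(findAll)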

We have demo accounts to present our platform to customers or to use in our local development, and we need to create some sample alerts for them. Since we have multiple facilities, cameras, alert rule types, and date ranges (daily, weekly, monthly), we need to create thousands of alerts to be able to use them effectively. Here is a simple scenario.

Let’s assume we will fetch all the alert rules from DB and create alerts for each rule.

val alertRules = alertRuleService.findAll

which returns a sequence of alert rules wrapped in a future: Future[Seq[AlertRule]]. We need the alert rules to be passed downstream individually rather than as the complete sequence, so we need to flatten it. Thus, the implementation will be:

Source
  .future(alertRuleService.findAll)
  .mapConcat(identity)
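To see what mapConcat(identity) does in isolation, here is a tiny sketch with plain integers instead of alert rules (assuming an implicit ActorSystem is in scope to run the stream):

// Source.single emits one element: the whole List(1, 2, 3)
// mapConcat(identity) unrolls it into three individual elements downstream
Source.single(List(1, 2, 3))
  .mapConcat(identity)
  .runForeach(println) // prints 1, 2 and 3 on separate lines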

Now that individual alert rules are passed downstream, we can begin creating the alerts. Instead of creating a separate Flow, we will use a .map() function, or to be more precise, a .mapAsync() function. There are two variants: .mapAsync() and .mapAsyncUnordered().

If ordering is important for your operation, you can use .mapAsync(). If you value throughput over order, you can use .mapAsyncUnordered(). In our case, ordering is not important, so we will use .mapAsyncUnordered().
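Here is a small sketch of the difference, using a hypothetical slowSquare computation; both variants run up to 4 futures concurrently, but only .mapAsync() emits results in the upstream order:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

def slowSquare(n: Int): Future[Int] = Future(n * n) // hypothetical async work

Source(1 to 10).mapAsync(4)(slowSquare)          // results emitted in input order
Source(1 to 10).mapAsyncUnordered(4)(slowSquare) // results emitted as futures complete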

Source.future(alertRuleService.findAll)
  .mapConcat(identity)
  .mapAsyncUnordered(1) { alertRule =>
    val alerts = createAlertsForMonthForAlertRule(alertRule, month, year)
    alertService.insert(alerts)
  }
  .runWith(Sink.ignore)

We don’t care about returning a result after the operation, so we used Sink.ignore.
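Note that Sink.ignore still materializes a Future[Done] that completes when the stream finishes, so we can wait for or react to completion even though the elements themselves are discarded. A tiny sketch (again assuming an implicit ActorSystem in scope):

import akka.Done
import scala.concurrent.Future

// The elements are thrown away, but the materialized value tells us when the stream is done
val done: Future[Done] = Source(1 to 3).runWith(Sink.ignore)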

We have now successfully generated alerts for that specific month, year, and alert rule. But the number of alert rules that need to be processed can be relatively large, so this operation can take quite some time. We can speed it up by increasing the parallelism parameter of the .mapAsyncUnordered() function. Let’s modify our code:

Source.future(alertRuleService.findAll)
  .mapConcat(identity)
  .mapAsyncUnordered(5) { alertRule =>
    val alerts = createAlertsForMonthForAlertRule(alertRule, month, year)
    alertService.insert(alerts)
  }
  .runWith(Sink.ignore)

We set the parallelism parameter to 5, so now up to 5 alert rules will be processed concurrently.

Now let’s look at another problem. This time, we want to limit the number of elements processed per time unit to avoid hitting rate limits or exhausting DB connections. Assume we want to connect to a camera stream via a cameraService. That cameraService connects to the live stream of the camera and emits frames. Then our code will be:

Source
  .future(cameraService.getFrames)
  .mapAsync(1) { frame =>
    // do something
  }
  .runWith(Sink.ignore)

We can define our requirements as follows:

  • We want to process 24 frames per second at 1080p resolution
  • Do some operations on the frames and generate a result
  • Save the result to the DB.

What we need to do:

  • Limit the number of frames,
  • Convert the resolution of the frame,
  • Do the operations, prepare the result, and save it to the DB.

For the sake of the example, we will assume we are calling an external API to convert the resolution of the frame. To limit the number of elements processed per time unit, we can use the .throttle() method. So our code will be:

import scala.concurrent.duration._

Source
  .future(cameraService.getFrames)
  .throttle(24, 1.second)
  .mapAsync(1) { frame =>
    val resizedFrame = frameResizeService.resize(frame, 1080)
    resizedFrame.flatMap { f =>
      val result = ??? // do some operations
      frameService.saveResult(result)
    }
  }
  .runWith(Sink.ignore)

By using .throttle(), you can tackle the following possible problems that you might face:

1- The source may be sending more elements than you can handle or need.
2- Your operations may include external API requests that have rate limits, in which case your API calls may start returning HTTP 429 Too Many Requests errors.
3- Your operations may require a number of DB calls. All these calls might exhaust the DB connections, which can disrupt or even crash the whole system.
4- Depending on your requirements, you may want to accumulate a number of elements before beginning your operations; you can then extend the time unit and wait for a sufficient number of elements to be collected (see the sketch below).
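As a rough sketch of that last point (illustrative numbers; throttle is backed by a token bucket, and maximumBurst controls how many elements can pass at once after an idle period):

import akka.stream.ThrottleMode
import scala.concurrent.duration._

// Both lines allow the same average rate (10 elements/second), but the second
// one lets up to 100 elements through in a burst when tokens have accumulated
Source(1 to 1000).throttle(10, 1.second)
Source(1 to 1000).throttle(100, 10.seconds, maximumBurst = 100, ThrottleMode.Shaping)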

In this post, we looked at the usage of Akka streams and examined how to configure your streams according to your requirements. We can speed up an operation by using parallelism, or we can throttle it by limiting the number of elements passed downstream per time unit.

#Blog Post
#intenseye