Process the same input file concurrently
Let's say we have a text file that contains 2 different types of lines.
For the sake of example, one group starts with "i:" (for Integer) and the other group starts with "s:" (for String), something like the following file:
s:New york
s:Apple
i:387548
s:Amsterdam
i:4556
i:39874
s:Orange
i:56787
s:Banana
i:4657567
s:Turkey
i:45679456
s:Iran
i:4356456
i:23423
i:456
s:Ukraine
i:453645
i:5456
We want to process these two groups of lines separately but concurrently. The processing itself can be anything; for this example, we just print the lines to the console. What matters is that we open the file for reading only, so two different processes (or routines) can read it at the same time.
Cats Effect's answer to opening files (or, more generally, any resource) safely and efficiently is the Resource class.
So we need a function that takes the path to the file and gives us a Resource of some type that lets us read the file line by line. One of the classes on the JVM that allows this is java.util.Scanner, so our function takes the file path and returns a Resource of type Scanner.
def openFile(path: String): Resource[IO, Scanner] = {
  Resource.make {
    IO(new Scanner(new FileReader(new File(path))))
  }(scanner => IO(scanner.close()))
}
The first type parameter of Resource is the effect type, usually IO, and the second is the type of the resource itself, in our case Scanner.
The make method of Resource has two parameter lists. The first is an IO that acquires the resource, in this case IO(new Scanner(new FileReader(new File(path)))). The second is a function that takes the resource and releases it, again as an IO, in this case IO(scanner.close()).
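To make the acquire and release steps visible, here is a minimal sketch that logs both. It assumes Cats Effect 3 and Scala CLI (the `//> using dep` line and the version number are assumptions), and the helpers `openText` and `openTextAuto` are hypothetical: they wrap an in-memory Scanner so that no file on disk is needed. The second helper uses Resource.fromAutoCloseable, which works here because Scanner implements Closeable.

```scala
//> using dep "org.typelevel::cats-effect:3.5.4"

import cats.effect.{IO, IOApp, Resource}
import java.util.Scanner

object ResourceLifecycleSketch extends IOApp.Simple {

  // Hypothetical helper: same shape as openFile, but over a String and with logging.
  def openText(text: String): Resource[IO, Scanner] =
    Resource.make(
      IO.println("acquire") *> IO(new Scanner(text))
    )(scanner => IO(scanner.close()) *> IO.println("release"))

  // Hypothetical helper: lets Resource derive the release step from close().
  def openTextAuto(text: String): Resource[IO, Scanner] =
    Resource.fromAutoCloseable(IO(new Scanner(text)))

  override def run: IO[Unit] =
    for {
      first <- openText("s:Apple\ni:42").use(sc => IO(sc.nextLine()))
      _     <- IO.println(s"read: $first")
      again <- openTextAuto("i:7").use(sc => IO(sc.nextLine()))
      _     <- IO.println(s"read: $again")
    } yield ()
}
```

Because use only completes after the release step has run, "release" is printed before "read: s:Apple".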
Next, we need a function that takes the file, or more precisely a Resource of type Scanner, and returns the matching lines as an IO of List[String].
The function also needs a second parameter that determines which kind of lines it should return (integer lines or string lines). We put it in a second parameter list so that we can supply it later and reuse the function. The signature looks like this:
def readLines(file: Resource[IO, Scanner])(lineFilter: String => Boolean): IO[List[String]] = ???
The second parameter is a function that takes a String (a line) and returns a Boolean. The two filters we need can be implemented as follows:
def intLineFilter(line: String): Boolean = line.startsWith("i:")
def stringLineFilter(line: String): Boolean = line.startsWith("s:")
Back to readLines: it needs a nested loop function that calls itself recursively until it reaches the end of the file.
def readLines(file: Resource[IO, Scanner])(lineFilter: String => Boolean): IO[List[String]] = {
  // Tail-recursive: prepend each match (constant time) and reverse once at the end.
  def loop(scanner: Scanner, result: List[String]): List[String] = {
    if (scanner.hasNextLine) {
      val line = scanner.nextLine()
      if (lineFilter(line))
        loop(scanner, line :: result)
      else
        loop(scanner, result)
    } else result.reverse
  }
  file.use(scanner => IO(loop(scanner, List.empty)))
}
Note that the use method of a Resource gives you the underlying resource (here, the Scanner) and releases it once the effect you pass in has completed.
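To see what the filters and the recursive loop produce without touching the file system, the following sketch runs the same recursion over an in-memory Scanner using only the standard library. The `collect` helper and the sample text are hypothetical. It prepends each match and reverses once at the end, a common way to keep list building linear.

```scala
import java.util.Scanner
import scala.annotation.tailrec

object FilterLoopSketch {
  def intLineFilter(line: String): Boolean = line.startsWith("i:")
  def stringLineFilter(line: String): Boolean = line.startsWith("s:")

  // Hypothetical helper: the loop from readLines, applied to a String instead of a file.
  def collect(text: String)(lineFilter: String => Boolean): List[String] = {
    @tailrec
    def loop(scanner: Scanner, acc: List[String]): List[String] =
      if (scanner.hasNextLine) {
        val line = scanner.nextLine()
        loop(scanner, if (lineFilter(line)) line :: acc else acc)
      } else acc.reverse

    val scanner = new Scanner(text)
    try loop(scanner, Nil) finally scanner.close()
  }

  def main(args: Array[String]): Unit = {
    val sample = "s:Apple\ni:387548\ns:Amsterdam\ni:4556"
    println(collect(sample)(intLineFilter))    // List(i:387548, i:4556)
    println(collect(sample)(stringLineFilter)) // List(s:Apple, s:Amsterdam)
  }
}
```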
Now we need to process those lines, which in our case just means printing them to the console. The processing is the same for both groups, so we write one reusable function that takes the Resource of Scanner in its first parameter list and a filter in its second.
def printLines(file: Resource[IO, Scanner])(lineFilter: String => Boolean): IO[Unit] = {
  for {
    lines <- readLines(file)(lineFilter)
    // traverse_ (from import cats.syntax.all._) chains one print-then-sleep
    // effect per line, so every sleep is actually executed.
    _ <- lines.traverse_ { line =>
      IO(println(line)) *> IO.sleep(100.millis)
    }
  } yield ()
}
Note that IO.sleep(100.millis) is not strictly required; it slows the printing down so that the interleaving of the two tasks is easier to see. The sleep has to be sequenced with the print: an IO.sleep that is merely created inside a plain foreach is discarded and never runs.
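An IO value is only a description, so an effect that is built inside a plain foreach and then dropped never executes. The sketch below contrasts the two styles by counting how many per-line effects actually run. It assumes Cats Effect 3 and Scala CLI (the dependency line is an assumption), and the names `discarded` and `sequenced` are hypothetical.

```scala
//> using dep "org.typelevel::cats-effect:3.5.4"

import cats.effect.{IO, IOApp, Ref}
import cats.syntax.all._
import scala.concurrent.duration._

object SequencedEffectsSketch extends IOApp.Simple {

  // Builds one IO per line but throws each away: none of the updates ever runs.
  def discarded(lines: List[String], counter: Ref[IO, Int]): IO[Unit] =
    IO(lines.foreach(_ => counter.update(_ + 1)))

  // traverse_ chains the per-line effects into a single IO, so each one runs in order.
  def sequenced(lines: List[String], counter: Ref[IO, Int]): IO[Unit] =
    lines.traverse_ { line =>
      IO.println(line) *> IO.sleep(10.millis) *> counter.update(_ + 1)
    }

  private val lines = List("i:1", "i:2", "i:3")

  override def run: IO[Unit] =
    for {
      c1 <- Ref.of[IO, Int](0)
      _  <- discarded(lines, c1)
      n1 <- c1.get
      c2 <- Ref.of[IO, Int](0)
      _  <- sequenced(lines, c2)
      n2 <- c2.get
      _  <- IO.println(s"discarded ran $n1 updates, sequenced ran $n2")
    } yield ()
}
```

With the three sample lines, the final message reports 0 updates for the discarded version and 3 for the sequenced one.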
The final function we need is the one that does the main work. If we wanted to process the two groups of lines sequentially, we could write it as follows:
def processFile(path: String): IO[Unit] = {
  val printLinesByFilter = printLines(openFile(path))(_)
  for {
    _ <- printLinesByFilter(intLineFilter)
    _ <- printLinesByFilter(stringLineFilter)
  } yield ()
}
But of course, we do not want that! We are here to process the file concurrently, so we use Cats Effect fibers. A fiber is a lightweight thread of execution that the runtime schedules onto its thread pool: start launches a task on its own fiber and join waits for that fiber to finish. Each task calls use on the Resource, so each fiber opens and closes its own Scanner.
def processFileConcurrent(path: String): IO[Unit] = {
  val printLinesByFilter = printLines(openFile(path))(_)
  for {
    fib1 <- printLinesByFilter(intLineFilter).start
    fib2 <- printLinesByFilter(stringLineFilter).start
    _ <- fib1.join
    _ <- fib2.join
  } yield ()
}
The complete program:
package com.blogspot.scaland7.catseffects

import cats.effect.{IO, IOApp, Resource}
import cats.syntax.all._
import java.io.{File, FileReader}
import java.util.Scanner
import scala.concurrent.duration._

object ConcurrentFileReading extends IOApp.Simple {

  // Acquire a Scanner over the file and close it on release.
  private def openFile(path: String): Resource[IO, Scanner] = {
    Resource.make {
      IO(new Scanner(new FileReader(new File(path))))
    }(scanner => IO(scanner.close()))
  }

  // Read the whole file and keep only the lines accepted by lineFilter.
  private def readLines(file: Resource[IO, Scanner])(lineFilter: String => Boolean): IO[List[String]] = {
    // Tail-recursive: prepend each match and reverse once at the end.
    def loop(scanner: Scanner, result: List[String]): List[String] = {
      if (scanner.hasNextLine) {
        val line = scanner.nextLine()
        if (lineFilter(line))
          loop(scanner, line :: result)
        else
          loop(scanner, result)
      } else result.reverse
    }
    file.use(scanner => IO(loop(scanner, List.empty)))
  }

  private def intLineFilter(line: String): Boolean = line.startsWith("i:")
  private def stringLineFilter(line: String): Boolean = line.startsWith("s:")

  private def printLines(file: Resource[IO, Scanner])(lineFilter: String => Boolean): IO[Unit] = {
    for {
      lines <- readLines(file)(lineFilter)
      // Chain one print-then-sleep effect per line so every sleep actually runs.
      _ <- lines.traverse_ { line =>
        IO(println(line)) *> IO.sleep(100.millis)
      }
    } yield ()
  }

  private def processFile(path: String): IO[Unit] = {
    val printLinesByFilter = printLines(openFile(path))(_)
    for {
      _ <- printLinesByFilter(intLineFilter)
      _ <- printLinesByFilter(stringLineFilter)
    } yield ()
  }

  private def processFileConcurrent(path: String): IO[Unit] = {
    val printLinesByFilter = printLines(openFile(path))(_)
    for {
      fib1 <- printLinesByFilter(intLineFilter).start
      fib2 <- printLinesByFilter(stringLineFilter).start
      _ <- fib1.join
      _ <- fib2.join
    } yield ()
  }

  override def run: IO[Unit] = {
    val path = "/path/to/datafile"
    // sequential
    // processFile(path)
    // concurrent
    processFileConcurrent(path)
  }
}
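The start/join pattern in processFileConcurrent can also be expressed with higher-level combinators. In Cats Effect 3, join returns an Outcome rather than rethrowing, so a failure inside a fiber is easy to miss. Combinators such as both and parTupled run the two tasks concurrently, propagate errors, and cancel the other task if one fails. The sketch below is one possible alternative, not the program above: `printAll` is a hypothetical stand-in for printLines that works on in-memory lists, and the dependency line assumes Scala CLI.

```scala
//> using dep "org.typelevel::cats-effect:3.5.4"

import cats.effect.{IO, IOApp}
import cats.syntax.all._
import scala.concurrent.duration._

object ParallelSketch extends IOApp.Simple {

  // Hypothetical stand-in for printLines: prints each line with a short pause.
  def printAll(lines: List[String]): IO[Unit] =
    lines.traverse_(line => IO.println(line) *> IO.sleep(50.millis))

  val ints    = List("i:1", "i:2", "i:3")
  val strings = List("s:a", "s:b", "s:c")

  // Runs both tasks on separate fibers and waits for both results.
  def viaBoth: IO[Unit] =
    printAll(ints).both(printAll(strings)).void

  // The same idea through the Parallel instance for IO.
  def viaParTupled: IO[Unit] =
    (printAll(ints), printAll(strings)).parTupled.void

  override def run: IO[Unit] = viaBoth *> viaParTupled
}
```

The exact interleaving of the "i:" and "s:" lines depends on scheduling, so the output order may differ between runs.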
