Process text file concurrently

Process the same input file concurrently

Let's say we have a text file that contains two different types of lines. For the sake of example, lines in one group start with "i:" (for Integer) and lines in the other start with "s:" (for String), as in the following file:

s:New york
s:Apple
i:387548
s:Amsterdam
i:4556
i:39874
s:Orange
i:56787
s:Banana
i:4657567
s:Turkey
i:45679456
s:Iran
i:4356456
i:23423
i:456
s:Ukraine
i:453645
i:5456

We want to process these two groups of lines separately but concurrently. The processing can be anything; in this example we just print the lines to the console. What matters is that we open the file for reading only, so two different processes (or fibers) can read it at the same time.

The Cats Effect answer to opening files (or, more generally, any resource) safely and efficiently is the Resource data type.

So we need a function that takes the path to the file and gives us a Resource of some type that lets us read the file line by line. One JVM class that can do this is java.util.Scanner, so our function takes the file path and returns a Resource of type Scanner.
def openFile(path: String): Resource[IO, Scanner] = {
  Resource.make {
    IO(new Scanner(new FileReader(new File(path))))
  }(scanner => IO(scanner.close()))
}
The first type parameter of Resource is the effect type, usually IO, and the second is the type of the resource itself, in our case Scanner.

The make method of Resource has two parameter lists. The first is an IO that acquires the resource, in this case IO(new Scanner(new FileReader(new File(path)))). The second is a function that takes the resource and returns an IO that releases it, in this case scanner => IO(scanner.close()).
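The following plain-Scala sketch makes the two parameter lists of make concrete. It shows the acquire, use, release order that Resource.make(...)(...).use(...) follows. The names bracket, BracketSketch and firstLine are hypothetical and exist only for this illustration; they are not Cats Effect API. Unlike Resource, this sketch is synchronous and ignores cancellation.

```scala
import java.io.StringReader
import java.util.Scanner
import scala.collection.mutable.ListBuffer

object BracketSketch {
  // Hypothetical helper, NOT Cats Effect API: a synchronous stand-in for
  // Resource.make(acquire)(release).use(body). No async, no cancellation.
  def bracket[R, A](acquire: => R)(release: R => Unit)(body: R => A): A = {
    val resource = acquire     // first parameter list: obtain the resource
    try body(resource)         // the body only sees the already acquired resource
    finally release(resource)  // second parameter list: runs even if body throws
  }

  // Reads the first line of some in-memory text, logging each step
  def firstLine(text: String, log: ListBuffer[String]): String =
    bracket {
      log += "acquire"
      new Scanner(new StringReader(text))
    } { scanner =>
      log += "release"
      scanner.close()
    } { scanner =>
      log += "use"
      scanner.nextLine()
    }

  def main(args: Array[String]): Unit = {
    val log = ListBuffer.empty[String]
    println(firstLine("s:New york\ni:387548", log)) // s:New york
    println(log.mkString(", "))                      // acquire, use, release
  }
}
```

The release function runs in a finally block, so it runs even when the body throws. Resource gives the same guarantee for errors and additionally for cancellation.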
Next, we need a function that takes a file, or more precisely a Resource of type Scanner, and returns its lines as an IO[List[String]].
The function also needs a parameter that determines which kind of lines it returns (integer lines or string lines). We put this parameter in a second parameter list, so we can supply it later and reuse the partially applied function. The signature looks like this:
def readLines(file: Resource[IO, Scanner])(lineFilter: String => Boolean): IO[List[String]] = ???
The second parameter is a function that takes a String (a line) and returns a Boolean. The two filters we need can be implemented as follows:
def intLineFilter(line: String): Boolean = line.startsWith("i:")

def stringLineFilter(line: String): Boolean = line.startsWith("s:")
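A quick way to check the filters is to apply them to the first few lines of the sample file in memory. The same example also shows the benefit of the second parameter list. Here, filterLines is a hypothetical stand-in for readLines: it has the same curried shape but uses no IO or Scanner. Fixing its first argument yields a function that still waits for a filter.

```scala
object LineFilters {
  def intLineFilter(line: String): Boolean = line.startsWith("i:")

  def stringLineFilter(line: String): Boolean = line.startsWith("s:")

  // Hypothetical in-memory stand-in for readLines: same curried shape, no IO
  def filterLines(lines: List[String])(lineFilter: String => Boolean): List[String] =
    lines.filter(lineFilter)

  // The first five lines of the sample file
  val sample: List[String] =
    List("s:New york", "s:Apple", "i:387548", "s:Amsterdam", "i:4556")

  // First parameter list fixed now, filter supplied later
  val fromSample: (String => Boolean) => List[String] = filterLines(sample)(_)

  def main(args: Array[String]): Unit = {
    println(fromSample(intLineFilter))    // List(i:387548, i:4556)
    println(fromSample(stringLineFilter)) // List(s:New york, s:Apple, s:Amsterdam)
  }
}
```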
Back to our readLines function: it needs a nested loop function that calls itself recursively until it reaches the end of the file.
def readLines(file: Resource[IO, Scanner])(lineFilter: String => Boolean): IO[List[String]] = {

  def loop(scanner: Scanner, result: List[String]): List[String] = {
    if (scanner.hasNextLine) {
      val line = scanner.nextLine()
      if (lineFilter(line))
        loop(scanner, result :+ line)
      else
        loop(scanner, result)
    } else result
  }

  file.use(scanner => IO(loop(scanner, List.empty)))
}
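Note that the use method of Resource gives you the underlying resource, in this case the Scanner. It then releases the resource once the IO you return from use completes, whether that IO succeeds, fails, or is canceled.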
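You can try the recursion without Cats Effect or a real file by pointing a Scanner at an in-memory string. This sketch uses the same hasNextLine/nextLine loop; collectLines and ScannerLoopSketch are hypothetical names. It differs in one respect: it prepends matching lines with :: and reverses the list once at the end. Appending with :+ as above also works, but costs O(n) per append on a List, so reading a large file becomes quadratic.

```scala
import java.io.StringReader
import java.util.Scanner
import scala.annotation.tailrec

object ScannerLoopSketch {
  // Same hasNextLine/nextLine loop as readLines, minus Resource and IO.
  // Matching lines are prepended (O(1)) and the list is reversed once at the end.
  def collectLines(scanner: Scanner, lineFilter: String => Boolean): List[String] = {
    @tailrec // compiler checks that the recursive call is in tail position
    def loop(acc: List[String]): List[String] =
      if (scanner.hasNextLine) {
        val line = scanner.nextLine()
        loop(if (lineFilter(line)) line :: acc else acc)
      } else acc.reverse

    loop(Nil)
  }

  def main(args: Array[String]): Unit = {
    // An in-memory Scanner stands in for the file
    val scanner = new Scanner(new StringReader("s:New york\ni:387548\ns:Apple\ni:4556"))
    try println(collectLines(scanner, _.startsWith("i:"))) // List(i:387548, i:4556)
    finally scanner.close()
  }
}
```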
Now we need to process those lines; in our case, we just print them to the console. The processing is the same for both groups, so we write one reusable function. It takes the Scanner resource as its first parameter list and a filter as its second.
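def printLines(file: Resource[IO, Scanner])(lineFilter: String => Boolean): IO[Unit] = {
  import cats.syntax.all._ // provides traverse_ on List

  for {
    lines <- readLines(file)(lineFilter)
    // traverse_ runs one IO per line, in order. Sequencing IO.sleep into the chain
    // is what makes it actually run; an IO created inside a plain foreach is discarded.
    _ <- lines.traverse_(line => IO.println(line) >> IO.sleep(100.millis))
  } yield ()
}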
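Note that IO.sleep(100.millis) is not strictly required; we add it only to make the interleaving of the two groups easier to see. An IO is only a description of an effect, so the sleep happens only when it is sequenced into the IO chain that actually runs. Creating it inside a plain foreach would build the IO and then discard it without ever running it.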
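You can reproduce the point about IO.sleep without Cats Effect by using a deliberately tiny, hypothetical effect type. Task, record, discarded and sequenced below are illustration-only names, not Cats Effect's IO. A Task merely stores a computation and does nothing until run() is called, which is the property that matters here.

```scala
import scala.collection.mutable.ListBuffer

object LazyEffectSketch {
  // Hypothetical, minimal effect type (NOT Cats Effect's IO): it only stores
  // a computation and does nothing until run() is called.
  final class Task[A](thunk: () => A) {
    def run(): A = thunk()
    // Build a new Task that runs this one and then `next`
    def andThen[B](next: Task[B]): Task[B] = new Task[B](() => { thunk(); next.run() })
  }

  // A Task that appends msg to the log when (and only when) it is run
  def record(log: ListBuffer[String], msg: String): Task[Unit] =
    new Task[Unit](() => { log += msg; () })

  // Mirrors the mistake: the "sleep" Task is created inside foreach and dropped
  def discarded(lines: List[String], log: ListBuffer[String]): Unit =
    lines.foreach { line =>
      log += line
      record(log, "sleep") // built, never run
    }

  // Each step is chained into one Task, and that Task is actually run
  def sequenced(lines: List[String], log: ListBuffer[String]): Unit =
    lines
      .map(line => record(log, line).andThen(record(log, "sleep")))
      .foldLeft(new Task[Unit](() => ()))((acc, step) => acc.andThen(step))
      .run()

  def main(args: Array[String]): Unit = {
    val a = ListBuffer.empty[String]
    discarded(List("s:Apple", "i:4556"), a)
    println(a.toList) // List(s:Apple, i:4556)
    val b = ListBuffer.empty[String]
    sequenced(List("s:Apple", "i:4556"), b)
    println(b.toList) // List(s:Apple, sleep, i:4556, sleep)
  }
}
```

In discarded, each "sleep" Task is built and thrown away, so only the lines are logged. In sequenced, each one is chained into the program that actually runs.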

The final function does the main work. If we wanted to process the two groups sequentially, we could write it as follows:
def processFile(path: String): IO[Unit] = {
  val printLinesByFilter = printLines(openFile(path))(_)

  for {
    _ <- printLinesByFilter(intLineFilter)
    _ <- printLinesByFilter(stringLineFilter)
  } yield ()
}
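But of course, that is not what we want! We are here to process the file concurrently, so we use Cats Effect fibers: .start runs each IO on its own fiber, and join waits for that fiber to finish. (In Cats Effect 3, join returns the fiber's Outcome instead of re-raising its error; joinWithNever, or IO.both, propagates failures.)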
def processFileConcurrent(path: String): IO[Unit] = {
  val printLinesByFilter = printLines(openFile(path))(_)

  for {
    fib1 <- printLinesByFilter(intLineFilter).start
    fib2 <- printLinesByFilter(stringLineFilter).start
    _ <- fib1.join
    _ <- fib2.join
  } yield ()
}
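If you don't have Cats Effect at hand, you can sketch the same shape with plain Scala Futures. Two independent readers each open the same file for reading and run at the same time, and the caller waits for both. This is an analogy, not the fiber-based program: Future on a thread pool plays the role of .start, and Await.result plays the role of join. ConcurrentReadSketch, readMatching, readBoth and writeTempFile are hypothetical names.

```scala
import java.io.{File, FileReader}
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.util.Scanner
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ConcurrentReadSketch {
  // Hypothetical helper: writes text to a temporary file and returns its path
  def writeTempFile(text: String): String = {
    val path = Files.createTempFile("lines", ".txt")
    Files.write(path, text.getBytes(StandardCharsets.UTF_8))
    path.toFile.deleteOnExit()
    path.toString
  }

  // Each call opens its own read-only Scanner on the file, as each fiber does above
  def readMatching(path: String, lineFilter: String => Boolean): List[String] = {
    val scanner = new Scanner(new FileReader(new File(path)))
    try {
      val lines = List.newBuilder[String]
      while (scanner.hasNextLine) {
        val line = scanner.nextLine()
        if (lineFilter(line)) lines += line
      }
      lines.result()
    } finally scanner.close()
  }

  // Both reads start concurrently on the global pool (roughly what .start does);
  // Await blocks until both finish (roughly what join does) and rethrows failures
  def readBoth(path: String): (List[String], List[String]) = {
    val ints    = Future(readMatching(path, _.startsWith("i:")))
    val strings = Future(readMatching(path, _.startsWith("s:")))
    Await.result(ints.zip(strings), 10.seconds)
  }

  def main(args: Array[String]): Unit = {
    val path = writeTempFile("s:New york\ni:387548\ns:Apple\ni:4556\n")
    val (ints, strings) = readBoth(path)
    println(ints)    // List(i:387548, i:4556)
    println(strings) // List(s:New york, s:Apple)
  }
}
```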

The complete program:

package com.blogspot.scaland7.catseffects

import cats.effect.{IO, IOApp, Resource}

import java.io.{File, FileReader}
import java.util.Scanner
import scala.concurrent.duration._

object ConcurrentFileReading extends IOApp.Simple {

  private def openFile(path: String): Resource[IO, Scanner] = {
    Resource.make {
      IO(new Scanner(new FileReader(new File(path))))
    }(scanner => IO(scanner.close()))
  }

  private def readLines(file: Resource[IO, Scanner])(lineFilter: String => Boolean): IO[List[String]] = {

    def loop(scanner: Scanner, result: List[String]): List[String] = {
      if (scanner.hasNextLine) {
        val line = scanner.nextLine()
        if (lineFilter(line))
          loop(scanner, result :+ line)
        else
          loop(scanner, result)
      } else result
    }

    file.use(scanner => IO(loop(scanner, List.empty)))
  }

  private def intLineFilter(line: String): Boolean = line.startsWith("i:")

  private def stringLineFilter(line: String): Boolean = line.startsWith("s:")

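  private def printLines(file: Resource[IO, Scanner])(lineFilter: String => Boolean): IO[Unit] = {
    import cats.syntax.all._ // provides traverse_ on List

    for {
      lines <- readLines(file)(lineFilter)
      // print each line, then pause; both steps are sequenced so the sleep actually runs
      _ <- lines.traverse_(line => IO.println(line) >> IO.sleep(100.millis))
    } yield ()
  }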

  private def processFile(path: String): IO[Unit] = {
    val printLinesByFilter = printLines(openFile(path))(_)

    for {
      _ <- printLinesByFilter(intLineFilter)
      _ <- printLinesByFilter(stringLineFilter)
    } yield ()
  }

  private def processFileConcurrent(path: String): IO[Unit] = {
    val printLinesByFilter = printLines(openFile(path))(_)

    for {
      fib1 <- printLinesByFilter(intLineFilter).start
      fib2 <- printLinesByFilter(stringLineFilter).start
      _ <- fib1.join
      _ <- fib2.join
    } yield ()
  }

  override def run: IO[Unit] = {
    val path = "/path/to/datafile"

    // sequential
    // processFile(path)

    // concurrent
    processFileConcurrent(path)
  }
}
