Showing posts with label concurrency. Show all posts
Showing posts with label concurrency. Show all posts

Wednesday, November 7, 2007

Concurrent Thinking, Part 3

When I decided to dust off and publish the old post I had written, I wasn't expecting it to turn into a week-log series. And it's not. This is the last.
If you'll remember, gentle reader, I've been foreshadowing trouble. Today's the payoff. You'll remember that the reason I didn't just slurp the entire file into memory at one time and build lists from it is that I need to be able to process files that were much larger than would fit in memory. If the process reading the file can read all of it before the transformation process can handle much of it, or before the output process can write the output, the file will be read into memory in its entirety, more or less, and this entire exercise has failed. To imagine how this could happen, suppose the input comes from a local file, but the output is being written across a network. The input process could read all the file, and the transformer process could handle the data, but the messages would still pile up in the output process's mailbox, filling memory and bringing the system to its knees. Or suppose the transformation being applied involves a time-intensive computation. Its mailbox would fill up memory when the input process sends it the entire file while the first few messages are being transformed. Again, the system dies, horribly, horribly. I won't tell you how I solved this when it happened to me. Looking back, it was a nasty, nasty kludge that I only used because I was under time pressure. What I should have done was use the functional solution someone mentioned in a comment to Concurrent Thinking, Part 1. This solution involves having the input and transformation functions return a tuple with two values: data and another function. The new function will return more data and yet another function. Eventually, when there's no more data, the function will just return a flag indicating that. I won't rewrite the entire example from yesterday in this style, but the input function would look like this:
input(Filename) ->
    {ok, F} = file:open(Filename, [read]),
    input_continuation(F).

input_continuation(F) ->
    fun() ->
        case file:read(F, 1024) of
            eof ->
                file:close(F),
                eof;
            Data ->
                {Data, input_continuation(F)}
        end
    end.
Here, input/1 opens the file and calls input_continuation/1. This returns a function, A. When you call A, you get the first block of data and a new function, B. B will return the second block of data and a function C. This happens until there is no more data, at which point the file is closed and eof is returned. This solution is very Erlang-ish, and it throttles nicely so it doesn't accidentally fill up memory. End of series.

Tuesday, November 6, 2007

Concurrent Thinking, Part 2

So, briefly, the problem involves piping large amounts of data through a series of transformations, usually just one. In other languages I would use some form of lazy lists to plug together the input reader, transformations, and output writer. In Erlang, I decided to use processes. One interesting thing about Erlang processes: If you squint at them just right, they act like classes do in other languages, just without inheritance. The processes communicate by passing messages, and those messages trigger the processes to perform predefined actions. This seems very like message-passing paradigm of object oriented programming. Basically, one process reads the input and sends it to the transformer process, which in turn sends it on to the output process. Here's the pipeline I implemented in Python above (it has to be saved in a file named "erltransform.erl" to work):
  -module(erltransform).

  -export([input/2, transform/1, output/1, main/2]).

  input(Filename, NextPid) ->
      {ok, F} = file:open(Filename, [read]),
      input(F, file:read(F, 1024), NextPid).

  input(F, {ok, Data}, Pid) ->
      Pid ! Data,
      input(F, file:read(F, 1024), Pid);
  input(F, eof, Pid) ->
      io:format("closing input.~n", []),
      ok = file:close(F),
      Pid !  eof.

  transform(OutPid) ->
      transform(OutPid, 0).

  transform(OutPid, 0) ->
      receive
          eof ->
              OutPid ! eof;
          _Data ->
              transform(OutPid, 1)
      end;
  transform(OutPid, 1) ->
      receive
          eof ->
              OutPid ! eof;
          Data ->
              OutPid ! Data,
              transform(OutPid, 0)
      end.

  output(Filename) ->
      {ok, F} = file:open(Filename, [write]),
      write_output(F).

  write_output(F) ->
      receive
          eof ->
              ok = file:close(F),
              io:format("closing output. done.~n", []),
              eof;
          Data ->
              ok = file:write(F, Data),
              write_output(F)
      end.

  main(InFilename, OutFilename) ->
      OutPid = spawn(erltransform, output, [OutFilename]),
      TransformPid = spawn(erltransform, transform, [OutPid]),
      spawn(erltransform, input, [InFilename, TransformPid]).
Hmm. This is a lot longer, and it doesn't do as much. Erlang's libraries are OK, but the standard distribution doesn't have Python's batteries included approach. Since Erlang doesn't come with a library to read and write CSV files, this just reads a block of 1024 bytes, and it drops every other block. Some explanation: At the beginning of the code above, the input/2 function opens the input file, reads the first block, and calls input/3, which sends the data to the transformer PID and loops. When it reaches the end of the file, it closes the file, prints a message, and exits. The transform/1 function calls transform/2 with a flag indicating whether it should keep or drop the next block of data that's sent to it. Based on the value of this, it either sends the data on or ignores it. When it receives eof, it sends that on and exits. The output/1 function opens the file and sends it on to write_output/1. (Since both of these functions have the same arity, I had to give the second function a different name.) Output just writes the data it receives out and closes the file when told to. Finally, main/2 spawns all the processes, linking them together, then it gets out of the way. In this case, the messages being passed are simple: the data read, dropped, or written. Yesterday, I hinted that this approach got me in trouble. How? That, gentle reader, is tomorrow's episode. Stay tuned.

Monday, November 5, 2007

Concurrent Thinking, Part 1

Note: I originally wrote this back in June, but never finished it. An now, presenting part one.... Concurrency is the key idea to come to terms with in Erlang and the key tool that it gives you. It's the modeling tool to use when solving a problem. I realized this when I was implementing a code transformation pipeline. The pipeline would go input to transformation to output. I wanted to be able to interchange any of those pieces, so I could read from multiple formats, transform the data in different ways, and write the data out in different formats. And it needed to be able to handle a lot of data. On my first attempt, I approached the problem the way I might with Python: Input, transformation, and output are iterators which are just chained together. For example, the input would read CSV, the transformation would drop every other item, and the output would write it back to CSV::
  from __future__ import with_statement

  import csv

  def input(filename):
      with open(filename, 'rb') as f:
          for row in csv.reader(f):
              yield row

  def transform(rows):
      rows = iter(rows)
      while True:
          rows.next()
          yield rows.next()

  def output(rows, filename):
      with open(filename, 'wb') as f:
          writer = csv.writer(f)
          writer.writerows(rows)
That's not very useful, but you get an idea of the problem and what I was trying to do. Using iterators/generators is nice here because I don't have to worry about too much input filling up memory. Also, I don't build a list I don't need or use. Unfortunately, Erlang doesn't have iterators. I thought I would use lists instead and work from there. I could have had the input function read everything into a list, the transformation function transforms it into another list, and the output function writes a list's contents to disk. It wouldn't have been pretty, but it would have worked. Part way through coding this, I realized that Erlang has a great solution to this: concurrency.
Stay tuned tomorrow, when the hero attempts to use powerful Erlang concurrency and ends up hoisted on his own petard.