Tuesday, November 6, 2007

Concurrent Thinking, Part 2

So, briefly, the problem involves piping large amounts of data through a series of transformations, usually just one. In other languages I would use some form of lazy lists to plug together the input reader, transformations, and output writer. In Erlang, I decided to use processes.

One interesting thing about Erlang processes: If you squint at them just right, they act like classes do in other languages, just without inheritance. The processes communicate by passing messages, and those messages trigger the processes to perform predefined actions. This looks a lot like the message-passing paradigm of object-oriented programming.

Basically, one process reads the input and sends it to the transformer process, which in turn sends it on to the output process. Here's the Erlang version of the pipeline I implemented in Python above (it has to be saved in a file named "erltransform.erl" to work):
  -module(erltransform).

  -export([input/2, transform/1, output/1, main/2]).

  input(Filename, NextPid) ->
      {ok, F} = file:open(Filename, [read]),
      input(F, file:read(F, 1024), NextPid).

  input(F, {ok, Data}, Pid) ->
      Pid ! Data,
      input(F, file:read(F, 1024), Pid);
  input(F, eof, Pid) ->
      io:format("closing input.~n", []),
      ok = file:close(F),
      Pid ! eof.

  transform(OutPid) ->
      transform(OutPid, 0).

  transform(OutPid, 0) ->
      receive
          eof ->
              OutPid ! eof;
          _Data ->
              transform(OutPid, 1)
      end;
  transform(OutPid, 1) ->
      receive
          eof ->
              OutPid ! eof;
          Data ->
              OutPid ! Data,
              transform(OutPid, 0)
      end.

  output(Filename) ->
      {ok, F} = file:open(Filename, [write]),
      write_output(F).

  write_output(F) ->
      receive
          eof ->
              ok = file:close(F),
              io:format("closing output. done.~n", []),
              eof;
          Data ->
              ok = file:write(F, Data),
              write_output(F)
      end.

  main(InFilename, OutFilename) ->
      OutPid = spawn(erltransform, output, [OutFilename]),
      TransformPid = spawn(erltransform, transform, [OutPid]),
      spawn(erltransform, input, [InFilename, TransformPid]).
Hmm. This is a lot longer, and it doesn't do as much. Erlang's libraries are OK, but the standard distribution doesn't have Python's batteries-included approach. Since Erlang doesn't come with a library to read and write CSV files, this just reads the file in blocks of up to 1024 bytes and drops every other block.

Some explanation: At the beginning of the code above, the input/2 function opens the input file, reads the first block, and calls input/3, which sends the data to the transformer PID and loops. When it reaches the end of the file, it prints a message, closes the file, sends eof to the transformer, and exits.

The transform/1 function calls transform/2 with a flag indicating whether it should keep or drop the next block of data that's sent to it. Based on the value of the flag, it either sends the data on or ignores it, then flips the flag. The flag starts at 0, so the first block is dropped, the second is kept, and so on. When it receives eof, it sends that on and exits.

The output/1 function opens the file and hands the file handle to write_output/1. (Since both of these functions have the same arity, I had to give the second function a different name.) write_output/1 just writes out the data it receives and closes the file when it receives eof.

Finally, main/2 spawns all the processes, wiring them together by passing each one the PID of the next stage, then it gets out of the way. In this case, the messages being passed are simple: the blocks of data that get read, dropped, or written, plus the atom eof to signal the end.

Yesterday, I hinted that this approach got me in trouble. How? That, gentle reader, is tomorrow's episode. Stay tuned.
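For anyone who wants to poke at the transformer's keep/drop flag in isolation, here's a minimal sketch. It is not part of the original program: the module name transform_demo, the run/0 and collect/1 functions, and the sample blocks "b1" through "b5" are all made up for illustration. The transform/2 clauses are copied from the code above, and the calling process stands in for the output process.

```erlang
-module(transform_demo).   % hypothetical module name, for illustration only

-export([run/0]).

%% Same keep/drop logic as transform/2 in the post:
%% flag 0 drops the next block, flag 1 forwards it.
transform(OutPid, 0) ->
    receive
        eof ->
            OutPid ! eof;
        _Data ->
            transform(OutPid, 1)
    end;
transform(OutPid, 1) ->
    receive
        eof ->
            OutPid ! eof;
        Data ->
            OutPid ! Data,
            transform(OutPid, 0)
    end.

%% Gather everything the transformer forwards, in order, until eof arrives.
collect(Acc) ->
    receive
        eof ->
            lists:reverse(Acc);
        Data ->
            collect([Data | Acc])
    end.

%% Spawn a transformer that reports back to the caller, feed it five
%% made-up blocks followed by eof, and return what came through.
run() ->
    Self = self(),
    Pid = spawn(fun() -> transform(Self, 0) end),
    lists:foreach(fun(Block) -> Pid ! Block end,
                  ["b1", "b2", "b3", "b4", "b5"]),
    Pid ! eof,
    collect([]).
```

Saved as "transform_demo.erl" and compiled with c(transform_demo). in the Erlang shell, calling transform_demo:run(). should return ["b2","b4"]: the first, third, and fifth blocks are dropped, and the second and fourth are passed along. Note that collect/1 picks up any message in the caller's mailbox, so this is best run from a fresh shell.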
