Saturday, February 26, 2011

More Tricks with jRuby and Flume (and HBase)

Last time, I set up Flume with a jRuby decorator and managed to process exactly one event with my jruby-flume script. Now we are going to dig in a little deeper and do something useful.

Let's say I have a stream of data that I need to capture and place into a database. I would like the database to be updated in near real time - by that I mean a couple of minutes of delay between seeing the data in the stream and having it show up in the database is acceptable. The data does not come to me in the format I need for the database, but for the most part the transformations are simple. Also, the stream of data contains information that ultimately needs to live in several tables.

For my purposes at the moment, HBase is the database. There is a sink plugin for flume, called attr2HBase, that writes data into HBase. There may be a better way to get it, but I checked out the flume-hbase branch from github and built the plugin from the plugins directory. It seems to work fine with the latest flume release (but the test cases seem to be broken).

The hbase sink has many configuration options, but I am just going to use the most basic of them. If you set up a sink as attr2HBase("some_table"), it will interpret events sent to it as data destined for the "some_table" table in HBase. In this configuration, it acts only on specially marked attributes in the event: the value of an attribute named "2hb_" is interpreted as the row key, and the value of any attribute of the form "2hb_colfam:desig" is placed in the row under the "colfam:desig" column. Other attributes are ignored, and an error is generated if there is no row key. Most of this is fine for my purposes, but there are a couple of challenges. First, I have to somehow transform my data into "2hb_" form - I'll build a decorator to do this work below. Second, the attr2HBase plugin serves exactly one table per sink. What I really want is a sink that writes to a table chosen by criteria within the event being processed - I have written just such a sink in jRuby.
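To make the convention concrete: an event carrying the (made-up) attributes { 2hb_ : row1 } and { 2hb_aaa:x : hello } lands in "some_table" as row "row1" with the cell "aaa:x" set to "hello", while an attribute like { color : blue } is silently dropped.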

Ready? Keep your hands inside the vehicle at all times, and be warned that this is an active ride - you may get a little wet.

1. Install HBase. Again, I have relied on Cloudera's deb packages: "sudo apt-get install hadoop-hbase-regionserver hadoop-hbase-master". You can also follow Apache's quick start guide. The details of getting HBase set up are beyond the scope of this blog, but the HBase documentation is excellent. Once you have it running, make sure everything works by using "hbase shell" to create a couple of tables we will need later:
chris@basqueseed:~$ hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 0.90.1-CDH3B4, r, Mon Feb 21 12:20:55 PST 2011
>> create 'one','aaa'; create 'two', 'aaa'
0 row(s) in 1.1510 seconds
0 row(s) in 1.1110 seconds
2. Get the hbase plugin. I got it from the plugins section of the hbase branch of flume on github. Build it, and put hbase-sink.jar in /usr/lib/flume/lib. You also need to ensure that HBase's main jar file is accessible on flume's classpath - I just sym-linked /usr/lib/hbase/hbase-0.90.1-CDH3B4.jar into /usr/lib/flume/lib/. Finally, you need to modify flume's configuration to load the attr2hbaseSink plugin. While you are there, you may as well add the other two jRuby classes from the jruby-flume library. When you are done, your flume-site.xml should look something like this:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
 <!--- ================================================= -->
 <!--- Flume Plugins =================================== -->
 <!--- ================================================= -->
 <property>
  <name>flume.plugin.classes</name>
  <value>com.infochimps.flume.jruby.JRubyDecorator, com.infochimps.flume.jruby.JRubySink, com.infochimps.flume.jruby.JRubySource, com.cloudera.flume.hbase.Attr2HBaseEventSink</value>
  <description>List of plugin classes to load.</description>
 </property>
</configuration>
3. We need to build a jRubyDecorator script that can take events with random ascii data and turn them into events that the Attr2HBaseEventSink can use. This is not a lot different from the "reverse.rb" script that we set up last time.
# onetwo.rb --- jRubyDecorator script

require 'java'
java_import 'com.cloudera.flume.core.EventSinkDecorator'
java_import 'com.cloudera.flume.core.Event'
java_import 'com.cloudera.flume.core.EventImpl'
class OneTwoDecorator < EventSinkDecorator
  # Alternately returns "one" and "two" on successive calls.
  def table
    (@one = !@one) ? "one" : "two"
  end

  def append(e)
    # Build a writable copy of the event's attribute map.
    attrs = java::util::HashMap.new e.getAttrs
    attrs["table"] = table.to_java_bytes
    # Split the body into three-character groups: one column per group.
    values = String.from_java_bytes( e.getBody ).scan(/.../)
    values.each_index { |i| attrs["2hb_aaa:#{i}"] = values[i].to_java_bytes }
    # The first group doubles as the row key.
    attrs["2hb_"] = values[0].to_java_bytes if values[0]
    super EventImpl.new( e.getBody, e.getTimestamp, e.getPriority, e.getNanos,
                         e.getHost, attrs )
  end
end
OneTwoDecorator.new(nil)
Last time, our decorator just changed the body field. This time, we are altering the attributes. The first thing append does is create a new, writable copy of the attribute hash attached to the event. We then add a couple of attributes to the hash. One thing to be careful with here: flume will later expect this to be a HashMap with strings as keys and byte arrays as values, so we call to_java_bytes on every value we insert. We could probably write a wrapper class around our attributes to ensure that this restriction is maintained - something like the sketch below.
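Such a wrapper might look like this (a minimal, untested sketch - the BytesAttrs name and its whole interface are hypothetical, not part of jruby-flume):
# bytes_attrs.rb --- hypothetical wrapper that enforces flume's
# String-key / byte-array-value convention for event attributes.
require 'java'

class BytesAttrs
  def initialize( attrs = nil )
    @map = java::util::HashMap.new
    @map.put_all( attrs ) if attrs
  end

  # Coerce every value to a java byte array on the way in.
  def []=( key, value )
    bytes = value.respond_to?( :to_java_bytes ) ? value.to_java_bytes : value
    @map.put( key.to_s, bytes )
  end

  # Hand the underlying HashMap back to flume.
  def to_java_map
    @map
  end
end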

We add three kinds of things to the attributes. The table method alternately returns the strings "one" and "two", so we tag alternate events that pass through the decorator with {table:one} and {table:two}. Next, we take the body, split it up into three-character groups, and create tags like {2hb_aaa:1:xxx}. If these make it to the attr2HBase event sink, they will be interpreted as column data to add to an HBase table. Finally, if there was at least one group of three characters, we add a tag like {2hb_:xxx}, which attr2HBase will interpret as the row key.

Let's test this part to make sure everything is working right.
chris@basqueseed:~$ flume shell -c localhost
2011-02-26 18:44:52,268 [main] INFO conf.FlumeConfiguration: Loading configurations from /etc/flume/conf
Using default admin port: 35873
Using default report port: 45678
Connecting to Flume master localhost:35873:45678...
2011-02-26 18:44:52,466 [main] INFO util.AdminRPCThrift: Connected to master at localhost:35873
==================================================
FlumeShell v0.9.3-CDH3B4
Copyright (c) Cloudera 2010, All Rights Reserved
==================================================
Type a command to execute (hint: many commands
only work when you are connected to a master node)


You may connect to a master node by typing:
    connect host[:adminport=35873[:reportport=45678]]


[flume localhost:35873:45678] exec config basqueseed console 'jRubyDecorator("/usr/lib/flume/scripts/onetwo.rb") console'
[id: 2] Execing command : config
Command succeeded
[flume localhost:35873:45678] quit
Disconnected!
Now check to see if it does what we want:
chris@basqueseed:~$ flume node_nowatch
2011-02-26 18:48:43,835 [main] INFO agent.FlumeNode: Flume 0.9.3-CDH3B4
2011-02-26 18:48:43,836 [main] INFO agent.FlumeNode:  rev 822c62f0c13ab76921e96dd92e19f68007dbcbe2
2011-02-26 18:48:43,836 [main] INFO agent.FlumeNode: Compiled  on Mon Feb 21 13:01:39 PST 2011
...
2011-02-26 18:49:06,818 [SpawningLogicalNode basqueseed] INFO agent.LogicalNode: Node config successfully set to com.cloudera.flume.conf.FlumeConfigData@156d7c8
2011-02-26 18:49:06,825 [logicalNode basqueseed-20] INFO agent.LogicalNode: Connector started: LazyOpenSource | LazyOpenDecorator
2011-02-26 18:49:06,825 [logicalNode basqueseed-20] INFO console.JLineStdinSource: Opening stdin source
aaabbbccc 
2011-02-26 18:50:31,719 [logicalNode basqueseed-20] INFO debug.ConsoleEventSink: ConsoleEventSink( debug ) opened
basqueseed [INFO Sat Feb 26 18:50:31 CST 2011] { 2hb_ : aaa } { 2hb_aaa:0 : aaa } { 2hb_aaa:1 : bbb } { 2hb_aaa:2 : ccc } { table : one } 
aaabbbccc
cccaaabbb  
basqueseed [INFO Sat Feb 26 18:50:37 CST 2011] { 2hb_ : ccc } { 2hb_aaa:0 : ccc } { 2hb_aaa:1 : aaa } { 2hb_aaa:2 : bbb } { table : two } cccaaabbb
This looks good: we have row keys marked with "2hb_", column data marked with "2hb_aaa:x", and we are alternately adding different table tags.

4. Now we are going to build a jRubySink script to put our data into HBase. We already have an existing sink that puts data into a particular table: attr2HBase("table") does that just fine. The trouble is that you need a separate sink for each table. If you dig around in the Flume source code, you will find a sink called com.cloudera.flume.handlers.hdfs.EscapedCustomDfsSink. This class does the bucketing magic for the collectorSink: it builds a path to a file to write to from data in the event, and it maintains a HashMap of sinks associated with these files. If it encounters a file name it has not seen before, it dynamically generates a new sink for the file and sends the event to it. We are going to do the same thing in jRuby, only with HBase tables instead of files. Here is the script:
# hbase.rb --- jRubySink script
# This script dynamically creates Attr2HBaseEventSink instances
# based on the value of an event's "table" attribute.
require 'java'
java_import 'com.cloudera.flume.core.Event'
java_import 'com.cloudera.flume.core.EventImpl'
java_import 'com.infochimps.flume.jruby.JRubySink'
java_import 'com.cloudera.flume.hbase.Attr2HBaseEventSink'
java_import 'java.io.IOException'

# Define a class that extends the EventSink.Base class
class HBase < JRubySink
  def initialize
    super
    # Make sure that we have an available empty hash for storing
    # connections to tables.
    @tables = {}
  end

  def get_table( table )
    unless @tables[table]
      # Create a new Attr2HBaseEventSink instance
      @tables[table] = Attr2HBaseEventSink.new(table,"",false,"","","2hb_",0,true)
      @tables[table].open
    end
    @tables[table]
  end

  def append(e)
    # Look up the destination table before decoding it - from_java_bytes
    # would blow up on a nil value.
    bytes = e.getAttrs[ "table" ]
    raise IOException.new( "No table attribute!" ) unless bytes
    table = String.from_java_bytes bytes
    sink = get_table( table )
    raise IOException.new( "No sink!" ) unless sink
    sink.append(e)
  end

  def close
    # Close any of the dynamic sinks that we opened
    @tables.each_value do |sink|
      sink.close
    end
    # Forget about the old sinks - create new ones on demand
    @tables={}
    super
  end
end

# Create an instance and return it to the eval function back in java land.
HBase.new
The get_table method is where the magic happens. One kind of clunky thing about this is that I am calling the java constructor for the sink directly. Flume's configuration language offers friendlier ways to construct sinks - I just have not figured out how to interface with it from jRuby. Basically, this script will open connections to HBase on demand and leave them open indefinitely. It is probably a good idea to put some kind of roll() decorator on a stream that uses this, to periodically close the connections and so eliminate the ones that are not active (active ones will reopen automatically as they are needed).
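For example (a sketch based on my reading of the User Guide - I have not tested this exact spec), a roll wrapper should tear the sink down and recreate it every five minutes:
exec config basqueseed console 'roll(300000) { jRubySink("/usr/lib/flume/scripts/hbase.rb") }'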

Once again, let's check to see if it works. First, set up a dataflow from the console through the onetwo.rb decorator and into the hbase.rb sink:
chris@basqueseed:~$ flume shell -c localhost -e "exec config basqueseed console 'jRubyDecorator(\"/usr/lib/flume/scripts/onetwo.rb\") jRubySink(\"/usr/lib/flume/scripts/hbase.rb\")'"
2011-02-26 19:27:46,488 [main] INFO conf.FlumeConfiguration: Loading configurations from /etc/flume/conf
Using default admin port: 35873
Using default report port: 45678
Connecting to Flume master localhost:35873:45678...
2011-02-26 19:27:46,680 [main] INFO util.AdminRPCThrift: Connected to master at localhost:35873
[id: 3] Execing command : config
Command succeeded
Next, check the node:
chris@basqueseed:~$ flume node_nowatch
2011-02-26 19:30:41,015 [main] INFO agent.FlumeNode: Flume 0.9.3-CDH3B4
2011-02-26 19:30:41,017 [main] INFO agent.FlumeNode:  rev 822c62f0c13ab76921e96dd92e19f68007dbcbe2
...
2011-02-26 19:30:50,480 [logicalNode basqueseed-16] INFO agent.LogicalNode: basqueseed: Connector stopped: LazyOpenSource | LazyOpenDecorator
2011-02-26 19:30:50,688 [main] INFO agent.FlumeNode: Hadoop Security enabled: false
aaabbbccc
2011-02-26 19:30:59,033 [SpawningLogicalNode basqueseed] WARN debug.LazyOpenDecorator: Closing a lazy sink that was not logically opened
2011-02-26 19:30:59,033 [SpawningLogicalNode basqueseed] INFO agent.LogicalNode: Node config successfully set to com.cloudera.flume.conf.FlumeConfigData@63e563
2011-02-26 19:30:59,034 [logicalNode basqueseed-21] INFO agent.LogicalNode: Connector started: LazyOpenSource | LazyOpenDecorator
2011-02-26 19:30:59,034 [logicalNode basqueseed-21] INFO console.JLineStdinSource: Opening stdin source
aaabbbccc
cccbbbaaa
abcabcabc
defdefdef
Well, something might be happening, but if it is, it is not happening here. Let's go look at HBase:
chris@basqueseed:~$ hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 0.90.1-CDH3B4, r, Mon Feb 21 12:20:55 PST 2011
>> scan 'one'
ROW                   COLUMN+CELL                                            

 aaa                  column=aaa:0, timestamp=1298770259108, value=aaa
 aaa                  column=aaa:1, timestamp=1298770259108, value=bbb
 aaa                  column=aaa:2, timestamp=1298770259108, value=ccc
 abc                  column=aaa:0, timestamp=1298770284298, value=abc
 abc                  column=aaa:1, timestamp=1298770284298, value=abc
 abc                  column=aaa:2, timestamp=1298770284298, value=abc
2 row(s) in 0.9200 seconds

>> scan 'two'
ROW                   COLUMN+CELL

 ccc                  column=aaa:0, timestamp=1298770266971, value=ccc
 ccc                  column=aaa:1, timestamp=1298770266971, value=bbb
 ccc                  column=aaa:2, timestamp=1298770266971, value=aaa
 def                  column=aaa:0, timestamp=1298770287205, value=def
 def                  column=aaa:1, timestamp=1298770287205, value=def
 def                  column=aaa:2, timestamp=1298770287205, value=def
2 row(s) in 0.1410 seconds
I don't know about you, but I think that this is pretty cool!

Let's try again with some more data. This time we will use one of Flume's debugging sources, asciisynth, which we will set up to generate 10000 events, each with a random 12-character body. From this we should expect a little less than 5000 rows to be written to each of our two HBase tables (with only three-character keys, there are bound to be a few collisions).
chris@basqueseed:~$ flume shell -c localhost -e "exec config basqueseed 'asciisynth(10000,12)' 'jRubyDecorator(\"/usr/lib/flume/scripts/onetwo.rb\") jRubySink(\"/usr/lib/flume/scripts/hbase.rb\")'"
2011-02-26 19:42:32,992 [main] INFO conf.FlumeConfiguration: Loading configurations from /etc/flume/conf
Using default admin port: 35873
Using default report port: 45678
Connecting to Flume master localhost:35873:45678...
2011-02-26 19:42:33,143 [main] INFO util.AdminRPCThrift: Connected to master at localhost:35873
[id: 4] Execing command : config
Command succeeded
I left my node running from before. The master talked to it and reconfigured it, and here is the output:
2011-02-26 19:42:36,028 [SpawningLogicalNode basqueseed] INFO console.JLineStdinSource: Closing stdin source
2011-02-26 19:42:36,070 [logicalNode basqueseed-21] INFO console.JLineStdinSource: Closing stdin source
2011-02-26 19:42:36,070 [logicalNode basqueseed-21] WARN console.JLineStdinSource: Double close on Stdin Char Source
2011-02-26 19:42:36,070 [logicalNode basqueseed-21] WARN debug.LazyOpenDecorator: Closing a lazy sink that was not logically opened
2011-02-26 19:42:36,071 [logicalNode basqueseed-21] INFO agent.LogicalNode: basqueseed: Connector stopped: LazyOpenSource | LazyOpenDecorator
2011-02-26 19:42:36,071 [SpawningLogicalNode basqueseed] INFO agent.LogicalNode: Node config successfully set to com.cloudera.flume.conf.FlumeConfigData@179567c
2011-02-26 19:42:36,072 [logicalNode basqueseed-37] INFO agent.LogicalNode: Connector started: LazyOpenSource | LazyOpenDecorator
2011-02-26 19:42:36,072 [logicalNode basqueseed-37] INFO debug.SynthSource: Resetting count and seed; openingSynthSource(10000, 12 )
2011-02-26 19:42:59,854 [logicalNode basqueseed-37] INFO debug.SynthSource: closing SynthSource(10000, 12 )
2011-02-26 19:42:59,856 [logicalNode basqueseed-37] INFO agent.LogicalNode: basqueseed: Connector stopped: LazyOpenSource | LazyOpenDecorator
So, it took about 30 seconds to generate the 10000 events (you can tell from the timestamps on when it opened and closed the SynthSource). I am not sure how that translates into how long it took to write everything to the database - I think a study of the performance of jRuby scripts in flume is warranted, but it is out of the scope of this particular blog entry.

Let's look at the database. There are too many entries to list, but we can get some summary information:
>> count 'one'
Current count: 1000, row: %K|

Current count: 2000, row: A p
Current count: 3000, row: \|\
Current count: 4000, row: wW
4278 row(s) in 1.3820 seconds

>> count 'two'
Current count: 1000, row: $#r

Current count: 2000, row: @ H
Current count: 3000, row: \X
Current count: 4000, row: w^H

4271 row(s) in 1.0370 seconds

>> scan 'one', {LIMIT=>5}
ROW                   COLUMN+CELL

                      column=aaa:0, timestamp=1298770979440, value=   
                      column=aaa:1, timestamp=1298770979440, value=d R
                      column=aaa:2, timestamp=1298770979440, value= ib
                      column=aaa:3, timestamp=1298770979440, value=  z
   !                  column=aaa:0, timestamp=1298770973275, value=  !
   !                  column=aaa:1, timestamp=1298770973275, value=sY7
   !                  column=aaa:2, timestamp=1298770973275, value=  J
   !                  column=aaa:3, timestamp=1298770973275, value= mN
   "                  column=aaa:0, timestamp=1298770976072, value=  "
   "                  column=aaa:1, timestamp=1298770976072, value=w
   "                  column=aaa:2, timestamp=1298770976072, value=Y,X
   "                  column=aaa:3, timestamp=1298770976072, value= }O
   #                  column=aaa:0, timestamp=1298770970896, value=  #
   #                  column=aaa:1, timestamp=1298770970896, value=?"

   #                  column=aaa:2, timestamp=1298770970896, value=[!_
   #                  column=aaa:3, timestamp=1298770970896, value=Q I
   $                  column=aaa:0, timestamp=1298770979688, value=  $
   $                  column=aaa:1, timestamp=1298770979688, value=! i
   $                  column=aaa:2, timestamp=1298770979688, value=  (
   $                  column=aaa:3, timestamp=1298770979688, value=:N!
5 row(s) in 0.2160 seconds

>> scan 'two', {LIMIT=>5}
ROW                   COLUMN+CELL

                      column=aaa:0, timestamp=1298770979742, value=  
                      column=aaa:1, timestamp=1298770979742, value=( 1
                      column=aaa:2, timestamp=1298770979742, value= oV
                      column=aaa:3, timestamp=1298770979742, value= }
   !                  column=aaa:0, timestamp=1298770978751, value=  !
   !                  column=aaa:1, timestamp=1298770978751, value=c7T
   !                  column=aaa:2, timestamp=1298770978751, value=7F
   !                  column=aaa:3, timestamp=1298770978751, value=5C2
   "                  column=aaa:0, timestamp=1298770969476, value=  "
   "                  column=aaa:1, timestamp=1298770969476, value=v d
   "                  column=aaa:2, timestamp=1298770969476, value=E@l
   "                  column=aaa:3, timestamp=1298770969476, value=Z M
   #                  column=aaa:0, timestamp=1298770979759, value=  #
   #                  column=aaa:1, timestamp=1298770979759, value={xd
   #                  column=aaa:2, timestamp=1298770979759, value=  z
   #                  column=aaa:3, timestamp=1298770979759, value=?)3
   $                  column=aaa:0, timestamp=1298770972873, value=  $
   $                  column=aaa:1, timestamp=1298770972873, value=? L
   $                  column=aaa:2, timestamp=1298770972873, value= f*
   $                  column=aaa:3, timestamp=1298770972873, value= v[
5 row(s) in 0.1530 seconds
 
It seems to have worked as we expected.

Conclusions?

In review, we built a decorator that did some on-the-fly data processing. Not very exciting, really - you could probably do much of the same thing with existing flume decorators, but it would not be pretty. The stateful toggling that forked one stream of data into two separate tables might be something of a challenge, though. While this particular decorator was not very interesting, I am sure that there are many interesting things you could do in a jRuby decorator.

We also built a somewhat sophisticated jRubySink. It extends the usefulness of an already-written HBase sink by dynamically creating more database connections as dictated by the events on the flume. I feel a little bit like Sun Wukong stealing the peaches of immortality on this one - some of the deep magic that happens in the heart of flume is accessible to us mere jRuby code-monkeys!

I think that there is more potential here, too. I hope that I have given enough examples to spark off some ideas in your head.

Wednesday, February 23, 2011

Simple Flume Decorators with JRuby

Flume is a framework for moving chunks of data around on your network. Its primary mission is to move log data from where it is generated (perhaps a web server) to someplace where it can actually be used - like an HDFS file system where it can be crunched by Hadoop. Flume's design is very flexible - the final destination for your data could also be a database like HBase or Cassandra, a search index system like Elastic Search, another file system like an S3 bucket, or any of a myriad of other configurations. Flume will also go to some effort to make sure that your data is delivered reliably - it includes some tunable reliability features out of the box.

The Flume User Guide does a good job of explaining how its component architecture works. You can configure data flows by chaining together systems of "nodes" - each node is a data moving unit - each has an input ("source") and an output ("sink"). Sources can conceivably be anything that produces data - flume can tail sets of files on disk, listen to network ports, periodically run programs, etc. Sinks are a little more interesting - they can write data to disk, push data into an network connection, or into a database. Even more interesting, sinks can be composites - you can fan data out to many other sinks, or set up conditional sinks where if data fails to be accepted by the first sink, it will instead be routed to a second sink.  Also, you can build "Decorators" that can modify the data as it moves down the path. Flume offers many sources, sinks, and decorators out of the box - but also gives you the ability to write your own through a Java-based plugin API.
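For instance (these specs are sketched from memory - check the User Guide for the exact syntax), a fanout and a failover look something like this in the configuration language:
[ console, collectorSink("hdfs://namenode/flume/","data-") ]
< rpcSink("collector1") ? rpcSink("collector2") >
The first spec copies each event to both sinks; the second tries collector1 and falls back to collector2 if the append fails.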

Flume chops the data up into a series of "events". For instance, if you are using flume to tail a log file, every line written to the file gets turned into a flume event. Each event carries with it the body of the data that produced it, as well as some metadata: the machine that it was collected on, a timestamp showing when the data was collected, the event's priority, etc. You can even add your own key-value pairs to an event in its attributes. Flume sinks can store both the body data and the metadata of an event, or, in some cases, use the metadata to help ensure that the data lands in the right place - as with the "collectorSink" file bucketing mechanism.
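As an example of metadata-driven bucketing (again a sketch - the User Guide's output bucketing section has the full list of escape sequences), a collectorSink path can contain escapes that are filled in from each event's metadata:
collectorSink("hdfs://namenode/logs/%Y-%m-%d/%{host}/","web-")
Here %Y-%m-%d expands from the event's timestamp and %{host} from the machine it was collected on, so events land in per-day, per-host directories.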

To me, and to some of the other primates I work with at Infochimps, decorators are especially interesting. In a decorator, you get to do some processing on your data as it flies from wherever it was produced to its final destination(s). Flume comes with a handful of basic decorators that allow you to do some small-scale processing of flume events. For instance, the "value" decorator lets you set a value in the metadata of an event. The out-of-the-box decorators were not quite sufficient to handle my processing demands, though. I wanted a little more flexibility, so I wrote (in less time than it took me to write this blog entry) a quick interface to jRuby. Now I have access to my flume data in transit with a powerful scripting engine.
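For instance, a spec like this one (untested, attribute name and value made up) would tag every event passing through with a static attribute before it lands on the console:
exec config basqueseed console '{ value("stream","weblogs") => console }'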

Enough with the intro - let's jump in. The following steps will lead you down the road to processing data on the fly through the flume with a jRuby script:

1. Install flume. Cloudera has good documentation on how to get it set up. I run Ubuntu, so I just added the Cloudera apt package repository to my apt sources and used "apt-get" to install the packages flume, flume-node and flume-master.

2. Get jRuby. If you use apt-get, you will be getting a slightly out-of-date version, but it will do for the moment. The jRuby website has more details if you need them.

3. Get my jRubyPlugin. For it to work, both it and jruby.jar have to be on Flume's classpath. You can make custom adjustments to Flume's runtime environment, including classpath changes, in the flume-env.sh script in flume/bin. The easy way is to just drop jruby-flume.jar in the /usr/lib/flume/lib directory (or wherever flume landed in your install process). Getting your jRuby environment completely set up so that you can see jruby gems and such is going to involve making adjustments to your environment, but for now, just having jruby.jar on the classpath will work. I just created a symbolic link to /usr/lib/jruby/lib/jruby.jar in /usr/lib/flume/lib.

(Aside: I don't know the full answer to getting everything jruby set up in an embedded mode. However, if you add the following to your flume-env.sh script, you will be at least part of the way there:
export UOPTS="-Djruby.home=/usr/lib/jruby -Djruby.lib=/usr/lib/jruby/lib -Djruby.script=jruby"
)

4. You have to tell flume explicitly what classes to load as plugins when the nodes start up. To do this, create or edit "flume-site.xml" in the flume/conf directory. It should contain at least the following:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
 <!--- ====================================== -->
 <!--- Flume Plugins ======================== -->
 <!--- ====================================== -->
 <property>
  <name>flume.plugin.classes</name>
  <value>com.infochimps.flume.jruby.JRubyDecorator</value>
  <description>List of plugin classes to load.</description>
 </property>
</configuration>
After you get this in place, restart your flume-master and flume-node. If everything went ok, the services will start up, and you can go to http://localhost:35871/masterext.jsp to see if the plugin loaded successfully. If you see "jRubyDecorator" listed under the decorators, you are in business.

5. Ok, now let's build a decorator that does something simple. Create a directory somewhere to keep ruby scripts for flume - I like /usr/lib/flume/scripts. The files in this directory need to be readable by the user that flume is running as. Also, if you are in a distributed world, scripts are going to have to be available both on the master and on the machine that will house the logical node that will run the script.

Here is a simple script. Put it in /usr/lib/flume/scripts/reverse.rb:
# reverse.rb  --- jRubyDecorator script
require 'java'
java_import 'com.cloudera.flume.core.EventSinkDecorator'
java_import 'com.cloudera.flume.core.Event'
java_import 'com.cloudera.flume.core.EventImpl'
 
class ReverseDecorator < EventSinkDecorator
  def append(e)
    body = String.from_java_bytes e.getBody
    super EventImpl.new( body.reverse.to_java_bytes, e.getTimestamp, e.getPriority, e.getNanos, e.getHost, e.getAttrs )
  end
end
ReverseDecorator.new(nil)
What does it do? Well, it defines a subclass of com.cloudera.flume.core.EventSinkDecorator which redefines the append method. Our special append method builds a new event from the appended event, except that the text of the "body" field is reversed. Not too much nonsense, but we do have to be a little careful with strings. Flume likes its data to be represented as arrays of bytes, but ruby would prefer to deal with strings as Strings, so I convert both ways: String.from_java_bytes() to get a string object, and the to_java_bytes() method on string-like objects to convert back. Hidden in there is the ruby string method "reverse".
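You can check the round trip in a plain jruby one-liner (nothing flume-specific going on here):
chris@basqueseed:~$ jruby -rjava -e 'puts String.from_java_bytes("Was it a mat I saw?".to_java_bytes).reverse'
?was I tam a ti saW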

The last line of the append method shows off some of the power of jRuby. It creates a new instance of EventImpl and passes it off to EventSinkDecorator's implementation of append - basically letting the parent class handle all of the difficult work.

Finally, the last line of the script instantiates a new object of the (jRuby!) ReverseDecorator class and returns it to jRubyDecorator. jRubyDecorator is really a factory class for producing decorator instances. It passes our object off to flume as a java object, and flume never suspects what has happened.
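In spirit, the factory does something like the following (a hypothetical sketch of the idea, not the actual plugin source):
# Evaluate the script file; its last expression is the object that
# gets handed to flume as the decorator.
script = File.read( "/usr/lib/flume/scripts/reverse.rb" )
decorator = eval( script )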

Does it work? Let's see:
chris@basqueseed:~$ flume shell -c localhost
2011-02-23 17:35:03,785 [main] INFO conf.FlumeConfiguration: Loading configurations from /etc/flume/conf
Using default admin port: 35873
Using default report port: 45678
Connecting to Flume master localhost:35873:45678...
2011-02-23 17:35:03,993 [main] INFO util.AdminRPCThrift: Connected to master at localhost:35873
==================================================
FlumeShell v0.9.3-CDH3B4
Copyright (c) Cloudera 2010, All Rights Reserved
==================================================
Type a command to execute (hint: many commands
only work when you are connected to a master node)

You may connect to a master node by typing:
    connect host[:adminport=35873[:reportport=45678]]

[flume localhost:35873:45678] exec config basqueseed console '{jRubyDecorator("/usr/lib/flume/scripts/reverse.rb")=>console}'
[id: 0] Execing command : config
Command succeeded
[flume localhost:35873:45678] quit
So far, so good - the master node has decided that everything is kosher. By the way, be careful with the single and double quotes in flume shell commands. The flume shell is very picky about its input. If you have any structure to your sources or sinks, you must single-quote the declaration. Let's now play with a node:
chris@basqueseed:~$ sudo /etc/init.d/flume-node stop
[sudo] password for chris:
Stopping Flume node daemon (flume-node): stopping node

chris@basqueseed:~$ sudo -u flume flume node_nowatch
2011-02-23 17:38:21,709 [main] INFO agent.FlumeNode: Flume 0.9.3-CDH3B4
2011-02-23 17:38:21,710 [main] INFO agent.FlumeNode:  rev 822c62f0c13ab76921e96dd92e19f68007dbcbe2
2011-02-23 17:38:21,710 [main] INFO agent.FlumeNode: Compiled  on Mon Feb 21 13:01:39 PST 2011
...{stuff deleted}...
2011-02-23 17:39:40,471 [logicalNode basqueseed-20] INFO console.JLineStdinSource: Opening stdin source
?was I tam a ti saW 

2011-02-23 17:39:45,720 [logicalNode basqueseed-20] INFO debug.ConsoleEventSink: ConsoleEventSink( debug ) opened
basqueseed [INFO Wed Feb 23 17:39:45 CST 2011] Was it a mat I saw?
Holy chute! It works!

I think that is enough for today. Next time, I'll try some more complicated scripts, deal with attributes and play some games with data flows.