Saturday, February 26, 2011

More Tricks with jRuby and Flume (and HBase)

Last time, I set up Flume with a jRuby decorator and managed to process exactly one event with my jruby-flume script. Now we are going to dig in a little deeper and do something useful.

Let's say I have a stream of data that I need to capture and place into a database. I would like the database to be updated in near-real time - by that I mean a couple of minutes of delay between seeing the data in the stream and having it show up in the database is acceptable. The data does not come to me in the format I need for the database, but for the most part the transformations are simple. Also, the stream contains information that ultimately needs to live in several tables.

For my purposes at the moment, HBase is the database. There is a sink plugin for Flume called attr2HBase that writes data into HBase. There may be a better way to get it, but I checked out the flume-hbase branch from github and built the plugin from the plugins directory. It seems to work fine with the latest Flume release (though the test cases seem to be broken).

The hbase sink has many configuration options, but I am just going to use the most basic of them. If you set up a sink as attr2HBase("some_table"), it will interpret events sent to it as data destined for the "some_table" table in HBase. In this configuration, it acts only on specially marked attributes in the event: the value of an attribute named "2hb_" is interpreted as the row key, and any value for an attribute of the form "2hb_colfam:desig" is placed in the row under the "colfam:desig" heading. Other values are ignored, and an error is generated if there is no row key. Most of this is fine for my purposes, but there are a couple of challenges. First, I have to somehow transform my data into "2hb_" form - I'll build a decorator to do this work below. Second, the attr2HBase plugin serves exactly one table per sink. What I really want is a sink that writes to a table chosen by criteria within the event being processed - I have written just that in jRuby.
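To make the attribute convention concrete, here is what the sink looks for, sketched as a plain Ruby hash (the names and values here are invented for illustration):

# Attributes on an event arriving at attr2HBase("some_table"):
attrs = {
  "2hb_"        => "row1",   # interpreted as the row key (required)
  "2hb_aaa:x"   => "hello",  # stored in column "aaa:x" of row "row1"
  "other_stuff" => "junk"    # no 2hb_ prefix, so the sink ignores it
}
# The net effect would be roughly the hbase shell command:
#   put 'some_table', 'row1', 'aaa:x', 'hello'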

Ready? Keep your hands inside the vehicle at all times, and be warned that this is an active ride - you may get a little wet.

1. Install HBase. Again, I have relied on Cloudera's deb packages: "sudo apt-get install hadoop-hbase-regionserver hadoop-hbase-master". You can also follow Apache's quick start guide. The details of getting HBase set up are beyond the scope of this blog, but the HBase documentation is excellent. Once you have it installed, make sure everything works by using "hbase shell" to create a couple of tables we will need later:
chris@basqueseed:~$ hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 0.90.1-CDH3B4, r, Mon Feb 21 12:20:55 PST 2011
>> create 'one','aaa'; create 'two', 'aaa'
0 row(s) in 1.1510 seconds
0 row(s) in 1.1110 seconds
2. Get the hbase plugin. I got it from the plugins section of the hbase branch of flume on github. Build it and put hbase-sink.jar in /usr/lib/flume/lib. You also need to ensure that HBase's main jar file is accessible on Flume's classpath; I just sym-linked it into place ("sudo ln -s /usr/lib/hbase/hbase-0.90.1-CDH3B4.jar /usr/lib/flume/lib/"). Finally, you need to modify Flume's configuration to load the attr2hbaseSink plugin. While you are there, you may as well add the other two jRuby classes from the jruby-flume library. When you are done, your flume-site.xml should look something like this:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <!-- ================================================= -->
  <!-- Flume Plugins =================================== -->
  <!-- ================================================= -->
  <property>
    <name>flume.plugin.classes</name>
    <value>com.infochimps.flume.jruby.JRubyDecorator, com.infochimps.flume.jruby.JRubySink, com.infochimps.flume.jruby.JRubySource, com.cloudera.flume.hbase.Attr2HBaseEventSink</value>
    <description>List of plugin classes to load.</description>
  </property>
</configuration>
3. We need to build a jRubyDecorator script that can take events with random ascii data and turn them into events that the Attr2HBaseEventSink can use. This is not a lot different from the "reverse.rb" script that we set up last time.
# onetwo.rb --- jRubyDecorator script

require 'java'
java_import 'com.cloudera.flume.core.EventSinkDecorator'
java_import 'com.cloudera.flume.core.Event'
java_import 'com.cloudera.flume.core.EventImpl'
class OneTwoDecorator < EventSinkDecorator
  # Alternately returns "one" and "two" on successive calls.
  def table
    (@one = !@one) ? "one" : "two"
  end

  def append(e)
    # Make a writable copy of the event's attribute map.
    attrs = java::util::HashMap.new e.getAttrs
    # Tag alternating events with {table:one} and {table:two}.
    attrs["table"] = table.to_java_bytes
    # Split the body into three-character groups and emit each group
    # as a 2hb_-style column attribute.
    values = String.from_java_bytes( e.getBody ).scan(/.../)
    values.each_index { |i| attrs["2hb_aaa:#{i}"] = values[i].to_java_bytes }
    # Use the first group, if any, as the row key.
    attrs["2hb_"] = values[0].to_java_bytes if values[0]
    super EventImpl.new( e.getBody, e.getTimestamp, e.getPriority, e.getNanos,
                         e.getHost, attrs )
  end
end
OneTwoDecorator.new(nil)
Last time, our decorator just changed the body field. This time, we are altering the attributes. The first thing the append method does is create a new, writable copy of the attribute hash attached to the event. We then add a couple of attributes to the hash. Something to be careful with here is that flume will later expect this to be a HashMap with strings as keys and byte arrays as values, so we call to_java_bytes on all of the values we insert. We could probably write a wrapper class around our attributes to ensure that this restriction is maintained.
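As an aside, a minimal sketch of such a wrapper might look like the following (this class is purely illustrative - it is my own invention and is not used in any of the scripts in this post):

# A hash-like wrapper that coerces values to Java byte arrays on
# insertion, so flume always sees a HashMap<String, byte[]>.
class ByteAttrs
  def initialize(initial = nil)
    @map = initial ? java::util::HashMap.new(initial) : java::util::HashMap.new
  end

  # Store a value, converting it to a byte array as it goes in.
  def []=(key, value)
    @map.put(key.to_s, value.to_s.to_java_bytes)
  end

  # Hand the underlying HashMap back to code that needs the Java object.
  def to_java_map
    @map
  end
end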

We add three types of things to the attributes. The "table" method alternately returns the strings 'one' and 'two', so we will be tagging alternate events that pass through the decorator with {table:one} and {table:two}. Next, we split the body into three-character groups and create tags like {'2hb_aaa:1':xxx}. If these make it to the attr2HBase event sink, they will be interpreted as column data to add to an HBase table. Finally, if there was at least one group of three letters, we add a tag like {2hb_:xxx}. This will be interpreted by attr2HBase as a row key.

Let's test this part to make sure everything is working right.
chris@basqueseed:~$ flume shell -c localhost
2011-02-26 18:44:52,268 [main] INFO conf.FlumeConfiguration: Loading configurations from /etc/flume/conf
Using default admin port: 35873
Using default report port: 45678
Connecting to Flume master localhost:35873:45678...
2011-02-26 18:44:52,466 [main] INFO util.AdminRPCThrift: Connected to master at localhost:35873
==================================================
FlumeShell v0.9.3-CDH3B4
Copyright (c) Cloudera 2010, All Rights Reserved
==================================================
Type a command to execute (hint: many commands
only work when you are connected to a master node)


You may connect to a master node by typing:
    connect host[:adminport=35873[:reportport=45678]]


[flume localhost:35873:45678] exec config basqueseed console 'jRubyDecorator("/usr/lib/flume/scripts/onetwo.rb") console'
[id: 2] Execing command : config
Command succeeded
[flume localhost:35873:45678] quit
Disconnected!
Now check to see if it does what we want:
chris@basqueseed:~$ flume node_nowatch
2011-02-26 18:48:43,835 [main] INFO agent.FlumeNode: Flume 0.9.3-CDH3B4
2011-02-26 18:48:43,836 [main] INFO agent.FlumeNode:  rev 822c62f0c13ab76921e96dd92e19f68007dbcbe2
2011-02-26 18:48:43,836 [main] INFO agent.FlumeNode: Compiled  on Mon Feb 21 13:01:39 PST 2011
...
2011-02-26 18:49:06,818 [SpawningLogicalNode basqueseed] INFO agent.LogicalNode: Node config successfully set to com.cloudera.flume.conf.FlumeConfigData@156d7c8
2011-02-26 18:49:06,825 [logicalNode basqueseed-20] INFO agent.LogicalNode: Connector started: LazyOpenSource | LazyOpenDecorator
2011-02-26 18:49:06,825 [logicalNode basqueseed-20] INFO console.JLineStdinSource: Opening stdin source
aaabbbccc 
2011-02-26 18:50:31,719 [logicalNode basqueseed-20] INFO debug.ConsoleEventSink: ConsoleEventSink( debug ) opened
basqueseed [INFO Sat Feb 26 18:50:31 CST 2011] { 2hb_ : aaa } { 2hb_aaa:0 : aaa } { 2hb_aaa:1 : bbb } { 2hb_aaa:2 : ccc } { table : one } 
aaabbbccc
cccaaabbb  
basqueseed [INFO Sat Feb 26 18:50:37 CST 2011] { 2hb_ : ccc } { 2hb_aaa:0 : ccc } { 2hb_aaa:1 : aaa } { 2hb_aaa:2 : bbb } { table : two } cccaaabbb
This looks good: we have row keys marked with "2hb_", column data marked with "2hb_aaa:x", and we are alternately adding different table tags.

4. Now we are going to build a jRubySink script to put our data into HBase. We already have an existing sink that will put data into a particular table: attr2HBase("table") does that just fine. The trouble is that you need a separate sink for each table. If you dig around in the Flume source code, you will find a sink called com.cloudera.flume.handlers.hdfs.EscapedCustomDfsSink. This class does the bucketing magic for the collectorSink. It builds a path to a file to write to from data in the event, and it maintains a HashMap of sinks associated with those files. If it encounters a file name it has not seen before, it dynamically generates a new sink for the file and sends the event to it. We are going to do the same thing in jRuby, only with HBase tables instead of files. Here is the script:
# hbase.rb --- jRubySink script
# This script dynamically creates Attr2HBaseEventSink instances
# based on the value of an event's "table" attribute.
require 'java'
java_import 'com.cloudera.flume.core.Event'
java_import 'com.cloudera.flume.core.EventImpl'
java_import 'com.infochimps.flume.jruby.JRubySink'
java_import 'com.cloudera.flume.hbase.Attr2HBaseEventSink'
java_import 'java.io.IOException'

# Define a class that extends the EventSink.Base class
class HBase < JRubySink
  def initialize
    super
    # Make sure that we have an available empty hash for storing
    # connections to tables.
    @tables = {}
  end

  def get_table( table )
    unless @tables[table]
      # Create a new Attr2HBaseEventSink instance
      @tables[table] = Attr2HBaseEventSink.new(table,"",false,"","","2hb_",0,true)
      @tables[table].open
    end
    @tables[table]
  end

  def append(e)
    # Look up the destination table before decoding it, and raise a
    # Java IOException if the event cannot be routed.
    raw = e.getAttrs[ "table" ]
    raise IOException.new( "No table attribute on event!" ) unless raw
    table = String.from_java_bytes raw
    sink = get_table(table)
    raise IOException.new( "No sink for table #{table}!" ) unless sink
    sink.append(e)
  end

  def close
    # Close any of the dynamic sinks that we opened
    @tables.each_value do |sink|
      sink.close
    end
    # Forget about the old sinks - create new ones on demand
    @tables={}
    super
  end
end

#Create an instance and return it to the eval function back in java land.
HBase.new
The get_table method is where the magic happens. One kind of clunky thing about this is that I am calling the Java constructor for the sink. Flume's configuration language offers friendlier ways to construct collectors - I just have not figured out how to interface with it from jRuby. Basically, this will open connections to HBase on demand and leave them open indefinitely. It is probably a good idea to put some kind of roll() decorator on a stream that uses this to periodically close the sink, which will eliminate connections that are not active (active ones will reopen automatically as they are needed...).
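For example, a rolling version of the flow we are about to configure might look something like this (untested - I have not verified that roll() composes with jRubySink this way):

exec config basqueseed console 'jRubyDecorator("/usr/lib/flume/scripts/onetwo.rb") roll(60000) { jRubySink("/usr/lib/flume/scripts/hbase.rb") }'

That would close and reopen the sink every sixty seconds, dropping idle table connections along the way.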

Once again, let's check to see if it works. First, set up a dataflow from the console through the onetwo.rb decorator and into the hbase.rb sink:
chris@basqueseed:~$ flume shell -c localhost -e "exec config basqueseed console 'jRubyDecorator(\"/usr/lib/flume/scripts/onetwo.rb\") jRubySink(\"/usr/lib/flume/scripts/hbase.rb\")'"
2011-02-26 19:27:46,488 [main] INFO conf.FlumeConfiguration: Loading configurations from /etc/flume/conf
Using default admin port: 35873
Using default report port: 45678
Connecting to Flume master localhost:35873:45678...
2011-02-26 19:27:46,680 [main] INFO util.AdminRPCThrift: Connected to master at localhost:35873
[id: 3] Execing command : config
Command succeeded
Next, check the node:
chris@basqueseed:~$ flume node_nowatch
2011-02-26 19:30:41,015 [main] INFO agent.FlumeNode: Flume 0.9.3-CDH3B4
2011-02-26 19:30:41,017 [main] INFO agent.FlumeNode:  rev 822c62f0c13ab76921e96dd92e19f68007dbcbe2
...
2011-02-26 19:30:50,480 [logicalNode basqueseed-16] INFO agent.LogicalNode: basqueseed: Connector stopped: LazyOpenSource | LazyOpenDecorator
2011-02-26 19:30:50,688 [main] INFO agent.FlumeNode: Hadoop Security enabled: false
aaabbbccc
2011-02-26 19:30:59,033 [SpawningLogicalNode basqueseed] WARN debug.LazyOpenDecorator: Closing a lazy sink that was not logically opened
2011-02-26 19:30:59,033 [SpawningLogicalNode basqueseed] INFO agent.LogicalNode: Node config successfully set to com.cloudera.flume.conf.FlumeConfigData@63e563
2011-02-26 19:30:59,034 [logicalNode basqueseed-21] INFO agent.LogicalNode: Connector started: LazyOpenSource | LazyOpenDecorator
2011-02-26 19:30:59,034 [logicalNode basqueseed-21] INFO console.JLineStdinSource: Opening stdin source
aaabbbccc
cccbbbaaa
abcabcabc
defdefdef
Well, something might be happening, but if it is, it is not happening here. Let's go look at HBase:
chris@basqueseed:~$ hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 0.90.1-CDH3B4, r, Mon Feb 21 12:20:55 PST 2011
>> scan 'one'
ROW                   COLUMN+CELL                                            

 aaa                  column=aaa:0, timestamp=1298770259108, value=aaa
 aaa                  column=aaa:1, timestamp=1298770259108, value=bbb
 aaa                  column=aaa:2, timestamp=1298770259108, value=ccc
 abc                  column=aaa:0, timestamp=1298770284298, value=abc
 abc                  column=aaa:1, timestamp=1298770284298, value=abc
 abc                  column=aaa:2, timestamp=1298770284298, value=abc
2 row(s) in 0.9200 seconds

>> scan 'two'
ROW                   COLUMN+CELL

 ccc                  column=aaa:0, timestamp=1298770266971, value=ccc
 ccc                  column=aaa:1, timestamp=1298770266971, value=bbb
 ccc                  column=aaa:2, timestamp=1298770266971, value=aaa
 def                  column=aaa:0, timestamp=1298770287205, value=def
 def                  column=aaa:1, timestamp=1298770287205, value=def
 def                  column=aaa:2, timestamp=1298770287205, value=def
2 row(s) in 0.1410 seconds
I don't know about you, but I think that this is pretty cool!

Let's try again with some more data. This time we will use one of Flume's debugging sources, asciisynth, which we will set up to generate 10000 events, each with a random 12-character body. From this we should expect a little less than 5000 rows to be written to each of our two HBase tables (with only three-character keys, there are bound to be a few collisions).
chris@basqueseed:~$ flume shell -c localhost -e "exec config basqueseed 'asciisynth(10000,12)' 'jRubyDecorator(\"/usr/lib/flume/scripts/onetwo.rb\") jRubySink(\"/usr/lib/flume/scripts/hbase.rb\")'"
2011-02-26 19:42:32,992 [main] INFO conf.FlumeConfiguration: Loading configurations from /etc/flume/conf
Using default admin port: 35873
Using default report port: 45678
Connecting to Flume master localhost:35873:45678...
2011-02-26 19:42:33,143 [main] INFO util.AdminRPCThrift: Connected to master at localhost:35873
[id: 4] Execing command : config
Command succeeded
I left my node running from before. The master talked to it and reconfigured it, and here is the output:
2011-02-26 19:42:36,028 [SpawningLogicalNode basqueseed] INFO console.JLineStdinSource: Closing stdin source
2011-02-26 19:42:36,070 [logicalNode basqueseed-21] INFO console.JLineStdinSource: Closing stdin source
2011-02-26 19:42:36,070 [logicalNode basqueseed-21] WARN console.JLineStdinSource: Double close on Stdin Char Source
2011-02-26 19:42:36,070 [logicalNode basqueseed-21] WARN debug.LazyOpenDecorator: Closing a lazy sink that was not logically opened
2011-02-26 19:42:36,071 [logicalNode basqueseed-21] INFO agent.LogicalNode: basqueseed: Connector stopped: LazyOpenSource | LazyOpenDecorator
2011-02-26 19:42:36,071 [SpawningLogicalNode basqueseed] INFO agent.LogicalNode: Node config successfully set to com.cloudera.flume.conf.FlumeConfigData@179567c
2011-02-26 19:42:36,072 [logicalNode basqueseed-37] INFO agent.LogicalNode: Connector started: LazyOpenSource | LazyOpenDecorator
2011-02-26 19:42:36,072 [logicalNode basqueseed-37] INFO debug.SynthSource: Resetting count and seed; openingSynthSource(10000, 12 )
2011-02-26 19:42:59,854 [logicalNode basqueseed-37] INFO debug.SynthSource: closing SynthSource(10000, 12 )
2011-02-26 19:42:59,856 [logicalNode basqueseed-37] INFO agent.LogicalNode: basqueseed: Connector stopped: LazyOpenSource | LazyOpenDecorator
So, it took about 30 seconds to generate the 10000 events (you can tell from the timestamps on when it opened and closed the SynthSource). I am not sure how that translates to how long it took to write everything to the database - I think a study of the performance of jRuby scripts in flume is warranted, but out of the scope of this particular blog entry.

Let's look at the database. There are too many entries to list, but we can get some summary information:
>> count 'one'
Current count: 1000, row: %K|

Current count: 2000, row: A p
Current count: 3000, row: \|\
Current count: 4000, row: wW
4278 row(s) in 1.3820 seconds

>> count 'two'
Current count: 1000, row: $#r

Current count: 2000, row: @ H
Current count: 3000, row: \X
Current count: 4000, row: w^H

4271 row(s) in 1.0370 seconds

>> scan 'one', {LIMIT=>5}
ROW                   COLUMN+CELL

                      column=aaa:0, timestamp=1298770979440, value=   
                      column=aaa:1, timestamp=1298770979440, value=d R
                      column=aaa:2, timestamp=1298770979440, value= ib
                      column=aaa:3, timestamp=1298770979440, value=  z
   !                  column=aaa:0, timestamp=1298770973275, value=  !
   !                  column=aaa:1, timestamp=1298770973275, value=sY7
   !                  column=aaa:2, timestamp=1298770973275, value=  J
   !                  column=aaa:3, timestamp=1298770973275, value= mN
   "                  column=aaa:0, timestamp=1298770976072, value=  "
   "                  column=aaa:1, timestamp=1298770976072, value=w
   "                  column=aaa:2, timestamp=1298770976072, value=Y,X
   "                  column=aaa:3, timestamp=1298770976072, value= }O
   #                  column=aaa:0, timestamp=1298770970896, value=  #
   #                  column=aaa:1, timestamp=1298770970896, value=?"

   #                  column=aaa:2, timestamp=1298770970896, value=[!_
   #                  column=aaa:3, timestamp=1298770970896, value=Q I
   $                  column=aaa:0, timestamp=1298770979688, value=  $
   $                  column=aaa:1, timestamp=1298770979688, value=! i
   $                  column=aaa:2, timestamp=1298770979688, value=  (
   $                  column=aaa:3, timestamp=1298770979688, value=:N!
5 row(s) in 0.2160 seconds

>> scan 'two', {LIMIT=>5}
ROW                   COLUMN+CELL

                      column=aaa:0, timestamp=1298770979742, value=  
                      column=aaa:1, timestamp=1298770979742, value=( 1
                      column=aaa:2, timestamp=1298770979742, value= oV
                      column=aaa:3, timestamp=1298770979742, value= }
   !                  column=aaa:0, timestamp=1298770978751, value=  !
   !                  column=aaa:1, timestamp=1298770978751, value=c7T
   !                  column=aaa:2, timestamp=1298770978751, value=7F
   !                  column=aaa:3, timestamp=1298770978751, value=5C2
   "                  column=aaa:0, timestamp=1298770969476, value=  "
   "                  column=aaa:1, timestamp=1298770969476, value=v d
   "                  column=aaa:2, timestamp=1298770969476, value=E@l
   "                  column=aaa:3, timestamp=1298770969476, value=Z M
   #                  column=aaa:0, timestamp=1298770979759, value=  #
   #                  column=aaa:1, timestamp=1298770979759, value={xd
   #                  column=aaa:2, timestamp=1298770979759, value=  z
   #                  column=aaa:3, timestamp=1298770979759, value=?)3
   $                  column=aaa:0, timestamp=1298770972873, value=  $
   $                  column=aaa:1, timestamp=1298770972873, value=? L
   $                  column=aaa:2, timestamp=1298770972873, value= f*
   $                  column=aaa:3, timestamp=1298770972873, value= v[
5 row(s) in 0.1530 seconds
 
It seems to have worked just as we expected.

Conclusions?

In review, we built a decorator that did some on-the-fly data processing. Not very exciting, really - you could probably do much of the same thing with existing flume decorators, but it would not be pretty. The alternating behavior that forked the one stream of data into two separate tables might be something of a challenge, though. While this particular decorator was not very interesting, I am sure there are many interesting things you could do in a jRuby decorator.

We also built a somewhat sophisticated jRubySink. It extends the usefulness of an already-written HBase sink by dynamically creating more database connections as dictated by the events on the flume. I feel a little bit like Sun Wukong stealing the peaches of immortality on this one - some of the deep magic that happens in the heart of flume is accessible to us mere jRuby code-monkeys!

I think that there is more potential here, too. I hope that I have given enough examples to spark off some ideas in your head.

3 comments:

  1. Clunkiness solved: com.cloudera.flume.core.CompositeSink constructs sinks from a dataflow specification and a context object. I need to make a minor modification to the jRubySink to pass the configuration object through, but you can probably pass in a 'null' as the configuration for many sinks.

  2. From where can I download class/jar files for com.infochimps.* ?

  3. Would you be interested in having this republished for a community of advanced developers interested in Big Data over at DZone.com? Let me know what you think: egenesky@dzone.com
