Sunday, December 16, 2012

Introducing cfndsl

In my last post I ranted a little about what I like and don't like about [AWS CloudFormation](http://aws.amazon.com/cloudformation/). This time, I am going to do something about it.

AWS CloudFormation Templates

If you are using AWS for anything substantial and you are not using CloudFormation, you should think about it. It gives you a place to launch a whole bunch of AWS resources in a well defined and repeatable fashion. In my mind, there is really only one drawback: the template language is awful.

What do I mean?

Here is a template I have been playing with this afternoon:

{
   "AWSTemplateFormatVersion" : "2010-09-09"
   "Parameters" : {
      "BucketName" : {
         "Type" : "String",
         "Default" : "MyBucket",
         "Description" : "Name of the bucket to grant read access to."
      },
      "Folder" : {
         "Type" : "String",
         "Default" : "myFolder",
         "MinLength" : 2,
         "Description" : "Name of a folder in the bucket to grant read access to."
      }
   },
   "Resources" : {
      "ReadBucketIProfile" : {
         "Type" : "AWS::IAM::InstanceProfile",
         "Properties" : {
            "Roles" : [{"Ref" : "ReadBucketRole"}],
            "Path" : "/"
         }
      },
      "ReadBucketRole" : {
         "Type" : "AWS::IAM::Role",
         "Properties" : {
            "AssumeRolePolicyDocument" : {
               "Statement" : [
                  {
                     "Effect" : "Allow",
                     "Action" : ["sts:AssumeRole"                     ],
                     "Principal" : { "Service" : ["ec2.amazonaws.com"]
                     }
                  }
               ]
            },
            "Policies" : [
               {
                  "PolicyDocument" : {
                     "Statement" : [
                        {
                           "Effect" : "Allow",
                           "Resource" : {
                              "Fn::Join" : [
                                 "",
                                 [
                                    "arn:aws:s3:::",
                                    {
                                       "Ref" : "BucketName"
                                    },
                                    "/",
                                    {
                                       "Ref" : "Folder"
                                    },
                                    "/*"
                                 ]
                              ]
                           },
                           "Action" : [
                              "s3:GetObject",
                              "s3:GetObjectVersion"
                           ]
                        }
                     ]
                  },
                  "PolicyName" : "readBucket"
               }
            ],
            "Path" : "/"
         }
      }
   }
}

As you can see, the language is ghastly. This particular template sets up an IAM Role that allows the machines it is associated with to have read access to the files stored in a particular folder in a particular bucket in S3. It has two input parameters, and it creates two resources. The input parameters allow you to tell it what bucket and folder you want the created role to access. The resources are an IAM Role and an instance profile. The role contains some policy definitions allowing access to an S3 bucket, while the profile object gathers roles together to associate with EC2 instances. This probably is not a template that is going to be of much use to you, or anyone else, but when you run it, it does create resources in AWS (I think that they are all free!)

Anyway, here goes launching a stack with command line tools:

 
chris@frankentstein:~$ cfn-create-stack TestStack --template-file rr.template -p "BucketName=deathbydatadata;Folder=cfn" -n arn:aws:sns:us-east-1:99999999999:deathbydatadata
arn:aws:cloudformation:us-east-1:999999999999:stack/TestStack/e9dae100-47d8-11e2-b6f5-5081c366858d

One thing I have learned about setting up CFN stacks is that it is really useful to set up an SNS topic for it to talk to using the "-n arn:aws:sns:..." notation, especially when you are writing a new stack. CloudFormation will send very detailed notifications about what is going on, and sometimes this is about the only way to diagnose what went wrong when a stack folds on creation because of some failure to create a resource. You can also of course watch the progression of stack creation through the events tab on the cloudformation web panel, or you can periodically ask for updates from the api:

chris@frankentstein:~$ cfn-describe-stack-events -s TestStack
STACK_EVENT  TestStack  TestStack           AWS::CloudFormation::Stack  2012-12-16T23:35:47Z  CREATE_COMPLETE     
STACK_EVENT  TestStack  ReadBucketIProfile  AWS::IAM::InstanceProfile   2012-12-16T23:35:46Z  CREATE_COMPLETE     
STACK_EVENT  TestStack  ReadBucketIProfile  AWS::IAM::InstanceProfile   2012-12-16T23:33:26Z  CREATE_IN_PROGRESS  
STACK_EVENT  TestStack  ReadBucketRole      AWS::IAM::Role              2012-12-16T23:33:26Z  CREATE_COMPLETE     
STACK_EVENT  TestStack  ReadBucketRole      AWS::IAM::Role              2012-12-16T23:33:13Z  CREATE_IN_PROGRESS  
STACK_EVENT  TestStack  TestStack           AWS::CloudFormation::Stack  2012-12-16T23:32:53Z  CREATE_IN_PROGRESS  User Initiated

Yay, it looks as if the stack worked (the CREATE_COMPLETE event for the AWS::CloudFormation::Stack at the top of the list is a good indicator of this). One of the nice things about parameterizing stacks is that you can often update the parameters without destroying everything (although sometimes updates can be pretty ugly in terms of rebuilding resources - read the docs about your resources carefully before you do it to a production-level system...) Here is an update, changing the Folder parameter to "dsl" instead of "cfn":

chris@frankentstein:~$ cfn-update-stack TestStack --template-file rr.template -p "BucketName=deathbydatadata;Folder=dsl"
arn:aws:cloudformation:us-east-1:999999999999:stack/TestStack/e9dae100-47d8-11e2-b6f5-5081c366858d
chris@frankentstein:~$ cfn-describe-stack-events -s TestStack
STACK_EVENT  TestStack  TestStack           AWS::CloudFormation::Stack  2012-12-16T23:56:36Z  UPDATE_COMPLETE                      
STACK_EVENT  TestStack  TestStack           AWS::CloudFormation::Stack  2012-12-16T23:56:32Z  UPDATE_COMPLETE_CLEANUP_IN_PROGRESS  
STACK_EVENT  TestStack  ReadBucketRole      AWS::IAM::Role              2012-12-16T23:56:28Z  UPDATE_COMPLETE                      
STACK_EVENT  TestStack  ReadBucketRole      AWS::IAM::Role              2012-12-16T23:56:11Z  UPDATE_IN_PROGRESS                   
STACK_EVENT  TestStack  TestStack           AWS::CloudFormation::Stack  2012-12-16T23:55:58Z  UPDATE_IN_PROGRESS                   User Initiated

Note that in this case, all we needed to touch was the Role object, and CloudFormation was able to update it in place. Nifty. If I had machines associated with this role, they would now be able to read the "dsl" folder, but not the "cfn" folder, of the deathbydatadata bucket. Outstanding!


A Snippet of CfnDsl

Now, go back to the template for a minute. The list of parameters is not too bad, but the resources part is ugly. Even though there are only two resources present, I get lost looking at the resource definitions. The real culprit here is the "readBucket" policy object, and it is especially bad because we are building up an arn string out of its components. The template language has a tool that is sufficient for this kind of work in the form of the built-in function Fn::Join. It works a lot like you would expect a join command to work if you have used JavaScript, Perl or Ruby - it builds a string by concatenating together an array of strings, interspersed by a separator string. Here it is in detail:

"Fn::Join" : [ "",
                 [ "arn:aws:s3:::",
                   { "Ref" : "BucketName" },
                   "/",
                   { "Ref" : "Folder" },
                   "/*"
                 ]
               ]

You know, the "Ref"s dont help a whole lot either when you are trying to read this thing. So, what does this same thing look like in cfndsl?

FnFormat("arn:aws:s3:::%0/%1/*", Ref("BucketName"), Ref("Folder") )

Ref("BucketName") is ultimately going to turn into a "Ref" style JSON object. What's up with the FnFormat? It will ultimately resolve to a string with instances of %0 replaced with the value of the first parameter after the format string, %1 replaced by the second after the format string, etc. AWS doesn't have one of those! Of course, it doesnt need it as you can do the same thing with Fn::Join. If you use FnFormat, the ruby DSL will take care of figuring out how to write it into Fn::Join notation. So far, FnFormat is the only extra function that I have written for the DSL. The AWS builtin functions are all available by their Amazon names, with the "::" removed. FnJoin(...) produces {"Fn::Join":,,,}.


A whole Template in CfnDsl

Ok, so now that you have had a taste of it, here is the whole read bucket template written in cfndsl:

CloudFormation {
  AWSTemplateFormatVersion "2010-09-09"
  Parameter("BucketName") {
    Type :String
    Default "MyBucket"
    Description "Name of the bucket to grant read access to."
  }

  Parameter("Folder") {
    Type :String
    Default "myFolder"
    MinLength 2
    Description "Name of a folder in the bucket to grant read access to."
  }
  
  Resource("ReadBucketRole") {
    Type "AWS::IAM::Role"
    Property( "AssumeRolePolicyDocument", {
                "Statement" => [ {
                                   "Effect" => "Allow",
                                   "Principal"=> 
                                   {
                                     "Service" => [ "ec2.amazonaws.com" ]
                                   },
                                   "Action" => [ "sts:AssumeRole" ]
                                 } ]
              })
    Property("Path", "/")
    Property("Policies", 
             [ 
              { "PolicyName"=> "readBucket",
                "PolicyDocument"=> 
                {
                  "Statement" => 
                  [ 
                   {
                     "Effect" => "Allow",
                     "Action" => ["s3:GetObject","s3:GetObjectVersion"],
                     "Resource" => FnFormat("arn:aws:s3:::%0/%1/*", 
                                            Ref("BucketName"),
                                            Ref("Folder") )
                   }
                  ]
                }
              }
             ]
             )
  }
  
  Resource( "ReadBucketIProfile") {
    Type "AWS::IAM::InstanceProfile"
    Property( "Path", "/")
    Property( "Roles", [ Ref("ReadBucketRole") ] )
  }
}

Easier to read? I think so, but it is probably a matter of opinion. I like that the resources are declared individually, rather than as a long list. Resource properties are usually pretty simple, so I declare them here by just giving the value as a second parameter to the Property constructor keyword - you could actually use the block form just as easily. 

I have defined special objects for handling most of the top level stuff in a template - the template itself, Parameters, Resources, Mappings, Outputs, Metadata, and Resource Properties. I also have in place a means for dealing with function calls, discussed previously. There is of course a whole lot more to  a template than these things, as many of the resources have complicated and dedicated data types used to specify their inner workings. While I eventually plan to capture some more of these into the same object notation, it is not always convenient to do so.  When a particular type structure has not been explicitly implemented in the dsl, template authors can always fall back on creating ruby hashes and arrays that parallel the JSON notation for the structure that they are creating, and the dsl will handle it appropriately.  
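
To give a flavor of the other top-level objects, an Output could presumably be declared along the same lines as the Parameter and Resource blocks above. This is a guess at the syntax based on that pattern, not something lifted from the gem's documentation:

Output("RoleName") {
  Description "Name of the IAM role created by this stack"
  Value Ref("ReadBucketRole")
}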


Running CfnDsl

How do you turn this ruby thing into something that AWS understands? Ah - simplicity itself (assuming that you have ruby 1.9). First, you need to get yourself set up with the cfndsl ruby gem:

chris@frankentstein:~$ sudo gem install cfndsl
Fetching: cfndsl-0.0.4.gem (100%)
Successfully installed cfndsl-0.0.4
1 gem installed
Installing ri documentation for cfndsl-0.0.4...
Installing RDoc documentation for cfndsl-0.0.4...

Then you just run cfndsl on the ruby file:

chris@frankentstein:~$ cfndsl rr.rb 
{"AWSTemplateFormatVersion":"2010-09-09","Parameters":{"BucketName":{"Type":"String","Default":"MyBucket","Description":"Name of the bucket to grant read access to."},"Folder":{"Type":"String","Default":"myFolder","Description":"Name of a folder in the bucket to grant read access to.","MinLength":2}},"Resources":{"ReadBucketRole":{"Type":"AWS::IAM::Role","Properties":{"AssumeRolePolicyDocument":{"Statement":[{"Effect":"Allow","Principal":{"Service":["ec2.amazonaws.com"]},"Action":["sts:AssumeRole"]}]},"Path":"/","Policies":[{"PolicyName":"readBucket","PolicyDocument":{"Statement":[{"Effect":"Allow","Action":["s3:GetObject","s3:GetObjectVersion"],"Resource":{"Fn::Join":["",["arn:aws:s3:::",{"Ref":"BucketName"},"/",{"Ref":"Folder"},"/*"]]}}]}}]}},"ReadBucketIProfile":{"Type":"AWS::IAM::InstanceProfile","Properties":{"Path":"/","Roles":[{"Ref":"ReadBucketRole"}]}}}}

There it is, ready to build resources with! How do you build a stack with it? Well, so far I have just been redirecting the output of cfndsl into a text file and then running "cfn-create-stack" referencing the result. There may be better ways to hook these tools together.
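
For example, something along these lines should do the trick (these are just the file and stack names from the examples above):

chris@frankentstein:~$ cfndsl rr.rb > rr.template
chris@frankentstein:~$ cfn-create-stack TestStack --template-file rr.template -p "BucketName=deathbydatadata;Folder=cfn"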

It could be that these few improvements are enough to justify having a ruby dsl behind what is effectively a JSON dsl. As I said before, I believe that the dsl representation of my template is a little nicer. However, ruby allows some other things that we have not explored yet. Not the least of these is comments - sometimes a small (or a large) amount of comments in a piece of code keeps it maintainable. Of course, ruby lets you do much more than that, but I will save it for next time.

Saturday, December 15, 2012

Amazon CloudFormation (DSL anyone?)


It has been quite a while since I have posted anything to a blog, but I thought I would get started writing again.

I wanted to talk a little about Amazon CloudFormation. (And plug something that I wrote recently.) I have worked with it for several months now, and I have to say that overall I am quite impressed.
In the past, I worked on a product that used fog and chef and some other ruby scripting glue to deploy resources in AWS. Overall, the system worked quite well, and I was able to deploy some fairly complicated systems with very little effort, but there were a few drawbacks.
One of the most significant things about this system (that's right, I'm not naming it...) was that it was effectively using the database on the chef master as a kind of registry of the resources available to a cluster. In one way, this was a very powerful move, in that it allowed clustered resources to be aware of their surroundings in a way that was previously difficult to do. For instance, if I was setting up a Hadoop cluster in this manner, I could tell the machine that was running the namenode that it was part of a hadoop cluster, and it could use an api call to the chef master to figure out where all of the datanodes were, and vice versa, allowing both types of machines to set up the correct configuration. So what is not to love? Well, the chef master server was something of a point of failure for the hadoop cluster. Really, for a hadoop cluster it is not all that much of a problem, as the chef master only has to be around when significant configuration changes occur - like adding more datanodes to the cluster. The system in question was not using autoscaling groups, so changes in the configuration pretty much only happened during times of manual intervention, and it was not really all that much of a burden that there was a dependency on the chef master.
Life is different when you are putting together an autoscaling group of webservers. The idea is that you tell AWS how to bootstrap a machine to be a web server. You also tell it that machines in such a group need to have http requests coming to a particular address load balanced across the set. Then you tell it (the autoscaling group) that when the load on your website exceeds a certain threshold, it should spin up some more servers, or if the traffic falls below some other level, it is ok to kill off a few. In this framework, automatic configuration of cloud resources can become a critical operation - if you can't manage to get them up and running to meet rising demand for requests, your web service could potentially fail to meet SLAs. If you are using chef (or puppet, for that matter) as part of the bootstrap process, a failure on the master could precipitate a larger system failure. There are of course ways to mitigate this risk, but let's move forward - even though both of these systems are fantastic at configuring what goes inside a virtual box, neither of them (to my imperfect knowledge) is particularly good at making sure that external systems are configured.
Enter CloudFormation. CloudFormation gives you an entry point into the AWS ecology where you can orchestrate the provisioning of a suite of AWS resources. AWS is api driven, and just about all of the api is accessible to you through CloudFormation templates - including autoscaling groups, which are not supported on the web console. There is something magic about being able to set up a data store, a dns record, a load balanced set of servers that will automatically scale with load, and other resources from a single call to AWS. I can not say enough good things about the service, and I am sure that they have all been said before, so I will stop trying to sell it. I have had a lot of success with it so far, and I plan to use it again in future deployments.
One of the ways that CloudFormation wins over my previous approach of using the chef master as the registry for clustered systems is that you can embed configuration information into the script that builds the cluster, and the machines in the cluster are able to get that data out at configuration time (this is what the aws bootstrapping tool cfn-init does when it runs), or you can set up a process on the machine that polls for changes to this (this is what cfn-hup does, I believe). These are roughly analogous to an initial chef run and a periodic update call to chef-client. In actuality, by putting the cluster configuration information into a CloudFormation template, I have transferred the potential failure from a chef master to some nebulous parts of the Amazon API, but I am pretty comfortable with the redundancy that I get from multiple availability zones, so I think that this is still probably a win. Of course, there is no reason why you can't engineer a solution that uses both, so this is not by any means an argument to use CloudFormation to replace chef functionality.
However, I have to say that I have a couple of complaints about CloudFormation. The first one is that debug cycles for CloudFormation stacks are long. It can take several minutes for resources to get created. In some cases, you get warned early that there are problems with your template, but if there are problems getting all of the resources that you install on EC2 instances working, you may have to wait 10 minutes to find out if something is wrong, and then have to tear the whole thing down and start again (just a single API call, but a long wait.) I suppose that this is the nature of the beast, but it gets infuriating at times. (I remind myself at these times that if I were waiting for an IT staff to set up my cluster of 50 servers, I would be waiting for a week, not for half an hour, and I would have to pay them even when the machines were not running...)
My real complaint is that the language used for the templates is ghastly. From Amazon's architectural point of view, it makes a lot of sense to represent this language as a giant JSON object - everything else about their API is in JSON and there is no good reason to make an exception for this. However, from my point of view as a human (well, half, anyway), JSON is a terrible language. I find the syntax for function calls confusing. I get lost in the long lists of things. In an especially large template file I find that I have trouble figuring out where one resource ends and the next one begins. You can do a certain amount of organization with white space: you can indent objects - that helps some. If you could include comments, I suspect that many of my complaints would evaporate entirely, as I could then add the kind of commenting structure that I am used to in regular programming languages to help me navigate and to explain particularly tricky parts of the code, etc.
As I was plowing through a particularly complicated CloudFormation template the other day, it struck me that there was a real lack of tools to help you build CloudFormation templates. There are a few - Amazon has a tool called CloudFormer. Basically, you fire up an ec2 instance with CloudFormer on it, and it helps you piece together a template from your existing resources. This is somewhat helpful, but it means that you have to set up the thing that you want first. If you need to modify the result, you have to wade into the JSON yourself. There were a couple of other things out there too: AWS gives you command line tools for manipulating your stacks, Fog has some support for doing the same, etc. I didn't really see anything that substantially changed the drudgery of creating and maintaining the awful JSON that comprises the template itself.
Just because the native version of a language is hard to use does not mean you should give up the tool. I hardly ever code in assembly these days - or even C, for that matter - we have computer programs to write our computer programs in those languages, and most people try to write in something that comes easier to them. Remember a couple of years ago, when everyone was talking about building ruby domain specific languages (dsl's) for everything? Chef and Puppet both have a dsl hiding around them, and they are quite useful. As I thought about this, I thought that surely someone has already put the two ideas together (that is, a ruby dsl and CloudFormation), but after a quick web search I found that I was wrong. Nobody had.

I wrote one this week.
It is still pretty raw, but feel free to take a look at it on github https://github.com/howech/cfndsl, and rubygems.org (http://rubygems.org/gems/cfndsl). It probably requires ruby 1.9, and it probably has some horrible bugs in it, still.

Next posting (soon) will talk a little bit about cfndsl.

Saturday, February 26, 2011

More Tricks with Jruby and Flume (and HBase)

Last time, I set up Flume with a jRuby decorator and managed to process exactly one event with my jruby-flume script. Now we are going to dig in a little deeper and do something useful.

Let's say I have a stream of data that I need to capture and place into a database. I would like for my database to be updated at near-real time - by that I mean a couple of minutes of delay between seeing the data in the stream and having it show up in the database is acceptable. The data does not come to me in the format that I need to put it into the database, but for the most part, the transformations are simple. Also, the stream of data contains information that I ultimately need to live in several tables.

For my purposes at the moment HBase is the database. There is a sink plugin for flume that writes data into HBase called attr2HBase. There may be a better way to get it, but I checked out the flume-hbase branch from github and built the plugin from the plugins directory. It seems to work fine with the latest flume release (but the test cases seem to be broken.)

The hbase sink has many configuration options, but I am just going to use the most basic of them. If you set up a sink attr2HBase("some_table"), it will interpret events sent to it as data destined for the "some_table" table in hbase. In this configuration, it will only act on specially marked attributes in the event: The value of an attribute named "2hb_" will be interpreted as the row key, and any values for attributes of the form "2hb_colfam:desig" will be placed in the database under the "colfam:desig" heading on the row. Other values are ignored, and an error is generated if there is no row key. Most of this is ok for my purposes, but there are a couple of challenges. First, I have to somehow transform my data into "2hb_" form - I'll build a decorator to do this work below. The second problem is that the attr2HBase plugin will serve exactly one table per sink. What I really want is a sink that will write to a table based on criteria given within the event being processed - I have written just this in jRuby.
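
So, for example, if attr2HBase("some_table") received an event carrying attributes like these (a made-up illustration):

  2hb_        =>  "row1"
  2hb_aaa:x   =>  "hello"

it would write the value "hello" into column "aaa:x" of row "row1" in the some_table table.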

Ready? Keep your hands inside the vehicle at all times, and be warned that this is an active ride, you may get a little wet.

1. Install HBase. Again, I have relied on Cloudera's deb packages: "sudo apt-get install hadoop-hbase-regionserver hadoop-hbase-master". You can also follow Apache's quick start guide. Once you have it installed, make sure everything works by creating a couple of tables we will need later. The details of getting HBase set up are beyond the scope of this blog, but the HBase documentation is excellent. Once you have it set up, use "hbase shell" to create a couple of tables:
chris@basqueseed:~$ hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 0.90.1-CDH3B4, r, Mon Feb 21 12:20:55 PST 2011
>> create 'one','aaa'; create 'two', 'aaa'
0 row(s) in 1.1510 seconds
0 row(s) in 1.1110 seconds
2. Get the hbase plugin. I got it from the plugins section of the hbase branch of flume on github. Build it, and put hbase-sink.jar in /usr/lib/flume/lib. You also need to ensure that hbase's main jar file is accessible on flume's classpath. I just sym-linked /usr/lib/hbase/hbase-0.90.1-CDH3B4.jar to /usr/lib/flume/lib/. Finally, you need to modify flume's configuration to load the attr2hbaseSink plugin. While you are there, you may as well add the other two jRuby classes from the jruby-flume library. When you are done, your flume-site.xml should look something like this:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
 <!--- ================================================= -->
 <!--- Flume Plugins =================================== -->
 <!--- ================================================= -->
 <property>
 <name>flume.plugin.classes</name>
  <value>com.infochimps.flume.jruby.JRubyDecorator, com.infochimps.flume.jruby.JRubySink, com.infochimps.flume.jruby.JRubySource, com.cloudera.flume.hbase.Attr2HBaseEventSink</value>

 <description>List of plugin classes to load.</description>
 </property>
</configuration>
3. We need to build a jRubyDecorator script that can take events with random ascii data and turn them into events that the Attr2HBaseEventSink can use. This is not a lot different from the "reverse.rb" script that we set up last time.
# onetwo.rb --- jRubyDecorator script

require 'java'
java_import 'com.cloudera.flume.core.EventSinkDecorator'
java_import 'com.cloudera.flume.core.Event'
java_import 'com.cloudera.flume.core.EventImpl'
class OneTwoDecorator < EventSinkDecorator
  def table
    (@one = !@one) ? "one" : "two"
  end
  def append(e)
    attrs = java::util::HashMap.new e.getAttrs
    attrs["table"] = table.to_java_bytes
    values = String.from_java_bytes( e.getBody ).scan(/.../)
    values.each_index { |i| attrs["2hb_aaa:#{i}"] = values[i].to_java_bytes }
    attrs["2hb_"] = values[0].to_java_bytes if values[0]
    super EventImpl.new( e.getBody, e.getTimestamp, e.getPriority, e.getNanos,
                         e.getHost, attrs )
  end
end
OneTwoDecorator.new(nil)
Last time, our decorator just changed the body field. This time, we are altering the attributes. The first thing it does is to create a new, writable copy of the attribute hash attached to the event. We then add a couple of attributes to the hash - something that we need to be careful with here is that flume will later expect this to be a HashMap with strings as keys and byte arrays for values - so here we call to_java_bytes on all of the values we insert. We could probably write a wrapper class around our attributes to ensure that this restriction is maintained.
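
For instance, a minimal sketch of such a wrapper - purely hypothetical, it is not part of jruby-flume - might look like this:

# byte_attrs.rb --- a hypothetical helper, not part of jruby-flume
# Wraps an attribute map so that every value stored through it is
# converted to a java byte array, which is what flume expects.
class ByteAttrs
  def initialize( attrs )
    @map = java::util::HashMap.new attrs
  end

  def []=( key, value )
    @map[key.to_s] = value.to_s.to_java_bytes
  end

  def to_java_map
    @map
  end
end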

We add three types of things to the attributes. The "table" method will alternately return the strings 'one' and 'two', so we will be tagging alternate events that pass through the decorator with {table:one} and {table:two}. Next, we take the body and split it up into three-character groups, and create tags like {'2hb_aaa:1':xxx}. If these make it to the attr2HBase event sink, they will be interpreted as column data to add to an HBase table. Finally, if there was at least one group of three characters, we add a tag like {2hb_:xxx}. This will be interpreted by attr2HBase as a row key.

Let's test this part to make sure everything is working right.
chris@basqueseed:~$ flume shell -c localhost
2011-02-26 18:44:52,268 [main] INFO conf.FlumeConfiguration: Loading configurations from /etc/flume/conf
Using default admin port: 35873
Using default report port: 45678
Connecting to Flume master localhost:35873:45678...
2011-02-26 18:44:52,466 [main] INFO util.AdminRPCThrift: Connected to master at localhost:35873
==================================================
FlumeShell v0.9.3-CDH3B4
Copyright (c) Cloudera 2010, All Rights Reserved
==================================================
Type a command to execute (hint: many commands
only work when you are connected to a master node)


You may connect to a master node by typing:
    connect host[:adminport=35873[:reportport=45678]]


[flume localhost:35873:45678] exec config basquseed console 'jRubyDecorator("/usr/lib/flume/scripts/onetwo.rb") console'
[id: 2] Execing command : config
Command succeeded
[flume localhost:35873:45678] quit
Disconnected!
Now check to see if it does what we want:
chris@basqueseed:~$ flume node_nowatch
2011-02-26 18:48:43,835 [main] INFO agent.FlumeNode: Flume 0.9.3-CDH3B4
2011-02-26 18:48:43,836 [main] INFO agent.FlumeNode:  rev 822c62f0c13ab76921e96dd92e19f68007dbcbe2
2011-02-26 18:48:43,836 [main] INFO agent.FlumeNode: Compiled  on Mon Feb 21 13:01:39 PST 2011
...
2011-02-26 18:49:06,818 [SpawningLogicalNode basqueseed] INFO agent.LogicalNode: Node config successfully set to com.cloudera.flume.conf.FlumeConfigData@156d7c8
2011-02-26 18:49:06,825 [logicalNode basqueseed-20] INFO agent.LogicalNode: Connector started: LazyOpenSource | LazyOpenDecorator
2011-02-26 18:49:06,825 [logicalNode basqueseed-20] INFO console.JLineStdinSource: Opening stdin source
aaabbbccc 
2011-02-26 18:50:31,719 [logicalNode basqueseed-20] INFO debug.ConsoleEventSink: ConsoleEventSink( debug ) opened
basqueseed [INFO Sat Feb 26 18:50:31 CST 2011] { 2hb_ : aaa } { 2hb_aaa:0 : aaa } { 2hb_aaa:1 : bbb } { 2hb_aaa:2 : ccc } { table : one } 
aaabbbccc
cccaaabbb  
basqueseed [INFO Sat Feb 26 18:50:37 CST 2011] { 2hb_ : ccc } { 2hb_aaa:0 : ccc } { 2hb_aaa:1 : aaa } { 2hb_aaa:2 : bbb } { table : two } cccaaabbb
This looks good: we have row keys marked with "2hb_", column data marked with "2hb_aaa:x", and we are alternately adding different table tags.

4. Now, we are going to build a jRubySink script to put our data into HBase. We already have an existing sink that will put data into a particular table: using attr2HBase("table") does that just fine. The trouble with it is that you need a separate sink for each table. If you dig around in the Flume source code, you will find a sink called com.cloudera.flume.handlers.hdfs.EscapedCustomDfsSink. This class does the bucketing magic for the collectorSink. It builds a path to a file to write to from data in the event, and it maintains a HashMap of sinks associated with these files. If it encounters a file name it has not seen before, it dynamically generates a new sink for the file and sends the event to it. We are going to do the same thing in jRuby, only with hbase tables instead of files. Here is the script:
# hbase.rb --- jRubySink script
# This script dynamically creates Attr2HBaseEventSink instances
# based on the value of an event's "table" attribute.
require 'java'
java_import 'com.cloudera.flume.core.Event'
java_import 'com.cloudera.flume.core.EventImpl'
java_import 'com.infochimps.flume.jruby.JRubySink'
java_import 'com.cloudera.flume.hbase.Attr2HBaseEventSink'

# Define a class that extends the EventSink.Base class
class HBase < JRubySink
  def initialize
    super
    # Make sure that we have an available empty hash for storing
    # connections to tables.
    @tables = {}
  end

  def get_table( table )
    unless @tables[table]
      # Create a new Attr2HBaseEventSink instance
      @tables[table] = Attr2HBaseEventSink.new(table,"",false,"","","2hb_",0,true)
      @tables[table].open
    end
    @tables[table]
  end

  def append(e)
    # Raise an error if the event is not tagged with a table name.
    table_bytes = e.getAttrs[ "table" ]
    raise java.io.IOException.new( "No table!" ) unless table_bytes
    table = String.from_java_bytes table_bytes
    sink = get_table(table)
    raise java.io.IOException.new( "No sink!" ) unless sink
    sink.append(e)
  end

  def close
    # Close any of the dynamic sinks that we opened
    @tables.each_value do |sink|
      sink.close
    end
    # Forget about the old sinks - create new ones on demand
    @tables={}
    super
  end
end

#Create an instance and return it to the eval function back in java land.
HBase.new
The get_table method is where the magic happens. One kind of clunky thing about this is that I am calling the java constructor for the sink. Flume's configuration language offers friendlier ways to construct collectors - I just have not figured out how to interface to it from jRuby. Basically this will open up connections to hbase on demand and leave them open indefinitely. It is probably a good idea to put some kind of roll() decorator on a stream that uses this to periodically close connections, which will eliminate ones that are not active (active ones will reopen automatically as they are needed...).
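
For what it is worth, wrapping the sink in a roll from the flume configuration language would look something like the line below - I am going from memory on the roll syntax here, so check the Flume User Guide before trusting it:

exec config basqueseed console '{jRubyDecorator("/usr/lib/flume/scripts/onetwo.rb") => roll(300000) { jRubySink("/usr/lib/flume/scripts/hbase.rb") } }'

That would close and reopen the hbase sinks every five minutes.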

Once again, let's check to see if it works - first set up a dataflow from the console through the onetwo.rb decorator and into the hbase.rb sink:
chris@basqueseed:~$ flume shell -c localhost -e "exec config basqueseed console 'jRubyDecorator(\"/usr/lib/flume/scripts/onetwo.rb\") jRubySink(\"/usr/lib/flume/scripts/hbase.rb\")'"
2011-02-26 19:27:46,488 [main] INFO conf.FlumeConfiguration: Loading configurations from /etc/flume/conf
Using default admin port: 35873
Using default report port: 45678
Connecting to Flume master localhost:35873:45678...
2011-02-26 19:27:46,680 [main] INFO util.AdminRPCThrift: Connected to master at localhost:35873
[id: 3] Execing command : config
Command succeeded
Next, check the node:
chris@basqueseed:~$ flume node_nowatch
2011-02-26 19:30:41,015 [main] INFO agent.FlumeNode: Flume 0.9.3-CDH3B4
2011-02-26 19:30:41,017 [main] INFO agent.FlumeNode:  rev 822c62f0c13ab76921e96dd92e19f68007dbcbe2
...
2011-02-26 19:30:50,480 [logicalNode basqueseed-16] INFO agent.LogicalNode: basqueseed: Connector stopped: LazyOpenSource | LazyOpenDecorator
2011-02-26 19:30:50,688 [main] INFO agent.FlumeNode: Hadoop Security enabled: false
aaabbbccc
2011-02-26 19:30:59,033 [SpawningLogicalNode basqueseed] WARN debug.LazyOpenDecorator: Closing a lazy sink that was not logically opened
2011-02-26 19:30:59,033 [SpawningLogicalNode basqueseed] INFO agent.LogicalNode: Node config successfully set to com.cloudera.flume.conf.FlumeConfigData@63e563
2011-02-26 19:30:59,034 [logicalNode basqueseed-21] INFO agent.LogicalNode: Connector started: LazyOpenSource | LazyOpenDecorator
2011-02-26 19:30:59,034 [logicalNode basqueseed-21] INFO console.JLineStdinSource: Opening stdin source
aaabbbccc
cccbbbaaa
abcabcabc
defdefdef
Well, something might be happening, but if it is, it is not happening here. Let's go look at hbase:
chris@basqueseed:~$ hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 0.90.1-CDH3B4, r, Mon Feb 21 12:20:55 PST 2011
>> scan 'one'
ROW                   COLUMN+CELL                                            

 aaa                  column=aaa:0, timestamp=1298770259108, value=aaa
 aaa                  column=aaa:1, timestamp=1298770259108, value=bbb
 aaa                  column=aaa:2, timestamp=1298770259108, value=ccc
 abc                  column=aaa:0, timestamp=1298770284298, value=abc
 abc                  column=aaa:1, timestamp=1298770284298, value=abc
 abc                  column=aaa:2, timestamp=1298770284298, value=abc
2 row(s) in 0.9200 seconds

>> scan 'two'
ROW                   COLUMN+CELL

 ccc                  column=aaa:0, timestamp=1298770266971, value=ccc
 ccc                  column=aaa:1, timestamp=1298770266971, value=bbb
 ccc                  column=aaa:2, timestamp=1298770266971, value=aaa
 def                  column=aaa:0, timestamp=1298770287205, value=def
 def                  column=aaa:1, timestamp=1298770287205, value=def
 def                  column=aaa:2, timestamp=1298770287205, value=def
2 row(s) in 0.1410 seconds
I don't know about you, but I think that this is pretty cool!

Let's try again with some more data. This time we will use one of Flume's debugging sources - asciisynth - which we will set up to generate 10000 events, each with a random 12-character body. From this we should expect a little less than 5000 rows to be written to each of our two hbase tables (with only three-character keys, there are bound to be a few collisions.)
chris@basqueseed:~$ flume shell -c localhost -e "exec config basqueseed 'asciisynth(10000,12)' 'jRubyDecorator(\"/usr/lib/flume/scripts/onetwo.rb\") jRubySink(\"/usr/lib/flume/scripts/hbase.rb\")'"
2011-02-26 19:42:32,992 [main] INFO conf.FlumeConfiguration: Loading configurations from /etc/flume/conf
Using default admin port: 35873
Using default report port: 45678
Connecting to Flume master localhost:35873:45678...
2011-02-26 19:42:33,143 [main] INFO util.AdminRPCThrift: Connected to master at localhost:35873
[id: 4] Execing command : config
Command succeeded
I left my node running from before. The master talked to it and reconfigured it, and here is the output:
2011-02-26 19:42:36,028 [SpawningLogicalNode basqueseed] INFO console.JLineStdinSource: Closing stdin source
2011-02-26 19:42:36,070 [logicalNode basqueseed-21] INFO console.JLineStdinSource: Closing stdin source
2011-02-26 19:42:36,070 [logicalNode basqueseed-21] WARN console.JLineStdinSource: Double close on Stdin Char Source
2011-02-26 19:42:36,070 [logicalNode basqueseed-21] WARN debug.LazyOpenDecorator: Closing a lazy sink that was not logically opened
2011-02-26 19:42:36,071 [logicalNode basqueseed-21] INFO agent.LogicalNode: basqueseed: Connector stopped: LazyOpenSource | LazyOpenDecorator
2011-02-26 19:42:36,071 [SpawningLogicalNode basqueseed] INFO agent.LogicalNode: Node config successfully set to com.cloudera.flume.conf.FlumeConfigData@179567c
2011-02-26 19:42:36,072 [logicalNode basqueseed-37] INFO agent.LogicalNode: Connector started: LazyOpenSource | LazyOpenDecorator
2011-02-26 19:42:36,072 [logicalNode basqueseed-37] INFO debug.SynthSource: Resetting count and seed; openingSynthSource(10000, 12 )
2011-02-26 19:42:59,854 [logicalNode basqueseed-37] INFO debug.SynthSource: closing SynthSource(10000, 12 )
2011-02-26 19:42:59,856 [logicalNode basqueseed-37] INFO agent.LogicalNode: basqueseed: Connector stopped: LazyOpenSource | LazyOpenDecorator
So, it took about 30 seconds to generate the 10000 events (you can tell from the timestamps on when it opened and closed the SynthSource.) I am not sure if that translates to how long it took to write stuff to the database - I think that a study of the performance of jRuby scripts in flume is warranted, but out of the scope of this particular blog entry.

Let's look at the database. There are too many entries to list, but we can get some summary information:
>> count 'one'
Current count: 1000, row: %K|

Current count: 2000, row: A p
Current count: 3000, row: \|\
Current count: 4000, row: wW
4278 row(s) in 1.3820 seconds

>> count 'two'
Current count: 1000, row: $#r

Current count: 2000, row: @ H
Current count: 3000, row: \X
Current count: 4000, row: w^H

4271 row(s) in 1.0370 seconds

>> scan 'one', {LIMIT=>5}
ROW                   COLUMN+CELL

                      column=aaa:0, timestamp=1298770979440, value=   
                      column=aaa:1, timestamp=1298770979440, value=d R
                      column=aaa:2, timestamp=1298770979440, value= ib
                      column=aaa:3, timestamp=1298770979440, value=  z
   !                  column=aaa:0, timestamp=1298770973275, value=  !
   !                  column=aaa:1, timestamp=1298770973275, value=sY7
   !                  column=aaa:2, timestamp=1298770973275, value=  J
   !                  column=aaa:3, timestamp=1298770973275, value= mN
   "                  column=aaa:0, timestamp=1298770976072, value=  "
   "                  column=aaa:1, timestamp=1298770976072, value=w
   "                  column=aaa:2, timestamp=1298770976072, value=Y,X
   "                  column=aaa:3, timestamp=1298770976072, value= }O
   #                  column=aaa:0, timestamp=1298770970896, value=  #
   #                  column=aaa:1, timestamp=1298770970896, value=?"

   #                  column=aaa:2, timestamp=1298770970896, value=[!_
   #                  column=aaa:3, timestamp=1298770970896, value=Q I
   $                  column=aaa:0, timestamp=1298770979688, value=  $
   $                  column=aaa:1, timestamp=1298770979688, value=! i
   $                  column=aaa:2, timestamp=1298770979688, value=  (
   $                  column=aaa:3, timestamp=1298770979688, value=:N!
5 row(s) in 0.2160 seconds

>> scan 'two', {LIMIT=>5}
ROW                   COLUMN+CELL

                      column=aaa:0, timestamp=1298770979742, value=  
                      column=aaa:1, timestamp=1298770979742, value=( 1
                      column=aaa:2, timestamp=1298770979742, value= oV
                      column=aaa:3, timestamp=1298770979742, value= }
   !                  column=aaa:0, timestamp=1298770978751, value=  !
   !                  column=aaa:1, timestamp=1298770978751, value=c7T
   !                  column=aaa:2, timestamp=1298770978751, value=7F
   !                  column=aaa:3, timestamp=1298770978751, value=5C2
   "                  column=aaa:0, timestamp=1298770969476, value=  "
   "                  column=aaa:1, timestamp=1298770969476, value=v d
   "                  column=aaa:2, timestamp=1298770969476, value=E@l
   "                  column=aaa:3, timestamp=1298770969476, value=Z M
   #                  column=aaa:0, timestamp=1298770979759, value=  #
   #                  column=aaa:1, timestamp=1298770979759, value={xd
   #                  column=aaa:2, timestamp=1298770979759, value=  z
   #                  column=aaa:3, timestamp=1298770979759, value=?)3
   $                  column=aaa:0, timestamp=1298770972873, value=  $
   $                  column=aaa:1, timestamp=1298770972873, value=? L
   $                  column=aaa:2, timestamp=1298770972873, value= f*
   $                  column=aaa:3, timestamp=1298770972873, value= v[
5 row(s) in 0.1530 seconds
 
It seems to have worked just as we expected it to.

Conclusions?

In review, we built a decorator that did some on-the-fly data processing. Not very exciting really - you could probably do much of the same thing with existing flume decorators, but it would not be pretty. The hysteresis behavior that forked the one stream of data into two separate tables might be something of a challenge. While this particular decorator was not very interesting, I am sure that there are many interesting things you could do in a jRuby decorator.

We also built a somewhat sophisticated jRubySink. It extends the usefulness of an already written HBase sink by dynamically creating more database connections as dictated by the events on the flume. I feel a little bit like Sun Wukong stealing the peaches of immortality on this one - some of the deep magic that happens in the heart of flume is accessible to us mere jRuby code-monkeys!

I think that there is more potential here, too. I hope that I have given enough examples to spark off some ideas in your head.

Wednesday, February 23, 2011

Simple Flume Decorators with JRuby

Flume is a framework for moving chunks of data around on your network. Its primary mission is to move log data from where it is generated (perhaps a web server) to someplace where it can actually be used - like an HDFS file system where it can be crunched by Hadoop. Flume's design is very flexible - the final destination for your data could also be a database like HBase or Cassandra, a search index system like Elastic Search, another file system like an S3 bucket, or any of a myriad of other configurations. Flume will also go to some efforts to make sure that your data is delivered reliably - it includes some tunable reliability features out of the box.

The Flume User Guide does a good job of explaining how its component architecture works. You can configure data flows by chaining together systems of "nodes" - each node is a data moving unit - each has an input ("source") and an output ("sink"). Sources can conceivably be anything that produces data - flume can tail sets of files on disk, listen to network ports, periodically run programs, etc. Sinks are a little more interesting - they can write data to disk, push data into an network connection, or into a database. Even more interesting, sinks can be composites - you can fan data out to many other sinks, or set up conditional sinks where if data fails to be accepted by the first sink, it will instead be routed to a second sink.  Also, you can build "Decorators" that can modify the data as it moves down the path. Flume offers many sources, sinks, and decorators out of the box - but also gives you the ability to write your own through a Java-based plugin API.

Flume chops the data up into a series of "events". For instance, if you are using flume to tail a log file, every line written to the file gets turned into a flume event. Each event carries with it the body of the data that produced it, as well as some meta-data: the machine that it was collected on, a time-stamp showing when the data was collected, the event's priority, etc. You can even add your own key-value pairs to an event in its attributes. Flume sinks can store both the body data and the metadata of an event, or in some cases use the metadata to help ensure that the data lands in the right place - like with the "collectorSink" file bucketing mechanism.

To me, and to some of the other primates I work with at Infochimps, decorators are especially interesting. In a decorator, you get to do some processing on your data as it flies from wherever it was produced to its final destination(s). Flume comes with a handful of basic decorators that will allow you to do some small scale processing of flume events. For instance, the "value" decorator lets you set a value in the metadata of an event. The out-of-the-box decorators are not quite sufficient to handle my processing demands. I wanted a little more flexibility, so I wrote (in less time than it took me to write this blog entry) a quick interface to jRuby.  Now I have access to my flume data in transit with a powerful scripting engine.

Enough with the intro - let's jump in. The following steps will lead you down the road to processing data on the fly through the flume with a jRuby script:

1. Install flume. Cloudera has good documentation on how to get it set up. I run Ubuntu, so I just added the cloudera apt  package repository to my apt sources, and used "apt-get" to install the packages flume, flume-node and flume-master.

2. Get jRuby. If you use apt-get, you will be getting a slightly out-of-date version, but it will do for the moment. The jRuby website has more details if you need them.

3. Get my jRubyPlugin. For it to work, you have to have it and jruby.jar in Flume's classpath. You can make custom adjustments to Flume's runtime environment, including Flume classpath changes, in the flume-env.sh script in flume/bin. The easy way is to just drop jruby-flume.jar in the /usr/lib/flume/lib directory (or wherever flume landed in your install process). Getting your jRuby environment completely set up so that you can see jruby gems and stuff is going to involve making adjustments to your environment, but for now, just having the jruby.jar on the classpath will work. I just created a symbolic link to /usr/lib/jruby/lib/jruby.jar in /usr/lib/flume/lib.

( Aside: I don't know the full answer to getting everything jruby set up in an embedded mode. However, if you add the following to your flume-env.sh script, you will be at least part of the way there
export UOPTS="-Djruby.home=/usr/lib/jruby -Djruby.lib=/usr/lib/jruby/lib -Djruby.script=jruby"
)

4. You have to tell flume explicitly what classes to load as plugins when the nodes start up. To do this, create or edit "flume-site.xml" in the flume/conf directory. It should contain at least the following:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
 <!--- ====================================== -->
 <!--- Flume Plugins ======================== -->
 <!--- ====================================== -->
 <property>
  <name>flume.plugin.classes</name>
  <value>com.infochimps.flume.jruby.JRubyDecorator</value>
  <description>List of plugin classes to load.</description>
 </property>
</configuration>
After you get this in place, restart your flume-master and flume-node. If everything went ok, the services will start up, and you can go to http://localhost:35871/masterext.jsp to see if the plugin loaded successfully. If you see "jRubyDecorator" listed under the decorators, you are in business.

5. Ok, now let's build a decorator that does something simple. Create a directory somewhere to keep ruby scripts for flume - I like /usr/lib/flume/scripts. The files in this directory need to be readable by the user that flume is running as. Also, if you are in a distributed world, scripts are going to have to be available both on the master and on the machine that will house the logical node that will run the script.

Here is a simple script. Put it in /usr/lib/flume/scripts/reverse.rb:
# reverse.rb  --- jRubyDecorator script
require 'java'
java_import 'com.cloudera.flume.core.EventSinkDecorator'
java_import 'com.cloudera.flume.core.Event'
java_import 'com.cloudera.flume.core.EventImpl'
 
class ReverseDecorator < EventSinkDecorator
  def append(e)
    body = String.from_java_bytes e.getBody
    super EventImpl.new( body.reverse.to_java_bytes, e.getTimestamp, e.getPriority, e.getNanos, e.getHost, e.getAttrs )
  end
end
ReverseDecorator.new(nil)
What does it do? Well, it defines a subclass of com.cloudera.flume.core.EventSinkDecorator which redefines the append method. Our special append method builds a new event from an appended event, except that the text of the "body" field is reversed. Not too much nonsense, but we do have to be a little careful with strings. Flume likes its data to be represented as arrays of bytes, but ruby would prefer to deal with strings as Strings, so I convert both ways: String.from_java_bytes() to get a string object, and the to_java_bytes() method on string-like objects to convert back. Hidden in there is the ruby string method "reverse".
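
If you want to convince yourself of what those two conversions are doing, a quick session in jirb does the trick (this is just an illustration, not part of the decorator):

bytes = "Was it a mat I saw?".to_java_bytes    # a Java byte[]
String.from_java_bytes( bytes ).reverse        # => "?was I tam a ti saW"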

The last line of the append method shows off some of the power of jRuby. It creates a new instance of EventImpl and passes it off to EventSinkDecorator's implementation of append - basically letting the parent class handle all of the difficult work.

Finally, the last line of the script instantiates a new object of the (jRuby!) ReverseDecorator class and returns it to jRubyDecorator. jRubyDecorator is really a factory class for producing decorator instances. It passes off our stuff as a java object, and flume never suspects what has happened.

Does it work? Let's see:
chris@basqueseed:~$ flume shell -c localhost
2011-02-23 17:35:03,785 [main] INFO conf.FlumeConfiguration: Loading configurations from /etc/flume/conf
Using default admin port: 35873
Using default report port: 45678
Connecting to Flume master localhost:35873:45678...
2011-02-23 17:35:03,993 [main] INFO util.AdminRPCThrift: Connected to master at localhost:35873
==================================================
FlumeShell v0.9.3-CDH3B4
Copyright (c) Cloudera 2010, All Rights Reserved
==================================================
Type a command to execute (hint: many commands
only work when you are connected to a master node)

You may connect to a master node by typing:
    connect host[:adminport=35873[:reportport=45678]]

[flume localhost:35873:45678] exec config basqueseed console '{jRubyDecorator("/usr/lib/flume/scripts/reverse.rb")=>console}'
[id: 0] Execing command : config
Command succeeded
[flume localhost:35873:45678] quit
So far, so good - the master node has decided that everything is kosher. By the way, be careful with the single and double quotes in flume shell commands. The flume shell is very picky about its input. If you have any structure to your sources or sinks, you must single quote the declaration. Let's now play with a node:
chris@basqueseed:~$ sudo /etc/init.d/flume-node stop
[sudo] password for chris:
Stopping Flume node daemon (flume-node): stopping node

chris@basqueseed:~$ sudo -u flume flume node_nowatch
/2011-02-23 17:38:21,709 [main] INFO agent.FlumeNode: Flume 0.9.3-CDH3B4
2011-02-23 17:38:21,710 [main] INFO agent.FlumeNode:  rev 822c62f0c13ab76921e96dd92e19f68007dbcbe2
2011-02-23 17:38:21,710 [main] INFO agent.FlumeNode: Compiled  on Mon Feb 21 13:01:39 PST 2011
...{stuff deleted}...
2011-02-23 17:39:40,471 [logicalNode basqueseed-20] INFO console.JLineStdinSource: Opening stdin source
?was I tam a ti saW 

2011-02-23 17:39:45,720 [logicalNode basqueseed-20] INFO debug.ConsoleEventSink: ConsoleEventSink( debug ) opened
basqueseed [INFO Wed Feb 23 17:39:45 CST 2011] Was it a mat I saw?
Holy chute! It works!

I think that is enough for today. Next time, I'll try some more complicated scripts, deal with attributes and play some games with data flows.