In my last post I threw 100 million randomly generated email addresses at an Azure event hub and it didn’t even budge. Now it’s time to process the data and store it in a setup that resembles haveibeenpwned.
With the events now in the event hub, I could create another cloud service which uses the .NET SDK to read the data. There are two ways to implement this: either with the EventHubReceiver class or with the EventProcessorHost. I’ll try these out in a future post.
For now I wanted to use something else: HDInsight and Storm. HDInsight is Hadoop as PaaS. Storm is a computation system for processing streams of data and, as advertised on its website, should be able to process millions of tuples per second. Sounds like a good way to handle the 100 million e-mail addresses I have waiting.
In order to use Storm you need to understand only a few concepts:
- Tuple: a piece of information that needs to be processed.
- Spout: reads from a stream and returns tuples.
- Bolt: processes tuples and can forward or produce new tuples.
- Topology: links spouts and bolts together.
This is a very rudimentary explanation, but should be enough for this post.
In order to write and publish a topology in Visual Studio, you should have the Azure SDK and the HDInsight tools for Visual Studio. Let’s look at the different components we need.
The spout will be reading from our event hub. Microsoft has already written a spout which you can use, but it is written in Java. That’s not a problem: Storm is not bound to one language (a core feature), and with HDInsight you can build hybrid topologies. Any work you have already invested in Storm can be reused, and Java and C# spouts and bolts can even work together. Let’s look at the configuration we need to get the spout up and running.
TopologyBuilder topologyBuilder = new TopologyBuilder("EmailProcessingTopology");
int partitionCount = Properties.Settings.Default.EventHubPartitionCount;
JavaComponentConstructor constructor = JavaComponentConstructor.CreateFromClojureExpr(
String.Format(@"(com.microsoft.eventhubs.spout.EventHubSpout. (com.microsoft.eventhubs.spout.EventHubSpoutConfig. " +
@"""{0}"" ""{1}"" ""{2}"" ""{3}"" {4} ""{5}""))",
Properties.Settings.Default.EventHubPolicyName,
Properties.Settings.Default.EventHubPolicyKey,
Properties.Settings.Default.EventHubNamespace,
Properties.Settings.Default.EventHubName,
partitionCount,
""));
topologyBuilder.SetJavaSpout(
"EventHubSpout",
constructor,
partitionCount); |
TopologyBuilder topologyBuilder = new TopologyBuilder("EmailProcessingTopology");
int partitionCount = Properties.Settings.Default.EventHubPartitionCount;
JavaComponentConstructor constructor = JavaComponentConstructor.CreateFromClojureExpr(
String.Format(@"(com.microsoft.eventhubs.spout.EventHubSpout. (com.microsoft.eventhubs.spout.EventHubSpoutConfig. " +
@"""{0}"" ""{1}"" ""{2}"" ""{3}"" {4} ""{5}""))",
Properties.Settings.Default.EventHubPolicyName,
Properties.Settings.Default.EventHubPolicyKey,
Properties.Settings.Default.EventHubNamespace,
Properties.Settings.Default.EventHubName,
partitionCount,
""));
topologyBuilder.SetJavaSpout(
"EventHubSpout",
constructor,
partitionCount);
This code can be found in the samples of the HDInsight team and is pretty straightforward: create an EventHubSpout and add it to the topology. The partitionCount indicates how many executors and tasks there should be, and it’s suggested that it matches the number of partitions of your event hub.
It gets more interesting in the first bolt. A bolt is a class that implements ISCPBolt. This interface has one method, Execute, which receives a tuple.
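Before diving into the Execute method, here is a minimal sketch of what such a bolt class looks like. The Get/constructor pattern is taken from the SCP.NET samples (the topology code further down only relies on the static Get factory method), while the storage setup and field names here are purely illustrative and the component schema declaration is omitted for brevity.

// Minimal sketch of the bolt class around the Execute method shown below.
// The Get/constructor pattern follows the SCP.NET samples; the storage setup
// is an illustrative assumption, not the real configuration.
using System.Collections.Generic;
using Microsoft.SCP;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;

public class TableStorageBolt : ISCPBolt
{
    private readonly Context ctx;
    private readonly CloudTable table;

    public TableStorageBolt(Context ctx)
    {
        this.ctx = ctx;

        // Illustrative only: the real bolt would read its connection string from configuration.
        var account = CloudStorageAccount.DevelopmentStorageAccount;
        this.table = account.CreateCloudTableClient().GetTableReference("BreachedAccounts");
    }

    // Factory method referenced later by topologyBuilder.SetBolt(..., TableStorageBolt.Get, ...).
    public static TableStorageBolt Get(Context ctx, Dictionary<string, object> parms)
    {
        return new TableStorageBolt(ctx);
    }

    public void Execute(SCPTuple tuple)
    {
        // The actual implementation follows below.
    }
}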
public void Execute(SCPTuple tuple)
{
    string emailAddress = (string)tuple.GetValue(0);
    if (!string.IsNullOrWhiteSpace(emailAddress))
    {
        JObject eventData = JObject.Parse(emailAddress);
        try
        {
            var address = new System.Net.Mail.MailAddress((string)eventData["address"]);
            var leakedIn = (string)eventData["leak"];
            var breachedAccount = new BreachedAccount(address, leakedIn);

            // Check whether this address is already present in table storage.
            var retrieveOperation = TableOperation.Retrieve<BreachedAccount>(breachedAccount.PartitionKey, breachedAccount.RowKey);
            var retrievedResult = table.Execute(retrieveOperation);
            var existingAccount = (BreachedAccount)retrievedResult.Result;
            if (existingAccount == null)
            {
                // New account: insert it and emit the address for further processing downstream.
                TableOperation insertOperation = TableOperation.Insert(new BreachedAccount(address, leakedIn));
                table.Execute(insertOperation);
                ctx.Emit(Constants.DEFAULT_STREAM_ID, new[] { tuple }, new Values { address.Address });
            }
            else
            {
                // Known account: append the new breach and update the existing entity.
                existingAccount.BreachedIn += breachedAccount.BreachedIn;
                TableOperation replaceOperation = TableOperation.Replace(existingAccount);
                table.Execute(replaceOperation);
            }
            this.ctx.Ack(tuple);
        }
        catch (Exception ex)
        {
            Context.Logger.Error(eventData.ToString());
            Context.Logger.Error(ex.Message);
            this.ctx.Fail(tuple);
        }
    }
    else
    {
        Context.Logger.Info("empty address");
        this.ctx.Ack(tuple);
    }
}
The tuple is the JSON object which was sent to the Azure event hub in my previous post. It contains the email address and the website which was breached. Table storage is queried to see if the email address is already present; if it is, the entry is updated, otherwise the new account is added to the system. Only new accounts are emitted for further processing.
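To make the structure concrete, here is a hypothetical example of one event body; only the address and leak property names are taken from the bolt code above, the values are made up.

// Hypothetical event body, parsed the same way the bolt does (Newtonsoft.Json.Linq).
JObject sample = JObject.Parse(@"{ ""address"": ""user42@example.org"", ""leak"": ""Adobe"" }");
string address = (string)sample["address"]; // "user42@example.org"
string leak = (string)sample["leak"];       // "Adobe"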
The second bolt in the system inserts new accounts into a SQL database. Its Execute method looks quite different.
public void Execute(SCPTuple tuple)
{
    // Check if the tuple is of type Tick Tuple
    if (IsTickTuple(tuple))
    {
        if ((DateTime.UtcNow - lastRun) >= TimeSpan.FromSeconds(batchIntervalInSec))
        {
            Context.Logger.Info("Time to purge!");
            FinishBatch();
        }
    }
    else
    {
        breachedAccounts.Add(tuple);
        if (breachedAccounts.Count >= batchSize)
        {
            Context.Logger.Info("Max reached, time to purge!");
            FinishBatch();
        }
        else
        {
            Context.Logger.Info("Not yet time to purge!");
        }
    }
}
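The IsTickTuple helper isn’t shown here. Below is a sketch of what it typically looks like; the constant names and SCPTuple accessors are based on the HDInsight Storm examples, so double-check them against your version of the SCP.NET SDK.

// Sketch: tick tuples are emitted by Storm's system component on the tick stream.
private static bool IsTickTuple(SCPTuple tuple)
{
    return tuple.GetSourceComponent().Equals(Constants.SYSTEM_COMPONENT_ID) &&
           tuple.GetSourceStreamId().Equals(Constants.SYSTEM_TICK_STREAM_ID);
}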
This bolt does not process every tuple it receives immediately. Instead it keeps a list of tuples and only processes them when either a maximum count is reached or a specific amount of time has passed since the last purge. In the FinishBatch method a SqlBulkCopy is performed to insert the records into a SQL database.
private void FinishBatch()
{
    lastRun = DateTime.UtcNow;
    DataTable table = new DataTable();
    table.Columns.Add("Email");

    if (breachedAccounts.Any())
    {
        foreach (var emailAddress in breachedAccounts)
        {
            var row = table.NewRow();
            row["Email"] = emailAddress.GetString(0);
            table.Rows.Add(row);
        }

        try
        {
            using (SqlBulkCopy bulkCopy = new SqlBulkCopy(connection))
            {
                bulkCopy.DestinationTableName = "Emails";
                bulkCopy.ColumnMappings.Add("Email", "Email");
                bulkCopy.WriteToServer(table);
            }

            // The whole batch made it into SQL, so ack every tuple.
            foreach (var emailAddress in breachedAccounts)
            {
                ctx.Ack(emailAddress);
            }
        }
        catch (Exception ex)
        {
            // Fail the batch so the tuples get replayed later.
            foreach (var emailAddress in breachedAccounts)
            {
                ctx.Fail(emailAddress);
            }
            Context.Logger.Error(ex.Message);
        }
        finally
        {
            breachedAccounts.Clear();
        }
    }
}
Both the table storage bolt and the SQL bulk copy bolt contain calls to ctx.Ack, ctx.Fail or ctx.Emit. These are the places where you indicate whether processing of a tuple has succeeded or failed, or where you emit a new tuple to be processed further downstream. This also enables tuples to be replayed: if the SQL bulk copy fails, the tuple will be processed again at a later time. The event hub spout expects your bolts to ack.
topologyBuilder.SetBolt(
    "TableStorageBolt",
    TableStorageBolt.Get,
    new Dictionary<string, List<string>>{
        {Constants.DEFAULT_STREAM_ID, new List<string>(){"address"}}
    },
    256,
    true)
    .DeclareCustomizedJavaSerializer(javaSerializerInfo)
    .shuffleGrouping("EventHubSpout");

topologyBuilder.SetBolt(
    "SQLBolt",
    SQLStorageBolt.Get,
    new Dictionary<string, List<string>>(),
    2,
    true)
    .shuffleGrouping("TableStorageBolt", Constants.DEFAULT_STREAM_ID)
    .addConfigurations(new Dictionary<string, string>{{"topology.tick.tuple.freq.secs", "1"}});

topologyBuilder.SetTopologyConfig(new Dictionary<string, string> {
    {"topology.workers", partitionCount.ToString()}
});
The two bolts also need to be added to the topology. The TableStorageBolt is configured with a name, a method which can be used to create instances, and a description of its output; in this case I’m emitting the email address to the default stream. A parallelism hint is passed in as well, together with a boolean flag indicating that this bolt supports acking. Since this bolt sits behind the Java spout, we also need to configure how the data coming from the spout should be deserialized. The bolt uses shuffleGrouping, meaning that the load will be spread evenly among all instances.
The SQLBolt is configured similarly, with the additional configuration needed to receive tick tuples.
After I created an HDInsight Storm cluster on Azure I was able to publish the topology from within Visual Studio. The cluster itself was created in 15 minutes.
It took quite some time to find the right parallelism for this topology, but with the code I’ve posted these were the results after one hour: table storage is slow, and the two SQL bolts are able to keep up with the 256 table storage bolts. After one hour the topology had processed only 10 million records. Even though I was able to send all 100 million events to the event hub in under two hours, processing them would take five times longer.
One of the causes for this is the test data. The partition key and row key for table storage are based on the email address.
class BreachedAccount : TableEntity
{
    public BreachedAccount() { }

    public BreachedAccount(System.Net.Mail.MailAddress emailAddress, string leakedIn)
    {
        this.PartitionKey = emailAddress.Host;
        this.RowKey = emailAddress.Address;
        this.BreachedIn = leakedIn;
    }

    public string BreachedIn { get; set; }
}
Yet none of the email addresses in my test data share the same partition key. If the data were more realistic I would be able to use batches. In the original Adobe data breach, 115 million addresses were spread over 130 different domains. Table storage allows you to send batches of up to 100 records to the same partition key, which should result in less processing time. I’ll regenerate the test data with the same characteristics, modify the table storage bolt and rerun the experiment at a later time, hoping to get a better result.
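As a rough sketch of the direction I have in mind: group the collected BreachedAccount entities by partition key (the domain), split each group into chunks of at most 100 and write every chunk with a TableBatchOperation. The method name, the grouping step and the use of InsertOrMerge are my own assumptions; only BreachedAccount and the CloudTable come from the code above.

// Sketch: flush collected accounts to table storage in batches per partition key.
// Requires Microsoft.WindowsAzure.Storage.Table, System.Collections.Generic and System.Linq.
private void FlushToTableStorage(CloudTable table, IEnumerable<BreachedAccount> accounts)
{
    foreach (var domainGroup in accounts.GroupBy(a => a.PartitionKey))
    {
        // A single batch may contain at most 100 operations, all sharing one partition key.
        var chunks = domainGroup
            .Select((account, index) => new { account, index })
            .GroupBy(x => x.index / 100, x => x.account);

        foreach (var chunk in chunks)
        {
            var batch = new TableBatchOperation();
            foreach (var account in chunk)
            {
                batch.InsertOrMerge(account);
            }
            table.ExecuteBatch(batch);
        }
    }
}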
The original standard instance of SQL Server was scaled to an S3, which allowed me to stay just below the max throughput with batches of 1000 records.
At the moment I actually have the same performance as Troy’s setup, but with quite a different approach. Storm and HDInsight were new to me and I enjoyed the programming model. The scale and possibilities they offer are enormous, yet every part of your topology needs quite some attention if you want a high-throughput solution.
The .NET part of the SDK is not yet entirely complete, so you can’t just port Java snippets to C# and expect them to work. I also ran into some NotImplementedExceptions. Microsoft is still busy implementing the SDK and the team is actively adding examples and functionality.
Further reading:
Event Hubs Programming Guide
Storm Applied
Introduction to Apache Storm on HDInsight: Real-time analytics for Hadoop
HDInsight Storm Examples
Process events from Azure Event Hubs with Storm on HDInsight
Performance Tuning for HDInsight Storm and Microsoft Azure EventHub
Understanding the Parallelism of a Storm Topology