Copying files with Azure Data Factory

The goal of Azure Data Factory is to create a pipeline which gathers a lot of data sources and produces a reliable source of information which can be used by other applications. The pain of interfacing with every different type of data store is abstracted away from every consuming application. You can have relational databases, flat files, whatever, and create a pipeline which transforms and enriches the data. In an update published at the end of March it was announced that you can also copy files. I wanted to try this out and it proved to be a bit more cumbersome than I first imagined. Let's take a look.

[Diagram: file copy flow]

I want to create a very basic flow. Let's say I have an application which is populating files in a folder and I now want to move those files into Azure blob storage. I can use the same use case as in my previous post: I'm placing data leaks in a folder and need them to be sent online for further processing.

After creating a data factory in your Azure account we’ll need the following components:

  • Two linked services
    • A connection to my on-premises folder
    • A connection to my blob storage account
  • Two datasets (also called tables)
    • A dataset containing info on where my data is stored on-premises and how many times per day it can be fetched.
    • A dataset which has info on how and where to store the data in blob storage
  • One pipeline which contains an activity which connects the datasets.

The connection to the on-premises folder is handled by an application which you need to install on-premises. By navigating to the linked services slice you can add a data gateway.

[Screenshot: data gateway]

To configure a gateway you only need to provide a name. You can then download the data gateway application and install it on-premises. After installing the application you need to enter the key, which can be viewed in the Azure portal.

[Screenshot: new data gateway]

That's it for the on-premises configuration. You do not need to configure any firewall ports, but note that you can only install the gateway once per machine.

So much for the wizards. Now we need to create the pipeline. After clicking the “Author and deploy” tile, the browser navigates to an online editor.

[Screenshot: Author and deploy tile]

You create linked services, datasets and pipelines by using JSON. When clicking on any of the menu options, you can select a template to get started.

[Screenshot: the online editor]

As mentioned earlier, I need two linked services. You can create those via the “New data store” option. The first one I'll create is the on-premises file data store.

{
    "name": "OnPremisesFileSystemLinkedService",
    "properties": {
        "host": "localhost",
        "gatewayName": "mydatagateway",
        "userId": "BennyM",
        "password": "**********",
        "type": "OnPremisesFileSystemLinkedService"
    }
}

The options you need to configure are:

  • host, which will be the name of the machine which contains the files/folder you need to connect to
  • gatewayName, which has to match the name of the gateway which we created earlier
  • a userId and password (or encrypted credentials) to use to connect from the gateway to the target machine
  • type, which needs to be OnPremisesFileSystemLinkedService
  • name, which we will use later

The next one is the connection to Azure storage. You can get the template by clicking “New data store” and selecting “Azure storage”.

{
    "name": "StorageLinkedService",
    "properties": {
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=bennymdatafactory;AccountKey=**********",
        "type": "AzureStorageLinkedService"
    }
}

The options you need to configure are:

  • name, which we will use later
  • connectionString, which needs to contain the connection string of your Azure storage account
  • type, which needs to be AzureStorageLinkedService

Next one, the first dataset: on-premises file.

{
    "name": "OnPremisesFile",
    "properties": {
        "location": {
            "type": "OnPremisesFileSystemLocation",
            "folderPath": "c:\\Temp",
            "linkedServiceName": "OnPremisesFileSystemLinkedService"
        },
        "availability": {
            "frequency": "Day",
            "interval": 1,
            "waitOnExternal": {
                "dataDelay": "00:10:00",
                "retryInterval": "00:01:00",
                "retryTimeout": "00:10:00",
                "maximumRetry": 3
            }
        }
    }
}

The options you need to configure:

  • again a name, which we’ll use later
  • type, which has to be OnPremisesFileSystemLocation
  • folderPath, the path of the folder I want to sync. Note the double backslashes.
  • linkedServiceName, this has to be the same value which we used earlier when we created the data store for the on-premises gateway.
  • availability, how often the on-premises file or folder will be synchronized. What's very important is the waitOnExternal section: you have to configure this if the data is not produced by the data factory itself. In this case it's an external source, so I have to fill in some values.

Our next dataset is the Azure blob.

{
    "name": "AzureBlobDatasetTemplate",
    "properties": {
        "location": {
            "type": "AzureBlobLocation",
            "folderPath": "myblobcontainer",
            "linkedServiceName": "StorageLinkedService"
        },
        "availability": {
            "frequency": "Day",
            "interval": 1
        }
    }
}

Fairly easy to configure.

  • again a name
  • the type which has to be AzureBlobLocation
  • the folderPath, which will be the path inside my Azure blob storage account that was configured in the linked service
  • linkedServiceName, which has to match the name we used earlier.

Then the actual workflow, the pipeline.

{
    "name": "CopyFileToBlobPipeline",
    "properties": {
        "activities": [
            {
                "type": "CopyActivity",
                "transformation": {
                    "source": {
                        "type": "FileSystemSource"
                    },
                    "sink": {
                        "type": "BlobSink",
                        "writeBatchSize": 0,
                        "writeBatchTimeout": "00:00:00"
                    }
                },
                "inputs": [
                    {
                        "name": "OnPremisesFile"
                    }
                ],
                "outputs": [
                    {
                        "name": "AzureBlobDatasetTemplate"
                    }
                ],
                "policy": {
                    "timeout": "00:05:00",
                    "concurrency": 4
                },
                "name": "Ingress"
            }
        ],
        "start": "2015-06-28T00:00:00Z",
        "end": "2015-06-30T00:00:00Z"
    }
}

I will not go over every property. What's important is that in the copy activity we tie our two datasets together. The start and end times indicate the period during which our pipeline will be active. This is very important, as I've not found any way to pause a running pipeline. If, for instance, I had specified earlier that I want my folder to be copied every 15 minutes and then set the pipeline's start date to somewhere in 2010, the pipeline would execute all of those historical runs! So be careful when you configure this, as Azure Data Factory isn't the cheapest service around.

Every time you create a linked service, dataset or pipeline, you click deploy. The editor will then validate whether all the names you mention are actually correct. The editor is quite good: it even detects errors in data types and gives you help and IntelliSense (Ctrl+Space).

[Screenshot: editor validation error]

[Screenshot: editor help and IntelliSense]

And with all this deployed, the folder is being synchronized online.

[Screenshot: copy result]

This is probably one of the most expensive file copy samples around. In a next post I'll investigate more features. One drawback of the current setup is that every time the sync runs, all files are copied again and overwritten as-is, whether they changed or not.


Using HDInsight & Storm to process 100 million events

In my last post I threw 100 million randomly generated email addresses at an Azure event hub and it didn't even budge. Now it's time to process the data and store it in a setup that resembles haveibeenpwned.

With the events now in the event hub, I could create another cloud service which uses the .NET SDK to read the data. There are two ways to implement this: either with the EventHubReceiver class or with the EventProcessorHost. I'll try these out in a future post.
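
For the curious, a rough sketch of what the EventProcessorHost variant could look like with the classic Microsoft.ServiceBus.Messaging SDK. This is not code from this project; the processor class, hub name and connection strings are placeholders.

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

// Receives events from every partition this host holds a lease on.
class EmailProcessor : IEventProcessor
{
    public Task OpenAsync(PartitionContext context)
    {
        return Task.FromResult(0);
    }

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (var eventData in messages)
        {
            var json = Encoding.UTF8.GetString(eventData.GetBytes());
            // process the breached account here
        }
        await context.CheckpointAsync(); // record progress in blob storage
    }

    public Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        return reason == CloseReason.Shutdown ? context.CheckpointAsync() : Task.FromResult(0);
    }
}

class Program
{
    static void Main()
    {
        // Placeholder names and connection strings.
        var host = new EventProcessorHost(
            Environment.MachineName,                // host name
            "emailhub",                             // event hub path (hypothetical)
            EventHubConsumerGroup.DefaultGroupName,
            "<event hub connection string>",
            "<storage connection string>");         // used for leases and checkpoints

        host.RegisterEventProcessorAsync<EmailProcessor>().Wait();

        Console.ReadLine();
        host.UnregisterEventProcessorAsync().Wait();
    }
}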

For now I wanted to use something else: HDInsight & Storm. HDInsight is Hadoop as PaaS. Storm is a computation system for processing streams of data and, as advertised on its website, should be able to process millions of tuples per second. Sounds like a good way to handle the 100 million e-mail addresses I have waiting.

In order to use Storm you need to understand only a few concepts:

  • Tuple: a piece of information that needs to be processed.
  • Spout: reads from a stream and returns tuples
  • Bolt: processes tuples and can forward or produce new tuples
  • Topology: links spouts and bolts together

This is a very rudimentary explanation, but should be enough for this post.

In order to write and publish a topology in Visual Studio, you should have the Azure SDK and the HDInsight tools for Visual Studio. Let’s look at the different components we need.

The spout will be reading from our event hub. Microsoft has already written a spout which you can use; however, it is written in Java. That's not a problem: Storm is not bound to one language (a core feature), and with HDInsight you can build hybrid topologies, so any work you have already invested in Storm can be reused and Java and C# spouts and bolts can even work together. Let's look at the configuration we need to get our spout up and running.

TopologyBuilder topologyBuilder = new TopologyBuilder("EmailProcessingTopology");           
 
int partitionCount = Properties.Settings.Default.EventHubPartitionCount;
 
JavaComponentConstructor constructor = JavaComponentConstructor.CreateFromClojureExpr(
    String.Format(@"(com.microsoft.eventhubs.spout.EventHubSpout. (com.microsoft.eventhubs.spout.EventHubSpoutConfig. " +
        @"""{0}"" ""{1}"" ""{2}"" ""{3}"" {4} ""{5}""))",
        Properties.Settings.Default.EventHubPolicyName,
        Properties.Settings.Default.EventHubPolicyKey,
        Properties.Settings.Default.EventHubNamespace,
        Properties.Settings.Default.EventHubName,
        partitionCount,
        "")); 
 
topologyBuilder.SetJavaSpout(
    "EventHubSpout",  
    constructor,      
    partitionCount);

This code can be found in the samples of the HDInsight team and is pretty straightforward: create an event hub spout and add it to the topology. The partitionCount indicates how many executors and tasks there should be, and it's suggested that this matches the number of partitions of your event hub.

It gets more interesting in the first bolt. A bolt is a class that implements ISCPBolt. This interface has one method, Execute, which receives a tuple.

public void Execute(SCPTuple tuple)
{  
    // the value in the tuple is the raw JSON event body that was sent to the event hub
    string emailAddress = (string)tuple.GetValue(0);
 
    if (!string.IsNullOrWhiteSpace(emailAddress))
    {             
        JObject eventData = JObject.Parse(emailAddress);
        try
        {
            var address = new System.Net.Mail.MailAddress((string)eventData["address"]);
            var leakedIn = (string)eventData["leak"];
            var breachedAccount = new BreachedAccount(address, leakedIn);
            var retrieveOperation = TableOperation.Retrieve<BreachedAccount>(breachedAccount.PartitionKey, breachedAccount.RowKey);
            var retrievedResult = table.Execute(retrieveOperation);
            var existingAccount = (BreachedAccount)retrievedResult.Result;
            if(existingAccount == null)
            {
                TableOperation insertOperation = TableOperation.Insert(new BreachedAccount(address, leakedIn));
                table.Execute(insertOperation);
                ctx.Emit(Constants.DEFAULT_STREAM_ID,new[]{tuple}, new Values { address.Address });
            }
            else
            {
                existingAccount.BreachedIn += breachedAccount.BreachedIn;
                TableOperation insertOperation = TableOperation.Replace(existingAccount);
                table.Execute(insertOperation);
            }                    
            this.ctx.Ack(tuple);
        }
        catch (Exception ex)
        {
            Context.Logger.Error(eventData.ToString());
            Context.Logger.Error(ex.Message);
            this.ctx.Fail(tuple);
        }
    }
    else
    {
        Context.Logger.Info("empty address");
        this.ctx.Ack(tuple);
    }
}

The tuple is the JSON object which was sent to the Azure event hub in my previous post. It contains the email address and the website which was breached. Table storage is queried to see if the email address is already present; if it is, the entry is updated, otherwise the new account is added to the system. Only new accounts are emitted for further processing.

The second bolt in the system will insert new accounts into a SQL database. Its execute method is quite different.

public void Execute(SCPTuple tuple)
{           
    // Check if the tuple is of type Tick Tuple
    if (IsTickTuple(tuple))
    {               
        if((DateTime.UtcNow - lastRun) >= TimeSpan.FromSeconds(batchIntervalInSec))
        {
            Context.Logger.Info("Time to purge!");
            FinishBatch();
        }                                
    }
    else
    {                
        breachedAccounts.Add(tuple);                
        if (breachedAccounts.Count >= batchSize)
        {
            Context.Logger.Info("Max reached, time to purge!");
            FinishBatch();
        }
        else
        {
            Context.Logger.Info("Not yet time to purge!");
        }
    }
}
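
The batching fields and the IsTickTuple helper aren't shown in the snippet above. A minimal sketch of what they could look like inside the bolt class, assuming SCP.NET's system tick stream id; the field names match the Execute method, the interval value is an assumption and the batch size of 1000 is the one mentioned later in this post.

// Assumed members of the SQL bolt.
private readonly List<SCPTuple> breachedAccounts = new List<SCPTuple>();
private DateTime lastRun = DateTime.UtcNow;
private readonly int batchSize = 1000;         // flush once this many tuples are buffered
private readonly int batchIntervalInSec = 5;   // or once this much time has passed (assumed value)

// Tick tuples arrive on Storm's system tick stream rather than on a data stream.
private static bool IsTickTuple(SCPTuple tuple)
{
    return tuple.GetSourceStreamId().Equals(Constants.SYSTEM_TICK_STREAM_ID);
}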

This bolt does not process every tuple it receives immediately. Instead it keeps a list of tuples ready for processing until either a maximum count is reached or a specific amount of time has passed since the last purge. In the FinishBatch method a SqlBulkCopy is performed to insert the records into a SQL database.

private void FinishBatch()
{
    lastRun = DateTime.UtcNow;
    DataTable table = new DataTable();
    table.Columns.Add("Email");
    if (breachedAccounts.Any())
    {
        foreach (var emailAddress in breachedAccounts)
        {
            var row = table.NewRow();
            row["Email"] = emailAddress.GetString(0);
            table.Rows.Add(row);
        }
        try
        {
            using (SqlBulkCopy bulkCopy = new SqlBulkCopy(connection))
            {
                bulkCopy.DestinationTableName = "Emails";
                bulkCopy.ColumnMappings.Add("Email", "Email");
                bulkCopy.WriteToServer(table);
            }
            foreach (var emailAddress in breachedAccounts)
            {
                ctx.Ack(emailAddress);
            }
 
        }
        catch (Exception ex)
        {
            foreach (var emailAddress in breachedAccounts)
            {
                ctx.Fail(emailAddress);
            }
            Context.Logger.Error(ex.Message);
        }
        finally
        {
            breachedAccounts.Clear();
        }
    }
}

Both the table storage bolt and the SQL bulk copy bolt have lines with ctx.Ack, Fail or Emit. These are the places where you indicate whether processing of a tuple has succeeded or failed, or where you emit a new tuple to be processed further downstream. This also enables replaying tuples: if the SQL bulk copy fails, the tuple will be processed again at a later time. The event hub spout expects your bolts to ack.

topologyBuilder.SetBolt(
    "TableStorageBolt",                                            
    TableStorageBolt.Get,
    new Dictionary<string, List<string>>{
        {Constants.DEFAULT_STREAM_ID, new List<string>(){"address"}}
    },
    256,                                      
    true)
.DeclareCustomizedJavaSerializer(javaSerializerInfo)
.shuffleGrouping("EventHubSpout");
 
topologyBuilder.SetBolt(
    "SQLBolt",                                              
    SQLStorageBolt.Get,
     new Dictionary<string, List<string>>(),
    2,                                     
    true)
.shuffleGrouping("TableStorageBolt", Constants.DEFAULT_STREAM_ID)
.addConfigurations(new Dictionary<string,string>{{"topology.tick.tuple.freq.secs","1"}});
 
topologyBuilder.SetTopologyConfig(new Dictionary<string, string> { 
    {"topology.workers", partitionCount.ToString()}
});

The two bolts also need to be added to the topology. The TableStorageBolt is configured with a name, a method which can be used to create instances, and a description of its output; in this case I'm emitting the email address to the default stream. The parallelism hint is also configured, along with the boolean flag which indicates that this bolt supports acking. Since this bolt sits behind the Java spout, we also need to configure how the data coming from the spout can be deserialized. The bolt is also configured to use shuffleGrouping, meaning that the load will be spread evenly among all instances.
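
The Get factory referenced above, and the ctx and table members used in the table storage bolt's Execute method, aren't shown in this post. A minimal sketch of what that class could look like, assuming SCP.NET's usual plugin pattern; the connection string setting and the table name are hypothetical.

using System;
using System.Collections.Generic;
using Microsoft.SCP;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;

public class TableStorageBolt : ISCPBolt
{
    private readonly Context ctx;
    private readonly CloudTable table;

    public TableStorageBolt(Context ctx)
    {
        this.ctx = ctx;

        // One string field in (the JSON event) and one string field out (the address).
        var inputSchema = new Dictionary<string, List<Type>>
        {
            { Constants.DEFAULT_STREAM_ID, new List<Type> { typeof(string) } }
        };
        var outputSchema = new Dictionary<string, List<Type>>
        {
            { Constants.DEFAULT_STREAM_ID, new List<Type> { typeof(string) } }
        };
        this.ctx.DeclareComponentSchema(new ComponentStreamSchema(inputSchema, outputSchema));

        // Hypothetical setting and table name.
        var account = CloudStorageAccount.Parse(Properties.Settings.Default.StorageConnectionString);
        table = account.CreateCloudTableClient().GetTableReference("BreachedAccounts");
        table.CreateIfNotExists();
    }

    // SCP.NET calls this static factory to create instances of the bolt.
    public static TableStorageBolt Get(Context ctx, Dictionary<string, Object> parms)
    {
        return new TableStorageBolt(ctx);
    }

    public void Execute(SCPTuple tuple)
    {
        // Body shown earlier in this post.
    }
}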

The SQLBolt is configured accordingly, with the additional configuration to support time ticks.

After I created an HDInsight Storm cluster on Azure I was able to publish the topology from within Visual Studio. The cluster itself was created in 15 minutes.

[Screenshot: publishing the topology from Visual Studio]

It took quite some time to find the right parallelism for this topology but with the code I’ve posted I got these results after one hour:

[Screenshot: bolt statistics after one hour]

Table storage is slow; the two SQL bolts are able to keep up with the 256 table storage bolts. After one hour my topology had processed only 10 million records. Even though I was able to send all 100 million events to the event hub in under two hours, processing them would take five times longer.

One of the causes for this is the test data. The partition and row key for the table storage is based on the email address.

class BreachedAccount : TableEntity
{
    public BreachedAccount() { }
 
    public BreachedAccount(System.Net.Mail.MailAddress emailAddress, string leakedIn)
    {         
        this.PartitionKey = emailAddress.Host;         
        this.RowKey = emailAddress.Address;            
    }
 
    public string BreachedIn { get; set; }
}

Yet none of the email addresses in my test data share the same partition key. If the data were more realistic I would be able to use batches. In the original Adobe data breach, 115 million addresses were spread over 130 different domains. Table storage allows you to send batches of 100 records to the same partition key, which should result in less processing time (see the sketch below). I'll regenerate the test data with the same characteristics, modify the table storage bolt and rerun the experiment at a later time, hoping for a better result.
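
As a sketch of that modification: if the bolt buffers BreachedAccount entities instead of writing them one by one, they can be grouped by partition key and written with batch operations. This is not the code currently running, just an illustration using the classic Microsoft.WindowsAzure.Storage table API; bufferedAccounts is a hypothetical buffer kept by the bolt and the snippet needs System.Linq.

// Group buffered entities by partition key (the email domain).
foreach (var partition in bufferedAccounts.GroupBy(a => a.PartitionKey))
{
    var entities = partition.ToList();

    // A single batch operation targets one partition key and holds at most 100 entities.
    for (int i = 0; i < entities.Count; i += 100)
    {
        var batch = new TableBatchOperation();
        foreach (var entity in entities.Skip(i).Take(100))
        {
            batch.InsertOrMerge(entity);
        }
        table.ExecuteBatch(batch);
    }
}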

The original standard instance of SQL Server was scaled to an S3, which allowed me to stay just below the max throughput with batches of 1000 records.

[Screenshot: SQL database throughput]

At the moment I actually have the same performance as Troy’s setup but with quite a different approach. Storm and HDInsight were new to me and I enjoyed the programming model. The scale and possibilities it can give are enormous, yet every part of your topology needs quite some attention if you want to have a high throughput solution.

The .NET part of the SDK is not yet entirely complete, so you can’t just port Java snippets to C# and expect them to work. I also ran into some NotImplementedExceptions. Microsoft is still busy implementing the SDK and the team is actively adding examples and functionality.

Further reading:

Event Hubs Programming Guide
Storm Applied
Introduction to Apache Storm on HDInsight: Real-time analytics for Hadoop
HDInsight Storm Examples
Process events from Azure Event Hubs with Storm on HDInsight
Performance Tuning for HDInsight Storm and Microsoft Azure EventHub
Understanding the Parallelism of a Storm Topology