Getting up and running with Mono and Raspberry Pi 3

Last week I got my hands on a Raspberry Pi and this weekend I finally found some time to sit down and get my first project with it going. Naturally I ran into several issues, and with today being Pi Day, I thought I’d share my notes.

Last year I started a project with a bunch of colleagues where we try to monitor our gas, water and electricity meters. I presented what we had achieved so far at Cloudbrew last year. The aim is to build an IoT solution with a mobile application, a cloud backend and lots of devices (Arduinos and Pis for now). I didn’t want to wait until we finished all that to start gathering readouts from our utility meters, so I thought I’d grab a Pi or two and get started. Since we had already figured out how to get the readouts, I thought an hour or two would be all I’d need to put the solution on a Pi.

The first thing I did was download Raspbian Jessie Lite and follow the steps from the official site. Jessie Lite is a minimal, headless image, which is fine since I won’t be connecting a monitor. I’m not choosing Windows 10 IoT for now because the onboard WiFi, a main selling point of the new Pi, is not supported at the time of writing.

After connecting an ethernet cable and a power supply I used PuTTY to open an SSH session to connect to the Pi. So far so good.

The next task was to get the WiFi going, which turned out to be rather easy: open “/etc/wpa_supplicant/wpa_supplicant.conf” in your editor and add the SSID and password of the network you want to connect to. As is usually the case, someone else had already written down the instructions.

sudo nano /etc/wpa_supplicant/wpa_supplicant.conf
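
For reference, the entry you add at the bottom of that file looks roughly like this (the SSID and passphrase are placeholders):

network={
    ssid="YourNetworkName"
    psk="YourWiFiPassword"
}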


I unplugged the ethernet cable and connected via Wifi. I then updated the Pi with “sudo apt-get update” and “sudo apt-get upgrade” so I was running the latest bits.

sudo apt-get update
sudo apt-get upgrade

Next up, programming. I briefly looked at the options I had: I could program in C, Python, C++, and many others. But my time this weekend was limited, and I live and breathe .NET, so I narrowed the list down to .NET Core or Mono. I chose Mono because I had experimented with it years ago and .NET Core had not yet reached a stable point; toying around with alpha and beta releases was not on my todo list for today.

The default package repository has a very old version of Mono, so you need to follow the instructions on the Mono site. Add the signing key and package repository to your system then run “sudo apt-get install mono-runtime”.

sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys 3FA7E0328081BFF6A14DA29AA6A19B38D3D831EF
echo "deb http://download.mono-project.com/repo/debian wheezy main" | sudo tee /etc/apt/sources.list.d/mono-xamarin.list
sudo apt-get update
sudo apt-get install mono-runtime

I created a Hello World console application on my Windows 10 laptop and used PSFTP to copy the exe to the Pi. It just worked.


Then the search was on to find a library to interface with the GPIO pins on the Pi. After looking around I found Raspberry Sharp IO. It had the API I wanted: you can use its event model to track changes on the GPIO pins, just what I needed.
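
The library is available as a NuGet package; adding it to the console project looks roughly like this (the package id is inferred from the assembly name in the error log further below, so treat it as an assumption):

Install-Package Raspberry.IO.GeneralPurpose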

// Configure physical pin 11 as an input
var pin2Sensor = ConnectorPin.P1Pin11.Input();
 
GpioConnection connection = new GpioConnection(pin2Sensor);
// Log every status change of the connected pin
connection.PinStatusChanged += (sender, statusArgs)
                    => Console.WriteLine("Pin {0} changed", statusArgs.Configuration.Name);

Deploying this to the Pi, however, resulted in catastrophic failure with a rather cryptic error message:

pi@raspberrypi:~/ticktack $ sudo mono TickTackConsole.exe
Missing method .ctor in assembly /home/pi/ticktack/Raspberry.IO.GeneralPurpose.dll, type System.Runtime.CompilerServices.ExtensionAttribute
Can't find custom attr constructor image: /home/pi/ticktack/Raspberry.IO.GeneralPurpose.dll mtoken: 0x0a000014
* Assertion at class.c:5597, condition `!mono_loader_get_last_error ()' not met
 
Stacktrace:
 
 
Native stacktrace:
 
 
Debug info from gdb:
 
[New LWP 1965]
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/arm-linux-gnueabihf/libthread_db.so.1".
0x76e67ee8 in __libc_waitpid (Cannot access memory at address 0x1
pid=1966, stat_loc=0x7e904960, options=0) at ../sysdeps/unix/sysv/linux/waitpid.c:40
40      ../sysdeps/unix/sysv/linux/waitpid.c: No such file or directory.
  Id   Target Id         Frame
  2    Thread 0x769f3430 (LWP 1965) "mono" 0x76e65a40 in do_futex_wait (isem=isem@entry=0x3181a4) at ../nptl/sysdeps/unix/sysv/linux/sem_wait.c:48
* 1    Thread 0x76f5e000 (LWP 1961) "mono" 0x76e67ee8 in __libc_waitpid (Cannot access memory at address 0x1
pid=1966, stat_loc=0x7e904960, options=0) at ../sysdeps/unix/sysv/linux/waitpid.c:40
 
Thread 2 (Thread 0x769f3430 (LWP 1965)):
#0  0x76e65a40 in do_futex_wait (isem=isem@entry=0x3181a4) at ../nptl/sysdeps/unix/sysv/linux/sem_wait.c:48
#1  0x76e65af4 in __new_sem_wait (sem=0x3181a4) at ../nptl/sysdeps/unix/sysv/linux/sem_wait.c:69
#2  0x00219f98 in mono_sem_wait ()
#3  0x0019091c in ?? ()
Backtrace stopped: previous frame identical to this frame (corrupt stack?)
 
Thread 1 (Thread 0x76f5e000 (LWP 1961)):
Cannot access memory at address 0x1
#0  0x76e67ee8 in __libc_waitpid (pid=1966, stat_loc=0x7e904960, options=0) at ../sysdeps/unix/sysv/linux/waitpid.c:40
#1  0x000c0ba4 in ?? ()
Cannot access memory at address 0x1
Backtrace stopped: previous frame identical to this frame (corrupt stack?)
 
=================================================================
Got a SIGABRT while executing native code. This usually indicates
a fatal error in the mono runtime or one of the native libraries
used by your application.
=================================================================

Fast forward four hours. It turns out that installing mono-runtime doesn’t quite get you all the bits you need. You also need to run the following command:

sudo apt-get install libmono-system-core4.0-cil

After that the application started again. The button I had connected to the pins activated my code. Yay! With that my time for the weekend was all used up, but I’m now ready to create my actual application.


To be continued. #MarchIsForMakers

Running Gulp and NPM in VSO Build fails with “Run as Administrator” message

The past months I’ve been heavily involved in Angular and ASP.NET Web API projects. While moving a project from an on-premises build server to Visual Studio Online I ran into an issue for which I could not find a solution at first. The default npm task was failing for no apparent reason, only outputting that the task had to be run as administrator.

Long story short, I added the --force argument to the npm arguments of the build step and suddenly the issue was resolved.
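
In effect the build step then runs npm with the force flag, along these lines (the exact arguments field depends on how your npm task is configured):

npm install --force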


Since I wanted to write this down, I had to reproduce the issue. So I removed the argument again, but the build somehow kept succeeding. I had also deleted the failed builds, so I no longer have a log to illustrate the issue.

Something has changed though, as the builds now take much longer than before, even without the force argument. You can see that in the image below. Hopefully I don’t run into the issue again; I added the force argument after reading this GitHub issue.

(screenshot: build durations before and after adding the force argument)

 

Session: Storm with HDInsight

Two weeks ago I spoke at the Belgian Azure user group (AZUG). I gave an introduction on Storm with HDInsight. You can find a recording of the session on their website.

My talk was divided into three parts: an introduction, a deep dive giving an overview of the main concepts of a Storm topology, and several scenarios and how they can be solved.

The deep dive centered around creating a Twitter battle where hashtags were counted and the results then displayed on a website. You can find the code on my GitHub account.

Scaling an Azure Event Hub: Throughput units

When navigating to the scale tab of an event hub there are only two settings you can change: the messaging tier and the number of event hub throughput units.

Scale settings

The messaging tier enables features and sets the amount you pay for messages or connections. You can find more info on the Azure website.

A throughput unit (TU) has a very direct impact on performance. A TU currently has these limits: 1 MB/s ingress, 2 MB/s egress, and up to 84 GB of event storage. The default is 1 TU.

In the picture below you can see that I had one cloud service pushing messages to an event hub until 10:00. I then scaled out the service to 20 instances. This only resulted in about twice the number of messages being received (from 200k to 400k), not really what you would expect from a 20x scale-out. I was also getting more errors; from time to time the event hub sent back server busy messages.

(chart: messages received by the event hub per 5 minutes)

At about 10:30 I increased the TUs from 1 to 3. This not only stopped the errors from occurring, it also increased the throughput from 400k to over 1 million messages received on the event hub per 5 minutes (roughly 1,300 to over 3,300 messages per second).

Partitioning and wildcards in an Azure Data Factory pipeline

In a previous post I created an Azure Data Factory pipeline to copy files from an on-premises system to blob storage. This was a simple copy from one folder to another.

It’s possible to add a time aspect to this pipeline. Let’s say I want to keep an archive of these files. In my source folder files get added, modified and deleted. I’d like to create daily snapshots of this folder.

To enable this scenario you only need to change the dataset that represents the file location in blob storage.

{
    "name": "AzureBlobDatasetTemplate",
    "properties": {
        "location": {
            "type": "AzureBlobLocation",
            "folderPath": "myblobcontainer/{Year}/{Month}/{Day}",
            "partitionedBy": [
                {
                    "name": "Year",
                    "value": {
                        "type": "DateTime",
                        "date": "SliceStart",
                        "format": "yyyy"
                    }
                },
                {
                    "name": "Month",
                    "value": {
                        "type": "DateTime",
                        "date": "SliceStart",
                        "format": "MM"
                    }
                },
                {
                    "name": "Day",
                    "value": {
                        "type": "DateTime",
                        "date": "SliceStart",
                        "format": "dd"
                    }
                }
            ],
            "linkedServiceName": "StorageLinkedService"
        },
        "availability": {
            "frequency": "Day",
            "interval": 1
        }
    }
}

I’ve changed the folderPath and added a partitionedBy element. The folderPath now contains placeholders, the elements between curly braces. These placeholders are defined in the partitionedBy array; each element uses the SliceStart variable to create its value. The only other variable you can use, at this time, is SliceEnd. For the slice starting on 1 July 2015, for example, the files end up in myblobcontainer/2015/07/01. If you make this change to the dataset, the next run of the pipeline will produce the desired output.


 

I also tested what happens when a file is in use: you get an exception. So make sure the application that creates the files you want to copy releases any locks.


 

Wildcards can be used in the filename and the filter, so you can copy all txt files in a folder with *.txt. Unfortunately wildcards and partition placeholders cannot be combined. So if your files are all stored in one folder, but each file has the time in its filename (myFile-2015-07-01.txt), you can’t create a filter with the dynamic partitions (myFile-{Year}-{Month}-{Day}.txt). It’s only possible to use the partitionedBy section in the folder structure, as shown above. If you think this would be a nice feature, go vote here!

The price of the current setup is determined by a couple of things. First we have a low-frequency activity, that is, an activity that runs daily or less often. A daily snapshot means roughly 30 activity runs per month; the first 5 are free, so we have about 25 billable activities remaining. The pricing of an activity also depends on where it runs, on-premises or in the cloud. I’m assuming here it’s an on-premises activity since the files are not located in Azure; I’ve asked around whether this assumption is correct but don’t have a response yet. The price of an on-premises activity is €0.5586 per activity, so that comes to almost €14 per month for this daily snapshot (25 × €0.5586 ≈ €13.97). If we modified everything to run hourly we’d have to pay €554.80 per month. You can find more info on the pricing on their website.

In this scenario I’ve demonstrated how to get started with Azure Data Factory. The real power, however, lies in the transformation steps you can add. Instead of doing a simple copy, the data can be read, combined and stored in many different forms. A topic for a future post.

Upside:

  • Rich editor
  • Fairly easy to get started
  • No custom application to write
  • On-premise support via the Data Management Gateway
  • No firewall settings need to be changed

Downside:

  • Can get quite expensive

Copying files with Azure Data Factory

The goal of Azure Data Factory is to create a pipeline which gathers a lot of data sources and produces a reliable source of information which can be used by other applications. The pain of interfacing with every different type of datastore is abstracted away from every consuming application. You can have relational databases, flat files, whatever, and create a pipeline which transforms and enriches the data. In an update published at the end of March it was announced that you can also copy files. I wanted to try this out and it proved to be a bit more cumbersome than I first imagined. Let’s take a look.

I want to create a very basic flow. Let’s say I have an application which keeps placing files in a folder, and I now want to move those files into Azure blob storage. I can use the same use case as in my previous post: I’m placing data leak dumps in a folder and need them sent online for further processing.

After creating a data factory in your Azure account we’ll need the following components:

  • Two linked services
    • A connection to my on-premises folder
    • A connection to my blob storage account
  • Two datasets (also called tables)
    • A dataset containing info on where my data is stored on-premises and how many times per day it can be fetched.
    • A dataset which has info on how and where to store the data in blob storage
  • One pipeline which contains an activity which connects the datasets.

The connection to the on-premises files is handled by an application you need to install on-premises. By navigating to the linked services slice you can add a data gateway.


To configure a gateway you only need to provide a name; you can then download the data gateway application and install it on-premises. After installing the application you need to enter the key, which can be viewed in the Azure portal.


That’s it for the on-premises configuration. You do not need to open any firewall ports, but you can only install the gateway once per machine.

So much for the wizards; now we need to create the pipeline. After clicking the “Author and deploy” tile, the browser navigates to an online editor.


You create linked services, datasets and pipelines by using JSON. When clicking on any of the menu options, you can select a template to get started.


 

As mentioned earlier, I needed two linked services. You can create those via the new data store option. The first one I’ll create is the on-premises file data store.

{
    "name": "OnPremisesFileSystemLinkedService",
    "properties": {
        "host": "localhost",
        "gatewayName": "mydatagateway",
        "userId": "BennyM",
        "password": "**********",
        "type": "OnPremisesFileSystemLinkedService"
    }
}

The options you need to configure are:

  • host, which will be the name of the machine which contains the files/folder you need to connect to
  • gatewayName, which has to match the name of the gateway which we created earlier
  • a userid and password or encryptedcredentials to use to connect from the gateway to the target machine
  • type, which needs to be OnPremisesFileSystemLinkedService
  • name, which we will use later

The next one will be the connection to the Azure storage. You can get the template by clicking “New data store” and selecting “Azure storage”.

{
    "name": "StorageLinkedService",
    "properties": {
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=bennymdatafactory;AccountKey=**********",
        "type": "AzureStorageLinkedService"
    }
}

The options you need to configure are:

  • name, which we will use later
  • connectionstring, which needs to match your connectionstring for Azure storage
  • type, which needs to be AzureStorageLinkedService

Next one, the first dataset: on-premises file.

{
    "name": "OnPremisesFile",
    "properties": {
        "location": {
            "type": "OnPremisesFileSystemLocation",
            "folderPath": "c:\\Temp",
            "linkedServiceName": "OnPremisesFileSystemLinkedService"
        },
        "availability": {
            "frequency": "Day",
            "interval": 1,
            "waitOnExternal": {
                "dataDelay": "00:10:00",
                "retryInterval": "00:01:00",
                "retryTimeout": "00:10:00",
                "maximumRetry": 3
            }
        }
    }
}

The options you need to configure:

  • again a name, which we’ll use later
  • type, which has to be OnPremisesFileSystemLocation
  • folderPath, the folder I want to sync. Note the double backslashes (JSON escaping).
  • linkedServiceName, this has to be the same value we used earlier when we created the data store for the on-premises gateway.
  • availability, how often the on-premises file or folder will be synchronized. What’s very important is waitOnExternal: you have to configure this if the data is not produced by the data factory itself. In this case it’s an external source, so I have to fill in some values.

Our next dataset is the Azure blob.

{
    "name": "AzureBlobDatasetTemplate",
    "properties": {
        "location": {
            "type": "AzureBlobLocation",
            "folderPath": "myblobcontainer",
            "linkedServiceName": "StorageLinkedService"
        },
        "availability": {
            "frequency": "Day",
            "interval": 1
        }
    }
}

Fairly easy to configure.

  • again a name
  • the type which has to be AzureBlobLocation
  • the folderPath, which will be the path inside the Azure blob storage account that was configured in the linked service
  • linkedServiceName, which has to match the name we used earlier.

Then the actual workflow, the pipeline.

{
    "name": "CopyFileToBlobPipeline",
    "properties": {
        "activities": [
            {
                "type": "CopyActivity",
                "transformation": {
                    "source": {
                        "type": "FileSystemSource"
                    },
                    "sink": {
                        "type": "BlobSink",
                        "writeBatchSize": 0,
                        "writeBatchTimeout": "00:00:00"
                    }
                },
                "inputs": [
                    {
                        "name": "OnPremisesFile"
                    }
                ],
                "outputs": [
                    {
                        "name": "AzureBlobDatasetTemplate"
                    }
                ],
                "policy": {
                    "timeout": "00:05:00",
                    "concurrency": 4
                },
                "name": "Ingress"
            }
        ],
        "start": "2015-06-28T00:00:00Z",
        "end": "2015-06-30T00:00:00Z"
    }
}

I will not go over every property. What’s important is that in the copy activity we tie our two datasets together. The start and end times indicate the period during which our pipeline will be active. This is very important, as I have not found any way to pause a running pipeline. If, for instance, I had specified earlier that I want my folder copied every 15 minutes and then set the pipeline’s start date in the year 2010, the pipeline would perform all of those historical runs! So be careful when you configure this, as Azure Data Factory isn’t the cheapest service around.

Every time you create a linked service, dataset or pipeline you click deploy. The editor then validates whether all the names you mention are actually correct. The editor is quite good: it even detects errors in datatypes and gives you help and IntelliSense (Ctrl+Space).



And with all this deployed, the folder is being synchronized online.


This is probably one of the most expensive file copy samples around. In a next post I’ll investigate more features. One drawback of the current setup is that every time the sync runs, all files are copied again and overwritten as-is.


 

Using HDInsight & Storm to process 100 million events

In my last post I threw 100 million randomly generated email addresses at an Azure event hub and it didn’t even budge. Now it’s time to process the data and store it in a setup that resembles haveibeenpwned.

With the events now in the event hub, I could create another cloud service which uses the .NET SDK to read the data. There are two ways to implement this: either with the EventHubReceiver class or with the EventProcessorHost. I’ll try these out in a future post.

For now I wanted to use something else, HDInsight & Storm. HDInsight is Hadoop as PaaS. Storm is a computation system to process streams of data and, as advertised on their website, should be able to process millions of tuples per second. Sounds like a good way to handle the 100 million e-mail addresses I have waiting.

In order to use Storm you need to understand only a few concepts:

  • Tuple: a piece of information that needs to be processed.
  • Spout: reads from a stream and returns tuples
  • Bolt: processes tuples and can forward or produce new tuples
  • Topology: links spouts and bolts together

This is a very rudimentary explanation, but should be enough for this post.

In order to write and publish a topology in Visual Studio, you should have the Azure SDK and the HDInsight tools for Visual Studio. Let’s look at the different components we need.

The spout will be reading from our event hub. Microsoft has already written a spout which you can use; however, it is written in Java. That’s not a problem: Storm is not bound to one language, a core feature, and with HDInsight you can have hybrid topologies. Any work you have already invested in Storm can be reused, and Java and C# spouts and bolts can even work together. Let’s look at the configuration we need to get our spout up and running.

TopologyBuilder topologyBuilder = new TopologyBuilder("EmailProcessingTopology");           
 
int partitionCount = Properties.Settings.Default.EventHubPartitionCount;
 
JavaComponentConstructor constructor = JavaComponentConstructor.CreateFromClojureExpr(
    String.Format(@"(com.microsoft.eventhubs.spout.EventHubSpout. (com.microsoft.eventhubs.spout.EventHubSpoutConfig. " +
        @"""{0}"" ""{1}"" ""{2}"" ""{3}"" {4} ""{5}""))",
        Properties.Settings.Default.EventHubPolicyName,
        Properties.Settings.Default.EventHubPolicyKey,
        Properties.Settings.Default.EventHubNamespace,
        Properties.Settings.Default.EventHubName,
        partitionCount,
        "")); 
 
topologyBuilder.SetJavaSpout(
    "EventHubSpout",  
    constructor,      
    partitionCount);

This code can be found in the samples from the HDInsight team and is pretty straightforward: create an event hub spout and add it to the topology. The partitionCount indicates how many executors and tasks there should be, and it’s suggested that this matches the number of partitions of your event hub.

It gets more interesting in the first bolt. A bolt is a class that implements ISCPBolt. This interface has one method, Execute, which receives a tuple.
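
For context, the class around that method follows the SCP.NET conventions used elsewhere in this post; as a rough sketch (the constructor wiring and the table field are my assumptions, not the exact code of this project):

public class TableStorageBolt : ISCPBolt
{
    private readonly Context ctx;   // SCP.NET context, used for Ack/Fail/Emit
    private CloudTable table;       // Azure table storage client (assumed field)
 
    public TableStorageBolt(Context ctx, Dictionary<string, Object> parms)
    {
        this.ctx = ctx;
        // Declare the input/output schemas and open the table storage connection here.
    }
 
    // Factory method referenced when the topology is built (TableStorageBolt.Get)
    public static TableStorageBolt Get(Context ctx, Dictionary<string, Object> parms)
    {
        return new TableStorageBolt(ctx, parms);
    }
 
    public void Execute(SCPTuple tuple)
    {
        // Processing logic goes here.
    }
}

The actual Execute implementation of the first bolt looks like this: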

public void Execute(SCPTuple tuple)
{  
    string emailAddress = (string)tuple.GetValue(0);
 
    if (!string.IsNullOrWhiteSpace(emailAddress))
    {             
        JObject eventData = JObject.Parse(emailAddress);
        try
        {
            var address = new System.Net.Mail.MailAddress((string)eventData["address"]);
            var leakedIn = (string)eventData["leak"];
            var breachedAccount = new BreachedAccount(address, leakedIn);
            var retrieveOperation = TableOperation.Retrieve<BreachedAccount>(breachedAccount.PartitionKey, breachedAccount.RowKey);
            var retrievedResult = table.Execute(retrieveOperation);
            var existingAccount = (BreachedAccount)retrievedResult.Result;
            if(existingAccount == null)
            {
                TableOperation insertOperation = TableOperation.Insert(new BreachedAccount(address, leakedIn));
                table.Execute(insertOperation);
                ctx.Emit(Constants.DEFAULT_STREAM_ID,new[]{tuple}, new Values { address.Address });
            }
            else
            {
                existingAccount.BreachedIn += breachedAccount.BreachedIn;
                TableOperation insertOperation = TableOperation.Replace(existingAccount);
                table.Execute(insertOperation);
            }                    
            this.ctx.Ack(tuple);
        }
        catch (Exception ex)
        {
            Context.Logger.Error(eventData.ToString());
            Context.Logger.Error(ex.Message);
            this.ctx.Fail(tuple);
        }
    }
    else
    {
        Context.Logger.Info("empty address");
        this.ctx.Ack(tuple);
    }
}

The tuple is the JSON object that was sent to the Azure event hub in my previous post. It contains the email address and the website that was breached. Table storage is queried to see if the email address is already present; if it is, the entry is updated, otherwise the new account is added to the system. Only new accounts are emitted for further processing.

The second bolt in the system will insert new accounts into a SQL database. Its execute method is quite different.

public void Execute(SCPTuple tuple)
{           
    // Check if the tuple is of type Tick Tuple
    if (IsTickTuple(tuple))
    {               
        if((DateTime.UtcNow - lastRun) >= TimeSpan.FromSeconds(batchIntervalInSec))
        {
            Context.Logger.Info("Time to purge!");
            FinishBatch();
        }                                
    }
    else
    {                
        breachedAccounts.Add(tuple);                
        if (breachedAccounts.Count >= batchSize)
        {
            Context.Logger.Info("Max reached, time to purge!");
            FinishBatch();
        }
        else
        {
            Context.Logger.Info("Not yet time to purge!");
        }
    }
}

This bolt does not process every tuple it receives immediately. Instead it keeps a list of tuples and processes them when either a maximum count is reached or a certain amount of time has passed since the last purge. In the FinishBatch method a SqlBulkCopy is performed to insert the records into a SQL database.
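
The IsTickTuple helper isn’t shown above; one way to implement it, assuming SCP.NET exposes the system tick stream id the way the HDInsight samples do, is to check the tuple’s source stream:

private static bool IsTickTuple(SCPTuple tuple)
{
    // Tick tuples are emitted by Storm's system component on the tick stream.
    return tuple.GetSourceStreamId().Equals(Constants.SYSTEM_TICK_STREAM_ID);
}

FinishBatch itself looks like this: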

private void FinishBatch()
{
    lastRun = DateTime.UtcNow;
    DataTable table = new DataTable();
    table.Columns.Add("Email");
    if (breachedAccounts.Any())
    {
        foreach (var emailAddress in breachedAccounts)
        {
            var row = table.NewRow();
            row["Email"] = emailAddress.GetString(0);
            table.Rows.Add(row);
        }
        try
        {
            using (SqlBulkCopy bulkCopy = new SqlBulkCopy(connection))
            {
                bulkCopy.DestinationTableName = "Emails";
                bulkCopy.ColumnMappings.Add("Email", "Email");
                bulkCopy.WriteToServer(table);
            }
            foreach (var emailAddress in breachedAccounts)
            {
                ctx.Ack(emailAddress);
            }
 
        }
        catch (Exception ex)
        {
            foreach (var emailAddress in breachedAccounts)
            {
                ctx.Fail(emailAddress);
            }
            Context.Logger.Error(ex.Message);
        }
        finally
        {
            breachedAccounts.Clear();
        }
    }
}

Both the table storage bolt and the SQL bulk copy bolt have lines with ctx.Ack, Fail or Emit. These are the places where you indicate whether processing of a tuple has succeeded or failed, or where you emit a new tuple to be processed further downstream. It also enables tuples to be replayed: if the SQL bulk copy fails, those tuples will be processed again at a later time. The event hub spout expects your bolts to ack.

topologyBuilder.SetBolt(
    "TableStorageBolt",                                            
    TableStorageBolt.Get,
    new Dictionary<string, List<string>>{
        {Constants.DEFAULT_STREAM_ID, new List<string>(){"address"}}
    },
    256,                                      
    true)
.DeclareCustomizedJavaSerializer(javaSerializerInfo)
.shuffleGrouping("EventHubSpout");
 
topologyBuilder.SetBolt(
    "SQLBolt",                                              
    SQLStorageBolt.Get,
     new Dictionary<string, List<string>>(),
    2,                                     
    true)
.shuffleGrouping("TableStorageBolt", Constants.DEFAULT_STREAM_ID)
.addConfigurations(new Dictionary<string,string>{{"topology.tick.tuple.freq.secs","1"}});
 
topologyBuilder.SetTopologyConfig(new Dictionary<string, string> { 
    {"topology.workers", partitionCount.ToString()}
});

The two bolts also need to be added to the topology. The TableStorageBolt is configured with a name, a factory method used to create instances, and a description of its output; in this case I’m emitting the email address to the default stream. The parallelism hint is also configured, as well as the boolean flag indicating that this bolt supports acking. Since this bolt sits behind the Java spout, we also need to configure how the data from the spout should be deserialized. The bolt is also configured to use shuffleGrouping, meaning the load will be spread evenly among all instances.

The SQLBolt is configured accordingly, with the additional configuration to support time ticks.

After I created an HDInsight Storm cluster on Azure I was able to publish the topology from within Visual Studio. The cluster itself was created in 15 minutes.


It took quite some time to find the right parallelism for this topology but with the code I’ve posted I got these results after one hour:

(screenshot: Storm UI bolt statistics after one hour)

Table storage is slow; the 2 SQL bolts can easily keep up with the 256 table storage bolts. After one hour my topology had processed only 10 million records. So even though I was able to send all 100 million events to the event hub in under two hours, processing them would take about five times as long.

One of the causes for this is the test data. The partition and row keys for table storage are based on the email address.

class BreachedAccount : TableEntity
{
    public BreachedAccount() { }
 
    public BreachedAccount(System.Net.Mail.MailAddress emailAddress, string leakedIn)
    {
        this.PartitionKey = emailAddress.Host;
        this.RowKey = emailAddress.Address;
        // Remember which breach this address came from
        this.BreachedIn = leakedIn;
    }
 
    public string BreachedIn { get; set; }
}

Yet none of the email addresses in my test data share the same partition key. If the data was more realistic I would be able to use batches. In the original Adobe data breach 115 million addresses were part of 130 different domains. Table storage allows you to send batches of 100 records to the same partition key, which should result in less processing time. I’ll regenerate the test data with the same characteristics, modify the table storage bolt and rerun the experiment at a later time, hoping to get a better result.
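
A sketch of what such a batched insert could look like with the table storage SDK, reusing the CloudTable instance from the bolt (accountsForOneDomain is a hypothetical list of BreachedAccount entities that all share the same partition key):

// All entities in a single batch must share the same partition key,
// and a batch can hold at most 100 operations.
var batch = new TableBatchOperation();
foreach (var account in accountsForOneDomain.Take(100))
{
    batch.Insert(account);
}
table.ExecuteBatch(batch);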

The SQL database, originally a Standard instance, was scaled to an S3, which allowed me to stay just below the maximum throughput with batches of 1000 records.


At the moment I actually have the same performance as Troy’s setup but with quite a different approach. Storm and HDInsight were new to me and I enjoyed the programming model. The scale and possibilities it can give are enormous, yet every part of your topology needs quite some attention if you want to have a high throughput solution.

The .NET part of the SDK is not yet entirely complete, so you can’t just port Java snippets to C# and expect them to work. I also ran into some NotImplementedExceptions. Microsoft is still busy implementing the SDK and the team is actively adding examples and functionality.

Further reading:

Event Hubs Programming Guide
Storm Applied
Introduction to Apache Storm on HDInsight: Real-time analytics for Hadoop
HDInsight Storm Examples
Process events from Azure Event Hubs with Storm on HDInsight
Performance Tuning for HDInsight Storm and Microsoft Azure EventHub
Understanding the Parallelism of a Storm Topology

 

Data leak processing

On the excellent blog of Troy Hunt you can read how he’s using Azure webjobs to process leaked user account dumps. As with anything in IT, there are many solutions to a problem. Lately I’ve been busy with Azure event hubs, streaming analytics and HDInsight, so I thought it would be a nice experiment to take the same data dump but process it in an entirely different manner. In this first post I’ll illustrate the data input part. In a follow-up post I’ll use HDInsight to process the actual data.

The data source that has to be processed is a 5.12 GB text file containing 100 million email addresses, which I uploaded to blob storage. Just like Troy I then created a batching solution to process the file in chunks. I didn’t want to use webjobs, so instead I created a cloud service. This service reads line numbers from a queue; the line numbers indicate which lines to read from the text file and send to the event hub.


The code in the cloud service is pretty straightforward: open a stream reader, skip ahead to the line number indicated by the incoming message from queue storage, then read the lines that need to be processed and fire them at the event hub. I have not given the code much thought, so there are probably several ways it can be improved; I just wanted to get up and running. The event hub receives JSON objects containing the name of the website and the email address.

// "file" is the blob containing the text file; "beginAt" and "endAt" come from the queue message
var lines = new List<string>();
using (var streamReader = new StreamReader(file.OpenRead()))
{
    int row = 0;
    // Skip ahead to the first line of this batch
    while (row < beginAt)
    {
        streamReader.ReadLine();
        row++;
    }
    // Read the lines belonging to this batch
    while (row <= endAt)
    {
        row++;
        lines.Add(streamReader.ReadLine());
    }
    // Send every line to the event hub as a small JSON payload
    Parallel.ForEach(lines, line =>
    {
        var leakedEmail = JsonConvert.SerializeObject(new { leak = "websitesource", address = line });
        eventHubClient.Send(new EventData(Encoding.UTF8.GetBytes(leakedEmail)));
    });
}
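
The queue side isn’t shown above; a minimal sketch of how a worker might pick up a batch message (the message format and the beginAt/endAt names are assumptions):

// Assumes a storage queue whose messages contain "beginAt;endAt"
var account = CloudStorageAccount.Parse(storageConnectionString);
var queue = account.CreateCloudQueueClient().GetQueueReference("batches");

var message = queue.GetMessage(TimeSpan.FromMinutes(30)); // hide the message while this batch is processed
if (message != null)
{
    var parts = message.AsString.Split(';');
    int beginAt = int.Parse(parts[0]);
    int endAt = int.Parse(parts[1]);

    // ...read the lines and send them to the event hub as shown above...

    queue.DeleteMessage(message); // only delete once the batch was sent successfully
}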

I created batches of 1 million rows, which means 100 batches had to be processed. I deployed the service and then observed the system. After seeing one instance running correctly, I scaled out the system to 20 instances running simultaneously.


The result was already quite nice. All of the instances together were sending around 4.2 million leaked email addresses every 5 minutes, with a peak of 4.5 million at the end.


 

This meant that in under two hours the entire file had been sent into my event hub.

Between 12 and 1 PM I only had one instance running. I scaled up somewhere between 1 and 2 PM.

What did this cost?

  • Running 20 A1 instances of a cloud service: ~ €3.50
  • Receiving 100 million event hub requests (and doing some processing): ~ €2

In the next post I’ll show you the HDInsight part which will take every event from the event hub and insert or update the address in table storage.

 

ISXperience May 2015 – Machine Learning Presentation

At the last ISXperience I introduced the attendees to machine learning and AzureML in general. The talk was largely based on the one I gave at the Azure Bootcamp. The biggest change was the movie recommendation demo, which was now fully integrated. I also updated the slide deck so it contains the latest info.

You can download the slides here. If you’re interested and want to learn more, check out the free e-book MVA has provided.

 

Flemish media HTTPS bankruptcy

Note: I usually blog in something that resembles English; this post was originally written in Dutch, in an effort to make the Flemish corner of the internet a little safer.

For those who don’t already know, I work in the IT industry. Every now and then I give a presentation, both for professionals and for laypeople. One of the topics I cover from time to time is security. For that I don’t need to come up with a fictional case, because unfortunately there are plenty of real examples available. In the past, for instance, I have pointed out Canvas’s insecure login page to them several times. First, some context.

Web 101

The web is built on the HTTP protocol. It boils down to a lot of text being sent back and forth. When you navigate to a website with your browser, a “GET” request is sent to a certain URL. When you fill in a form on a website, your browser normally performs a “POST” request. For example, I can browse to the website “tweakers.net”. You can see everything that happens behind the scenes by pressing the “F12” key in your browser. What appears is the “developer console”, which developers use when building a website or troubleshooting a problem. You can also use it yourself to learn how it all works. In the screenshot the technical information is shown at the bottom. The first request my browser makes is a GET request to the URL tweakers.net.


The machine the website runs on receives this request and sends back a pile of text. Your browser then interprets it and you get to see a website. All of this text is transmitted in readable form, which in itself is not a problem. Sometimes, however, there is information you don’t want to send as plain readable text, for example when you have to enter a password or credit card number.

We can check this on the website as well. When you click log in, you are redirected to another page. In the address bar we can see that we are no longer using HTTP, but HTTPS.


The developers of the website have chosen to use HTTPS. The S stands for secure, and as long as it is there, the data you enter and submit is encrypted. The data the website sends back to you is encrypted as well, so other people can no longer watch along.


So is all your internet traffic simply out in the open? Essentially yes, but when you are at home on your own network, the chance that someone is watching is small. If you are in a restaurant, a train station or another public place offering free WiFi, however, you find yourself in a potential jungle. With a tool like Wireshark you can inspect all network traffic, wired and wireless. If you are willing to spend some money, you can also buy a WiFi Pineapple, which makes a man-in-the-middle attack child’s play, certainly in this age of smartphones. Enough theory, let’s take a look at a few Flemish media websites.

Knack

At the top of the Knack site there is an “Aanmelden” (sign in) link. When you click it, you get a popup asking for your credentials. At first sight it does not use HTTPS; we have to go to the browser’s developer tools to find out. There we can fortunately see that the content of this popup is indeed loaded over HTTPS.


When entering a username and password, everything is neatly sent over HTTPS as well. The password is not even sent as plain text. Interesting.


 


Knack’s servers receive an MD5 hash of my password. You can look up exactly what happens in the JavaScript the website uses. Based on this I can assume that my password is also stored in this form, which brings its own risk, but that is not the focus of this blog post.


Vier

I can sign in on the Vier website as well. When you click the profile page, an extra section appears on the page. It was already on the page, hidden, when I surfed to it, so a man-in-the-middle attack is possible.


After entering a random username (test@test.be) and password (test), to my surprise I was not only logged in, but my username and password were sent unencrypted. Anyone can therefore see my credentials. This is probably a test account used at Vier. You could also raise questions about the required password complexity, but I won’t go into that in this blog post either.


Vijf

On the Vijf site you can read that you can log in with Vier accounts. So I logged in on this site with my random test account as well, and here too my password was sent unencrypted.


Flemish media websites

For the other sites I used the same method; the results are in the table below. I used the following scoring criteria:

  • Is the login page requested over an encrypted connection: 2.5 points
  • Are the credentials submitted over an encrypted connection: 5 points
  • The certificate used to secure everything was checked via the SSL Labs website. An A rating earns 2.5 points, an A- earns 2, and so on.
Site                   Loading   Submission   Certificate   Total
Knack                  2.5       5            2.5           10
Vier                   0         0            0             0
Vijf                   0         0            0             0
VTM                    2.5       5            2             9.5
GVA                    0         0            0             0
Beleggerscompetitie    0         0            0             0
Canvas                 0         0            0             0
Radio1                 0         0            0             0
MNM                    0         0            0             0
Belang van Limburg     0         0            0             0
Nieuwsblad             0         0            0             0
De Standaard           0         0            0             0
De Morgen              2.5       5            1.5           9

The SSL Labs results for Knack, VTM and De Morgen can be found here.

Conclusion

I had expected there would be one bad student in the class. That the situation is this bad, however, is food for thought. These websites are currently choosing to be the weakest link. People reuse passwords, so every time you connect to one of these websites there is a chance someone can see your password. It is just like entering your PIN while the ATM is being projected onto a big screen.

As I mentioned, I have already notified Canvas of the problem several times. I was told it was too difficult. If that really is the case, they would be better off removing their login page altogether or looking for an alternative. VTM, for instance, uses a third party, and at Newsmonkey you can only log in via social networks, so you don’t have to create yet another username and password combination.

Knack comes out of the comparison best, but the Beleggerscompetitie (investment competition) they organise flunks the test.

The future

Hopefully the websites mentioned above will change soon, and I will be able to write here that the world has become a little safer again. Also think twice before entering sensitive information, check that you are on HTTPS, and do not connect to every WiFi network you come across.

#httpscrusade