Saturday 3 March 2012

NDataFlow - Open Source .NET Dataflow Library

Last year, a colleague of mine showed me an open source .NET extract, transform and load (ETL) helper library that he's working on. It is called NDataFlow and allows you to annotate methods with attributes to create a dataflow in your application. It's a nice lightweight library that you can use whenever you are developing simple or complex ETL programs. The example below simulates a very simple ETL scenario where a set of people (hard-coded in the example) are filtered based on their location and then output to a file.

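using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using NDataFlow; //Assumed to also contain the Operation and Link attributes

//Not shown in the original example; a minimal Person type is assumed
class Person
{
  public string Name { get; set; }
  public string City { get; set; }
}

class Program : DataflowComponent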
{
  static void Main(string[] args)
  {
    new Program().Run();
  }

  //First method in the flow
  [Operation]
  public void DataSource(Output<Person> output)
  {
    //Imagine retrieving people from a real data source
    //e.g. database, xml file, csv file, etc.
    output.Emit(new Person() { Name = "Alice", City = "London" });
    output.Emit(new Person() { Name = "Bob", City = "New York" });
    output.Emit(new Person() { Name = "Foo", City = "London" });
    output.Emit(new Person() { Name = "Bar", City = "Sydney" });
  }

  [Operation]
  public IEnumerable<Person> FilterForLondonPeople
    ([Link("DataSource")] IEnumerable<Person> input)
  {
    return input.Where
      (p => p.City.Equals("London", 
        StringComparison.InvariantCultureIgnoreCase));
  }

  [Operation]
  public void OutputResults
    ([Link("FilterForLondonPeople")] IEnumerable<Person> results)
  {
    using (var sw = new StreamWriter(@"C:\LondonPeople.txt", false))
    {
      foreach (var p in results)
        sw.WriteLine("{0} {1}", p.Name, p.City);
    }
  }
}

The example shows how little work is needed to get a simple dataflow set up and running. You inherit the Run method by deriving from the NDataFlow.DataflowComponent class. Then, provided you've set up your methods and attributes correctly using the LinkAttribute, it's simply a case of calling Run to start your dataflow. In this case, the first method in the dataflow is DataSource; its output is sent to FilterForLondonPeople, whose output is in turn sent to the OutputResults method.
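
To make the idea of attribute-based linking more concrete, here is a minimal sketch of how a base class could discover annotated methods with reflection and feed each one the result of the method it names. It is not NDataFlow's implementation or API: StepAttribute, FromAttribute, MiniFlow and LondonFlow are hypothetical names invented for this illustration, and for simplicity the source method returns its items instead of emitting them through an output parameter.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical stand-ins for illustration only; these are NOT NDataFlow's types.
[AttributeUsage(AttributeTargets.Method)]
public sealed class StepAttribute : Attribute { }

[AttributeUsage(AttributeTargets.Parameter)]
public sealed class FromAttribute : Attribute
{
    public string Source { get; }
    public FromAttribute(string source) { Source = source; }
}

public abstract class MiniFlow
{
    // Runs every [Step] method once, resolving each parameter from the
    // result of the method named in its [From] attribute.
    public void Run()
    {
        var steps = GetType()
            .GetMethods(BindingFlags.Instance | BindingFlags.Public)
            .Where(m => m.GetCustomAttribute<StepAttribute>() != null)
            .ToDictionary(m => m.Name);
        var results = new Dictionary<string, object>();
        foreach (var name in steps.Keys)
            Invoke(name, steps, results);
    }

    // Memoised, depth-first evaluation. There is no cycle detection here.
    private object Invoke(string name,
        Dictionary<string, MethodInfo> steps, Dictionary<string, object> results)
    {
        if (results.TryGetValue(name, out var cached)) return cached;
        var method = steps[name];
        var args = method.GetParameters().Select(p =>
        {
            var link = p.GetCustomAttribute<FromAttribute>();
            if (link == null)
                throw new InvalidOperationException(
                    "Parameter '" + p.Name + "' of " + name + " has no [From] attribute.");
            return Invoke(link.Source, steps, results);
        }).ToArray();
        var result = method.Invoke(this, args); // null for void methods
        results[name] = result;
        return result;
    }
}

public class Person
{
    public string Name { get; set; }
    public string City { get; set; }
}

public class LondonFlow : MiniFlow
{
    // Collected in memory instead of written to a file, to keep the sketch self-contained.
    public List<string> Written { get; } = new List<string>();

    [Step]
    public IEnumerable<Person> DataSource()
    {
        yield return new Person { Name = "Alice", City = "London" };
        yield return new Person { Name = "Bob", City = "New York" };
        yield return new Person { Name = "Foo", City = "London" };
        yield return new Person { Name = "Bar", City = "Sydney" };
    }

    [Step]
    public IEnumerable<Person> FilterForLondonPeople(
        [From("DataSource")] IEnumerable<Person> input)
    {
        return input.Where(p =>
            p.City.Equals("London", StringComparison.OrdinalIgnoreCase));
    }

    [Step]
    public void OutputResults(
        [From("FilterForLondonPeople")] IEnumerable<Person> results)
    {
        foreach (var p in results)
            Written.Add(p.Name + " " + p.City);
    }
}
```

Calling new LondonFlow().Run() walks the three methods in dependency order and leaves the two London entries in Written. A real library would also need to deal with cycles, multiple inputs and error reporting, which this sketch leaves out.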

4 comments:

  1. Thank you for this wonderful post, Sir.
    I have a few questions. I am developing a simple wireless sensor device simulator where I am going to simulate 2 nodes communicating with each other. My current problem is how to simulate the transfer of data from one node to the other. Do you think this Dataflow library will do the trick?

  2. Hi awchigee,

    Are your nodes running on separate concurrent threads? Can there be more than two nodes? If there can be more than two nodes, does each node need to communicate the extracted data to all nodes (i.e. broadcast the data) or is each node only ever communicating with one other node?

    I don't see why you couldn't use this ETL library in your simulator: each node can derive from DataflowComponent and run on a separate thread. The simulation engine that spawns these threads/nodes can randomly call the Run method on each node to simulate a dataflow within a node (i.e. sensor activity). That still leaves you with the problem of communicating between nodes, though. There are probably a number of solutions, depending on your answers to my questions :)

    Replies
    1. Hi!
      Thanks for your quick response. :)
      Yes, the nodes are running on separate concurrent threads. I'm trying to simulate just 2 nodes first and then expand it later to 3 or more nodes (maybe 5-6 maximum). Each node will only communicate with one other node (since what I'm actually trying to simulate is a Body Sensor Network).

      Can you give me an idea of a few ways to simulate communication between nodes?

      By the way, your blog is awesome!

    2. Hi, thanks for your kind words :)

      As with any well-designed OO application, try to model the simulation in your design as closely as possible on the real-world domain. Unfortunately I don't know a lot about the domain of your simulation (wireless sensors), so my suggestion below may be a bit abstract.

      You could implement something like a "WirelessChannel" class. Node objects can then "connect" to the channel (e.g. implement a method like Connect(Node node) in the WirelessChannel). Connecting to the channel is essentially just adding the node to an internal list encapsulated in the channel. The channel can then have a method with a signature like SendData(Node fromNode, Node toNode, Data datum).

      When nodes are connecting to a channel, you can pass the current instance of the channel to the connecting nodes (so that the nodes are "aware" of what channel they're on). If each node has a reference to the channel, they can then retrieve a list of the other connected nodes via their channel reference (through a property/method on the channel). You can then pass data between nodes through the WirelessChannel.SendData method (from a particular node). A node can have a ReceiveData(Node fromNode, Data datum) method which the WirelessChannel.SendData method uses. The implementation of SendData could be as simple as:

      public void SendData(Node fromNode, Node toNode, Data datum)
      {
        toNode.ReceiveData(fromNode, datum);
      }

      I think this solution would work well, as it lets each node communicate with any other node (if required in the future) as long as that node is connected to the same WirelessChannel instance. You can adapt it to suit your requirements (for example, by restricting communication so that a node can only talk to one particular node). Your engine would tie all this together (create the wireless channel, create the nodes, add the nodes to the channel, etc.). Hope that gives you a good starting point.
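
The channel design described in the last reply could be sketched roughly as follows. This is only one possible reading of it: WirelessChannel, Node, Data, Connect, SendData and ReceiveData are the names suggested in the comment, while Payload, Send, AttachTo, ConnectedNodes, Received and the locking are assumptions added here because the nodes are said to run on separate threads.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types based on the names suggested in the discussion above.
public class Data
{
    public string Payload { get; }
    public Data(string payload) { Payload = payload; }
}

public class Node
{
    public string Name { get; }
    public WirelessChannel Channel { get; private set; }
    public List<string> Received { get; } = new List<string>();

    public Node(string name) { Name = name; }

    // Called by the channel on Connect so the node knows which channel it is on.
    internal void AttachTo(WirelessChannel channel) { Channel = channel; }

    // Convenience wrapper: send via whatever channel this node is connected to.
    public void Send(Node toNode, Data datum)
    {
        if (Channel == null)
            throw new InvalidOperationException(Name + " is not connected to a channel.");
        Channel.SendData(this, toNode, datum);
    }

    public void ReceiveData(Node fromNode, Data datum)
    {
        // Locked because several sender threads may deliver at the same time.
        lock (Received) Received.Add(fromNode.Name + ": " + datum.Payload);
    }
}

public class WirelessChannel
{
    private readonly List<Node> nodes = new List<Node>();
    private readonly object sync = new object();

    // Connecting just adds the node to the internal list and hands it the channel.
    public void Connect(Node node)
    {
        lock (sync) nodes.Add(node);
        node.AttachTo(this);
    }

    // Snapshot of the connected nodes, safe to enumerate from any thread.
    public IReadOnlyList<Node> ConnectedNodes
    {
        get { lock (sync) return nodes.ToArray(); }
    }

    public void SendData(Node fromNode, Node toNode, Data datum)
    {
        lock (sync)
        {
            if (!nodes.Contains(fromNode) || !nodes.Contains(toNode))
                throw new InvalidOperationException(
                    "Both nodes must be connected to this channel.");
        }
        toNode.ReceiveData(fromNode, datum);
    }
}
```

An engine would create one WirelessChannel, call Connect for each Node, and then let a node call Send(otherNode, new Data("...")). Restricting a node to a single partner, as in a body sensor network, could be added as a check inside SendData.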
