Dataflow Bench in Java

25 Feb 2016

Mkay, this is the kind of thing that we're trying to build (a sketchy UML diagram created with yUML):

[Figure: yUML class diagram]

Let's start with the client. It builds a network (well, actually a chain) of Ops, then feeds values in at one end by invoking the first op. That op is expected to invoke the next op, and so on, percolating down the chain, something like this:

public class Benchmark
{
    private SinkOp s;
    private PredicateOp p4;
    private PredicateOp p6;
    private JoinOp j1;
    private PredicateOp p8;
    private PredicateOp p10;
    private JoinOp j2;
    
    public Benchmark()
    {
        s = new SinkOp();
        p4 = new PredicateOp(4, s);
        p6 = new PredicateOp(6, p4);
        j1 = new JoinOp(p6);
        p8 = new PredicateOp(8, j1);
        p10 = new PredicateOp(10, p8);
        j2 = new JoinOp(p10);
        for (int i = 0; i < 12000; i += 1) {
            if (i % 2 == 0) {
                j1.add(i);
                j2.add(i);
            }
        }
    }

    public void run()
    {
        for (int i = 0; i < 2*1000*1000; i += 1) {
            if (i%2 == 1) {
                j2.invoke(i);
            }
        }
    }
}

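This class builds a join-pred(10)-pred(8)-join-pred(6)-pred(4)-sink chain and then feeds the one million odd numbers below two million into it, to test how fast numbers percolate through this kind of chain. Every join table entry and every predicate value is even, so no odd number is filtered out and each one travels the whole chain to the sink.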
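To put a rough number on "how fast", something like the following could work. It is only a sketch: it rebuilds the same chain shape with lambdas instead of the Op classes so that it compiles on its own, and the names ChainTiming, Op, predicate, join and countReachingSink are made up for this example. It also shares one table between the two joins, where Benchmark fills two separate tables with the same contents. A single wall-clock measurement like this ignores JIT warm-up, so treat the result as a ballpark figure.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ChainTiming
{
    // Hypothetical stand-in for the op interface: one int in, nothing out.
    interface Op { void invoke(int incoming); }

    // Passes everything except the single value p.
    static Op predicate(int p, Op next)
    {
        return x -> { if (x != p) next.invoke(x); };
    }

    // Passes everything that is not in the table.
    static Op join(Set<Integer> table, Op next)
    {
        return x -> { if (!table.contains(x)) next.invoke(x); };
    }

    // Pushes the odd numbers below limit through the chain
    // and returns how many of them reach the sink.
    static int countReachingSink(int limit)
    {
        Set<Integer> evens = new HashSet<>();
        for (int i = 0; i < 12000; i += 2) {
            evens.add(i);
        }
        List<Integer> sink = new ArrayList<>();
        Op chain = join(evens,
                   predicate(10,
                   predicate(8,
                   join(evens,
                   predicate(6,
                   predicate(4, x -> sink.add(x)))))));
        for (int i = 1; i < limit; i += 2) {
            chain.invoke(i);
        }
        return sink.size();
    }

    public static void main(String[] args)
    {
        long start = System.nanoTime();
        int reached = countReachingSink(2 * 1000 * 1000);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(reached + " items reached the sink in " + elapsedMs + " ms");
    }
}
```

The same timing wrapper should work around new Benchmark().run() once the Op classes below are on the classpath.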

In order to make the various Op nodes play together nicely like legos, we need an interface, so that the JoinOp for example can take any kind of Op as the next thing in its chain.

public interface OpInterface
{
    void invoke(int incoming);
}

Working our way up the chain from the bottom, the Sink implements this interface by storing each incoming value in its list.

import java.util.Collection;
import java.util.ArrayList;

public class SinkOp implements OpInterface
{
    // Everything that makes it to the end of the chain, in arrival order.
    Collection<Integer> list;

    public SinkOp()
    {
        list = new ArrayList<Integer>();
    }

    @Override
    public void invoke(int incoming)
    {
        list.add(incoming);
    }
}

The Predicate implements this interface by dropping any incoming item equal to its predicate value and passing everything else on to the next op.

public class PredicateOp implements OpInterface
{
    private int predicate;
    private OpInterface next;
    
    // Constructor
    public PredicateOp(int predicate, OpInterface next)
    {
        this.predicate = predicate;
        this.next = next;
    }
    
    @Override
    public void invoke(int incoming)
    {
        if (incoming != predicate) {
            next.invoke(incoming);
        }
    }
}

So for example, if the predicate was zero, and you invoked it with zero, then the next op would not be invoked, but if the predicate was zero and you invoked it with one, then the next op would be invoked - with one.


import static org.junit.Assert.*;
import org.junit.Test;

public class PredicateOpTest
{
    @Test
    public void IfPredicateIsZero_FiltersOutZero()
    {
        MockOp next = new MockOp();
        PredicateOp to_test = new PredicateOp(0, next);
        to_test.invoke(0);
        assertFalse(next.sawInvoke);
    }

    @Test
    public void IfPredicateIsZero_DelegatesOne()
    {
        MockOp next = new MockOp();
        PredicateOp to_test = new PredicateOp(0, next);
        to_test.invoke(1);
        assertTrue(next.sawInvoke);
        assertEquals(1, next.saw);
    }

    @Test
    public void IfPredicateIsZero_DelegatesTwo()
    {
        MockOp next = new MockOp();
        PredicateOp to_test = new PredicateOp(0, next);
        to_test.invoke(2);
        assertTrue(next.sawInvoke);
        assertEquals(2, next.saw);
    }

    @Test
    public void IfPredicateIsOne_DelegatesZero()
    {
        MockOp next = new MockOp();
        PredicateOp to_test = new PredicateOp(1, next);
        to_test.invoke(0);
        assertTrue(next.sawInvoke);
        assertEquals(0, next.saw);
    }
}
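The tests lean on a MockOp that isn't shown here. A minimal sketch of what it might look like, assuming all it has to do is remember whether it was invoked and with what value. The field names sawInvoke and saw come from the tests; their types and everything else are guesses. OpInterface is repeated only so that the sketch compiles on its own.

```java
// Same interface as above, repeated so this sketch is self-contained.
interface OpInterface
{
    void invoke(int incoming);
}

// Hypothetical test double: records the most recent invocation.
class MockOp implements OpInterface
{
    boolean sawInvoke = false; // becomes true once invoke() has been called
    int saw;                   // the last value passed to invoke()

    @Override
    public void invoke(int incoming)
    {
        sawInvoke = true;
        saw = incoming;
    }
}
```

Because the fields are package-private, the tests can read them directly as long as they live in the same package.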

The Join is like the Predicate, but instead of a single value it filters out every incoming item that appears in a whole set of items (its table), which the client fills by calling add().

import java.util.Set;
import java.util.HashSet;

public class JoinOp implements OpInterface
{
    private OpInterface next;
    private Set<Integer> table;
    
    // Constructor
    public JoinOp(OpInterface next)
    {
        this.next = next;
        this.table = new HashSet<Integer>();
    }

    // Mutator
    public void add(int to_add)
    {
        this.table.add(to_add);
    }
    
    @Override
    public void invoke(int incoming)
    {
        if (!this.table.contains(incoming)) {
            next.invoke(incoming);
        }
    }
}
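Here is a small sketch of the JoinOp in action. The interface and the class are copied in as nested types so the file compiles by itself; JoinOpDemo, survivors and the lambda used as a collecting sink are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class JoinOpDemo
{
    // Nested copies of the interface and JoinOp from above.
    interface OpInterface { void invoke(int incoming); }

    static class JoinOp implements OpInterface
    {
        private final OpInterface next;
        private final Set<Integer> table = new HashSet<>();

        JoinOp(OpInterface next) { this.next = next; }

        void add(int toAdd) { table.add(toAdd); }

        @Override
        public void invoke(int incoming)
        {
            if (!table.contains(incoming)) {
                next.invoke(incoming);
            }
        }
    }

    // Feeds 0..5 through a join whose table holds {2, 4}
    // and returns the values that got through.
    static List<Integer> survivors()
    {
        List<Integer> out = new ArrayList<>();
        JoinOp join = new JoinOp(x -> out.add(x)); // hypothetical collecting sink
        join.add(2);
        join.add(4);
        for (int i = 0; i <= 5; i += 1) {
            join.invoke(i);
        }
        return out;
    }

    public static void main(String[] args)
    {
        System.out.println(survivors()); // prints [0, 1, 3, 5]
    }
}
```

Since the interface has a single method, a lambda can stand in for the next op, which is handy for quick experiments like this one.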

This is a little interesting as an example of object orientation and Java syntax, but of course I'm thinking about Abstract Nonsense.