Streaming in VisTrails¶
Streaming data may be useful for a number of reasons, such as to incrementally update a visualization, or to process more data than fit into memory. VisTrails supports streaming data through the workflow. By implementing modules that supports streaming, data items will be passed through the whole workflow one at a time.
Using Streaming¶
Streaming is similar to list handling (see Chapter List Handling in VisTrails). Modules that create streams should output a port with list depth 1. Downstream modules that do not accept lists will be executed once for each item in the stream. Modules with multiple input streams will combine them pairwise. For this reason the input streams should contain the same number of items (or ben unlimited).
Modules accepting a type with list depth 1, but does not support streaming, will convert input streams to lists and execute after the streaming have ended.
Try it Now!
Lets use PythonSources to create a simple example that incrementally sums up
a sequence of numbers. First we will create a module that streams the
natural numbers up to some value. Create a new workflow and add a
PythonSource
module. Give it an input port named inputs
of type
Integer, which will specify the maxim number to stream, and an output port
named out
of type Integer
with list depth 1, which will be the output
stream. An output stream can be created by using
self.set_streaming_output
, which takes the port name, an iterator object,
and an optional length of the input items. To create an integer iterator we
can use xrange. Add this to the PythonSource:
self.set_streaming_output('out',
xrange(inputs).__iter__(),
inputs)
Next Step!
Now lets create a module that captures the item in the string. Add a second
PythonSource
module below the first one. Give it an input port named
integerStream
of type Integer and list depth 1 that will be our input
stream. An input stream can be captured by adding the magic string
#STREAMING
to the PYthonSource code and calling self.set_streaming
with a generator method as argument. The generator method should take the
module as an input. It should first initialize its value, in our case set
intsum=0
. Then it should receive the inputs in a loop ending with yield.
In each iteration the module will be updated to contain a new input in the
stream. Similar to a normal module, the loop should:
- get inputs
- compute outputs
- set outputs
- call
yield
Below is the complete example. Add it to the PythonSource.
#STREAMING - This tag is magic, do not change.
def generator(module):
intsum = 0
while 1:
i = module.get_input('integerStream')
intsum += i
print "Sum so far:", intsum
yield
self.set_streaming(generator)
Finally:
Connect the two PythonSource’s, set inputs
to 100 in the first
PythonSource, open the vistrails console and execute. See how the output is
printed to the console while the stream runs and how the progress of the
modules increase. The output should look like this: (open in vistrails)
Sum so far: 0
Sum so far: 1
Sum so far: 3
...
Sum so far: 4851
Sum so far: 4950