William Liu

Apache Beam

Summary

Apache Beam is a way to create data processing pipelines that can be used on many execution engines including Apache Spark and Flink. Beam provides these engines abstractions for large-scale distributed data processing so you can write the same code used for batch and streaming data sources and just specify the Pipeline Runner.

Overview

An overview of our abstractions include:

Typical Beam driver Workflow

A typical Beam driver program might look like:

When you run your Beam driver program, the Pipeline Runner that you designated constructs a workflow graph of your pipeline based on the PCollection objects that you created and transforms that you applied. The graph is then executed using the appropriate distributed processing back-end, becoming an asynchronous job on that back-end.

Pipeline

To use Beam, your first step is to create a Pipeline and set some configuration options.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

p = beam.Pipeline(options=PipelineOptions())

Configuring Pipeline Options

Your Pipeline will want to know a few configuration options including the Pipeline Runner, or things like a Project ID, or a location for storing files. You can set the options through PipelineOptions, which can be passed into command-line arguments.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
 
p = beam.Pipeline(options=PipelineOptions())

You can also customize additional arguments:

class MyOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument('--input',
                            help='Input for the pipeline',
                            default='gs://my-bucket/input')
        parser.add_argument('--output',
                            help='Output for the pipeline',
                            default='gs://my-bucket/output')

PCollections

The PCollection abstraction is a potentially distributed, multi-element data set; think of a PCollection as a “pipeline” data. If you want to work with data in your pipeline, it must be in the form of a PCollection. You can either create a PCollection from in-memory data or through an external source.

The elements of a PCollection may be of any type, but must be of the same type. Beam is able to encode each individual element as a byte string. The Beam SDK provides a data encoding mechanism that includes built-in encoding for commonly-used types as well as support for custom encodings. A PCollection is also immutable (once created, cannot add, remove, or change individual elements).

Creating PCollection from in-memory data

You can create a PCollection from an in-memory object like a list by using Beam’s Create transform.

with beam.Pipeline(options=pipeline_options) as p:
    lines = (p
             | beam.Create([
                 'To be, or not to be: that is the question: ',
                 'Whether \'tis nobler in the mind to suffer ',
                 'The slings and arrows of outrageous fortune, ',
                 'Or to take arms against a sea of troubles, ']))

Creating PCollection from an external source

You can create a PCollection from an external source including reading text from a file.

lines = p | 'ReadMyFile' >> beam.io.ReadFromText('gs://some/inputData.txt')

You can also create a PCollection from in-memory data, like using beam.Create to make a list.

with beam.Pipeline(options=pipeline_options) as p:
    
    lines = (p
             | beam.Create([
                'To be, or not to be: that is the question: ',
                'Whether \'tis nobler in the mind to suffer',
                'The slings and arrows of outrageous fortune']))

PCollection Characteristics

A PCollection is owned by a Pipeline object; multiple pipelines cannot share a PCollection.

Transforms

A Transform is the operations in your pipeline. You provide processing logic as a function (aka user code) and this user code is applied to each element of the input PCollection (or more than one PCollection). Depending on the pipeline runner, different workers across a cluster can execute instances of your user code in parallel. The output of the workers is added to a separate, final output PCollection. Core transforms include ParDo and Combine, and Composite Transforms.

Core Beam Transforms

The core transforms include:

ParDo

A ParDo is a transform for generic parallel processing similar to a Map in the Map/Shuffle/Reduce-style. A ParDo applies your user code on each element in the input PCollection and emits zero, one, or more elements to an ouptut PCollection. Example usage cases include:

DoFn w/ ParDo

User Code is in the form of a DoFn object, which is the processing logic that gets applied to each element in the input collection. Make sure this implementation does NOT depend on the number of invocations (it may be called multiple times in case of failure or retries).

# the input PCollection of Strings
words = ... 

# Using DoFn that performs on each element in the input PCollection
class ComputeWordLengthFn(beam.DoFn):
    def process(self, element):
        return [len(element)]

# Apply a ParDo to the PCollection 'words' to compute lengths for each word
word_lengths = words | beam.ParDo(ComputeWordLengthFn())

Here we assume an input PCollection of String values. We apply a ParDo transform that specifies a function ComputeWordLengthFn to compute the length of each string and outputs the result to a new PCollection of Integer values.

FlatMap w/ lambda

If you have a simple DoFn, you can just use a FlatMap with a lambda function.

# the input PCollection of Strings
words = ...

# Apply a lambda function to the PCollection 'words'
word_lengths = words | beam.FlatMap(lambda word: [len(word)])

Map

If you have a ParDo that has a one-to-one mapping of input elemnts to output elments, you can use the Map transform.

# the input PCollection of Strings
words = ...

# Apply a Map with a lambda function to the PCollection 'words'
word_lengths = words | beam.Map(len)

GroupByKey

The GroupByKey transform is for processing collections of key/value pairs. It’s a parallel reduction operation, similar to the Shuffle phase of Map/Shuffle/Reduce. The input to GroupByKey is a collection of key/value pairs that represents a multimap, where the collection contains multiple pairs that have the same key, but different values.

You would use GroupByKey to aggregate data that has something in common. For example, a collection that stores records of customer orders, you might want to group by the postal code key/field and the ‘value’ is the rest of the record.

Example

# Input File
cat, 1
dog, 5
and, 1
jump, 3
tree, 2
cat, 5
dog, 2
and, 2
...


# Grouped By
cat, [1,5,9]
dog, [5,2]
and, [1,2,6]
jump, [3]
tree, [2]
...

We transform from a multiple (multiple keys to individual values) to uni-map (unique keys to collections of values)

CoGroupByKey

CoGroupByKey joins two or more key/value PCollections that have the same key type and emits a KV<K, CoGbkResult> pair, which is essentially a join.

// collection 1
user1, address1
user2, address2
user3, address3

// collection 2
user1, order1
user1, order2
user2, order3
guest, order4

// output of collection1 and collection2
user1, [[address1], [order1, order2]]
user2, [[address2], [order3]]
user3, [[address3], []]
guest, [[], [order4]]

Combine

Combine is a transform for combining collections of elements or values in your data. Combine can be used on entire PCollection(s) as well as the values for each key in PCollection(s). Pre-built combines include common numeric combination operations like sum, min, and max.

Simple Combine

A simple combine

pc = [1, 10, 100, 1000]

def bounded_sum(values, bound=500):
    return min(sum(values), bound)

small_sum = pc | beam.CombineGlobally(bounded_sum)  # [500]
large_sum = pc | beam.CombineGlobally(bounded_sum, bound=5000)  # [1111]

An advanced Combine uses CombineFn, which defines complex combine functions. Here we have a custom subclass of CombineFn.

pc = ...

class AverageFn(beam.CombineFn):

    def create_accumulator(self):
        return (0.0, 0)

    def add_input(self, sum_count, input):
        (sum, count) = sum_count
        return sum + input, count + 1

    def merge_accumulators(self, accumulators):
        sum, counts = zip(*accumulators)
        return sum(sums), sum(counts)

    def extract_output(self, sum_count):
        (sum, count) = sum_count
        return sum / count if count else float('NaN')

Flatten

Flatten is a transform for PCollection objects that store the same data type. Flatten will merge multiple PCollection objects into a single PCollection.

# Takes a tuple of PCollection objects and returns a single PCollection
merged = (
    (pcoll1, pcoll2, pcoll3)

    # A list of tuples can be "piped" directly into a Flatten transform
    | beam.Flatten()
)

The coder for the output PCollection is the same as the coder for the first PCollection from the input list of PCollection.

Partition

Partition is a transform for Pcollection objects that splits a single PCollection into a fixed number of smaller collections. You provide the partitioning function logic that determines how to split up the elements of the input PCollection into eaach resulting partition PCollection.

# Provide an int value with the desired number of result partitions, and
a partitioning function (partition_fn in this example)
# Returns a tuple of PCollection objects containing each of the resulting
partitoins as individual PCollection objects

students = ...
def partition_fn(student, num_partitions):
    return int(get_percentile(student) * num_partitions / 100)

by_decile = students | beam.Partition(partitoin_fn, 10)

# You can extract each partition from the tuple of PCollection objects as:
fortieth_percentile = by_decile[4]

Transform Requirements

User code for a Beam Transform must keep below points in mind due to the distributed nature of the jobs:

The above applies to subclasses of DoFn, CombineFn, and WindowFn.

Serializability

Any function object provided to a transform must be fully serilizable. The reason is that a copy of the function is serialized and sent to a remote worker in a processing cluster.

Thread-Compability

Any function object should be thread-compatible because each instance of the function object is accessed by a single thread on aworker instance. The Beam SDK is not thread-safe.

Idempotence

It is recommended that your function object be idempotent, meaning it should be able to be repeated/retried as many times as necessary without creating any unintended side effects.

Side Inputs

Besides a main input PCollection, you can also provide additional inputs to a ParDo transform as a side input. A side input is an additional input that your DoFn can access each time it processes an element in the input PCollection. This side input needs to be determined at runtime (not hard-coded).

# Side inputs are available as extra arguments in the DoFn's process method
# or Map / FlatMap's callable.
# In this example side inputs are passed to a FlatMap transform as extra
# arguments and consumed by 'filter_using_length'

words = ....

# Callable takes additional arguments
def filter_using_length(word, lower_bound, upper_bound=float('inf')):
    if lower_bound <= len(word) <= upper_bound:
        yield word

# Construct a deferred side input
avg_word_len = (words
                | beam.Map(len)
                | beam.CombineGlobally(beam.combiners.MeanCombineFn()))

# Call with explicit side inputs
small_words = words | 'small' >> beam.FlatMap(filter_using_length, 0, 3)

# A single deferred side input
larger_than_average = (words | 'large' >> beam.FlatMap(
    filter_using_length,
    lower_bound=pvalue.AsSingleton(avg_word_len)))

# Mix and Match
small_but_nontrivial_words = words | beam.FlatMap(
    filter_using_length,
    lower_bound=2,
    upper_bound=pvalue.AsSingleton(avg_word_len))

# We can also pass side inputs to a ParDo transform, which will gets passed
# to its process method. The first two arguments for the process method
# would be self and element
class FilterUsingLength(beam.DoFn):
    def process(self, element, lower_bound, upper_bound=float('inf')):
        if lower_bound <= len(element) <= upper_bound:
            yield element

small_words = words | beam.ParDo(FilterUsingLength(), 0, 3)

Side Inputs and Windowing

TODO

Pipeline I/O

Pipelines often need to read data from an external source like a file or a database or likewise write to a file or database. Beam has a few common data storage transforms or you can write your own read/write transforms.

Reading input data

You can read data from an external source and return a PCollection at any point in the pipeline, though its common to have this data be in the start of your pipeline.

lines = pipeline | beam.io.ReadFromText('gs://some/inputData.txt')

Reading multiple locations

You can read from multiple locations at the same time using glob operators for files.

lines = p | 'ReadFromText' >> beam.io.ReadFromText('path/to/input-*.csv')

Writing output data

You can write data from a PCollection to an external data source, usually at the end of your pipeline, but can technically be used at any point in the pipeline.

output | beam.io.WriteToText('gs://some/outputData')

Writing to multiple output files

You can write transforms to write to multiple output files.

filtered_words | 'WriteToText' >> beam.io.WriteToText(
    '/path/to/numbers', file_name_suffix='csv')

Data Encoding

Python TypeDefault Coder

int VarIntCoder float FloatCoder str BytesCoder bytes StrUtf8Coder Tuple TupleCoder

Changing default Coder:

apache_beam.coders.registry.register_code(int, BigEndianIntegerCoder)

Windowing

Windowing subdivides a PCollection according to the timestamps of its individual elements. Transforms that aggregate multiple elements like GroupByKey and Combine.