William Liu

Hadoop



Summary

Apache’s Hadoop is an open source software framework, written in Java, for big and/or unstructured data. This means that if you’re working with very large data sets (e.g. 100 terabytes), then you want to consider Hadoop. It’s important to note that if you don’t work with very large data sets or unstructured data, then you probably do NOT need Hadoop; you’re probably better off using a standard relational SQL database.

Why Hadoop

So why Hadoop? Instead of running on a single powerful server, Hadoop can run on a large cluster of commodity hardware. This means that a network of ordinary computers (nodes) can coordinate their processing power AND storage (see HDFS below). Hadoop allows systems to scale horizontally (i.e. add more computers) instead of just vertically (i.e. buy a faster computer).

Hadoop is a system for ‘Big Data’. Choose Hadoop if a lot of this applies:

Hadoop is good for:

How does Hadoop work?

Hadoop’s architecture is based on Google’s MapReduce and Google File System white papers, with Hadoop consisting of MapReduce for processing and the Hadoop Distributed File System (HDFS) for storage.

MapReduce

MapReduce does the processing and is broken down into two pieces: mappers, which transform each input record into key/value pairs, and reducers, which aggregate the values collected for each key.

The general steps look like this:

  1. split
  2. map
  3. sort/shuffle (this is done automatically)
  4. reduce
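
To make these steps concrete, here is a purely local Python sketch (no Hadoop involved; all names are illustrative) that mimics the map, sort/shuffle, and reduce steps with a simple word count:

from collections import defaultdict

# "split": pretend each string is one input split
splits = ["big data is big", "data is everywhere"]

# "map": emit (key, value) pairs from each record
mapped = [(word, 1) for line in splits for word in line.split()]

# "sort/shuffle": group all values by key (Hadoop does this for you)
grouped = defaultdict(list)
for key, value in sorted(mapped):
    grouped[key].append(value)

# "reduce": aggregate the values for each key
for key, values in grouped.items():
    print("%s\t%d" % (key, sum(values)))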

Note: There are tools built on top of MapReduce that allow higher-level querying, like Hive and Pig (covered below).

HDFS

The Hadoop Distributed File System (HDFS) provides redundant, fault-tolerant storage by breaking data into blocks (64 MB by default in Hadoop 1, 128 MB in Hadoop 2; the size is configurable), creating replicas of each block (based on the replication factor setting, usually 3), and spreading them across a network of computers. Say that one of the networked computers holding a piece of data you need goes down; there are still two other copies of that data on the network that are readily available. This is much different from many enterprise systems, where if a major server goes down it can take anywhere from minutes to hours or days to fully restore.
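
As a rough illustration (this is a toy Python sketch, not how HDFS actually places blocks), here is what splitting a file into blocks and assigning replicas to nodes looks like; the real settings live in hdfs-site.xml under dfs.blocksize and dfs.replication:

import itertools

BLOCK_SIZE = 128 * 1024 * 1024   # e.g. 128 MB blocks (dfs.blocksize)
REPLICATION = 3                  # replication factor (dfs.replication)
nodes = ["node1", "node2", "node3", "node4", "node5"]

def place_blocks(file_size_bytes):
    """Toy placement: split a file into blocks and assign each block
    to REPLICATION different nodes, round-robin style."""
    num_blocks = -(-file_size_bytes // BLOCK_SIZE)   # ceiling division
    node_cycle = itertools.cycle(nodes)
    placement = {}
    for block_id in range(num_blocks):
        placement[block_id] = [next(node_cycle) for _ in range(REPLICATION)]
    return placement

# A 1 GB file becomes 8 blocks, each stored on 3 of the 5 nodes,
# so losing any single node still leaves 2 copies of every block.
for block_id, replicas in place_blocks(1024 ** 3).items():
    print(block_id, replicas)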

Note: There are data systems built on top of HDFS, like HBase and Accumulo, that let you fetch records by key quickly, which makes them good for transactional systems.

Hadoop v2 ecosystem

There were a few changes from Hadoop v1 to Hadoop v2, mainly the addition of YARN and support for many more data processing applications like Pig, Hive, Spark, etc.

Hadoop 1

The main pieces of Hadoop 1.0 were MapReduce sitting directly on top of HDFS.

Hadoop 2

With Hadoop 2.0, we still have MapReduce and HDFS, but there is now an additional layer, YARN, that acts as a resource manager for distributed applications; YARN sits between the MapReduce and HDFS layers. A client submits a job to the YARN ResourceManager, which then automatically distributes and manages the job across the cluster. Along with MapReduce, we have a few other data processing engines like Hive, Pig, Spark, etc.
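
For example, you can see what YARN is managing by querying the ResourceManager's REST API. This is a hedged sketch: it assumes the ResourceManager web service is reachable on its default port 8088, the requests library is installed, and the hostname is a placeholder.

import requests

# List applications currently running under YARN's ResourceManager.
rm_url = "http://resourcemanager-host:8088/ws/v1/cluster/apps"
payload = requests.get(rm_url, params={"states": "RUNNING"}).json()

# The response nests applications under apps -> app ("apps" can be null).
for app in ((payload.get("apps") or {}).get("app") or []):
    print(app["id"], app["name"], app["applicationType"], app["state"])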

Overview of Hadoop 2 Layers

  1. Applications with Pig, Hive, Cascading, Mahout, Giraph, Presto
  2. Batch jobs with MapReduce; Interactive with Tez; In memory with Spark
  3. YARN for Cluster Resource Management
  4. Storage with S3, HDFS

Hive

Use Hive to run SQL-like (HiveQL) queries against your data in HDFS and Amazon S3.
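
For example, here is a minimal sketch of querying Hive from Python, assuming a running HiveServer2 endpoint and the third-party PyHive client (pip install pyhive); the stocks table name is hypothetical:

from pyhive import hive  # third-party client for HiveServer2, not part of Hadoop itself

# Assumes HiveServer2 is listening locally on its default port 10000.
conn = hive.connect(host="localhost", port=10000)
cursor = conn.cursor()

# HiveQL looks like SQL; the `stocks` table here is just an illustrative name.
cursor.execute("SELECT symbol, MAX(closing - opening) FROM stocks GROUP BY symbol")
for row in cursor.fetchall():
    print(row)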

Pig

Use Pig to write data-flow style transformations (Pig Latin scripts) over your data in HDFS and Amazon S3.

Spark

An alternative to MapReduce for processing. Spark builds a Directed Acyclic Graph (DAG) of operations instead of Hadoop’s map-then-reduce stages, and it keeps intermediate data in memory, which gives it very high performance. We can think of the difference like this: MapReduce reads data from disk, runs an operation, writes the result back to disk, then reads from disk again to do the next operation; Spark holds that data in memory and can chain operations without going back to disk.

To run Spark jobs, you can use standalone cluster mode, EC2, Hadoop YARN, or Apache Mesos.

Since Spark is just a processing replacement, you’ll still need to decide what to use for data storage. Spark works well with storage solutions like HDFS, Cassandra, HBase, Hive, and S3.
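
As a sketch, here is how the same max-increase-per-stock computation from the streaming example below could look in PySpark, assuming Spark and the pyspark package are installed and input.txt is in the working directory:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("max-increase").getOrCreate()
sc = spark.sparkContext

def parse(line):
    symbol, opening, closing = [field.strip() for field in line.split(",")]
    return symbol, int(opening), int(closing)

result = (sc.textFile("input.txt")
            .map(parse)
            .filter(lambda rec: rec[2] > rec[1])                          # only gains
            .map(lambda rec: (rec[0], float(rec[2] - rec[1]) / rec[1]))   # % change
            .reduceByKey(max)                                             # max per stock
            .collect())

for symbol, change in sorted(result):
    print("%s\t%f" % (symbol, change))

spark.stop()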

Hadoop User Experience (HUE) GUI for Hive and Pig

You can interact with your data in an ad-hoc way through the HUE GUI.

Hello World of Hadoop

Notes

Books: Hadoop: The Definitive Guide

Hadoop Streaming with Python: just use stdin and stdout.

run-hadoop.sh

Run hadoop using: source run-hadoop.sh

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
  -input ./input.txt \
  -output ./output \
  -mapper map.py \
  -reducer reduce.py \
  -file map.py \
  -file reduce.py

input.txt

Goog, 230, 240
Apple, 100, 98
MS, 300, 250
MS, 250, 260
MS, 270, 280
Goog, 220, 215
Goog, 300, 350
IBM, 80, 90
IBM, 90, 85

map.py

#!/usr/bin/env python
import sys

# Mapper: read CSV lines of (symbol, opening, closing) from stdin and
# emit "symbol<TAB>percent_change" for each day the stock closed higher.
for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    record = line.split(",")
    opening = int(record[1])
    closing = int(record[2])
    if closing > opening:
        change = float(closing - opening) / opening
        print('%s\t%s' % (record[0], change))

reduce.py

#!/usr/bin/env python
import sys

# Reducer: lines arrive sorted by stock symbol (the shuffle/sort step
# guarantees this), so we can emit the max increase for a stock as soon
# as the symbol changes.
stock = None
max_increase = 0
for line in sys.stdin:
    next_stock, increase = line.split('\t')
    increase = float(increase)
    if next_stock == stock:    # another line for the same stock
        if increase > max_increase:
            max_increase = increase
    else:  # new stock; output the result for the previous stock
        if stock is not None:  # only false on the very first line of input
            print("%s\t%f" % (stock, max_increase))
        stock = next_stock
        max_increase = increase
# print the last stock (guard against empty input)
if stock is not None:
    print("%s\t%f" % (stock, max_increase))

expected-output

Goog    0.166667
IBM     0.125000
MS      0.040000

output/part-00000

This file is created after running source run-hadoop.sh.

Goog    0.166667
IBM     0.125000
MS      0.040000

Amazon Elastic MapReduce (EMR)

An EMR cluster has multiple instance groups: master, core, and task nodes.

S3 and HDFS as your data layers

Data Architectures

You can architect your data a few different ways. Here are a few examples:

  1. Data (e.g. GBs of logs) are pushed to S3.
  2a. You can either push out a small amount of data to a local server, or
  2b. if you don’t want to save data on the local server, you can just use S3 instead of HDFS for your data layer to decouple your compute capacity and storage.
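
Step 1 (pushing logs to S3) could look like this minimal boto3 sketch, assuming AWS credentials are already configured; the bucket and file names are placeholders:

import boto3

s3 = boto3.client("s3")

# Push a compressed log file into the S3 landing zone.
s3.upload_file("web-2020-01-01.log.gz", "my-log-bucket", "raw/web-2020-01-01.log.gz")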

Long-running cluster

Interactive query

EMR for ETL and query engine for investigations

This takes the S3 data and splits it two ways:

  1. A transient EMR cluster using Spark for ad-hoc analysis of the entire log set
  2. An hourly EMR cluster using Spark for ETL, which then loads a subset into a Redshift data warehouse (see the sketch after this list)
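
The hourly transient cluster in step 2 could be launched through the EMR API. Here is a hedged boto3 sketch; it assumes the default EMR IAM roles already exist, and the cluster, bucket, and script names are placeholders:

import boto3

emr = boto3.client("emr", region_name="us-east-1")

# Launch a transient cluster that runs one Spark step and then terminates.
response = emr.run_job_flow(
    Name="hourly-log-etl",
    ReleaseLabel="emr-5.30.0",
    Applications=[{"Name": "Spark"}],
    Instances={
        "InstanceGroups": [
            {"InstanceRole": "MASTER", "InstanceType": "m5.xlarge", "InstanceCount": 1},
            {"InstanceRole": "CORE", "InstanceType": "m5.xlarge", "InstanceCount": 2},
        ],
        "KeepJobFlowAliveWhenNoSteps": False,  # terminate when the step finishes
    },
    Steps=[{
        "Name": "spark-etl",
        "ActionOnFailure": "TERMINATE_CLUSTER",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["spark-submit", "s3://my-bucket/etl_job.py"],
        },
    }],
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
)
print(response["JobFlowId"])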

Other interesting ETL setups include:

Streaming Data Processing

Logs are stored in Amazon Kinesis, and the stream then splits out to:

How to execute work on EMR

You can either:

You can have a variety of data stores including:

How to set security for EMR

EMR uses two IAM roles for security: a service role for EMR itself and an EC2 instance profile role for the cluster’s instances.

EMR by default creates two security groups: one for the master node and one for the core/task nodes.

Bootstrap Actions configure your applications (e.g. setting up core-site.xml, hdfs-site.xml, mapred-site.xml, yarn-site.xml).

You should consider using client-side encrypted objects in S3. You should also compress your data files. S3 can be used as a Landing Zone and/or as a Data Lake.

EMR is also HIPAA-eligible.

AWS Data Pipeline for ETL

You can access AWS Data Pipeline through the console, the command line interface, or its APIs.

General Data Pipeline Commands

Pipeline Use Case for Clickstreams

  1. S3 as Data Lake / Landing Zone
  2. Amazon EMR as ETL grid (hive, pig)
  3. Data Warehouse with Amazon Redshift

On a Data Pipeline, the activities look like:

  1. weblogs-bucket
  2. logsProcess-ExtractTransform
  3. goes to staging
  4. goes to Redshift (see the sketch after this list)
  5. reports connect to Redshift
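
Step 4, loading the staged data from S3 into Redshift, could look like this hedged sketch using psycopg2; the cluster endpoint, credentials, table, bucket path, and IAM role ARN are all placeholders:

import psycopg2

# Connect to the Redshift cluster (endpoint and credentials are placeholders).
conn = psycopg2.connect(
    host="my-cluster.abc123.us-east-1.redshift.amazonaws.com",
    port=5439,
    dbname="analytics",
    user="etl_user",
    password="...",
)

# COPY pulls the staged, compressed CSV files from S3 into a Redshift table.
with conn.cursor() as cursor:
    cursor.execute("""
        COPY clickstream_events
        FROM 's3://my-staging-bucket/clickstream/'
        IAM_ROLE 'arn:aws:iam::123456789012:role/my-redshift-copy-role'
        CSV GZIP;
    """)
conn.commit()
conn.close()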

For example, a PigActivity can do: