I recently migrated some of our data pipelines from our local Ambari-managed cluster to Amazon Elastic MapReduce (EMR) to take advantage of the fast cluster startup times, which make it practical to bootstrap clusters on demand, scale them as needed, and terminate them when the work is done.
The process was more difficult than I anticipated. The overview page describes lots of magical tools, including Hive and Sqoop, but when it comes to implementing them you're basically on your own... well, not really: on your own with Amazon support (which is awesome). At times the docs are non-existent.
Below, I've tried to document some of the magic I learned as I went through the migration process. I hope it helps you make your data pipelines faster and more resource efficient.
I wrapped my Amazon EMR app in Ruby and Rake so I can easily configure the cluster options, but at the core is the command below, which launches the cluster with the specified bootstrap actions and steps.
config/emr.rb:
module Config
  class EMR
    def initialize opts={}
      @root_path   = File.expand_path(File.join(File.dirname(__FILE__), '..'))
      @shared_path = File.join(@root_path, 'config/aws')
      @bucket      = opts[:bucket] || 'example-etl-app'
      raise 'Specify an :app name in opts' if (@app = opts[:app]).nil?
    end

    # resolve a config file path, either per-workflow or shared across workflows
    def cfg file, opts={}
      if opts[:shared].nil?
        File.join(@root_path, "#{@app}-workflow", 'aws', file)
      else
        File.join(@shared_path, file)
      end
    end

    def bucket
      @bucket
    end

    def create_cluster
      # only pass --bootstrap-actions if this workflow has a bootstrap.json
      bootstrap_actions =
        if File.exist?(bootstrap_file = cfg('bootstrap.json'))
          "--bootstrap-actions file://#{bootstrap_file}"
        else
          ""
        end
      # http://docs.aws.amazon.com/cli/latest/reference/emr/create-cluster.html
      cmd = <<-EOS
aws emr --debug \
create-cluster --ami-version=3.3.0 \
--log-uri s3://#{bucket}/log \
--enable-debugging \
--ec2-attributes file://#{cfg('ec2_attributes_emr.json', shared: true)} \
--applications Name=Hue Name=Hive Name=Pig \
--use-default-roles \
--instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m3.xlarge InstanceGroupType=CORE,InstanceCount=2,InstanceType=m1.large \
--steps file://#{cfg('steps.json')} #{bootstrap_actions}
      EOS
      puts cmd
      system cmd
    end

    def self.run_cluster app
      emr = Config::EMR.new app: app
      emr.create_cluster
    end
  end
end
Documentation for the AWS CLI's EMR commands is here: http://docs.aws.amazon.com/cli/latest/reference/emr/index.html
A workflow is a series of steps and can now be run with rake:
emr.rake:
require './config/emr.rb'

desc "run the cluster"
task :emr do
  %w/ an_etl another_etl /.each do |app|
    Config::EMR.run_cluster app
  end
end
In my project folder, each pipeline is organized into folders named:
my-cool-app/
  first-pipeline-workflow/
    aws/
      steps.json
      bootstrap.json
    hive/
      transform-some-data.hive.sql
  another-pipeline-workflow/
    aws/
      steps.json
      bootstrap.json
    pig/
      load-some-data.pig
  ...
Shared scripts are organized by tool name:
my-cool-app/
  hive/
    create-schema.hive.sql
  python/
    mysql-to-hive-schema-translator.py
  ...
I like to put all the configuration tasks before any ETL tasks so we can fail the cluster as early as possible if there is a setup problem.
Currently, I place each setup step in the steps.json of every pipeline that needs it. Since that duplicates configuration, these steps should be moved into a common location and compiled by rake into each steps.json file; I'll probably move the configs to a YAML file that can be translated into JSON.
You'll notice I've set "ActionOnFailure": "CANCEL_AND_WAIT"; this is for development. In production, these should be switched to TERMINATE_CLUSTER to avoid inactive, long-running clusters.
I use HCatalog in Pig to load CSVs into Hive:
{
"Name": "Configure HCatalog",
"Jar": "s3://elasticmapreduce/libs/script-runner/script-runner.jar",
"Args": [
"s3://support.elasticmapreduce/bootstrap-actions/hive/0.13.0/hcatalog_configurer.rb"
],
"ActionOnFailure": "CANCEL_AND_WAIT",
"Type": "CUSTOM_JAR"
}
Sqoop is used to import/export data from my application-layer database, Postgres.
{
"Name": "Install Sqoop",
"Jar": "s3://elasticmapreduce/libs/script-runner/script-runner.jar",
"Args": [
"s3://support.elasticmapreduce/bootstrap-actions/sqoop/install-sqoop-v2"
],
"ActionOnFailure": "CANCEL_AND_WAIT",
"Type": "CUSTOM_JAR"
}
Then copy in any Postgres connector you might need:
{
"Name": "Install Sqoop Postgres",
"Jar": "s3://elasticmapreduce/libs/script-runner/script-runner.jar",
"Args": [
"s3://my-cool-app/current/aws/install_sqoop_postgres.sh"
],
"ActionOnFailure": "CANCEL_AND_WAIT",
"Type": "CUSTOM_JAR"
}
Where install_sqoop_postgres.sh:
#!/bin/bash
# the postgres jar is on the machine, just not in the right place, yet
cp /usr/lib/oozie/libtools/postgresql-9.0-801.jdbc4.jar /home/hadoop/sqoop/lib/
Start with the Hive step runner:
{
"Name": "Hive Transform Companies",
"Args": [
"-f", "s3://my-cool-app/current/an-etl-workflow/hive/transform_companies.hive.sql"
],
"ActionOnFailure": "CANCEL_AND_WAIT",
"Type": "HIVE"
}
If you get an error about specifying the base path (ERROR missing required argument base-path), use this instead:
{
"Name": "Hive Setup Databases and UDFs",
"Jar": "s3://elasticmapreduce/libs/script-runner/script-runner.jar",
"Args": [
"s3://elasticmapreduce/libs/hive/hive-script",
"--base-path", "s3://elasticmapreduce/libs/hive/",
"--run-hive-script", "--hive-versions", "0.13.1",
"--args",
"-f", "s3://my-cool-app/current/hive/setup.hive.sql"
],
"ActionOnFailure": "CONTINUE",
"Type": "CUSTOM_JAR"
}
Where setup.hive.sql is the Hive script you want to run.
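I don't reproduce my full setup.hive.sql here, but as a rough sketch it mostly just creates the schemas the later steps write into; the stage and transformed names match the examples below, while the UDF lines are commented-out placeholders you'd swap for your own jars:
-- minimal sketch of a setup script: create the schemas used by later steps
CREATE DATABASE IF NOT EXISTS stage;
CREATE DATABASE IF NOT EXISTS transformed;
-- placeholder for registering custom UDFs (jar path and class are made up)
-- CREATE FUNCTION transformed.my_udf AS 'com.example.hive.MyUdf'
--   USING JAR 's3://my-cool-app/current/hive/udfs.jar';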
Load some data
{
"Name": "Pig Stage CSVs",
"Jar": "s3://elasticmapreduce/libs/script-runner/script-runner.jar",
"Args": [
"s3://support.elasticmapreduce/libs/pig/pig-script-run", "--run-pig-script",
"--args", "-useHCatalog",
"-f", "s3://my-cool-app/current/an-etl-workflow/pig/stage-csvs.pig"
],
"ActionOnFailure": "CANCEL_AND_WAIT",
"Type": "CUSTOM_JAR"
}
Where stage-csvs.pig is something like:
-- runwith: pig -useHCatalog
-- use the -w flag to show detailed warnings
REGISTER 'file:/home/hadoop/lib/pig/piggybank.jar';
DEFINE CSVExcelStorage org.apache.pig.piggybank.storage.CSVExcelStorage(
',', 'YES_MULTILINE', 'NOCHANGE', 'SKIP_INPUT_HEADER'
);
-- get the csvs from this year and last year
daily_raw_csvs = LOAD 's3n://my-cool-app/data/csvs/*201[45]-[0-9][0-9]-[0-9][0-9].csv' using CSVExcelStorage() AS (rank:int, rank_variation:chararray, category:chararray, period:chararray);
-- remove any empty rows
filtered_daily_raw_csvs = FILTER daily_raw_csvs BY (rank IS NOT NULL) AND (period IS NOT NULL);
-- remove any duplicates
distinct_csvs = DISTINCT filtered_daily_raw_csvs;
-- extract the date fields and rank_variation, which is a percent in the input data
with_dates = FOREACH distinct_csvs GENERATE rank, ((double)REGEX_EXTRACT(rank_variation, '([-]{0,1}\\d+)%', 1)/100) AS rank_variation, category, ToDate(REGEX_EXTRACT(period, '.*(\\d{4}-\\d{2}-\\d{2})$', 1)) AS period;
-- store the transformed data into hive, in the `stage` schema
STORE with_dates INTO 'stage.csvs' USING org.apache.hive.hcatalog.pig.HCatStorer();
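One gotcha: HCatStorer writes into an existing Hive table, so stage.csvs needs to exist before this step runs; a natural place to create it is the shared setup script. A sketch of the DDL, with column types mirroring the Pig schema above (the exact types, especially for period, are assumptions):
-- sketch of the staging table HCatStorer writes into
CREATE TABLE IF NOT EXISTS stage.csvs (
  rank INT,
  rank_variation DOUBLE,
  category STRING,
  period TIMESTAMP
);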
Use the Hive step runner listed above:
{
"Name": "Hive Transform Companies",
"Args": [
"-f", "s3://my-cool-app/current/an-etl-workflow/hive/transform_ranks.hive.sql"
],
"ActionOnFailure": "CANCEL_AND_WAIT",
"Type": "HIVE"
}
Say we have rank data for multiple categories coming in on multiple rows (fully denormalized) and we want to normalize the category data.
transform_ranks.hive.sql would look something like this:
DROP TABLE IF EXISTS transformed.categories;
CREATE TABLE transformed.categories AS
SELECT
  reflect("java.util.UUID", "randomUUID") AS id,
  C.name,
  CAST(unix_timestamp() AS TIMESTAMP) AS created_at,
  CAST(unix_timestamp() AS TIMESTAMP) AS updated_at
FROM (
  -- one row per distinct category name, so each category gets a single id
  SELECT DISTINCT category AS name FROM stage.csvs
) C
;
Then when we transform ranks, we can reference the id we generated in the step above.
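The ranks half of that script would then look roughly like this, joining the staged rows back to the generated category ids by name; the transformed.ranks table and its columns are assumptions based on the staged data above:
DROP TABLE IF EXISTS transformed.ranks;
CREATE TABLE transformed.ranks AS
SELECT
  reflect("java.util.UUID", "randomUUID") AS id,
  C.id AS category_id,
  S.rank,
  S.rank_variation,
  S.period,
  CAST(unix_timestamp() AS TIMESTAMP) AS created_at,
  CAST(unix_timestamp() AS TIMESTAMP) AS updated_at
FROM stage.csvs S
JOIN transformed.categories C ON (C.name = S.category)
;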
Use the script-runner.jar to run the Sqoop export:
{
"Name": "Export to Sqoop",
"Jar": "s3://elasticmapreduce/libs/script-runner/script-runner.jar",
"Args": [
"s3://my-cool-app/current/an-etl-workflow/sqoop/export.sqoop.sh"
],
"ActionOnFailure": "CANCEL_AND_WAIT",
"Type": "CUSTOM_JAR"
}
Where export.sqoop.sh:
#!/bin/bash
# amazon emr warehouse directory
WAREHOUSE_DIR=/user/hive/warehouse
# table we're exporting
TABLE=categories
/home/hadoop/sqoop/bin/sqoop export \
--verbose \
--hcatalog-home /home/hadoop/.versions/hive-0.13.1/hcatalog \
--connect jdbc:postgresql://mydbserver.mydomain.com:5432/db_name \
--username dbusername --password dbpassword \
--export-dir ${WAREHOUSE_DIR}/transformed.db/${TABLE} \
--table ${TABLE}_temp \
--input-fields-terminated-by '\0001' --input-null-string '\\N' --input-null-non-string '\\N'
Observant readers will notice I import the transformed data into tables suffixed with _temp. Since my webapp uses the Postgres database in production, I need to cut over to the new data in a single transaction.
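Also worth noting: sqoop export won't create the target table, so categories_temp has to exist in Postgres before the export step runs. A sketch of that DDL (the column types are assumptions; the Hive-generated UUID arrives as plain text):
-- sketch: target table for the Sqoop export, created ahead of time in Postgres
CREATE TABLE IF NOT EXISTS categories_temp (
  id         text,       -- UUID generated in Hive, exported as a string
  name       text,
  created_at timestamp,
  updated_at timestamp
);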
I run these Postgres scripts from a script-runner step:
pg_run.sh:
#!/bin/bash
# PARAMS:
# the pgparams config file name for the target database (config/postgres/), without .conf
# path from project home of sql file to run
set -o nounset # exit if trying to use an uninitialized var
set -o errexit # exit if any program fails
set -o pipefail # exit if any program in a pipeline fails, also
set -x # debug mode
PGPARAMS=$1
PGPARAMS_STRING=$( hadoop fs -cat s3://my-cool-app/current/config/postgres/${PGPARAMS}.conf )
PG_SQL_PATH=$2
PGCMD=$( hadoop fs -cat ${PG_SQL_PATH} )
env $PGPARAMS_STRING psql -c "${PGCMD}"
The corresponding steps:
{
"Name": "Postgres Indexes",
"Jar": "s3://elasticmapreduce/libs/script-runner/script-runner.jar",
"Args": [
"s3://my-cool-app/current/postgres/pg_run.sh",
"dbname",
"s3://my-cool-app/current/an-etl-workflow/postgres/indexes.pg.sql"
],
"ActionOnFailure": "CANCEL_AND_WAIT",
"Type": "CUSTOM_JAR"
},
{
"Name": "Postgres Transformations",
"Jar": "s3://elasticmapreduce/libs/script-runner/script-runner.jar",
"Args": [
"s3://my-cool-app/current/postgres/pg_run.sh",
"dbname",
"s3://my-cool-app/current/an-etl-workflow/postgres/transform.pg.sql"
],
"ActionOnFailure": "CANCEL_AND_WAIT",
"Type": "CUSTOM_JAR"
}
indexes.pg.sql:
CREATE INDEX index_category_names ON categories (name)
transform.pg.sql:
BEGIN;
-- categories: swap in the freshly loaded table
ALTER TABLE categories_temp ADD PRIMARY KEY (id);
DROP TABLE categories;
ALTER TABLE categories_temp RENAME TO categories;
COMMIT;
Run Hive with the Elasticsearch connector jar, in debug mode:
hive --hiveconf hive.aux.jars.path=elasticsearch-hadoop-hive-2.1.1.jar --hiveconf hive.root.logger=DEBUG,console
Then use this to create the table:
DROP TABLE IF EXISTS elasticsearch_widgets;
CREATE EXTERNAL TABLE elasticsearch_widgets(
  `id` string,
  `name` string
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.nodes' = 'elasticsearch.awesome.com:9200',
              'es.resource' = 'widgets')
;
And load the data:
INSERT OVERWRITE TABLE elasticsearch_widgets
SELECT id,
       name
FROM widgets
;
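Once the insert finishes, a quick sanity check from the same Hive session confirms the documents are readable back through the external table:
-- sanity check the Elasticsearch-backed table
SELECT id, name FROM elasticsearch_widgets LIMIT 10;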
Use yarn application -list (your results will show your IP; I've redacted ours):
[hadoop@ip-0-0-0-0 var]$ yarn application -list
15/07/15 23:37:54 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:9022
Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1
Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL
application_1436973673154_0030 calc_some_ranks_temp.jar MAPREDUCE hadoop default RUNNING UNDEFINED 73.91% http://ip-0-0-0-0.ourdomain.com:36429
To kill a runaway application, find its id with yarn application -list, then run:
yarn application -kill <application-id>
Logs are in /mnt/var; step logs live in:
/mnt/var/log/hadoop/steps
While a step is running, you can stream its logs in real time with this command on the master node:
tail -f /mnt/var/log/hadoop/steps/s-54K87AJP83RR/*
Use the hadoop job -logs command on the master node. For example, if you see something like this in your Hadoop logs, where one job is starting another:
http://10.11.1.29:9026/taskdetails.jsp?jobid=job_1441139008487_0013&tipid=task_1441139008487_0013_r_000009
Find the logs for that job with:
hadoop job -logs job_1441139008487_0013
Port 8443 -- all external management connections from the AWS EMR service happen over this port.
The EMR management server IP blocks are:
205.251.233.160/28
205.251.233.176/29
205.251.233.32/28
205.251.233.48/29
205.251.234.32/28
54.240.230.176/29
54.240.230.184/29
54.240.230.240/29
The Hadoop install is pushed via package installation infrastructure.
Bootstrap actions are no longer used; instead, everything is a step.
Let me know if I copy+pasted something wrong or if you have any questions. Leave a comment!