ETL là gì ?
Trong thời gian gần đây, tôi có điều kiện làm việc trong team ETL. Vậy ETL là gì ?
ETL là viết tắt của Extract Transform Load gọi là quá trình làm thế nào dữ liệu được đưa vào từ các nguồn dữ liệu vào kho dữ liệu.
ETL gồm 3 bước là :
- Extracts : đi thu gom dữ liệu từ nhiều nguồn khác nhau
- Transforms : chuyển đổi dữ liệu, như mô hình ở trên mục đích của chúng ta là chuyển đổi dữ liệu nghiệp vụ thành dạng dữ liệu có thể phân tích được. Ngoài ra ở bước này chúng ta phải làm sạch dữ liệu.
- Loading : Sau khi chuyển đổi và lưu trữ, dữ liệu được đưa vào một datawarehouse. Dữ liệu này dùng cho mục đích phân tích và phát triển của 1 team khác đó chính là team phân tích
Trong các bước ở trên, mình thường xuyên phải sử dụng một công cụ để import dữ liệu từ bên ngoài và import vào hive. Đó chính là Sqoop.
Vậy chúng ta cùng nhau làm 1 ví dụ để xem sqoop import dữ liệu từ MySQL vào hive như thế nào nhé.
Tạo bảng trong MySQL
Kiểm tra MySQL đã được cài đặt hay chưa, nếu chưa bạn tự cài đặt nhé.
shell> mysql --version
mysql Ver 14.14 Distrib 5.1.66, for redhat-linux-gnu (x86_64) using readline 5.
Tạo 1 database, ta đặt tên là sqoop
mysql> create database sqoop;
Sau đó
mysql> use sqoop;
mysql> create table customer(id varchar(3), name varchar(20), age varchar(3), salary integer(10));
Query OK, 0 rows affected (0.09 sec)
mysql> desc customer;
+--------+-------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------+-------------+------+-----+---------+-------+
| id | varchar(3) | YES | | NULL | |
| name | varchar(20) | YES | | NULL | |
| age | varchar(3) | YES | | NULL | |
| salary | int(10) | YES | | NULL | |
+--------+-------------+------+-----+---------+-------+
mysql> select * from customer;
+------+--------+------+--------+
| id | name | age | salary |
+------+--------+------+--------+
| 1 | John | 30 | 80000 |
| 2 | Kevin | 33 | 84000 |
| 3 | Mark | 28 | 90000 |
| 4 | Jenna | 34 | 93000 |
| 5 | Robert | 32 | 100000 |
| 6 | Zoya | 40 | 60000 |
| 7 | Sam | 37 | 75000 |
| 8 | George | 31 | 67000 |
| 9 | Peter | 23 | 70000 |
| 19 | Alex | 26 | 74000 |
+------+--------+------+-----
Sử dụng Sqoop
Như chúng ta đã thấy, ở trên bảng sqoop tôi không tạo khóa chính và thêm một vài bản ghi trong bảng customer. Mặc định, sqoop sẽ xác định primary key là splitting column. Ví dụ --split-by id
Tôi muốn import dữ liệu này vào hive, tôi sử dụng command
sqoop import --connect jdbc:mysql://localhost:3306/sqoop
--username root
-P
--split-by id
--columns id,name
--table customer
--target-dir /user/cloudera/ingest/raw/customers
--fields-terminated-by ","
--hive-import
--create-hive-table
--hive-table sqoop_workspace.customers
Trong đó
- connect - jdbc string của mysql
- username - Database username
- P : Database password
- table : Tên bảng cần import. Ở đây chính là bảng customer
- split-by : chính là split column đã nói ở trên. Ở đây là Id
- target-dir : Path lưu trữ trên hệ thống HDFS
- fields-terminated-by : Các giá trị phân cách nhau giữa các field, ở đây là dấu phẩy
- create-hive-table : Tạo bảng trên hive để lưu trữ dữ liệu từ bảng customer. Sẽ fail nếu bảng đc tạo rồi
- hive-table : Chỉ định database name và tên table trên hive
Chạy lệnh import bằng sqoop
sqoop import --connect jdbc:mysql://localhost:3306/sqoop --username root -P --split-by id --columns id,name --table customer --target-dir /user/cloudera/ingest/raw/customers --fields-terminated-by "," --hive-import --create-hive-table --hive-table sqoop_workspace.customers
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
16/03/01 12:59:44 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.5.0
Enter password:
16/03/01 12:59:54 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
16/03/01 12:59:54 INFO tool.CodeGenTool: Beginning code generation
16/03/01 12:59:55 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `customer` AS t LIMIT 1
16/03/01 12:59:56 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `customer` AS t LIMIT 1
16/03/01 12:59:56 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/6471c43b5c867834458d3bf5a67eade2/customer.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
16/03/01 13:00:01 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/6471c43b5c867834458d3bf5a67eade2/customer.jar
16/03/01 13:00:01 WARN manager.MySQLManager: It looks like you are importing from mysql.
16/03/01 13:00:01 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
16/03/01 13:00:01 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
16/03/01 13:00:01 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
16/03/01 13:00:01 INFO mapreduce.ImportJobBase: Beginning import of customer
16/03/01 13:00:01 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
16/03/01 13:00:02 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
16/03/01 13:00:04 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
16/03/01 13:00:05 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/03/01 13:00:11 INFO db.DBInputFormat: Using read commited transaction isolation
16/03/01 13:00:11 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`id`), MAX(`id`) FROM `customer`
16/03/01 13:00:11 WARN db.TextSplitter: Generating splits for a textual index column.
16/03/01 13:00:11 WARN db.TextSplitter: If your database sorts in a case-insensitive order, this may result in a partial import or duplicate records.
16/03/01 13:00:11 WARN db.TextSplitter: You are strongly encouraged to choose an integral split column.
16/03/01 13:00:11 INFO mapreduce.JobSubmitter: number of splits:4
16/03/01 13:00:12 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1456782715090_0004
16/03/01 13:00:13 INFO impl.YarnClientImpl: Submitted application application_1456782715090_0004
16/03/01 13:00:13 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1456782715090_0004/
16/03/01 13:00:13 INFO mapreduce.Job: Running job: job_1456782715090_0004
16/03/01 13:00:47 INFO mapreduce.Job: Job job_1456782715090_0004 running in uber mode : false
16/03/01 13:00:48 INFO mapreduce.Job: map 0% reduce 0%
16/03/01 13:01:43 INFO mapreduce.Job: map 25% reduce 0%
16/03/01 13:01:46 INFO mapreduce.Job: map 50% reduce 0%
16/03/01 13:01:48 INFO mapreduce.Job: map 100% reduce 0%
16/03/01 13:01:48 INFO mapreduce.Job: Job job_1456782715090_0004 completed successfully
16/03/01 13:01:48 INFO mapreduce.Job: Counters: 30
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=548096
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=409
HDFS: Number of bytes written=77
HDFS: Number of read operations=16
HDFS: Number of large read operations=0
HDFS: Number of write operations=8
Job Counters
Launched map tasks=4
Other local map tasks=5
Total time spent by all maps in occupied slots (ms)=216810
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=216810
Total vcore-seconds taken by all map tasks=216810
Total megabyte-seconds taken by all map tasks=222013440
Map-Reduce Framework
Map input records=10
Map output records=10
Input split bytes=409
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=2400
CPU time spent (ms)=5200
Physical memory (bytes) snapshot=418557952
Virtual memory (bytes) snapshot=6027804672
Total committed heap usage (bytes)=243007488
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=77
16/03/01 13:01:48 INFO mapreduce.ImportJobBase: Transferred 77 bytes in 104.1093 seconds (0.7396 bytes/sec)
16/03/01 13:01:48 INFO mapreduce.ImportJobBase: Retrieved 10 records.
16/03/01 13:01:49 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `customer` AS t LIMIT 1
16/03/01 13:01:49 INFO hive.HiveImport: Loading uploaded data into Hive
Logging initialized using configuration in jar:file:/usr/jars/hive-common-1.1.0-cdh5.5.0.jar!/hive-log4j.properties
OK
Time taken: 2.163 seconds
Loading data to table sqoop_workspace.customers
chgrp: changing ownership of 'hdfs://quickstart.cloudera:8020/user/hive/warehouse/sqoop_workspace.db/customers/part-m-00000': User does not belong to supergroup
chgrp: changing ownership of 'hdfs://quickstart.cloudera:8020/user/hive/warehouse/sqoop_workspace.db/customers/part-m-00001': User does not belong to supergroup
chgrp: changing ownership of 'hdfs://quickstart.cloudera:8020/user/hive/warehouse/sqoop_workspace.db/customers/part-m-00002': User does not belong to supergroup
chgrp: changing ownership of 'hdfs://quickstart.cloudera:8020/user/hive/warehouse/sqoop_workspace.db/customers/part-m-00003': User does not belong to supergroup
Table sqoop_workspace.customers stats: [numFiles=4, totalSize=77]
OK
Time taken: 1.399 seconds
Cuối cùng, chúng ta kiểm tra dữ liệu đã được import vào hive chưa ?
hive> show databases;
OK
default
sqoop_workspace
Time taken: 0.034 seconds, Fetched: 2 row(s)
hive> use sqoop_workspace;
OK
Time taken: 0.063 seconds
hive> show tables;
OK
customers
Time taken: 0.036 seconds, Fetched: 1 row(s)
hive> show create table customers;
OK
CREATE TABLE `customers`(
`id` string,
`name` string)
COMMENT 'Imported by sqoop on 2016/03/01 13:01:49'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
'hdfs://quickstart.cloudera:8020/user/hive/warehouse/sqoop_workspace.db/customers'
TBLPROPERTIES (
'COLUMN_STATS_ACCURATE'='true',
'numFiles'='4',
'totalSize'='77',
'transient_lastDdlTime'='1456866115')
Time taken: 0.26 seconds, Fetched: 18 row(s)
hive> select * from customers;
OK
1 John
2 Kevin
19 Alex
3 Mark
4 Jenna
5 Robert
6 Zoya
7 Sam
8 George
9 Peter
Time taken: 1.123 seconds, Fetched: 10 row(s).
Đến đây, chúng ta đã thực hành xong việc import dữ liệu từ MySQL sang hive, việc tiếp theo, dữ liệu trên Hive sẽ được sử dụng để team phân tích sử dụng tiếp. Cảm ơn các bạn đã đọc bài viết.