Trong quá trình làm việc với các dự án bigdata, việc phải sử dụng PySpark để đọc dữ liệu từ Hive Table sau đó ghi vào 1 CSDL quan hệ như SQL Server hoặc PostgreQuery thường xuyên phải sử dụng. Vì thế, hôm nay mình quyết định viết 1 bài hướng dẫn để những bạn mới làm quen với bigdata có một demo nhỏ tham khảo trước khi bắt tay vào tìm hiểu sâu hơn các công nghệ xung quanh

Demo này tạo 1 python script sử dụng pySpark để đọc dữ liệu từ Hive Table thành 1 DataFrame và thực hiện thao tác trên DataFrame sau đó ghi kết quả tìm được ra các hệ quản trị cơ sở dữ liệu quan hệ, cụ thể ở demo này là PostgreQuery. Phiên bản Spark mình sử dụng là 1.6.2

1. Lấy dữ liệu từ CSV nhập vào Hive Table

Ở đây mình sử dụng dữ liệu tổng hợp các tội phạm của thành phố Chicago. Dữ liệu định dạng CSV, chúng ta có thể tải từ link bên dưới
Dữ liệu tội phạm CSV

2. Sao chép dữ liệu CSV sang HDFS

Mình sẽ tạo một số thư mục ở HDFS để lưu data, sau đó mình sẽ set các quyền để các thư mục của mình có quyền được đọc ghi

[hdfs@jyoung-hdp25-node-1 ~]$ hadoop fs -mkdir -p /tmp/crime
[hdfs@jyoung-hdp25-node-1 ~]$ hadoop fs -chmod -R 777 /tmp/crime
[hdfs@jyoung-hdp25-node-1 ~]$ hadoop fs -copyFromLocal /tmp/crime/crime /tmp/crime/
[hdfs@jyoung-hdp25-node-1 ~]$ hadoop fs -chown -R ambari-qa:hdfs /tmp/crime
[hdfs@jyoung-hdp25-node-1 ~]$ hadoop fs -chown -R ambari-qa:hdfs /tmp/crime/crime
[hdfs@jyoung-hdp25-node-1 ~]$ hadoop fs -chmod -R 777 /tmp/crime/crime
[hdfs@jyoung-hdp25-node-1 ~]$ hadoop fs -ls /tmp/crime/
Found 1 items
-rwxrwxrwx   3 ambari-qa hdfs 1452392942 2016-10-02 04:27 /tmp/crime/crime
[hdfs@jyoung-hdp25-node-1 ~]$ exit

Ở đây, chúng ta đã tạo 1 thư mục trên HDFS là /tmp/crime, sau đó chúng ta copy data đã lưu ở local lên HDFS và set quyền cho thư mục

3. Tạo 1 file HQL (DDL commands) sử dụng để tạo và insert dữ liệu vào Hive Table

[hive@jyoung-hdp25-node-1 ~]$ cat << EOF > /tmp/load_crime_table.txt
CREATE EXTERNAL TABLE IF NOT EXISTS crime(
    ID STRING,
    Case_Number STRING,
    Case_Date STRING,
    Block STRING,
    IUCR INT,
    Primary_Type STRING,
    Description STRING,
    Location_Description STRING,
    Arrest BOOLEAN,
    Domestic BOOLEAN,
    Beat STRING,
    District STRING,
    Ward STRING,
    Community_Area STRING,
    FBI_Code STRING,
    X_Coordinate INT,
    Y_Coordinate INT,
    Case_Year INT,
    Updated_On STRING,
    Latitude DOUBLE,
    Longitude DOUBLE,
    Location STRING)
COMMENT 'This is crime data for the city of Chicago.'
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS TEXTFILE
LOCATION '/tmp/crime'
TBLPROPERTIES("skip.header.line.count"="1");
EOF

Ở đây chúng ta đã tạo 1 Hive QL với câu Query tạo bảng trên Hive, và lưu vào thư mục /tmp/crime chúng ta đã tạo ở trên.

4. Tạo bảng Hive sử dụng file Hive QL đã tạo

Chúng ta sẽ sử dụng beeline để thực hiện việc tạo bảng Hive Table từ bên ngoài. Nếu bạn đã từng làm việc với Hive thì việc sử dụng beeline không còn gì mới lạ. Nếu bạn chưa biết gì về beeline hãy tham khảo tại link sau
Beeline – Command Line Shell

[hive@jyoung-hdp25-node-1 ~]$ /usr/bin/beeline -u "jdbc:hive2://jyoung-hdp25-node-2.openstacklocal:10000/default;principal=hive/_HOST@EXAMPLE.COM" -f "/tmp/load_crime_table.txt"
[hive@jyoung-hdp25-node-1 ~]$ exit

5. Tạo file SQL file (DDL Commands) để tạo bảng trên PostgreSQL

[root@jyoung-hdp25-node-1 ~]# cat << EOF /tmp/create_pettytheft.sql
CREATE table pettytheft(
  id BIGINT ,
  case_number VARCHAR(255) NOT NULL ,
  primary_type VARCHAR(255) NOT NULL,
  description VARCHAR(255) NOT NULL,
  location_description VARCHAR(255) NOT NULL,
  beat VARCHAR(255) NOT NULL,
  district VARCHAR(255) NOT NULL,
  ward VARCHAR(255) NOT NULL,
  community_area VARCHAR(255) NOT NULL,
  PRIMARY KEY(id)
);

GRANT ALL ON pettytheft TO ambari;
ALTER TABLE pettytheft OWNER TO ambari;
EOF
[root@jyoung-hdp25-node-1 ~]# chmod 777 /tmp/create_pettytheft.sql

6. Tạo DB và table sử dụng file SQL từ bước 5

Chúng ta sử dụng 1 tiện ích là psql để tạo 1 database từ file SQL đã tạo ở bên trên

[root@jyoung-hdp25-node-1 ~]# su - postgres
-bash-4.2$ psql -d postgres -f /tmp/create_pettytheft.sql
-bash-4.2$ exit

7. Tạo python script để đọc dữ liệu từ Hive và ghi dữ liệu tới PostgreSQL table

Chúng ta sẽ tạo 1 file python script ở thư mục tmp trên HDFS /tmp/pyspark_hive_jdbc_demo.py

Ta sẽ đi qua từng đoạn code trong file python script này để xem chúng làm công việc gì

import những module cần thiết cho pySpark

from pyspark import SparkContext
from pyspark.sql import HiveContext

Tạo 1 Spark Context, sau đó sử dụng Spark Context để tạo Hive Context Object, việc này giúp chúng ta thực hiện SQL query như Hive Command

sc = SparkContext("local", "pySpark Hive JDBC Demo App")
# Create a Hive Context
hive_context = HiveContext(sc)

Đọc data từ Hive Table 'crime' với db tên là default. Kết quả sẽ trả về 1 dataframe

print "Reading Hive table..."
crime = hive_context.table("default.crime")

Đăng ký dataframe thành 1 bảng tạm thời

print "Registering DataFrame as a table..."
crime.registerTempTable("crime_temp")

Thực hiện truy vấn SQL trên bảng tạm thời này để lấy ra những vụ trộm tài sản dưới $500 và kết quả trả về 1 dataframe khác

print "Executing SQL SELECT query on DataFrame registered as a temporary table..."
pettythefts = hive_context.sql('SELECT * FROM crime_temp WHERE Primary_Type = "THEFT" AND Description = "$500 AND UNDER"')

Tạo 1 dataframe mới chỉ chứa các cột mà chúng ta muốn ghi vào CSDL quan hệ bằng cách sử dụng select[list of columns]

print "Creating a DataFrame of only the columns of our resultset to be persisted to JDBC DataSource..."
pettythefts_table_df = pettythefts.select("id", "case_number", "primary_type", "description", "location_description", "beat", "district", "ward", "community_area")

Chuẩn bị config kết nối tới CSDL quan hệ

mode = 'overwrite'
url = 'jdbc:postgresql://<database server IP address>:5432/postgres?searchpath=public'
properties = {"user": "<username>", "password": "<password>", "driver": "org.postgresql.Driver"}
table = 'public.pettytheft'

Ghi nội dung của Dataframe tới CSDL quan hệ sử dụng connection config đã định nghĩa bên trên

print "Writing DataFrame to JDBC Datasource..."
pettythefts_table_df.write.jdbc(url=url, table=table, mode=mode, properties=properties)

print "Exiting..."

File được save ở thư mục /tmp/pyspark_hive_jdbc_demo.py

8. Chạy python script bằng Spark

[root@jyoung-hdp25-node-1 ~]# su - yarn
[yarn@jyoung-hdp25-node-1 ~]$ /bin/spark-submit --jars /usr/share/java/postgresql-jdbc.jar --driver-class-path /usr/share/java/postgresql-jdbc.jar --master local[1] /tmp/pyspark_hive_jdbc_demo.py
[yarn@jyoung-hdp25-node-1 ~]$ exit

9. Confirm data đã lưu vào CSDL quan hệ

[root@jyoung-hdp25-node-1 ~]# su - postgres
-bash-4.2$ psql -d postgres -U <username> -c "select * from public.pettytheft limit 5;"
Password for user ambari: 
   id    | case_number | primary_type |  description   | location_description | beat | district | ward | community_area 
---------+-------------+--------------+----------------+----------------------+------+----------+------+----------------
 8503954 | HV180410    | THEFT        | $500 AND UNDER | STREET               | 1114 | 011      | 28   | 26
 8503999 | HV180357    | THEFT        | $500 AND UNDER | STREET               | 2412 | 024      | 50   | 2
 8504003 | HV180508    | THEFT        | $500 AND UNDER | STREET               | 0725 | 007      | 17   | 67
 8504108 | HV180682    | THEFT        | $500 AND UNDER | SIDEWALK             | 0123 | 001      | 2    | 32
 8504109 | HV180672    | THEFT        | $500 AND UNDER | STREET               | 1911 | 019      | 47   | 5
(5 rows)

10. Tóm tắt

Trong bài demo trên, chúng ta đã thực hiện những công việc sau

  • Tạo 1 Hive Table

  • Insert data từ file CSV vào Hive Table

  • Tạo 1 CSDL quan hệ (PostgreSQL)

  • Tạo 1 pySpark script:

    • Đọc dữ liệu từ Hive Table - Lưu dưới dạng dataframe
    • Đăng ký dataframe vào bảng tạm
    • Thực thi câu SQL trên bảng tạm
    • Thực thi select() và trả về dữ liệu dạng dataframe
    • Ghi dữ liệu tìm được vào PostgreSQL
  • Confirm dữ liệu đã lưu vào PostgreSQL

Trên đây là 1 bài demo nhỏ để các bạn làm quen với HDFS, pySpark, Hive. Chúng ta sẽ gặp nhau ở những bài viết về bigdata tiếp theo