In this blog post, I would like to demonstrate the steps to load an RDBMS table (Oracle in this case) into Spark and create an external table.
Before we start the actual steps, let us understand a few of the definitions needed in this process.
Data Frame:
A DataFrame is a Dataset organized into named columns; it is more or less equivalent to a table in an RDBMS.
Parquet:
Apache Parquet is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model or programming language.
Create a Data Frame:
[hadoop@node1 lib]$ spark-shell --driver-class-path ojdbc6.jar --jars ojdbc6.jar
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_80)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val employees = spark.read.format("jdbc").option("url", "jdbc:oracle:thin:scott/tiger@//oracle:1521/sample").option("driver", "oracle.jdbc.OracleDriver").option("dbtable", "CUSTOMER_LOC").load()
employees: org.apache.spark.sql.DataFrame = [CUSTOMER_ID: string, ADDRESS_TITLE: string … 36 more fields]
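The single `dbtable` read above pulls the whole table through one JDBC connection. For larger tables, Spark's JDBC source also accepts partitioning options so the read is split into parallel tasks. A minimal sketch of such an options map is below; the connection details are the sample values from this post, and `CUSTOMER_NUM` is a hypothetical numeric column (the partition column must be numeric, and `CUSTOMER_ID` here is a string).

```scala
// Sketch: JDBC read options for a partitioned (parallel) load.
// CUSTOMER_NUM is an assumed numeric column, not from the original post.
val jdbcOptions = Map(
  "url"             -> "jdbc:oracle:thin:scott/tiger@//oracle:1521/sample",
  "driver"          -> "oracle.jdbc.OracleDriver",
  "dbtable"         -> "CUSTOMER_LOC",
  // Optional: split the read into numPartitions parallel tasks over
  // [lowerBound, upperBound) of the partition column.
  "partitionColumn" -> "CUSTOMER_NUM",
  "lowerBound"      -> "1",
  "upperBound"      -> "100000",
  "numPartitions"   -> "4"
)

// In spark-shell:
// val employees = spark.read.format("jdbc").options(jdbcOptions).load()
println(jdbcOptions("numPartitions"))
```

The bounds only control how the key range is split across tasks; rows outside the bounds are still read.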
You can issue the command below to check the structure of the table.
scala> employees.printSchema()
To view the top 5 rows of the table, issue the command below.
scala> employees.show(5)
Register this DataFrame as a temporary view.
scala> employees.createOrReplaceTempView("CUSTOMER_LOC")
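Once the temporary view is registered, plain SQL works against it via `spark.sql`. A small sketch follows; the column names come from the CUSTOMER_LOC schema above, but the WHERE clause is purely illustrative.

```scala
// Sketch: querying the registered temp view with SQL.
// ACTIVE_FLG = 'Y' is an assumed filter value for illustration.
val query =
  """SELECT CUSTOMER_ID, CITY, COUNTRY_NAME
    |FROM CUSTOMER_LOC
    |WHERE ACTIVE_FLG = 'Y'""".stripMargin

// In spark-shell:
// spark.sql(query).show(5)
println(query)
```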
Create a directory in HDFS to store the external table in Parquet format.
[hadoop@node1 ~]$ hadoop fs -mkdir -p /mnt/oracle/hadoop/hadoopdata/parquettest
scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode
scala> employees.select("CUSTOMER_ID","ADDRESS_TITLE","ST_ADDRESS1","ST_ADDRESS2","CITY","COUNTY","POSTAL_CODE","STATE_CODE","STATE_NAME","STATE_REGION","COUNTRY_CODE","COUNTRY_NAME","COUNTRY_REGION","PHONE_NUM","FAX_NUM","EMAIL_ADDRESS","WEB_ADDRESS","AUTO_ROUTING_CODE","ADDR_LATITUDE","ADDR_LONGITUDE","PRMRY_CNTCT_NAME","ACTIVE_FLG").write.mode(SaveMode.Overwrite).format("parquet").save("/mnt/oracle/hadoop/hadoopdata/parquettest/CUSTOMER_LOC")
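The 22 column names above are repeated again in the external-table DDL further down, so one way to keep the two in sync is to hold them in a list. This is a refactoring suggestion, not what the post originally does:

```scala
// Sketch: the 22 columns selected above, kept as a single list so the
// select and the external-table DDL can share it.
val columns = Seq(
  "CUSTOMER_ID", "ADDRESS_TITLE", "ST_ADDRESS1", "ST_ADDRESS2", "CITY",
  "COUNTY", "POSTAL_CODE", "STATE_CODE", "STATE_NAME", "STATE_REGION",
  "COUNTRY_CODE", "COUNTRY_NAME", "COUNTRY_REGION", "PHONE_NUM", "FAX_NUM",
  "EMAIL_ADDRESS", "WEB_ADDRESS", "AUTO_ROUTING_CODE", "ADDR_LATITUDE",
  "ADDR_LONGITUDE", "PRMRY_CNTCT_NAME", "ACTIVE_FLG")

// In spark-shell (needs: import org.apache.spark.sql.functions.col):
// employees.select(columns.map(col): _*)
//   .write.mode(SaveMode.Overwrite)
//   .parquet("/mnt/oracle/hadoop/hadoopdata/parquettest/CUSTOMER_LOC")
println(columns.size)
```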
Create a HiveContext and create an external table.
scala> import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.HiveContext
scala> val hc = new HiveContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
scala> hc.sql("create external table ET_CUSTOMER_LOC ( CUSTOMER_ID String,ADDRESS_TITLE String,ST_ADDRESS1 String,ST_ADDRESS2 String,CITY String,COUNTY String,POSTAL_CODE String,STATE_CODE String,STATE_NAME String,STATE_REGION String,COUNTRY_CODE String,COUNTRY_NAME String,COUNTRY_REGION String,PHONE_NUM String,FAX_NUM String,EMAIL_ADDRESS String,WEB_ADDRESS String,AUTO_ROUTING_CODE String,ADDR_LATITUDE Decimal(22,7),ADDR_LONGITUDE Decimal(22,7),PRMRY_CNTCT_NAME String,ACTIVE_FLG String) stored as PARQUET LOCATION '/mnt/oracle/hadoop/hadoopdata/parquettest/CUSTOMER_LOC'")
res0: org.apache.spark.sql.DataFrame = []
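The one-line DDL above is easy to get wrong by hand. A sketch of assembling it from column/type pairs is below, abridged to a few of the 22 columns for readability (the full list is in the statement above); the types mirror that DDL.

```scala
// Sketch: assembling the CREATE EXTERNAL TABLE statement from
// column/type pairs (abridged column list for illustration).
val cols = Seq(
  "CUSTOMER_ID"    -> "String",
  "ADDRESS_TITLE"  -> "String",
  "ADDR_LATITUDE"  -> "Decimal(22,7)",
  "ADDR_LONGITUDE" -> "Decimal(22,7)",
  "ACTIVE_FLG"     -> "String")

val ddl =
  s"""CREATE EXTERNAL TABLE ET_CUSTOMER_LOC (
     |  ${cols.map { case (n, t) => s"$n $t" }.mkString(",\n  ")}
     |) STORED AS PARQUET
     |LOCATION '/mnt/oracle/hadoop/hadoopdata/parquettest/CUSTOMER_LOC'""".stripMargin

// In spark-shell: hc.sql(ddl)
println(ddl)
```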
This creates an external table in Parquet format. Issue the command below to view the table.
scala> spark.sql("SHOW TABLES").show(false)
+--------+---------------+-----------+
|database|tableName      |isTemporary|
+--------+---------------+-----------+
|default |et_customer_loc|false      |
+--------+---------------+-----------+
Count of the table in Spark:
To cross-check the table count between the source (Oracle database) and the target (HDFS), issue the command below.
scala> spark.sql("select count(*) from et_customer_loc")
res10: org.apache.spark.sql.DataFrame = [count(1): bigint]
scala> res10.show()
+--------+
|count(1)|
+--------+
|     204|
+--------+
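The cross-check itself can be scripted rather than eyeballed. A trivial sketch, assuming you fetch the source count from Oracle (e.g. via the JDBC DataFrame's `count()`) and the target count from the external table:

```scala
// Sketch: compare source and target row counts programmatically.
def countsMatch(sourceCount: Long, targetCount: Long): Boolean =
  sourceCount == targetCount

// In spark-shell, for example:
// val targetCount =
//   spark.sql("select count(*) from et_customer_loc").first().getLong(0)
// countsMatch(employees.count(), targetCount)
println(countsMatch(204L, 204L))
```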
This concludes loading Oracle (RDBMS) data into Spark using Scala, creating an external table, and viewing the output.
#Hadoop #Oracle #Scala #Spark