Setup Environment

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.6.tgz
!tar -xvf spark-2.4.3-bin-hadoop2.6.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.6"
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
!wget https://raw.githubusercontent.com/futurexskill/bigdata/master/bank_prospects.csv

In [4]:
!ls

bank_prospects.csv  spark-2.4.3-bin-hadoop2.6
sample_data	    spark-2.4.3-bin-hadoop2.6.tgz


Read inputdata

In [5]:
bankProspectDF= spark.read.csv('bank_prospects.csv', header=True)

In [6]:
bankProspectDF.show()

+----+------+------+-------+---------+
| Age|Salary|Gender|Country|Purchased|
+----+------+------+-------+---------+
|  18| 20000|  Male|Germany|        N|
|  19| 22000|Female| France|        N|
|  20| 24000|Female|England|        N|
|  21|  null|  Male|England|        N|
|  22| 50000|  Male| France|        Y|
|  23| 35000|Female|England|        N|
|  24|  null|  Male|Germany|        N|
|  25| 32000|Female| France|        Y|
|null| 35000|  Male|Germany|        N|
|  27| 37000|Female| France|        N|
|  27| 37000|Female|unknown|        N|
+----+------+------+-------+---------+



Remove records with unknown data

In [9]:
bankprospectDF1 = bankProspectDF.filter(bankProspectDF['country'] != 'unknown')

In [10]:
bankprospectDF1.show()

+----+------+------+-------+---------+
| Age|Salary|Gender|Country|Purchased|
+----+------+------+-------+---------+
|  18| 20000|  Male|Germany|        N|
|  19| 22000|Female| France|        N|
|  20| 24000|Female|England|        N|
|  21|  null|  Male|England|        N|
|  22| 50000|  Male| France|        Y|
|  23| 35000|Female|England|        N|
|  24|  null|  Male|Germany|        N|
|  25| 32000|Female| France|        Y|
|null| 35000|  Male|Germany|        N|
|  27| 37000|Female| France|        N|
+----+------+------+-------+---------+



In [12]:
bankprospectDF1.printSchema()

root
 |-- Age: string (nullable = true)
 |-- Salary: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Purchased: string (nullable = true)



In [14]:
from pyspark.sql.types import IntegerType,FloatType

In [16]:
bankprospectDF2 = bankprospectDF1.withColumn('age', bankprospectDF1['age'].cast(IntegerType())).withColumn('salary', bankprospectDF1['salary'].cast(FloatType()))

In [17]:
bankprospectDF2.printSchema()

root
 |-- age: integer (nullable = true)
 |-- salary: float (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Purchased: string (nullable = true)



Derive mean values for age and salary

In [18]:
from pyspark.sql.functions import mean

In [20]:
mean_age_val = bankprospectDF2.select(mean(bankprospectDF2['age'])).collect()

In [21]:
type(mean_age_val)

list

In [33]:
mean_age = mean_age_val[0][0]

In [34]:
mean_age

22.11111111111111

In [28]:
mean_salary_val = bankprospectDF2.select(mean(bankprospectDF2['salary'])).collect()

In [29]:
mean_salary = mean_salary_val[0][0]

In [30]:
mean_salary

31875.0

In [31]:
bankprospectDF2.show()

+----+-------+------+-------+---------+
| age| salary|Gender|Country|Purchased|
+----+-------+------+-------+---------+
|  18|20000.0|  Male|Germany|        N|
|  19|22000.0|Female| France|        N|
|  20|24000.0|Female|England|        N|
|  21|   null|  Male|England|        N|
|  22|50000.0|  Male| France|        Y|
|  23|35000.0|Female|England|        N|
|  24|   null|  Male|Germany|        N|
|  25|32000.0|Female| France|        Y|
|null|35000.0|  Male|Germany|        N|
|  27|37000.0|Female| France|        N|
+----+-------+------+-------+---------+



Replace missing values with mean values

In [35]:
bankprospectDF3 = bankprospectDF2.na.fill(mean_age,['age'])

In [36]:
bankprospectDF3.show()

+---+-------+------+-------+---------+
|age| salary|Gender|Country|Purchased|
+---+-------+------+-------+---------+
| 18|20000.0|  Male|Germany|        N|
| 19|22000.0|Female| France|        N|
| 20|24000.0|Female|England|        N|
| 21|   null|  Male|England|        N|
| 22|50000.0|  Male| France|        Y|
| 23|35000.0|Female|England|        N|
| 24|   null|  Male|Germany|        N|
| 25|32000.0|Female| France|        Y|
| 22|35000.0|  Male|Germany|        N|
| 27|37000.0|Female| France|        N|
+---+-------+------+-------+---------+



In [37]:
bankprospectDF4 = bankprospectDF3.na.fill(mean_salary,['salary'])

In [38]:
bankprospectDF4.show()

+---+-------+------+-------+---------+
|age| salary|Gender|Country|Purchased|
+---+-------+------+-------+---------+
| 18|20000.0|  Male|Germany|        N|
| 19|22000.0|Female| France|        N|
| 20|24000.0|Female|England|        N|
| 21|31875.0|  Male|England|        N|
| 22|50000.0|  Male| France|        Y|
| 23|35000.0|Female|England|        N|
| 24|31875.0|  Male|Germany|        N|
| 25|32000.0|Female| France|        Y|
| 22|35000.0|  Male|Germany|        N|
| 27|37000.0|Female| France|        N|
+---+-------+------+-------+---------+



Export transformation to csv file

In [39]:
bankprospectDF4.write.format('csv').save('bank_prospects_transformed')

In [40]:
!ls

bank_prospects.csv	    spark-2.4.3-bin-hadoop2.6
bank_prospects_transformed  spark-2.4.3-bin-hadoop2.6.tgz
sample_data


In [41]:
!ls bank_prospects_transformed/

part-00000-6e414f24-1ef1-4b71-85e7-ce0bb4859d9b-c000.csv  _SUCCESS


In [42]:
!cat bank_prospects_transformed/part-00000-6e414f24-1ef1-4b71-85e7-ce0bb4859d9b-c000.csv

18,20000.0,Male,Germany,N
19,22000.0,Female,France,N
20,24000.0,Female,England,N
21,31875.0,Male,England,N
22,50000.0,Male,France,Y
23,35000.0,Female,England,N
24,31875.0,Male,Germany,N
25,32000.0,Female,France,Y
22,35000.0,Male,Germany,N
27,37000.0,Female,France,N
