In an HDFS context, Hive writes its tables to disk, which reduces speed. Spark keeps its data in memory to improve performance. Spark can be used for batch processing and data streaming.
Spark is written in Scala.
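As a rough sketch of both modes (a minimal PySpark example; retailstore.csv is the file used later in these notes, the incoming_data/ directory is made up):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Batch: read a static CSV file into a DataFrame
batch_df = spark.read.option("header", True).csv("retailstore.csv")

# Streaming: continuously pick up new CSV files arriving in a directory
# (streaming sources need an explicit schema; the directory is hypothetical)
stream_df = spark.readStream.schema(batch_df.schema).option("header", True).csv("incoming_data/")
query = stream_df.writeStream.format("console").start()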
Hive > MapReduce > YARN > HDFS
Spark > YARN|Mesos > HDFS|AWS|Azure|Blob|Relational DB|NoSQL
  Driver Program
  (Spark Session)
         |
  +------+------+
  |      |      |
Worker Worker Worker
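A minimal sketch of the driver side (the app name is just a placeholder): the driver program creates the SparkSession, and the master setting picks the cluster manager that schedules work on the workers.

from pyspark.sql import SparkSession

# The driver creates the SparkSession; master selects the cluster manager
# ("yarn", "mesos://host:port", or "local[*]" for a single machine).
spark = (SparkSession.builder
         .appName("retail-example")
         .master("yarn")
         .getOrCreate())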
GCP > Dataproc > Create a Cluster > SSH
$ wget https://raw.githubusercontent.com/futurexskill/bigdata/master/retailstore.csv
$ hadoop fs -mkdir /user/newuser
$ hadoop fs -mkdir /user/newuser/data
$ hadoop fs -put retailstore.csv data/
$ pyspark
>>> spark.read.csv("data/retailstore.csv").show()
>>> spark.read.option("header","True").csv("data/retailstore.csv").show()
+----+------+------+-------+---------+
| Age|Salary|Gender|Country|Purchased|
+----+------+------+-------+---------+
|  18| 20000|  Male|Germany|        N|
|  19| 22000|Female| France|        N|
+----+------+------+-------+---------+
$ hive
hive> create database if not exists retail;
hive> use retail;
hive> create table retailcust (age int, salary float, gender string, country string, purchased string) row format delimited fields terminated by ',' location '/user/newuser/data/' tblproperties ("skip.header.line.count"="1");
$ pyspark
>>> spark.sql("select * from retail.retailcust").show()
$ spark-shell
scala> spark.read.option("header","true").csv("data/retailstore.csv").show()
+----+------+------+-------+---------+
| Age|Salary|Gender|Country|Purchased|
+----+------+------+-------+---------+
|  18| 20000|  Male|Germany|        N|
|  19| 22000|Female| France|        N|
+----+------+------+-------+---------+
scala> spark.sql("select * from retail.retailcust").show()
Colab > File > New Notebook
colab> !wget https://raw.githubusercontent.com/futurexskill/bigdata/master/retailstore.csv
colab> !wget https://dlcdn.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
colab> !tar -xvf spark-3.1.2-bin-hadoop3.2.tgz
colab> !ls
Check if Java 11 is available (java-11-openjdk-amd64):
colab> !ls /usr/lib/jvm/
colab> import os
colab> os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
colab> os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"
colab> !pip install findspark
colab> import findspark
colab> findspark.init()
colab> !pip install pyspark
colab> from pyspark.sql import SparkSession
colab> spark = SparkSession.builder.master("local[*]").getOrCreate()
colab> spark.read.option("header",True).csv("retailstore.csv").show()
CSV file, Hive table > Spark DataFrame > Transform 1 > Transform 2 > Transform n > Persist
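A minimal sketch of such a pipeline (column names follow retailstore.csv; the output path is made up):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Source: CSV file (a Hive table would be read with spark.table("retail.retailcust"))
df = spark.read.option("header", True).option("inferSchema", True).csv("retailstore.csv")

# Transform 1: keep customers with a salary above 25000
df2 = df.filter(col("Salary") > 25000)

# Transform 2: derive a new column
df3 = df2.withColumn("DoubleSalary", col("Salary") * 2)

# Persist: write the result, e.g. as Parquet (output path is hypothetical)
df3.write.mode("overwrite").parquet("output/retail_transformed")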
Spark is used for high-volume data. A DataFrame holds the data in a row-column format in memory.
+----+------+------+-------+---------+
| Age|Salary|Gender|Country|Purchased|
+----+------+------+-------+---------+
|  18| 20000|  Male|Germany|        N|
|  19| 22000|Female| France|        N|
+----+------+------+-------+---------+
The fundamental building blocks of Spark. Data is split into multiple, redundant RDDs and distributed across different nodes. RDDs are held in memory and can be operated on in parallel. Spark evaluates transformations 'lazily': only when you call an action such as collect(), first() or take() does Spark actually work on the data.
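A small sketch of the lazy behaviour (assuming a local SparkSession, as in the Colab setup below):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

numbers = sc.parallelize([1, 2, 3, 4])

# map() is a transformation: Spark only records the lineage, nothing runs yet
doubled = numbers.map(lambda x: x * 2)

# collect() is an action: only now is the computation actually executed
print(doubled.collect())   # [2, 4, 6, 8]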
colab> !wget https://dlcdn.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
colab> !tar -xvf spark-3.1.2-bin-hadoop3.2.tgz
colab> import os
colab> os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
colab> os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"
colab> !pip install findspark
colab> import findspark
colab> findspark.init()
colab> from pyspark.sql import SparkSession
colab> from pyspark import SparkContext
colab> spark = SparkSession.builder.master("local[*]").getOrCreate()
colab> sc = spark.sparkContext
Create an RDD:
colab> my_rdd = sc.parallelize([20,40,50,60,70])
colab> my_rdd.collect()
colab> !wget https://raw.githubusercontent.com/futurexskill/bigdata/master/retailstore.csv
colab> my_csv_rdd = sc.textFile('retailstore.csv')
colab> my_csv_rdd.collect()
colab> my_csv_rdd.first()
colab> for line in my_csv_rdd.collect():
           print(line)
Map calls a function and applies it to every line of the RDD. The number of lines in the old and the new RDD is the same.
newRDD = oldRDD.map(function)
colab> my_csv_rdd_2 = my_csv_rdd.map(lambda x : x.replace("Male","M"))
colab> my_csv_rdd_2.collect()
Filter keeps only the lines for which the function returns True.
newRDD = oldRDD.filter(function)
colab> femaleCustomers = my_csv_rdd_2.filter(lambda x: "Female" in x)
colab> femaleCustomers.collect()
Works like map, but each input line can produce several output elements, so the new RDD usually has more lines than the old one.
colab> words = femaleCustomers.flatMap(lambda line: line.split(","))
colab> words.collect()
colab> words.count()
Combine two RDDs.
colab> rdd1 = sc.parallelize(["a","b","c","d","e"])
colab> rdd2 = sc.parallelize(["c","e","k","l"])
colab> rdd1.union(rdd2).collect()
colab> rdd1.union(rdd2).distinct().collect()
colab> rdd1.intersection(rdd2).collect()
def transformRDD(customer):
    words = customer.split(",")
    # Encode gender: Male -> 0, Female -> 1
    if words[2] == "Male":
        words[2] = "0"
    else:
        words[2] = "1"
    # Encode purchased: N -> 0, Y -> 1
    if words[4] == "N":
        words[4] = "0"
    else:
        words[4] = "1"
    # Upper-case the country name
    words[3] = words[3].upper()
    return ",".join(words)
colab> my_csv_transform = my_csv_rdd.map(transformRDD)
colab> my_csv_transform.collect()
Actions: collect(), count(), take(), first()
reduce() aggregates the RDD pairwise with the given function. For [10,20,30,40],
sampleRDD.reduce(lambda a,b: a + b)
evaluates as func(func(func(10,20),30),40) = 100.
colab> sampleRDD = sc.parallelize([10,20,30,40])
colab> sampleRDD.reduce(lambda a,b: a + b)
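The other actions listed above can be tried on the same RDD (a quick sketch reusing sampleRDD):
colab> sampleRDD.count()      # 4 - number of elements
colab> sampleRDD.first()      # 10 - first element
colab> sampleRDD.take(2)      # [10, 20] - first two elements
colab> sampleRDD.collect()    # [10, 20, 30, 40] - all elements as a Python list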
colab> customer_df = spark.read.csv("retailstore.csv",header=True)
colab> customer_df.show()
colab> customer_df.describe().show()
+-------+-----------------+-----------------+------+-------+---------+
|summary|              Age|           Salary|Gender|Country|Purchased|
+-------+-----------------+-----------------+------+-------+---------+
|  count|                9|                8|    10|     10|       10|
|   mean|22.11111111111111|          31875.0|  null|   null|     null|
| stddev|2.934469476943168|9818.895777311942|  null|   null|     null|
|    min|               18|            20000|Female|England|        N|
|    max|               27|            50000|  Male|Germany|        Y|
+-------+-----------------+-----------------+------+-------+---------+
colab> country_df = customer_df.select("country")
colab> customer_df.groupBy("country").count().show()
+-------+-----+
|country|count|
+-------+-----+
|Germany|    3|
| France|    4|
|England|    3|
+-------+-----+
colab> customer_df.createOrReplaceTempView("customer")
colab> results = spark.sql("select * from customer")
colab> results.show()
colab> new_results = spark.sql("select * from customer where age > 22")
colab> new_results.show()
DataFrames are immutable. To change a DataFrame, create a new one from a transformation.
colab> filtered_df = customer_df.filter('age>22')
colab> filtered_df.show()
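A quick check of the immutability (a sketch; filter() returned a new DataFrame and left customer_df untouched):
colab> customer_df.count()    # still 10 rows
colab> filtered_df.count()    # only the customers with age > 22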
colab> customer_df.groupBy("gender").agg({"salary":"avg","age":"max"}).show()
colab> customer_df.select(["age","salary"]).show()
colab> customer_df.withColumn('doubleSalary',customer_df['salary']*2).show()
colab> new_customer_df = customer_df.withColumn('doubleSalary',customer_df['salary']*2)
colab> new_customer_df.show()
colab> customer_df.withColumnRenamed('salary','income').show()
colab> customer_df.filter("salary > 30000").select('age').show()
colab> spark.sql("select age from customer where salary > 30000").show()
colab> customer_df.filter("salary > 30000 and salary < 40000").select('age').show()
colab> spark.sql("select age from customer where salary > 30000 and salary < 40000").show()
colab> from pyspark.sql.functions import countDistinct, avg, stddev
colab> customer_df.select(countDistinct("country").alias("Distinct Countries")).show()
colab> customer_df.select(avg('age')).show()
colab> salary_std = customer_df.select(stddev('salary').alias('std'))
colab> from pyspark.sql.functions import format_number
colab> salary_std.select(format_number('std',2)).show()
colab> customer_df.orderBy('salary').show()
colab> from pyspark.sql.functions import col
colab> customer_df.orderBy(col('salary').desc()).show()
colab> new_df = customer_df.drop('purchased')
colab> customer_df.na.drop().show()
colab> customer_df.na.fill('NEW VALUE').show()