Sunday, October 14, 2018

Apache Spark PySpark Case Study - Used Car Sales Regression Techniques

UsedCarSalesRegressionPySpark
In [67]:
sc
Out[67]:

SparkContext (Spark UI)
Version: v2.3.1
Master: local[*]
AppName: PySparkShell
In [68]:
usedcar=spark.read.csv("data/usedcarsales.csv",header=True,inferSchema=True)
In [69]:
# NOTES FOR IMPORT
# UPLOAD THE DATA VIA THE MyData TAB (TOP RIGHT CORNER) FIRST; ONCE UPLOADED, THE FILE APPEARS IN THE DATA FOLDER
# RIGHT-CLICK THE FILE, PICK "COPY PATH" (SECOND OPTION FROM THE BOTTOM), AND PASTE IT IN DOUBLE QUOTES
# DO NOT FORGET TO SET THE header & inferSchema ARGUMENTS TO True, ELSE EVERY COLUMN WILL BE IMPORTED AS A STRING
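If inferSchema proves slow on bigger files, the schema can also be declared up front. This is a hedged sketch only (column names and types taken from the printSchema output below), not part of the original run:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# Sketch: an explicit schema avoids the extra pass over the data that inferSchema makes
schema = StructType([
    StructField("_c0", IntegerType(), True),
    StructField("Price", IntegerType(), True),
    StructField("Age", IntegerType(), True),
    StructField("KM", IntegerType(), True),
    StructField("FuelType", StringType(), True),
    StructField("HP", IntegerType(), True),
    StructField("MetColor", IntegerType(), True),
    StructField("Automatic", IntegerType(), True),
    StructField("CC", IntegerType(), True),
    StructField("Doors", IntegerType(), True),
    StructField("Weight", IntegerType(), True),
    StructField("AutoType", StringType(), True),
    StructField("MetColorType", StringType(), True),
])
usedcar = spark.read.csv("data/usedcarsales.csv", header=True, schema=schema)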
In [70]:
usedcar.printSchema()  # FOR CHECKING THE INDIVIDUAL DATA TYPES. A STANDARD COMMAND TO RUN ON ANY DATA
root
 |-- _c0: integer (nullable = true)
 |-- Price: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- KM: integer (nullable = true)
 |-- FuelType: string (nullable = true)
 |-- HP: integer (nullable = true)
 |-- MetColor: integer (nullable = true)
 |-- Automatic: integer (nullable = true)
 |-- CC: integer (nullable = true)
 |-- Doors: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- AutoType: string (nullable = true)
 |-- MetColorType: string (nullable = true)

In [71]:
usedcar.describe().toPandas().transpose()
# BASIC DESCRIPTIVE STATISTICS - COUNT, MEAN, STANDARD DEVIATION, MIN & MAX
Out[71]:
              count  mean                  stddev                min       max
_c0           1436   718.5                 414.6818057257878     1         1436
Price         1436   10730.824512534818    3626.9645849102403    4350      32500
Age           1436   55.94707520891365     18.599988342806252    1         80
KM            1436   68533.25974930362     37506.448872189554    1         243000
FuelType      1436   None                  None                  CNG       Petrol
HP            1436   101.50208913649026    14.981079675567686    69        192
MetColor      1436   0.674791086350975     0.46861604925657424   0         1
Automatic     1436   0.055710306406685235  0.22944133861584212   0         1
CC            1436   1566.8279944289693    187.1824363291632     1300      2000
Doors         1436   4.0334261838440115    0.9526766046325884    2         5
Weight        1436   1072.4596100278552    52.64112048693142     1000      1615
AutoType      1436   None                  None                  Auto      Manual
MetColorType  1436   None                  None                  Metcolor  NonMetcolor
In [72]:
# THE COMMANDS BELOW SPLIT THE COLUMN NAMES INTO TWO LISTS, NUMERIC & STRING
# THIS IS DONE SO CLEANING AND PREPROCESSING (E.G. MISSING VALUES) CAN BE HANDLED PER TYPE
# NUMERIC MISSING VALUES - IMPUTE WITH MEAN OR MEDIAN, OR CAP AT MIN OR MAX
# STRING OR CATEGORICAL MISSING VALUES - IMPUTE WITH THE MODE (MOST FREQUENT LABEL); SEE THE SKETCH BELOW
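This particular dataset has no missing values (every count in describe() is 1436), so nothing is imputed here. As a hedged sketch only, the strategies above map to spark.ml like this (Imputer is available from Spark 2.2; the output column names are made up for illustration):

from pyspark.ml.feature import Imputer
# Sketch: mean-impute numeric columns (strategy="median" is the other option)
imputer = Imputer(inputCols=["KM", "HP"], outputCols=["KM_imp", "HP_imp"], strategy="mean")
usedcar_imp = imputer.fit(usedcar).transform(usedcar)

# Sketch: mode-impute a categorical column by filling with its most frequent label
mode_fuel = usedcar.groupBy("FuelType").count().orderBy("count", ascending=False).first()[0]
usedcar_imp = usedcar_imp.fillna({"FuelType": mode_fuel})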
In [73]:
integercols=[item[0] for item in usedcar.dtypes if item[1]=='int'] # for integer - "int" not "integer"
In [74]:
stringcols=[item[0] for item in usedcar.dtypes if item[1]=='string']
In [75]:
# StringIndexer encodes a string column of labels to a column of label indices.
# The indices are in [0, numLabels), ordered by label frequencies,
# so the most frequent label gets index 0.
# THE STRINGINDEXER BELOW IS A STANDARD INDEXER AND CAN BE APPLIED TO ANY DATA VIA A LIST COMPREHENSION
# DO THE STRING INDEXING ONLY ONCE, AND NEVER STRING-INDEX NUMERICAL DATA
# SPLIT OUT THE STRING COLUMNS FIRST AND RUN THE INDEXER ONLY ON THEM
In [76]:
from pyspark.ml.feature import StringIndexer 
In [77]:
stringindex=[StringIndexer(inputCol=c,outputCol="stringindex_"+c) 
             for c in stringcols] # stringcols=['FuelType','AutoType','MetColorType']
In [78]:
# THE STRINGINDEXER ABOVE RUNS ON THE THREE STRING COLS AND CREATES A NEW COLUMN OF LABEL INDICES FOR EACH
# (THESE ARE INDICES, NOT DUMMY VARIABLES - ONE-HOT DUMMIES WOULD NEED OneHotEncoder)
# THE NEW COLUMNS ARE NAMED stringindex_ + THE RELEVANT COLUMN NAME
# THE ORIGINAL STRING COLUMNS ARE RETAINED AS-IS & THE NEW COLUMNS ARE APPENDED AT THE END
In [79]:
from pyspark.ml import Pipeline
In [80]:
# A Pipeline is specified as a sequence of stages, and each stage is either a Transformer or an Estimator. 
# These stages are run in order, and the input DataFrame is transformed as it passes through each stage.
# For Transformer stages, the transform() method is called on the DataFrame. 
# For Estimator stages, the fit() method is called to produce a Transformer (which becomes part of the 
# PipelineModel, or fitted Pipeline), 
# and that Transformer’s transform() method is called on the DataFrame.
In [81]:
pipeline=Pipeline(stages=stringindex)
In [82]:
pipelinemodel=pipeline.fit(usedcar)
In [83]:
usedcar1=pipelinemodel.transform(usedcar)
In [84]:
# CREATE THE INDEXERS, WRAP THEM IN A PIPELINE WITH A SHORT NAME, THEN FIT THE PIPELINE ON THE DATAFRAME
# AFTER fit(), transform() MUST BE CALLED - fit() ONLY PRODUCES THE PipelineModel, IT DOES NOT ADD THE NEW COLUMNS
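To verify the frequency ordering (index 0.0 on the most frequent label), a quick hedged check on the transformed frame:

# Sketch: each distinct label maps to exactly one index, most frequent first
usedcar1.groupBy("FuelType", "stringindex_FuelType").count().orderBy("stringindex_FuelType").show()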
In [85]:
usedcar1.printSchema() # OBSERVE THE LAST 3 COLUMNS SPECIFICALLY WITH STRINGINDEX PREFIX
root
 |-- _c0: integer (nullable = true)
 |-- Price: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- KM: integer (nullable = true)
 |-- FuelType: string (nullable = true)
 |-- HP: integer (nullable = true)
 |-- MetColor: integer (nullable = true)
 |-- Automatic: integer (nullable = true)
 |-- CC: integer (nullable = true)
 |-- Doors: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- AutoType: string (nullable = true)
 |-- MetColorType: string (nullable = true)
 |-- stringindex_FuelType: double (nullable = false)
 |-- stringindex_AutoType: double (nullable = false)
 |-- stringindex_MetColorType: double (nullable = false)

In [86]:
doublecols=[item[0] for item in usedcar1.dtypes if item[1]=='double']
integercols=[item[0] for item in usedcar1.dtypes if item[1]=='int']
In [87]:
finalcols=integercols+doublecols # THIS IS A LIST, NOT A DATA FRAME
In [88]:
usedcar2=usedcar1.select(finalcols) # THIS STEP CREATES THE DATAFRAME
In [89]:
# THE TWO STEPS ABOVE RESELECT THE REQUIRED COLUMNS AND CREATE THE NEW DATA FRAME.
In [90]:
usedcar2.printSchema()
root
 |-- _c0: integer (nullable = true)
 |-- Price: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- KM: integer (nullable = true)
 |-- HP: integer (nullable = true)
 |-- MetColor: integer (nullable = true)
 |-- Automatic: integer (nullable = true)
 |-- CC: integer (nullable = true)
 |-- Doors: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- stringindex_FuelType: double (nullable = false)
 |-- stringindex_AutoType: double (nullable = false)
 |-- stringindex_MetColorType: double (nullable = false)

In [91]:
from pyspark.ml.linalg import DenseVector
In [92]:
# A dense vector is backed by a double array representing its entry values, 
# while a sparse vector is backed by two parallel arrays: indices and values. 
# For example, a vector (1.0, 0.0, 3.0) can be represented in dense format as [1.0, 0.0, 3.0]
# in sparse format as (3, [0, 2], [1.0, 3.0]), where 3 is the size of the vector.

# TO RUN THE MACHINE LEARNING ALGORITHMS, ALL PREDICTORS MUST BE ASSEMBLED INTO A SINGLE VECTOR COLUMN
# BY DEFAULT spark.ml EXPECTS THE COLUMNS features (INDEPENDENT VARIABLES) & label (DEPENDENT VARIABLE);
# BOTH NAMES CAN BE OVERRIDDEN VIA featuresCol / labelCol
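As a hedged illustration of the two formats from the example above (pyspark.ml.linalg.Vectors is the standard constructor):

from pyspark.ml.linalg import Vectors
# Dense stores every entry; sparse stores (size, indices, values) for the non-zeros
dense = Vectors.dense([1.0, 0.0, 3.0])
sparse = Vectors.sparse(3, [0, 2], [1.0, 3.0])
print(dense)   # [1.0,0.0,3.0]
print(sparse)  # (3,[0,2],[1.0,3.0])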
In [93]:
usedcardf=usedcar2.rdd.map(lambda x:(DenseVector(x[2:11]),x[1])).toDF(['features','label'])
# DROPPING COLUMN 0 (THE SERIAL-NUMBER COLUMN _c0), TAKING x[1] (Price) AS THE LABEL,
# AND DROPPING THE LAST TWO COLUMNS stringindex_AutoType & stringindex_MetColorType
# AS THEY DUPLICATE THE EXISTING Automatic AND MetColor COLUMNS
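The same features/label frame can be built without dropping to the RDD API. A hedged sketch with VectorAssembler, the more common spark.ml route (usedcardf_alt is an illustrative name):

from pyspark.ml.feature import VectorAssembler
# Sketch: assemble the nine predictors into a single 'features' vector column
featcols = ['Age', 'KM', 'HP', 'MetColor', 'Automatic', 'CC', 'Doors', 'Weight', 'stringindex_FuelType']
assembler = VectorAssembler(inputCols=featcols, outputCol='features')
usedcardf_alt = assembler.transform(usedcar2).withColumnRenamed('Price', 'label').select('features', 'label')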
In [94]:
usedcardf.show(5)
+--------------------+-----+
|            features|label|
+--------------------+-----+
|[23.0,46986.0,90....|13500|
|[23.0,72937.0,90....|13750|
|[24.0,41711.0,90....|13950|
|[26.0,48000.0,90....|14950|
|[30.0,38500.0,90....|13750|
+--------------------+-----+
only showing top 5 rows

In [95]:
from pyspark.ml.regression import LinearRegression
In [96]:
LinReg=LinearRegression(featuresCol='features',labelCol='label')
In [97]:
LinRegmodel=LinReg.fit(usedcardf)
In [98]:
LinRegmodel.intercept
Out[98]:
-6635.867399994437
In [99]:
print("Coefficients: "+ str(LinRegmodel.coefficients))
Coefficients: [-122.91984669359219,-0.016023361057631247,29.859896169290355,38.513561851052636,179.38304407378993,-1.3684020490676259,-65.39933191930807,23.051496825370904,-296.0000936943593]
In [100]:
LinRegmodel.summary.r2
Out[100]:
0.8657147053312059
In [101]:
LinRegmodel.summary.rootMeanSquaredError
Out[101]:
1328.6367897244356
In [102]:
LinRegmodel.summary.pValues
Out[102]:
[0.0,
 0.0,
 0.0,
 0.6118420013048862,
 0.2543674984358093,
 1.5863129689153155e-05,
 0.09815739330353734,
 0.0,
 0.06941297616856668,
 4.5791082037283104e-10]
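The fitted model can also score rows the same way the tree models below do. A hedged sketch (LinRegpredict is an illustrative name):

# Sketch: transform() appends a 'prediction' column alongside the label
LinRegpredict = LinRegmodel.transform(usedcardf)
LinRegpredict.select('label', 'prediction').show(5)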
In [103]:
from pyspark.ml.regression import DecisionTreeRegressor
In [104]:
DecTree=DecisionTreeRegressor(featuresCol='features',labelCol='label')
In [105]:
DecTreemodel=DecTree.fit(usedcardf)
In [106]:
DecTreemodelpredict=DecTreemodel.transform(usedcardf)
In [107]:
from pyspark.ml.evaluation import RegressionEvaluator
In [108]:
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
In [109]:
rmse = evaluator.evaluate(DecTreemodelpredict)
print("Root Mean Squared Error (RMSE) on usedcardf data = %g" % rmse)
Root Mean Squared Error (RMSE) on usedcardf data = 1118.17
In [110]:
from pyspark.ml.regression import RandomForestRegressor
In [111]:
# Random forests are ensembles of decision trees. 
# Random forests combine many decision trees in order to reduce the risk of overfitting. 
# The spark.ml implementation supports random forests for binary and multiclass classification 
# and for regression, using both continuous and categorical features.
In [112]:
RF=RandomForestRegressor(featuresCol='features',labelCol='label',numTrees=1000)
In [113]:
Rfmodel=RF.fit(usedcardf)
In [114]:
Rfmodelpredict=Rfmodel.transform(usedcardf)
In [115]:
Rfrmse = evaluator.evaluate(Rfmodelpredict)
print("Root Mean Squared Error (RMSE) on usedcardf data = %g" % Rfrmse)
Root Mean Squared Error (RMSE) on usedcardf data = 1145.37
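Random forests also report how much each feature contributed to the splits. A hedged sketch (indices follow the feature-vector order: Age, KM, HP, MetColor, Automatic, CC, Doors, Weight, stringindex_FuelType):

# Sketch: featureImportances is a vector indexed by position in 'features'
print(Rfmodel.featureImportances)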
In [116]:
from pyspark.ml.regression import GBTRegressor
In [117]:
# Gradient-Boosted Trees (GBTs) are ensembles of decision trees. 
# GBTs iteratively train decision trees in order to minimize a loss function. 
# The spark.ml implementation supports GBTs for binary classification and for regression, 
# using both continuous and categorical features.
In [118]:
GBTree=GBTRegressor(featuresCol='features',labelCol='label')
In [119]:
GBTreemodel=GBTree.fit(usedcardf)
In [120]:
GBTreemodelpredict=GBTreemodel.transform(usedcardf)
In [121]:
GBTrmse = evaluator.evaluate(GBTreemodelpredict)
print("Root Mean Squared Error (RMSE) on usedcardf data = %g" % GBTrmse)
Root Mean Squared Error (RMSE) on usedcardf data = 878.067
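Note that all three RMSEs above are computed on the same rows the models were fit on, which flatters flexible learners like GBTs. A hedged sketch of a holdout comparison (split weights and seed are arbitrary):

# Sketch: hold out 30% of the rows, fit on the rest, evaluate on unseen data
train, test = usedcardf.randomSplit([0.7, 0.3], seed=42)
GBTholdout = GBTree.fit(train)
print(evaluator.evaluate(GBTholdout.transform(test)))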