In [67]:
sc
Out[67]:
In [68]:
usedcar=spark.read.csv("data/usedcarsales.csv",header=True,inferSchema=True)
In [69]:
# NOTES FOR IMPORT
# UPLOAD THE DATA USING THE MyData TAB IN THE TOP RIGHT CORNER FIRST. ONCE UPLOADED IT APPEARS IN THE DATA FOLDER
# RIGHT-CLICK ON THE FILE, PICK COPY PATH (SECOND OPTION FROM THE BOTTOM) AND PASTE IT IN DOUBLE QUOTES
# DO NOT FORGET TO SET THE header & inferSchema ARGUMENTS TO True, ELSE EVERY COLUMN WILL BE IMPORTED AS A STRING
In [70]:
usedcar.printSchema() # FOR CHECKING THE INDIVIDUAL DATA TYPES. STANDARD COMMAND TO USE ON ANY DATAFRAME
In [71]:
usedcar.describe().toPandas().transpose()
# BASIC DESCRIPTIVE STATISTICS - COUNT, MEAN, STANDARD DEVIATION, MIN & MAX
Out[71]:
In [72]:
# THE COMMANDS BELOW SPLIT THE COLUMN NAMES INTO TWO LISTS - NUMERIC & STRING
# THIS IS DONE SO THAT CLEANING AND PREPROCESSING STEPS LIKE MISSING-VALUE TREATMENT CAN BE APPLIED SEPARATELY
# NUMERIC MISSING VALUES - IMPUTE WITH MEAN OR MEDIAN, OR CAP WITH MIN OR MAX
# STRING OR CATEGORICAL MISSING VALUES - IMPUTE WITH THE MODE (MOST FREQUENT VALUE)
# (A SMALL IMPUTATION SKETCH FOLLOWS THE TWO LISTS BELOW)
In [73]:
integercols=[item[0] for item in usedcar.dtypes if item[1]=='int'] # the dtype string for integer columns is "int", not "integer"
In [74]:
stringcols=[item[0] for item in usedcar.dtypes if item[1]=='string']
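In [ ]:
# OPTIONAL SKETCH (not part of the original run): one hedged way to do the missing-value
# treatment described above, using pyspark.ml.feature.Imputer for the numeric columns and a
# most-frequent-value fill for the string columns. The variable names (usedcar_num_filled,
# usedcar_cat_filled) are illustrative; the used-car file itself may already be complete,
# in which case these steps change nothing.
from pyspark.ml.feature import Imputer
from pyspark.sql.functions import col

# numeric columns: cast to double (older Spark versions require it for Imputer),
# then impute nulls with the column mean (use strategy="median" for the median)
usedcar_num = usedcar.select([col(c).cast("double").alias(c) for c in integercols])
imputer = Imputer(inputCols=integercols,
                  outputCols=["imputed_"+c for c in integercols],
                  strategy="mean")
usedcar_num_filled = imputer.fit(usedcar_num).transform(usedcar_num)

# string / categorical columns: replace nulls with the most frequent value (the mode)
usedcar_cat_filled = usedcar
for c in stringcols:
    mode_value = usedcar.na.drop(subset=[c]).groupBy(c).count() \
                        .orderBy("count", ascending=False).first()[0]
    usedcar_cat_filled = usedcar_cat_filled.fillna({c: mode_value})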
In [75]:
# StringIndexer encodes a string column of labels to a column of label indices.
# The indices are in [0, numLabels), ordered by label frequencies,
# so the most frequent label gets index 0.
# THE STRING INDEXER BELOW IS A STANDARD INDEXER AND CAN BE APPLIED TO ANY DATASET USING A FOR LOOP
# DO THE STRING INDEXING ONLY ONCE, AND DO NOT STRING-INDEX NUMERICAL COLUMNS
# SPLIT OUT THE STRING COLUMNS FIRST AND RUN THE INDEXER ONLY ON THEM
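In [ ]:
# OPTIONAL SKETCH: a tiny illustration of the frequency ordering described above, using a
# made-up fuel-type column (toy data, not the used-car file).
from pyspark.ml.feature import StringIndexer
toy = spark.createDataFrame(
    [("Petrol",), ("Petrol",), ("Petrol",), ("Diesel",), ("Diesel",), ("CNG",)],
    ["FuelType"])
StringIndexer(inputCol="FuelType", outputCol="FuelTypeIndex").fit(toy).transform(toy).show()
# Petrol (most frequent) -> 0.0, Diesel -> 1.0, CNG -> 2.0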
In [76]:
from pyspark.ml.feature import StringIndexer
In [77]:
stringindex=[StringIndexer(inputCol=c,outputCol="stringindex_"+c)
for c in stringcols] # stringcols=['FuelType','AutoType','MetColorType']
In [78]:
# THE STRINGINDEXERS ABOVE ARE DEFINED FOR THE THREE STRING COLUMNS; WHEN RUN THEY CREATE NEW COLUMNS OF LABEL INDICES
# NEW COLUMNS WILL BE ADDED AT THE END, NAMED stringindex_ PLUS THE RELEVANT COLUMN NAME
# THE ORIGINAL STRING COLUMNS ARE RETAINED AS THEY ARE & THE NEW COLUMNS ARE APPENDED AT THE END
In [79]:
from pyspark.ml import Pipeline
In [80]:
# A Pipeline is specified as a sequence of stages, and each stage is either a Transformer or an Estimator.
# These stages are run in order, and the input DataFrame is transformed as it passes through each stage.
# For Transformer stages, the transform() method is called on the DataFrame.
# For Estimator stages, the fit() method is called to produce a Transformer (which becomes part of the
# PipelineModel, or fitted Pipeline),
# and that Transformer’s transform() method is called on the DataFrame.
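In [ ]:
# OPTIONAL SKETCH: what the Pipeline below does under the hood, written out by hand. Each
# StringIndexer is an Estimator: fit() produces a Transformer, and that Transformer's
# transform() adds the indexed column. The Pipeline simply runs these stages for us, in order.
# The name tmp is illustrative.
tmp = usedcar
for indexer in stringindex:
    tmp = indexer.fit(tmp).transform(tmp)
tmp.columns   # the same stringindex_ columns that pipelinemodel.transform(usedcar) adds below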
In [81]:
pipeline=Pipeline(stages=stringindex)
In [82]:
pipelinemodel=pipeline.fit(usedcar)
In [83]:
usedcar1=pipelinemodel.transform(usedcar)
In [84]:
# CREATE THE PIPELINE, GIVE IT A SHORT NAME AND FIT IT ON THE DATAFRAME
# AFTER FITTING, transform() MUST BE CALLED; WITHOUT IT THE INDEXED COLUMNS ARE NEVER ADDED TO THE DATAFRAME
In [85]:
usedcar1.printSchema() # OBSERVE THE LAST 3 COLUMNS, WHICH CARRY THE stringindex_ PREFIX
In [86]:
doublecols=[item[0] for item in usedcar1.dtypes if item[1]=='double']
integercols=[item[0] for item in usedcar1.dtypes if item[1]=='int']
In [87]:
finalcols=integercols+doublecols # THIS IS A LIST OF COLUMN NAMES, NOT A DATAFRAME
In [88]:
usedcar2=usedcar1.select(finalcols) # THIS STEP CREATES THE DATAFRAME
In [89]:
# THE ABOVE TWO STEPS RESELECT THE REQUIRED COLUMNS AND CREATE THE NEW DATAFRAME.
In [90]:
usedcar2.printSchema()
In [91]:
from pyspark.ml.linalg import DenseVector
In [92]:
# A dense vector is backed by a double array representing its entry values,
# while a sparse vector is backed by two parallel arrays: indices and values.
# For example, a vector (1.0, 0.0, 3.0) can be represented in dense format as [1.0, 0.0, 3.0]
# in sparse format as (3, [0, 2], [1.0, 3.0]), where 3 is the size of the vector.
# FOR RUNNING THE MACHINE LEARNING ALGORITHMS THE INPUTS MUST BE ASSEMBLED INTO A SINGLE VECTOR COLUMN
# THE COLUMN NAMES USED HERE ARE features (INDEPENDENT VARIABLES) & label (DEPENDENT VARIABLE), THE SPARK ML DEFAULTS
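In [ ]:
# OPTIONAL SKETCH: the dense vs. sparse representations described above, shown on the
# example vector (1.0, 0.0, 3.0).
from pyspark.ml.linalg import Vectors
dense = Vectors.dense([1.0, 0.0, 3.0])
sparse = Vectors.sparse(3, [0, 2], [1.0, 3.0])   # size 3, non-zero values at indices 0 and 2
print(dense)             # [1.0,0.0,3.0]
print(sparse)            # (3,[0,2],[1.0,3.0])
print(sparse.toArray())  # both represent the same vector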
In [93]:
usedcardf=usedcar2.rdd.map(lambda x:(DenseVector(x[2:11]),x[1])).toDF(['features','label'])
# DROPPING COLUMN 0 AS IT IS A SERIAL-NUMBER COLUMN, AND DROPPING THE LAST TWO COLUMNS (THE AutoType & MetColorType INDICES)
# AS THEY DUPLICATE THE EXISTING Automatic AND MetColor COLUMNS
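In [ ]:
# OPTIONAL SKETCH: an alternative to the RDD map above using VectorAssembler, which is the more
# common way to build the features column in spark.ml. It assumes the same column layout as the
# slice above - column 0 is the serial number, column 1 is the label, columns 2-10 are features.
# The name usedcardf_alt is illustrative; note VectorAssembler may emit sparse vectors for rows
# with many zeros, which spark.ml models accept just as well as dense ones.
from pyspark.ml.feature import VectorAssembler
featurecols = finalcols[2:11]                     # the same columns picked by x[2:11] above
assembler = VectorAssembler(inputCols=featurecols, outputCol="features")
usedcardf_alt = (assembler.transform(usedcar2)
                 .withColumnRenamed(finalcols[1], "label")
                 .select("features", "label"))
usedcardf_alt.show(5)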
In [94]:
usedcardf.show(5)
In [95]:
from pyspark.ml.regression import LinearRegression
In [96]:
LinReg=LinearRegression(featuresCol='features',labelCol='label')
In [97]:
LinRegmodel=LinReg.fit(usedcardf)
In [98]:
LinRegmodel.intercept
Out[98]:
In [99]:
print("Coefficients: "+ str(LinRegmodel.coefficients))
In [100]:
LinRegmodel.summary.r2
Out[100]:
In [101]:
LinRegmodel.summary.rootMeanSquaredError
Out[101]:
In [102]:
LinRegmodel.summary.pValues
Out[102]:
In [103]:
from pyspark.ml.regression import DecisionTreeRegressor
In [104]:
DecTree=DecisionTreeRegressor(featuresCol='features',labelCol='label')
In [105]:
DecTreemodel=DecTree.fit(usedcardf)
In [106]:
DecTreemodelpredict=DecTreemodel.transform(usedcardf)
In [107]:
from pyspark.ml.evaluation import RegressionEvaluator
In [108]:
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
In [109]:
rmse = evaluator.evaluate(DecTreemodelpredict)
print("Root Mean Squared Error (RMSE) on usedcardf data = %g" % rmse)
In [110]:
from pyspark.ml.regression import RandomForestRegressor
In [111]:
# Random forests are ensembles of decision trees.
# Random forests combine many decision trees in order to reduce the risk of overfitting.
# The spark.ml implementation supports random forests for binary and multiclass classification
# and for regression, using both continuous and categorical features.
In [112]:
RF=RandomForestRegressor(featuresCol='features',labelCol='label',numTrees=1000)
In [113]:
Rfmodel=RF.fit(usedcardf)
In [114]:
Rfmodelpredict=Rfmodel.transform(usedcardf)
In [115]:
Rfrmse = evaluator.evaluate(Rfmodelpredict)
print("Root Mean Squared Error (RMSE) on usedcardf data = %g" % Rfrmse)
In [116]:
from pyspark.ml.regression import GBTRegressor
In [117]:
# Gradient-Boosted Trees (GBTs) are ensembles of decision trees.
# GBTs iteratively train decision trees in order to minimize a loss function.
# The spark.ml implementation supports GBTs for binary classification and for regression,
# using both continuous and categorical features.
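In [ ]:
# OPTIONAL SKETCH: since GBTs train trees iteratively, the number of boosting iterations
# (maxIter), the tree depth (maxDepth) and the learning rate (stepSize) are the main knobs.
# The values here are illustrative, not tuned; the cells below fit the model with the defaults.
GBTree_tuned = GBTRegressor(featuresCol='features', labelCol='label',
                            maxIter=50, maxDepth=5, stepSize=0.1)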
In [118]:
GBTree=GBTRegressor(featuresCol='features',labelCol='label')
In [119]:
GBTreemodel=GBTree.fit(usedcardf)
In [120]:
GBTreemodelpredict=GBTreemodel.transform(usedcardf)
In [121]:
GBTrmse = evaluator.evaluate(GBTreemodelpredict)
print("Root Mean Squared Error (RMSE) on usedcardf data = %g" % GBTrmse)
In [ ]: