Bussiness Goal: Predict if the post will be followed.
import findspark
findspark.init()
import pyspark.sql.functions as f
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler
from pyspark.ml.classification import *
from pyspark.ml.regression import *
from pyspark.ml.evaluation import *
from pyspark.ml import Pipeline, Model
from pyspark.ml.pipeline import PipelineModel
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
spark = SparkSession.builder.appName("MLTask1").getOrCreate()
Make sure your SparkSession is active:
spark
SparkSession - in-memory
Create a DataFrame called df_in
, which nlp project result.
df_in = spark.read.parquet('s3://yl1269-labdata5/nlp_df/')
df_in.printSchema()
root |-- parent_id: string (nullable = true) |-- author: string (nullable = true) |-- id: string (nullable = true) |-- author_premium: boolean (nullable = true) |-- author_flair_richtext: string (nullable = true) |-- author_flair_text: string (nullable = true) |-- author_flair_text_color: string (nullable = true) |-- body: string (nullable = true) |-- collapsed: boolean (nullable = true) |-- controversiality: long (nullable = true) |-- no_follow: boolean (nullable = true) |-- score: long (nullable = true) |-- send_replies: boolean (nullable = true) |-- total_awards_received: long (nullable = true) |-- stickied: boolean (nullable = true) |-- author_created_time: string (nullable = true) |-- created_time: string (nullable = true) |-- post_date: string (nullable = true) |-- is_PtoP: integer (nullable = true) |-- is_BuckleUp: integer (nullable = true) |-- is_voted: integer (nullable = true) |-- author_postcnt: long (nullable = true) |-- is_popular_commented: integer (nullable = true) |-- is_weekend: integer (nullable = true) |-- author_age: integer (nullable = true) |-- gme_mentioned: integer (nullable = true) |-- sentiment: string (nullable = true) |-- blackrock_mentioned: boolean (nullable = true) |-- share_mentioned: boolean (nullable = true)
#See the column relationship
# adjust figure size and font size
sns.set(rc = {"figure.figsize":(20, 12)})
sns.set(font_scale=1)
# compute the correlation matrix using kendall method
corr = df_in.limit(100000).toPandas().corr(method='kendall')
mask = np.zeros_like(corr)
mask[np.triu_indices_from(mask)] = True
sns.heatmap(corr,annot=True, fmt='.2f', square=True, mask = mask, cmap="Blues");
#drop columns due to week relationship
df = df_in.drop("blackrock_mentioned","send_replies","gme_mentioned","share_mentioned","blackrock_mentioned","stickied")
df = df.dropna()
#change column value type
df = df.withColumn("collapsed",col("collapsed").cast(IntegerType()))\
.withColumn("no_follow",col("no_follow").cast(DoubleType()))
df.count()
5471500
train_data, test_data, predict_data = df.randomSplit([0.8, 0.18, 0.02], 24)
After splitting into three datasets, report the number of rows for each split.
print("Number of training records: " + str(train_data.count()))
print("Number of testing records : " + str(test_data.count()))
print("Number of prediction records : " + str(predict_data.count()))
Number of training records: 4376331
Number of testing records : 986010
[Stage 22:=====================================> (37 + 17) / 54]
Number of prediction records : 109159
train_data.printSchema()
root |-- parent_id: string (nullable = true) |-- author: string (nullable = true) |-- id: string (nullable = true) |-- author_premium: boolean (nullable = true) |-- author_flair_richtext: string (nullable = true) |-- author_flair_text: string (nullable = true) |-- author_flair_text_color: string (nullable = true) |-- body: string (nullable = true) |-- collapsed: integer (nullable = true) |-- controversiality: long (nullable = true) |-- no_follow: double (nullable = true) |-- score: long (nullable = true) |-- total_awards_received: long (nullable = true) |-- author_created_time: string (nullable = true) |-- created_time: string (nullable = true) |-- post_date: string (nullable = true) |-- is_PtoP: integer (nullable = true) |-- is_BuckleUp: integer (nullable = true) |-- is_voted: integer (nullable = true) |-- author_postcnt: long (nullable = true) |-- is_popular_commented: integer (nullable = true) |-- is_weekend: integer (nullable = true) |-- author_age: integer (nullable = true) |-- sentiment: string (nullable = true)
LABEL = "no_follow"
stringIndexer_sentiment = StringIndexer(inputCol="sentiment", outputCol="sentiment_ix")
stringIndexer_postdate = StringIndexer(inputCol="post_date", outputCol="post_date_ix")
grade_label_fit = stringIndexer_sentiment.fit(df)
grade_label_fit.labels
['positive', 'negative', 'neutral']
onehot_sentiment= OneHotEncoder(inputCol="sentiment_ix", outputCol="sentiment_vec")
onehot_postdate= OneHotEncoder(inputCol="post_date_ix", outputCol="post_date_vec")
vectorAssembler_features = VectorAssembler(
inputCols=[ 'collapsed',
'controversiality',
'score',
'total_awards_received',
'is_PtoP',
'is_BuckleUp',
'is_voted',
'author_postcnt',
'is_popular_commented',
'author_age',
'sentiment_ix',
'is_popular_commented',
'is_weekend'],
outputCol= "features")
#vectorAssembler_features.transform(train_data).select("features","user_activity").show(truncate = False)
lsvc= LinearSVC(labelCol=LABEL, featuresCol="features")
#ovr = OneVsRest(classifier=lsvc)
Let's build the pipeline now. A pipeline consists of transformers and an estimator.
pipeline_dt = Pipeline(stages=[stringIndexer_sentiment,
onehot_sentiment,
vectorAssembler_features,
lsvc])
model_lsvc = pipeline_dt.fit(train_data)
model_lsvc.transform(train_data)
DataFrame[parent_id: string, author: string, id: string, author_premium: boolean, author_flair_richtext: string, author_flair_text: string, author_flair_text_color: string, body: string, collapsed: int, controversiality: bigint, no_follow: double, score: bigint, total_awards_received: bigint, author_created_time: string, created_time: string, post_date: string, is_PtoP: int, is_BuckleUp: int, is_voted: int, author_postcnt: bigint, is_popular_commented: int, is_weekend: int, author_age: int, sentiment: string, sentiment_ix: double, sentiment_vec: vector, features: vector, rawPrediction: vector, prediction: double]
To evaluate the model, use test data.
predictions_lsvc = model_lsvc.transform(test_data.dropna())
evlModel = predictions_lsvc
accuracy = MulticlassClassificationEvaluator(labelCol=LABEL, predictionCol="prediction", metricName="accuracy").evaluate(evlModel)
precision = MulticlassClassificationEvaluator(labelCol=LABEL, predictionCol="prediction", metricName="weightedPrecision").evaluate(evlModel)
recall = MulticlassClassificationEvaluator(labelCol=LABEL, predictionCol="prediction", metricName="weightedRecall").evaluate(evlModel)
f1 = MulticlassClassificationEvaluator(labelCol=LABEL, predictionCol="prediction", metricName="f1").evaluate(evlModel)
print("Accuracy = %g" % accuracy)
print("Recall = %g" % recall)
print("Precision = %g" % precision)
print("F1 = %g" % f1)
Accuracy = 0.833336 Recall = 0.833336 Precision = 0.861339 F1 = 0.827329
from sklearn.metrics import confusion_matrix
y_pred=evlModel.select("prediction").collect()
y_orig=evlModel.select(LABEL).collect()
cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm)
Confusion Matrix: [[295459 153939] [ 10393 526219]]
import pandas as pd
import seaborn as sn
import matplotlib.pyplot as plt
sn.heatmap(cm, annot=True,cmap="Blues")
plt.show()
evaluatorRF = BinaryClassificationEvaluator(labelCol=LABEL, rawPredictionCol="prediction", metricName="areaUnderROC")
roc_result = evaluatorRF.evaluate(evlModel)
roc_result
0.8190436364888757
lsvc1= LinearSVC(labelCol=LABEL, featuresCol="features", aggregationDepth=5)
Let's build the pipeline now. A pipeline consists of transformers and an estimator.
pipeline_lsvc1 = Pipeline(stages=[stringIndexer_sentiment,
onehot_sentiment,
vectorAssembler_features,
lsvc1])
model_lsvc1 = pipeline_lsvc1.fit(train_data)
model_lsvc1.transform(train_data)
DataFrame[parent_id: string, author: string, id: string, author_premium: boolean, author_flair_richtext: string, author_flair_text: string, author_flair_text_color: string, body: string, collapsed: int, controversiality: bigint, no_follow: double, score: bigint, total_awards_received: bigint, author_created_time: string, created_time: string, post_date: string, is_PtoP: int, is_BuckleUp: int, is_voted: int, author_postcnt: bigint, is_popular_commented: int, is_weekend: int, author_age: int, sentiment: string, sentiment_ix: double, sentiment_vec: vector, features: vector, rawPrediction: vector, prediction: double]
To evaluate the model, use test data.
predictions_lsvc1 = model_lsvc1.transform(test_data.dropna())
evlModel = predictions_lsvc1
accuracy = MulticlassClassificationEvaluator(labelCol=LABEL, predictionCol="prediction", metricName="accuracy").evaluate(evlModel)
precision = MulticlassClassificationEvaluator(labelCol=LABEL, predictionCol="prediction", metricName="weightedPrecision").evaluate(evlModel)
recall = MulticlassClassificationEvaluator(labelCol=LABEL, predictionCol="prediction", metricName="weightedRecall").evaluate(evlModel)
f1 = MulticlassClassificationEvaluator(labelCol=LABEL, predictionCol="prediction", metricName="f1").evaluate(evlModel)
print("Accuracy = %g" % accuracy)
print("Recall = %g" % recall)
print("Precision = %g" % precision)
print("F1 = %g" % f1)
Accuracy = 0.848219 Recall = 0.848219 Precision = 0.870555 F1 = 0.843759
from sklearn.metrics import confusion_matrix
y_pred=evlModel.select("prediction").collect()
y_orig=evlModel.select(LABEL).collect()
cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm)
Confusion Matrix: [[311052 138346] [ 11312 525300]]
import pandas as pd
import seaborn as sn
import matplotlib.pyplot as plt
sn.heatmap(cm, annot=True,cmap="Blues")
plt.show()
evaluatorRF = BinaryClassificationEvaluator(labelCol=LABEL, rawPredictionCol="prediction", metricName="areaUnderROC")
roc_result = evaluatorRF.evaluate(evlModel)
roc_result
0.8355361024302947
First, let's transform the test data using our model pipeline
predictions = model_rf.transform(test_data)
Next, let's run the MulticlassClassificationEvaluator
by passing in the label column (acutal result), prediction column (from our model), and the metric we want to calculate.
evaluatorRF = MulticlassClassificationEvaluator(labelCol="is_popular_commented", predictionCol="prediction", metricName="accuracy")
accuracy = evaluatorRF.evaluate(predictions)
evaluatorRF = MulticlassClassificationEvaluator(labelCol="is_popular_commented", predictionCol="prediction", metricName="accuracy")
accuracy = evaluatorRF.evaluate(predictions)
print("Accuracy = %g" % accuracy)
print("Test Error = %g" % (1.0 - accuracy))
Accuracy = 0.577651 Test Error = 0.422349
#predictions.show()
Was the test error similar to the train error? Run the same accuracy metrics on the training data
train_predictions = model_rf.transform(test_data)
evaluatorRF = MulticlassClassificationEvaluator(labelCol="is_popular_commented", predictionCol="prediction", metricName="accuracy")
accuracy = evaluatorRF.evaluate(train_predictions)
print("Accuracy = %g" % accuracy)
print("Train Error = %g" % (1.0 - accuracy))
Accuracy = 0.577651 Train Error = 0.422349
In this section, we will build the confusion matrix from the model. We will use sci-kit learn to build the confusion matrix. We first have to extract the predicted label and the true label columns. These are the numeric binary form of the data. Finally, send them through the confusion_matrix
method.
from sklearn.metrics import confusion_matrix
y_pred=predictions.select("prediction").collect()
y_orig=predictions.select("is_popular_commented").collect()
cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm)
Confusion Matrix: [[140296 323231] [100835 439704]]
In this section, we will evaluate the model area under the curve. This requires using the evaluator BinaryClassificationEvaluator
, which you can read about in the doc here. To evaluate the model, use test data.
evaluatorRF = BinaryClassificationEvaluator(labelCol="is_popular_commented", rawPredictionCol="prediction", metricName="areaUnderROC")
roc_result = evaluatorRF.evaluate(predictions)
roc_result
0.5580626643439777
gbt= GBTClassifier(labelCol=LABEL, featuresCol="features")
#ovr = OneVsRest(classifier=lsvc)
Let's build the pipeline now. A pipeline consists of transformers and an estimator.
pipeline_gbt = Pipeline(stages=[stringIndexer_sentiment,
onehot_sentiment,
vectorAssembler_features,
gbt])
model_gbt = pipeline_gbt.fit(train_data)
model_gbt.transform(train_data)
DataFrame[parent_id: string, author: string, id: string, author_premium: boolean, author_flair_richtext: string, author_flair_text: string, author_flair_text_color: string, body: string, collapsed: int, controversiality: bigint, no_follow: double, score: bigint, total_awards_received: bigint, author_created_time: string, created_time: string, post_date: string, is_PtoP: int, is_BuckleUp: int, is_voted: int, author_postcnt: bigint, is_popular_commented: int, is_weekend: int, author_age: int, sentiment: string, sentiment_ix: double, sentiment_vec: vector, features: vector, rawPrediction: vector, probability: vector, prediction: double]
To evaluate the model, use test data.
predictions_gbt = model_gbt.transform(test_data.dropna())
evlModel = predictions_gbt
accuracy = MulticlassClassificationEvaluator(labelCol=LABEL, predictionCol="prediction", metricName="accuracy").evaluate(evlModel)
precision = MulticlassClassificationEvaluator(labelCol=LABEL, predictionCol="prediction", metricName="weightedPrecision").evaluate(evlModel)
recall = MulticlassClassificationEvaluator(labelCol=LABEL, predictionCol="prediction", metricName="weightedRecall").evaluate(evlModel)
f1 = MulticlassClassificationEvaluator(labelCol=LABEL, predictionCol="prediction", metricName="f1").evaluate(evlModel)
print("Accuracy = %g" % accuracy)
print("Recall = %g" % recall)
print("Precision = %g" % precision)
print("F1 = %g" % f1)
Accuracy = 0.88052 Recall = 0.88052 Precision = 0.891986 F1 = 0.878511
from sklearn.metrics import confusion_matrix
y_pred=evlModel.select("prediction").collect()
y_orig=evlModel.select(LABEL).collect()
cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm)
Confusion Matrix: [[345730 103668] [ 14140 522472]]
import pandas as pd
import seaborn as sn
import matplotlib.pyplot as plt
sn.heatmap(cm, annot=True,cmap="Blues")
plt.show()
evaluatorRF = BinaryClassificationEvaluator(labelCol=LABEL, rawPredictionCol="prediction", metricName="areaUnderROC")
roc_result = evaluatorRF.evaluate(evlModel)
roc_result
0.8714837776313246
gbt1= GBTClassifier(labelCol=LABEL, featuresCol="features", maxDepth=20, maxBins=128, seed = 1234, stepSize=0.5)
Let's build the pipeline now. A pipeline consists of transformers and an estimator.
pipeline_gbt1 = Pipeline(stages=[stringIndexer_sentiment,
onehot_sentiment,
vectorAssembler_features,
gbt1])
model_gbt1 = pipeline_gbt.fit(train_data)
model_gbt1.transform(train_data)
DataFrame[parent_id: string, author: string, id: string, author_premium: boolean, author_flair_richtext: string, author_flair_text: string, author_flair_text_color: string, body: string, collapsed: int, controversiality: bigint, no_follow: double, score: bigint, total_awards_received: bigint, author_created_time: string, created_time: string, post_date: string, is_PtoP: int, is_BuckleUp: int, is_voted: int, author_postcnt: bigint, is_popular_commented: int, is_weekend: int, author_age: int, sentiment: string, sentiment_ix: double, sentiment_vec: vector, features: vector, rawPrediction: vector, probability: vector, prediction: double]
To evaluate the model, use test data.
predictions_gbt1 = model_gbt1.transform(test_data.dropna())
evlModel = predictions_gbt1
accuracy = MulticlassClassificationEvaluator(labelCol=LABEL, predictionCol="prediction", metricName="accuracy").evaluate(evlModel)
precision = MulticlassClassificationEvaluator(labelCol=LABEL, predictionCol="prediction", metricName="weightedPrecision").evaluate(evlModel)
recall = MulticlassClassificationEvaluator(labelCol=LABEL, predictionCol="prediction", metricName="weightedRecall").evaluate(evlModel)
f1 = MulticlassClassificationEvaluator(labelCol=LABEL, predictionCol="prediction", metricName="f1").evaluate(evlModel)
print("Accuracy = %g" % accuracy)
print("Recall = %g" % recall)
print("Precision = %g" % precision)
print("F1 = %g" % f1)
Accuracy = 0.88052 Recall = 0.88052 Precision = 0.891986 F1 = 0.878511
from sklearn.metrics import confusion_matrix
y_pred=evlModel.select("prediction").collect()
y_orig=evlModel.select(LABEL).collect()
cm = confusion_matrix(y_orig, y_pred)
print("Confusion Matrix:")
print(cm)
Confusion Matrix: [[345730 103668] [ 14140 522472]]
import pandas as pd
import seaborn as sn
import matplotlib.pyplot as plt
sn.heatmap(cm, annot=True,cmap="Blues")
plt.show()
evaluatorRF = BinaryClassificationEvaluator(labelCol=LABEL, rawPredictionCol="prediction", metricName="areaUnderROC")
roc_result = evaluatorRF.evaluate(evlModel)
roc_result
0.8714837776313246
model_gbt.write().save('s3://yl1269-labdata5/model_gbt')
spark.stop()