# Notebook setup: let findspark locate the Spark installation so `pyspark` can be
# imported, then install the extra Python packages used below.
import findspark
findspark.init()
# NOTE(review): `sparknlp` on PyPI is only a thin wrapper — the install log shows
# sparknlp-1.0.0 (1.4 kB) pulling in spark-nlp 3.4.3. Installing `spark-nlp` directly,
# pinned to the jar version configured on the SparkSession, is presumably what was meant.
!/mnt/miniconda/bin/pip install sparknlp
!/mnt/miniconda/bin/pip install plotly
!/mnt/miniconda/bin/pip install wordcloud
!/mnt/miniconda/bin/pip install altair
Collecting sparknlp Downloading sparknlp-1.0.0-py3-none-any.whl (1.4 kB) Requirement already satisfied: spark-nlp in /mnt/miniconda/lib/python3.7/site-packages (from sparknlp) (3.4.3) Requirement already satisfied: numpy in /mnt/miniconda/lib/python3.7/site-packages (from sparknlp) (1.21.2) Installing collected packages: sparknlp Successfully installed sparknlp-1.0.0 Collecting plotly Downloading plotly-5.7.0-py2.py3-none-any.whl (28.8 MB) |████████████████████████████████| 28.8 MB 27.8 MB/s eta 0:00:01 Collecting tenacity>=6.2.0 Downloading tenacity-8.0.1-py3-none-any.whl (24 kB) Requirement already satisfied: six in /mnt/miniconda/lib/python3.7/site-packages (from plotly) (1.16.0) Installing collected packages: tenacity, plotly Successfully installed plotly-5.7.0 tenacity-8.0.1 Collecting wordcloud Downloading wordcloud-1.8.1-cp37-cp37m-manylinux1_x86_64.whl (366 kB) |████████████████████████████████| 366 kB 29.8 MB/s eta 0:00:01 Requirement already satisfied: pillow in /mnt/miniconda/lib/python3.7/site-packages (from wordcloud) (9.0.1) Requirement already satisfied: numpy>=1.6.1 in /mnt/miniconda/lib/python3.7/site-packages (from wordcloud) (1.21.2) Requirement already satisfied: matplotlib in /mnt/miniconda/lib/python3.7/site-packages (from wordcloud) (3.5.1) Requirement already satisfied: packaging>=20.0 in /mnt/miniconda/lib/python3.7/site-packages (from matplotlib->wordcloud) (21.3) Requirement already satisfied: pyparsing>=2.2.1 in /mnt/miniconda/lib/python3.7/site-packages (from matplotlib->wordcloud) (3.0.4) Requirement already satisfied: python-dateutil>=2.7 in /mnt/miniconda/lib/python3.7/site-packages (from matplotlib->wordcloud) (2.8.2) Requirement already satisfied: fonttools>=4.22.0 in /mnt/miniconda/lib/python3.7/site-packages (from matplotlib->wordcloud) (4.25.0) Requirement already satisfied: cycler>=0.10 in /mnt/miniconda/lib/python3.7/site-packages (from matplotlib->wordcloud) (0.11.0) Requirement already satisfied: kiwisolver>=1.0.1 in 
/mnt/miniconda/lib/python3.7/site-packages (from matplotlib->wordcloud) (1.3.2) Requirement already satisfied: six>=1.5 in /mnt/miniconda/lib/python3.7/site-packages (from python-dateutil>=2.7->matplotlib->wordcloud) (1.16.0) Installing collected packages: wordcloud Successfully installed wordcloud-1.8.1 Collecting altair Downloading altair-4.2.0-py3-none-any.whl (812 kB) |████████████████████████████████| 812 kB 30.8 MB/s eta 0:00:01 Requirement already satisfied: pandas>=0.18 in /mnt/miniconda/lib/python3.7/site-packages (from altair) (1.3.5) Requirement already satisfied: toolz in /mnt/miniconda/lib/python3.7/site-packages (from altair) (0.11.2) Requirement already satisfied: jsonschema>=3.0 in /mnt/miniconda/lib/python3.7/site-packages (from altair) (4.4.0) Requirement already satisfied: numpy in /mnt/miniconda/lib/python3.7/site-packages (from altair) (1.21.2) Requirement already satisfied: entrypoints in /mnt/miniconda/lib/python3.7/site-packages (from altair) (0.4) Requirement already satisfied: jinja2 in /mnt/miniconda/lib/python3.7/site-packages (from altair) (3.0.3) Requirement already satisfied: typing-extensions in /mnt/miniconda/lib/python3.7/site-packages (from jsonschema>=3.0->altair) (3.10.0.2) Requirement already satisfied: importlib-metadata in /mnt/miniconda/lib/python3.7/site-packages (from jsonschema>=3.0->altair) (4.8.2) Requirement already satisfied: importlib-resources>=1.4.0 in /mnt/miniconda/lib/python3.7/site-packages (from jsonschema>=3.0->altair) (5.2.0) Requirement already satisfied: pyrsistent!=0.17.0,!=0.17.1,!=0.17.2,>=0.14.0 in /mnt/miniconda/lib/python3.7/site-packages (from jsonschema>=3.0->altair) (0.18.0) Requirement already satisfied: attrs>=17.4.0 in /mnt/miniconda/lib/python3.7/site-packages (from jsonschema>=3.0->altair) (21.4.0) Requirement already satisfied: zipp>=3.1.0 in /mnt/miniconda/lib/python3.7/site-packages (from importlib-resources>=1.4.0->jsonschema>=3.0->altair) (3.7.0) Requirement already satisfied: 
python-dateutil>=2.7.3 in /mnt/miniconda/lib/python3.7/site-packages (from pandas>=0.18->altair) (2.8.2) Requirement already satisfied: pytz>=2017.3 in /mnt/miniconda/lib/python3.7/site-packages (from pandas>=0.18->altair) (2021.3) Requirement already satisfied: six>=1.5 in /mnt/miniconda/lib/python3.7/site-packages (from python-dateutil>=2.7.3->pandas>=0.18->altair) (1.16.0) Requirement already satisfied: MarkupSafe>=2.0 in /mnt/miniconda/lib/python3.7/site-packages (from jinja2->altair) (2.0.1) Installing collected packages: altair Successfully installed altair-4.2.0
import pandas as pd
import numpy as np
import json
import time
import plotly.graph_objects as go
import matplotlib.pyplot as plt
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from sparknlp.annotator import *
from sparknlp.base import *
import sparknlp
import seaborn as sns
from sparknlp.pretrained import PretrainedPipeline
from wordcloud import WordCloud
from collections import ChainMap
import pyspark.sql.functions as F
# Start (or reuse) a YARN-backed Spark session with Kryo serialization and the
# Spark NLP jar on the classpath.
# NOTE(review): the jar pinned here is 3.4.2 while pip reports spark-nlp 3.4.3 installed;
# consider aligning the two versions.
spark = (
    SparkSession.builder
    .appName("SparkNLP")
    .master('yarn')
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:3.4.2")
    .getOrCreate()
)
spark
SparkSession - in-memory
# Load the prepared comment data set from S3 and inspect its schema.
prepared_path = 's3://yl1269-labdata5/prepare_df/'
df = spark.read.parquet(prepared_path)
df.printSchema()
root |-- parent_id: string (nullable = true) |-- author: string (nullable = true) |-- id: string (nullable = true) |-- author_premium: boolean (nullable = true) |-- author_flair_richtext: string (nullable = true) |-- author_flair_text: string (nullable = true) |-- author_flair_text_color: string (nullable = true) |-- body: string (nullable = true) |-- collapsed: boolean (nullable = true) |-- controversiality: long (nullable = true) |-- no_follow: boolean (nullable = true) |-- score: long (nullable = true) |-- send_replies: boolean (nullable = true) |-- total_awards_received: long (nullable = true) |-- stickied: boolean (nullable = true) |-- author_created_time: string (nullable = true) |-- created_time: string (nullable = true) |-- post_date: string (nullable = true) |-- is_PtoP: integer (nullable = true) |-- is_BuckleUp: integer (nullable = true) |-- is_voted: integer (nullable = true) |-- author_postcnt: long (nullable = true) |-- is_popular_commented: integer (nullable = true) |-- is_weekend: integer (nullable = true) |-- author_age: integer (nullable = true) |-- gme_mentioned: integer (nullable = true)
from pyspark.sql.functions import *
# NOTE(review): this star import shadows Python builtins such as sum/min/max/abs/round for
# the rest of the notebook; later cells rely on the bare col/when/count names it provides.
# Per-column null counts: when() yields null unless the value is null, and count() skips nulls.
null_count_exprs = [f.count(f.when(col(name).isNull(), name)).alias(name) for name in df.columns]
df.select(null_count_exprs).show()
22/05/04 05:48:26 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'. [Stage 1:===================================================> (49 + 5) / 54]
+---------+------+---+--------------+---------------------+-----------------+-----------------------+----+---------+----------------+---------+-----+------------+---------------------+--------+-------------------+------------+---------+-------+-----------+--------+--------------+--------------------+----------+----------+-------------+ |parent_id|author| id|author_premium|author_flair_richtext|author_flair_text|author_flair_text_color|body|collapsed|controversiality|no_follow|score|send_replies|total_awards_received|stickied|author_created_time|created_time|post_date|is_PtoP|is_BuckleUp|is_voted|author_postcnt|is_popular_commented|is_weekend|author_age|gme_mentioned| +---------+------+---+--------------+---------------------+-----------------+-----------------------+----+---------+----------------+---------+-----+------------+---------------------+--------+-------------------+------------+---------+-------+-----------+--------+--------------+--------------------+----------+----------+-------------+ | 0| 0| 0| 777484| 0| 1813934| 1036450| 0| 0| 0| 0| 0| 0| 0| 0| 777484| 0| 0| 0| 0| 0| 0| 0| 0| 777484| 0| +---------+------+---+--------------+---------------------+-----------------+-----------------------+----+---------+----------------+---------+-----+------------+---------------------+--------+-------------------+------------+---------+-------+-----------+--------+--------------+--------------------+----------+----------+-------------+
# Add more stop words: informal filler words (contractions written without apostrophes)
# and the 'deleted'/'removed' markers, appended to StopWordsCleaner's default list.
extra_stop_words = ['im', 'dont', 'your', 'didnt', 'doesnt', 'gonna', 'isnt', 'u', 'deleted', 'removed']
# dict.fromkeys de-duplicates while keeping first-seen order, so words already in the
# default list (or repeated above) appear only once.
stop_words_customized = list(dict.fromkeys(StopWordsCleaner().getStopWords() + extra_stop_words))
# Change all comments into lowercase
df = df.withColumn('body', f.lower(col('body')))
# Check the updated column 'body'
df.select('body').show(3)
[Stage 5:> (0 + 1) / 1]
+--------------------+ | body| +--------------------+ |if it’s not label...| |welcome back to f...| |third apes help ...| +--------------------+ only showing top 3 rows
# Word cloud of the raw (pre-cleaning) text, built from up to 1000 distinct comments.
from wordcloud import WordCloud, STOPWORDS
import matplotlib.pyplot as plt
import pandas as pd
stopwords = set(STOPWORDS)
body = df.select("body").distinct().take(1000)
# Lower-case each comment (a null body becomes an empty string) and join them exactly once.
# Re-joining the full token list inside a per-token loop would repeat the corpus once per
# comment and make the concatenation quadratic.
tokens = [(item['body'] or '').lower() for item in body]
comment_words = " ".join(tokens) + " "
wordcloud = WordCloud(width=800, height=800,
                      background_color='white',
                      stopwords=stopwords,
                      min_font_size=10).generate(comment_words)
# plot the WordCloud image
plt.figure(figsize=(8, 8), facecolor=None)
plt.imshow(wordcloud)
plt.axis("off")
plt.tight_layout(pad=0)
plt.show()
# Define words cleaner pipeline for English:
# body -> document -> token -> stop -> normalized -> stem -> final -> finished_final

# Transform the raw text into the form of document
documentAssembler = DocumentAssembler() \
    .setInputCol("body") \
    .setOutputCol("document")

# Word segmentation
tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

# Remove stop words from the raw tokens
stop_words1 = StopWordsCleaner.pretrained("stopwords_en", "en") \
    .setInputCols(["token"]) \
    .setOutputCol("stop") \
    .setStopWords(stop_words_customized)

# Remove every character that is not a word character, digit or whitespace (punctuation).
# Raw string: "\w", "\d" and "\s" are invalid escape sequences in a normal string literal.
normalizer = Normalizer() \
    .setInputCols(["stop"]) \
    .setOutputCol("normalized") \
    .setCleanupPatterns([r"[^\w\d\s]"])

# Reduce words to hard stems (the word counts show forms such as "peopl", "monei")
stemmer = Stemmer() \
    .setInputCols(["normalized"]) \
    .setOutputCol("stem")

# Remove stop words again, after stemming. The stop list holds unstemmed words, so stemmed
# forms of the custom words slip through: the word counts show "remov"/"delet" among the
# most frequent tokens even though "removed"/"deleted" are stop words (presumably tokens
# like "[deleted]" keep their punctuation until the normalizer, so the first cleaner misses
# them). Add those stems explicitly; note this also drops other words sharing them,
# e.g. "remove".
stemmed_stop_words = stop_words_customized + ['remov', 'delet']
stop_words2 = StopWordsCleaner.pretrained("stopwords_en", "en") \
    .setInputCols(["stem"]) \
    .setOutputCol("final") \
    .setStopWords(stemmed_stop_words)

# Finisher turns the final annotations into a plain string array ("finished_final")
# and, with cleanAnnotations off, keeps the intermediate annotation columns.
finisher = Finisher() \
    .setInputCols(['final']) \
    .setCleanAnnotations(False)

# Set up the pipeline
pipeline = Pipeline().setStages([
    documentAssembler,
    tokenizer,
    stop_words1,
    normalizer,
    stemmer,
    stop_words2,
    finisher
])
stopwords_en download started this may take some time. Approximate size to download 2.9 KB [ | ]stopwords_en download started this may take some time. Approximate size to download 2.9 KB Download done! Loading the resource.
[OK!] stopwords_en download started this may take some time. Approximate size to download 2.9 KB [OK!]
# Fit the cleaning pipeline on the comments, then apply the fitted model to the same data.
cleaning_model = pipeline.fit(df)
df_cleaned = cleaning_model.transform(df)
df_cleaned.printSchema()
root |-- parent_id: string (nullable = true) |-- author: string (nullable = true) |-- id: string (nullable = true) |-- author_premium: boolean (nullable = true) |-- author_flair_richtext: string (nullable = true) |-- author_flair_text: string (nullable = true) |-- author_flair_text_color: string (nullable = true) |-- body: string (nullable = true) |-- collapsed: boolean (nullable = true) |-- controversiality: long (nullable = true) |-- no_follow: boolean (nullable = true) |-- score: long (nullable = true) |-- send_replies: boolean (nullable = true) |-- total_awards_received: long (nullable = true) |-- stickied: boolean (nullable = true) |-- author_created_time: string (nullable = true) |-- created_time: string (nullable = true) |-- post_date: string (nullable = true) |-- is_PtoP: integer (nullable = true) |-- is_BuckleUp: integer (nullable = true) |-- is_voted: integer (nullable = true) |-- author_postcnt: long (nullable = true) |-- is_popular_commented: integer (nullable = true) |-- is_weekend: integer (nullable = true) |-- author_age: integer (nullable = true) |-- gme_mentioned: integer (nullable = true) |-- document: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- annotatorType: string (nullable = true) | | |-- begin: integer (nullable = false) | | |-- end: integer (nullable = false) | | |-- result: string (nullable = true) | | |-- metadata: map (nullable = true) | | | |-- key: string | | | |-- value: string (valueContainsNull = true) | | |-- embeddings: array (nullable = true) | | | |-- element: float (containsNull = false) |-- token: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- annotatorType: string (nullable = true) | | |-- begin: integer (nullable = false) | | |-- end: integer (nullable = false) | | |-- result: string (nullable = true) | | |-- metadata: map (nullable = true) | | | |-- key: string | | | |-- value: string (valueContainsNull = true) | | |-- embeddings: array (nullable = true) | | | |-- 
element: float (containsNull = false) |-- stop: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- annotatorType: string (nullable = true) | | |-- begin: integer (nullable = false) | | |-- end: integer (nullable = false) | | |-- result: string (nullable = true) | | |-- metadata: map (nullable = true) | | | |-- key: string | | | |-- value: string (valueContainsNull = true) | | |-- embeddings: array (nullable = true) | | | |-- element: float (containsNull = false) |-- normalized: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- annotatorType: string (nullable = true) | | |-- begin: integer (nullable = false) | | |-- end: integer (nullable = false) | | |-- result: string (nullable = true) | | |-- metadata: map (nullable = true) | | | |-- key: string | | | |-- value: string (valueContainsNull = true) | | |-- embeddings: array (nullable = true) | | | |-- element: float (containsNull = false) |-- stem: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- annotatorType: string (nullable = true) | | |-- begin: integer (nullable = false) | | |-- end: integer (nullable = false) | | |-- result: string (nullable = true) | | |-- metadata: map (nullable = true) | | | |-- key: string | | | |-- value: string (valueContainsNull = true) | | |-- embeddings: array (nullable = true) | | | |-- element: float (containsNull = false) |-- final: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- annotatorType: string (nullable = true) | | |-- begin: integer (nullable = false) | | |-- end: integer (nullable = false) | | |-- result: string (nullable = true) | | |-- metadata: map (nullable = true) | | | |-- key: string | | | |-- value: string (valueContainsNull = true) | | |-- embeddings: array (nullable = true) | | | |-- element: float (containsNull = false) |-- finished_final: array (nullable = true) | |-- element: string (containsNull = true)
# Replace each annotation column (array of annotation structs) with just its `result`
# strings, in the same order as before.
for annotation_col in ['document', 'token', 'stop', 'normalized', 'stem', 'final']:
    df_cleaned = df_cleaned.withColumn(annotation_col, col(annotation_col + '.result'))
df_cleaned.printSchema()
root |-- parent_id: string (nullable = true) |-- author: string (nullable = true) |-- id: string (nullable = true) |-- author_premium: boolean (nullable = true) |-- author_flair_richtext: string (nullable = true) |-- author_flair_text: string (nullable = true) |-- author_flair_text_color: string (nullable = true) |-- body: string (nullable = true) |-- collapsed: boolean (nullable = true) |-- controversiality: long (nullable = true) |-- no_follow: boolean (nullable = true) |-- score: long (nullable = true) |-- send_replies: boolean (nullable = true) |-- total_awards_received: long (nullable = true) |-- stickied: boolean (nullable = true) |-- author_created_time: string (nullable = true) |-- created_time: string (nullable = true) |-- post_date: string (nullable = true) |-- is_PtoP: integer (nullable = true) |-- is_BuckleUp: integer (nullable = true) |-- is_voted: integer (nullable = true) |-- author_postcnt: long (nullable = true) |-- is_popular_commented: integer (nullable = true) |-- is_weekend: integer (nullable = true) |-- author_age: integer (nullable = true) |-- gme_mentioned: integer (nullable = true) |-- document: array (nullable = true) | |-- element: string (containsNull = true) |-- token: array (nullable = true) | |-- element: string (containsNull = true) |-- stop: array (nullable = true) | |-- element: string (containsNull = true) |-- normalized: array (nullable = true) | |-- element: string (containsNull = true) |-- stem: array (nullable = true) | |-- element: string (containsNull = true) |-- final: array (nullable = true) | |-- element: string (containsNull = true) |-- finished_final: array (nullable = true) | |-- element: string (containsNull = true)
# Word frequencies over the cleaned tokens, computed with the RDD API.
text_rdd = df_cleaned.select('finished_final').rdd
# Each Row carries one array column: flatten it to words and pair every word with a 1.
text = text_rdd.flatMap(lambda row: row[0]).map(lambda word: (word, 1))
# Sum the 1s per word, then order by frequency, most frequent first.
text_count = (
    text.reduceByKey(lambda left, right: left + right)
    .sortBy(lambda pair: pair[1], ascending=False)
)
# Top 50 words with their counts
text_count.take(50)
[('like', 653993), ('get', 544023), ('share', 543295), ('ap', 527665), ('gme', 461318), ('go', 459404), ('think', 378043), ('remov', 371588), ('delet', 366967), ('know', 356282), ('time', 348047), ('fuck', 336501), ('peopl', 335930), ('bui', 313531), ('see', 310662), ('wai', 309157), ('make', 308572), ('post', 304065), ('dai', 297073), ('short', 282756), ('good', 279773), ('sell', 269497), ('stock', 268298), ('price', 258202), ('want', 254068), ('look', 238382), ('thank', 233901), ('ne', 231151), ('monei', 223217), ('even', 219710), ('hold', 214213), ('right', 209625), ('market', 209010), ('vote', 203903), ('thing', 202910), ('new', 194942), ('take', 194050), ('still', 190574), ('call', 185075), ('back', 184838), ('also', 184155), ('got', 184016), ('shit', 181163), ('work', 180085), ('sai', 179747), ('much', 177449), ('us', 176715), ('lol', 173966), ('happen', 173959), ('well', 171470)]
# Word cloud sized by raw word counts.
# WordCloud only draws its `max_words` most frequent entries, so bringing just that many
# (word, count) pairs to the driver gives the same image without collecting the whole vocabulary.
max_cloud_words = 200
words = dict(text_count.take(max_cloud_words))
wordcloud = WordCloud(width=3000, height=2000, random_state=1, collocations=False,
                      background_color='white',
                      max_words=max_cloud_words).generate_from_frequencies(words)
# Plot directly (there is no `plot_cloud` helper defined in this notebook)
plt.figure(figsize=(15, 10))
plt.imshow(wordcloud)
plt.axis("off")
plt.show()
from pyspark.ml.feature import CountVectorizer,IDF
# Term-count vectors over the 1000 terms CountVectorizer ranks highest by corpus frequency.
cv = CountVectorizer(inputCol="finished_final", outputCol="tf", vocabSize=1000)
cv_model = cv.fit(df_cleaned)
df_cv = cv_model.transform(df_cleaned)
# IDF weighting fitted on those count vectors (produces the TF-IDF column 'tfidf').
idf = IDF(inputCol='tf', outputCol='tfidf')
idf_model = idf.fit(df_cv)
df_idf = idf_model.transform(df_cv)
# One row per vocabulary term with its IDF weight (stored in a column named 'tfidf'),
# highest weight first.
vocab_pdf = pd.DataFrame({'word': cv_model.vocabulary, 'tfidf': idf_model.idf.toArray()})
vocab = spark.createDataFrame(vocab_pdf).sort('tfidf', ascending=False)
vocab.show(20)
+----------+------------------+ | word| tfidf| +----------+------------------+ | moli| 6.743512160401614| | repo| 6.677971276468011| | candl| 6.576784335699986| | danc| 6.567418168644507| | blackrock| 6.565401942144191| | insur| 6.553673559790433| |conspiraci| 6.542831045613697| | demand| 6.538340020101233| | exit| 6.537873362759245| | sweet| 6.53377614127185| | bond|6.5232380515153565| | common| 6.52268647952123| | platform| 6.521767868396555| | contact| 6.500330041314541| | cnbc| 6.499521524898041| | prevent| 6.49674162802645| | gold| 6.495936006535444| | halt| 6.493880139889549| | track| 6.484105759600915| | remain| 6.478022962161649| +----------+------------------+ only showing top 20 rows
# Word cloud sized by each vocabulary term's weight from the `vocab` table ('tfidf' column).
# vocab has one row per distinct term (at most vocabSize rows), so a dict comprehension over
# the collected rows gives the word -> weight mapping directly.
words = {row['word']: row['tfidf'] for row in vocab.collect()}
wordcloud = WordCloud(width=3000, height=2000, random_state=1, collocations=False,
                      background_color='white').generate_from_frequencies(words)
# Plot directly (there is no `plot_cloud` helper defined in this notebook)
plt.figure(figsize=(15, 10))
plt.imshow(wordcloud)
plt.axis("off")
plt.show()
# Prototype sentiment scoring with pretrained models:
# text -> document -> sentence_embeddings (Universal Sentence Encoder) -> sentiment (SentimentDL).
MODEL_NAME = 'sentimentdl_use_twitter'
documentAssembler = DocumentAssembler()\
    .setInputCol("text")\
    .setOutputCol("document")
use = UniversalSentenceEncoder.pretrained(name="tfhub_use", lang="en")\
    .setInputCols(["document"])\
    .setOutputCol("sentence_embeddings")
sentimentdl = SentimentDLModel.pretrained(name=MODEL_NAME, lang="en")\
    .setInputCols(["sentence_embeddings"])\
    .setOutputCol("sentiment")
nlpPipeline = Pipeline(
    stages = [
        documentAssembler,
        use,
        sentimentdl
    ])
# No stage learns from data (DocumentAssembler is a plain transformer, the other two are
# pretrained models), so fitting on a one-row placeholder frame just builds the PipelineModel.
empty_df = spark.createDataFrame([['']]).toDF("text")
pipelineModel = nlpPipeline.fit(empty_df)
# NOTE(review): this cell previously transformed a `result` frame that is not defined at this
# point; score one space-joined string of cleaned tokens per comment instead. The unused
# PretrainedPipeline that overwrote the cleaning `pipeline` variable is no longer created.
sentiment_input = df_cleaned.select(F.concat_ws(' ', F.col('finished_final')).alias('text'))
sentiment_result = pipelineModel.transform(sentiment_input)
# Pull the label strings out of the annotations; explode drops rows with no label.
sentiment_result = sentiment_result.withColumn('sentiment_result', F.explode(sentiment_result.sentiment.result))
sentiment_result = sentiment_result.drop("document", "sentence_embeddings", "sentiment", "text")
sentiment_result.printSchema()
sentiment_result.show()
df = df_idf
# Re-join each comment's cleaned tokens into one space-separated string for the sentence
# encoder. concat_ws is a Spark built-in (no Python UDF round trip) and returns '' for a
# null array, where ' '.join(None) inside a UDF would fail the task.
df = df.withColumn('text', f.concat_ws(' ', f.col('finished_final')))

# Define sentiment pipeline: text -> document -> sentence_embeddings -> sentiment
documentAssembler = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("document")
use = UniversalSentenceEncoder.pretrained('tfhub_use', lang="en") \
    .setInputCols(["document"])\
    .setOutputCol("sentence_embeddings")
# pretrained() is called on the class; no throwaway instance is needed.
sentimentdl = SentimentDLModel.pretrained('sentimentdl_use_twitter')\
    .setInputCols(["sentence_embeddings"])\
    .setOutputCol("sentiment")
nlpPipeline = Pipeline(
    stages = [
        documentAssembler,
        use,
        sentimentdl
    ])

# Run the pipeline
pipelineModel = nlpPipeline.fit(df)
result = pipelineModel.transform(df)
# Show some of the results
result.select('sentiment.result').show(5)
# Keep the first sentiment label per comment (presumably the only one, as each row is a
# single document); the value is null when no label was produced.
result = result.withColumn('labels', result.sentiment.result[0])
result = result.drop('document', 'token', 'normalized', 'stop', 'stem', 'final', 'tf', 'tfidf', 'sentence_embeddings', 'sentiment')
result.printSchema()
result.select("finished_final","text","labels").show()
# Drop the helper text columns; what remains is the comment data plus the `labels` column.
df = result.drop("finished_final","text")
df.count()
# Keyword flags on the (lower-cased) body; when/otherwise maps a null match to False.
# NOTE(review): 'share' also matches e.g. "shares"/"shareholder" — confirm that is intended.
df = df.withColumn('blackrock_mentioned', f.when(df.body.rlike('blackrock'), True).otherwise(False))
df = df.withColumn('share_mentioned', f.when(df.body.rlike('share'), True).otherwise(False))
df = df.withColumnRenamed('labels','sentiment')
df.printSchema()
# Persist explicitly as Parquet: it is read back with spark.read.parquet, whereas a bare
# .save() would use whatever spark.sql.sources.default happens to be configured to.
nlp_output_path = "s3://yl1269-labdata5/nlp_df"
df.write.parquet(nlp_output_path)
df = spark.read.parquet(nlp_output_path + '/')
df.printSchema()
root |-- parent_id: string (nullable = true) |-- author: string (nullable = true) |-- id: string (nullable = true) |-- author_premium: boolean (nullable = true) |-- author_flair_richtext: string (nullable = true) |-- author_flair_text: string (nullable = true) |-- author_flair_text_color: string (nullable = true) |-- body: string (nullable = true) |-- collapsed: boolean (nullable = true) |-- controversiality: long (nullable = true) |-- no_follow: boolean (nullable = true) |-- score: long (nullable = true) |-- send_replies: boolean (nullable = true) |-- total_awards_received: long (nullable = true) |-- stickied: boolean (nullable = true) |-- author_created_time: string (nullable = true) |-- created_time: string (nullable = true) |-- post_date: string (nullable = true) |-- is_PtoP: integer (nullable = true) |-- is_BuckleUp: integer (nullable = true) |-- is_voted: integer (nullable = true) |-- author_postcnt: long (nullable = true) |-- is_popular_commented: integer (nullable = true) |-- is_weekend: integer (nullable = true) |-- author_age: integer (nullable = true) |-- gme_mentioned: integer (nullable = true) |-- sentiment: string (nullable = true) |-- blackrock_mentioned: boolean (nullable = true) |-- share_mentioned: boolean (nullable = true)
# Overall distribution of sentiment labels (null = no label produced).
df.groupBy('sentiment').count().show()
[Stage 77:===========================================> (3 + 1) / 4]
+---------+-------+ |sentiment| count| +---------+-------+ | positive|4640279| | null| 139060| | neutral| 896614| | negative|1727520| +---------+-------+
# Sentiment counts split by the no_follow flag.
df.groupBy(['no_follow', 'sentiment']).count().show()
+---------+---------+-------+ |no_follow|sentiment| count| +---------+---------+-------+ | true| neutral| 675800| | true| null| 87314| | false| negative| 753896| | false| neutral| 220814| | true| negative| 973624| | true| positive|2578783| | false| null| 51746| | false| positive|2061496| +---------+---------+-------+
# Same breakdown as a pandas DataFrame, for display.
df.groupBy('no_follow', 'sentiment').count().toPandas()
no_follow | sentiment | count | |
---|---|---|---|
0 | True | neutral | 675800 |
1 | True | None | 87314 |
2 | False | negative | 753896 |
3 | False | neutral | 220814 |
4 | True | negative | 973624 |
5 | True | positive | 2578783 |
6 | False | None | 51746 |
7 | False | positive | 2061496 |
import pandas as pd
import seaborn as sns
from matplotlib import pyplot as plt
# Stacked bar of sentiment counts by follow status (yes_follow = no_follow False).
# The counts come from `df` itself rather than being hard-coded, so the chart always
# matches the data; rows without a sentiment label are dropped.
follow_counts = df.groupby('no_follow', 'sentiment').count().dropna().toPandas()
agg_senti = (
    follow_counts
    .pivot(index='sentiment', columns='no_follow', values='count')
    .rename(columns={False: 'yes_follow', True: 'no_follow'})
    .reindex(index=['positive', 'negative', 'neutral'], columns=['yes_follow', 'no_follow'])
    .rename_axis(columns=None)
    .fillna(0)
)
sns.set_theme(style="whitegrid")
sns.set_color_codes("pastel")
# Pass the mako colours to the plot; calling sns.color_palette on its own sets nothing.
agg_senti.plot(kind='bar', stacked=True, figsize=(10, 12),
               color=sns.color_palette("mako", 2))
# Add a title and keep the x-axis labels horizontal.
plt.title('Sentiment count by if followed')
plt.xticks(rotation=0, ha='center')
sns.despine(left=True, bottom=True)
# Sentiment counts split by the is_popular_commented flag.
df.groupBy('is_popular_commented', 'sentiment').count().show()
[Stage 7:====================================================> (50 + 4) / 54]
+--------------------+---------+-----+ |is_popular_commented|sentiment|count| +--------------------+---------+-----+ | 0| positive|42145| | 0| negative|14870| | 0| neutral| 5966| | 1| neutral| 5174| | 1| positive|21414| | 1| negative| 8194| | 1| null| 548| | 0| null| 1689| +--------------------+---------+-----+
# Import libraries
from matplotlib import pyplot as plt
import numpy as np
# Sentiment mix of popular-commented posts (is_popular_commented == 1); counts are
# hard-coded from the is_popular_commented groupby output.
sentiment_counts = {'Positive': 21414, 'Negative': 8194, 'Neutral': 5174}
fig = plt.figure(figsize=(10, 7))
plt.pie(list(sentiment_counts.values()), labels=list(sentiment_counts))
plt.title('Popular commented Posts Sentiment')
plt.show()
# Sentiment counts split by the is_BuckleUp flag (uses `df`; `dfs` is never defined here).
df.groupby('is_BuckleUp','sentiment').count().show()
[Stage 12:> (0 + 1) / 1]
+-----------+---------+-----+ |is_BuckleUp|sentiment|count| +-----------+---------+-----+ | 0| positive|51675| | 0| negative|19856| | 1| positive|12778| | 1| negative| 3797| | 1| neutral| 1297| | 0| neutral| 8414| | 0| null| 1739| | 1| null| 444| +-----------+---------+-----+
# Import libraries
from matplotlib import pyplot as plt
import numpy as np
# Sentiment mix of BuckleUp-flair posts (is_BuckleUp == 1); counts are hard-coded from
# the is_BuckleUp groupby output.
sentiment_counts = {'Positive': 12778, 'Negative': 3797, 'Neutral': 1297}
fig = plt.figure(figsize=(10, 7))
plt.pie(list(sentiment_counts.values()), labels=list(sentiment_counts))
plt.title('BuckleUp flair Posts Sentiment')
plt.show()
# Sentiment counts split by the is_voted flag (uses `df`; `dfs` is never defined here).
df.groupby('is_voted','sentiment').count().show()
[Stage 13:==================================================> (49 + 5) / 54]
+--------+---------+-----+ |is_voted|sentiment|count| +--------+---------+-----+ | 0| neutral| 9686| | 0| negative|19348| | 0| positive|51772| | 1| positive|12551| | 1| negative| 3373| | 1| neutral| 1164| | 0| null| 1798| | 1| null| 308| +--------+---------+-----+
# Import libraries
from matplotlib import pyplot as plt
import numpy as np
# Sentiment mix of voted-flair posts (is_voted == 1); counts are hard-coded from the
# is_voted groupby output.
sentiment_counts = {'Positive': 12551, 'Negative': 3373, 'Neutral': 1164}
fig = plt.figure(figsize=(10, 7))
plt.pie(list(sentiment_counts.values()), labels=list(sentiment_counts))
plt.title('Voted flair Posts Sentiment')
plt.show()
# Sentiment counts split by the gme_mentioned flag (uses `df`; `dfs` is never defined here).
df.groupby('gme_mentioned','sentiment').count().show()
+-------------+---------+-----+ |gme_mentioned|sentiment|count| +-------------+---------+-----+ | 0| positive|59641| | 0| negative|21956| | 1| positive| 3693| | 1| negative| 1112| | 0| neutral|11093| | 1| neutral| 304| | 0| null| 2201| +-------------+---------+-----+
# Import libraries
from matplotlib import pyplot as plt
import numpy as np
# Sentiment mix of comments mentioning GME (gme_mentioned == 1); counts are hard-coded
# from the gme_mentioned groupby output (this chart previously reused the voted-flair
# numbers and title).
sentiment_counts = {'Positive': 3693, 'Negative': 1112, 'Neutral': 304}
fig = plt.figure(figsize=(10, 7))
plt.pie(list(sentiment_counts.values()), labels=list(sentiment_counts))
plt.title('GME mentioned Posts Sentiment')
plt.show()
# Sentiment counts split by blackrock_mentioned (uses `df`; `dfs` is never defined here).
df.groupby('blackrock_mentioned','sentiment').count().show()
+-------------------+---------+-----+ |blackrock_mentioned|sentiment|count| +-------------------+---------+-----+ | false| neutral|11358| | false| positive|63232| | false| negative|23182| | true| positive| 142| | true| neutral| 15| | true| negative| 39| | false| null| 2032| +-------------------+---------+-----+
# Sentiment counts split by share_mentioned (uses `df`; `dfs` is never defined here).
df.groupby('share_mentioned','sentiment').count().show()
+---------------+---------+-----+ |share_mentioned|sentiment|count| +---------------+---------+-----+ | false| positive|59578| | false| negative|22482| | false| neutral| 9413| | true| positive| 4875| | true| negative| 1171| | true| neutral| 298| | false| null| 2183| +---------------+---------+-----+
# Daily counts of positive / negative / neutral comments, keyed by post date.
# count() ignores nulls, so each when() expression counts only rows carrying that label.
label_count_exprs = [
    f.count(f.when(f.col("sentiment") == label, True)).alias(alias)
    for label, alias in [("positive", "PositiveCnt"),
                         ("negative", "negativeCnt"),
                         ("neutral", "neutralCnt")]
]
df_sentiment = (
    df.groupby("post_date")
    .agg(*label_count_exprs)
    .withColumnRenamed("post_date", "Date")
    .sort(f.col("Date"))
)
df_sentiment.show()
[Stage 255:==================================================> (50 + 4) / 54]
+----------+-----------+-----------+----------+ | Date|PositiveCnt|negativeCnt|neutralCnt| +----------+-----------+-----------+----------+ |2021-03-16| 1| 0| 0| |2021-03-19| 1| 0| 2| |2021-03-20| 2| 0| 0| |2021-03-22| 3| 0| 0| |2021-03-23| 1| 0| 3| |2021-03-24| 17| 3| 2| |2021-03-25| 3| 1| 2| |2021-03-27| 9| 0| 0| |2021-03-28| 18| 1| 2| |2021-03-29| 40| 11| 4| |2021-03-30| 16| 2| 2| |2021-03-31| 9| 2| 0| |2021-04-01| 5| 1| 1| |2021-04-02| 6| 1| 1| |2021-04-03| 3| 0| 0| |2021-04-04| 15| 1| 0| |2021-04-05| 44332| 15101| 4654| |2021-04-06| 46264| 16217| 4544| |2021-04-07| 48062| 17384| 5060| |2021-04-08| 56205| 19689| 5448| +----------+-----------+-----------+----------+ only showing top 20 rows
from pyspark.sql.types import *
# GME daily price history. The CSV is read without schema inference, so the numeric
# columns arrive as strings and are cast to doubles one by one.
df_stock = spark.read.csv('s3://yl1269-labdata5/GME.csv', header=True)
for numeric_col in ["Open", "High", "Low", "Close", "Volume", "Adj Close"]:
    df_stock = df_stock.withColumn(numeric_col, col(numeric_col).cast(DoubleType()))
df_stock = df_stock.sort(col("Date"))
stock = df_stock.toPandas()
import plotly.express as px
from plotly.subplots import make_subplots
# Grouped bar chart of comment sentiment per day, written to a standalone HTML file.
# dropna() removes the rows whose sentiment label is null.
sentiment = (
    df.groupby("post_date", "sentiment")
    .count()
    .dropna()
    .sort(col("post_date"))
    .toPandas()
)
fig = px.bar(
    sentiment,
    x="post_date",
    y="count",
    color='sentiment',
    barmode='group',
    height=400,
)
fig.update_layout(title_text="Sentiment by Date")
fig.write_html("sentimentflow.html")