Skip to content

Commit

Permalink
Create TweetsSearch.py which allows for searching tweets by keywords,…
Browse files Browse the repository at this point in the history
… hashtags or location
  • Loading branch information
drwoj committed May 29, 2024
1 parent 4328df3 commit d29139f
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 0 deletions.
41 changes: 41 additions & 0 deletions src/analysers/TweetsSearch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from pyspark.sql import DataFrame, Column
from pyspark.sql.functions import *
from src.utils.columns import Columns


class TweetsSearch:
    """Search helpers that filter a tweets DataFrame by text, hashtags or location.

    Every method returns a new DataFrame limited to the columns relevant to the
    search and ordered by ``Columns.DATE`` descending. The input DataFrame is
    not modified.
    """

    @classmethod
    def search_by_keyword(cls, tweets: DataFrame, keyword: str) -> DataFrame:
        """Return tweets whose text contains ``keyword``.

        The match is a case-insensitive literal substring match.

        :param tweets: DataFrame with the text, date and source columns.
        :param keyword: Substring to look for in the tweet text.
        :return: Matching rows (text, date, source), newest first.
        """
        text_lower: Column = lower(col(Columns.TEXT.value))

        return tweets.filter(text_lower.contains(keyword.lower())) \
            .select(Columns.TEXT.value, Columns.DATE.value, Columns.SOURCE.value) \
            .orderBy(desc(Columns.DATE.value))

    @classmethod
    def search_by_keywords(cls, tweets: DataFrame, keywords: list[str]) -> DataFrame:
        """Return tweets whose text contains at least one of ``keywords``.

        Each keyword is matched as a case-insensitive literal substring, the
        same way ``search_by_keyword`` matches a single keyword. An empty
        ``keywords`` list matches no tweets.

        :param tweets: DataFrame with the text, date and source columns.
        :param keywords: Substrings to look for in the tweet text.
        :return: Matching rows (text, date, source), newest first.
        """
        text_lower: Column = lower(col(Columns.TEXT.value))

        # Build an OR of literal substring checks instead of joining the
        # keywords into a regex: regex metacharacters in a keyword ("c++",
        # "$aapl", "(") must not change the meaning of the search or make the
        # pattern invalid. Starting from FALSE also makes an empty list match
        # nothing rather than everything.
        matches_any: Column = lit(False)
        for keyword in keywords:
            matches_any = matches_any | text_lower.contains(keyword.lower())

        return tweets.filter(matches_any) \
            .select(Columns.TEXT.value, Columns.DATE.value, Columns.SOURCE.value) \
            .orderBy(desc(Columns.DATE.value))

    @classmethod
    def search_by_any_hashtag(cls, tweets: DataFrame, hashtags: list[str]) -> DataFrame:
        """Return tweets that carry at least one of ``hashtags``.

        The requested hashtags are lower-cased before the comparison; the
        hashtags column itself is compared as stored.
        NOTE(review): this presumably relies on the column already holding
        lower-case tags - confirm against the cleaning step.

        :param tweets: DataFrame with the hashtags (array), text, date and
            source columns.
        :param hashtags: Hashtags to look for.
        :return: Matching rows (hashtags, text, date, source), newest first.
        """
        keywords_col: Column = array(*[lit(hashtag.lower()) for hashtag in hashtags])

        return tweets.filter(arrays_overlap(col(Columns.HASHTAGS.value), keywords_col)) \
            .select(Columns.HASHTAGS.value, Columns.TEXT.value, Columns.DATE.value, Columns.SOURCE.value) \
            .orderBy(desc(Columns.DATE.value))

    @classmethod
    def search_by_all_hashtags(cls, tweets: DataFrame, hashtags: list[str]) -> DataFrame:
        """Return tweets that carry every one of ``hashtags``.

        The requested hashtags are lower-cased before the comparison; the
        hashtags column itself is compared as stored.
        NOTE(review): this presumably relies on the column already holding
        lower-case tags - confirm against the cleaning step.

        :param tweets: DataFrame with the hashtags (array), text, date and
            source columns.
        :param hashtags: Hashtags that must all be present on a tweet.
        :return: Matching rows (hashtags, text, date, source), newest first.
        """
        # array_intersect returns a de-duplicated array, so the requested tags
        # must be de-duplicated too (after lower-casing). Otherwise a request
        # such as ['Adele', 'adele'] expects two matches and can never succeed.
        wanted: list[str] = list(dict.fromkeys(hashtag.lower() for hashtag in hashtags))
        keywords_col: Column = array(*[lit(hashtag) for hashtag in wanted])

        return tweets.filter(size(array_intersect(col(Columns.HASHTAGS.value), keywords_col)) == size(keywords_col)) \
            .select(Columns.HASHTAGS.value, Columns.TEXT.value, Columns.DATE.value, Columns.SOURCE.value) \
            .orderBy(desc(Columns.DATE.value))

    @classmethod
    def search_by_location(cls, tweets: DataFrame, location: str) -> DataFrame:
        """Return tweets whose user location contains ``location``.

        The match is a case-insensitive literal substring match.

        :param tweets: DataFrame with the user location, text, date and source
            columns.
        :param location: Substring to look for in the user location.
        :return: Matching rows (user location, text, date, source), newest first.
        """
        location_lower: Column = lower(col(Columns.USER_LOCATION.value))

        return tweets.filter(location_lower.contains(location.lower())) \
            .select(Columns.USER_LOCATION.value, Columns.TEXT.value, Columns.DATE.value, Columns.SOURCE.value) \
            .orderBy(desc(Columns.DATE.value))
26 changes: 26 additions & 0 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from src.loaders.TweetsLoader import TweetsLoader
from src.cleaners.TweetsCleaner import TweetsCleaner as Cleaner
from src.analysers.TweetsAnalyser import TweetsAnalyser as Analyser
from src.analysers.TweetsSearch import TweetsSearch as Search

if __name__ == '__main__':
spark = (SparkSession.builder
Expand All @@ -25,3 +26,28 @@

avg_user_followers_per_location: DataFrame = Analyser.get_avg_user_followers_per_location(tweets)
avg_user_followers_per_location.show()

# Demo of every TweetsSearch entry point: each pair is (search function, query).
# For each one, print the number of matching tweets and show the first rows.
demo_searches = [
    (Search.search_by_keyword, 'Adele'),
    (Search.search_by_keywords, ['Adele', 'Grammys']),
    (Search.search_by_any_hashtag, ['adele', 'Grammys']),
    (Search.search_by_all_hashtags, ['Adele', 'grammys']),
    (Search.search_by_location, 'Poland'),
]

for run_search, query in demo_searches:
    search_tweets: DataFrame = run_search(tweets, query)
    print(search_tweets.count())
    search_tweets.show()

0 comments on commit d29139f

Please sign in to comment.