TweetsToAwardNames.py
import json
import re
import spacy
import nltk
#from nltk.sentiment import SentimentIntensityAnalyzer
from nltk.metrics.distance import edit_distance
from nltk.corpus import wordnet
from datetime import datetime
from collections import defaultdict
from AwardCategory import AwardCategory
from utils import standardize, filter_dict, sort_dict_alpha, sort_dict_decreasing_count, dict_to_json

def is_award_topic(award_name):
    # Heuristic filter (defensible as domain knowledge): these words mark red-carpet /
    # joke "awards" (best dressed, best speech, ...) rather than real award categories.
    notMovieRelated = ["speech", "outfit", "look", "insult", "hair", "dressed"]
    for nr in notMovieRelated:
        if nr in award_name:
            return False
    return True

def find_awards(tweets):
    uniqueAwards = defaultdict(int)
    seenTweets = set()
    # First pass: collect candidate award names from "best ... goes to" phrasings.
    for tweet in tweets:
        text = tweet['text'].lower()
        bestSearch = re.search(r"\s*(?P<award_name>[Bb]est .*) goes to", text)
        if bestSearch is not None and text not in seenTweets:
            seenTweets.add(text)
            uniqueAwards[bestSearch['award_name']] += 1
    ## drop candidates with fewer than 2 occurrences
    uniqueAwards = filter_dict(uniqueAwards, minCount=2)
    ## reset counts to 0
    uniqueAwards = {k: 0 for k in uniqueAwards.keys()}
    ## sort by number of words, longest names first
    uniqueAwards = dict(sorted(uniqueAwards.items(), key=lambda x: -len(x[0].split())))
    seenTweets = set()
    # Second pass: count every tweet that mentions each surviving candidate.
    for award in uniqueAwards:
        awardRegex = r"(\s*" + re.escape(award) + r".*)"
        for tweet in tweets:
            text = standardize(tweet['text']).lower()
            awardSearch = re.search(awardRegex, text)
            if awardSearch is not None and text not in seenTweets:
                seenTweets.add(text)
                uniqueAwards[award] += 1
    uniqueAwards = filter_dict(uniqueAwards, minCount=3)
    return {k: AwardCategory(k, v) for k, v in uniqueAwards.items() if is_award_topic(k)}
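
# Illustrative note (hypothetical tweet, not from the dataset): the first pass above would
# turn "Best Director - Motion Picture goes to Ben Affleck!" into the candidate string
# "best director - motion picture"; the second pass then counts every tweet containing that
# candidate, and filter_dict(minCount=3) keeps only well-supported candidates.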

def merge_identical(d):
    # Merge award names whose cleaned forms contain exactly the same set of words.
    with open("saved_jsons/clean_aliases.json", "r") as file:
        cleaned_award_names = json.load(file)
    new_d = dict()
    d = dict(sorted(d.items(), key=lambda x: -len(x[0].split())))
    set_to_name = {}
    for awardName, awardCategory in d.items():
        cleanedAwardName = cleaned_award_names[awardName]
        cleanedAwardNameWords = tuple(sorted(set(cleanedAwardName.split())))
        set_to_name[cleanedAwardNameWords] = awardName
        if cleanedAwardNameWords not in new_d:
            new_d[cleanedAwardNameWords] = AwardCategory(awardName)
        # else:
        #     print(f"merging {awardName} into {cleanedAwardName}")
        new_d[cleanedAwardNameWords].count += awardCategory.count
        new_d[cleanedAwardNameWords].aliases |= awardCategory.aliases
    new_d = {set_to_name[k]: v for k, v in new_d.items()}
    clean_aliases(new_d)
    return new_d
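
# Illustrative note: merging is keyed on the *set* of words in the cleaned name, so two
# hypothetical aliases such as "best director - motion picture" and "best motion picture
# director" (same words, different order and punctuation) collapse into a single
# AwardCategory whose count and aliases are combined.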

def merge_substrings(d):
    # Fold an award into an existing group when its cleaned name is a substring of exactly
    # one group's aliases; ambiguous names (multiple matches) are dropped.
    new_d = dict()
    d = dict(sorted(d.items(), key=lambda x: -len(x[0].split())))
    with open("saved_jsons/clean_aliases.json", "r") as file:
        cleaned_award_names = json.load(file)
    for awardName, awardCategory in d.items():
        merged = False
        substringOf = []
        substringOfValues = []  # counts of the matching groups (collected, not currently used)
        for mergedName in new_d:
            for alias in new_d[mergedName].aliases:
                if cleaned_award_names[awardName] in cleaned_award_names[alias]:
                    substringOf.append(mergedName)
                    substringOfValues.append(new_d[mergedName].count)
                    break
        if len(substringOf) == 1:
            new_d[substringOf[0]].count += awardCategory.count
            new_d[substringOf[0]].aliases |= awardCategory.aliases
            merged = True
        if not merged and len(substringOf) == 0:
            new_d[awardName] = awardCategory
    return new_d

def merge_simplify(d, simplify_dict):
    # Re-run the word-set merge after applying the bigram simplifications in simplify_dict.
    with open("saved_jsons/clean_aliases.json", "r") as file:
        cleaned_award_names = json.load(file)
    simplify_keys = simplify_dict.keys()
    new_d = dict()
    d = dict(sorted(d.items(), key=lambda x: -len(x[0].split())))
    set_to_name = {}
    for awardName, awardCategory in d.items():
        cleanedAwardName = cleaned_award_names[awardName]
        for key in simplify_keys:
            if key in cleanedAwardName:
                cleanedAwardName = cleanedAwardName.replace(key, simplify_dict[key])
        cleanedAwardNameWords = tuple(sorted(set(cleanedAwardName.split())))
        set_to_name[cleanedAwardNameWords] = awardName
        if cleanedAwardNameWords not in new_d:
            new_d[cleanedAwardNameWords] = AwardCategory(awardName)
        # else:
        #     print(f"merging {awardName} into {cleanedAwardName}")
        new_d[cleanedAwardNameWords].count += awardCategory.count
        new_d[cleanedAwardNameWords].aliases |= awardCategory.aliases
    new_d = {set_to_name[k]: v for k, v in new_d.items()}
    clean_aliases(new_d)
    return new_d

def add_short_versions(d):
    # Add each alias's cleaned (shortened) form as an alias in its own right.
    with open("saved_jsons/clean_aliases.json", "r") as file:
        cleaned_award_names = json.load(file)
    for name, award in d.items():
        curr_aliases = set(award.aliases)
        for alias in award.aliases:
            cleaned_alias = cleaned_award_names[alias]
            if cleaned_alias not in curr_aliases:
                curr_aliases.add(cleaned_alias)
        d[name].aliases = list(curr_aliases)
    return d

def get_word_neighbors(d):
    # For every word in the cleaned award names (after dropping the leading "best "),
    # record which words can immediately follow it.
    with open("saved_jsons/clean_aliases.json", "r") as file:
        cleaned_award_names = json.load(file)
    word_neighbors = dict()
    for k in d.keys():
        cleaned_name = cleaned_award_names[k]
        split = cleaned_name[5:].split()  # skip the leading "best "
        for idx, word in enumerate(split):
            if word not in word_neighbors:
                word_neighbors[word] = set()
            word_set = word_neighbors[word]
            if idx < len(split) - 1:
                following_word = split[idx + 1]
                if following_word not in ["or", "and", "in", "for", "to"]:
                    word_set.add(following_word)
    for word in word_neighbors:
        word_neighbors[word] = list(word_neighbors[word])
    dict_to_json(word_neighbors, "word_neighbors", folderName="saved_jsons/")
    return word_neighbors
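
# Illustrative note: for a hypothetical cleaned name "best actress motion picture drama",
# the leading "best " is skipped and the map gains actress -> {motion}, motion -> {picture},
# picture -> {drama}, drama -> {} (sets are accumulated across all award names, then
# serialized as lists).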

def get_simplification_dict(word_neighbors):
    # If a word is always followed by the same single word, while that follower does not
    # itself have exactly one follower, collapse the two-word phrase to the follower.
    simplification_dict = {}
    for word, following_words in word_neighbors.items():
        if len(following_words) == 1:
            following_word = following_words[0]
            if len(word_neighbors[following_word]) != 1:
                simplification_dict[f"{word} {following_word}"] = following_word
    dict_to_json(simplification_dict, "simplification_dict", folderName="saved_jsons/")
    return simplification_dict
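
# Illustrative note: if, across every cleaned name, "motion" is only ever followed by
# "picture", while "picture" is followed by several different words (say "drama" in one
# name and "comedy" in another), then the bigram "motion picture" maps to just "picture",
# shortening names without discarding the word where they actually differ.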

def clean_aliases(d, pos=False):
    cleaned_dict = {}
    for award in d:
        for alias in d[award].aliases:
            cleaned_dict[alias] = clean_award_name(alias, pos=pos)
    dict_to_json(cleaned_dict, "clean_aliases", folderName="saved_jsons/")

def clean_award_name(awardName: str, pos=False) -> str:
    awardName = awardName.replace("for ", "")
    awardName = awardName.replace(" in a", "")
    awardName = awardName.replace(",", "")
    awardName = awardName.replace("-", "")
    awardName = awardName.replace(" in ", " ")
    awardName = re.sub(' +', ' ', awardName).strip()
    if pos:
        # Use part-of-speech tags to trim trailing function words (determiners, prepositions, ...).
        nlp = spacy.load("en_core_web_sm")
        doc = nlp(awardName)
        lastidx = 0
        for idx, token in enumerate(doc):
            if token.pos_ not in ['ADP', 'AUX', 'CCONJ', 'DET', 'INTJ', 'NUM', 'PRON', 'SCONJ']:
                lastidx = idx
        cleaned = ' '.join(token.text for idx, token in enumerate(doc) if idx <= lastidx).strip()
    return cleaned if pos else awardName
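
# Illustrative note: with pos=False, a hypothetical input such as
# "best actress in a motion picture - drama" comes out as "best actress motion picture drama"
# after the substring removals and whitespace collapsing; pos=True additionally trims
# trailing function words (a dangling "in" or "a", for instance) using the spaCy tags.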

def print_keys(d):
    with open("test_files/keys.txt", "w") as f:
        for k in d:
            f.write(f"\n{k}")

def get_award_categories_from_json(tweets):
    awards = find_awards(tweets)
    clean_aliases(awards, pos=True)  # write the cleaned forms used by the merge steps
    awards = merge_identical(awards)
    awards = merge_substrings(awards)
    awards = sort_dict_alpha(awards)
    clean_aliases(awards)
    word_neighbors = get_word_neighbors(awards)
    simpl_dict = get_simplification_dict(word_neighbors)
    awards = merge_simplify(awards, simpl_dict)
    awards = add_short_versions(awards)
    awards = sort_dict_alpha(awards)
    dict_to_json(awards, "award_aliases", award=True, folderName="")
    print_keys(awards)
    return awards
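
# Minimal usage sketch (assumptions: a tweets file named "gg2013.json" containing a JSON
# array of objects with a "text" field, and existing saved_jsons/ and test_files/
# directories for the intermediate outputs written above).
if __name__ == "__main__":
    with open("gg2013.json", "r") as tweet_file:
        tweets = json.load(tweet_file)  # expected shape: [{"text": "..."}, ...]
    award_categories = get_award_categories_from_json(tweets)
    for award_name in award_categories:
        print(award_name)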