This repository has been archived by the owner on Oct 16, 2023. It is now read-only.

Merge pull request #17 from Daethyra/0.1.2
0.1.2_test-ready
Daethyra authored Apr 7, 2023
2 parents d0ed69e + 246a486 commit 5b24d14
Showing 6 changed files with 108 additions and 35 deletions.
47 changes: 47 additions & 0 deletions README.md
@@ -0,0 +1,47 @@
# OpenDTS (Domestic Threat Scanner)

OpenDTS is a comprehensive threat detection system that aims to identify and track potential threats of extremist violence against minority groups in the United States. By leveraging advanced sentiment analysis techniques, OpenDTS scans tweets and news articles for potential threats of violence and stores this information in a database for further analysis.
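
At a high level, each run collects raw text, classifies it, and persists the result. Here is a minimal sketch of that flow using the module functions in this repository (illustrative only; the actual pipeline is driven by a RabbitMQ message queue and adds retries, logging, and concurrency):

```python
from data_collection import collect_twitter_data
from data_processing import analyze_sentiment
from data_storage import create_db, store_data

create_db()  # ensure the local database exists

# "example query" is a placeholder search term
for tweet in collect_twitter_data("example query"):
    sentiment = analyze_sentiment(tweet['text'])
    # Negative sentiment is flagged as a potential threat indicator
    is_dog_whistle = 1 if sentiment == 'negative' else 0
    store_data('twitter', tweet['text'], sentiment, is_dog_whistle)
```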

The primary goal of OpenDTS is to provide neighborhoods and communities with actionable insights and early warning signs of potential extremist violence, allowing them to better prepare and protect themselves from such threats.

## Features

- Sentiment analysis using OpenAI's GPT-3.5-turbo model
- Twitter and NewsAPI data collection
- NoSQL database for storing and tracking potential threats
- Message queue system for scalable data processing
- Interactive dashboard for visualizing threat data (coming soon)

## Getting Started

### Prerequisites

- Python 3.7 or higher
- An OpenAI API key
- A Twitter Developer API key
- A NewsAPI API key
- RabbitMQ server

### Installation

1. Clone the repository:
git clone https://github.com/yourusername/OpenDTS.git

2. Change to the project directory:
cd OpenDTS

3. Install the required dependencies:
pip install -r requirements.txt

4. Create a `.env` file in the project directory and add the following environment variables:
API_KEY_OPENAI=your_openai_api_key
API_KEY_TWITTER=your_twitter_api_key
API_KEY_NEWSAPI=your_newsapi_api_key
RABBITMQ_URL=your_rabbitmq_url

5. Run the main script to start the pipeline:
python main.py
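
Note that with this change the pipeline is driven by the `sentiment_analysis` RabbitMQ queue rather than interactive prompts. Below is a minimal sketch of enqueueing work for a running pipeline, using the message fields that `process_message` in `pipeline.py` reads (the query strings and dates are placeholders):

```python
from message_queue import send_message

# Ask the pipeline to scan Twitter for a placeholder query
send_message('sentiment_analysis', {'source': 'twitter', 'query': 'example query'})

# News queries additionally need a date range
send_message('sentiment_analysis', {
    'source': 'news',
    'query': 'example query',
    'from_date': '2023-04-01',
    'to_date': '2023-04-07',
})
```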

## License

This project is licensed under the GNU Affero General Public License v3.0 - see the [LICENSE.md](LICENSE.md) file for details.
5 changes: 5 additions & 0 deletions config.py
@@ -7,3 +7,8 @@
API_KEY_TWITTER = os.getenv("API_KEY_TWITTER")
API_KEY_NEWSAPI = os.getenv("API_KEY_NEWSAPI")
RABBITMQ_URL = os.getenv("RABBITMQ_URL")

required_keys = [API_KEY_OPENAI, API_KEY_TWITTER, API_KEY_NEWSAPI, RABBITMQ_URL]

if not all(required_keys):
    raise ValueError("One or more required environment variables are missing.")
23 changes: 21 additions & 2 deletions data_collection.py
@@ -1,10 +1,29 @@
import tweepy
import requests
import time
from config import API_KEY_TWITTER, API_KEY_NEWSAPI
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import logging

# Initialize logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Initialize the Twitter API
auth = tweepy.AppAuthHandler(API_KEY_TWITTER, API_KEY_TWITTER)
api = tweepy.API(auth)
api = tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)

session = requests.Session()
retry_strategy = Retry(
    total=3,
    status_forcelist=[429, 500, 502, 503, 504],
    method_whitelist=["HEAD", "GET", "OPTIONS", "POST"],
    backoff_factor=1
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("https://", adapter)
session.mount("http://", adapter)

def collect_twitter_data(query):
    try:
@@ -17,7 +36,7 @@ def collect_twitter_data(query):
def collect_news_data(query, from_date, to_date):
    try:
        url = f"https://newsapi.org/v2/everything?q={query}&from={from_date}&to={to_date}&apiKey={API_KEY_NEWSAPI}"
        response = requests.get(url)
        response = session.get(url, timeout=10)
        response.raise_for_status()
        news_data = response.json()['articles']
        return [{'text': article['title'], 'id': article['url']} for article in news_data]
11 changes: 8 additions & 3 deletions data_processing.py
@@ -1,6 +1,10 @@
import openai
from functools import lru_cache
import logging
from config import API_KEY_OPENAI
from functools import lru_cache

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

openai.api_key = API_KEY_OPENAI

@@ -15,7 +19,8 @@ def analyze_sentiment(text):
            top_p=1
        )
        sentiment = response.choices[0].text.strip().lower()
        logger.info(f"Sentiment analysis result: {sentiment}")
        return sentiment
    except Exception as e:
        print(f"Error in analyze_sentiment: {e}")
        return "unknown"
        logger.error(f"Error in analyze_sentiment: {e}")
        return "unknown"
17 changes: 12 additions & 5 deletions message_queue.py
@@ -1,13 +1,20 @@
import pika
import json
import logging
from config import RABBITMQ_URL

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def send_message(queue, message):
    connection = pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL))
    channel = connection.channel()
    channel.queue_declare(queue=queue)
    channel.basic_publish(exchange='', routing_key=queue, body=json.dumps(message))
    connection.close()
    try:
        connection = pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL))
        channel = connection.channel()
        channel.queue_declare(queue=queue)
        channel.basic_publish(exchange='', routing_key=queue, body=json.dumps(message))
        connection.close()
    except Exception as e:
        logger.error(f"Error in send_message: {e}")

def receive_messages(queue, callback):
    connection = pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL))
40 changes: 15 additions & 25 deletions pipeline.py
@@ -2,35 +2,25 @@
from data_collection import collect_twitter_data, collect_news_data
from data_processing import analyze_sentiment
from data_storage import create_db, store_data
import time
import concurrent.futures

def input_query():
    source = input("Enter the source (twitter or news): ").lower()
    query = input("Enter the query: ")

    if source == "news":
        from_date = input("Enter the start date (YYYY-MM-DD): ")
        to_date = input("Enter the end date (YYYY-MM-DD): ")
        return {'source': source, 'query': query, 'from_date': from_date, 'to_date': to_date}
    else:
        return {'source': source, 'query': query}

def main():
    create_db()
    while True:
        message = input_query()
        process_message(message)
def process_data(data):
    sentiment = analyze_sentiment(data['text'])
    is_dog_whistle = 1 if sentiment == 'negative' else 0
    store_data(data['source'], data['text'], sentiment, is_dog_whistle)

def process_message(message):
    if message['source'] == 'twitter':
        tweets = collect_twitter_data(message['query'])
        for tweet in tweets:
            sentiment = analyze_sentiment(tweet['text'])
            is_dog_whistle = 1 if sentiment == 'negative' else 0
            store_data('twitter', tweet['text'], sentiment, is_dog_whistle)
        with concurrent.futures.ThreadPoolExecutor() as executor:
            for tweet in tweets:
                executor.submit(process_data, {'source': 'twitter', 'text': tweet['text']})
    elif message['source'] == 'news':
        news_articles = collect_news_data(message['query'], message['from_date'], message['to_date'])
        for article in news_articles:
            sentiment = analyze_sentiment(article['text'])
            is_dog_whistle = 1 if sentiment == 'negative' else 0
            store_data('news', article['text'], sentiment, is_dog_whistle)
        with concurrent.futures.ThreadPoolExecutor() as executor:
            for article in news_articles:
                executor.submit(process_data, {'source': 'news', 'text': article['text']})

if __name__ == "__main__":
    create_db()
    message_queue.receive_messages('sentiment_analysis', process_message)
