Skip to content

Commit

Permalink
simplify scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
AuxProc committed Sep 27, 2015
1 parent a17e84c commit 0bbc6f1
Showing 1 changed file with 59 additions and 83 deletions.
142 changes: 59 additions & 83 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
from TwitterAPI import TwitterAPI
import sched
import logging
import time
import json
import sys
import threading
from datetime import datetime, timedelta, date
from apscheduler.schedulers.blocking import BlockingScheduler
import random
import os


def get_logger():
Expand Down Expand Up @@ -49,7 +48,6 @@ class Config:
access_token_key = None
access_token_secret = None
daily_tweets = 300
retweet_update_time = 60
scan_update_time = 5400
clear_queue_time = 43200
min_posts_queue = 60
Expand Down Expand Up @@ -106,9 +104,27 @@ def append_file(self, p_object):
with open(self.filename, 'a+') as f:
f.write(str(p_object) + '\n')


ignore_list = None


def encode_timestamp(timestamp):
return str(timestamp).replace(" ", "+").replace(":", "%3A")


def random_time(start, end):
sec_diff = int((end-start).total_seconds())
secs_to_add = random.randint(0, sec_diff)
return start + timedelta(seconds=secs_to_add)


def get_daily_tweets_random_times(n, start, end):
times = []
for i in range(0, Config.daily_tweets):
times.append(random_time(start, end))
times.sort()
return times


def RandomTimes():
# we need to parse today's state to properly
# schedule the tweet times
Expand All @@ -125,39 +141,20 @@ def RandomTimes():
upper_bound = datetime(year, month, day, 23, 0, 0)
logger.info("[{}] - the upper bound is {}".format(datetime.now(), upper_bound))

randsched = BlockingScheduler()
logger.info("[{}] - Created blocking scheduler".format(datetime.now()))

def encode_timestamp(timestamp):
return str(timestamp).replace(" ", "+").replace(":", "%3A")

def random_time(start, end):
sec_diff = int((end-start).total_seconds())
secs_to_add = random.randint(0, sec_diff)
return start + timedelta(seconds=secs_to_add)

def get_daily_tweets_random_times(n, start, end):
times = []
for i in range(0, Config.daily_tweets):
times.append(random_time(start, end))
times.sort()
return times

times = get_daily_tweets_random_times(Config.daily_tweets, lower_bound, upper_bound)
logger.info("[{}] - Received {} times to schedule".format(datetime.now(),
len(times)))

for ind, atime in enumerate(times):
if ind == (Config.daily_tweets-1):
randsched.add_job(UpdateQueue, 'date', run_date=atime)
scheduler.add_job(UpdateQueue, 'date', run_date=atime)
logger.info("[{}] - added last task at {}".format(datetime.now(),
atime))
else:
randsched.add_job(UpdateQueue, 'date', run_date=atime)
scheduler.add_job(UpdateQueue, 'date', run_date=atime)
logger.info("[{}] - added task at {}".format(datetime.now(),
atime))

randsched.start()

def CheckError(r):
r = r.json()
Expand Down Expand Up @@ -390,7 +387,7 @@ def ScanForContests():
original_screen_name))
else:

logger.debug("{0} ignored {1} in ignore list".format(id, original_id))
logger.info("{0} ignored {1} in ignore list".format(id, original_id))

else:

Expand All @@ -400,15 +397,15 @@ def ScanForContests():

post_list.append(item)

logger.debug("{0} - {1} : {2}".format(id, screen_name, text))
logger.info("{0} - {1} : {2}".format(id, screen_name, text))
ignore_list.append(id)

else:

logger.info("{0} ignored {1} blocked user in ignore list".format(id, screen_name))
else:

logger.debug("{0} in ignore list".format(id))
logger.info("{0} in ignore list".format(id))

logger.info("Got {0} results".format(c))

Expand All @@ -422,59 +419,38 @@ def ScanForContests():
ratelimit_search[0],
ratelimit_search[2]))


class PeriodicScheduler(sched.scheduler):

def __init__(self, timefunc=time.time, delayfunc=time.sleep):
# List of tasks tha will be periodically be called
# tasks are stored as tuples: (delay, priority, action)
self.tasks = []
super().__init__(timefunc, delayfunc)

def enter(self, delay, priority, action):
self.tasks.append((delay, priority, action))

def run(self, blocking=True):
for i in range(len(self.tasks)):
self.run_task(i)

super().run(blocking)

def enter_task(self, index):
super().enter(self.tasks[index][0], self.tasks[index][1], self.run_task, argument=(index,))

def run_task(self, index):
self.enter_task(index)
t = threading.Thread(target=self.tasks[index][2])
t.daemon = True
try:
logger.debug("Scheduler is calling: {}".format(self.tasks[index][2].__name__))
t.start()
except Exception:
logger.exception("Exception in thread")

if __name__ == '__main__':
#Load config
Config.load('config.json')

#Initialize twitter api
api = TwitterAPI(
Config.consumer_key,
Config.consumer_secret,
Config.access_token_key,
Config.access_token_secret)

#Initialize ignorelist
ignore_list = IgnoreList("ignorelist")

#Initialize Scheduler
s = PeriodicScheduler()

s.enter(86400, 1, RandomTimes)
s.enter(Config.clear_queue_time, 2, ClearQueue)
s.enter(Config.rate_limit_update_time, 3, CheckRateLimit)
s.enter(Config.blocked_users_update_time, 4, CheckBlockedUsers)
s.enter(Config.scan_update_time, 5, ScanForContests)

#Init the program
s.run()

#Load config
Config.load('config.json')

#Initialize twitter api
api = TwitterAPI(
Config.consumer_key,
Config.consumer_secret,
Config.access_token_key,
Config.access_token_secret)

#Initialize ignorelist
ignore_list = IgnoreList("ignorelist")

#Initialize scheduler
scheduler = BlockingScheduler()

#First run
RandomTimes()
ClearQueue()
CheckRateLimit()
CheckBlockedUsers()
ScanForContests()

scheduler.add_job(RandomTimes, 'interval', hours=24)
scheduler.add_job(ClearQueue, 'interval', seconds=Config.clear_queue_time)
scheduler.add_job(CheckRateLimit, 'interval', seconds=Config.rate_limit_update_time)
scheduler.add_job(CheckBlockedUsers, 'interval', seconds=Config.blocked_users_update_time)
scheduler.add_job(ScanForContests, 'interval', seconds=Config.scan_update_time)

try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
pass

0 comments on commit 0bbc6f1

Please sign in to comment.