merge with dev #2

Merged: 17 commits, Mar 3, 2018
1 change: 1 addition & 0 deletions client/__init__.py
@@ -4,3 +4,4 @@


from .squid import SquidClient
from .py_cli import ProxyFetcher
128 changes: 128 additions & 0 deletions client/py_cli.py
@@ -0,0 +1,128 @@
"""
This module privodes core algrithm to pick up proxy ip resources.
"""
import time

from logger import client_logger
from utils import (
get_redis_conn, decode_all)
from config.rules import (
SCORE_MAPS, TTL_MAPS,
SPEED_MAPS)
from config.settings import (
TTL_VALIDATED_RESOURCE, LONGEST_RESPONSE_TIME,
LOWEST_SCORE, DATA_ALL)


class Strategy:
strategy = None

def check(self, strategy):
return self.strategy == strategy

    def get_proxies_by_strategy(self, pool):
raise NotImplementedError


class RobinStrategy(Strategy):
def __init__(self):
super().__init__()
self.strategy = 'robin'

    def get_proxies_by_strategy(self, pool):
if not pool:
return None
proxy = pool[0]
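        # swap the chosen proxy to the tail so the next call returns a different one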
pool[0], pool[-1] = pool[-1], pool[0]
return proxy


class GreedyStrategy(Strategy):
    def __init__(self):
        super().__init__()
        self.strategy = 'greedy'

    def get_proxies_by_strategy(self, pool):
if not pool:
return None
return pool[0]


class ProxyFetcher:
def __init__(self, usage, strategy='robin', length=10, fast_response=5):
"""
:param usage: one of SCORE_MAPS's keys, such as https
:param length: if total available proxies are less than length,
you must refresh pool
:param strategy: the load balance of proxy ip, the value is
one of ['robin', 'greedy']
:param fast_response: if you use greedy strategy, if will be needed to
decide whether a proxy ip should continue to be used
"""
self.score_queue = SCORE_MAPS.get(usage)
self.ttl_queue = TTL_MAPS.get(usage)
self.speed_queue = SPEED_MAPS.get(usage)
self.strategy = strategy
# pool is a queue, which is FIFO
self.pool = list()
self.length = length
self.fast_response = fast_response
self.handlers = [RobinStrategy(), GreedyStrategy()]
self.conn = get_redis_conn()

def get_proxy(self):
"""
get one available proxy from redis, if not any, None is returned
:return:
"""
        # TODO: consider async or multi-threaded fetching
proxy = None
for handler in self.handlers:
if handler.strategy == self.strategy:
                proxy = handler.get_proxies_by_strategy(self.pool)
self.refresh()
return proxy

def get_proxies(self):
"""core algrithm to get proxies from redis"""
start_time = int(time.time()) - TTL_VALIDATED_RESOURCE * 60
pipe = self.conn.pipeline(False)
pipe.zrevrangebyscore(self.score_queue, '+inf', LOWEST_SCORE)
pipe.zrevrangebyscore(self.ttl_queue, '+inf', start_time)
pipe.zrangebyscore(self.speed_queue, 0, 1000*LONGEST_RESPONSE_TIME)
scored_proxies, ttl_proxies, speed_proxies = pipe.execute()
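        # 'and' evaluates to the last operand when all three result sets are
        # non-empty (speed_proxies here); the fallbacks below relax the
        # filtering when one of the sets comes back empty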
proxies = scored_proxies and ttl_proxies and speed_proxies

if not proxies:
proxies = scored_proxies and ttl_proxies

if not proxies:
proxies = ttl_proxies

proxies = decode_all(proxies)
client_logger.info('{} proxies have been fetched'.format(len(proxies)))
self.pool.extend(proxies)

def proxy_feedback(self, res, response_time=None):
"""
client should give feedbacks after executing get_proxy()
:param res: one value of ['success', 'failure']
:param response_time: the response time using current proxy ip
"""
if res == 'failure':
self.delete_proxy(self.pool[0])
if self.strategy == 'greedy' and self.fast_response*1000 < response_time:
self.pool[0], self.pool[-1] = self.pool[-1], self.pool[0]

def refresh(self):
if len(self.pool) < self.length:
self.get_proxies()

def delete_proxy(self, proxy):
# it's not thread safe
self.pool.pop(0)
pipe = self.conn.pipeline(True)
pipe.srem(DATA_ALL, proxy)
pipe.zrem(self.score_queue, proxy)
pipe.zrem(self.speed_queue, proxy)
pipe.zrem(self.ttl_queue, proxy)
pipe.execute()
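
For reference, a minimal usage sketch of the new ProxyFetcher (hypothetical caller code; it assumes a reachable Redis instance already populated by the crawler/validator pipeline):

from client import ProxyFetcher

fetcher = ProxyFetcher('https', strategy='greedy', fast_response=5)
# the first call may return None because the pool is only refreshed after picking
proxy = fetcher.get_proxy()
if proxy:
    # make a request through the proxy, then report the outcome;
    # response_time is measured in milliseconds
    fetcher.proxy_feedback('success', response_time=300)
else:
    print('no proxy available yet, try again shortly')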
13 changes: 7 additions & 6 deletions client/squid.py
@@ -5,6 +5,7 @@
import time
import subprocess

from logger import client_logger
from config.rules import (
SCORE_MAPS, TTL_MAPS,
SPEED_MAPS)
@@ -24,7 +25,7 @@ class SquidClient:

def __init__(self, task):
if task not in SCORE_MAPS.keys():
print('task value is invalid, https task will be used')
client_logger.warning('task value is invalid, https task will be used')
task = 'https'
self.score_queue = SCORE_MAPS.get(task)
self.ttl_queue = TTL_MAPS.get(task)
@@ -36,8 +37,8 @@ def __init__(self, task):
r = subprocess.check_output('which squid', shell=True)
self.squid_path = r.decode().strip()
except subprocess.CalledProcessError:
print('no squid is installed on this machine, or the installed dir '
'is not contained in environment path')
client_logger.warning('no squid is installed on this machine, or the installed dir is not '
'contained in environment path')
else:
self.squid_path = SQUID_BIN_PATH

@@ -47,7 +48,7 @@ def update_conf(self):
pipe = conn.pipeline(False)
pipe.zrevrangebyscore(self.score_queue, '+inf', LOWEST_SCORE)
pipe.zrevrangebyscore(self.ttl_queue, '+inf', start_time)
pipe.zrangebyscore(self.speed_queue, 0, 1000*LONGEST_RESPONSE_TIME)
pipe.zrangebyscore(self.speed_queue, 0, 1000 * LONGEST_RESPONSE_TIME)
scored_proxies, ttl_proxies, speed_proxies = pipe.execute()
proxies = scored_proxies and ttl_proxies and speed_proxies

@@ -63,7 +64,7 @@ def update_conf(self):
original_conf = fr.read()
if not proxies:
fw.write(original_conf)
print('no proxies got at this turn')
client_logger.info('no proxies got at this turn')
else:
conts.append(original_conf)
# if two proxies use the same ip and different ports and no name
@@ -77,4 +78,4 @@ def update_conf(self):
fw.write(conf)
# in docker, execute with shell will fail
subprocess.call([self.squid_path, '-k', 'reconfigure'], shell=False)
print('update squid conf successfully')
client_logger.info('update squid conf successfully')
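
For completeness, a minimal sketch of how this client might be driven (a hypothetical loop; the project may schedule update_conf differently):

import time

from client import SquidClient

client = SquidClient('https')  # falls back to 'https' when the task name is invalid
while True:
    client.update_conf()       # rewrite squid.conf from Redis and reconfigure squid
    time.sleep(120)            # hypothetical refresh interval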
25 changes: 20 additions & 5 deletions config/rules.py
@@ -11,7 +11,9 @@
TTL_HTTPS_QUEUE, SPEED_HTTPS_QUEUE,
SPEED_HTTP_QUEUE, TEMP_WEIBO_QUEUE,
VALIDATED_WEIBO_QUEUE, TTL_WEIBO_QUEUE,
SPEED_WEIBO_QUEUE)
SPEED_WEIBO_QUEUE, TEMP_ZHIHU_QUEUE,
VALIDATED_ZHIHU_QUEUE, TTL_ZHIHU_QUEUE,
SPEED_ZHIHU_QUEUE)


__all__ = ['CRWALER_TASKS', 'VALIDATOR_TASKS', 'CRAWLER_TASK_MAPS',
@@ -360,6 +362,13 @@
'internal': 20,
'enable': 1,
},
{
'name': 'zhihu',
'task_queue': TEMP_ZHIHU_QUEUE,
'resource': VALIDATED_ZHIHU_QUEUE,
'internal': 20,
'enable': 1,
},
]

# crawlers will fetch tasks from the following queues
@@ -375,26 +384,32 @@
'init': INIT_HTTP_QUEUE,
'http': TEMP_HTTP_QUEUE,
'https': TEMP_HTTPS_QUEUE,
'weibo': TEMP_WEIBO_QUEUE
'weibo': TEMP_WEIBO_QUEUE,
'zhihu': TEMP_ZHIHU_QUEUE
}


# TODO: the three maps may be combined into one map
# validator scheduler and clients will fetch proxies from the following queues
SCORE_MAPS = {
'http': VALIDATED_HTTP_QUEUE,
'https': VALIDATED_HTTPS_QUEUE,
'weibo': VALIDATED_WEIBO_QUEUE
'weibo': VALIDATED_WEIBO_QUEUE,
'zhihu': VALIDATED_ZHIHU_QUEUE
}

# validator scheduler and clients will fetch proxies from the following queues which are verified recently
TTL_MAPS = {
'http': TTL_HTTP_QUEUE,
'https': TTL_HTTPS_QUEUE,
'weibo': TTL_WEIBO_QUEUE
'weibo': TTL_WEIBO_QUEUE,
'zhihu': TTL_ZHIHU_QUEUE
}

SPEED_MAPS = {
'http': SPEED_HTTP_QUEUE,
'https': SPEED_HTTPS_QUEUE,
'weibo': SPEED_WEIBO_QUEUE
'weibo': SPEED_WEIBO_QUEUE,
'zhihu': SPEED_ZHIHU_QUEUE
}
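
With these entries in place, 'zhihu' becomes a valid task/usage key for both clients, e.g. (hypothetical snippet):

from client import ProxyFetcher, SquidClient

# both clients key off SCORE_MAPS, so the new name is accepted directly
zhihu_fetcher = ProxyFetcher('zhihu')
zhihu_squid = SquidClient('zhihu')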

9 changes: 7 additions & 2 deletions config/settings.py
@@ -44,13 +44,14 @@

# scrapy log settings
LOG_LEVEL = 'DEBUG'
LOG_FILE = 'logs/haipproxy.log'


#####################################################################
# Custom settings of this project
#####################################################################
# redis settings. If you use docker-compose, set REDIS_HOST = 'redis'
REDIS_HOST = 'redis'
REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379
REDIS_PASSWORD = '123456'
DEFAULT_REDIS_DB = 0
@@ -84,22 +85,26 @@
TEMP_HTTP_QUEUE = 'haipproxy:http:temp'
TEMP_HTTPS_QUEUE = 'haipproxy:https:temp'
TEMP_WEIBO_QUEUE = 'haipproxy:weibo:temp'
TEMP_ZHIHU_QUEUE = 'haipproxy:zhihu:temp'

# validated queues are zsets. Squid and other clients fetch IP resources from them.
VALIDATED_HTTP_QUEUE = 'haipproxy:validated:http'
VALIDATED_HTTPS_QUEUE = 'haipproxy:validated:https'
VALIDATED_WEIBO_QUEUE = 'haipproxy:validated:weibo'
VALIDATED_ZHIHU_QUEUE = 'haipproxy:validated:zhihu'

# time to live of proxy ip resources
TTL_VALIDATED_RESOURCE = 2 # minutes
TTL_HTTP_QUEUE = 'haipproxy:ttl:http'
TTL_HTTPS_QUEUE = 'haipproxy:ttl:https'
TTL_WEIBO_QUEUE = 'haipproxy:ttl:weibo'
TTL_ZHIHU_QUEUE = 'haipproxy:ttl:zhihu'

# queue for proxy speed
SPEED_HTTP_QUEUE = 'haipproxy:speed:http'
SPEED_HTTPS_QUEUE = 'haipproxy:speed:https'
SPEED_WEIBO_QUEUE = 'haipproxy:speed:weibo'
SPEED_ZHIHU_QUEUE = 'haipproxy:speed:zhihu'

# squid settings on linux os
# execute sudo chown -R $USER /etc/squid/ and
@@ -110,6 +115,6 @@

# client settings
# client picks proxies whose response time is between 0 and LONGEST_RESPONSE_TIME seconds
LONGEST_RESPONSE_TIME = 5
LONGEST_RESPONSE_TIME = 10
# client picks proxies whose score is not less than 7
LOWEST_SCORE = 7
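
The clients reach Redis through utils.get_redis_conn, which is not part of this diff; a plausible sketch of such a helper built on redis-py and these settings might look like:

import redis

from config.settings import (
    REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, DEFAULT_REDIS_DB)


def get_redis_conn(db=DEFAULT_REDIS_DB):
    # hypothetical helper; the real utils.get_redis_conn may differ
    return redis.StrictRedis(
        host=REDIS_HOST, port=REDIS_PORT,
        password=REDIS_PASSWORD, db=db)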
11 changes: 7 additions & 4 deletions crawler/redis_spiders.py
@@ -7,11 +7,15 @@
from scrapy.spiders import (
Spider, CrawlSpider)
from scrapy_splash import SplashRequest
from scrapy.utils.log import configure_logging

from logger import crawler_logger
from utils import get_redis_conn
from config.settings import (
VALIDATOR_FEED_SIZE, SPIDER_FEED_SIZE)


configure_logging(install_root_handler=True)
__all__ = ['RedisSpider', 'RedisAjaxSpider',
'RedisCrawlSpider', 'ValidatorRedisSpider']

@@ -47,7 +51,7 @@ def next_requests(self):
yield req
found += 1

self.logger.debug('Read {} requests from {}'.format(found, self.task_queue))
crawler_logger.info('Read {} requests from {}'.format(found, self.task_queue))

def schedule_next_requests(self):
for req in self.next_requests():
@@ -95,7 +99,7 @@ def next_requests(self):
yield req
found += 1

self.logger.debug('Read {} requests from {}'.format(found, self.task_queue))
crawler_logger.info('Read {} requests from {}'.format(found, self.task_queue))


class ValidatorRedisSpider(RedisSpider):
@@ -120,8 +124,7 @@ def next_requests_process(self, task_queue):
callback=self.parse, errback=self.parse_error)
yield req
found += 1

self.logger.debug('Read {} ip proxies from {}'.format(found, task_queue))
crawler_logger.info('Read {} ip proxies from {}'.format(found, task_queue))

def parse_error(self, failure):
raise NotImplementedError
Expand Down
5 changes: 3 additions & 2 deletions crawler/validators/__init__.py
@@ -13,11 +13,12 @@
from .httpbin import (
HttpBinInitValidator, HttpValidator,
HttpsValidator)

from .zhihu import ZhiHuValidator
from .weibo import WeiBoValidator


all_validators = [
HttpBinInitValidator, HttpValidator,
HttpsValidator, WeiBoValidator
HttpsValidator, WeiBoValidator,
ZhiHuValidator
]