트위터 크롤러 max_position 수정

빠진 파일들 추가
This commit is contained in:
mjjo
2017-08-02 10:52:11 +09:00
parent de8f2f4c23
commit 3ceb59e815
4 changed files with 121 additions and 52 deletions

View File

@@ -110,7 +110,7 @@ def crawl_proxies(check_url=None):
with open('proxy.txt', 'w') as f:
print('proxy crawler dump start')
for proxy in proxies_alive:
print(proxy)
# print(proxy)
f.write(proxy + '\n')
print('proxy crawler dump end')

View File

@@ -16,7 +16,7 @@ import queue
import time
class TwitterCrawler():
class TwitterCrawler:
def __init__(self):
self.default_config = TwitterConfig()
@@ -65,7 +65,6 @@ class TwitterCrawler():
resp = None
while True:
try:
# time.sleep(random.random())
resp = requests.get(url, headers=headers, proxies=self.proxy[proxy_key], timeout=3)
except Exception as e:
if self.proxy[proxy_key] == (None, None):
@@ -79,6 +78,34 @@ class TwitterCrawler():
return resp
def get_page_data(self, url, is_runner, proc_id):
    """Fetch the Twitter JSON endpoint at `url`, retrying up to 5 times.

    Delegates the actual HTTP request (with this worker's proxy) to
    self.get_page.  On success returns the decoded JSON dict; when the
    retries are exhausted, the response is missing, or the resource is
    gone (404), returns an empty-page sentinel so callers can end their
    paging loop cleanly.

    Parameters:
        url: timeline/conversation JSON endpoint to fetch.
        is_runner: True for runner workers, False for content workers;
            only used to build the per-worker proxy-pool key.
        proc_id: worker index, combined with the role into the proxy key.
    """
    for retry_cnt in range(5):
        # get response
        resp = self.get_page(url, is_runner, proc_id)
        if not resp:
            # get_page already gave up -- stop retrying here as well
            break
        # check response
        if resp.status_code == 404:
            # resource is gone; retrying cannot help
            break
        elif resp.status_code != 200:
            print('[WARNING] content_get code {}'.format(resp.status_code))
            continue
        # parsing result
        j = json.loads(resp.text)
        if j['new_latent_count'] <= 0:
            # A zero new-item count is treated as a blocked/rate-limited
            # proxy: rotate this worker to a fresh proxy and retry.
            # NOTE(review): assumes the payload always carries
            # 'new_latent_count' -- confirm against the endpoint's schema.
            proxy_key = '{}-{}'.format('runner' if is_runner else 'content', proc_id)
            self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
            continue
        else:
            return j
    # Fallback sentinel shaped like an empty Twitter JSON page.
    return {
        'items_html': '',
        'has_more_items': False,
    }
def runner_proc(self, proc_id, content_queue, result_queue, config):
print('[{}] {} to {} runner thread start'.format(proc_id, config.start_str, config.end_str))
@@ -89,19 +116,15 @@ class TwitterCrawler():
tweet_count = 0
while b_continue:
if min_tweet_id is not None:
max_position = 'TWEET-{}-{}'.format(max_tweet_id, min_tweet_id)
url = self.get_timeline_url(config.keywords[0], config.start_str, config.end_str, max_position)
resp = self.get_page(url, True, proc_id)
if resp is None:
break
j = json.loads(resp.content.decode('utf-8'))
j = self.get_page_data(url, True, proc_id)
soup = bs4.BeautifulSoup(j['items_html'], 'lxml')
tweet_tags = soup.select("div.tweet")
tweet_ids = []
for tw in tweet_tags:
tweet = TweetParser.parse(tw, config.keyword_id)
tweet_ids.append(tweet.tweet_id)
if tweet.is_reply is True:
# print(' ## {}: {}...'.format(tweet.user_name, tweet.text[:20]))
@@ -109,20 +132,25 @@ class TwitterCrawler():
if tweet.reply_cnt > 0:
self.insert_content_pool(proc_id, content_queue, tweet, tweet)
self.db_helper.insert_tweet(tweet, config.db_num)
# print('{} {}: {}...'.format(tweet.created_at, tweet.user_name, tweet.text[:20]))
print('[{}] body {} [{}]'.format(proc_id, tweet.top_link, 'ok'))
print('[{}] body {} ({}) [{}]'.format(proc_id, tweet.top_link, tweet.created_at, 'ok'))
count = len(tweet_tags)
tweet_count += count
b_continue = j['has_more_items']
b_continue = count > 0
# b_continue = j['has_more_items']
if b_continue:
if min_tweet_id is None:
min_tweet_id = tweet_tags[0].attrs['data-item-id']
max_tweet_id = tweet_tags[-1].attrs['data-item-id']
min_tweet_id = tweet_ids[0]
max_tweet_id = tweet_ids[-1]
if 'min_position' in j:
max_position = j['min_position']
else:
max_position = 'TWEET-{}-{}'.format(max_tweet_id, min_tweet_id)
print('[{}] {} to {} runner thread finished {}'.format(proc_id, config.start_str, config.end_str, tweet_count))
result_queue.put({
@@ -169,14 +197,7 @@ class TwitterCrawler():
b_continue = True
while b_continue:
url = self.get_content_url(parent_tw.user_id, parent_tw.tweet_id, max_position)
resp = self.get_page(url, False, proc_id)
if resp is None or resp.status_code == 404:
break
elif resp.status_code != 200:
print('[WARNING] content_get code {}'.format(resp.status_code))
continue
j = json.loads(resp.content.decode('utf-8'))
j = self.get_page_data(url, False, proc_id)
soup = bs4.BeautifulSoup(j['items_html'], 'lxml')
reply_container_tags = soup.select('li.ThreadedConversation')
@@ -186,7 +207,7 @@ class TwitterCrawler():
if len(tweet_tags) > 0:
tweet = TweetParser.parse(tweet_tags[0], self.default_config.keyword_id, parent_tw.depth+1, top_tw)
# print('[{}]>>> {} {}: {} ({}) ({})'.format(proc_id, tweet.created_at, tweet.user_name, tweet.text[:20], tweet.depth, tweet.tweet_link))
# print('[{}] reply {} [{}]'.format(proc_id, tweet.top_link, 'ok'))
print('[{}] reply {} [{}]'.format(proc_id, tweet.top_link, 'ok'))
self.insert_content_pool(proc_id, content_queue, tweet, top_tw)
self.db_helper.insert_tweet(tweet, self.default_config.db_num)
tweet_count += 1
@@ -260,7 +281,7 @@ class TwitterCrawler():
start_time = time.time()
# run
worker_count = 64
worker_count = 16
split_config = self.default_config.split()
content_qu = queue.Queue()
@@ -282,34 +303,35 @@ class TwitterCrawler():
# rerun zero runners
print('restart failed runner')
runner_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
runner_result_qu2 = queue.Queue()
idx = 0
b_rerun = False
for retry in range(5):
runner_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
runner_result_qu2 = queue.Queue()
b_rerun = False
while not runner_result_qu.empty():
res = runner_result_qu.get()
runner_result_cnt += res['count']
proc_id = res['proc_id']
if res['count'] == 0:
runner_pool.submit(self.runner_proc, proc_id, content_qu, runner_result_qu2, split_config[proc_id])
b_rerun = True
while not content_result_qu.empty():
res = content_result_qu.get()
content_result_cnt += res['count']
if b_rerun:
content_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
[content_pool.submit(self.content_proc, proc_id, content_qu, content_result_qu) for proc_id in range(worker_count)]
runner_pool.shutdown(wait=True)
content_pool.shutdown(wait=True)
self.db_helper.flush()
runner_result_qu = runner_result_qu2
while not runner_result_qu.empty():
res = runner_result_qu.get()
runner_result_cnt += res['count']
if res['count'] == 0:
runner_pool.submit(self.runner_proc, idx, content_qu, runner_result_qu2, split_config[idx])
b_rerun = True
idx += 1
while not content_result_qu.empty():
res = content_result_qu.get()
content_result_cnt += res['count']
if b_rerun:
content_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
[content_pool.submit(self.content_proc, proc_id, content_qu, content_result_qu) for proc_id in range(worker_count)]
runner_pool.shutdown(wait=True)
content_pool.shutdown(wait=True)
self.db_helper.flush()
while not runner_result_qu2.empty():
res = runner_result_qu2.get()
runner_result_cnt += res['count']
while not content_result_qu.empty():
res = content_result_qu.get()
@@ -326,7 +348,6 @@ class TwitterCrawler():
def start(self):
# self.debug()
# return