Files
clients/WebBasedCrawler/youtube/youtubecrawl.py
mjjo 1fb61f0b4c 트위터 크롤러 수정
- 프록시를 porxy2 db에 넣고 사용
2017-08-09 15:32:57 +09:00

302 lines
11 KiB
Python

import concurrent.futures
import json
import queue
import threading
import time
import urllib
import urllib.parse

import bs4
import requests

from youtube.ytconfig import YoutubeConfig
from youtube.ytdbhelper import YoutubeDBHelper
from youtube.youtube import Youtube
from youtube.ytparser import YoutubeParser
import base.proxy
import base.baseclasses
class YoutubeCrawler:
    """Keyword crawler that fetches pages through rotating proxies on thread pools.

    NOTE(review): this class was evidently ported from a Twitter crawler and
    the port is incomplete. ``get_content_url`` references ``TwitterConfig``
    and ``runner_proc`` references ``TweetParser``; neither name is imported
    by this file, so both fail with ``NameError`` when reached. The imported
    ``YoutubeParser`` is presumably the intended replacement, but its API is
    not visible from here -- confirm before swapping it in.
    """

    def __init__(self):
        self.default_config = YoutubeConfig()
        self.db_helper = YoutubeDBHelper()
        # Maps a '<role>-<proc_id>' key to the proxy currently assigned to
        # that worker (see _proxy_key).
        self.proxy = {}
        self.before_day = None

    def set_arguments(self, browser, keyword_id, db_num, before_day, until_page):
        """Load the keyword parameters from the DB into the default config.

        ``browser`` and ``until_page`` are accepted but not used here.
        """
        params = self.db_helper.get_param(keyword_id)
        self.before_day = before_day
        self.default_config.set_param(keyword_id, db_num, params)

    @staticmethod
    def get_timeline_url(query, start_str, end_str, max_position=''):
        """Build the search URL for ``query``.

        NOTE(review): ``start_str``, ``end_str`` and ``max_position`` are
        accepted but ignored, so every call for the same query yields the
        same URL (no date filter, no paging cursor).
        """
        params = {
            'sp': 'CABQFA==',  # sort by date
            'q': query,
        }
        url_tupple = (
            YoutubeConfig.protocol,
            YoutubeConfig.top_url,
            YoutubeConfig.search_url,
            '',
            urllib.parse.urlencode(params),
            '',
        )
        return urllib.parse.urlunparse(url_tupple)

    @staticmethod
    def get_content_url(user_id, tweet_id, max_position=''):
        """Build the conversation (reply thread) URL for one post.

        NOTE(review): still uses ``TwitterConfig``, which this file does not
        import; calling this raises ``NameError`` until it is ported. It is
        only referenced by the disabled reply crawler in ``content_proc``.
        """
        params = {
            'max_position': max_position,
        }
        sub_url = TwitterConfig.conversation_url_form.format(user_id, tweet_id)
        url_tupple = (
            TwitterConfig.protocol,
            TwitterConfig.top_url,
            sub_url,
            '',
            urllib.parse.urlencode(params),
            '',
        )
        return urllib.parse.urlunparse(url_tupple)

    @staticmethod
    def _proxy_key(is_runner, proc_id):
        """Return the ``self.proxy`` key for one worker ('runner-N' / 'content-N')."""
        return '{}-{}'.format('runner' if is_runner else 'content', proc_id)

    def get_page(self, url, is_runner, proc_id):
        """GET ``url`` through this worker's proxy, rotating proxies on failure.

        Returns the ``requests`` response, or ``None`` when the request fails
        while the assigned proxy equals ``(None, None)``.
        """
        headers = {
            'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.86 Safari/537.36',
            'Accept-Language': 'ko-KR,ko;q=0.8,en-US;q=0.6,en;q=0.4',
        }
        proxy_key = self._proxy_key(is_runner, proc_id)
        if proxy_key not in self.proxy:
            self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()

        resp = None
        while True:
            try:
                resp = requests.get(url, headers=headers, proxies=self.proxy[proxy_key], timeout=3)
            except Exception:
                # Deliberately broad: any failure is treated as a dead proxy.
                # Presumably (None, None) means "no proxy available", in which
                # case rotating cannot help -- TODO confirm against base.proxy.
                if self.proxy[proxy_key] == (None, None):
                    break
                base.proxy.set_proxy_expired(self.proxy[proxy_key])
                self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
            else:
                break
        return resp

    def get_page_data(self, url, is_runner, proc_id):
        """Fetch ``url`` and return its decoded JSON payload (up to 5 attempts).

        Gives up immediately when no response could be obtained or the server
        answers 404. Any other non-200 status, an undecodable body, or a
        payload without ``new_latent_count`` counts as a failed attempt. A
        payload whose ``new_latent_count`` is not positive makes the worker
        switch to a fresh proxy before retrying.

        Returns the payload dict, or an empty result
        (``{'items_html': '', 'has_more_items': False}``) on failure.
        """
        for _ in range(5):
            resp = self.get_page(url, is_runner, proc_id)
            # Compare against None explicitly: a requests.Response is falsy
            # for every 4xx/5xx status, so ``not resp`` would abort before the
            # status checks below could ever retry an error response.
            if resp is None:
                break

            if resp.status_code == 404:
                break
            if resp.status_code != 200:
                print('[WARNING] content_get code {}'.format(resp.status_code))
                continue

            try:
                payload = json.loads(resp.text)
                latent_count = payload['new_latent_count']
            except (ValueError, KeyError, TypeError):
                # Not the expected JSON document (e.g. an HTML error page).
                # Retry instead of letting the exception kill the worker
                # thread, where the executor would swallow it silently.
                print('[WARNING] content_get unexpected payload')
                continue

            if latent_count <= 0:
                # Empty answer: retry through a different proxy.
                self.proxy[self._proxy_key(is_runner, proc_id)] = base.proxy.get_proxy_for_requests()
                continue
            return payload

        return {
            'items_html': '',
            'has_more_items': False,
        }

    def runner_proc(self, proc_id, content_queue, result_queue, config):
        """Crawl the result pages for one config slice and store each post.

        Posts with replies are also queued on ``content_queue``. When done, a
        ``{'proc_id', 'count'}`` summary is put on ``result_queue`` and
        ``(proc_id, count)`` is returned.

        NOTE(review): ``TweetParser`` is not imported by this file, so parsing
        the first item raises ``NameError``. Also, ``get_timeline_url``
        ignores ``max_position``, so every iteration requests the same URL and
        the loop only ends once a page yields no ``div.tweet`` elements.
        """
        print('[{}] {} to {} runner thread start'.format(proc_id, config.start_str, config.end_str))

        b_continue = True
        min_tweet_id = None
        max_tweet_id = None
        max_position = ''
        tweet_count = 0

        while b_continue:
            url = self.get_timeline_url(config.keywords[0], config.start_str, config.end_str, max_position)
            j = self.get_page_data(url, True, proc_id)
            soup = bs4.BeautifulSoup(j['items_html'], 'lxml')
            tweet_tags = soup.select("div.tweet")

            tweet_ids = []
            for tw in tweet_tags:
                tweet = TweetParser.parse(tw, config.keyword_id)
                tweet_ids.append(tweet.tweet_id)

                # Replies are skipped here; only their id is kept for paging.
                if tweet.is_reply is True:
                    continue

                if tweet.reply_cnt > 0:
                    self.insert_content_pool(proc_id, content_queue, tweet, tweet)
                self.db_helper.insert_tweet(tweet, config.db_num)
                print('[{}] body {} ({}) [{}]'.format(proc_id, tweet.top_link, tweet.created_at, 'ok'))

            count = len(tweet_tags)
            tweet_count += count

            # The payload's 'has_more_items' flag is not used; an empty page
            # is what ends the loop.
            b_continue = count > 0
            if b_continue:
                if min_tweet_id is None:
                    min_tweet_id = tweet_ids[0]
                max_tweet_id = tweet_ids[-1]

                if 'min_position' in j:
                    max_position = j['min_position']
                else:
                    max_position = 'TWEET-{}-{}'.format(max_tweet_id, min_tweet_id)

        print('[{}] {} to {} runner thread finished {}'.format(proc_id, config.start_str, config.end_str, tweet_count))
        result_queue.put({
            'proc_id': proc_id,
            'count': tweet_count,
        })
        return proc_id, tweet_count

    @staticmethod
    def insert_content_pool(proc_id: int, qu, tweet: "Youtube", tweet_top: "Youtube"):
        """Queue ``(tweet, tweet_top)`` on ``qu``; ``proc_id`` is unused."""
        qu.put((tweet, tweet_top))

    @staticmethod
    def get_content(content_queue):
        """Pop the next ``(parent, top)`` pair from ``content_queue``.

        Polls with a 2-second timeout and gives up once more than 15 seconds
        have passed since the call started, returning ``(None, None)``.
        """
        started_at = time.time()
        while True:
            try:
                parent_tw, top_tw = content_queue.get(block=True, timeout=2)
            except queue.Empty:
                if time.time() - started_at > 15:
                    break
            else:
                return parent_tw, top_tw
        return None, None

    def content_proc(self, proc_id, content_queue, result_queue):
        """Reply-crawling worker; currently a no-op returning ``(proc_id, 0)``.

        The earlier (Twitter-based) implementation has been disabled. It
        repeatedly took a parent post via ``get_content``, paged through
        ``get_content_url`` with ``get_page_data``, parsed every reply
        container, re-queued each reply via ``insert_content_pool``, stored it
        with ``db_helper.insert_tweet``, and finally put a
        ``{'proc_id', 'count'}`` summary on ``result_queue``.
        """
        tweet_count = 0
        return proc_id, tweet_count

    @staticmethod
    def _drain_count(result_queue):
        """Empty ``result_queue`` and return the sum of the results' 'count'."""
        total = 0
        while not result_queue.empty():
            total += result_queue.get()['count']
        return total

    def _start_content_workers(self, worker_count, content_qu, content_result_qu):
        """Start ``worker_count`` content workers and return their executor."""
        pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
        for proc_id in range(worker_count):
            pool.submit(self.content_proc, proc_id, content_qu, content_result_qu)
        return pool

    def run(self):
        """Run one full crawl: all runners, then up to 5 retry rounds.

        A runner is retried when it reported a count of zero. Totals and the
        elapsed time are printed at the end.
        """
        start_time = time.time()

        worker_count = 1
        split_config = self.default_config.split()

        content_qu = queue.Queue()
        runner_result_qu = queue.Queue()
        content_result_qu = queue.Queue()

        runner_result_cnt = 0
        content_result_cnt = 0

        content_pool = self._start_content_workers(worker_count, content_qu, content_result_qu)
        runner_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
        for proc_id, config in enumerate(split_config):
            runner_pool.submit(self.runner_proc, proc_id, content_qu, runner_result_qu, config)

        runner_pool.shutdown(wait=True)
        content_pool.shutdown(wait=True)
        self.db_helper.flush()

        # rerun zero runners
        print('restart failed runner')
        for _ in range(5):
            runner_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
            retry_result_qu = queue.Queue()
            b_rerun = False

            while not runner_result_qu.empty():
                res = runner_result_qu.get()
                runner_result_cnt += res['count']
                if res['count'] == 0:
                    proc_id = res['proc_id']
                    runner_pool.submit(self.runner_proc, proc_id, content_qu, retry_result_qu, split_config[proc_id])
                    b_rerun = True

            content_result_cnt += self._drain_count(content_result_qu)

            if b_rerun:
                content_pool = self._start_content_workers(worker_count, content_qu, content_result_qu)

            runner_pool.shutdown(wait=True)
            content_pool.shutdown(wait=True)
            self.db_helper.flush()

            runner_result_qu = retry_result_qu
            if not b_rerun:
                # Nothing was resubmitted, so every remaining round would only
                # create and tear down empty pools.
                break

        runner_result_cnt += self._drain_count(runner_result_qu)
        content_result_cnt += self._drain_count(content_result_qu)

        print('total body count: {}'.format(runner_result_cnt))
        print('total reply count: {}'.format(content_result_cnt))

        # print running time
        delta = time.time() - start_time
        m, s = divmod(delta, 60)
        h, m = divmod(m, 60)
        print("finished all {}:{:02d}:{:02d} ".format(int(h), int(m), int(s)))

    def start(self):
        """Run the crawl once, or repeatedly while the config is in realtime mode."""
        while True:
            self.default_config.reload_realtime(self.before_day)
            self.run()

            if not self.default_config.realtime:
                break