# NOTE(review): removed pasted file-viewer metadata ("302 lines / 11 KiB / Python")
# that was not valid Python source.
# Standard library
import concurrent.futures
import json
import queue
import threading
import time
import urllib.parse  # was `import urllib`: urllib.parse is a submodule and must be imported explicitly

# Third party
import bs4
import requests

# Project
import base.baseclasses
import base.proxy
from youtube.youtube import Youtube
from youtube.ytconfig import YoutubeConfig
from youtube.ytdbhelper import YoutubeDBHelper
from youtube.ytparser import YoutubeParser
class YoutubeCrawler:
    """Crawls YouTube search results with a pool of worker threads and
    stores parsed items through the DB helper."""

    def __init__(self):
        # Crawl configuration; populated by set_arguments().
        self.default_config = YoutubeConfig()
        # Persistence layer for parsed items.
        self.db_helper = YoutubeDBHelper()
        # One proxy entry per worker, keyed '<runner|content>-<proc_id>'.
        self.proxy = {}
        # How many days back to crawl; set in set_arguments().
        self.before_day = None
def set_arguments(self, browser, keyword_id, db_num, before_day, until_page):
|
|
params = self.db_helper.get_param(keyword_id)
|
|
self.before_day = before_day
|
|
self.default_config.set_param(keyword_id, db_num, params)
|
|
|
|
@staticmethod
|
|
def get_timeline_url(query, start_str, end_str, max_position=''):
|
|
params = {
|
|
'sp': 'CABQFA==', # 날짜순
|
|
'q': query,
|
|
}
|
|
|
|
url_tupple = (YoutubeConfig.protocol, YoutubeConfig.top_url, YoutubeConfig.search_url, '', urllib.parse.urlencode(params), '')
|
|
return urllib.parse.urlunparse(url_tupple)
|
|
|
|
@staticmethod
|
|
def get_content_url(user_id, tweet_id, max_position=''):
|
|
params = {
|
|
'max_position': max_position,
|
|
}
|
|
|
|
sub_url = TwitterConfig.conversation_url_form.format(user_id, tweet_id)
|
|
url_tupple = (TwitterConfig.protocol, TwitterConfig.top_url, sub_url, '', urllib.parse.urlencode(params), '')
|
|
return urllib.parse.urlunparse(url_tupple)
|
|
|
|
def get_page(self, url, is_runner, proc_id):
|
|
headers = {
|
|
'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.86 Safari/537.36',
|
|
'Accept-Language': 'ko-KR,ko;q=0.8,en-US;q=0.6,en;q=0.4',
|
|
}
|
|
proxy_key = '{}-{}'.format('runner' if is_runner else 'content', proc_id)
|
|
if proxy_key not in self.proxy:
|
|
self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
|
|
|
|
resp = None
|
|
while True:
|
|
try:
|
|
resp = requests.get(url, headers=headers, proxies=self.proxy[proxy_key], timeout=3)
|
|
except Exception as e:
|
|
if self.proxy[proxy_key] == (None, None):
|
|
break
|
|
|
|
# print('[{}] proxy {} is expired. ({})'.format(proc_id, self.proxy[proxy_key], e))
|
|
base.proxy.set_proxy_expired(self.proxy[proxy_key])
|
|
self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
|
|
else:
|
|
break
|
|
|
|
return resp
|
|
|
|
def get_page_data(self, url, is_runner, proc_id):
|
|
for retry_cnt in range(5):
|
|
# get response
|
|
resp = self.get_page(url, is_runner, proc_id)
|
|
if not resp:
|
|
break
|
|
|
|
# check response
|
|
if resp.status_code == 404:
|
|
break
|
|
elif resp.status_code != 200:
|
|
print('[WARNING] content_get code {}'.format(resp.status_code))
|
|
continue
|
|
|
|
# parsing result
|
|
j = json.loads(resp.text)
|
|
if j['new_latent_count'] <= 0:
|
|
proxy_key = '{}-{}'.format('runner' if is_runner else 'content', proc_id)
|
|
self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
|
|
continue
|
|
else:
|
|
return j
|
|
|
|
return {
|
|
'items_html': '',
|
|
'has_more_items': False,
|
|
}
|
|
|
|
def runner_proc(self, proc_id, content_queue, result_queue, config):
|
|
print('[{}] {} to {} runner thread start'.format(proc_id, config.start_str, config.end_str))
|
|
|
|
b_continue = True
|
|
min_tweet_id = None
|
|
max_tweet_id = None
|
|
max_position = ''
|
|
tweet_count = 0
|
|
|
|
while b_continue:
|
|
url = self.get_timeline_url(config.keywords[0], config.start_str, config.end_str, max_position)
|
|
j = self.get_page_data(url, True, proc_id)
|
|
soup = bs4.BeautifulSoup(j['items_html'], 'lxml')
|
|
tweet_tags = soup.select("div.tweet")
|
|
|
|
tweet_ids = []
|
|
for tw in tweet_tags:
|
|
tweet = TweetParser.parse(tw, config.keyword_id)
|
|
tweet_ids.append(tweet.tweet_id)
|
|
|
|
if tweet.is_reply is True:
|
|
# print(' ## {}: {}...'.format(tweet.user_name, tweet.text[:20]))
|
|
continue
|
|
|
|
if tweet.reply_cnt > 0:
|
|
self.insert_content_pool(proc_id, content_queue, tweet, tweet)
|
|
self.db_helper.insert_tweet(tweet, config.db_num)
|
|
|
|
# print('{} {}: {}...'.format(tweet.created_at, tweet.user_name, tweet.text[:20]))
|
|
print('[{}] body {} ({}) [{}]'.format(proc_id, tweet.top_link, tweet.created_at, 'ok'))
|
|
|
|
count = len(tweet_tags)
|
|
tweet_count += count
|
|
|
|
b_continue = count > 0
|
|
# b_continue = j['has_more_items']
|
|
if b_continue:
|
|
if min_tweet_id is None:
|
|
min_tweet_id = tweet_ids[0]
|
|
max_tweet_id = tweet_ids[-1]
|
|
|
|
if 'min_position' in j:
|
|
max_position = j['min_position']
|
|
else:
|
|
max_position = 'TWEET-{}-{}'.format(max_tweet_id, min_tweet_id)
|
|
|
|
print('[{}] {} to {} runner thread finished {}'.format(proc_id, config.start_str, config.end_str, tweet_count))
|
|
result_queue.put({
|
|
'proc_id': proc_id,
|
|
'count': tweet_count,
|
|
})
|
|
# self.runner_processing[proc_id].value = False
|
|
return proc_id, tweet_count,
|
|
|
|
@staticmethod
|
|
def insert_content_pool(proc_id: int, qu, tweet: Youtube, tweet_top: Youtube):
|
|
# print(' [{}] pool insert: {} ({})'.format(proc_id, tweet.text[:20] if tweet.text else '', tweet.tweet_link))
|
|
qu.put((tweet, tweet_top,))
|
|
|
|
@staticmethod
|
|
def get_content(content_queue):
|
|
sleep_time = time.time()
|
|
while True:
|
|
try:
|
|
parent_tw, top_tw, = content_queue.get(block=True, timeout=2)
|
|
except Exception as e:
|
|
if time.time()-sleep_time > 15:
|
|
break
|
|
else:
|
|
continue
|
|
else:
|
|
return parent_tw, top_tw,
|
|
|
|
return None, None,
|
|
|
|
def content_proc(self, proc_id, content_queue, result_queue):
|
|
# print('[{}] content thread start'.format(proc_id))
|
|
#
|
|
# tweet_count = 0
|
|
# while True:
|
|
# parent_tw, top_tw, = self.get_content(content_queue)
|
|
# if not parent_tw:
|
|
# break
|
|
#
|
|
# # print(' [{}] <<< parent : {} ({})'.format(proc_id, parent_tw.text[:20], parent_tw.tweet_link))
|
|
#
|
|
# max_position = ''
|
|
#
|
|
# b_continue = True
|
|
# while b_continue:
|
|
# url = self.get_content_url(parent_tw.user_id, parent_tw.tweet_id, max_position)
|
|
# j = self.get_page_data(url, False, proc_id)
|
|
# soup = bs4.BeautifulSoup(j['items_html'], 'lxml')
|
|
#
|
|
# reply_container_tags = soup.select('li.ThreadedConversation')
|
|
# reply_container_tags += TweetParser.get_lone_container(soup, parent_tw)
|
|
# for container_tags in reply_container_tags:
|
|
# tweet_tags = container_tags.select('div.tweet')
|
|
# if len(tweet_tags) > 0:
|
|
# tweet = TweetParser.parse(tweet_tags[0], self.default_config.keyword_id, parent_tw.depth+1, top_tw)
|
|
# # print('[{}]>>> {} {}: {} ({}) ({})'.format(proc_id, tweet.created_at, tweet.user_name, tweet.text[:20], tweet.depth, tweet.tweet_link))
|
|
# print('[{}] reply {} [{}]'.format(proc_id, tweet.top_link, 'ok'))
|
|
# self.insert_content_pool(proc_id, content_queue, tweet, top_tw)
|
|
# self.db_helper.insert_tweet(tweet, self.default_config.db_num)
|
|
# tweet_count += 1
|
|
#
|
|
# b_continue = j['has_more_items']
|
|
# if b_continue:
|
|
# max_position = j['min_position']
|
|
#
|
|
# result_queue.put({
|
|
# 'proc_id': proc_id,
|
|
# 'count': tweet_count,
|
|
# })
|
|
#
|
|
# print('[{}] content thread finished'.format(proc_id))
|
|
tweet_count = 0
|
|
return proc_id, tweet_count,
|
|
|
|
def run(self):
|
|
start_time = time.time()
|
|
|
|
# run
|
|
worker_count = 1
|
|
split_config = self.default_config.split()
|
|
|
|
content_qu = queue.Queue()
|
|
runner_result_qu = queue.Queue()
|
|
content_result_qu = queue.Queue()
|
|
|
|
runner_result_cnt = 0
|
|
content_result_cnt = 0
|
|
|
|
content_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
|
|
[content_pool.submit(self.content_proc, proc_id, content_qu, content_result_qu) for proc_id in range(worker_count)]
|
|
|
|
runner_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
|
|
[runner_pool.submit(self.runner_proc, proc_id, content_qu, runner_result_qu, config) for proc_id, config in enumerate(split_config)]
|
|
|
|
runner_pool.shutdown(wait=True)
|
|
content_pool.shutdown(wait=True)
|
|
self.db_helper.flush()
|
|
|
|
# rerun zero runners
|
|
print('restart failed runner')
|
|
for retry in range(5):
|
|
runner_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
|
|
runner_result_qu2 = queue.Queue()
|
|
b_rerun = False
|
|
while not runner_result_qu.empty():
|
|
res = runner_result_qu.get()
|
|
runner_result_cnt += res['count']
|
|
proc_id = res['proc_id']
|
|
if res['count'] == 0:
|
|
runner_pool.submit(self.runner_proc, proc_id, content_qu, runner_result_qu2, split_config[proc_id])
|
|
b_rerun = True
|
|
|
|
while not content_result_qu.empty():
|
|
res = content_result_qu.get()
|
|
content_result_cnt += res['count']
|
|
|
|
if b_rerun:
|
|
content_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
|
|
[content_pool.submit(self.content_proc, proc_id, content_qu, content_result_qu) for proc_id in range(worker_count)]
|
|
|
|
runner_pool.shutdown(wait=True)
|
|
content_pool.shutdown(wait=True)
|
|
self.db_helper.flush()
|
|
|
|
runner_result_qu = runner_result_qu2
|
|
|
|
while not runner_result_qu.empty():
|
|
res = runner_result_qu.get()
|
|
runner_result_cnt += res['count']
|
|
|
|
while not content_result_qu.empty():
|
|
res = content_result_qu.get()
|
|
content_result_cnt += res['count']
|
|
|
|
print('total body count: {}'.format(runner_result_cnt))
|
|
print('total reply count: {}'.format(content_result_cnt))
|
|
|
|
# print running time
|
|
delta = time.time() - start_time
|
|
m, s = divmod(delta, 60)
|
|
h, m = divmod(m, 60)
|
|
print("finished all {}:{:02d}:{:02d} ".format(int(h), int(m), int(s)))
|
|
|
|
def start(self):
|
|
|
|
# run
|
|
while True:
|
|
self.default_config.reload_realtime(self.before_day)
|
|
self.run()
|
|
|
|
if not self.default_config.realtime:
|
|
break
|