- multithreading으로 모든 depth 크롤링

This commit is contained in:
mjjo
2017-07-25 17:44:02 +09:00
parent 4d6aef1310
commit 2f324b1710
4 changed files with 151 additions and 105 deletions

View File

@@ -1,6 +1,5 @@
from twitter.tweet import Tweet
import multiprocessing as mp
import copy
class TwitterDBHelper:
@@ -15,12 +14,6 @@ class TwitterDBHelper:
def __del__(self):
pass
# def __getstate__(self):
# return self.conn,
#
# def __setstate__(self, state):
# self.conn, = state
def get_param(self, keyword_id):
query = "select * from keyword where id = " + str(keyword_id)
params = []
@@ -33,12 +26,14 @@ class TwitterDBHelper:
with conn.cursor() as cursor:
cursor.execute(query)
params = cursor.fetchone()
conn.close()
except Exception as e:
print(e)
exit(1)
else:
conn.close()
return params
def insert_tweet(self, tweet: Tweet = None, db_num: int = -1, flush=False):
@@ -60,7 +55,8 @@ class TwitterDBHelper:
conn = self.pymysql.connect(host='bigbird.iptime.org',
user='admin', passwd='admin123',
db='concepters', charset='utf8',
cursorclass=self.pymysql.cursors.DictCursor)
cursorclass=self.pymysql.cursors.DictCursor,
connect_timeout=5)
except Exception as e:
print(e)

View File

@@ -18,6 +18,7 @@ class Tweet(DataDBRow):
self.reply_cnt = 0
self.retweet_cnt = 0
self.favorite_cnt = 0
self.top_link = None
self.tweet_link = None
self.depth = 0

View File

@@ -10,7 +10,8 @@ import requests
import bs4
import json
import urllib
import multiprocessing as mp
import threading
import queue
import time
@@ -19,22 +20,13 @@ class TwitterCrawler():
def __init__(self):
self.default_config = TwitterConfig()
self.db_helper = TwitterDBHelper()
self.proxies = None
self.runner_finished = mp.Value('b', False)
self.content_queue = mp.Queue()
self.result_queue = mp.Queue()
def set_arguments(self, browser, keyword_id, db_num, before_day, until_page):
params = self.db_helper.get_param(keyword_id)
self.default_config.set_param(keyword_id, db_num, params)
def __getstate__(self):
return self.default_config, self.db_helper, self.proxies, self.runner_finished, self.content_queue, self.result_queue,
def __setstate__(self, state):
self.default_config, self.db_helper, self.proxies, self.runner_finished, self.content_queue, self.result_queue, = state
def get_timeline_url(self, query, start_str, end_str, max_position=''):
@staticmethod
def get_timeline_url(query, start_str, end_str, max_position=''):
params = {
'f': 'tweets',
'vertical': 'default',
@@ -47,7 +39,8 @@ class TwitterCrawler():
url_tupple = (TwitterConfig.protocol, TwitterConfig.top_url, TwitterConfig.search_url, '', urllib.parse.urlencode(params), '')
return urllib.parse.urlunparse(url_tupple)
def get_content_url(self, user_id, tweet_id, max_position=''):
@staticmethod
def get_content_url(user_id, tweet_id, max_position=''):
params = {
'max_position': max_position,
}
@@ -56,29 +49,34 @@ class TwitterCrawler():
url_tupple = (TwitterConfig.protocol, TwitterConfig.top_url, sub_url, '', urllib.parse.urlencode(params), '')
return urllib.parse.urlunparse(url_tupple)
def get_page(self, url):
@staticmethod
def get_page(url, proc_id):
    """Fetch *url* through a rotating proxy and return the requests Response.

    Retries indefinitely with a fresh proxy whenever the request raises;
    stops retrying (and returns whatever ``resp`` holds, normally ``None``)
    once the proxy pool is exhausted, i.e. ``get_proxy_for_requests()``
    hands back ``(None, None)``.

    :param url: absolute URL to fetch.
    :param proc_id: worker id, used only to tag log output.
    :return: ``requests.Response`` on success, ``None`` when no proxy works.
    """
    headers = {
        'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.86 Safari/537.36',
        'Accept-Language': 'ko-KR,ko;q=0.8,en-US;q=0.6,en;q=0.4',
    }
    proxies = base.proxy.get_proxy_for_requests()
    resp = None
    while True:
        try:
            resp = requests.get(url, headers=headers, proxies=proxies, timeout=3)
        except Exception as e:
            # (None, None) signals the proxy pool is empty: give up.
            if proxies == (None, None):
                break
            print('[{}] proxy {} is expired. ({})'.format(proc_id, proxies, e))
            base.proxy.set_proxy_expired(proxies)
            proxies = base.proxy.get_proxy_for_requests()
        else:
            break
    return resp
def runner_proc(self, proc_id, config):
def runner_proc(self, proc_id, content_queue, result_queue, config):
print('{} to {} runner thread start'.format(config.start_str, config.end_str))
b_continue = True
min_tweet_id = None
max_tweet_id = None
@@ -89,7 +87,7 @@ class TwitterCrawler():
if min_tweet_id is not None:
max_position = 'TWEET-{}-{}'.format(max_tweet_id, min_tweet_id)
url = self.get_timeline_url(config.keywords[0], config.start_str, config.end_str, max_position)
resp = self.get_page(url)
resp = self.get_page(url, proc_id)
if resp is None:
break
@@ -105,7 +103,7 @@ class TwitterCrawler():
continue
if tweet.reply_cnt > 0:
self.insert_content_pool(tweet)
self.insert_content_pool(proc_id, content_queue, tweet, tweet)
self.db_helper.insert_tweet(tweet, config.db_num)
@@ -120,71 +118,101 @@ class TwitterCrawler():
max_tweet_id = tweet_tags[-1].attrs['data-item-id']
tweet_count += count
print('{} to {} runner finished {}'.format(config.start_str, config.end_str, tweet_count))
self.result_queue.put((proc_id, tweet_count, ))
print('{} to {} runner thread finished {}'.format(config.start_str, config.end_str, tweet_count))
result_queue.put((proc_id, tweet_count, ))
# self.runner_processing[proc_id].value = False
return proc_id, tweet_count,
def insert_content_pool(self, tweet: Tweet):
self.content_queue.put(tweet)
@staticmethod
def insert_content_pool(proc_id: int, qu, tweet: Tweet, tweet_top: Tweet):
    """Enqueue a ``(tweet, tweet_top)`` pair for the content worker threads.

    :param proc_id: worker id, used only to tag log output.
    :param qu: shared content queue the workers drain.
    :param tweet: tweet whose replies should be crawled next.
    :param tweet_top: top-level tweet of the conversation thread.
    """
    preview = tweet.text[:20] if tweet.text else ''
    print(' [{}] pool insert: {} ({})'.format(proc_id, preview, tweet.tweet_link))
    qu.put((tweet, tweet_top,))
def content_proc(self, proc_id):
while not self.runner_finished.value or not self.content_queue.empty():
@staticmethod
def get_content(content_queue):
sleep_time = time.time()
while True:
try:
parent_tw = self.content_queue.get(block=True, timeout=5)
except:
continue
parent_tw, top_tw, = content_queue.get(block=True, timeout=2)
except Exception as e:
if time.time()-sleep_time > 60:
break
else:
continue
else:
return parent_tw, top_tw,
return None, None,
def content_proc(self, proc_id, content_queue, result_queue):
print('[{}] content thread start'.format(proc_id))
tweet_count = 0
while True:
parent_tw, top_tw, = self.get_content(content_queue)
if not parent_tw:
break
print(' [{}] <<< parent : {} ({})'.format(proc_id, parent_tw.text[:20], parent_tw.tweet_link))
max_position = ''
b_continue = True
tweet_count = 0
while b_continue:
url = self.get_content_url(parent_tw.user_id, parent_tw.tweet_id, max_position)
resp = self.get_page(url)
resp = self.get_page(url, proc_id)
if resp is None or resp.status_code == 404:
break
elif resp.status_code != 200:
print('content_get code {}'.format(resp.status_code))
print('[WARNING] content_get code {}'.format(resp.status_code))
continue
j = json.loads(resp.content.decode('utf-8'))
soup = bs4.BeautifulSoup(j['items_html'], 'lxml')
reply_container_tags = soup.select('li.ThreadedConversation')
reply_container_tags += TweetParser.get_lone_container(soup, parent_tw)
for container_tags in reply_container_tags:
tweet_tags = container_tags.select('div.tweet')
for idx, tag in enumerate(tweet_tags):
if idx >= 2:
break
tweet = TweetParser.parse(tag, self.default_config.keyword_id, idx+1, parent_tw.tweet_link)
if len(tweet_tags) > 0:
tweet = TweetParser.parse(tweet_tags[0], self.default_config.keyword_id, parent_tw.depth+1, top_tw)
print('[{}]>>> {} {}: {} ({}) ({})'.format(proc_id, tweet.created_at, tweet.user_name, tweet.text[:20], tweet.depth, tweet.tweet_link))
self.insert_content_pool(proc_id, content_queue, tweet, top_tw)
self.db_helper.insert_tweet(tweet, self.default_config.db_num)
print('>>> {} {}: {}'.format(tweet.created_at, tweet.user_name, tweet.text[:20]))
tweet_count += 1
b_continue = j['has_more_items']
if b_continue:
max_position = j['min_position']
print('content proc {} finished'.format(proc_id))
return proc_id,
result_queue.put(tweet_count)
print('[{}] content thread finished'.format(proc_id))
return proc_id, tweet_count,
def debug_content(self):
content_qu = queue.Queue()
runner_result_qu = queue.Queue()
content_result_qu = queue.Queue()
test_tw = Tweet()
test_tw.tweet_link = 'https://twitter.com/moonriver365/status/885797401033818112'
test_tw.user_id = 'moonriver365'
test_tw.tweet_id = 885797401033818112
# test_tw.tweet_link = 'https://twitter.com/yniold_/status/886863893137678337'
# test_tw.user_id = 'yniold_'
# test_tw.tweet_id = 886863893137678337
self.content_queue.put(test_tw)
self.content_queue.put(test_tw)
print(self.content_queue.qsize())
print(self.content_queue.empty())
test_tw.tweet_link = 'https://twitter.com/Awesome_vely/status/888704413111435264'
test_tw.user_id = 'Awesome_vely'
test_tw.tweet_id = 888704413111435264
content_process = [mp.Process(target=self.content_proc, args=()) for _ in range(1)]
[p.start() for p in content_process]
[p.join() for p in content_process]
test_tw.text = '시작'
self.insert_content_pool(0, content_qu, test_tw, test_tw)
content_threads = [threading.Thread(target=self.content_proc, args=(proc_id, content_qu, content_result_qu)) for proc_id in range(16)]
[th.start() for th in content_threads]
[th.join() for th in content_threads]
while not content_result_qu.empty():
res = content_result_qu.get()
print('reply : {}'.format(res))
print('end all')
@@ -205,48 +233,56 @@ class TwitterCrawler():
# return
## contents check
# self.debug_content()
self.debug_content()
# split_config = self.default_config.split()
self.test_insert_db()
# self.test_insert_db()
exit()
print("debug end")
# exit()
def start(self):
start_time = time.time()
# self.debug()
# return
# run
split_config = self.default_config.split()
content_qu = queue.Queue()
runner_result_qu = queue.Queue()
content_result_qu = queue.Queue()
runner_process = [mp.Process(target=self.runner_proc, args=(proc_id, config, )) for proc_id, config in enumerate(split_config)]
content_process = [mp.Process(target=self.content_proc, args=(proc_id, )) for proc_id in range(16)]
runner_threads = [threading.Thread(target=self.runner_proc, args=(proc_id, content_qu, runner_result_qu, config)) for proc_id, config in enumerate(split_config)]
content_threads = [threading.Thread(target=self.content_proc, args=(proc_id, content_qu, content_result_qu)) for proc_id in range(16)]
[p.start() for p in runner_process]
[p.start() for p in content_process]
[th.start() for th in runner_threads]
[th.start() for th in content_threads]
[p.join() for p in runner_process]
self.runner_finished.value = True
[p.join() for p in content_process]
[th.join() for th in runner_threads]
[th.join() for th in content_threads]
# rerun zero pages
runner2_process = []
while not self.result_queue.empty():
result = self.result_queue.get()
if result[1] == 0:
runner2_process.append(
mp.Process(target=self.runner_proc, args=(result[0], split_config[result[0]], ))
)
content_process = [mp.Process(target=self.content_proc, args=(proc_id,)) for proc_id in range(16)]
self.runner_finished.value = False
[p.start() for p in runner2_process]
[p.start() for p in content_process]
# rerun zero runners
runner_threads = []
runner_result_qu2 = queue.Queue()
idx = 0
while not runner_result_qu.empty():
res = runner_result_qu.get()
if res == 0:
th = threading.Thread(target=self.runner_proc, args=(idx, content_qu, runner_result_qu2, split_config[idx]))
runner_threads.append(th)
[p.join() for p in runner2_process]
self.runner_finished.value = True
[p.join() for p in content_process]
idx += 1
content_threads = [threading.Thread(target=self.content_proc, args=(proc_id, content_qu, content_result_qu)) for proc_id in range(16)]
[th.start() for th in runner_threads]
[th.start() for th in content_threads]
[th.join() for th in runner_threads]
[th.join() for th in content_threads]
# print running time
delta = time.time() - start_time
m, s = divmod(delta, 60)
h, m = divmod(m, 60)

View File

@@ -8,7 +8,7 @@ import pytz
class TweetParser:
@staticmethod
def parse(tag, keyword_id, depth=0, main_url=None):
def parse(tag, keyword_id, depth=0, top_tw: Tweet=None):
tweet = Tweet()
tweet.tweet_id = int(tag.attrs['data-tweet-id'])
@@ -43,41 +43,54 @@ class TweetParser:
if len(reply_cnt_tag) > 0:
tweet.reply_cnt = int(reply_cnt_tag[0].attrs['data-tweet-stat-count'])
retweet_cnt_tag = tag.select('span.ProfileTweet-action--reply > span.ProfileTweet-actionCount')
retweet_cnt_tag = tag.select('span.ProfileTweet-action--retweet > span.ProfileTweet-actionCount')
if len(retweet_cnt_tag) > 0:
tweet.retweet_cnt = int(retweet_cnt_tag[0].attrs['data-tweet-stat-count'])
favorite_cnt_tag = tag.select('span.ProfileTweet-action--reply > span.ProfileTweet-actionCount')
favorite_cnt_tag = tag.select('span.ProfileTweet-action--favorite > span.ProfileTweet-actionCount')
if len(favorite_cnt_tag) > 0:
tweet.favorites_cnt = int(favorite_cnt_tag[0].attrs['data-tweet-stat-count'])
if main_url:
tweet.tweet_link = main_url
else:
link_tag = tag.select('a.js-permalink')
if len(link_tag) > 0:
tweet.tweet_link = TwitterConfig.protocol + '://' + TwitterConfig.top_url + link_tag[0].attrs['href']
link_tag = tag.select('a.js-permalink')
if len(link_tag) > 0:
tweet.tweet_link = TwitterConfig.protocol + '://' + TwitterConfig.top_url + link_tag[0].attrs['href']
tweet.top_link = top_tw.tweet_link if top_tw else tweet.tweet_link
tweet.depth = depth
tweet.platform_name = 'twitter'
tweet.platform_form = 'post'
tweet.platform_title = tweet.user_id
tweet.platform_title = top_tw.user_id if top_tw else tweet.user_id
tweet.article_form = 'body' if tweet.depth is 0 else 'reply'
# tweet.article_parent = None
tweet.article_id = tweet.user_id
tweet.article_nickname = tweet.user_name
# tweet.article_title = None
tweet.article_data = tweet.text
tweet.article_url = tweet.tweet_link
tweet.article_url = tweet.top_link
# tweet.article_hit = 0
tweet.article_date = tweet.created_at
# tweet.article_order = 0
tweet.article_order = tweet.depth
# tweet.article_profile = tweet.user_name
tweet.article_profileurl = TwitterConfig.protocol + '://' + TwitterConfig.top_url + '/' + tweet.user_id
tweet.platform_id = tweet.user_id
tweet.platform_id = top_tw.user_id if top_tw else tweet.user_id
tweet.keyword_id = keyword_id
# tweet.reply_url = ''
tweet.reply_url = tweet.tweet_link
# tweet.etc = ''
return tweet
@staticmethod
def get_lone_container(soup, parent_tw):
lone_tweets = soup.select('div.ThreadedConversation--loneTweet')
container_tags = []
for tag in reversed(lone_tweets):
li = tag.select('li.stream-item')
if len(li) > 0 and 'data-item-id' in li[0].attrs:
tweet_id = int(li[0].attrs['data-item-id'])
if tweet_id == parent_tw.tweet_id:
break
container_tags.append(tag)
return reversed(container_tags)