From 2f324b171094c54fbe96a41f52aaf272a6f84fc0 Mon Sep 17 00:00:00 2001 From: mjjo Date: Tue, 25 Jul 2017 17:44:02 +0900 Subject: [PATCH] =?UTF-8?q?-=20multithreading=EC=9C=BC=EB=A1=9C=20?= =?UTF-8?q?=EB=AA=A8=EB=93=A0=20depth=20=ED=81=AC=EB=A1=A4=EB=A7=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- WebBasedCrawler/twitter/twdbhelper.py | 14 +- WebBasedCrawler/twitter/tweet.py | 1 + WebBasedCrawler/twitter/twittercrawl.py | 200 ++++++++++++++---------- WebBasedCrawler/twitter/twparser.py | 41 +++-- 4 files changed, 151 insertions(+), 105 deletions(-) diff --git a/WebBasedCrawler/twitter/twdbhelper.py b/WebBasedCrawler/twitter/twdbhelper.py index c1643cd..74d983a 100644 --- a/WebBasedCrawler/twitter/twdbhelper.py +++ b/WebBasedCrawler/twitter/twdbhelper.py @@ -1,6 +1,5 @@ from twitter.tweet import Tweet import multiprocessing as mp -import copy class TwitterDBHelper: @@ -15,12 +14,6 @@ class TwitterDBHelper: def __del__(self): pass - # def __getstate__(self): - # return self.conn, - # - # def __setstate__(self, state): - # self.conn, = state - def get_param(self, keyword_id): query = "select * from keyword where id = " + str(keyword_id) params = [] @@ -33,12 +26,14 @@ class TwitterDBHelper: with conn.cursor() as cursor: cursor.execute(query) params = cursor.fetchone() - conn.close() except Exception as e: print(e) exit(1) + else: + conn.close() + return params def insert_tweet(self, tweet: Tweet = None, db_num: int = -1, flush=False): @@ -60,7 +55,8 @@ class TwitterDBHelper: conn = self.pymysql.connect(host='bigbird.iptime.org', user='admin', passwd='admin123', db='concepters', charset='utf8', - cursorclass=self.pymysql.cursors.DictCursor) + cursorclass=self.pymysql.cursors.DictCursor, + connect_timeout=5) except Exception as e: print(e) diff --git a/WebBasedCrawler/twitter/tweet.py b/WebBasedCrawler/twitter/tweet.py index 8effdbf..c5d0d2c 100644 --- a/WebBasedCrawler/twitter/tweet.py +++ b/WebBasedCrawler/twitter/tweet.py @@ -18,6 +18,7 @@ class Tweet(DataDBRow): self.reply_cnt = 0 self.retweet_cnt = 0 self.favorite_cnt = 0 + self.top_link = None self.tweet_link = None self.depth = 0 diff --git a/WebBasedCrawler/twitter/twittercrawl.py b/WebBasedCrawler/twitter/twittercrawl.py index 0f6136a..582a176 100644 --- a/WebBasedCrawler/twitter/twittercrawl.py +++ b/WebBasedCrawler/twitter/twittercrawl.py @@ -10,7 +10,8 @@ import requests import bs4 import json import urllib -import multiprocessing as mp +import threading +import queue import time @@ -19,22 +20,13 @@ class TwitterCrawler(): def __init__(self): self.default_config = TwitterConfig() self.db_helper = TwitterDBHelper() - self.proxies = None - self.runner_finished = mp.Value('b', False) - self.content_queue = mp.Queue() - self.result_queue = mp.Queue() def set_arguments(self, browser, keyword_id, db_num, before_day, until_page): params = self.db_helper.get_param(keyword_id) self.default_config.set_param(keyword_id, db_num, params) - def __getstate__(self): - return self.default_config, self.db_helper, self.proxies, self.runner_finished, self.content_queue, self.result_queue, - - def __setstate__(self, state): - self.default_config, self.db_helper, self.proxies, self.runner_finished, self.content_queue, self.result_queue, = state - - def get_timeline_url(self, query, start_str, end_str, max_position=''): + @staticmethod + def get_timeline_url(query, start_str, end_str, max_position=''): params = { 'f': 'tweets', 'vertical': 'default', @@ -47,7 +39,8 @@ class TwitterCrawler(): url_tupple = (TwitterConfig.protocol, TwitterConfig.top_url, TwitterConfig.search_url, '', urllib.parse.urlencode(params), '') return urllib.parse.urlunparse(url_tupple) - def get_content_url(self, user_id, tweet_id, max_position=''): + @staticmethod + def get_content_url(user_id, tweet_id, max_position=''): params = { 'max_position': max_position, } @@ -56,29 +49,34 @@ class TwitterCrawler(): url_tupple = (TwitterConfig.protocol, TwitterConfig.top_url, sub_url, '', urllib.parse.urlencode(params), '') return urllib.parse.urlunparse(url_tupple) - def get_page(self, url): + @staticmethod + def get_page(url, proc_id): headers = { 'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.86 Safari/537.36', - # 'Accept-Language': 'en-US', 'Accept-Language': 'ko-KR,ko;q=0.8,en-US;q=0.6,en;q=0.4', } - if self.proxies is None: - self.proxies = base.proxy.get_proxy_for_requests() + # if proxies is None: + proxies = base.proxy.get_proxy_for_requests() resp = None - for cnt in range(10): + while True: try: - resp = requests.get(url, headers=headers, proxies=self.proxies, timeout=5) + resp = requests.get(url, headers=headers, proxies=proxies, timeout=3) except Exception as e: - print('proxy {} is expired. ({})'.format(self.proxies, e)) - base.proxy.set_proxy_expired(self.proxies) - self.proxies = base.proxy.get_proxy_for_requests() + if proxies == (None, None): + break + + print('[{}] proxy {} is expired. ({})'.format(proc_id, proxies, e)) + base.proxy.set_proxy_expired(proxies) + proxies = base.proxy.get_proxy_for_requests() else: break return resp - def runner_proc(self, proc_id, config): + def runner_proc(self, proc_id, content_queue, result_queue, config): + print('{} to {} runner thread start'.format(config.start_str, config.end_str)) + b_continue = True min_tweet_id = None max_tweet_id = None @@ -89,7 +87,7 @@ class TwitterCrawler(): if min_tweet_id is not None: max_position = 'TWEET-{}-{}'.format(max_tweet_id, min_tweet_id) url = self.get_timeline_url(config.keywords[0], config.start_str, config.end_str, max_position) - resp = self.get_page(url) + resp = self.get_page(url, proc_id) if resp is None: break @@ -105,7 +103,7 @@ class TwitterCrawler(): continue if tweet.reply_cnt > 0: - self.insert_content_pool(tweet) + self.insert_content_pool(proc_id, content_queue, tweet, tweet) self.db_helper.insert_tweet(tweet, config.db_num) @@ -120,71 +118,101 @@ class TwitterCrawler(): max_tweet_id = tweet_tags[-1].attrs['data-item-id'] tweet_count += count - print('{} to {} runner finished {}'.format(config.start_str, config.end_str, tweet_count)) - self.result_queue.put((proc_id, tweet_count, )) + print('{} to {} runner thread finished {}'.format(config.start_str, config.end_str, tweet_count)) + result_queue.put((proc_id, tweet_count, )) + # self.runner_processing[proc_id].value = False return proc_id, tweet_count, - def insert_content_pool(self, tweet: Tweet): - self.content_queue.put(tweet) + @staticmethod + def insert_content_pool(proc_id: int, qu, tweet: Tweet, tweet_top: Tweet): + print(' [{}] pool insert: {} ({})'.format(proc_id, tweet.text[:20] if tweet.text else '', tweet.tweet_link)) + qu.put((tweet, tweet_top,)) - def content_proc(self, proc_id): - while not self.runner_finished.value or not self.content_queue.empty(): + @staticmethod + def get_content(content_queue): + sleep_time = time.time() + while True: try: - parent_tw = self.content_queue.get(block=True, timeout=5) - except: - continue + parent_tw, top_tw, = content_queue.get(block=True, timeout=2) + except Exception as e: + if time.time()-sleep_time > 60: + break + else: + continue + else: + return parent_tw, top_tw, + + return None, None, + + def content_proc(self, proc_id, content_queue, result_queue): + print('[{}] content thread start'.format(proc_id)) + + tweet_count = 0 + while True: + parent_tw, top_tw, = self.get_content(content_queue) + if not parent_tw: + break + + print(' [{}] <<< parent : {} ({})'.format(proc_id, parent_tw.text[:20], parent_tw.tweet_link)) max_position = '' b_continue = True - tweet_count = 0 while b_continue: url = self.get_content_url(parent_tw.user_id, parent_tw.tweet_id, max_position) - resp = self.get_page(url) + resp = self.get_page(url, proc_id) if resp is None or resp.status_code == 404: break elif resp.status_code != 200: - print('content_get code {}'.format(resp.status_code)) + print('[WARNING] content_get code {}'.format(resp.status_code)) continue j = json.loads(resp.content.decode('utf-8')) soup = bs4.BeautifulSoup(j['items_html'], 'lxml') reply_container_tags = soup.select('li.ThreadedConversation') + reply_container_tags += TweetParser.get_lone_container(soup, parent_tw) for container_tags in reply_container_tags: tweet_tags = container_tags.select('div.tweet') - - for idx, tag in enumerate(tweet_tags): - if idx >= 2: - break - - tweet = TweetParser.parse(tag, self.default_config.keyword_id, idx+1, parent_tw.tweet_link) + if len(tweet_tags) > 0: + tweet = TweetParser.parse(tweet_tags[0], self.default_config.keyword_id, parent_tw.depth+1, top_tw) + print('[{}]>>> {} {}: {} ({}) ({})'.format(proc_id, tweet.created_at, tweet.user_name, tweet.text[:20], tweet.depth, tweet.tweet_link)) + self.insert_content_pool(proc_id, content_queue, tweet, top_tw) self.db_helper.insert_tweet(tweet, self.default_config.db_num) - - print('>>> {} {}: {}'.format(tweet.created_at, tweet.user_name, tweet.text[:20])) tweet_count += 1 b_continue = j['has_more_items'] if b_continue: max_position = j['min_position'] - print('content proc {} finished'.format(proc_id)) - return proc_id, + result_queue.put(tweet_count) + print('[{}] content thread finished'.format(proc_id)) + return proc_id, tweet_count, def debug_content(self): + content_qu = queue.Queue() + runner_result_qu = queue.Queue() + content_result_qu = queue.Queue() + test_tw = Tweet() - test_tw.tweet_link = 'https://twitter.com/moonriver365/status/885797401033818112' - test_tw.user_id = 'moonriver365' - test_tw.tweet_id = 885797401033818112 + # test_tw.tweet_link = 'https://twitter.com/yniold_/status/886863893137678337' + # test_tw.user_id = 'yniold_' + # test_tw.tweet_id = 886863893137678337 - self.content_queue.put(test_tw) - self.content_queue.put(test_tw) - print(self.content_queue.qsize()) - print(self.content_queue.empty()) + test_tw.tweet_link = 'https://twitter.com/Awesome_vely/status/888704413111435264' + test_tw.user_id = 'Awesome_vely' + test_tw.tweet_id = 888704413111435264 - content_process = [mp.Process(target=self.content_proc, args=()) for _ in range(1)] - [p.start() for p in content_process] - [p.join() for p in content_process] + test_tw.text = '시작' + self.insert_content_pool(0, content_qu, test_tw, test_tw) + + content_threads = [threading.Thread(target=self.content_proc, args=(proc_id, content_qu, content_result_qu)) for proc_id in range(16)] + [th.start() for th in content_threads] + [th.join() for th in content_threads] + + while not content_result_qu.empty(): + res = content_result_qu.get() + print('reply : {}'.format(res)) print('end all') @@ -205,48 +233,56 @@ class TwitterCrawler(): # return ## contents check - # self.debug_content() + self.debug_content() # split_config = self.default_config.split() - self.test_insert_db() + # self.test_insert_db() - exit() + print("debug end") + # exit() def start(self): start_time = time.time() # self.debug() + # return + # run split_config = self.default_config.split() + content_qu = queue.Queue() + runner_result_qu = queue.Queue() + content_result_qu = queue.Queue() - runner_process = [mp.Process(target=self.runner_proc, args=(proc_id, config, )) for proc_id, config in enumerate(split_config)] - content_process = [mp.Process(target=self.content_proc, args=(proc_id, )) for proc_id in range(16)] + runner_threads = [threading.Thread(target=self.runner_proc, args=(proc_id, content_qu, runner_result_qu, config)) for proc_id, config in enumerate(split_config)] + content_threads = [threading.Thread(target=self.content_proc, args=(proc_id, content_qu, content_result_qu)) for proc_id in range(16)] - [p.start() for p in runner_process] - [p.start() for p in content_process] + [th.start() for th in runner_threads] + [th.start() for th in content_threads] - [p.join() for p in runner_process] - self.runner_finished.value = True - [p.join() for p in content_process] + [th.join() for th in runner_threads] + [th.join() for th in content_threads] - # rerun zero pages - runner2_process = [] - while not self.result_queue.empty(): - result = self.result_queue.get() - if result[1] == 0: - runner2_process.append( - mp.Process(target=self.runner_proc, args=(result[0], split_config[result[0]], )) - ) - content_process = [mp.Process(target=self.content_proc, args=(proc_id,)) for proc_id in range(16)] - self.runner_finished.value = False - [p.start() for p in runner2_process] - [p.start() for p in content_process] + # rerun zero runners + runner_threads = [] + runner_result_qu2 = queue.Queue() + idx = 0 + while not runner_result_qu.empty(): + res = runner_result_qu.get() + if res == 0: + th = threading.Thread(target=self.runner_proc, args=(idx, content_qu, runner_result_qu2, split_config[idx])) + runner_threads.append(th) - [p.join() for p in runner2_process] - self.runner_finished.value = True - [p.join() for p in content_process] + idx += 1 + content_threads = [threading.Thread(target=self.content_proc, args=(proc_id, content_qu, content_result_qu)) for proc_id in range(16)] + [th.start() for th in runner_threads] + [th.start() for th in content_threads] + + [th.join() for th in runner_threads] + [th.join() for th in content_threads] + + # print running time delta = time.time() - start_time m, s = divmod(delta, 60) h, m = divmod(m, 60) diff --git a/WebBasedCrawler/twitter/twparser.py b/WebBasedCrawler/twitter/twparser.py index 209d729..257f964 100644 --- a/WebBasedCrawler/twitter/twparser.py +++ b/WebBasedCrawler/twitter/twparser.py @@ -8,7 +8,7 @@ import pytz class TweetParser: @staticmethod - def parse(tag, keyword_id, depth=0, main_url=None): + def parse(tag, keyword_id, depth=0, top_tw: Tweet=None): tweet = Tweet() tweet.tweet_id = int(tag.attrs['data-tweet-id']) @@ -43,41 +43,54 @@ class TweetParser: if len(reply_cnt_tag) > 0: tweet.reply_cnt = int(reply_cnt_tag[0].attrs['data-tweet-stat-count']) - retweet_cnt_tag = tag.select('span.ProfileTweet-action--reply > span.ProfileTweet-actionCount') + retweet_cnt_tag = tag.select('span.ProfileTweet-action--retweet > span.ProfileTweet-actionCount') if len(retweet_cnt_tag) > 0: tweet.retweet_cnt = int(retweet_cnt_tag[0].attrs['data-tweet-stat-count']) - favorite_cnt_tag = tag.select('span.ProfileTweet-action--reply > span.ProfileTweet-actionCount') + favorite_cnt_tag = tag.select('span.ProfileTweet-action--favorite > span.ProfileTweet-actionCount') if len(favorite_cnt_tag) > 0: tweet.favorites_cnt = int(favorite_cnt_tag[0].attrs['data-tweet-stat-count']) - if main_url: - tweet.tweet_link = main_url - else: - link_tag = tag.select('a.js-permalink') - if len(link_tag) > 0: - tweet.tweet_link = TwitterConfig.protocol + '://' + TwitterConfig.top_url + link_tag[0].attrs['href'] + link_tag = tag.select('a.js-permalink') + if len(link_tag) > 0: + tweet.tweet_link = TwitterConfig.protocol + '://' + TwitterConfig.top_url + link_tag[0].attrs['href'] + tweet.top_link = top_tw.tweet_link if top_tw else tweet.tweet_link tweet.depth = depth tweet.platform_name = 'twitter' tweet.platform_form = 'post' - tweet.platform_title = tweet.user_id + tweet.platform_title = top_tw.user_id if top_tw else tweet.user_id tweet.article_form = 'body' if tweet.depth is 0 else 'reply' # tweet.article_parent = None tweet.article_id = tweet.user_id tweet.article_nickname = tweet.user_name # tweet.article_title = None tweet.article_data = tweet.text - tweet.article_url = tweet.tweet_link + tweet.article_url = tweet.top_link # tweet.article_hit = 0 tweet.article_date = tweet.created_at - # tweet.article_order = 0 + tweet.article_order = tweet.depth # tweet.article_profile = tweet.user_name tweet.article_profileurl = TwitterConfig.protocol + '://' + TwitterConfig.top_url + '/' + tweet.user_id - tweet.platform_id = tweet.user_id + tweet.platform_id = top_tw.user_id if top_tw else tweet.user_id tweet.keyword_id = keyword_id - # tweet.reply_url = '' + tweet.reply_url = tweet.tweet_link # tweet.etc = '' return tweet + + @staticmethod + def get_lone_container(soup, parent_tw): + lone_tweets = soup.select('div.ThreadedConversation--loneTweet') + container_tags = [] + for tag in reversed(lone_tweets): + li = tag.select('li.stream-item') + if len(li) > 0 and 'data-item-id' in li[0].attrs: + tweet_id = int(li[0].attrs['data-item-id']) + if tweet_id == parent_tw.tweet_id: + break + + container_tags.append(tag) + + return reversed(container_tags)