diff --git a/WebBasedCrawler/base/debug.py b/WebBasedCrawler/base/debug.py
new file mode 100644
index 0000000..da708ab
--- /dev/null
+++ b/WebBasedCrawler/base/debug.py
@@ -0,0 +1,13 @@
+import linecache
+import sys
+import base.logger as logger
+
+
+def print_exception(obj=None):
+    exc_type, exc_obj, tb = sys.exc_info()
+    f = tb.tb_frame
+    lineno = tb.tb_lineno
+    filename = f.f_code.co_filename
+    linecache.checkcache(filename)
+    line = linecache.getline(filename, lineno, f.f_globals)
+    logger.log('({}({}) Exception from "{}"):\n {}, {}'.format(filename, lineno, line.strip(), exc_obj, obj if obj else ''), logger.LogLevel.ERROR)
diff --git a/WebBasedCrawler/base/logger.py b/WebBasedCrawler/base/logger.py
index 5ae9b62..b8054a4 100644
--- a/WebBasedCrawler/base/logger.py
+++ b/WebBasedCrawler/base/logger.py
@@ -17,7 +17,7 @@ class CustomFormatter(logging.Formatter):
 logger = logging.getLogger('mylogger')
 # formatter = logging.Formatter('[ %(asctime)s][%(threadName)s][%(levelname)s][%(filename)s(%(lineno)s)] > %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
 # formatter = CustomFormatter('[ %(asctime)s][%(thread)s][%(levelname)s][%(pathname)s(%(lineno)s)]\n> %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
-formatter = CustomFormatter('', datefmt='%Y-%m-%d %H:%M:%S')
+formatter = CustomFormatter(datefmt='%Y-%m-%d %H:%M:%S')

 logging.handlers.RotatingFileHandler('crawler.log')
diff --git a/WebBasedCrawler/base/proxy2.py b/WebBasedCrawler/base/proxy2.py
index f9d3b72..0d95faf 100644
--- a/WebBasedCrawler/base/proxy2.py
+++ b/WebBasedCrawler/base/proxy2.py
@@ -12,6 +12,7 @@
 import threading
 import random
 import requests
+import base.debug as dbg

 Base = sqlalchemy.ext.declarative.declarative_base()
@@ -97,9 +98,8 @@ class Proxy2Handler:
     def __init__(self):
         self.lock = threading.Lock()
         self.engine = sqlalchemy.create_engine('mysql+pymysql://admin:admin123@bigbird.iptime.org/concepters?charset=utf8')
-        session_factory = sqlalchemy.orm.sessionmaker(bind=self.engine, autocommit=True, autoflush=True)
-        self.session = sqlalchemy.orm.scoped_session(session_factory)
-        pass
+        SessionFactory = sqlalchemy.orm.sessionmaker(bind=self.engine, autocommit=True, autoflush=True)
+        self.session = sqlalchemy.orm.scoped_session(SessionFactory)

     def lock_enter(self):
         # logger.log('lock {}'.format(threading.current_thread().ident))
@@ -113,7 +113,7 @@ class Proxy2Handler:

     def commit(self):
         self.lock_enter()
-        self.session.commit()
+        # self.session.commit()
         self.lock_leave()

     def get_oldest(self, platform):
@@ -171,32 +171,42 @@ class Proxy2Handler:

     def get(self, platform, proc_id=-1):
         self.lock_enter()
-        block_column = self.block_field_map[platform]
         try:
-            instances = self.session.query(Proxy2Model).filter(block_column == None).all()
-        except Exception as e:
-            self.lock_leave()
+            block_column = self.block_field_map[platform]
             try:
-                session_factory = sqlalchemy.orm.sessionmaker(bind=self.engine)
-                self.session = sqlalchemy.orm.scoped_session(session_factory)
-            except Exception as e2:
-                logger.log('{} session recreate'.format(proc_id))
+                instances = self.session.query(Proxy2Model).filter(block_column == None).limit(32).all()
+            except Exception as e:
+                dbg.print_exception()
+                assert True

-            return None
+                self.lock_leave()

-        instance = instances[random.randint(0, len(instances)-1)] if len(instances) > 0 else None
-        if instance:
-            self.lock_leave()
-            return instance.get_instance_for_http()
-        else:
-            cnt = self.check_all_proxies(platform)
-            if cnt <= 0:
-                proxies = proxy_crawler.crawl_proxies()
-                self.insert_all(proxies)
+                # try:
+                #     session_factory = sqlalchemy.orm.sessionmaker(bind=self.engine)
+                #     self.session = sqlalchemy.orm.scoped_session(session_factory)
+                #     logger.log('{} session recreate'.format(proc_id))
+                #
+                # except Exception as e2:
+                #     dbg.print_exception(e2)

-            self.lock_leave()
-            return self.get(platform, proc_id)
+                return None
+
+            instance = instances[random.randint(0, len(instances)-1)] if len(instances) > 0 else None
+            if instance:
+                self.lock_leave()
+                return instance.get_instance_for_http()
+            else:
+                cnt = self.check_all_proxies(platform)
+                if cnt <= 0:
+                    proxies = proxy_crawler.crawl_proxies()
+                    self.insert_all(proxies)
+
+                self.lock_leave()
+                return self.get(platform, proc_id)
+
+        except Exception as e:
+            dbg.print_exception(e)

     def insert(self, ip, port):
         instance = self.get_instance(ip, port)
@@ -236,10 +246,13 @@
         # self.session.add(Proxy2Model(proxy['ip'], proxy['port']))

     def set_proxy_blocked(self, ip, port, platform):
-        block_column = self.block_field_map[platform]
-        query = self.get_query(ip, port)
-        query.update({block_column: datetime.datetime.now()})
-        self.commit()
+        try:
+            block_column = self.block_field_map[platform]
+            query = self.get_query(ip, port)
+            query.update({block_column: datetime.datetime.now()})
+            self.commit()
+        except Exception as e:
+            dbg.print_exception(e)

 if __name__ == '__main__':
     proxy_handler = Proxy2Handler()
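[Editor's note, not part of the patch] print_exception() in the new base/debug.py reads tb_frame of the *first* traceback entry, i.e. the line in the function where the exception was caught propagating, not necessarily the line that raised it. A minimal sketch of a variant that walks to the innermost frame; print_exception_innermost is a hypothetical name and, like the original, it only works inside an except block, where sys.exc_info() returns a live traceback:

```python
import linecache
import sys


def print_exception_innermost(obj=None):
    exc_type, exc_obj, tb = sys.exc_info()  # valid only inside an except block
    while tb.tb_next:
        tb = tb.tb_next  # descend to the frame that actually raised
    frame = tb.tb_frame
    lineno = tb.tb_lineno
    filename = frame.f_code.co_filename
    linecache.checkcache(filename)
    line = linecache.getline(filename, lineno, frame.f_globals)
    print('{}({}) "{}": {} {}'.format(filename, lineno, line.strip(), exc_obj, obj or ''))
```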
diff --git a/WebBasedCrawler/twitter/twittercrawl.py b/WebBasedCrawler/twitter/twittercrawl.py
index 2dd0a6d..eceab9b 100644
--- a/WebBasedCrawler/twitter/twittercrawl.py
+++ b/WebBasedCrawler/twitter/twittercrawl.py
@@ -1,4 +1,4 @@
-from twitter.twconfig import TwitterConfig 
+from twitter.twconfig import TwitterConfig
 from twitter.twdbhelper import TwitterDBHelper
 from twitter.tweet import Tweet
 from twitter.twparser import TweetParser
@@ -7,6 +7,7 @@ import base.proxy
 import base.proxy2 as proxy2
 import base.baseclasses
 import base.logger as logger
+import base.debug as dbg

 import requests
 import bs4
@@ -19,367 +20,368 @@ import time

 class TwitterCrawler:
+    def __init__(self):
+        self.default_config = TwitterConfig()
+        self.db_helper = TwitterDBHelper()
+        self.proxy = {}
+        self.proxy_handler = proxy2.Proxy2Handler()
+        self.before_day = None
+        self.runner_finished_queue = queue.Queue()
+
+    def set_arguments(self, browser, keyword_id, db_num, before_day, until_page):
+        params = self.db_helper.get_param(keyword_id)
+        self.before_day = before_day
+        self.default_config.set_param(keyword_id, db_num, params)
+
+    @staticmethod
+    def get_timeline_url(query, start_str, end_str, max_position=''):
+        params = {
+            'f': 'tweets',
+            'vertical': 'default',
+            'src': 'typd',
+            'q': '{} since:{} until:{}'.format(query, start_str, end_str),
+            'language': 'en',
+            'max_position': max_position,
+        }
+
+        url_tupple = (TwitterConfig.protocol, TwitterConfig.top_url, TwitterConfig.search_url, '', urllib.parse.urlencode(params), '')
+        return urllib.parse.urlunparse(url_tupple)
+
+    @staticmethod
+    def get_content_url(user_id, tweet_id, max_position=''):
+        params = {
+            'max_position': max_position,
+        }
+
+        sub_url = TwitterConfig.conversation_url_form.format(user_id, tweet_id)
+        url_tupple = (TwitterConfig.protocol, TwitterConfig.top_url, sub_url, '', urllib.parse.urlencode(params), '')
+        return urllib.parse.urlunparse(url_tupple)
+
+    def get_proxy(self, proxy_key):
+        proxy = None
+        while not proxy:
+            proxy = self.proxy_handler.get(proxy2.Platform.TWITTER, proxy_key)
+            if not proxy:
+                time.sleep(1)
+
+        return proxy
+
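[Editor's note, not part of the patch] get_proxy() above polls Proxy2Handler.get() once per second and never gives up, so an exhausted proxy pool stalls every worker indefinitely. A sketch of the same polling contract with a bounded wait; deadline_sec and the None return are suggested behaviour, not something the patch implements:

```python
import time


def get_proxy_with_deadline(proxy_handler, platform, proxy_key, deadline_sec=60):
    deadline = time.time() + deadline_sec
    while time.time() < deadline:
        proxy = proxy_handler.get(platform, proxy_key)  # None while the pool is empty
        if proxy:
            return proxy
        time.sleep(1)  # back off before polling the pool again
    return None  # caller decides how to handle pool exhaustion
```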
+    def get_page(self, url, is_runner, proc_id):
+        headers = {
+            'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.86 Safari/537.36',
+            'Accept-Language': 'ko-KR,ko;q=0.8,en-US;q=0.6,en;q=0.4',
+        }
+
+        proxy_key = '{}-{}'.format('runner' if is_runner else 'content', proc_id)
+        if proxy_key not in self.proxy:
+            # self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
+            self.proxy[proxy_key] = self.get_proxy(proxy_key)
+
+        resp = None
+        while True:
+            try:
+                resp = requests.get(url, headers=headers, proxies=self.proxy[proxy_key], timeout=3)
+            except Exception as e:
+                if self.proxy[proxy_key] == (None, None):
+                    break
+
+                # print('[{}] proxy {} is expired. ({})'.format(proc_id, self.proxy[proxy_key], e))
+                # base.proxy.set_proxy_expired(self.proxy[proxy_key])
+                self.proxy_handler.set_proxy_blocked(self.proxy[proxy_key]['ip'], self.proxy[proxy_key]['port'], proxy2.Platform.TWITTER)
+                # self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
+                self.proxy[proxy_key] = self.get_proxy(proxy_key)
+            else:
+                break
+
+        return resp
+
+    def get_page_data(self, url, is_runner, proc_id):
+        for retry_cnt in range(5):
+            # get response
+            resp = self.get_page(url, is_runner, proc_id)
+            if not resp:
+                break
+
+            # check response
+            if resp.status_code == 404:
+                break
+            elif resp.status_code != 200:
+                print('[WARNING] content_get code {}'.format(resp.status_code))
+                continue
+
+            # parsing result
+            j = json.loads(resp.text)
+            if j['new_latent_count'] <= 0:
+                proxy_key = '{}-{}'.format('runner' if is_runner else 'content', proc_id)
+                # self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
+                self.proxy[proxy_key] = self.get_proxy(proxy_key)
+                continue
+            else:
+                return j
+
+
+        return {
+            'items_html': '',
+            'has_more_items': False,
+        }
+
+    def runner_proc(self, proc_id, content_queue, result_queue, config):
+        try:
+            print('[{}] {} to {} runner thread start'.format(proc_id, config.start_str, config.end_str))
+
+            b_continue = True
+            min_tweet_id = None
+            max_tweet_id = None
+            max_position = ''
+            tweet_count = 0
+
+            while b_continue:
+                url = self.get_timeline_url(config.keywords[0], config.start_str, config.end_str, max_position)
+                j = self.get_page_data(url, True, proc_id)
+                soup = bs4.BeautifulSoup(j['items_html'], 'lxml')
+                tweet_tags = soup.select("div.tweet")
+
+                tweet_ids = []
+                for tw in tweet_tags:
+                    tweet = TweetParser.parse(tw, config.keyword_id)
+                    tweet_ids.append(tweet.tweet_id)
+
+                    if tweet.is_reply is True:
+                        # print(' ## {}: {}...'.format(tweet.user_name, tweet.text[:20]))
+                        continue
+
+                    if tweet.reply_cnt > 0:
+                        self.insert_content_pool(proc_id, content_queue, tweet, tweet)
+                    self.db_helper.insert_tweet(tweet, config.db_num)
+
+                    # print('{} {}: {}...'.format(tweet.created_at, tweet.user_name, tweet.text[:20]))
+                    print('[{}] body {} ({}) [{}]'.format(proc_id, tweet.top_link, tweet.created_at, 'ok'))
+
+                count = len(tweet_tags)
+                tweet_count += count
+
+                b_continue = count > 0
+                # b_continue = j['has_more_items']
+                if b_continue:
+                    if min_tweet_id is None:
+                        min_tweet_id = tweet_ids[0]
+                    max_tweet_id = tweet_ids[-1]
+
+                    if 'min_position' in j:
+                        max_position = j['min_position']
+                    else:
+                        max_position = 'TWEET-{}-{}'.format(max_tweet_id, min_tweet_id)
+
+            print('[{}] {} to {} runner thread finished {}'.format(proc_id, config.start_str, config.end_str, tweet_count))
+            result_queue.put({
+                'proc_id': proc_id,
+                'count': tweet_count,
+            })
+            # self.runner_processing[proc_id].value = False
+        except Exception as e:
+            dbg.print_exception(e)
+
+        return proc_id, tweet_count,
+
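[Editor's note, not part of the patch] get_page_data() above encodes its failure mode in the return value: after five attempts (rotating the proxy whenever Twitter reports new_latent_count <= 0) it falls back to {'items_html': '', 'has_more_items': False}, which lets runner_proc and content_proc wind down their paging loops without error handling of their own. A condensed sketch of a caller relying on that contract; page_all and make_url are illustrative names only:

```python
def page_all(get_page_data, make_url, proc_id=0):
    # Yields raw HTML fragments until the empty fallback page appears.
    max_position = ''
    while True:
        j = get_page_data(make_url(max_position), True, proc_id)
        if not j['items_html']:
            break  # fallback page: retries were exhausted
        yield j['items_html']
        if not j['has_more_items']:
            break
        max_position = j.get('min_position', '')
```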
+    @staticmethod
+    def insert_content_pool(proc_id: int, qu, tweet: Tweet, tweet_top: Tweet):
+        # print(' [{}] pool insert: {} ({})'.format(proc_id, tweet.text[:20] if tweet.text else '', tweet.tweet_link))
+        qu.put((tweet, tweet_top,))
+
+    def get_content(self, content_queue):
+        sleep_time = time.time()
+        while True:
+            try:
+                parent_tw, top_tw, = content_queue.get(block=True, timeout=2)
+            except Exception as e:
+                if not self.runner_finished_queue.empty() and time.time()-sleep_time > 15:
+                    break
+                else:
+                    continue
+            else:
+                return parent_tw, top_tw,
+
+        return None, None,
+
+    def content_proc(self, proc_id, content_queue, result_queue):
+        print('[{}] content thread start'.format(proc_id))
+
+        tweet_count = 0
+        while True:
+            parent_tw, top_tw, = self.get_content(content_queue)
+            if not parent_tw:
+                break
+
+            # print(' [{}] <<< parent : {} ({})'.format(proc_id, parent_tw.text[:20], parent_tw.tweet_link))
+
+            max_position = ''
+
+            b_continue = True
+            while b_continue:
+                url = self.get_content_url(parent_tw.user_id, parent_tw.tweet_id, max_position)
+                j = self.get_page_data(url, False, proc_id)
+                soup = bs4.BeautifulSoup(j['items_html'], 'lxml')
+
+                reply_container_tags = soup.select('li.ThreadedConversation')
+                reply_container_tags += TweetParser.get_lone_container(soup, parent_tw)
+                for container_tags in reply_container_tags:
+                    tweet_tags = container_tags.select('div.tweet')
+                    if len(tweet_tags) > 0:
+                        tweet = TweetParser.parse(tweet_tags[0], self.default_config.keyword_id, parent_tw.depth+1, top_tw)
+                        # print('[{}]>>> {} {}: {} ({}) ({})'.format(proc_id, tweet.created_at, tweet.user_name, tweet.text[:20], tweet.depth, tweet.tweet_link))
+                        print('[{}] reply {} [{}]'.format(proc_id, tweet.top_link, 'ok'))
+                        self.insert_content_pool(proc_id, content_queue, tweet, top_tw)
+                        self.db_helper.insert_tweet(tweet, self.default_config.db_num)
+                        tweet_count += 1
+
+                b_continue = j['has_more_items']
+                if b_continue:
+                    max_position = j['min_position']
+
+        result_queue.put({
+            'proc_id': proc_id,
+            'count': tweet_count,
+        })
+
+        print('[{}] content thread finished'.format(proc_id))
+        return proc_id, tweet_count,
+
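[Editor's note, not part of the patch] get_content() above is the consumer half of the shutdown handshake: it polls content_queue with a 2 s timeout and returns (None, None) only once runner_finished_queue is non-empty and 15 s have elapsed, which is what lets every content_proc thread drain stragglers before exiting. The same handshake restated with threading.Event, the more idiomatic flag for a one-way signal; a sketch, not the patch's API:

```python
import queue
import threading
import time


def drain(content_queue, producers_done, grace_sec=15):
    started = time.time()
    while True:
        try:
            return content_queue.get(block=True, timeout=2)
        except queue.Empty:
            # Exit only once the producers signalled completion and the
            # queue has stayed quiet for the whole grace period.
            if producers_done.is_set() and time.time() - started > grace_sec:
                return None


# usage: producers_done = threading.Event(); call producers_done.set() after
# the runner pool shuts down, then drain() returns None once the queue dries up.
```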
+    def debug_content(self):
+        content_qu = queue.Queue()
+        runner_result_qu = queue.Queue()
+        content_result_qu = queue.Queue()
+
+        test_tw = Tweet()
+        # test_tw.tweet_link = 'https://twitter.com/yniold_/status/886863893137678337'
+        # test_tw.user_id = 'yniold_'
+        # test_tw.tweet_id = 886863893137678337
+
+        test_tw.tweet_link = 'https://twitter.com/Awesome_vely/status/888704413111435264'
+        test_tw.user_id = 'Awesome_vely'
+        test_tw.tweet_id = 888704413111435264
+
+        test_tw.text = '시작'
+        self.insert_content_pool(0, content_qu, test_tw, test_tw)
+
+        content_threads = [threading.Thread(target=self.content_proc, args=(proc_id, content_qu, content_result_qu)) for proc_id in range(16)]
+        [th.start() for th in content_threads]
+        [th.join() for th in content_threads]
+
+        while not content_result_qu.empty():
+            res = content_result_qu.get()
+            print('reply : {}'.format(res))
+
+        print('end all')
+
+    def test_insert_db(self):
+        test_tw = Tweet()
+        test_tw.tweet_link = 'https://twitter.com/moonriver365/status/885797401033818112'
+        test_tw.user_id = 'moonriver365'
+        test_tw.tweet_id = 885797401033818112
+        for _ in range(5):
+            self.db_helper.insert_tweet(test_tw, self.default_config.db_num)
+
+    def debug(self):
+        if base.baseclasses.is_debug:
+            ## check proxy
+            # base.proxy.get_proxy_from_file('proxy.txt')
+            # proxy = {'https': 'http://45.56.86.93:3128', 'http': 'http://45.56.86.93:3128'}
+            # base.proxy.set_proxy_expired(proxy)
+            # return
+
+            ## contents check
+            self.debug_content()
+
+            # split_config = self.default_config.split()
+
+            # self.test_insert_db()
+
+            print("debug end")
+            # exit()
+
+    def run(self):
+        start_time = time.time()
+
+        # run
+        worker_count = 1
+        split_config = self.default_config.split()
+
+        content_qu = queue.Queue()
+        runner_result_qu = queue.Queue()
+        content_result_qu = queue.Queue()
+
+        runner_result_cnt = 0
+        content_result_cnt = 0
+
+        content_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
+        [content_pool.submit(self.content_proc, proc_id, content_qu, content_result_qu) for proc_id in range(worker_count)]
+
+        runner_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
+        [runner_pool.submit(self.runner_proc, proc_id, content_qu, runner_result_qu, config) for proc_id, config in enumerate(split_config)]
+
+        runner_pool.shutdown(wait=True)
+        self.runner_finished_queue.put(True)
+        content_pool.shutdown(wait=True)
+        self.db_helper.flush()
+
+        # rerun zero runners
+        print('restart failed runner')
+        while not self.runner_finished_queue.empty():
+            self.runner_finished_queue.get()
+
+        for retry in range(5):
+            runner_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
+            runner_result_qu2 = queue.Queue()
+            b_rerun = False
+            while not runner_result_qu.empty():
+                res = runner_result_qu.get()
+                runner_result_cnt += res['count']
+                proc_id = res['proc_id']
+                if res['count'] == 0:
+                    runner_pool.submit(self.runner_proc, proc_id, content_qu, runner_result_qu2, split_config[proc_id])
+                    b_rerun = True
+
+            while not content_result_qu.empty():
+                res = content_result_qu.get()
+                content_result_cnt += res['count']
+
+            if b_rerun:
+                content_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
+                [content_pool.submit(self.content_proc, proc_id, content_qu, content_result_qu) for proc_id in range(worker_count)]
+
+            runner_pool.shutdown(wait=True)
+            self.runner_finished_queue.put(True)
+            content_pool.shutdown(wait=True)
+            self.db_helper.flush()
+
+            runner_result_qu = runner_result_qu2
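[Editor's note, not part of the patch] The retry block in the new run() above re-submits only the date slices whose runner reported a zero count, swapping in a fresh result queue each round. Reduced to its core; submit_runner is assumed to block until the slice finishes, which the real code gets from ThreadPoolExecutor.shutdown(wait=True):

```python
import queue


def rerun_empty_slices(submit_runner, results, configs, max_retries=5):
    total = 0
    for _ in range(max_retries):
        next_results = queue.Queue()
        rerun = False
        while not results.empty():
            res = results.get()
            total += res['count']
            if res['count'] == 0:
                # resubmit this slice; its result lands in the fresh queue
                submit_runner(res['proc_id'], next_results, configs[res['proc_id']])
                rerun = True
        results = next_results
        if not rerun:
            break
    return total
```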
-    def __init__(self):
-        self.default_config = TwitterConfig()
-        self.db_helper = TwitterDBHelper()
-        self.proxy = {}
-        self.proxy_handler = proxy2.Proxy2Handler()
-        self.before_day = None
-        self.runner_finished_queue = queue.Queue()
-
-    def set_arguments(self, browser, keyword_id, db_num, before_day, until_page):
-        params = self.db_helper.get_param(keyword_id)
-        self.before_day = before_day
-        self.default_config.set_param(keyword_id, db_num, params)
-
-    @staticmethod
-    def get_timeline_url(query, start_str, end_str, max_position=''):
-        params = {
-            'f': 'tweets',
-            'vertical': 'default',
-            'src': 'typd',
-            'q': '{} since:{} until:{}'.format(query, start_str, end_str),
-            'language': 'en',
-            'max_position': max_position,
-        }
-
-        url_tupple = (TwitterConfig.protocol, TwitterConfig.top_url, TwitterConfig.search_url, '', urllib.parse.urlencode(params), '')
-        return urllib.parse.urlunparse(url_tupple)
-
-    @staticmethod
-    def get_content_url(user_id, tweet_id, max_position=''):
-        params = {
-            'max_position': max_position,
-        }
-
-        sub_url = TwitterConfig.conversation_url_form.format(user_id, tweet_id)
-        url_tupple = (TwitterConfig.protocol, TwitterConfig.top_url, sub_url, '', urllib.parse.urlencode(params), '')
-        return urllib.parse.urlunparse(url_tupple)
-
-    def get_proxy(self, proxy_key):
-        proxy = None
-        while not proxy:
-            proxy = self.proxy_handler.get(proxy2.Platform.TWITTER, proxy_key)
-            if not proxy:
-                time.sleep(1)
-
-        return proxy
-
-    def get_page(self, url, is_runner, proc_id):
-        headers = {
-            'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.86 Safari/537.36',
-            'Accept-Language': 'ko-KR,ko;q=0.8,en-US;q=0.6,en;q=0.4',
-        }
-        proxy_key = '{}-{}'.format('runner' if is_runner else 'content', proc_id)
-        if proxy_key not in self.proxy:
-            # self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
-            self.proxy[proxy_key] = self.get_proxy(proxy_key)
-
-        resp = None
-        while True:
-            try:
-                resp = requests.get(url, headers=headers, proxies=self.proxy[proxy_key], timeout=3)
-            except Exception as e:
-                if self.proxy[proxy_key] == (None, None):
-                    break
-
-                # print('[{}] proxy {} is expired. ({})'.format(proc_id, self.proxy[proxy_key], e))
-                # base.proxy.set_proxy_expired(self.proxy[proxy_key])
-                self.proxy_handler.set_proxy_blocked(self.proxy[proxy_key]['ip'], self.proxy[proxy_key]['port'], proxy2.Platform.TWITTER)
-                # self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
-                self.proxy[proxy_key] = self.get_proxy(proxy_key)
-            else:
-                break
-
-        return resp
-
-    def get_page_data(self, url, is_runner, proc_id):
-        for retry_cnt in range(5):
-            # get response
-            resp = self.get_page(url, is_runner, proc_id)
-            if not resp:
-                break
-
-            # check response
-            if resp.status_code == 404:
-                break
-            elif resp.status_code != 200:
-                print('[WARNING] content_get code {}'.format(resp.status_code))
-                continue
-
-            # parsing result
-            j = json.loads(resp.text)
-            if j['new_latent_count'] <= 0:
-                proxy_key = '{}-{}'.format('runner' if is_runner else 'content', proc_id)
-                # self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
-                self.proxy[proxy_key] = self.get_proxy(proxy_key)
-                continue
-            else:
-                return j
-
-        return {
-            'items_html': '',
-            'has_more_items': False,
-        }
-
-    def runner_proc(self, proc_id, content_queue, result_queue, config):
-        try:
-            print('[{}] {} to {} runner thread start'.format(proc_id, config.start_str, config.end_str))
-
-            b_continue = True
-            min_tweet_id = None
-            max_tweet_id = None
-            max_position = ''
-            tweet_count = 0
-
-            while b_continue:
-                url = self.get_timeline_url(config.keywords[0], config.start_str, config.end_str, max_position)
-                j = self.get_page_data(url, True, proc_id)
-                soup = bs4.BeautifulSoup(j['items_html'], 'lxml')
-                tweet_tags = soup.select("div.tweet")
-
-                tweet_ids = []
-                for tw in tweet_tags:
-                    tweet = TweetParser.parse(tw, config.keyword_id)
-                    tweet_ids.append(tweet.tweet_id)
-
-                    if tweet.is_reply is True:
-                        # print(' ## {}: {}...'.format(tweet.user_name, tweet.text[:20]))
-                        continue
-
-                    if tweet.reply_cnt > 0:
-                        self.insert_content_pool(proc_id, content_queue, tweet, tweet)
-                    self.db_helper.insert_tweet(tweet, config.db_num)
-
-                    # print('{} {}: {}...'.format(tweet.created_at, tweet.user_name, tweet.text[:20]))
-                    print('[{}] body {} ({}) [{}]'.format(proc_id, tweet.top_link, tweet.created_at, 'ok'))
-
-                count = len(tweet_tags)
-                tweet_count += count
-
-                b_continue = count > 0
-                # b_continue = j['has_more_items']
-                if b_continue:
-                    if min_tweet_id is None:
-                        min_tweet_id = tweet_ids[0]
-                    max_tweet_id = tweet_ids[-1]
-
-                    if 'min_position' in j:
-                        max_position = j['min_position']
-                    else:
-                        max_position = 'TWEET-{}-{}'.format(max_tweet_id, min_tweet_id)
-
-            print('[{}] {} to {} runner thread finished {}'.format(proc_id, config.start_str, config.end_str, tweet_count))
-            result_queue.put({
-                'proc_id': proc_id,
-                'count': tweet_count,
-            })
-            # self.runner_processing[proc_id].value = False
-        except Exception as e:
-            logger.log(e, logger.LogLevel.ERROR)
-
-        return proc_id, tweet_count,
-
-    @staticmethod
-    def insert_content_pool(proc_id: int, qu, tweet: Tweet, tweet_top: Tweet):
-        # print(' [{}] pool insert: {} ({})'.format(proc_id, tweet.text[:20] if tweet.text else '', tweet.tweet_link))
-        qu.put((tweet, tweet_top,))
-
-    def get_content(self, content_queue):
-        sleep_time = time.time()
-        while True:
-            try:
-                parent_tw, top_tw, = content_queue.get(block=True, timeout=2)
-            except Exception as e:
-                if not self.runner_finished_queue.empty() and time.time()-sleep_time > 15:
-                    break
-                else:
-                    continue
-            else:
-                return parent_tw, top_tw,
-
-        return None, None,
-
-    def content_proc(self, proc_id, content_queue, result_queue):
-        print('[{}] content thread start'.format(proc_id))
-
-        tweet_count = 0
-        while True:
-            parent_tw, top_tw, = self.get_content(content_queue)
-            if not parent_tw:
-                break
-
-            # print(' [{}] <<< parent : {} ({})'.format(proc_id, parent_tw.text[:20], parent_tw.tweet_link))
-
-            max_position = ''
-
-            b_continue = True
-            while b_continue:
-                url = self.get_content_url(parent_tw.user_id, parent_tw.tweet_id, max_position)
-                j = self.get_page_data(url, False, proc_id)
-                soup = bs4.BeautifulSoup(j['items_html'], 'lxml')
-
-                reply_container_tags = soup.select('li.ThreadedConversation')
-                reply_container_tags += TweetParser.get_lone_container(soup, parent_tw)
-                for container_tags in reply_container_tags:
-                    tweet_tags = container_tags.select('div.tweet')
-                    if len(tweet_tags) > 0:
-                        tweet = TweetParser.parse(tweet_tags[0], self.default_config.keyword_id, parent_tw.depth+1, top_tw)
-                        # print('[{}]>>> {} {}: {} ({}) ({})'.format(proc_id, tweet.created_at, tweet.user_name, tweet.text[:20], tweet.depth, tweet.tweet_link))
-                        print('[{}] reply {} [{}]'.format(proc_id, tweet.top_link, 'ok'))
-                        self.insert_content_pool(proc_id, content_queue, tweet, top_tw)
-                        self.db_helper.insert_tweet(tweet, self.default_config.db_num)
-                        tweet_count += 1
-
-                b_continue = j['has_more_items']
-                if b_continue:
-                    max_position = j['min_position']
-
-        result_queue.put({
-            'proc_id': proc_id,
-            'count': tweet_count,
-        })
-
-        print('[{}] content thread finished'.format(proc_id))
-        return proc_id, tweet_count,
-
-    def debug_content(self):
-        content_qu = queue.Queue()
-        runner_result_qu = queue.Queue()
-        content_result_qu = queue.Queue()
-
-        test_tw = Tweet()
-        # test_tw.tweet_link = 'https://twitter.com/yniold_/status/886863893137678337'
-        # test_tw.user_id = 'yniold_'
-        # test_tw.tweet_id = 886863893137678337
-
-        test_tw.tweet_link = 'https://twitter.com/Awesome_vely/status/888704413111435264'
-        test_tw.user_id = 'Awesome_vely'
-        test_tw.tweet_id = 888704413111435264
-
-        test_tw.text = '시작'
-        self.insert_content_pool(0, content_qu, test_tw, test_tw)
-
-        content_threads = [threading.Thread(target=self.content_proc, args=(proc_id, content_qu, content_result_qu)) for proc_id in range(16)]
-        [th.start() for th in content_threads]
-        [th.join() for th in content_threads]
-
-        while not content_result_qu.empty():
-            res = content_result_qu.get()
-            print('reply : {}'.format(res))
-
-        print('end all')
-
-    def test_insert_db(self):
-        test_tw = Tweet()
-        test_tw.tweet_link = 'https://twitter.com/moonriver365/status/885797401033818112'
-        test_tw.user_id = 'moonriver365'
-        test_tw.tweet_id = 885797401033818112
-        for _ in range(5):
-            self.db_helper.insert_tweet(test_tw, self.default_config.db_num)
-
-    def debug(self):
-        if base.baseclasses.is_debug:
-            ## check proxy
-            # base.proxy.get_proxy_from_file('proxy.txt')
-            # proxy = {'https': 'http://45.56.86.93:3128', 'http': 'http://45.56.86.93:3128'}
-            # base.proxy.set_proxy_expired(proxy)
-            # return
-
-            ## contents check
-            self.debug_content()
-
-            # split_config = self.default_config.split()
-
-            # self.test_insert_db()
-
-            print("debug end")
-            # exit()
-
-    def run(self):
-        start_time = time.time()
-
-        # run
-        worker_count = 16
-        split_config = self.default_config.split()
-
-        content_qu = queue.Queue()
-        runner_result_qu = queue.Queue()
-        content_result_qu = queue.Queue()
-
-        runner_result_cnt = 0
-        content_result_cnt = 0
-
-        content_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
-        [content_pool.submit(self.content_proc, proc_id, content_qu, content_result_qu) for proc_id in range(worker_count)]
-
-        runner_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
-        [runner_pool.submit(self.runner_proc, proc_id, content_qu, runner_result_qu, config) for proc_id, config in enumerate(split_config)]
-
-        runner_pool.shutdown(wait=True)
-        self.runner_finished_queue.put(True)
-        content_pool.shutdown(wait=True)
-        self.db_helper.flush()
-
-        # rerun zero runners
-        print('restart failed runner')
-        while not self.runner_finished_queue.empty():
-            self.runner_finished_queue.get()
-
-        for retry in range(5):
-            runner_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
-            runner_result_qu2 = queue.Queue()
-            b_rerun = False
-            while not runner_result_qu.empty():
-                res = runner_result_qu.get()
-                runner_result_cnt += res['count']
-                proc_id = res['proc_id']
-                if res['count'] == 0:
-                    runner_pool.submit(self.runner_proc, proc_id, content_qu, runner_result_qu2, split_config[proc_id])
-                    b_rerun = True
-
-            while not content_result_qu.empty():
-                res = content_result_qu.get()
-                content_result_cnt += res['count']
-
-            if b_rerun:
-                content_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
-                [content_pool.submit(self.content_proc, proc_id, content_qu, content_result_qu) for proc_id in range(worker_count)]
-
-            runner_pool.shutdown(wait=True)
-            self.runner_finished_queue.put(True)
-            content_pool.shutdown(wait=True)
-            self.db_helper.flush()
-
-            runner_result_qu = runner_result_qu2
+        while not runner_result_qu.empty():
+            res = runner_result_qu.get()
+            runner_result_cnt += res['count']
-        while not runner_result_qu.empty():
-            res = runner_result_qu.get()
-            runner_result_cnt += res['count']
+        while not content_result_qu.empty():
+            res = content_result_qu.get()
+            content_result_cnt += res['count']
+
+        print('total body count: {}'.format(runner_result_cnt))
+        print('total reply count: {}'.format(content_result_cnt))
-
-        while not content_result_qu.empty():
-            res = content_result_qu.get()
-            content_result_cnt += res['count']
-
-        print('total body count: {}'.format(runner_result_cnt))
-        print('total reply count: {}'.format(content_result_cnt))
+
+        # print running time
+        delta = time.time() - start_time
+        m, s = divmod(delta, 60)
+        h, m = divmod(m, 60)
+        print("finished all {}:{:02d}:{:02d} ".format(int(h), int(m), int(s)))
+
+    def start(self):
+
+        # self.debug()
+        # return
+
+        # run
+        while True:
+            self.default_config.reload_realtime(self.before_day)
+            self.run()
-        # print running time
-        delta = time.time() - start_time
-        m, s = divmod(delta, 60)
-        h, m = divmod(m, 60)
-        print("finished all {}:{:02d}:{:02d} ".format(int(h), int(m), int(s)))
-
-    def start(self):
-
-        # self.debug()
-        # return
-
-        # run
-        while True:
-            self.default_config.reload_realtime(self.before_day)
-            self.run()
-
-        if not self.default_config.realtime:
-            break
+            if not self.default_config.realtime:
+                break
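[Editor's note, not part of the patch] One closing remark on the base/proxy2.py hunk: the new outer except in Proxy2Handler.get() logs via dbg.print_exception(e) but then falls off the end of the method with self.lock still held, while the happy paths release the lock by hand before each return. A sketch of a loop-based variant where try/finally keeps the lock balanced on every path; the tail recursion becomes a loop because threading.Lock is not re-entrant, and query_unblocked is a hypothetical helper standing in for the .filter(block_column == None).limit(32).all() query:

```python
def get(self, platform, proc_id=-1):
    while True:
        self.lock_enter()
        try:
            instances = self.query_unblocked(platform)  # hypothetical helper
            if instances:
                return random.choice(instances).get_instance_for_http()
            if self.check_all_proxies(platform) <= 0:
                self.insert_all(proxy_crawler.crawl_proxies())
            # fall through: lock is released in finally, then we retry
        except Exception as e:
            dbg.print_exception(e)
            return None
        finally:
            self.lock_leave()  # released on every path, including returns
```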