from twitter.twconfig import TwitterConfig
from twitter.twdbhelper import TwitterDBHelper
from twitter.tweet import Tweet
from twitter.twparser import TweetParser
import base.proxy
import base.proxy2 as proxy2
import base.baseclasses
import base.logger as logger
import requests
import bs4
import json
import urllib
import concurrent.futures
import threading
import queue
import time


class TwitterCrawler:
    """Multi-threaded Twitter search crawler.

    Two kinds of worker threads cooperate through queues:
      * "runner" threads page through the Twitter search timeline for a
        date range and push tweets that have replies onto a content queue;
      * "content" threads drain the content queue, fetch each tweet's
        conversation (replies), and push newly found replies back onto the
        same queue so whole reply trees get walked.
    All HTTP requests go through rotating proxies obtained from
    ``proxy2.Proxy2Handler``; parsed tweets are persisted via
    ``TwitterDBHelper``.
    """

    def __init__(self):
        self.default_config = TwitterConfig()          # crawl parameters (keywords, date range, db number)
        self.db_helper = TwitterDBHelper()             # tweet persistence layer
        self.proxy = {}                                # per-worker proxy cache, keyed 'runner-<id>' / 'content-<id>'
        self.proxy_handler = proxy2.Proxy2Handler()    # proxy pool / block-list manager
        self.before_day = None                         # realtime-mode lookback, set by set_arguments()
        # Signals content threads that all runner threads have finished
        # (checked in get_content() to decide when to stop waiting).
        self.runner_finished_queue = queue.Queue()

    def set_arguments(self, browser, keyword_id, db_num, before_day, until_page):
        """Load crawl parameters for *keyword_id* into the default config.

        NOTE(review): ``browser`` and ``until_page`` are accepted but never
        used here — presumably kept for interface parity with sibling
        crawlers; confirm against callers before removing.
        """
        params = self.db_helper.get_param(keyword_id)
        self.before_day = before_day
        self.default_config.set_param(keyword_id, db_num, params)

    @staticmethod
    def get_timeline_url(query, start_str, end_str, max_position=''):
        """Build a Twitter search-timeline URL for *query* restricted to
        [start_str, end_str); *max_position* is the pagination cursor
        (empty string for the first page)."""
        params = {
            'f': 'tweets',
            'vertical': 'default',
            'src': 'typd',
            # Date window is encoded in the query string itself via
            # Twitter's since:/until: search operators.
            'q': '{} since:{} until:{}'.format(query, start_str, end_str),
            'language': 'en',
            'max_position': max_position,
        }
        url_tupple = (TwitterConfig.protocol, TwitterConfig.top_url, TwitterConfig.search_url,
                      '', urllib.parse.urlencode(params), '')
        return urllib.parse.urlunparse(url_tupple)

    @staticmethod
    def get_content_url(user_id, tweet_id, max_position=''):
        """Build the conversation (replies) URL for one tweet; *max_position*
        is the pagination cursor for subsequent reply pages."""
        params = {
            'max_position': max_position,
        }
        sub_url = TwitterConfig.conversation_url_form.format(user_id, tweet_id)
        url_tupple = (TwitterConfig.protocol, TwitterConfig.top_url, sub_url,
                      '', urllib.parse.urlencode(params), '')
        return urllib.parse.urlunparse(url_tupple)

    def get_proxy(self, proxy_key):
        """Block until the proxy handler yields a usable proxy for
        *proxy_key*, polling once per second."""
        proxy = None
        while not proxy:
            proxy = self.proxy_handler.get(proxy2.Platform.TWITTER, proxy_key)
            if not proxy:
                time.sleep(1)
        return proxy

    def get_page(self, url, is_runner, proc_id):
        """GET *url* through this worker's cached proxy.

        On a request failure the current proxy is reported as blocked and a
        replacement is fetched, retrying indefinitely until a request goes
        through. Returns the ``requests`` response, or the last (possibly
        None) value if the no-proxy sentinel ``(None, None)`` is hit.
        """
        headers = {
            'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.86 Safari/537.36',
            'Accept-Language': 'ko-KR,ko;q=0.8,en-US;q=0.6,en;q=0.4',
        }
        # Each worker thread gets its own proxy slot so threads don't churn
        # each other's proxies.
        proxy_key = '{}-{}'.format('runner' if is_runner else 'content', proc_id)
        if proxy_key not in self.proxy:
            # self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
            self.proxy[proxy_key] = self.get_proxy(proxy_key)
        resp = None
        while True:
            try:
                resp = requests.get(url, headers=headers, proxies=self.proxy[proxy_key], timeout=3)
            except Exception as e:
                # Sentinel meaning "run without proxy"; nothing to rotate,
                # so give up and return whatever resp currently holds (None).
                if self.proxy[proxy_key] == (None, None):
                    break
                # print('[{}] proxy {} is expired. ({})'.format(proc_id, self.proxy[proxy_key], e))
                # base.proxy.set_proxy_expired(self.proxy[proxy_key])
                self.proxy_handler.set_proxy_blocked(self.proxy[proxy_key]['ip'], self.proxy[proxy_key]['port'], proxy2.Platform.TWITTER)
                # self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
                self.proxy[proxy_key] = self.get_proxy(proxy_key)
            else:
                break
        return resp

    def get_page_data(self, url, is_runner, proc_id):
        """Fetch *url* and return its JSON payload as a dict.

        Retries up to 5 times on non-200 responses or empty payloads
        (``new_latent_count <= 0`` is treated as a soft block and triggers a
        proxy rotation). On 404, missing response, or retry exhaustion,
        returns an empty-page stub so callers can terminate pagination
        uniformly.

        NOTE(review): ``json.loads(resp.text)`` is not guarded — a 200
        response with a non-JSON body (e.g. a proxy error page) would raise
        out of this method; confirm whether callers rely on that.
        """
        for retry_cnt in range(5):
            # get response
            resp = self.get_page(url, is_runner, proc_id)
            if not resp:
                break
            # check response
            if resp.status_code == 404:
                break
            elif resp.status_code != 200:
                print('[WARNING] content_get code {}'.format(resp.status_code))
                continue
            # parsing result
            j = json.loads(resp.text)
            if j['new_latent_count'] <= 0:
                # Empty page: assume the proxy is being served a blocked /
                # rate-limited view and swap it before retrying.
                proxy_key = '{}-{}'.format('runner' if is_runner else 'content', proc_id)
                # self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
                self.proxy[proxy_key] = self.get_proxy(proxy_key)
                continue
            else:
                return j
        # Fallback stub shaped like a real (but empty, final) page.
        return {
            'items_html': '',
            'has_more_items': False,
        }

    def runner_proc(self, proc_id, content_queue, result_queue, config):
        """Runner-thread body: page through the search timeline for
        *config*'s date range.

        Top-level tweets are stored via the DB helper; tweets with replies
        are queued on *content_queue* for the content threads. The total
        tweet count is reported on *result_queue* (count 0 marks a runner
        that run() may retry).
        """
        try:
            print('[{}] {} to {} runner thread start'.format(proc_id, config.start_str, config.end_str))
            b_continue = True
            min_tweet_id = None    # newest id seen (first tweet of first page)
            max_tweet_id = None    # oldest id seen so far (last tweet of latest page)
            max_position = ''      # pagination cursor; empty on first request
            tweet_count = 0
            while b_continue:
                url = self.get_timeline_url(config.keywords[0], config.start_str, config.end_str, max_position)
                j = self.get_page_data(url, True, proc_id)
                soup = bs4.BeautifulSoup(j['items_html'], 'lxml')
                tweet_tags = soup.select("div.tweet")
                tweet_ids = []
                for tw in tweet_tags:
                    tweet = TweetParser.parse(tw, config.keyword_id)
                    tweet_ids.append(tweet.tweet_id)
                    # Replies surfacing in search results are skipped here;
                    # they are collected via the conversation crawl instead.
                    if tweet.is_reply is True:
                        # print('  ## {}: {}...'.format(tweet.user_name, tweet.text[:20]))
                        continue
                    if tweet.reply_cnt > 0:
                        # Tweet is its own conversation root, hence passed twice.
                        self.insert_content_pool(proc_id, content_queue, tweet, tweet)
                    self.db_helper.insert_tweet(tweet, config.db_num)
                    # print('{} {}: {}...'.format(tweet.created_at, tweet.user_name, tweet.text[:20]))
                    print('[{}] body {} ({}) [{}]'.format(proc_id, tweet.top_link, tweet.created_at, 'ok'))
                count = len(tweet_tags)
                tweet_count += count
                # Stop when a page comes back empty (more robust here than
                # trusting has_more_items — see commented alternative).
                b_continue = count > 0
                # b_continue = j['has_more_items']
                if b_continue:
                    if min_tweet_id is None:
                        min_tweet_id = tweet_ids[0]
                    max_tweet_id = tweet_ids[-1]
                    if 'min_position' in j:
                        max_position = j['min_position']
                    else:
                        # Synthesize the cursor Twitter would have provided:
                        # TWEET-<oldest-seen>-<newest-seen>.
                        max_position = 'TWEET-{}-{}'.format(max_tweet_id, min_tweet_id)
            print('[{}] {} to {} runner thread finished {}'.format(proc_id, config.start_str, config.end_str, tweet_count))
            result_queue.put({
                'proc_id': proc_id,
                'count': tweet_count,
            })
            # self.runner_processing[proc_id].value = False
        except Exception as e:
            logger.log(e, logger.LogLevel.ERROR)
        # NOTE(review): if the exception fired before tweet_count was bound,
        # this return raises NameError (swallowed by the executor) and no
        # result is queued for this proc_id — confirm whether intended.
        return proc_id, tweet_count,

    @staticmethod
    def insert_content_pool(proc_id: int, qu, tweet: Tweet, tweet_top: Tweet):
        """Queue (*tweet*, *tweet_top*) for conversation crawling;
        *tweet_top* is the root of the reply tree *tweet* belongs to."""
        # print('  [{}] pool insert: {} ({})'.format(proc_id, tweet.text[:20] if tweet.text else '', tweet.tweet_link))
        qu.put((tweet, tweet_top,))

    def get_content(self, content_queue):
        """Pop the next (parent, top) tweet pair from *content_queue*.

        Blocks in 2-second slices; returns ``(None, None)`` once the queue
        has stayed empty for over 15 seconds *after* the runner threads have
        signalled completion — the content thread's shutdown condition.
        """
        sleep_time = time.time()
        while True:
            try:
                parent_tw, top_tw, = content_queue.get(block=True, timeout=2)
            except Exception as e:
                # queue.Empty: only give up when runners are done AND the
                # queue has been quiet long enough for in-flight replies to
                # have been re-queued by sibling content threads.
                if not self.runner_finished_queue.empty() and time.time()-sleep_time > 15:
                    break
                else:
                    continue
            else:
                return parent_tw, top_tw,
        return None, None,

    def content_proc(self, proc_id, content_queue, result_queue):
        """Content-thread body: fetch the conversation of each queued tweet.

        Every reply found is stored and re-queued (with depth+1) so nested
        reply trees are fully traversed. Reply count is reported on
        *result_queue* on exit.
        """
        print('[{}] content thread start'.format(proc_id))
        tweet_count = 0
        while True:
            parent_tw, top_tw, = self.get_content(content_queue)
            if not parent_tw:
                break
            # print('  [{}] <<< parent : {} ({})'.format(proc_id, parent_tw.text[:20], parent_tw.tweet_link))
            max_position = ''
            b_continue = True
            while b_continue:
                url = self.get_content_url(parent_tw.user_id, parent_tw.tweet_id, max_position)
                j = self.get_page_data(url, False, proc_id)
                soup = bs4.BeautifulSoup(j['items_html'], 'lxml')
                reply_container_tags = soup.select('li.ThreadedConversation')
                # Append single, non-threaded replies that the selector above misses.
                reply_container_tags += TweetParser.get_lone_container(soup, parent_tw)
                for container_tags in reply_container_tags:
                    # Only the first tweet per container is the direct reply;
                    # deeper ones will be reached when it is re-queued below.
                    tweet_tags = container_tags.select('div.tweet')
                    if len(tweet_tags) > 0:
                        tweet = TweetParser.parse(tweet_tags[0], self.default_config.keyword_id, parent_tw.depth+1, top_tw)
                        # print('[{}]>>> {} {}: {} ({}) ({})'.format(proc_id, tweet.created_at, tweet.user_name, tweet.text[:20], tweet.depth, tweet.tweet_link))
                        print('[{}] reply {} [{}]'.format(proc_id, tweet.top_link, 'ok'))
                        self.insert_content_pool(proc_id, content_queue, tweet, top_tw)
                        self.db_helper.insert_tweet(tweet, self.default_config.db_num)
                        tweet_count += 1
                b_continue = j['has_more_items']
                if b_continue:
                    max_position = j['min_position']
        result_queue.put({
            'proc_id': proc_id,
            'count': tweet_count,
        })
        print('[{}] content thread finished'.format(proc_id))
        return proc_id, tweet_count,

    def debug_content(self):
        """Manual test: crawl one hard-coded tweet's conversation with 16
        content threads and print the per-thread reply counts."""
        content_qu = queue.Queue()
        runner_result_qu = queue.Queue()
        content_result_qu = queue.Queue()
        test_tw = Tweet()
        # test_tw.tweet_link = 'https://twitter.com/yniold_/status/886863893137678337'
        # test_tw.user_id = 'yniold_'
        # test_tw.tweet_id = 886863893137678337
        test_tw.tweet_link = 'https://twitter.com/Awesome_vely/status/888704413111435264'
        test_tw.user_id = 'Awesome_vely'
        test_tw.tweet_id = 888704413111435264
        # NOTE(review): literal below looks mojibake-damaged (likely a Korean
        # word whose encoding was lost); left byte-identical — debug-only.
        test_tw.text = '?œìž‘'
        self.insert_content_pool(0, content_qu, test_tw, test_tw)
        content_threads = [threading.Thread(target=self.content_proc, args=(proc_id, content_qu, content_result_qu)) for proc_id in range(16)]
        [th.start() for th in content_threads]
        [th.join() for th in content_threads]
        while not content_result_qu.empty():
            res = content_result_qu.get()
            print('reply : {}'.format(res))
        print('end all')

    def test_insert_db(self):
        """Manual test: insert the same stub tweet 5 times to exercise the
        DB helper's duplicate handling."""
        test_tw = Tweet()
        test_tw.tweet_link = 'https://twitter.com/moonriver365/status/885797401033818112'
        test_tw.user_id = 'moonriver365'
        test_tw.tweet_id = 885797401033818112
        for _ in range(5):
            self.db_helper.insert_tweet(test_tw, self.default_config.db_num)

    def debug(self):
        """Entry point for ad-hoc debugging; only active in debug builds."""
        if base.baseclasses.is_debug:
            ## check proxy
            # base.proxy.get_proxy_from_file('proxy.txt')
            # proxy = {'https': 'http://45.56.86.93:3128', 'http': 'http://45.56.86.93:3128'}
            # base.proxy.set_proxy_expired(proxy)
            # return
            ## contents check
            self.debug_content()
            # split_config = self.default_config.split()
            # self.test_insert_db()
            print("debug end")
            # exit()

    def run(self):
        """Execute one full crawl cycle.

        Fans the split config out over 16 runner threads (plus 16 content
        threads), then retries up to 5 times any runner that reported zero
        tweets, and finally prints totals and elapsed time.
        """
        start_time = time.time()
        # run
        worker_count = 16
        split_config = self.default_config.split()
        content_qu = queue.Queue()
        runner_result_qu = queue.Queue()
        content_result_qu = queue.Queue()
        runner_result_cnt = 0
        content_result_cnt = 0
        # Content pool must be up before runners start feeding the queue.
        content_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
        [content_pool.submit(self.content_proc, proc_id, content_qu, content_result_qu) for proc_id in range(worker_count)]
        runner_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
        [runner_pool.submit(self.runner_proc, proc_id, content_qu, runner_result_qu, config) for proc_id, config in enumerate(split_config)]
        runner_pool.shutdown(wait=True)
        # Tell content threads the producers are done (see get_content()).
        self.runner_finished_queue.put(True)
        content_pool.shutdown(wait=True)
        self.db_helper.flush()
        # rerun zero runners
        print('restart failed runner')
        # Drain the finished flag so retried runs start with a clean signal.
        while not self.runner_finished_queue.empty():
            self.runner_finished_queue.get()
        for retry in range(5):
            runner_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
            runner_result_qu2 = queue.Queue()
            b_rerun = False
            while not runner_result_qu.empty():
                res = runner_result_qu.get()
                runner_result_cnt += res['count']
                proc_id = res['proc_id']
                # A zero count means that runner produced nothing (likely
                # proxy failure) — resubmit it with its original config.
                if res['count'] == 0:
                    runner_pool.submit(self.runner_proc, proc_id, content_qu, runner_result_qu2, split_config[proc_id])
                    b_rerun = True
            while not content_result_qu.empty():
                res = content_result_qu.get()
                content_result_cnt += res['count']
            if b_rerun:
                content_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
                [content_pool.submit(self.content_proc, proc_id, content_qu, content_result_qu) for proc_id in range(worker_count)]
                runner_pool.shutdown(wait=True)
                self.runner_finished_queue.put(True)
                content_pool.shutdown(wait=True)
                self.db_helper.flush()
            # Results of this retry round become input to the next round.
            runner_result_qu = runner_result_qu2
        # Final drain of whatever the last retry round produced.
        while not runner_result_qu.empty():
            res = runner_result_qu.get()
            runner_result_cnt += res['count']
        while not content_result_qu.empty():
            res = content_result_qu.get()
            content_result_cnt += res['count']
        print('total body count: {}'.format(runner_result_cnt))
        print('total reply count: {}'.format(content_result_cnt))
        # print running time
        delta = time.time() - start_time
        m, s = divmod(delta, 60)
        h, m = divmod(m, 60)
        print("finished all {}:{:02d}:{:02d} ".format(int(h), int(m), int(s)))

    def start(self):
        """Main loop: rerun crawl cycles while the config is in realtime
        mode; run exactly once otherwise."""
        # self.debug()
        # return
        # run
        while True:
            # Refresh the date window relative to now (realtime mode).
            self.default_config.reload_realtime(self.before_day)
            self.run()
            if not self.default_config.realtime:
                break