# clients/WebBasedCrawler/twitter/twittercrawl.py
from twitter.twconfig import TwitterConfig
from twitter.twdbhelper import TwitterDBHelper
from twitter.tweet import Tweet
from twitter.twparser import TweetParser
import base.proxy
import base.proxy2 as proxy2
import base.baseclasses
import base.logger as logger
import base.debug as dbg
import requests
import bs4
import json
import urllib.parse
import concurrent.futures
import threading
import queue
import time

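# Overview: TwitterCrawler drives two thread groups. "Runner" threads page
# through the legacy search timeline for a keyword/date window and store the
# tweets they find, pushing any tweet that has replies onto a shared content
# queue; "content" threads drain that queue, fetch each tweet's conversation,
# and re-enqueue replies so nested replies are crawled too. Everything is
# persisted through TwitterDBHelper.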
class TwitterCrawler:
    def __init__(self):
        self.default_config = TwitterConfig()
        self.db_helper = TwitterDBHelper()
        self.proxy = {}
        self.proxy_handler = proxy2.Proxy2Handler()
        self.before_day = None
        self.runner_finished_queue = queue.Queue()

    def set_arguments(self, browser, keyword_id, db_num, before_day, until_page):
        params = self.db_helper.get_param(keyword_id)
        self.before_day = before_day
        self.default_config.set_param(keyword_id, db_num, params)

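    # Builds a legacy twitter.com search-timeline URL. A rough example of the
    # result, assuming TwitterConfig.top_url is 'twitter.com' and
    # TwitterConfig.search_url is '/i/search/timeline' (both values actually
    # live in twconfig, so this is illustrative only):
    #   https://twitter.com/i/search/timeline?f=tweets&vertical=default
    #       &src=typd&q=keyword+since%3A2017-07-01+until%3A2017-07-02
    #       &language=en&max_position=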
    @staticmethod
    def get_timeline_url(query, start_str, end_str, max_position=''):
        params = {
            'f': 'tweets',
            'vertical': 'default',
            'src': 'typd',
            'q': '{} since:{} until:{}'.format(query, start_str, end_str),
            'language': 'en',
            'max_position': max_position,
        }
        url_tuple = (TwitterConfig.protocol, TwitterConfig.top_url, TwitterConfig.search_url, '', urllib.parse.urlencode(params), '')
        return urllib.parse.urlunparse(url_tuple)

    @staticmethod
    def get_content_url(user_id, tweet_id, max_position=''):
        params = {
            'max_position': max_position,
        }
        sub_url = TwitterConfig.conversation_url_form.format(user_id, tweet_id)
        url_tuple = (TwitterConfig.protocol, TwitterConfig.top_url, sub_url, '', urllib.parse.urlencode(params), '')
        return urllib.parse.urlunparse(url_tuple)

    def get_proxy(self, proxy_key):
        proxy = None
        while not proxy:
            proxy = self.proxy_handler.get(proxy2.Platform.TWITTER, proxy_key)
            if not proxy:
                time.sleep(1)
        return proxy

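    # Fetches a URL through this thread's proxy, marking the proxy blocked and
    # rotating to a fresh one whenever the request raises. A proxy equal to
    # (None, None) is treated as a sentinel for "no proxy available" and makes
    # the loop give up, returning None.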
    def get_page(self, url, is_runner, proc_id):
        headers = {
            'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.86 Safari/537.36',
            'Accept-Language': 'ko-KR,ko;q=0.8,en-US;q=0.6,en;q=0.4',
        }
        proxy_key = '{}-{}'.format('runner' if is_runner else 'content', proc_id)
        if proxy_key not in self.proxy:
            # self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
            self.proxy[proxy_key] = self.get_proxy(proxy_key)
        resp = None
        while True:
            try:
                resp = requests.get(url, headers=headers, proxies=self.proxy[proxy_key], timeout=3)
            except Exception as e:
                if self.proxy[proxy_key] == (None, None):
                    break
                # print('[{}] proxy {} is expired. ({})'.format(proc_id, self.proxy[proxy_key], e))
                # base.proxy.set_proxy_expired(self.proxy[proxy_key])
                self.proxy_handler.set_proxy_blocked(self.proxy[proxy_key]['ip'], self.proxy[proxy_key]['port'], proxy2.Platform.TWITTER)
                # self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
                self.proxy[proxy_key] = self.get_proxy(proxy_key)
            else:
                break
        return resp

    def get_page_data(self, url, is_runner, proc_id):
        for retry_cnt in range(5):
            # get response
            resp = self.get_page(url, is_runner, proc_id)
            if not resp:
                break
            # check response
            if resp.status_code == 404:
                break
            elif resp.status_code != 200:
                print('[WARNING] content_get code {}'.format(resp.status_code))
                continue
            # parsing result
            j = json.loads(resp.text)
            if j['new_latent_count'] <= 0:
                proxy_key = '{}-{}'.format('runner' if is_runner else 'content', proc_id)
                # self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
                self.proxy[proxy_key] = self.get_proxy(proxy_key)
                continue
            else:
                return j
        # fall through after retries/404: callers always get a dict with
        # 'items_html' and 'has_more_items' keys
        return {
            'items_html': '',
            'has_more_items': False,
        }

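    # Runner thread: pages through one date window of search results. Each
    # page's tweets are parsed; non-reply tweets are stored, and tweets with
    # replies are queued for the content threads. Pagination uses the
    # 'min_position' cursor when the response provides one, otherwise falls
    # back to a 'TWEET-...-...' cursor synthesized from the first and last
    # tweet ids seen.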
    def runner_proc(self, proc_id, content_queue, result_queue, config):
        tweet_count = 0
        try:
            print('[{}] {} to {} runner thread start'.format(proc_id, config.start_str, config.end_str))
            b_continue = True
            min_tweet_id = None
            max_tweet_id = None
            max_position = ''
            while b_continue:
                url = self.get_timeline_url(config.keywords[0], config.start_str, config.end_str, max_position)
                j = self.get_page_data(url, True, proc_id)
                soup = bs4.BeautifulSoup(j['items_html'], 'lxml')
                tweet_tags = soup.select("div.tweet")
                tweet_ids = []
                for tw in tweet_tags:
                    tweet = TweetParser.parse(tw, config.keyword_id)
                    tweet_ids.append(tweet.tweet_id)
                    if tweet.is_reply:
                        # print(' ## {}: {}...'.format(tweet.user_name, tweet.text[:20]))
                        continue
                    if tweet.reply_cnt > 0:
                        self.insert_content_pool(proc_id, content_queue, tweet, tweet)
                    self.db_helper.insert_tweet(tweet, config.db_num)
                    # print('{} {}: {}...'.format(tweet.created_at, tweet.user_name, tweet.text[:20]))
                    print('[{}] body {} ({}) [{}]'.format(proc_id, tweet.top_link, tweet.created_at, 'ok'))
                count = len(tweet_tags)
                tweet_count += count
                b_continue = count > 0
                # b_continue = j['has_more_items']
                if b_continue:
                    if min_tweet_id is None:
                        min_tweet_id = tweet_ids[0]
                    max_tweet_id = tweet_ids[-1]
                    if 'min_position' in j:
                        max_position = j['min_position']
                    else:
                        max_position = 'TWEET-{}-{}'.format(max_tweet_id, min_tweet_id)
            print('[{}] {} to {} runner thread finished {}'.format(proc_id, config.start_str, config.end_str, tweet_count))
            result_queue.put({
                'proc_id': proc_id,
                'count': tweet_count,
            })
            # self.runner_processing[proc_id].value = False
        except Exception as e:
            dbg.print_exception(e)
        return proc_id, tweet_count

    @staticmethod
    def insert_content_pool(proc_id: int, qu, tweet: Tweet, tweet_top: Tweet):
        # print(' [{}] pool insert: {} ({})'.format(proc_id, tweet.text[:20] if tweet.text else '', tweet.tweet_link))
        qu.put((tweet, tweet_top))

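    # Pops the next (parent, top) tweet pair off the content queue. A content
    # thread only gives up (returning None, None) once the runners have
    # signalled completion via runner_finished_queue and this call has waited
    # more than 15 seconds without receiving work.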
    def get_content(self, content_queue):
        sleep_time = time.time()
        while True:
            try:
                parent_tw, top_tw = content_queue.get(block=True, timeout=2)
            except Exception:
                if not self.runner_finished_queue.empty() and time.time() - sleep_time > 15:
                    break
                else:
                    continue
            else:
                return parent_tw, top_tw
        return None, None

    def content_proc(self, proc_id, content_queue, result_queue):
        print('[{}] content thread start'.format(proc_id))
        tweet_count = 0
        while True:
            parent_tw, top_tw = self.get_content(content_queue)
            if not parent_tw:
                break
            # print(' [{}] <<< parent : {} ({})'.format(proc_id, parent_tw.text[:20], parent_tw.tweet_link))
            max_position = ''
            b_continue = True
            while b_continue:
                url = self.get_content_url(parent_tw.user_id, parent_tw.tweet_id, max_position)
                j = self.get_page_data(url, False, proc_id)
                soup = bs4.BeautifulSoup(j['items_html'], 'lxml')
                reply_container_tags = soup.select('li.ThreadedConversation')
                reply_container_tags += TweetParser.get_lone_container(soup, parent_tw)
                for container_tags in reply_container_tags:
                    tweet_tags = container_tags.select('div.tweet')
                    if len(tweet_tags) > 0:
                        tweet = TweetParser.parse(tweet_tags[0], self.default_config.keyword_id, parent_tw.depth+1, top_tw)
                        # print('[{}]>>> {} {}: {} ({}) ({})'.format(proc_id, tweet.created_at, tweet.user_name, tweet.text[:20], tweet.depth, tweet.tweet_link))
                        print('[{}] reply {} [{}]'.format(proc_id, tweet.top_link, 'ok'))
                        # replies go back onto the queue so their own replies get crawled
                        self.insert_content_pool(proc_id, content_queue, tweet, top_tw)
                        self.db_helper.insert_tweet(tweet, self.default_config.db_num)
                        tweet_count += 1
                b_continue = j['has_more_items']
                if b_continue:
                    max_position = j['min_position']
        result_queue.put({
            'proc_id': proc_id,
            'count': tweet_count,
        })
        print('[{}] content thread finished'.format(proc_id))
        return proc_id, tweet_count

    def debug_content(self):
        content_qu = queue.Queue()
        content_result_qu = queue.Queue()
        test_tw = Tweet()
        # test_tw.tweet_link = 'https://twitter.com/yniold_/status/886863893137678337'
        # test_tw.user_id = 'yniold_'
        # test_tw.tweet_id = 886863893137678337
        test_tw.tweet_link = 'https://twitter.com/Awesome_vely/status/888704413111435264'
        test_tw.user_id = 'Awesome_vely'
        test_tw.tweet_id = 888704413111435264
        test_tw.text = '?'
        self.insert_content_pool(0, content_qu, test_tw, test_tw)
        content_threads = [threading.Thread(target=self.content_proc, args=(proc_id, content_qu, content_result_qu)) for proc_id in range(16)]
        [th.start() for th in content_threads]
        [th.join() for th in content_threads]
        while not content_result_qu.empty():
            res = content_result_qu.get()
            print('reply : {}'.format(res))
        print('end all')

    def test_insert_db(self):
        test_tw = Tweet()
        test_tw.tweet_link = 'https://twitter.com/moonriver365/status/885797401033818112'
        test_tw.user_id = 'moonriver365'
        test_tw.tweet_id = 885797401033818112
        for _ in range(5):
            self.db_helper.insert_tweet(test_tw, self.default_config.db_num)

    def debug(self):
        if base.baseclasses.is_debug:
            ## check proxy
            # base.proxy.get_proxy_from_file('proxy.txt')
            # proxy = {'https': 'http://45.56.86.93:3128', 'http': 'http://45.56.86.93:3128'}
            # base.proxy.set_proxy_expired(proxy)
            # return
            ## contents check
            self.debug_content()
            # split_config = self.default_config.split()
            # self.test_insert_db()
            print("debug end")
            # exit()

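    # Main crawl pass: one content pool and one runner pool are started and
    # drained; then any runner that reported a zero tweet count (e.g. its
    # window failed because every request hit a blocked proxy) is resubmitted,
    # up to five retry rounds, with per-pool counts accumulated from the
    # result queues along the way.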
    def run(self):
        start_time = time.time()
        # run
        worker_count = 1
        split_config = self.default_config.split()
        content_qu = queue.Queue()
        runner_result_qu = queue.Queue()
        content_result_qu = queue.Queue()
        runner_result_cnt = 0
        content_result_cnt = 0
        content_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
        [content_pool.submit(self.content_proc, proc_id, content_qu, content_result_qu) for proc_id in range(worker_count)]
        runner_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
        [runner_pool.submit(self.runner_proc, proc_id, content_qu, runner_result_qu, config) for proc_id, config in enumerate(split_config)]
        runner_pool.shutdown(wait=True)
        self.runner_finished_queue.put(True)
        content_pool.shutdown(wait=True)
        self.db_helper.flush()
        # rerun zero runners
        print('restart failed runner')
        while not self.runner_finished_queue.empty():
            self.runner_finished_queue.get()
        for retry in range(5):
            runner_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
            runner_result_qu2 = queue.Queue()
            b_rerun = False
            while not runner_result_qu.empty():
                res = runner_result_qu.get()
                runner_result_cnt += res['count']
                proc_id = res['proc_id']
                if res['count'] == 0:
                    runner_pool.submit(self.runner_proc, proc_id, content_qu, runner_result_qu2, split_config[proc_id])
                    b_rerun = True
            while not content_result_qu.empty():
                res = content_result_qu.get()
                content_result_cnt += res['count']
            if b_rerun:
                content_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
                [content_pool.submit(self.content_proc, proc_id, content_qu, content_result_qu) for proc_id in range(worker_count)]
                runner_pool.shutdown(wait=True)
                self.runner_finished_queue.put(True)
                content_pool.shutdown(wait=True)
                self.db_helper.flush()
            runner_result_qu = runner_result_qu2
        while not runner_result_qu.empty():
            res = runner_result_qu.get()
            runner_result_cnt += res['count']
        while not content_result_qu.empty():
            res = content_result_qu.get()
            content_result_cnt += res['count']
        print('total body count: {}'.format(runner_result_cnt))
        print('total reply count: {}'.format(content_result_cnt))
        # print running time
        delta = time.time() - start_time
        m, s = divmod(delta, 60)
        h, m = divmod(m, 60)
        print("finished all {}:{:02d}:{:02d}".format(int(h), int(m), int(s)))

    def start(self):
        # self.debug()
        # return
        # run
        while True:
            self.default_config.reload_realtime(self.before_day)
            self.run()
            if not self.default_config.realtime:
                break
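

# Minimal usage sketch. The argument values below are placeholders for
# illustration only (browser and until_page are accepted by set_arguments
# but not used by it); the real entry point may differ.
if __name__ == '__main__':
    crawler = TwitterCrawler()
    crawler.set_arguments(browser=None, keyword_id=1, db_num=0,
                          before_day=None, until_page=None)
    crawler.start()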