Files
clients/WebBasedCrawler/twitter/twittercrawl.py
mjjo fb7b3949d3 - requirements.txt 추가
- print 구문 주석
2017-07-27 11:27:27 +09:00

290 lines
11 KiB
Python

import json
import queue
import threading
import time
import urllib
import urllib.parse

import bs4
import requests

import base.baseclasses
import base.proxy
from twitter.twconfig import TwitterConfig
from twitter.twdbhelper import TwitterDBHelper
from twitter.tweet import Tweet
from twitter.twparser import TweetParser
class TwitterCrawler():
    """Threaded crawler that stores search-timeline tweets and their replies.

    "Runner" threads page through the search timeline for one split of the
    configuration each, store every non-reply tweet and queue tweets that
    have replies.  "Content" threads consume that queue, fetch the
    conversation of each queued tweet, store the replies and re-queue them so
    that nested replies are fetched as well.
    """

    # Number of content (reply) worker threads started per phase.
    CONTENT_THREAD_COUNT = 16

    def __init__(self):
        self.default_config = TwitterConfig()
        self.db_helper = TwitterDBHelper()

    def set_arguments(self, browser, keyword_id, db_num, before_day, until_page):
        """Load the parameters stored for ``keyword_id`` into the default config.

        ``browser``, ``before_day`` and ``until_page`` are accepted but not
        used by this method.
        """
        params = self.db_helper.get_param(keyword_id)
        self.default_config.set_param(keyword_id, db_num, params)

    @staticmethod
    def get_timeline_url(query, start_str, end_str, max_position=''):
        """Build the search URL for ``query`` between ``start_str`` and ``end_str``.

        ``max_position`` is the paging cursor; the empty string requests the
        first page.
        """
        params = {
            'f': 'tweets',
            'vertical': 'default',
            'src': 'typd',
            'q': '{} since:{} until:{}'.format(query, start_str, end_str),
            'language': 'en',
            'max_position': max_position,
        }
        url_parts = (
            TwitterConfig.protocol,
            TwitterConfig.top_url,
            TwitterConfig.search_url,
            '',
            urllib.parse.urlencode(params),
            '',
        )
        return urllib.parse.urlunparse(url_parts)

    @staticmethod
    def get_content_url(user_id, tweet_id, max_position=''):
        """Build the conversation URL for the tweet ``tweet_id`` of ``user_id``."""
        params = {
            'max_position': max_position,
        }
        sub_url = TwitterConfig.conversation_url_form.format(user_id, tweet_id)
        url_parts = (
            TwitterConfig.protocol,
            TwitterConfig.top_url,
            sub_url,
            '',
            urllib.parse.urlencode(params),
            '',
        )
        return urllib.parse.urlunparse(url_parts)

    @staticmethod
    def get_page(url, proc_id):
        """GET ``url``, switching to another proxy whenever a request fails.

        Returns the ``requests`` response, or ``None`` when the request fails
        while the proxy value is ``(None, None)``.
        """
        headers = {
            'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.86 Safari/537.36',
            'Accept-Language': 'ko-KR,ko;q=0.8,en-US;q=0.6,en;q=0.4',
        }
        proxies = base.proxy.get_proxy_for_requests()
        resp = None
        while True:
            try:
                resp = requests.get(url, headers=headers, proxies=proxies, timeout=3)
            except Exception as e:
                # NOTE(review): deliberately broad so that any failure rotates
                # the proxy; there is no upper bound on the number of retries.
                # (None, None) presumably means "no proxy available" - confirm
                # against base.proxy.
                if proxies == (None, None):
                    break
                print('[{}] proxy {} is expired. ({})'.format(proc_id, proxies, e))
                base.proxy.set_proxy_expired(proxies)
                proxies = base.proxy.get_proxy_for_requests()
            else:
                break
        return resp

    def runner_proc(self, proc_id, content_queue, result_queue, config):
        """Crawl the search timeline described by ``config`` page by page.

        Non-reply tweets are stored; those with replies are also queued on
        ``content_queue``.  ``(proc_id, tweet_count)`` is put on
        ``result_queue`` and returned, where ``tweet_count`` is the number of
        tweet tags seen (replies included).
        """
        print('{} to {} runner thread start'.format(config.start_str, config.end_str))
        # The paging cursor is built from the first tag of the first page and
        # the last tag of the most recent page.
        first_page_head_id = None
        last_page_tail_id = None
        max_position = ''
        tweet_count = 0
        while True:
            if first_page_head_id is not None:
                max_position = 'TWEET-{}-{}'.format(last_page_tail_id, first_page_head_id)
            url = self.get_timeline_url(config.keywords[0], config.start_str, config.end_str, max_position)
            resp = self.get_page(url, proc_id)
            if resp is None:
                break
            j = json.loads(resp.content.decode('utf-8'))
            soup = bs4.BeautifulSoup(j['items_html'], 'lxml')
            tweet_tags = soup.select("div.tweet")
            for tw in tweet_tags:
                tweet = TweetParser.parse(tw, config.keyword_id)
                if tweet.is_reply is True:
                    # Replies are collected by the content threads instead.
                    continue
                if tweet.reply_cnt > 0:
                    self.insert_content_pool(proc_id, content_queue, tweet, tweet)
                self.db_helper.insert_tweet(tweet, config.db_num)
            count = len(tweet_tags)
            if count == 0:
                break
            if first_page_head_id is None:
                first_page_head_id = tweet_tags[0].attrs['data-item-id']
            last_page_tail_id = tweet_tags[-1].attrs['data-item-id']
            tweet_count += count
        print('{} to {} runner thread finished {}'.format(config.start_str, config.end_str, tweet_count))
        result_queue.put((proc_id, tweet_count, ))
        return proc_id, tweet_count,

    @staticmethod
    def insert_content_pool(proc_id: int, qu, tweet: "Tweet", tweet_top: "Tweet"):
        """Queue ``tweet`` (with the top tweet of its thread) for reply crawling."""
        qu.put((tweet, tweet_top,))

    @staticmethod
    def get_content(content_queue, idle_timeout=60, poll_interval=2):
        """Take the next ``(parent, top)`` pair from ``content_queue``.

        Polls the queue every ``poll_interval`` seconds.  Returns
        ``(None, None)`` once more than ``idle_timeout`` seconds have passed
        since the call started without an item arriving.
        """
        started = time.monotonic()
        while True:
            try:
                item = content_queue.get(block=True, timeout=poll_interval)
            except queue.Empty:
                if time.monotonic() - started > idle_timeout:
                    return None, None,
                continue
            parent_tw, top_tw, = item
            return parent_tw, top_tw,

    def content_proc(self, proc_id, content_queue, result_queue):
        """Fetch and store the replies of every tweet taken from ``content_queue``.

        Each reply is re-queued so that its own replies are fetched too.
        Stops when ``get_content`` yields no tweet, then puts and returns
        ``(proc_id, tweet_count)``.
        """
        print('[{}] content thread start'.format(proc_id))
        tweet_count = 0
        while True:
            parent_tw, top_tw, = self.get_content(content_queue)
            if not parent_tw:
                break
            max_position = ''
            has_more = True
            while has_more:
                url = self.get_content_url(parent_tw.user_id, parent_tw.tweet_id, max_position)
                resp = self.get_page(url, proc_id)
                if resp is None or resp.status_code == 404:
                    break
                elif resp.status_code != 200:
                    # NOTE(review): retried immediately and without a limit.
                    print('[WARNING] content_get code {}'.format(resp.status_code))
                    continue
                j = json.loads(resp.content.decode('utf-8'))
                soup = bs4.BeautifulSoup(j['items_html'], 'lxml')
                reply_container_tags = soup.select('li.ThreadedConversation')
                reply_container_tags += TweetParser.get_lone_container(soup, parent_tw)
                for container_tags in reply_container_tags:
                    tweet_tags = container_tags.select('div.tweet')
                    if not tweet_tags:
                        continue
                    # Only the first tweet of each container is taken here.
                    tweet = TweetParser.parse(tweet_tags[0], self.default_config.keyword_id, parent_tw.depth+1, top_tw)
                    self.insert_content_pool(proc_id, content_queue, tweet, top_tw)
                    self.db_helper.insert_tweet(tweet, self.default_config.db_num)
                    tweet_count += 1
                has_more = j['has_more_items']
                if has_more:
                    max_position = j['min_position']
        result_queue.put((proc_id, tweet_count))
        print('[{}] content thread finished'.format(proc_id))
        return proc_id, tweet_count,

    @staticmethod
    def _zero_count_proc_ids(result_queue):
        """Drain ``result_queue`` and list the proc_ids whose tweet count is 0."""
        zero_ids = []
        while not result_queue.empty():
            proc_id, tweet_count = result_queue.get()
            if tweet_count == 0:
                zero_ids.append(proc_id)
        return zero_ids

    @staticmethod
    def _run_threads(*thread_groups):
        """Start every thread of every group, then join them in the same order."""
        for group in thread_groups:
            for th in group:
                th.start()
        for group in thread_groups:
            for th in group:
                th.join()

    def _make_content_threads(self, content_queue, result_queue):
        """Create (without starting) the content worker threads."""
        return [
            threading.Thread(target=self.content_proc, args=(proc_id, content_queue, result_queue))
            for proc_id in range(self.CONTENT_THREAD_COUNT)
        ]

    def debug_content(self):
        """Run only the content workers against one hard-coded tweet."""
        content_qu = queue.Queue()
        content_result_qu = queue.Queue()
        test_tw = Tweet()
        test_tw.tweet_link = 'https://twitter.com/Awesome_vely/status/888704413111435264'
        test_tw.user_id = 'Awesome_vely'
        test_tw.tweet_id = 888704413111435264
        test_tw.text = '시작'
        self.insert_content_pool(0, content_qu, test_tw, test_tw)
        self._run_threads(self._make_content_threads(content_qu, content_result_qu))
        while not content_result_qu.empty():
            res = content_result_qu.get()
            print('reply : {}'.format(res))
        print('end all')

    def test_insert_db(self):
        """Insert the same hard-coded tweet five times (manual DB check)."""
        test_tw = Tweet()
        test_tw.tweet_link = 'https://twitter.com/moonriver365/status/885797401033818112'
        test_tw.user_id = 'moonriver365'
        test_tw.tweet_id = 885797401033818112
        for _ in range(5):
            self.db_helper.insert_tweet(test_tw, self.default_config.db_num)

    def debug(self):
        """Run the manual content check when ``base.baseclasses.is_debug`` is set."""
        if base.baseclasses.is_debug:
            self.debug_content()
            print("debug end")

    def start(self):
        """Run the full crawl, then retry every runner that found no tweets.

        Runner results are ``(proc_id, tweet_count)`` tuples, so a retry is
        triggered by ``tweet_count == 0`` and the retried runner reuses the
        configuration that belongs to its own ``proc_id``.
        """
        start_time = time.time()
        split_config = self.default_config.split()
        content_qu = queue.Queue()
        runner_result_qu = queue.Queue()
        content_result_qu = queue.Queue()

        runner_threads = [
            threading.Thread(target=self.runner_proc, args=(proc_id, content_qu, runner_result_qu, config))
            for proc_id, config in enumerate(split_config)
        ]
        self._run_threads(runner_threads, self._make_content_threads(content_qu, content_result_qu))

        # Second pass: rerun the runners that reported zero tweets.
        runner_result_qu2 = queue.Queue()
        retry_threads = [
            threading.Thread(target=self.runner_proc, args=(proc_id, content_qu, runner_result_qu2, split_config[proc_id]))
            for proc_id in self._zero_count_proc_ids(runner_result_qu)
        ]
        # Content workers are started even when nothing is retried, as before.
        self._run_threads(retry_threads, self._make_content_threads(content_qu, content_result_qu))

        # Report the total running time as h:mm:ss.
        delta = time.time() - start_time
        m, s = divmod(delta, 60)
        h, m = divmod(m, 60)
        print("finished all {}:{:02d}:{:02d} ".format(int(h), int(m), int(s)))