Files
clients/WebBasedCrawler/twitter/twittercrawl.py
mjjo 3d5e2d0c98 - 트위터 크롤러 수정
- 중복 제거 후 insert
  - proxy.txt가 모두 만료되면 db 사용
  - proxy db에서 중복 제거해서 가져오기
  - 프록시 문제로 페이지 요청 시 0.1초 딜레이
  - 크롤러 stop 동작하도록
  - realtime 적용
2017-07-28 14:29:05 +09:00

333 lines
12 KiB
Python

from twitter.twconfig import TwitterConfig
from twitter.twdbhelper import TwitterDBHelper
from twitter.tweet import Tweet
from twitter.twparser import TweetParser
import base.proxy
import base.baseclasses
import requests
import bs4
import json
import urllib
import threading
import queue
import time
class TwitterCrawler():
    """Crawler for Twitter search results and their reply threads.

    Timeline "runner" threads walk the search pages per date slice and feed
    tweets with replies into a shared queue consumed by reply "content"
    threads (see runner_proc / content_proc).
    """

    def __init__(self):
        # Project-provided helpers: crawl configuration and DB access.
        self.default_config = TwitterConfig()
        self.db_helper = TwitterDBHelper()
        # requests-style proxies mapping; filled lazily on first get_page() call.
        self.proxy = None
        # Look-back window for realtime re-crawling; set via set_arguments().
        self.before_day = None
def set_arguments(self, browser, keyword_id, db_num, before_day, until_page):
    """Load keyword parameters from the DB and store crawl settings.

    ``browser`` and ``until_page`` are accepted for interface compatibility
    with other crawlers but are currently unused here.
    """
    keyword_params = self.db_helper.get_param(keyword_id)
    self.before_day = before_day
    self.default_config.set_param(keyword_id, db_num, keyword_params)
@staticmethod
def get_timeline_url(query, start_str, end_str, max_position=''):
    """Build the legacy Twitter search-timeline URL for one date slice.

    ``max_position`` is the scroll cursor from the previous page ('' for the
    first page).
    """
    search_query = '{} since:{} until:{}'.format(query, start_str, end_str)
    # Key order is kept stable so the encoded query string is reproducible.
    query_string = urllib.parse.urlencode({
        'f': 'tweets',
        'vertical': 'default',
        'src': 'typd',
        'q': search_query,
        'language': 'en',
        'max_position': max_position,
    })
    components = (
        TwitterConfig.protocol,
        TwitterConfig.top_url,
        TwitterConfig.search_url,
        '',
        query_string,
        '',
    )
    return urllib.parse.urlunparse(components)
@staticmethod
def get_content_url(user_id, tweet_id, max_position=''):
    """Build the conversation (reply) URL for a tweet, with optional cursor."""
    query_string = urllib.parse.urlencode({'max_position': max_position})
    conversation_path = TwitterConfig.conversation_url_form.format(user_id, tweet_id)
    components = (
        TwitterConfig.protocol,
        TwitterConfig.top_url,
        conversation_path,
        '',
        query_string,
        '',
    )
    return urllib.parse.urlunparse(components)
def get_page(self, url, proc_id):
    """GET ``url`` through the current proxy, rotating proxies on failure.

    Returns the ``requests`` response, or ``None`` when every proxy is
    exhausted (the proxy helper hands back ``(None, None)`` as its
    end-of-supply sentinel - TODO confirm against base.proxy).
    """
    headers = {
        'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.86 Safari/537.36',
        'Accept-Language': 'ko-KR,ko;q=0.8,en-US;q=0.6,en;q=0.4',
    }
    if not self.proxy:
        self.proxy = base.proxy.get_proxy_for_requests()
    response = None
    while True:
        try:
            # Short delay between requests to go easy on the proxies.
            time.sleep(0.1)
            response = requests.get(url, headers=headers, proxies=self.proxy, timeout=3)
            break
        except Exception as err:
            if self.proxy == (None, None):
                # No usable proxy remains; give up and return None.
                break
            print('[{}] proxy {} is expired. ({})'.format(proc_id, self.proxy, err))
            base.proxy.set_proxy_expired(self.proxy)
            self.proxy = base.proxy.get_proxy_for_requests()
    return response
def runner_proc(self, proc_id, content_queue, result_queue, config):
    """Crawl the search timeline for one date-range slice of the keyword.

    Pages through the search results for ``config``'s date range, stores
    every non-reply tweet, and enqueues tweets that have replies onto
    ``content_queue`` for the reply-crawler threads. The total count is
    reported via ``result_queue`` as ``{'proc_id': ..., 'count': ...}``.
    """
    print('{} to {} runner thread start'.format(config.start_str, config.end_str))
    b_continue = True
    min_tweet_id = None
    max_tweet_id = None
    max_position = ''
    tweet_count = 0
    while b_continue:
        if min_tweet_id is not None:
            # Scroll cursor: appears to follow the legacy search endpoint's
            # 'TWEET-<last id seen>-<first id of first page>' format - confirm.
            max_position = 'TWEET-{}-{}'.format(max_tweet_id, min_tweet_id)
        url = self.get_timeline_url(config.keywords[0], config.start_str, config.end_str, max_position)
        resp = self.get_page(url, proc_id)
        if resp is None:
            # get_page gave up (no working proxy) - abort this slice.
            break
        j = json.loads(resp.content.decode('utf-8'))
        soup = bs4.BeautifulSoup(j['items_html'], 'lxml')
        tweet_tags = soup.select("div.tweet")
        for tw in tweet_tags:
            tweet = TweetParser.parse(tw, config.keyword_id)
            if tweet.is_reply is True:
                # Replies are collected by content_proc, not from the timeline.
                continue
            if tweet.reply_cnt > 0:
                # A top-level tweet is its own thread top.
                self.insert_content_pool(proc_id, content_queue, tweet, tweet)
            self.db_helper.insert_tweet(tweet, config.db_num)
            print('body {} [{}]'.format(tweet.top_link, 'ok'))
        count = len(tweet_tags)
        if count == 0:
            # An empty page means this slice is exhausted.
            break
        if min_tweet_id is None:
            # Pinned once, from the very first tweet of the first page.
            min_tweet_id = tweet_tags[0].attrs['data-item-id']
        # Advanced every page to the last tweet seen.
        max_tweet_id = tweet_tags[-1].attrs['data-item-id']
        tweet_count += count
    print('{} to {} runner thread finished {}'.format(config.start_str, config.end_str, tweet_count))
    result_queue.put({
        'proc_id': proc_id,
        'count': tweet_count,
    })
    return proc_id, tweet_count,
@staticmethod
def insert_content_pool(proc_id: int, qu, tweet: Tweet, tweet_top: Tweet):
    """Queue a (tweet, thread-top) pair for the reply-crawler threads.

    ``proc_id`` is accepted only for interface parity with the other pool
    helpers; it is not used.
    """
    pair = (tweet, tweet_top)
    qu.put(pair)
@staticmethod
def get_content(content_queue):
    """Pop the next (parent, top) pair, or (None, None) after ~15s of emptiness.

    Blocks in 2-second slices; once the queue has stayed empty past the
    15-second deadline the worker is told to shut down via (None, None).
    """
    deadline_base = time.time()
    while True:
        try:
            parent, top = content_queue.get(block=True, timeout=2)
        except Exception:
            if time.time() - deadline_base > 15:
                # Queue stayed empty long enough - signal shutdown.
                return None, None
            continue
        return parent, top
def content_proc(self, proc_id, content_queue, result_queue):
    """Reply-crawler worker: drain (tweet, thread-top) pairs and fetch replies.

    For each parent tweet pulled from ``content_queue``, pages through the
    conversation endpoint, stores every reply, and re-enqueues each reply so
    its own sub-replies get fetched too. Exits when ``get_content`` times out
    (queue empty for ~15s); the total count goes to ``result_queue``.
    """
    print('[{}] content thread start'.format(proc_id))
    tweet_count = 0
    while True:
        parent_tw, top_tw, = self.get_content(content_queue)
        if not parent_tw:
            # (None, None) from get_content - all producers are done.
            break
        max_position = ''
        b_continue = True
        while b_continue:
            url = self.get_content_url(parent_tw.user_id, parent_tw.tweet_id, max_position)
            resp = self.get_page(url, proc_id)
            if resp is None or resp.status_code == 404:
                # No proxy left, or the conversation no longer exists.
                break
            elif resp.status_code != 200:
                # NOTE(review): this retries the same URL with no backoff or
                # retry cap - a persistent non-200/404 loops forever. Confirm
                # that is acceptable for this endpoint.
                print('[WARNING] content_get code {}'.format(resp.status_code))
                continue
            j = json.loads(resp.content.decode('utf-8'))
            soup = bs4.BeautifulSoup(j['items_html'], 'lxml')
            reply_container_tags = soup.select('li.ThreadedConversation')
            reply_container_tags += TweetParser.get_lone_container(soup, parent_tw)
            for container_tags in reply_container_tags:
                tweet_tags = container_tags.select('div.tweet')
                if len(tweet_tags) > 0:
                    # Only the first tweet tag per container is the reply itself.
                    tweet = TweetParser.parse(tweet_tags[0], self.default_config.keyword_id, parent_tw.depth+1, top_tw)
                    print('reply {} [{}]'.format(tweet.top_link, 'ok'))
                    # Recurse: this reply may itself have replies.
                    self.insert_content_pool(proc_id, content_queue, tweet, top_tw)
                    self.db_helper.insert_tweet(tweet, self.default_config.db_num)
                    tweet_count += 1
            b_continue = j['has_more_items']
            if b_continue:
                max_position = j['min_position']
    result_queue.put({
        'proc_id': proc_id,
        'count': tweet_count,
    })
    print('[{}] content thread finished'.format(proc_id))
    return proc_id, tweet_count,
def debug_content(self):
    """Debug helper: seed the reply pool with one known tweet and run only
    the 16 reply-crawler threads, then print their per-thread counts.

    Fixes: removed the unused ``runner_result_qu`` local and replaced the
    side-effect list comprehensions with plain loops.
    """
    content_qu = queue.Queue()
    content_result_qu = queue.Queue()
    # Known conversation used as a fixture for the reply crawler.
    test_tw = Tweet()
    test_tw.tweet_link = 'https://twitter.com/Awesome_vely/status/888704413111435264'
    test_tw.user_id = 'Awesome_vely'
    test_tw.tweet_id = 888704413111435264
    test_tw.text = '시작'
    self.insert_content_pool(0, content_qu, test_tw, test_tw)
    content_threads = [threading.Thread(target=self.content_proc, args=(proc_id, content_qu, content_result_qu)) for proc_id in range(16)]
    for th in content_threads:
        th.start()
    for th in content_threads:
        th.join()
    while not content_result_qu.empty():
        res = content_result_qu.get()
        print('reply : {}'.format(res))
    print('end all')
def test_insert_db(self):
    """Debug helper: insert one tweet five times to exercise DB-side dedup."""
    sample = Tweet()
    sample.tweet_link = 'https://twitter.com/moonriver365/status/885797401033818112'
    sample.user_id = 'moonriver365'
    sample.tweet_id = 885797401033818112
    # Repeated inserts should collapse to a single row if dedup works.
    for _ in range(5):
        self.db_helper.insert_tweet(sample, self.default_config.db_num)
def debug(self):
    """Debug entry point: exercise the reply crawler when debug mode is on.

    Other probes (proxy expiry checks, config split, DB insert test) were
    exercised from here during development; currently only the
    conversation-crawl check is wired in.
    """
    if not base.baseclasses.is_debug:
        return
    self.debug_content()
    print("debug end")
def run(self):
    """Run one full crawl pass.

    One runner thread per date slice of the keyword feeds a shared reply
    queue consumed by 16 reply threads. Slices that yielded zero tweets
    (usually proxy failures) are retried once with a fresh set of reply
    threads. Prints body/reply totals and the elapsed time.

    Fixes: the retry step used the queue drain position (``idx``) to pick
    ``split_config[idx]``, but results arrive in thread-*completion* order,
    so the wrong slice could be retried. It now uses the ``proc_id`` each
    runner reports with its result. Side-effect list comprehensions were
    replaced with plain loops.
    """
    start_time = time.time()
    split_config = self.default_config.split()
    content_qu = queue.Queue()
    runner_result_qu = queue.Queue()
    content_result_qu = queue.Queue()
    runner_result_cnt = 0
    content_result_cnt = 0
    runner_threads = [threading.Thread(target=self.runner_proc, args=(proc_id, content_qu, runner_result_qu, config)) for proc_id, config in enumerate(split_config)]
    content_threads = [threading.Thread(target=self.content_proc, args=(proc_id, content_qu, content_result_qu)) for proc_id in range(16)]
    for th in runner_threads + content_threads:
        th.start()
    for th in runner_threads + content_threads:
        th.join()
    # Re-run slices that produced nothing.
    runner_threads = []
    runner_result_qu2 = queue.Queue()
    while not runner_result_qu.empty():
        res = runner_result_qu.get()
        runner_result_cnt += res['count']
        if res['count'] == 0:
            # Index by the reported proc_id, not the drain position: results
            # are enqueued in completion order, which need not match the
            # order the runners were started in.
            pid = res['proc_id']
            th = threading.Thread(target=self.runner_proc, args=(pid, content_qu, runner_result_qu2, split_config[pid]))
            runner_threads.append(th)
    if runner_threads:
        # Fresh reply workers for the retried slices.
        content_threads = [threading.Thread(target=self.content_proc, args=(proc_id, content_qu, content_result_qu)) for proc_id in range(16)]
    else:
        content_threads = []
    for th in runner_threads + content_threads:
        th.start()
    for th in runner_threads + content_threads:
        th.join()
    while not runner_result_qu2.empty():
        runner_result_cnt += runner_result_qu2.get()['count']
    while not content_result_qu.empty():
        content_result_cnt += content_result_qu.get()['count']
    print('total body count: {}'.format(runner_result_cnt))
    print('total reply count: {}'.format(content_result_cnt))
    # Report elapsed wall-clock time as h:mm:ss.
    delta = time.time() - start_time
    m, s = divmod(delta, 60)
    h, m = divmod(m, 60)
    print("finished all {}:{:02d}:{:02d} ".format(int(h), int(m), int(s)))
def start(self):
    """Top-level entry point: repeat crawl passes while realtime mode is on.

    Each pass first reloads the crawl window via ``reload_realtime``
    (presumably re-anchoring the date range to now - confirm against
    TwitterConfig), then runs one full crawl. A single pass is performed
    when realtime mode is off.
    """
    keep_going = True
    while keep_going:
        self.default_config.reload_realtime(self.before_day)
        self.run()
        keep_going = bool(self.default_config.realtime)