트위터 크롤러 1depth까지 완료

This commit is contained in:
mjjo
2017-07-20 11:00:20 +09:00
parent 1496644cc2
commit 08435ece8d
5 changed files with 267 additions and 55 deletions

View File

@@ -1,10 +1,11 @@
import datetime
import copy
class TwitterConfig:
protocol = 'https'
top_url = 'twitter.com'
search_url = '/i/search/timeline'
conversation_url_form = '/i/{}/conversation/{}'
def __init__(self):
self.keyword_id = -1
@@ -32,10 +33,37 @@ class TwitterConfig:
for keyword in params['searches'].split(','):
self.keywords.append(keyword.strip())
self.start = datetime.datetime.combine(params['start'], datetime.datetime.min.time())
self.start_str = str(params['start'])
self.end = datetime.datetime.combine(params['end'], datetime.datetime.min.time())
self.end_str = str(params['end'])
self.start = datetime.datetime.combine(params['start'], datetime.datetime.min.time())
self.end = datetime.datetime.combine(params['end'], datetime.datetime.min.time())
self.authorship = params['authorship']
self.state = params['state']
self.platform = params['platform']
self.platform = params['platform']
# debug
self.platform = 14
# self.start_str = '2017-05-01'
# self.end_str = '2017-05-02'
# self.start = datetime.datetime.strptime(self.start_str, '%Y-%m-%d')
# self.end = datetime.datetime.strptime(self.end_str, '%Y-%m-%d')
def split(self):
split_list = []
new_end = self.end
while new_end > self.start:
new_config = copy.deepcopy(self)
new_config.end = new_end
new_end = new_end + datetime.timedelta(days=-1)
new_config.start = new_end
new_config.start_str = new_config.start.strftime('%Y-%m-%d')
new_config.end_str = new_config.end.strftime('%Y-%m-%d')
split_list.append(new_config)
return split_list

View File

@@ -1,26 +1,39 @@
from twitter.tweet import Tweet
from base.dbdata import DataDBRow
import multiprocessing as mp
import copy
class TwitterDBHelper:
pymysql = __import__('pymysql.cursors')
def __init__(self):
self.conn = self.pymysql.connect(host='bigbird.iptime.org',
user='admin', passwd='admin123',
db='concepters', charset='utf8',
cursorclass=self.pymysql.cursors.DictCursor)
self.tweets = []
self.buffer = []
self.lock = mp.Lock()
pass
def __del__(self):
self.conn.close()
pass
# def __getstate__(self):
# return self.conn,
#
# def __setstate__(self, state):
# self.conn, = state
def get_param(self, keyword_id):
query = "select * from keyword where id = " + str(keyword_id)
params = []
try:
with self.conn.cursor() as cursor:
conn = self.pymysql.connect(host='bigbird.iptime.org',
user='admin', passwd='admin123',
db='concepters', charset='utf8',
cursorclass=self.pymysql.cursors.DictCursor)
with conn.cursor() as cursor:
cursor.execute(query)
params = cursor.fetchone()
conn.close()
except Exception as e:
print(e)
@@ -28,12 +41,43 @@ class TwitterDBHelper:
return params
def insert_tweet(self, db_num: int, tweet: Tweet):
query = tweet.get_insert_query(self.conn, db_num)
def insert_tweet(self, tweet: Tweet = None, db_num: int = -1, flush=False):
try:
with self.conn.cursor() as cursor:
cursor.execute(query)
self.conn.commit()
except Exception as e:
print(e)
# self.lock.acquire()
# if tweet is not None:
# self.buffer.append((tweet, db_num, ))
#
# local_buffer = None
# if len(self.buffer) >= 100 or flush:
# local_buffer = copy.deepcopy(self.buffer)
# self.buffer.clear()
# self.lock.release()
local_buffer = [(tweet, db_num, )]
if local_buffer:
while True:
try:
conn = self.pymysql.connect(host='bigbird.iptime.org',
user='admin', passwd='admin123',
db='concepters', charset='utf8',
cursorclass=self.pymysql.cursors.DictCursor)
except Exception as e:
print(e)
continue
else:
break
try:
with conn.cursor() as cursor:
for tweet, _db_num in local_buffer:
query = tweet.get_insert_query(conn, _db_num)
cursor.execute(query)
conn.commit()
except Exception as e:
print(e)
finally:
conn.close()

View File

@@ -6,6 +6,7 @@ class Tweet(DataDBRow):
def __init__(self):
super(self.__class__, self).__init__()
self.tweet_id = None
self.user_id = None
self.user_name = None
self.text = None
@@ -18,3 +19,5 @@ class Tweet(DataDBRow):
self.retweet_cnt = 0
self.favorite_cnt = 0
self.tweet_link = None
self.depth = 0

View File

@@ -4,96 +4,117 @@ from twitter.tweet import Tweet
from twitter.twparser import TweetParser
import base.proxy
import base.baseclasses
import requests
import bs4
import json
from urllib import parse
import urllib
import multiprocessing as mp
class TwitterCrawler:
class TwitterCrawler():
def __init__(self):
self.config = TwitterConfig()
self.default_config = TwitterConfig()
self.db_helper = TwitterDBHelper()
self.proxies = None
self.runner_finished = mp.Value('b', False)
self.content_queue = mp.Queue()
self.proc_q = mp.Queue()
def set_arguments(self, browser, keyword_id, db_num, before_day, until_page):
params = self.db_helper.get_param(keyword_id)
self.config.set_param(keyword_id, db_num, params)
self.default_config.set_param(keyword_id, db_num, params)
def get_url(self, query, max_position=None):
def __getstate__(self):
return self.default_config, self.db_helper, self.proxies, self.runner_finished, self.content_queue,
def __setstate__(self, state):
self.default_config, self.db_helper, self.proxies, self.runner_finished, self.content_queue, = state
def get_timeline_url(self, query, start_str, end_str, max_position=''):
params = {
'f': 'tweets',
'q': '{} since:{} until:{}'.format(query, self.config.start_str, self.config.end_str),
'language': 'en'
'vertical': 'default',
'src': 'typd',
'q': '{} since:{} until:{}'.format(query, start_str, end_str),
'language': 'en',
'max_position': max_position,
}
if max_position is not None:
params['max_position'] = max_position
url_tupple = (TwitterConfig.protocol, TwitterConfig.top_url, TwitterConfig.search_url, '', urllib.parse.urlencode(params), '')
return urllib.parse.urlunparse(url_tupple)
url_tupple = (TwitterConfig.protocol, TwitterConfig.top_url, TwitterConfig.search_url, '', parse.urlencode(params), '')
return parse.urlunparse(url_tupple)
def get_content_url(self, user_id, tweet_id, max_position=''):
params = {
'max_position': max_position,
}
sub_url = TwitterConfig.conversation_url_form.format(user_id, tweet_id)
url_tupple = (TwitterConfig.protocol, TwitterConfig.top_url, sub_url, '', urllib.parse.urlencode(params), '')
return urllib.parse.urlunparse(url_tupple)
def get_page(self, url):
headers = {
'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.86 Safari/537.36',
'Accept-Language': 'en-US'
# 'Accept-Language': 'en-US',
'Accept-Language': 'ko-KR,ko;q=0.8,en-US;q=0.6,en;q=0.4',
}
if self.proxies is None:
self.proxies = base.proxy.get_proxy_for_requests()
resp = None
for cnt in range(5):
for cnt in range(10):
try:
resp = requests.get(url, headers=headers, proxies=self.proxies, timeout=5)
except Exception as e:
print('proxy {} is expired. ({})'.format(self.proxies, e))
base.proxy.set_proxy_expired(self.proxies)
self.proxies = base.proxy.get_proxy_for_requests()
else:
break
return resp
def insert_pool(self, tweet: Tweet):
pass
def start(self):
def runner_proc(self, config):
b_continue = True
min_tweet_id = None
max_tweet_id = None
max_position = None
max_position = ''
tweet_count = 0
while b_continue:
if min_tweet_id is not None:
max_position = 'TWEET-{}-{}'.format(max_tweet_id, min_tweet_id)
url = self.get_url(self.config.keywords[0], max_position)
r = self.get_page(url)
if r is None:
url = self.get_timeline_url(config.keywords[0], config.start_str, config.end_str, max_position)
resp = self.get_page(url)
if resp is None:
break
j = json.loads(r.content.decode('utf-8'))
j = json.loads(resp.content.decode('utf-8'))
soup = bs4.BeautifulSoup(j['items_html'], 'lxml')
tweet_tags = soup.select("div.tweet")
for tw in tweet_tags:
tweet = TweetParser.parse(tw, self.config.keyword_id)
tweet = TweetParser.parse(tw, config.keyword_id)
if tweet.is_reply is True:
print(' ## {}: {}...'.format(tweet.user_name, tweet.text[:20]))
continue
if tweet.created_at < self.config.start:
if tweet.created_at < config.start:
b_continue = False
break
elif tweet.created_at > self.config.end:
elif tweet.created_at > config.end:
continue
if tweet.reply_cnt > 0:
self.insert_pool(tweet)
self.insert_content_pool(tweet)
self.db_helper.insert_tweet(self.config.db_num, tweet)
self.db_helper.insert_tweet(tweet, config.db_num)
print('{} {}>>{}: {}'.format(tweet.created_at, tweet.article_id, tweet.user_name, tweet.text))
print('{} {}: {}...'.format(tweet.created_at, tweet.user_name, tweet.text[:20]))
count = len(tweet_tags)
if count == 0:
@@ -104,4 +125,109 @@ class TwitterCrawler:
max_tweet_id = tweet_tags[-1].attrs['data-item-id']
tweet_count += count
print('runner finished {}'.format(tweet_count))
print('{} to {} runner finished {}'.format(config.start_str, config.end_str, tweet_count))
def insert_content_pool(self, tweet: Tweet):
self.content_queue.put(tweet)
def content_proc(self, proc_id):
while not self.runner_finished.value or not self.content_queue.empty():
try:
parent_tw = self.content_queue.get(block=True, timeout=5)
except:
continue
max_position = ''
b_continue = True
while b_continue:
url = self.get_content_url(parent_tw.user_id, parent_tw.tweet_id, max_position)
resp = self.get_page(url)
if resp is None or resp.status_code == 404:
break
elif resp.status_code != 200:
print('content_get code {}'.format(resp.status_code))
continue
j = json.loads(resp.content.decode('utf-8'))
soup = bs4.BeautifulSoup(j['items_html'], 'lxml')
reply_container_tags = soup.select('li.ThreadedConversation')
for container_tags in reply_container_tags:
tweet_tags = container_tags.select('div.tweet')
for idx, tag in enumerate(tweet_tags):
if idx >= 2:
break
tweet = TweetParser.parse(tag, self.default_config.keyword_id, idx+1, parent_tw.tweet_link)
self.db_helper.insert_tweet(tweet, self.default_config.db_num)
print('>>> {} {}: {}'.format(tweet.created_at, tweet.user_name, tweet.text[:20]))
b_continue = j['has_more_items']
if b_continue:
max_position = j['min_position']
print('content proc {} finished'.format(proc_id))
def debug_content(self):
test_tw = Tweet()
test_tw.tweet_link = 'https://twitter.com/moonriver365/status/885797401033818112'
test_tw.user_id = 'moonriver365'
test_tw.tweet_id = 885797401033818112
self.content_queue.put(test_tw)
self.content_queue.put(test_tw)
print(self.content_queue.qsize())
print(self.content_queue.empty())
content_process = [mp.Process(target=self.content_proc, args=()) for _ in range(1)]
[p.start() for p in content_process]
[p.join() for p in content_process]
print('end all')
def test_insert_db(self):
test_tw = Tweet()
test_tw.tweet_link = 'https://twitter.com/moonriver365/status/885797401033818112'
test_tw.user_id = 'moonriver365'
test_tw.tweet_id = 885797401033818112
for _ in range(5):
self.db_helper.insert_tweet(test_tw, self.default_config.db_num)
def debug(self):
if base.baseclasses.is_debug:
## check proxy
# base.proxy.get_proxy_from_file('proxy.txt')
# proxy = {'https': 'http://45.56.86.93:3128', 'http': 'http://45.56.86.93:3128'}
# base.proxy.set_proxy_expired(proxy)
# return
## contents check
# self.debug_content()
# split_config = self.default_config.split()
self.test_insert_db()
exit()
def start(self):
# self.debug()
split_config = self.default_config.split()
# split_config = [self.default_config, ]
runner_process = [mp.Process(target=self.runner_proc, args=(config, )) for config in split_config]
content_process = [mp.Process(target=self.content_proc, args=(proc_id, )) for proc_id in range(16)]
[p.start() for p in runner_process]
[p.start() for p in content_process]
[p.join() for p in runner_process]
self.runner_finished.value = True
[p.join() for p in content_process]
print('finished all')

View File

@@ -8,9 +8,11 @@ import datetime
class TweetParser:
@staticmethod
def parse(tag, keyword_id):
def parse(tag, keyword_id, depth=0, main_url=None):
tweet = Tweet()
tweet.tweet_id = int(tag.attrs['data-tweet-id'])
nickname_tag = tag.select('strong.fullname')[0]
tweet.user_name = ''
for child in nickname_tag.children:
@@ -22,7 +24,11 @@ class TweetParser:
tweet.text = tag.select('p.tweet-text')[0].text
time_str = tag.select('a.tweet-timestamp')[0].attrs['title']
tweet.created_at = datetime.datetime.strptime(time_str, '%I:%M %p - %d %b %Y')
# english
# tweet.created_at = datetime.datetime.strptime(time_str, '%I:%M %p - %d %b %Y')
# korean
time_str = time_str.replace('오전', 'AM').replace('오후', 'PM')
tweet.created_at = datetime.datetime.strptime(time_str, '%p %I:%M - %Y년 %m월 %d')
reply_tag = tag.select('div.ReplyingToContextBelowAuthor')
tweet.is_reply = len(reply_tag) > 0
@@ -39,20 +45,25 @@ class TweetParser:
if len(favorite_cnt_tag) > 0:
tweet.favorites_cnt = int(favorite_cnt_tag[0].attrs['data-tweet-stat-count'])
link_tag = tag.select('a.js-permalink')
if len(link_tag) > 0:
tweet.tweet_link = link_tag[0].attrs['href']
if main_url:
tweet.tweet_link = main_url
else:
link_tag = tag.select('a.js-permalink')
if len(link_tag) > 0:
tweet.tweet_link = TwitterConfig.protocol + '://' + TwitterConfig.top_url + link_tag[0].attrs['href']
tweet.depth = depth
tweet.platform_name = 'twitter'
tweet.platform_form = 'post'
tweet.platform_title = tweet.user_id
tweet.article_form = 'reply' if tweet.is_reply else 'body'
tweet.article_form = 'body' if tweet.depth is 0 else 'reply'
# tweet.article_parent = None
tweet.article_id = tweet.user_id
tweet.article_nickname = tweet.user_name
# tweet.article_title = None
tweet.article_data = tweet.text
tweet.article_url = TwitterConfig.protocol + '://' + TwitterConfig.top_url + tweet.tweet_link
tweet.article_url = tweet.tweet_link
# tweet.article_hit = 0
tweet.article_date = tweet.created_at
# tweet.article_order = 0