- timestamp 처리
- 실패한 runner 한번 더 시도
This commit is contained in:
@@ -11,6 +11,7 @@ import bs4
|
||||
import json
|
||||
import urllib
|
||||
import multiprocessing as mp
|
||||
import time
|
||||
|
||||
|
||||
class TwitterCrawler():
|
||||
@@ -21,17 +22,17 @@ class TwitterCrawler():
|
||||
self.proxies = None
|
||||
self.runner_finished = mp.Value('b', False)
|
||||
self.content_queue = mp.Queue()
|
||||
self.proc_q = mp.Queue()
|
||||
self.result_queue = mp.Queue()
|
||||
|
||||
def set_arguments(self, browser, keyword_id, db_num, before_day, until_page):
|
||||
params = self.db_helper.get_param(keyword_id)
|
||||
self.default_config.set_param(keyword_id, db_num, params)
|
||||
|
||||
def __getstate__(self):
|
||||
return self.default_config, self.db_helper, self.proxies, self.runner_finished, self.content_queue,
|
||||
return self.default_config, self.db_helper, self.proxies, self.runner_finished, self.content_queue, self.result_queue,
|
||||
|
||||
def __setstate__(self, state):
|
||||
self.default_config, self.db_helper, self.proxies, self.runner_finished, self.content_queue, = state
|
||||
self.default_config, self.db_helper, self.proxies, self.runner_finished, self.content_queue, self.result_queue, = state
|
||||
|
||||
def get_timeline_url(self, query, start_str, end_str, max_position=''):
|
||||
params = {
|
||||
@@ -77,7 +78,7 @@ class TwitterCrawler():
|
||||
|
||||
return resp
|
||||
|
||||
def runner_proc(self, config):
|
||||
def runner_proc(self, proc_id, config):
|
||||
b_continue = True
|
||||
min_tweet_id = None
|
||||
max_tweet_id = None
|
||||
@@ -103,12 +104,6 @@ class TwitterCrawler():
|
||||
print(' ## {}: {}...'.format(tweet.user_name, tweet.text[:20]))
|
||||
continue
|
||||
|
||||
if tweet.created_at < config.start:
|
||||
b_continue = False
|
||||
break
|
||||
elif tweet.created_at > config.end:
|
||||
continue
|
||||
|
||||
if tweet.reply_cnt > 0:
|
||||
self.insert_content_pool(tweet)
|
||||
|
||||
@@ -126,6 +121,8 @@ class TwitterCrawler():
|
||||
tweet_count += count
|
||||
|
||||
print('{} to {} runner finished {}'.format(config.start_str, config.end_str, tweet_count))
|
||||
self.result_queue.put((proc_id, tweet_count, ))
|
||||
return proc_id, tweet_count,
|
||||
|
||||
def insert_content_pool(self, tweet: Tweet):
|
||||
self.content_queue.put(tweet)
|
||||
@@ -140,6 +137,7 @@ class TwitterCrawler():
|
||||
max_position = ''
|
||||
|
||||
b_continue = True
|
||||
tweet_count = 0
|
||||
while b_continue:
|
||||
url = self.get_content_url(parent_tw.user_id, parent_tw.tweet_id, max_position)
|
||||
resp = self.get_page(url)
|
||||
@@ -164,12 +162,14 @@ class TwitterCrawler():
|
||||
self.db_helper.insert_tweet(tweet, self.default_config.db_num)
|
||||
|
||||
print('>>> {} {}: {}'.format(tweet.created_at, tweet.user_name, tweet.text[:20]))
|
||||
tweet_count += 1
|
||||
|
||||
b_continue = j['has_more_items']
|
||||
if b_continue:
|
||||
max_position = j['min_position']
|
||||
|
||||
print('content proc {} finished'.format(proc_id))
|
||||
return proc_id,
|
||||
|
||||
def debug_content(self):
|
||||
test_tw = Tweet()
|
||||
@@ -214,13 +214,13 @@ class TwitterCrawler():
|
||||
exit()
|
||||
|
||||
def start(self):
|
||||
start_time = time.time()
|
||||
|
||||
# self.debug()
|
||||
|
||||
split_config = self.default_config.split()
|
||||
# split_config = [self.default_config, ]
|
||||
|
||||
runner_process = [mp.Process(target=self.runner_proc, args=(config, )) for config in split_config]
|
||||
runner_process = [mp.Process(target=self.runner_proc, args=(proc_id, config, )) for proc_id, config in enumerate(split_config)]
|
||||
content_process = [mp.Process(target=self.content_proc, args=(proc_id, )) for proc_id in range(16)]
|
||||
|
||||
[p.start() for p in runner_process]
|
||||
@@ -230,4 +230,24 @@ class TwitterCrawler():
|
||||
self.runner_finished.value = True
|
||||
[p.join() for p in content_process]
|
||||
|
||||
print('finished all')
|
||||
# rerun zero pages
|
||||
runner2_process = []
|
||||
while not self.result_queue.empty():
|
||||
result = self.result_queue.get()
|
||||
if result[1] == 0:
|
||||
runner2_process.append(
|
||||
mp.Process(target=self.runner_proc, args=(result[0], split_config[result[0]], ))
|
||||
)
|
||||
content_process = [mp.Process(target=self.content_proc, args=(proc_id,)) for proc_id in range(16)]
|
||||
self.runner_finished.value = False
|
||||
[p.start() for p in runner2_process]
|
||||
[p.start() for p in content_process]
|
||||
|
||||
[p.join() for p in runner2_process]
|
||||
self.runner_finished.value = True
|
||||
[p.join() for p in content_process]
|
||||
|
||||
delta = time.time() - start_time
|
||||
m, s = divmod(delta, 60)
|
||||
h, m = divmod(m, 60)
|
||||
print("finished all {}:{:02d}:{:02d} ".format(int(h), int(m), int(s)))
|
||||
|
||||
@@ -3,7 +3,7 @@ from twitter.twconfig import TwitterConfig
|
||||
|
||||
import bs4
|
||||
import datetime
|
||||
|
||||
import pytz
|
||||
|
||||
class TweetParser:
|
||||
|
||||
@@ -23,12 +23,18 @@ class TweetParser:
|
||||
tweet.user_id = tag.select('span.username')[0].text[1:]
|
||||
tweet.text = tag.select('p.tweet-text')[0].text
|
||||
|
||||
time_str = tag.select('a.tweet-timestamp')[0].attrs['title']
|
||||
# time_str = tag.select('a.tweet-timestamp')[0].attrs['title']
|
||||
# english
|
||||
# tweet.created_at = datetime.datetime.strptime(time_str, '%I:%M %p - %d %b %Y')
|
||||
# korean
|
||||
time_str = time_str.replace('오전', 'AM').replace('오후', 'PM')
|
||||
tweet.created_at = datetime.datetime.strptime(time_str, '%p %I:%M - %Y년 %m월 %d일')
|
||||
# time_str = time_str.replace('오전', 'AM').replace('오후', 'PM')
|
||||
# tweet.created_at = datetime.datetime.strptime(time_str, '%p %I:%M - %Y년 %m월 %d일')
|
||||
|
||||
timestamp = int(tag.select('span._timestamp')[0].attrs['data-time'])
|
||||
utc_dt = datetime.datetime.utcfromtimestamp(timestamp)
|
||||
local_tz = pytz.timezone('Asia/Seoul')
|
||||
local_dt = utc_dt.replace(tzinfo=pytz.utc).astimezone(local_tz)
|
||||
tweet.created_at = local_tz.normalize(local_dt)
|
||||
|
||||
reply_tag = tag.select('div.ReplyingToContextBelowAuthor')
|
||||
tweet.is_reply = len(reply_tag) > 0
|
||||
|
||||
Reference in New Issue
Block a user