Twitter crawler fix

- Store proxies in the proxy2 DB and use them from there
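- Rough sketch of the intended flow (based only on the Proxy2Handler API added in base/proxy2.py below; the URL is illustrative and error handling is omitted):

    from base.proxy2 import Proxy2Handler, Platform
    import requests

    handler = Proxy2Handler()
    # get() returns a dict shaped like {'http': 'ip:port', 'https': 'ip:port', 'ip': ..., 'port': ...}
    proxy = handler.get(Platform.TWITTER)
    try:
        requests.get('https://twitter.com/i/search/timeline', proxies=proxy, timeout=3)
    except requests.RequestException:
        # a blocked proxy gets stamped in the twitter_block_at column and is skipped on the next get()
        handler.set_proxy_blocked(proxy['ip'], proxy['port'], Platform.TWITTER)
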
mjjo
2017-08-09 15:32:57 +09:00
parent fc266404c8
commit 1fb61f0b4c
13 changed files with 786 additions and 29 deletions

.gitignore

@@ -5,3 +5,4 @@
WebBasedCrawler/proxy.txt
clients-win/
clients-linux/
**/*.log

base/logger.py

@@ -0,0 +1,61 @@
import logging
import logging.handlers
import enum
import datetime
import base.baseclasses
import threading
class CustomFormatter(logging.Formatter):
def format(self, record):
# msg_prefix = '[{}] [{}} [{}] '.format(self.formatTime(record, self.datefmt), threading.current_thread().ident, record.levelname)
# record.msg = msg_prefix + record.msg
# record.msg = '[%s] %s' % (threading.current_thread().ident, record.msg)
return super(CustomFormatter, self).format(record)
logger = logging.getLogger('mylogger')
# formatter = logging.Formatter('[ %(asctime)s][%(threadName)s][%(levelname)s][%(filename)s(%(lineno)s)] > %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
# formatter = CustomFormatter('[ %(asctime)s][%(thread)s][%(levelname)s][%(pathname)s(%(lineno)s)]\n> %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
formatter = CustomFormatter('', datefmt='%Y-%m-%d %H:%M:%S')
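# NOTE: the RotatingFileHandler below is instantiated but never attached to the logger, so it has no effect.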
logging.handlers.RotatingFileHandler('crawler.log')
file_handler = logging.FileHandler('{}.log'.format(datetime.datetime.now().strftime('%Y-%m-%d')))
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
debug_stream_handler = logging.StreamHandler()
debug_stream_handler.setLevel(logging.DEBUG)
debug_stream_handler.setFormatter(formatter)
normal_stream_handler = logging.StreamHandler()
normal_stream_handler.setLevel(logging.INFO)
logger.addHandler(file_handler)
# if base.baseclasses.is_debug:
# logger.addHandler(debug_stream_handler)
# else:
# logger.addHandler(normal_stream_handler)
class LogLevel(enum.Enum):
DEBUG = 1
INFO = 2
WARNING = 3
ERROR = 4
CRITICAL = 5
def log(msg, level=LogLevel.INFO):
if level == LogLevel.DEBUG:
logger.debug(msg)
elif level == LogLevel.INFO:
logger.info(msg)
elif level == LogLevel.WARNING:
logger.warning(msg)
elif level == LogLevel.ERROR:
logger.error(msg)
elif level == LogLevel.CRITICAL:
logger.critical(msg)

base/proxy2.py

@@ -0,0 +1,180 @@
import base.proxy_crawler as proxy_crawler
import base.logger as logger
import sqlalchemy
import sqlalchemy.ext
import sqlalchemy.ext.declarative
import sqlalchemy.orm
import enum
import datetime
import threading
import random
Base = sqlalchemy.ext.declarative.declarative_base()
class Proxy2Model(Base):
__tablename__ = 'proxy2'
id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True, nullable=False, autoincrement=True)
ip = sqlalchemy.Column(sqlalchemy.String(15), primary_key=True)
port = sqlalchemy.Column(sqlalchemy.SmallInteger, primary_key=True)
create_at = sqlalchemy.Column(sqlalchemy.DateTime, default=datetime.datetime.now)
naver_block_at = sqlalchemy.Column(sqlalchemy.DateTime)
daum_block_at = sqlalchemy.Column(sqlalchemy.DateTime)
facebook_block_at = sqlalchemy.Column(sqlalchemy.DateTime)
kakao_block_at = sqlalchemy.Column(sqlalchemy.DateTime)
insta_block_at = sqlalchemy.Column(sqlalchemy.DateTime)
twitter_block_at = sqlalchemy.Column(sqlalchemy.DateTime)
youtube_block_at = sqlalchemy.Column(sqlalchemy.DateTime)
def __init__(self, ip, port):
self.ip = ip
self.port = port
def __repr__(self):
return '{}:{}'.format(self.ip, self.port)
def get_instance_for_http(self):
return {
'http': '{}:{}'.format(self.ip, self.port),
'https': '{}:{}'.format(self.ip, self.port),
'ip': self.ip,
'port': self.port,
}
class Platform(enum.Enum):
NAVER = 'naver'
DAUM = 'daum'
FACEBOOK = 'facebook'
KAKAO = 'kakao'
INSTA = 'insta'
TWITTER = 'twitter'
YOUTUBE = 'youtube'
class Proxy2Handler:
block_field_map = {
Platform.NAVER: Proxy2Model.naver_block_at,
Platform.DAUM: Proxy2Model.daum_block_at,
Platform.FACEBOOK: Proxy2Model.facebook_block_at,
Platform.KAKAO: Proxy2Model.kakao_block_at,
Platform.INSTA: Proxy2Model.insta_block_at,
Platform.TWITTER: Proxy2Model.twitter_block_at,
Platform.YOUTUBE: Proxy2Model.youtube_block_at,
}
def __init__(self):
self.lock = threading.Lock()
self.engine = sqlalchemy.create_engine('mysql+pymysql://admin:admin123@bigbird.iptime.org/concepters?charset=utf8')
session_factory = sqlalchemy.orm.sessionmaker(bind=self.engine)
self.session = sqlalchemy.orm.scoped_session(session_factory)
def lock_enter(self):
# logger.log('lock {}'.format(threading.current_thread().ident))
# self.lock.acquire()
pass
def lock_leave(self):
# self.lock.release()
# logger.log('unlock {}'.format(threading.current_thread().ident))
pass
def commit(self):
self.lock_enter()
self.session.commit()
self.lock_leave()
def get_oldest(self, platform):
self.lock_enter()
instance = self.session.query(Proxy2Model).order_by(self.block_field_map[platform].desc()).first()
self.lock_leave()
return instance
# def get(self, platform):
# proxy = self.session.query(Proxy2Model).filter(self.block_field_map[platform] == None).first()
# if not proxy:
# proxy_crawler.crawl_proxies()
#
# proxy = self.get_oldest(platform)
#
# return proxy
def get_query(self, ip, port):
return self.session.query(Proxy2Model).filter_by(ip=ip).filter_by(port=port)
def get_instance(self, ip, port):
self.lock_enter()
instance = self.get_query(ip, port).first()
self.lock_leave()
return instance
def get(self, platform, proc_id=-1):
self.lock_enter()
block_column = self.block_field_map[platform]
try:
instances = self.session.query(Proxy2Model).filter(block_column == None).all()
except Exception as e:
self.lock_leave()
try:
session_factory = sqlalchemy.orm.sessionmaker(bind=self.engine)
self.session = sqlalchemy.orm.scoped_session(session_factory)
except Exception as e2:
logger.log('{} session recreate'.format(proc_id))
return None
instance = instances[random.randint(0, len(instances)-1)] if len(instances) > 0 else None
if instance:
self.lock_leave()
return instance.get_instance_for_http()
else:
proxies = proxy_crawler.crawl_proxies()
self.insert_all(proxies)
self.lock_leave()
return self.get(platform, proc_id)
def insert(self, ip, port):
instance = self.get_instance(ip, port)
if not instance:
proxy = Proxy2Model(ip, port)
self.lock_enter()
self.session.add(proxy)
self.lock_leave()
def insert_all(self, proxies):
# INSERT INTO proxy2(ip, PORT)
# SELECT <ip>, <port> FROM DUAL
# WHERE NOT EXISTS (SELECT * FROM proxy2 WHERE ip=<ip> AND PORT=<port>)
instances_add = []
for proxy in proxies:
instance = self.get_instance(proxy['ip'], proxy['port'])
if not instance:
instances_add.append(Proxy2Model(proxy['ip'], proxy['port']))
self.session.bulk_save_objects(instances_add)
self.commit()
def set_proxy_blocked(self, ip, port, platform):
block_column = self.block_field_map[platform]
query = self.get_query(ip, port).filter(block_column == None)
query.update({block_column: datetime.datetime.now()})
self.commit()
if __name__ == '__main__':
proxy_handler = Proxy2Handler()
# proxy_handler.insert('127.0.0.5', 80)
# proxy_handler.commit()
# proxy_handler.set_proxy_blocked('127.0.0.3', 80, Platform.TWITTER)
# instance = proxy_handler.get(Platform.TWITTER)
# instance = proxy_handler.get_oldest(Platform.TWITTER)
# print(instance)
proxy = proxy_handler.get(Platform.TWITTER)

base/proxy_crawler.py

@@ -24,7 +24,10 @@ def get_proxies_free_proxy():
if len(tds) > 0:
ip = tds[0].text
port = tds[1].text
proxies.append('{}:{}'.format(ip, port))
proxies.append({
'ip': ip,
'port': int(port),
})
return proxies
@@ -40,8 +43,11 @@ def get_proxies_proxy_searcher():
for tr in trs:
tds = tr.select('td')
if len(tds) > 0:
proxy = tds[1].text
proxies.append(proxy)
tokens = tds[1].text.split(':')
proxies.append({
'ip': tokens[0],
'port': int(tokens[1]),
})
return proxies
@@ -64,8 +70,8 @@ def get_proxies_proxy_searcher():
def check_proxy(qu, proxy, url):
proxy_dict = {
'http': proxy,
'https': proxy,
'http': '{}:{}'.format(proxy['ip'], proxy['port']),
'https': '{}:{}'.format(proxy['ip'], proxy['port']),
}
try:
resp = requests.get(url, proxies=proxy_dict, timeout=2)
@@ -84,7 +90,8 @@ def crawl_proxies(check_url=None):
proxies = get_proxies_free_proxy()
proxies += get_proxies_proxy_searcher()
# proxies += get_proxies_nntime()
proxies = list(set(proxies))
# proxies = list(set(proxies))
print('proxy crawled {}'.format(len(proxies)))
if check_url:
qu = queue.Queue()
@@ -104,17 +111,20 @@ def crawl_proxies(check_url=None):
else:
proxies_alive = proxies
proxies_alive.sort()
print('proxy crawler got {} proxies'.format(len(proxies_alive)))
with open('proxy.txt', 'w') as f:
print('proxy crawler dump start')
for proxy in proxies_alive:
# print(proxy)
f.write(proxy + '\n')
print('proxy crawler dump end')
print('proxy crawling end')
return proxies_alive
# proxies_alive.sort()
# print('proxy crawler got {} proxies'.format(len(proxies_alive)))
#
# with open('proxy.txt', 'w') as f:
# print('proxy crawler dump start')
# for proxy in proxies_alive:
# # print(proxy)
# f.write(proxy + '\n')
# print('proxy crawler dump end')
#
# print('proxy crawling end')
if __name__ == '__main__':

View File

@@ -346,6 +346,7 @@ def crawl_content_process(qu, keyword_id, db_num):
try:
# get a instance of InstaContent by do_no_proxy func.
# if element['url'] is invalid, content is None
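# NOTE: the line below overrides element['url'] with one fixed Instagram post for every element; it looks like a temporary testing value.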
element['url'] = 'https://www.instagram.com/p/BWrBng6l9H3/'
content = m_c_i.do_no_proxy(element['url'])
if not content:
break
@@ -359,6 +360,7 @@ def crawl_content_process(qu, keyword_id, db_num):
printl("proxies = ", content.proxies)
m_c_i.change_proxy()
raise Exception("reply load error")
#if rep:
replies = rep + replies
wait(reply_wait_sec)
for j in range(0, len(replies)):

requirements.txt

@@ -5,3 +5,4 @@ eventlet
requests
bs4
pytz
sqlalchemy==1.1.13

twitter/twittercrawl.py

@@ -4,6 +4,7 @@ from twitter.tweet import Tweet
from twitter.twparser import TweetParser
import base.proxy
import base.proxy2 as proxy2
import base.baseclasses
import requests
@@ -22,7 +23,9 @@ class TwitterCrawler:
self.default_config = TwitterConfig()
self.db_helper = TwitterDBHelper()
self.proxy = {}
self.proxy_handler = proxy2.Proxy2Handler()
self.before_day = None
self.runner_finished_queue = queue.Queue()
def set_arguments(self, browser, keyword_id, db_num, before_day, until_page):
params = self.db_helper.get_param(keyword_id)
@@ -53,6 +56,14 @@ class TwitterCrawler:
url_tupple = (TwitterConfig.protocol, TwitterConfig.top_url, sub_url, '', urllib.parse.urlencode(params), '')
return urllib.parse.urlunparse(url_tupple)
def get_proxy(self, proxy_key):
proxy = None
while not proxy:
proxy = self.proxy_handler.get(proxy2.Platform.TWITTER, proxy_key)
time.sleep(1)
return proxy
def get_page(self, url, is_runner, proc_id):
headers = {
'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.86 Safari/537.36',
@@ -60,7 +71,8 @@ class TwitterCrawler:
}
proxy_key = '{}-{}'.format('runner' if is_runner else 'content', proc_id)
if proxy_key not in self.proxy:
self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
# self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
self.proxy[proxy_key] = self.get_proxy(proxy_key)
resp = None
while True:
@@ -70,9 +82,11 @@ class TwitterCrawler:
if self.proxy[proxy_key] == (None, None):
break
print('[{}] proxy {} is expired. ({})'.format(proc_id, self.proxy[proxy_key], e))
base.proxy.set_proxy_expired(self.proxy[proxy_key])
self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
# print('[{}] proxy {} is expired. ({})'.format(proc_id, self.proxy[proxy_key], e))
# base.proxy.set_proxy_expired(self.proxy[proxy_key])
self.proxy_handler.set_proxy_blocked(self.proxy[proxy_key]['ip'], self.proxy[proxy_key]['port'], proxy2.Platform.TWITTER)
# self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
self.proxy[proxy_key] = self.get_proxy(proxy_key)
else:
break
@@ -96,7 +110,8 @@ class TwitterCrawler:
j = json.loads(resp.text)
if j['new_latent_count'] <= 0:
proxy_key = '{}-{}'.format('runner' if is_runner else 'content', proc_id)
self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
# self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
self.proxy[proxy_key] = self.get_proxy(proxy_key)
continue
else:
return j
@@ -165,14 +180,13 @@ class TwitterCrawler:
# print(' [{}] pool insert: {} ({})'.format(proc_id, tweet.text[:20] if tweet.text else '', tweet.tweet_link))
qu.put((tweet, tweet_top,))
@staticmethod
def get_content(content_queue):
def get_content(self, content_queue):
sleep_time = time.time()
while True:
try:
parent_tw, top_tw, = content_queue.get(block=True, timeout=2)
except Exception as e:
if time.time()-sleep_time > 15:
if not self.runner_finished_queue.empty() and time.time()-sleep_time > 15:
break
else:
continue
@@ -281,7 +295,7 @@ class TwitterCrawler:
start_time = time.time()
# run
worker_count = 16
worker_count = 4
split_config = self.default_config.split()
content_qu = queue.Queue()
@@ -298,11 +312,15 @@ class TwitterCrawler:
[runner_pool.submit(self.runner_proc, proc_id, content_qu, runner_result_qu, config) for proc_id, config in enumerate(split_config)]
runner_pool.shutdown(wait=True)
self.runner_finished_queue.put(True)
content_pool.shutdown(wait=True)
self.db_helper.flush()
# rerun zero runners
print('restart failed runner')
while not self.runner_finished_queue.empty():
self.runner_finished_queue.get()
for retry in range(5):
runner_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
runner_result_qu2 = queue.Queue()
@@ -324,6 +342,7 @@ class TwitterCrawler:
[content_pool.submit(self.content_proc, proc_id, content_qu, content_result_qu) for proc_id in range(worker_count)]
runner_pool.shutdown(wait=True)
self.runner_finished_queue.put(True)
content_pool.shutdown(wait=True)
self.db_helper.flush()

View File

@@ -12,7 +12,7 @@ from naver import navercrawl
from facebook import facebookcrawl
from facebook import facebookcrawlbs
from twitter import twittercrawl
from youtube import youtubecrawl
# from youtube import youtubecrawl
from base.baseclasses import print_and_flush

youtube/youtube.py

@@ -0,0 +1,21 @@
from base.dbdata import DataDBRow
class Youtube(DataDBRow):
def __init__(self):
super(self.__class__, self).__init__()
self.user_id = None
self.user_name = None
self.text = None
self.created_at = None
self.favorites = 0
self.is_reply = False
self.reply_cnt = 0
self.retweet_cnt = 0
self.favorite_cnt = 0
self.top_link = None
self.depth = 0

youtube/youtubecrawl.py

@@ -1,7 +1,301 @@
from youtube.ytconfig import YoutubeConfig
from youtube.ytdbhelper import YoutubeDBHelper
from youtube.youtube import Youtube
from youtube.ytparser import YoutubeParser
import base.proxy
import base.baseclasses
import requests
import bs4
import json
import urllib
import concurrent.futures
import threading
import queue
import time
class YoutubeCrawler:
class YoutubeMainCrawl:
def __init__(self):
pass
self.default_config = YoutubeConfig()
self.db_helper = YoutubeDBHelper()
self.proxy = {}
self.before_day = None
def set_arguments(self, browser, keyword_id, db_num, before_day, until_page):
params = self.db_helper.get_param(keyword_id)
self.before_day = before_day
self.default_config.set_param(keyword_id, db_num, params)
@staticmethod
def get_timeline_url(query, start_str, end_str, max_position=''):
params = {
'sp': 'CABQFA==',  # sorted by date
'q': query,
}
url_tupple = (YoutubeConfig.protocol, YoutubeConfig.top_url, YoutubeConfig.search_url, '', urllib.parse.urlencode(params), '')
return urllib.parse.urlunparse(url_tupple)
@staticmethod
def get_content_url(user_id, tweet_id, max_position=''):
params = {
'max_position': max_position,
}
sub_url = YoutubeConfig.conversation_url_form.format(user_id, tweet_id)
url_tupple = (YoutubeConfig.protocol, YoutubeConfig.top_url, sub_url, '', urllib.parse.urlencode(params), '')
return urllib.parse.urlunparse(url_tupple)
def get_page(self, url, is_runner, proc_id):
headers = {
'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.86 Safari/537.36',
'Accept-Language': 'ko-KR,ko;q=0.8,en-US;q=0.6,en;q=0.4',
}
proxy_key = '{}-{}'.format('runner' if is_runner else 'content', proc_id)
if proxy_key not in self.proxy:
self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
resp = None
while True:
try:
resp = requests.get(url, headers=headers, proxies=self.proxy[proxy_key], timeout=3)
except Exception as e:
if self.proxy[proxy_key] == (None, None):
break
# print('[{}] proxy {} is expired. ({})'.format(proc_id, self.proxy[proxy_key], e))
base.proxy.set_proxy_expired(self.proxy[proxy_key])
self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
else:
break
return resp
def get_page_data(self, url, is_runner, proc_id):
for retry_cnt in range(5):
# get response
resp = self.get_page(url, is_runner, proc_id)
if not resp:
break
# check response
if resp.status_code == 404:
break
elif resp.status_code != 200:
print('[WARNING] content_get code {}'.format(resp.status_code))
continue
# parsing result
j = json.loads(resp.text)
if j['new_latent_count'] <= 0:
proxy_key = '{}-{}'.format('runner' if is_runner else 'content', proc_id)
self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
continue
else:
return j
return {
'items_html': '',
'has_more_items': False,
}
def runner_proc(self, proc_id, content_queue, result_queue, config):
print('[{}] {} to {} runner thread start'.format(proc_id, config.start_str, config.end_str))
b_continue = True
min_tweet_id = None
max_tweet_id = None
max_position = ''
tweet_count = 0
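# NOTE: this runner is still largely copied from the Twitter crawler; TweetParser is not imported in this file
# and YoutubeDBHelper exposes insert_youtube rather than insert_tweet, so this loop is not yet runnable as written.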
while b_continue:
url = self.get_timeline_url(config.keywords[0], config.start_str, config.end_str, max_position)
j = self.get_page_data(url, True, proc_id)
soup = bs4.BeautifulSoup(j['items_html'], 'lxml')
tweet_tags = soup.select("div.tweet")
tweet_ids = []
for tw in tweet_tags:
tweet = TweetParser.parse(tw, config.keyword_id)
tweet_ids.append(tweet.tweet_id)
if tweet.is_reply is True:
# print(' ## {}: {}...'.format(tweet.user_name, tweet.text[:20]))
continue
if tweet.reply_cnt > 0:
self.insert_content_pool(proc_id, content_queue, tweet, tweet)
self.db_helper.insert_tweet(tweet, config.db_num)
# print('{} {}: {}...'.format(tweet.created_at, tweet.user_name, tweet.text[:20]))
print('[{}] body {} ({}) [{}]'.format(proc_id, tweet.top_link, tweet.created_at, 'ok'))
count = len(tweet_tags)
tweet_count += count
b_continue = count > 0
# b_continue = j['has_more_items']
if b_continue:
if min_tweet_id is None:
min_tweet_id = tweet_ids[0]
max_tweet_id = tweet_ids[-1]
if 'min_position' in j:
max_position = j['min_position']
else:
max_position = 'TWEET-{}-{}'.format(max_tweet_id, min_tweet_id)
print('[{}] {} to {} runner thread finished {}'.format(proc_id, config.start_str, config.end_str, tweet_count))
result_queue.put({
'proc_id': proc_id,
'count': tweet_count,
})
# self.runner_processing[proc_id].value = False
return proc_id, tweet_count,
@staticmethod
def insert_content_pool(proc_id: int, qu, tweet: Youtube, tweet_top: Youtube):
# print(' [{}] pool insert: {} ({})'.format(proc_id, tweet.text[:20] if tweet.text else '', tweet.tweet_link))
qu.put((tweet, tweet_top,))
@staticmethod
def get_content(content_queue):
sleep_time = time.time()
while True:
try:
parent_tw, top_tw, = content_queue.get(block=True, timeout=2)
except Exception as e:
if time.time()-sleep_time > 15:
break
else:
continue
else:
return parent_tw, top_tw,
return None, None,
def content_proc(self, proc_id, content_queue, result_queue):
# print('[{}] content thread start'.format(proc_id))
#
# tweet_count = 0
# while True:
# parent_tw, top_tw, = self.get_content(content_queue)
# if not parent_tw:
# break
#
# # print(' [{}] <<< parent : {} ({})'.format(proc_id, parent_tw.text[:20], parent_tw.tweet_link))
#
# max_position = ''
#
# b_continue = True
# while b_continue:
# url = self.get_content_url(parent_tw.user_id, parent_tw.tweet_id, max_position)
# j = self.get_page_data(url, False, proc_id)
# soup = bs4.BeautifulSoup(j['items_html'], 'lxml')
#
# reply_container_tags = soup.select('li.ThreadedConversation')
# reply_container_tags += TweetParser.get_lone_container(soup, parent_tw)
# for container_tags in reply_container_tags:
# tweet_tags = container_tags.select('div.tweet')
# if len(tweet_tags) > 0:
# tweet = TweetParser.parse(tweet_tags[0], self.default_config.keyword_id, parent_tw.depth+1, top_tw)
# # print('[{}]>>> {} {}: {} ({}) ({})'.format(proc_id, tweet.created_at, tweet.user_name, tweet.text[:20], tweet.depth, tweet.tweet_link))
# print('[{}] reply {} [{}]'.format(proc_id, tweet.top_link, 'ok'))
# self.insert_content_pool(proc_id, content_queue, tweet, top_tw)
# self.db_helper.insert_tweet(tweet, self.default_config.db_num)
# tweet_count += 1
#
# b_continue = j['has_more_items']
# if b_continue:
# max_position = j['min_position']
#
# result_queue.put({
# 'proc_id': proc_id,
# 'count': tweet_count,
# })
#
# print('[{}] content thread finished'.format(proc_id))
tweet_count = 0
return proc_id, tweet_count,
def run(self):
start_time = time.time()
# run
worker_count = 1
split_config = self.default_config.split()
content_qu = queue.Queue()
runner_result_qu = queue.Queue()
content_result_qu = queue.Queue()
runner_result_cnt = 0
content_result_cnt = 0
content_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
[content_pool.submit(self.content_proc, proc_id, content_qu, content_result_qu) for proc_id in range(worker_count)]
runner_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
[runner_pool.submit(self.runner_proc, proc_id, content_qu, runner_result_qu, config) for proc_id, config in enumerate(split_config)]
runner_pool.shutdown(wait=True)
content_pool.shutdown(wait=True)
self.db_helper.flush()
# rerun zero runners
print('restart failed runner')
for retry in range(5):
runner_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
runner_result_qu2 = queue.Queue()
b_rerun = False
while not runner_result_qu.empty():
res = runner_result_qu.get()
runner_result_cnt += res['count']
proc_id = res['proc_id']
if res['count'] == 0:
runner_pool.submit(self.runner_proc, proc_id, content_qu, runner_result_qu2, split_config[proc_id])
b_rerun = True
while not content_result_qu.empty():
res = content_result_qu.get()
content_result_cnt += res['count']
if b_rerun:
content_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
[content_pool.submit(self.content_proc, proc_id, content_qu, content_result_qu) for proc_id in range(worker_count)]
runner_pool.shutdown(wait=True)
content_pool.shutdown(wait=True)
self.db_helper.flush()
runner_result_qu = runner_result_qu2
while not runner_result_qu.empty():
res = runner_result_qu.get()
runner_result_cnt += res['count']
while not content_result_qu.empty():
res = content_result_qu.get()
content_result_cnt += res['count']
print('total body count: {}'.format(runner_result_cnt))
print('total reply count: {}'.format(content_result_cnt))
# print running time
delta = time.time() - start_time
m, s = divmod(delta, 60)
h, m = divmod(m, 60)
print("finished all {}:{:02d}:{:02d} ".format(int(h), int(m), int(s)))
def start(self):
pass
# run
while True:
self.default_config.reload_realtime(self.before_day)
self.run()
if not self.default_config.realtime:
break

youtube/ytconfig.py

@@ -0,0 +1,71 @@
import datetime
import copy
class YoutubeConfig:
protocol = 'https'
top_url = 'youtube.com'
search_url = '/i/search/timeline'
conversation_url_form = '/i/{}/conversation/{}'
def __init__(self):
self.keyword_id = -1
self.db_num = -1
self.id = 0
self.realtime = False
self.keywords = []
self.start_str = None
self.start = None
self.end_str = None
self.end = None
self.authorship = None
self.state = None
self.platform = None
def set_param(self, keyword_id, db_num, params):
self.keyword_id = int(keyword_id)
self.db_num = int(db_num)
self.id = int(params['id'])
self.realtime = params['realtime'] == 1
self.keywords = []
for keyword in params['searches'].split(','):
self.keywords.append(keyword.strip())
self.start_str = str(params['start'])
self.end_str = str(params['end'])
self.start = datetime.datetime.strptime(self.start_str, '%Y-%m-%d')
self.end = datetime.datetime.strptime(self.end_str, '%Y-%m-%d')
self.authorship = params['authorship']
self.state = params['state']
self.platform = params['platform']
def reload_realtime(self, before_day):
if not self.realtime:
return
self.end_str = datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d')
self.end = datetime.datetime.strptime(self.end_str, '%Y-%m-%d')
self.start = self.end + datetime.timedelta(days=int(before_day))
self.start_str = datetime.datetime.strftime(self.start, '%Y-%m-%d')
def split(self):
split_list = []
new_end = self.end
while new_end > self.start:
new_config = copy.deepcopy(self)
new_config.end = new_end
new_end = new_end + datetime.timedelta(days=-1)
new_config.start = new_end
new_config.start_str = new_config.start.strftime('%Y-%m-%d')
new_config.end_str = new_config.end.strftime('%Y-%m-%d')
split_list.append(new_config)
return split_list

youtube/ytdbhelper.py

@@ -0,0 +1,83 @@
from youtube.youtube import Youtube
import queue
class YoutubeDBHelper:
pymysql = __import__('pymysql.cursors')
DB_DUMP_SIZE = 128
def __init__(self):
self.youtubes = []
self.buffer = []
self.queue = queue.Queue()
pass
def __del__(self):
self.flush()
pass
def get_param(self, keyword_id):
query = "select * from keyword where id = " + str(keyword_id)
params = []
try:
conn = self.pymysql.connect(host='bigbird.iptime.org',
user='admin', passwd='admin123',
db='concepters', charset='utf8',
cursorclass=self.pymysql.cursors.DictCursor)
with conn.cursor() as cursor:
cursor.execute(query)
params = cursor.fetchone()
except Exception as e:
print(e)
exit(1)
else:
conn.close()
return params
def flush(self):
local_buffer = []
while not self.queue.empty():
local_buffer.append(self.queue.get())
print('### db queue dump {}'.format(len(local_buffer)))
if len(local_buffer) > 0:
while True:
try:
conn = self.pymysql.connect(host='bigbird.iptime.org',
user='admin', passwd='admin123',
db='concepters', charset='utf8',
cursorclass=self.pymysql.cursors.DictCursor,
connect_timeout=5)
except Exception as e:
print(e)
continue
else:
break
try:
with conn.cursor() as cursor:
for youtube, _db_num in local_buffer:
if not youtube.is_reply:
query = youtube.get_delete_query(_db_num)
cursor.execute(query)
query = youtube.get_insert_query(conn, _db_num)
cursor.execute(query)
conn.commit()
except Exception as e:
print(e)
finally:
conn.close()
def insert_youtube(self, youtube: Youtube = None, db_num: int = -1, flush=False):
self.queue.put((youtube, db_num))
if self.queue.qsize() >= self.DB_DUMP_SIZE:
self.flush()

youtube/ytparser.py

@@ -0,0 +1,14 @@
from youtube.youtube import Youtube
from youtube.ytconfig import YoutubeConfig
import bs4
import datetime
import pytz
class YoutubeParser:
@staticmethod
def parse(tag, keyword_id, depth=0, top_yt: Youtube=None):
youtube = Youtube()
return youtube