diff --git a/.gitignore b/.gitignore
index 1a8398e..6b77c80 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,3 +5,4 @@
 WebBasedCrawler/proxy.txt
 clients-win/
 clients-linux/
+**/*.log
diff --git a/WebBasedCrawler/base/logger.py b/WebBasedCrawler/base/logger.py
new file mode 100644
index 0000000..5ae9b62
--- /dev/null
+++ b/WebBasedCrawler/base/logger.py
@@ -0,0 +1,61 @@
+import logging
+import logging.handlers
+import enum
+import datetime
+import base.baseclasses
+import threading
+
+
+class CustomFormatter(logging.Formatter):
+    def format(self, record):
+        # msg_prefix = '[{}] [{}] [{}] '.format(self.formatTime(record, self.datefmt), threading.current_thread().ident, record.levelname)
+        # record.msg = msg_prefix + record.msg
+        # record.msg = '[%s] %s' % (threading.current_thread().ident, record.msg)
+        return super(CustomFormatter, self).format(record)
+
+
+logger = logging.getLogger('mylogger')
+# formatter = logging.Formatter('[ %(asctime)s][%(threadName)s][%(levelname)s][%(filename)s(%(lineno)s)] > %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
+# formatter = CustomFormatter('[ %(asctime)s][%(thread)s][%(levelname)s][%(pathname)s(%(lineno)s)]\n> %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
+formatter = CustomFormatter('', datefmt='%Y-%m-%d %H:%M:%S')
+
+
+logging.handlers.RotatingFileHandler('crawler.log')
+
+file_handler = logging.FileHandler('{}.log'.format(datetime.datetime.now().strftime('%Y-%m-%d')))
+file_handler.setLevel(logging.DEBUG)
+file_handler.setFormatter(formatter)
+
+debug_stream_handler = logging.StreamHandler()
+debug_stream_handler.setLevel(logging.DEBUG)
+debug_stream_handler.setFormatter(formatter)
+
+normal_stream_handler = logging.StreamHandler()
+normal_stream_handler.setLevel(logging.INFO)
+
+logger.addHandler(file_handler)
+# if base.baseclasses.is_debug:
+#     logger.addHandler(debug_stream_handler)
+# else:
+#     logger.addHandler(normal_stream_handler)
+
+
+class LogLevel(enum.Enum):
+    DEBUG = 1
+    INFO = 2
+    WARNING = 3
+    ERROR = 4
+    CRITICAL = 5
+
+
+def log(msg, level=LogLevel.INFO):
+    if level == LogLevel.DEBUG:
+        logger.debug(msg)
+    elif level == LogLevel.INFO:
+        logger.info(msg)
+    elif level == LogLevel.WARNING:
+        logger.warning(msg)
+    elif level == LogLevel.ERROR:
+        logger.error(msg)
+    elif level == LogLevel.CRITICAL:
+        logger.critical(msg)
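For reference, a minimal sketch of how the new logging helper is meant to be called from the crawler modules (illustrative only; the call sites are not part of this diff). One caveat worth noting: 'mylogger' never gets an explicit setLevel(), so with the root logger's default WARNING level the DEBUG and INFO branches will be filtered out unless a logger.setLevel(logging.DEBUG) line is added.

    # illustrative usage, assuming the module layout above
    import base.logger as logger

    logger.log('runner started')                               # defaults to LogLevel.INFO
    logger.log('proxy pool exhausted', logger.LogLevel.ERROR)  # routed to logger.error()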
diff --git a/WebBasedCrawler/base/proxy2.py b/WebBasedCrawler/base/proxy2.py
new file mode 100644
index 0000000..655d4bc
--- /dev/null
+++ b/WebBasedCrawler/base/proxy2.py
@@ -0,0 +1,180 @@
+import base.proxy_crawler as proxy_crawler
+import base.logger as logger
+
+import sqlalchemy
+import sqlalchemy.ext
+import sqlalchemy.ext.declarative
+import sqlalchemy.orm
+
+import enum
+import datetime
+import threading
+import random
+
+
+Base = sqlalchemy.ext.declarative.declarative_base()
+
+
+class Proxy2Model(Base):
+    __tablename__ = 'proxy2'
+    id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True, nullable=False, autoincrement=True)
+    ip = sqlalchemy.Column(sqlalchemy.String(15), primary_key=True)
+    port = sqlalchemy.Column(sqlalchemy.SmallInteger, primary_key=True)
+    create_at = sqlalchemy.Column(sqlalchemy.DateTime, default=datetime.datetime.now)
+    naver_block_at = sqlalchemy.Column(sqlalchemy.DateTime)
+    daum_block_at = sqlalchemy.Column(sqlalchemy.DateTime)
+    facebook_block_at = sqlalchemy.Column(sqlalchemy.DateTime)
+    kakao_block_at = sqlalchemy.Column(sqlalchemy.DateTime)
+    insta_block_at = sqlalchemy.Column(sqlalchemy.DateTime)
+    twitter_block_at = sqlalchemy.Column(sqlalchemy.DateTime)
+    youtube_block_at = sqlalchemy.Column(sqlalchemy.DateTime)
+
+    def __init__(self, ip, port):
+        self.ip = ip
+        self.port = port
+
+    def __repr__(self):
+        return '{}:{}'.format(self.ip, self.port)
+
+    def get_instance_for_http(self):
+        return {
+            'http': '{}:{}'.format(self.ip, self.port),
+            'https': '{}:{}'.format(self.ip, self.port),
+            'ip': self.ip,
+            'port': self.port,
+        }
+
+
+class Platform(enum.Enum):
+    NAVER = 'naver'
+    DAUM = 'daum'
+    FACEBOOK = 'facebook'
+    KAKAO = 'kakao'
+    INSTA = 'insta'
+    TWITTER = 'twitter'
+    YOUTUBE = 'youtube'
+
+
+class Proxy2Handler:
+    block_field_map = {
+        Platform.NAVER: Proxy2Model.naver_block_at,
+        Platform.DAUM: Proxy2Model.daum_block_at,
+        Platform.FACEBOOK: Proxy2Model.facebook_block_at,
+        Platform.KAKAO: Proxy2Model.kakao_block_at,
+        Platform.INSTA: Proxy2Model.insta_block_at,
+        Platform.TWITTER: Proxy2Model.twitter_block_at,
+        Platform.YOUTUBE: Proxy2Model.youtube_block_at,
+    }
+
+    def __init__(self):
+        self.lock = threading.Lock()
+        self.engine = sqlalchemy.create_engine('mysql+pymysql://admin:admin123@bigbird.iptime.org/concepters?charset=utf8')
+        session_factory = sqlalchemy.orm.sessionmaker(bind=self.engine)
+        self.session = sqlalchemy.orm.scoped_session(session_factory)
+
+    def lock_enter(self):
+        # logger.log('lock {}'.format(threading.current_thread().ident))
+        # self.lock.acquire()
+        pass
+
+    def lock_leave(self):
+        # self.lock.release()
+        # logger.log('unlock {}'.format(threading.current_thread().ident))
+        pass
+
+    def commit(self):
+        self.lock_enter()
+        self.session.commit()
+        self.lock_leave()
+
+    def get_oldest(self, platform):
+        self.lock_enter()
+        instance = self.session.query(Proxy2Model).order_by(self.block_field_map[platform].desc()).first()
+        self.lock_leave()
+        return instance
+
+    # def get(self, platform):
+    #     proxy = self.session.query(Proxy2Model).filter(self.block_field_map[platform] == None).first()
+    #     if not proxy:
+    #         proxy_crawler.crawl_proxies()
+    #
+    #         proxy = self.get_oldest(platform)
+    #
+    #     return proxy
+
+    def get_query(self, ip, port):
+        return self.session.query(Proxy2Model).filter_by(ip=ip).filter_by(port=port)
+
+    def get_instance(self, ip, port):
+        self.lock_enter()
+        instance = self.get_query(ip, port).first()
+        self.lock_leave()
+        return instance
+
+    def get(self, platform, proc_id=-1):
+        self.lock_enter()
+
+        block_column = self.block_field_map[platform]
+        try:
+            instances = self.session.query(Proxy2Model).filter(block_column == None).all()
+        except Exception as e:
+            self.lock_leave()
+
+            try:
+                session_factory = sqlalchemy.orm.sessionmaker(bind=self.engine)
+                self.session = sqlalchemy.orm.scoped_session(session_factory)
+            except Exception as e2:
+                logger.log('{} session recreate'.format(proc_id))
+
+            return None
+
+        instance = instances[random.randint(0, len(instances)-1)] if len(instances) > 0 else None
+        if instance:
+            self.lock_leave()
+            return instance.get_instance_for_http()
+        else:
+            proxies = proxy_crawler.crawl_proxies()
+            self.insert_all(proxies)
+            self.lock_leave()
+            return self.get(platform, proc_id)
+
+    def insert(self, ip, port):
+        instance = self.get_instance(ip, port)
+        if not instance:
+            proxy = Proxy2Model(ip, port)
+            self.lock_enter()
+            self.session.add(proxy)
+            self.lock_leave()
+
+    def insert_all(self, proxies):
+
+        # INSERT INTO proxy2(ip, PORT)
+        # SELECT , FROM DUAL
+        # WHERE NOT EXISTS (SELECT * FROM proxy2 WHERE ip= AND PORT=)
+
+        instances_add = []
+        for proxy in proxies:
+            instance = self.get_instance(proxy['ip'], proxy['port'])
+            if not instance:
+                instances_add.append(Proxy2Model(proxy['ip'], proxy['port']))
+        self.session.bulk_save_objects(instances_add)
+        self.commit()
+
+    def set_proxy_blocked(self, ip, port, platform):
+        block_column = self.block_field_map[platform]
+        query = self.get_query(ip, port).filter(block_column == None)
+        query.update({block_column: datetime.datetime.now()})
+        self.commit()
+
+
+if __name__ == '__main__':
+    proxy_handler = Proxy2Handler()
+    # proxy_handler.insert('127.0.0.5', 80)
+    # proxy_handler.commit()
+
+    # proxy_handler.set_proxy_blocked('127.0.0.3', 80, Platform.TWITTER)
+
+    # instance = proxy_handler.get(Platform.TWITTER)
+    # instance = proxy_handler.get_oldest(Platform.TWITTER)
+    # print(instance)
+
+    proxy = proxy_handler.get(Platform.TWITTER)
diff --git a/WebBasedCrawler/base/proxy_crawler.py b/WebBasedCrawler/base/proxy_crawler.py
index 442bdd0..dba1aca 100644
--- a/WebBasedCrawler/base/proxy_crawler.py
+++ b/WebBasedCrawler/base/proxy_crawler.py
@@ -24,7 +24,10 @@ def get_proxies_free_proxy():
         if len(tds) > 0:
             ip = tds[0].text
             port = tds[1].text
-            proxies.append('{}:{}'.format(ip, port))
+            proxies.append({
+                'ip': ip,
+                'port': int(port),
+            })
 
     return proxies
 
@@ -40,8 +43,11 @@ def get_proxies_proxy_searcher():
     for tr in trs:
         tds = tr.select('td')
         if len(tds) > 0:
-            proxy = tds[1].text
-            proxies.append(proxy)
+            tokens = tds[1].text.split(':')
+            proxies.append({
+                'ip': tokens[0],
+                'port': int(tokens[1]),
+            })
 
     return proxies
 
@@ -64,8 +70,8 @@ def get_proxies_proxy_searcher():
 
 def check_proxy(qu, proxy, url):
     proxy_dict = {
-        'http': proxy,
-        'https': proxy,
+        'http': '{}:{}'.format(proxy['ip'], proxy['port']),
+        'https': '{}:{}'.format(proxy['ip'], proxy['port']),
     }
     try:
         resp = requests.get(url, proxies=proxy_dict, timeout=2)
@@ -84,7 +90,8 @@ def crawl_proxies(check_url=None):
     proxies = get_proxies_free_proxy()
     proxies += get_proxies_proxy_searcher()
     # proxies += get_proxies_nntime()
-    proxies = list(set(proxies))
+    # proxies = list(set(proxies))
+    print('proxy crawled {}'.format(len(proxies)))
 
     if check_url:
         qu = queue.Queue()
@@ -104,17 +111,20 @@
     else:
         proxies_alive = proxies
 
-    proxies_alive.sort()
-    print('proxy crawler got {} proxies'.format(len(proxies_alive)))
-
-    with open('proxy.txt', 'w') as f:
-        print('proxy crawler dump start')
-        for proxy in proxies_alive:
-            # print(proxy)
-            f.write(proxy + '\n')
-        print('proxy crawler dump end')
-
-    print('proxy crawling end')
+    return proxies_alive
+
+    # proxies_alive.sort()
+    # print('proxy crawler got {} proxies'.format(len(proxies_alive)))
+    #
+    # with open('proxy.txt', 'w') as f:
+    #     print('proxy crawler dump start')
+    #     for proxy in proxies_alive:
+    #         # print(proxy)
+    #         f.write(proxy + '\n')
+    #     print('proxy crawler dump end')
+    #
+    # print('proxy crawling end')
 
 
 if __name__ == '__main__':
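To make the new proxy plumbing concrete: crawl_proxies() now returns a list of {'ip', 'port'} dicts instead of 'ip:port' strings, Proxy2Handler.get() hands back a requests-compatible mapping built by get_instance_for_http(), and set_proxy_blocked() stamps the per-platform *_block_at column so that proxy is skipped afterwards. A rough usage sketch follows (not part of the diff; the target URL is a placeholder):

    import requests
    import base.proxy2 as proxy2

    handler = proxy2.Proxy2Handler()
    # e.g. {'http': '1.2.3.4:8080', 'https': '1.2.3.4:8080', 'ip': '1.2.3.4', 'port': 8080}
    proxy = handler.get(proxy2.Platform.TWITTER)
    try:
        resp = requests.get('https://example.com', proxies=proxy, timeout=3)
    except requests.RequestException:
        # mark this proxy as blocked for the platform so get() stops handing it out
        handler.set_proxy_blocked(proxy['ip'], proxy['port'], proxy2.Platform.TWITTER)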
diff --git a/WebBasedCrawler/insta/instacrawl.py b/WebBasedCrawler/insta/instacrawl.py
index 248012f..a6b0983 100644
--- a/WebBasedCrawler/insta/instacrawl.py
+++ b/WebBasedCrawler/insta/instacrawl.py
@@ -346,6 +346,7 @@ def crawl_content_process(qu, keyword_id, db_num):
         try:
             # get a instance of InstaContent by do_no_proxy func.
             # if element['url'] is invalid, content is None
+            element['url'] = 'https://www.instagram.com/p/BWrBng6l9H3/'
             content = m_c_i.do_no_proxy(element['url'])
             if not content:
                 break
@@ -359,6 +360,7 @@
                 printl("proxies = ", content.proxies)
                 m_c_i.change_proxy()
                 raise Exception("reply load error")
+            #if rep:
             replies = rep + replies
             wait(reply_wait_sec)
             for j in range(0, len(replies)):
diff --git a/WebBasedCrawler/requirements.txt b/WebBasedCrawler/requirements.txt
index 2d69340..0f90a49 100644
--- a/WebBasedCrawler/requirements.txt
+++ b/WebBasedCrawler/requirements.txt
@@ -5,3 +5,4 @@
 eventlet
 requests
 bs4
 pytz
+sqlalchemy==1.1.13
diff --git a/WebBasedCrawler/twitter/twittercrawl.py b/WebBasedCrawler/twitter/twittercrawl.py
index 2ea7937..78fd2c6 100644
--- a/WebBasedCrawler/twitter/twittercrawl.py
+++ b/WebBasedCrawler/twitter/twittercrawl.py
@@ -4,6 +4,7 @@ from twitter.tweet import Tweet
 from twitter.twparser import TweetParser
 
 import base.proxy
+import base.proxy2 as proxy2
 import base.baseclasses
 
 import requests
@@ -22,7 +23,9 @@ class TwitterCrawler:
         self.default_config = TwitterConfig()
         self.db_helper = TwitterDBHelper()
         self.proxy = {}
+        self.proxy_handler = proxy2.Proxy2Handler()
         self.before_day = None
+        self.runner_finished_queue = queue.Queue()
 
     def set_arguments(self, browser, keyword_id, db_num, before_day, until_page):
         params = self.db_helper.get_param(keyword_id)
@@ -53,6 +56,14 @@ class TwitterCrawler:
         url_tupple = (TwitterConfig.protocol, TwitterConfig.top_url, sub_url, '', urllib.parse.urlencode(params), '')
         return urllib.parse.urlunparse(url_tupple)
 
+    def get_proxy(self, proxy_key):
+        proxy = None
+        while not proxy:
+            proxy = self.proxy_handler.get(proxy2.Platform.TWITTER, proxy_key)
+            time.sleep(1)
+
+        return proxy
+
     def get_page(self, url, is_runner, proc_id):
         headers = {
             'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.86 Safari/537.36',
         }
         proxy_key = '{}-{}'.format('runner' if is_runner else 'content', proc_id)
         if proxy_key not in self.proxy:
-            self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
+            # self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
+            self.proxy[proxy_key] = self.get_proxy(proxy_key)
 
         resp = None
         while True:
@@ -70,9 +82,11 @@
                 if self.proxy[proxy_key] == (None, None):
                     break
 
-                print('[{}] proxy {} is expired. ({})'.format(proc_id, self.proxy[proxy_key], e))
-                base.proxy.set_proxy_expired(self.proxy[proxy_key])
-                self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
+                # print('[{}] proxy {} is expired. ({})'.format(proc_id, self.proxy[proxy_key], e))
+                # base.proxy.set_proxy_expired(self.proxy[proxy_key])
+                self.proxy_handler.set_proxy_blocked(self.proxy[proxy_key]['ip'], self.proxy[proxy_key]['port'], proxy2.Platform.TWITTER)
+                # self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
+                self.proxy[proxy_key] = self.get_proxy(proxy_key)
             else:
                 break
@@ -96,7 +110,8 @@
             j = json.loads(resp.text)
             if j['new_latent_count'] <= 0:
                 proxy_key = '{}-{}'.format('runner' if is_runner else 'content', proc_id)
-                self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
+                # self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
+                self.proxy[proxy_key] = self.get_proxy(proxy_key)
                 continue
             else:
                 return j
@@ -165,14 +180,13 @@
         # print(' [{}] pool insert: {} ({})'.format(proc_id, tweet.text[:20] if tweet.text else '', tweet.tweet_link))
         qu.put((tweet, tweet_top,))
 
-    @staticmethod
-    def get_content(content_queue):
+    def get_content(self, content_queue):
         sleep_time = time.time()
         while True:
             try:
                 parent_tw, top_tw, = content_queue.get(block=True, timeout=2)
             except Exception as e:
-                if time.time()-sleep_time > 15:
+                if not self.runner_finished_queue.empty() and time.time()-sleep_time > 15:
                     break
                 else:
                     continue
@@ -281,7 +295,7 @@
         start_time = time.time()
 
         # run
-        worker_count = 16
+        worker_count = 4
         split_config = self.default_config.split()
 
         content_qu = queue.Queue()
@@ -298,11 +312,15 @@
         [runner_pool.submit(self.runner_proc, proc_id, content_qu, runner_result_qu, config) for proc_id, config in enumerate(split_config)]
 
         runner_pool.shutdown(wait=True)
+        self.runner_finished_queue.put(True)
         content_pool.shutdown(wait=True)
         self.db_helper.flush()
 
         # rerun zero runners
         print('restart failed runner')
+        while not self.runner_finished_queue.empty():
+            self.runner_finished_queue.get()
+
         for retry in range(5):
             runner_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
             runner_result_qu2 = queue.Queue()
@@ -324,6 +342,7 @@
                 [content_pool.submit(self.content_proc, proc_id, content_qu, content_result_qu) for proc_id in range(worker_count)]
 
             runner_pool.shutdown(wait=True)
+            self.runner_finished_queue.put(True)
             content_pool.shutdown(wait=True)
             self.db_helper.flush()
diff --git a/WebBasedCrawler/webbasedcrawler.py b/WebBasedCrawler/webbasedcrawler.py
index 44a0853..d7ad90e 100644
--- a/WebBasedCrawler/webbasedcrawler.py
+++ b/WebBasedCrawler/webbasedcrawler.py
@@ -12,7 +12,7 @@ from naver import navercrawl
 from facebook import facebookcrawl
 from facebook import facebookcrawlbs
 from twitter import twittercrawl
-from youtube import youtubecrawl
+# from youtube import youtubecrawl
 
 from base.baseclasses import print_and_flush
diff --git a/WebBasedCrawler/youtube/youtube.py b/WebBasedCrawler/youtube/youtube.py
new file mode 100644
index 0000000..84419ae
--- /dev/null
+++ b/WebBasedCrawler/youtube/youtube.py
@@ -0,0 +1,21 @@
+from base.dbdata import DataDBRow
+
+
+class Youtube(DataDBRow):
+
+    def __init__(self):
+        super(self.__class__, self).__init__()
+
+        self.user_id = None
+        self.user_name = None
+        self.text = None
+        self.created_at = None
+        self.favorites = 0
+
+        self.is_reply = False
+        self.reply_cnt = 0
+        self.retweet_cnt = 0
+        self.favorite_cnt = 0
+        self.top_link = None
+
+        self.depth = 0
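The get_content()/runner_finished_queue change above shifts when content workers may give up: previously a content thread exited after 15 idle seconds even if runners were still producing work; now it also requires that the runner pool has signalled completion by putting True into runner_finished_queue. A condensed illustration of the new exit condition (illustration only, simplified from the diff):

    import queue
    import time

    runner_finished_queue = queue.Queue()
    content_queue = queue.Queue()

    def get_content():
        sleep_time = time.time()
        while True:
            try:
                return content_queue.get(block=True, timeout=2)
            except queue.Empty:
                # leave only once runners are done AND the queue has been idle for 15s
                if not runner_finished_queue.empty() and time.time() - sleep_time > 15:
                    return None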
diff --git a/WebBasedCrawler/youtube/youtubecrawl.py b/WebBasedCrawler/youtube/youtubecrawl.py
index bb34752..0ca7d8e 100644
--- a/WebBasedCrawler/youtube/youtubecrawl.py
+++ b/WebBasedCrawler/youtube/youtubecrawl.py
@@ -1,7 +1,301 @@
+from youtube.ytconfig import YoutubeConfig
+from youtube.ytdbhelper import YoutubeDBHelper
+from youtube.youtube import Youtube
+from youtube.ytparser import YoutubeParser
+
+import base.proxy
+import base.baseclasses
+
+import requests
+import bs4
+import json
+import urllib
+import concurrent.futures
+import threading
+import queue
+import time
+
+
+class YoutubeCrawler:
 
-class YoutubeMainCrawl:
     def __init__(self):
-        pass
+        self.default_config = YoutubeConfig()
+        self.db_helper = YoutubeDBHelper()
+        self.proxy = {}
+        self.before_day = None
+
+    def set_arguments(self, browser, keyword_id, db_num, before_day, until_page):
+        params = self.db_helper.get_param(keyword_id)
+        self.before_day = before_day
+        self.default_config.set_param(keyword_id, db_num, params)
+
+    @staticmethod
+    def get_timeline_url(query, start_str, end_str, max_position=''):
+        params = {
+            'sp': 'CABQFA==',  # sorted by date
+            'q': query,
+        }
+
+        url_tupple = (YoutubeConfig.protocol, YoutubeConfig.top_url, YoutubeConfig.search_url, '', urllib.parse.urlencode(params), '')
+        return urllib.parse.urlunparse(url_tupple)
+
+    @staticmethod
+    def get_content_url(user_id, tweet_id, max_position=''):
+        params = {
+            'max_position': max_position,
+        }
+
+        sub_url = YoutubeConfig.conversation_url_form.format(user_id, tweet_id)
+        url_tupple = (YoutubeConfig.protocol, YoutubeConfig.top_url, sub_url, '', urllib.parse.urlencode(params), '')
+        return urllib.parse.urlunparse(url_tupple)
+
+    def get_page(self, url, is_runner, proc_id):
+        headers = {
+            'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.86 Safari/537.36',
+            'Accept-Language': 'ko-KR,ko;q=0.8,en-US;q=0.6,en;q=0.4',
+        }
+        proxy_key = '{}-{}'.format('runner' if is_runner else 'content', proc_id)
+        if proxy_key not in self.proxy:
+            self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
+
+        resp = None
+        while True:
+            try:
+                resp = requests.get(url, headers=headers, proxies=self.proxy[proxy_key], timeout=3)
+            except Exception as e:
+                if self.proxy[proxy_key] == (None, None):
+                    break
+
+                # print('[{}] proxy {} is expired. ({})'.format(proc_id, self.proxy[proxy_key], e))
+                base.proxy.set_proxy_expired(self.proxy[proxy_key])
+                self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
+            else:
+                break
+
+        return resp
+
+    def get_page_data(self, url, is_runner, proc_id):
+        for retry_cnt in range(5):
+            # get response
+            resp = self.get_page(url, is_runner, proc_id)
+            if not resp:
+                break
+
+            # check response
+            if resp.status_code == 404:
+                break
+            elif resp.status_code != 200:
+                print('[WARNING] content_get code {}'.format(resp.status_code))
+                continue
+
+            # parsing result
+            j = json.loads(resp.text)
+            if j['new_latent_count'] <= 0:
+                proxy_key = '{}-{}'.format('runner' if is_runner else 'content', proc_id)
+                self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
+                continue
+            else:
+                return j
+
+        return {
+            'items_html': '',
+            'has_more_items': False,
+        }
+
+    def runner_proc(self, proc_id, content_queue, result_queue, config):
+        print('[{}] {} to {} runner thread start'.format(proc_id, config.start_str, config.end_str))
+
+        b_continue = True
+        min_tweet_id = None
+        max_tweet_id = None
+        max_position = ''
+        tweet_count = 0
+
+        while b_continue:
+            url = self.get_timeline_url(config.keywords[0], config.start_str, config.end_str, max_position)
+            j = self.get_page_data(url, True, proc_id)
+            soup = bs4.BeautifulSoup(j['items_html'], 'lxml')
+            tweet_tags = soup.select("div.tweet")
+
+            tweet_ids = []
+            for tw in tweet_tags:
+                tweet = YoutubeParser.parse(tw, config.keyword_id)
+                tweet_ids.append(tweet.tweet_id)
+
+                if tweet.is_reply is True:
+                    # print(' ## {}: {}...'.format(tweet.user_name, tweet.text[:20]))
+                    continue
+
+                if tweet.reply_cnt > 0:
+                    self.insert_content_pool(proc_id, content_queue, tweet, tweet)
+                self.db_helper.insert_youtube(tweet, config.db_num)
+
+                # print('{} {}: {}...'.format(tweet.created_at, tweet.user_name, tweet.text[:20]))
+                print('[{}] body {} ({}) [{}]'.format(proc_id, tweet.top_link, tweet.created_at, 'ok'))
+
+            count = len(tweet_tags)
+            tweet_count += count
+
+            b_continue = count > 0
+            # b_continue = j['has_more_items']
+            if b_continue:
+                if min_tweet_id is None:
+                    min_tweet_id = tweet_ids[0]
+                max_tweet_id = tweet_ids[-1]
+
+                if 'min_position' in j:
+                    max_position = j['min_position']
+                else:
+                    max_position = 'TWEET-{}-{}'.format(max_tweet_id, min_tweet_id)
+
+        print('[{}] {} to {} runner thread finished {}'.format(proc_id, config.start_str, config.end_str, tweet_count))
+        result_queue.put({
+            'proc_id': proc_id,
+            'count': tweet_count,
+        })
+        # self.runner_processing[proc_id].value = False
+        return proc_id, tweet_count,
+
+    @staticmethod
+    def insert_content_pool(proc_id: int, qu, tweet: Youtube, tweet_top: Youtube):
+        # print(' [{}] pool insert: {} ({})'.format(proc_id, tweet.text[:20] if tweet.text else '', tweet.tweet_link))
+        qu.put((tweet, tweet_top,))
+
+    @staticmethod
+    def get_content(content_queue):
+        sleep_time = time.time()
+        while True:
+            try:
+                parent_tw, top_tw, = content_queue.get(block=True, timeout=2)
+            except Exception as e:
+                if time.time()-sleep_time > 15:
+                    break
+                else:
+                    continue
+            else:
+                return parent_tw, top_tw,
+
+        return None, None,
+
+    def content_proc(self, proc_id, content_queue, result_queue):
+        # print('[{}] content thread start'.format(proc_id))
+        #
+        # tweet_count = 0
+        # while True:
+        #     parent_tw, top_tw, = self.get_content(content_queue)
+        #     if not parent_tw:
+        #         break
+        #
+        #     # print(' [{}] <<< parent : {} ({})'.format(proc_id, parent_tw.text[:20], parent_tw.tweet_link))
+        #
+        #     max_position = ''
+        #
+        #     b_continue = True
+        #     while b_continue:
+        #         url = self.get_content_url(parent_tw.user_id, parent_tw.tweet_id, max_position)
+        #         j = self.get_page_data(url, False, proc_id)
+        #         soup = bs4.BeautifulSoup(j['items_html'], 'lxml')
+        #
+        #         reply_container_tags = soup.select('li.ThreadedConversation')
+        #         reply_container_tags += TweetParser.get_lone_container(soup, parent_tw)
+        #         for container_tags in reply_container_tags:
+        #             tweet_tags = container_tags.select('div.tweet')
+        #             if len(tweet_tags) > 0:
+        #                 tweet = TweetParser.parse(tweet_tags[0], self.default_config.keyword_id, parent_tw.depth+1, top_tw)
+        #                 # print('[{}]>>> {} {}: {} ({}) ({})'.format(proc_id, tweet.created_at, tweet.user_name, tweet.text[:20], tweet.depth, tweet.tweet_link))
+        #                 print('[{}] reply {} [{}]'.format(proc_id, tweet.top_link, 'ok'))
+        #                 self.insert_content_pool(proc_id, content_queue, tweet, top_tw)
+        #                 self.db_helper.insert_tweet(tweet, self.default_config.db_num)
+        #                 tweet_count += 1
+        #
+        #         b_continue = j['has_more_items']
+        #         if b_continue:
+        #             max_position = j['min_position']
+        #
+        #     result_queue.put({
+        #         'proc_id': proc_id,
+        #         'count': tweet_count,
+        #     })
+        #
+        # print('[{}] content thread finished'.format(proc_id))
+        tweet_count = 0
+        return proc_id, tweet_count,
+
+    def run(self):
+        start_time = time.time()
+
+        # run
+        worker_count = 1
+        split_config = self.default_config.split()
+
+        content_qu = queue.Queue()
+        runner_result_qu = queue.Queue()
+        content_result_qu = queue.Queue()
+
+        runner_result_cnt = 0
+        content_result_cnt = 0
+
+        content_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
+        [content_pool.submit(self.content_proc, proc_id, content_qu, content_result_qu) for proc_id in range(worker_count)]
+
+        runner_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
+        [runner_pool.submit(self.runner_proc, proc_id, content_qu, runner_result_qu, config) for proc_id, config in enumerate(split_config)]
+
+        runner_pool.shutdown(wait=True)
+        content_pool.shutdown(wait=True)
+        self.db_helper.flush()
+
+        # rerun zero runners
+        print('restart failed runner')
+        for retry in range(5):
+            runner_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
+            runner_result_qu2 = queue.Queue()
+            b_rerun = False
+            while not runner_result_qu.empty():
+                res = runner_result_qu.get()
+                runner_result_cnt += res['count']
+                proc_id = res['proc_id']
+                if res['count'] == 0:
+                    runner_pool.submit(self.runner_proc, proc_id, content_qu, runner_result_qu2, split_config[proc_id])
+                    b_rerun = True
+
+            while not content_result_qu.empty():
+                res = content_result_qu.get()
+                content_result_cnt += res['count']
+
+            if b_rerun:
+                content_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
+                [content_pool.submit(self.content_proc, proc_id, content_qu, content_result_qu) for proc_id in range(worker_count)]
+
+            runner_pool.shutdown(wait=True)
+            content_pool.shutdown(wait=True)
+            self.db_helper.flush()
+
+            runner_result_qu = runner_result_qu2
+
+        while not runner_result_qu.empty():
+            res = runner_result_qu.get()
+            runner_result_cnt += res['count']
+
+        while not content_result_qu.empty():
+            res = content_result_qu.get()
+            content_result_cnt += res['count']
+
+        print('total body count: {}'.format(runner_result_cnt))
+        print('total reply count: {}'.format(content_result_cnt))
+
+        # print running time
+        delta = time.time() - start_time
+        m, s = divmod(delta, 60)
+        h, m = divmod(m, 60)
+        print("finished all {}:{:02d}:{:02d} ".format(int(h), int(m), int(s)))
 
     def start(self):
-        pass
\ No newline at end of file
+
+        # run
+        while True:
+            self.default_config.reload_realtime(self.before_day)
+            self.run()
+
+            if not self.default_config.realtime:
+                break
diff --git a/WebBasedCrawler/youtube/ytconfig.py b/WebBasedCrawler/youtube/ytconfig.py
new file mode 100644
index 0000000..31fee1c
--- /dev/null
+++ b/WebBasedCrawler/youtube/ytconfig.py
@@ -0,0 +1,71 @@
+import datetime
+import copy
+
+
+class YoutubeConfig:
+    protocol = 'https'
+    top_url = 'youtube.com'
+    search_url = '/i/search/timeline'
+    conversation_url_form = '/i/{}/conversation/{}'
+
+    def __init__(self):
+        self.keyword_id = -1
+        self.db_num = -1
+
+        self.id = 0
+        self.realtime = False
+        self.keywords = []
+        self.start_str = None
+        self.start = None
+        self.end_str = None
+        self.end = None
+        self.authorship = None
+        self.state = None
+        self.platform = None
+
+    def set_param(self, keyword_id, db_num, params):
+        self.keyword_id = int(keyword_id)
+        self.db_num = int(db_num)
+
+        self.id = int(params['id'])
+        self.realtime = params['realtime'] == 1
+
+        self.keywords = []
+        for keyword in params['searches'].split(','):
+            self.keywords.append(keyword.strip())
+
+        self.start_str = str(params['start'])
+        self.end_str = str(params['end'])
+        self.start = datetime.datetime.strptime(self.start_str, '%Y-%m-%d')
+        self.end = datetime.datetime.strptime(self.end_str, '%Y-%m-%d')
+
+        self.authorship = params['authorship']
+        self.state = params['state']
+        self.platform = params['platform']
+
+    def reload_realtime(self, before_day):
+        if not self.realtime:
+            return
+
+        self.end_str = datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d')
+        self.end = datetime.datetime.strptime(self.end_str, '%Y-%m-%d')
+        self.start = self.end + datetime.timedelta(days=int(before_day))
+        self.start_str = datetime.datetime.strftime(self.start, '%Y-%m-%d')
+
+    def split(self):
+        split_list = []
+        new_end = self.end
+
+        while new_end > self.start:
+            new_config = copy.deepcopy(self)
+
+            new_config.end = new_end
+            new_end = new_end + datetime.timedelta(days=-1)
+            new_config.start = new_end
+
+            new_config.start_str = new_config.start.strftime('%Y-%m-%d')
+            new_config.end_str = new_config.end.strftime('%Y-%m-%d')
+
+            split_list.append(new_config)
+
+        return split_list
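To make the windowing behaviour concrete, a small worked example of YoutubeConfig.split() (the dates are hypothetical, not from the diff): a config spanning 2017-08-01 to 2017-08-04 is cut into three one-day windows, newest first, and run() later starts one runner thread per window.

    import datetime

    cfg = YoutubeConfig()
    cfg.start = datetime.datetime(2017, 8, 1)
    cfg.end = datetime.datetime(2017, 8, 4)

    windows = cfg.split()
    # [(w.start_str, w.end_str) for w in windows] ->
    # [('2017-08-03', '2017-08-04'), ('2017-08-02', '2017-08-03'), ('2017-08-01', '2017-08-02')]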
diff --git a/WebBasedCrawler/youtube/ytdbhelper.py b/WebBasedCrawler/youtube/ytdbhelper.py
new file mode 100644
index 0000000..6595983
--- /dev/null
+++ b/WebBasedCrawler/youtube/ytdbhelper.py
@@ -0,0 +1,83 @@
+from youtube.youtube import Youtube
+import queue
+
+
+class YoutubeDBHelper:
+    pymysql = __import__('pymysql.cursors')
+    DB_DUMP_SIZE = 128
+
+    def __init__(self):
+        self.youtubes = []
+        self.buffer = []
+        self.queue = queue.Queue()
+        pass
+
+    def __del__(self):
+        self.flush()
+        pass
+
+    def get_param(self, keyword_id):
+        query = "select * from keyword where id = " + str(keyword_id)
+        params = []
+        try:
+            conn = self.pymysql.connect(host='bigbird.iptime.org',
+                                        user='admin', passwd='admin123',
+                                        db='concepters', charset='utf8',
+                                        cursorclass=self.pymysql.cursors.DictCursor)
+
+            with conn.cursor() as cursor:
+                cursor.execute(query)
+                params = cursor.fetchone()
+
+        except Exception as e:
+            print(e)
+            exit(1)
+
+        else:
+            conn.close()
+
+        return params
+
+    def flush(self):
+        local_buffer = []
+        while not self.queue.empty():
+            local_buffer.append(self.queue.get())
+
+        print('### db queue dump {}'.format(len(local_buffer)))
+
+        if len(local_buffer) > 0:
+            while True:
+                try:
+                    conn = self.pymysql.connect(host='bigbird.iptime.org',
+                                                user='admin', passwd='admin123',
+                                                db='concepters', charset='utf8',
+                                                cursorclass=self.pymysql.cursors.DictCursor,
+                                                connect_timeout=5)
+
+                except Exception as e:
+                    print(e)
+                    continue
+
+                else:
+                    break
+
+            try:
+                with conn.cursor() as cursor:
+                    for youtube, _db_num in local_buffer:
+                        if not youtube.is_reply:
+                            query = youtube.get_delete_query(_db_num)
+                            cursor.execute(query)
+                        query = youtube.get_insert_query(conn, _db_num)
+                        cursor.execute(query)
+                conn.commit()
+
+            except Exception as e:
+                print(e)
+
+            finally:
+                conn.close()
+
+    def insert_youtube(self, youtube: Youtube = None, db_num: int = -1, flush=False):
+        self.queue.put((youtube, db_num))
+        if self.queue.qsize() >= self.DB_DUMP_SIZE:
+            self.flush()
diff --git a/WebBasedCrawler/youtube/ytparser.py b/WebBasedCrawler/youtube/ytparser.py
new file mode 100644
index 0000000..8b88681
--- /dev/null
+++ b/WebBasedCrawler/youtube/ytparser.py
@@ -0,0 +1,14 @@
+from youtube.youtube import Youtube
+from youtube.ytconfig import YoutubeConfig
+
+import bs4
+import datetime
+import pytz
+
+
+class YoutubeParser:
+
+    @staticmethod
+    def parse(tag, keyword_id, depth=0, top_yt: Youtube=None):
+        youtube = Youtube()
+        return youtube
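Finally, a short sketch of how the YoutubeDBHelper buffering is intended to be driven (illustrative only; get_insert_query/get_delete_query are inherited from DataDBRow as in the other platform helpers, and the field values below are made up):

    helper = YoutubeDBHelper()

    yt = Youtube()
    yt.user_id = 'example_user'      # hypothetical values
    yt.text = 'example comment'

    # rows are queued in memory and written out in batches of DB_DUMP_SIZE (128)
    helper.insert_youtube(yt, db_num=0)

    # run() calls flush() after the thread pools shut down to push out any remainder
    helper.flush()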