From de8f2f4c23427979cd8cd91ea93ffed13897db6c Mon Sep 17 00:00:00 2001 From: mjjo Date: Tue, 1 Aug 2017 10:39:03 +0900 Subject: [PATCH] =?UTF-8?q?=ED=8A=B8=EC=9C=84=ED=84=B0=20=ED=81=AC?= =?UTF-8?q?=EB=A1=A4=EB=9F=AC=20=EC=88=98=EC=A0=95=20-=20proxy.txt?= =?UTF-8?q?=EC=97=90=20=EC=9E=88=EB=8A=94=20ip=20=EB=AA=A8=EB=91=90=20?= =?UTF-8?q?=EC=86=8C=EC=A7=84=ED=95=98=EB=A9=B4=20=EC=83=88=EB=A1=AD?= =?UTF-8?q?=EA=B2=8C=20=EA=B0=80=EC=A0=B8=EC=99=80=EC=84=9C=20=EC=B1=84?= =?UTF-8?q?=EC=9A=B0=EB=8A=94=20=EA=B8=B0=EB=8A=A5=20-=20db=EC=97=90=20?= =?UTF-8?q?=EB=84=A3=EC=9D=84=20=EB=95=8C=20128=EA=B0=9C=20=EB=AA=A8?= =?UTF-8?q?=EC=95=84=EC=84=9C=20=ED=95=9C=EB=B2=88=EC=97=90=20=EB=84=A3?= =?UTF-8?q?=EB=8A=94=20=EA=B8=B0=EB=8A=A5=20-=20concurrent.future.ThreadPo?= =?UTF-8?q?olExecutor=20=EC=82=AC=EC=9A=A9=20-=20qt=EC=97=90=EC=84=9C=20?= =?UTF-8?q?=EB=A1=9C=EA=B7=B8=20=EB=9D=BC=EC=9D=B8=EB=B3=84=EB=A1=9C=20?= =?UTF-8?q?=EC=9D=BD=EC=96=B4=EC=84=9C=20=EC=B6=9C=EB=A0=A5=20-=20?= =?UTF-8?q?=EB=A1=9C=EA=B7=B8=20256=EA=B0=9C=EC=94=A9=20=ED=95=9C=EB=B2=88?= =?UTF-8?q?=EC=97=90=20=EC=82=AD=EC=A0=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CrawlerList/stwittertagmanage.cpp | 42 ++++++-- CrawlerList/widget.cpp | 9 +- WebBasedCrawler/base/proxy.py | 19 ++-- WebBasedCrawler/base/proxy_crawler.py | 133 ++++++++++++++++++++++++ WebBasedCrawler/twitter/twdbhelper.py | 29 +++--- WebBasedCrawler/twitter/twittercrawl.py | 85 ++++++++------- package-clients.bat | 1 + 7 files changed, 246 insertions(+), 72 deletions(-) create mode 100644 WebBasedCrawler/base/proxy_crawler.py diff --git a/CrawlerList/stwittertagmanage.cpp b/CrawlerList/stwittertagmanage.cpp index 0074828..21abc4a 100644 --- a/CrawlerList/stwittertagmanage.cpp +++ b/CrawlerList/stwittertagmanage.cpp @@ -56,19 +56,41 @@ void STwitterTagManage::processFinished(QProcess *pPro, QString _strOut) void STwitterTagManage::readStandardOutput() { + if(!m_pMain) + return; + + static bool bRunning = false; + if(bRunning) + return; + + bRunning = true; QProcess *pPro = (QProcess*)sender(); - QThread::msleep(100); - QString str = pPro->readAllStandardOutput(); - QStringList list = str.split("\n", QString::SkipEmptyParts); - foreach(QString log,list) + QStringList lines; + + while(pPro->canReadLine()) { - if (m_pMain) - { - m_pMain->InsertLog(log); - } - else - exit(0); + QString line = pPro->readLine(); + lines.append(line.simplified()); } + + foreach(QString line, lines) + m_pMain->InsertLog(line); + + bRunning = false; + + +// QThread::msleep(100); +// QString str = pPro->readAllStandardOutput(); +// QStringList list = str.split("\n", QString::SkipEmptyParts); +// foreach(QString log,list) +// { +// if (m_pMain) +// { +// m_pMain->InsertLog(log); +// } +// else +// exit(0); +// } } void STwitterTagManage::readStandardError() diff --git a/CrawlerList/widget.cpp b/CrawlerList/widget.cpp index 18ebc1f..c8abdee 100644 --- a/CrawlerList/widget.cpp +++ b/CrawlerList/widget.cpp @@ -186,9 +186,12 @@ void Widget::InsertLog(QString str) if (m_pResultList->count() > 1024) { - m_pResultList->removeItemWidget(m_pResultList->item(0)); - QListWidgetItem* item = m_pResultList->takeItem(0); - delete item; + for(int i=0; i<256; i++) + { + m_pResultList->removeItemWidget(m_pResultList->item(0)); + QListWidgetItem* item = m_pResultList->takeItem(0); + delete item; + } } m_pResultList->setCurrentRow( m_pResultList->count() - 1 ); m_pResultList->repaint(); diff --git a/WebBasedCrawler/base/proxy.py b/WebBasedCrawler/base/proxy.py index 448fd1f..a88e7a1 100644 --- a/WebBasedCrawler/base/proxy.py +++ b/WebBasedCrawler/base/proxy.py @@ -4,6 +4,7 @@ import pymysql import os from selenium import webdriver import sys +import base.proxy_crawler proxy_filename = 'proxy.txt' re_ip = re.compile('([\d]{1,3}\.[\d]{1,3}\.[\d]{1,3}\.[\d]{1,3})[^\d]([\d]{2,5})') @@ -126,14 +127,20 @@ def set_proxy_expired(proxy): f.writelines(lines) -def get_proxy_from_file(filename): +def get_proxy_from_file(filename, check_url=None): """ :param filename: + :param check_url: valid check url :return (ip, port): string, string if ip, port or filename is invalid, return (None, None) """ proxy_lists = [line.replace('\n', '') for line in open(filename) if not line.strip().startswith('#') and re_ip.search(line)] - if proxy_lists: + + if len(proxy_lists) <= 0: + base.proxy_crawler.crawl_proxies(check_url) + proxy_lists = [line.replace('\n', '') for line in open(filename) if not line.strip().startswith('#') and re_ip.search(line)] + + if len(proxy_lists) > 0: m = re_ip.search(proxy_lists[random.randint(0, len(proxy_lists) - 1)]) if m: return m.group(1), m.group(2) @@ -156,9 +163,9 @@ def get_proxy_from_db(): return None, None -def get_proxy(): +def get_proxy(check_url=None): if os.path.exists(proxy_filename) and os.path.isfile(proxy_filename): - ip, port = get_proxy_from_file(proxy_filename) + ip, port = get_proxy_from_file(proxy_filename, check_url) if not ip or not port: return get_proxy_from_db() else: @@ -174,6 +181,6 @@ def get_requests_proxy(proxies): } -def get_proxy_for_requests(): - ip, port = get_proxy() +def get_proxy_for_requests(check_url=None): + ip, port = get_proxy(check_url) return get_requests_proxy(ip + ":" + port) diff --git a/WebBasedCrawler/base/proxy_crawler.py b/WebBasedCrawler/base/proxy_crawler.py new file mode 100644 index 0000000..8c2db82 --- /dev/null +++ b/WebBasedCrawler/base/proxy_crawler.py @@ -0,0 +1,133 @@ +import requests +import bs4 +import subprocess as sp +import sys +import queue +import threading + + +def get_page(url): + resp = requests.get(url) + return resp + + +def get_proxies_free_proxy(): + proxies = [] + + resp = get_page('https://free-proxy-list.net') + soup = bs4.BeautifulSoup(resp.text, 'lxml') + table = soup.select('table.table') + trs = table[0].select('tr') + + for tr in trs: + tds = tr.select('td') + if len(tds) > 0: + ip = tds[0].text + port = tds[1].text + proxies.append('{}:{}'.format(ip, port)) + + return proxies + + +def get_proxies_proxy_searcher(): + proxies = [] + + resp = get_page('http://proxysearcher.sourceforge.net/Proxy List.php') + soup = bs4.BeautifulSoup(resp.text, 'lxml') + table = soup.select('table.tablesorter') + trs = table[0].select('tr') + + for tr in trs: + tds = tr.select('td') + if len(tds) > 0: + proxy = tds[1].text + proxies.append(proxy) + + return proxies + +# def get_proxies_nntime(): +# proxies = [] +# +# resp = get_page('http://nntime.com/') +# soup = bs4.BeautifulSoup(resp.text, 'lxml') +# table = soup.select('table.data') +# trs = table[0].select('tr') +# +# for tr in trs[1:]: +# tds = tr.select('td') +# if len(tds) > 0: +# proxy = tds[1].text +# proxies.append(proxy) +# +# return proxies + + +def check_proxy(qu, proxy, url): + proxy_dict = { + 'http': proxy, + 'https': proxy, + } + try: + resp = requests.get(url, proxies=proxy_dict, timeout=2) + except Exception as e: + return False + else: + if resp.status_code != 200: + return False + + qu.put(proxy) + return True + + +def crawl_proxies(check_url=None): + print('proxy crawling start') + proxies = get_proxies_free_proxy() + proxies += get_proxies_proxy_searcher() + # proxies += get_proxies_nntime() + proxies = list(set(proxies)) + + if check_url: + qu = queue.Queue() + threads = [] + for proxy in proxies: + th = threading.Thread(target=check_proxy, args=(qu, proxy, check_url)) + threads.append(th) + + [th.start() for th in threads] + [th.join() for th in threads] + + proxies_alive = [] + while not qu.empty(): + proxy = qu.get() + proxies_alive.append(proxy) + + else: + proxies_alive = proxies + + proxies_alive.sort() + print('proxy crawler got {} proxies'.format(len(proxies_alive))) + + with open('proxy.txt', 'w') as f: + print('proxy crawler dump start') + for proxy in proxies_alive: + print(proxy) + f.write(proxy + '\n') + print('proxy crawler dump end') + + print('proxy crawling end') + + +if __name__ == '__main__': + + check_url = None + if len(sys.argv) > 1: + check_url = sys.argv[1] + + viewer = None + if len(sys.argv) > 2: + viewer = sys.argv[2] + + crawl_proxies(check_url) + + if viewer: + sp.Popen([viewer, 'proxy.txt']) diff --git a/WebBasedCrawler/twitter/twdbhelper.py b/WebBasedCrawler/twitter/twdbhelper.py index 540fba4..e9a3fb9 100644 --- a/WebBasedCrawler/twitter/twdbhelper.py +++ b/WebBasedCrawler/twitter/twdbhelper.py @@ -1,17 +1,19 @@ from twitter.tweet import Tweet -import multiprocessing as mp +import queue class TwitterDBHelper: pymysql = __import__('pymysql.cursors') + DB_DUMP_SIZE = 128 def __init__(self): self.tweets = [] self.buffer = [] - self.lock = mp.Lock() + self.queue = queue.Queue() pass def __del__(self): + self.flush() pass def get_param(self, keyword_id): @@ -36,20 +38,14 @@ class TwitterDBHelper: return params - def insert_tweet(self, tweet: Tweet = None, db_num: int = -1, flush=False): + def flush(self): + local_buffer = [] + while not self.queue.empty(): + local_buffer.append(self.queue.get()) - # self.lock.acquire() - # if tweet is not None: - # self.buffer.append((tweet, db_num, )) - # - # local_buffer = None - # if len(self.buffer) >= 100 or flush: - # local_buffer = copy.deepcopy(self.buffer) - # self.buffer.clear() - # self.lock.release() + print('### db queue dump {}'.format(len(local_buffer))) - local_buffer = [(tweet, db_num, )] - if local_buffer: + if len(local_buffer) > 0: while True: try: conn = self.pymysql.connect(host='bigbird.iptime.org', @@ -80,3 +76,8 @@ class TwitterDBHelper: finally: conn.close() + + def insert_tweet(self, tweet: Tweet = None, db_num: int = -1, flush=False): + self.queue.put((tweet, db_num)) + if self.queue.qsize() >= self.DB_DUMP_SIZE: + self.flush() diff --git a/WebBasedCrawler/twitter/twittercrawl.py b/WebBasedCrawler/twitter/twittercrawl.py index f6fd539..0cdaadd 100644 --- a/WebBasedCrawler/twitter/twittercrawl.py +++ b/WebBasedCrawler/twitter/twittercrawl.py @@ -10,6 +10,7 @@ import requests import bs4 import json import urllib +import concurrent.futures import threading import queue import time @@ -20,7 +21,7 @@ class TwitterCrawler(): def __init__(self): self.default_config = TwitterConfig() self.db_helper = TwitterDBHelper() - self.proxy = None + self.proxy = {} self.before_day = None def set_arguments(self, browser, keyword_id, db_num, before_day, until_page): @@ -52,33 +53,34 @@ class TwitterCrawler(): url_tupple = (TwitterConfig.protocol, TwitterConfig.top_url, sub_url, '', urllib.parse.urlencode(params), '') return urllib.parse.urlunparse(url_tupple) - def get_page(self, url, proc_id): + def get_page(self, url, is_runner, proc_id): headers = { 'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.86 Safari/537.36', 'Accept-Language': 'ko-KR,ko;q=0.8,en-US;q=0.6,en;q=0.4', } - if not self.proxy: - self.proxy = base.proxy.get_proxy_for_requests() + proxy_key = '{}-{}'.format('runner' if is_runner else 'content', proc_id) + if proxy_key not in self.proxy: + self.proxy[proxy_key] = base.proxy.get_proxy_for_requests() resp = None while True: try: - time.sleep(0.1) - resp = requests.get(url, headers=headers, proxies=self.proxy, timeout=3) + # time.sleep(random.random()) + resp = requests.get(url, headers=headers, proxies=self.proxy[proxy_key], timeout=3) except Exception as e: - if self.proxy == (None, None): + if self.proxy[proxy_key] == (None, None): break - print('[{}] proxy {} is expired. ({})'.format(proc_id, self.proxy, e)) - base.proxy.set_proxy_expired(self.proxy) - self.proxy = base.proxy.get_proxy_for_requests() + print('[{}] proxy {} is expired. ({})'.format(proc_id, self.proxy[proxy_key], e)) + base.proxy.set_proxy_expired(self.proxy[proxy_key]) + self.proxy[proxy_key] = base.proxy.get_proxy_for_requests() else: break return resp def runner_proc(self, proc_id, content_queue, result_queue, config): - print('{} to {} runner thread start'.format(config.start_str, config.end_str)) + print('[{}] {} to {} runner thread start'.format(proc_id, config.start_str, config.end_str)) b_continue = True min_tweet_id = None @@ -90,7 +92,7 @@ class TwitterCrawler(): if min_tweet_id is not None: max_position = 'TWEET-{}-{}'.format(max_tweet_id, min_tweet_id) url = self.get_timeline_url(config.keywords[0], config.start_str, config.end_str, max_position) - resp = self.get_page(url, proc_id) + resp = self.get_page(url, True, proc_id) if resp is None: break @@ -111,18 +113,18 @@ class TwitterCrawler(): self.db_helper.insert_tweet(tweet, config.db_num) # print('{} {}: {}...'.format(tweet.created_at, tweet.user_name, tweet.text[:20])) - print('body {} [{}]'.format(tweet.top_link, 'ok')) + print('[{}] body {} [{}]'.format(proc_id, tweet.top_link, 'ok')) count = len(tweet_tags) - if count == 0: - break - - if min_tweet_id is None: - min_tweet_id = tweet_tags[0].attrs['data-item-id'] - max_tweet_id = tweet_tags[-1].attrs['data-item-id'] tweet_count += count - print('{} to {} runner thread finished {}'.format(config.start_str, config.end_str, tweet_count)) + b_continue = j['has_more_items'] + if b_continue: + if min_tweet_id is None: + min_tweet_id = tweet_tags[0].attrs['data-item-id'] + max_tweet_id = tweet_tags[-1].attrs['data-item-id'] + + print('[{}] {} to {} runner thread finished {}'.format(proc_id, config.start_str, config.end_str, tweet_count)) result_queue.put({ 'proc_id': proc_id, 'count': tweet_count, @@ -167,7 +169,7 @@ class TwitterCrawler(): b_continue = True while b_continue: url = self.get_content_url(parent_tw.user_id, parent_tw.tweet_id, max_position) - resp = self.get_page(url, proc_id) + resp = self.get_page(url, False, proc_id) if resp is None or resp.status_code == 404: break elif resp.status_code != 200: @@ -184,7 +186,7 @@ class TwitterCrawler(): if len(tweet_tags) > 0: tweet = TweetParser.parse(tweet_tags[0], self.default_config.keyword_id, parent_tw.depth+1, top_tw) # print('[{}]>>> {} {}: {} ({}) ({})'.format(proc_id, tweet.created_at, tweet.user_name, tweet.text[:20], tweet.depth, tweet.tweet_link)) - print('reply {} [{}]'.format(tweet.top_link, 'ok')) + # print('[{}] reply {} [{}]'.format(proc_id, tweet.top_link, 'ok')) self.insert_content_pool(proc_id, content_queue, tweet, top_tw) self.db_helper.insert_tweet(tweet, self.default_config.db_num) tweet_count += 1 @@ -258,6 +260,7 @@ class TwitterCrawler(): start_time = time.time() # run + worker_count = 64 split_config = self.default_config.split() content_qu = queue.Queue() @@ -267,38 +270,42 @@ class TwitterCrawler(): runner_result_cnt = 0 content_result_cnt = 0 - runner_threads = [threading.Thread(target=self.runner_proc, args=(proc_id, content_qu, runner_result_qu, config)) for proc_id, config in enumerate(split_config)] - content_threads = [threading.Thread(target=self.content_proc, args=(proc_id, content_qu, content_result_qu)) for proc_id in range(16)] + content_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) + [content_pool.submit(self.content_proc, proc_id, content_qu, content_result_qu) for proc_id in range(worker_count)] - [th.start() for th in runner_threads] - [th.start() for th in content_threads] + runner_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) + [runner_pool.submit(self.runner_proc, proc_id, content_qu, runner_result_qu, config) for proc_id, config in enumerate(split_config)] - [th.join() for th in runner_threads] - [th.join() for th in content_threads] + runner_pool.shutdown(wait=True) + content_pool.shutdown(wait=True) + self.db_helper.flush() # rerun zero runners - runner_threads = [] + print('restart failed runner') + runner_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) runner_result_qu2 = queue.Queue() idx = 0 + b_rerun = False while not runner_result_qu.empty(): res = runner_result_qu.get() runner_result_cnt += res['count'] if res['count'] == 0: - th = threading.Thread(target=self.runner_proc, args=(idx, content_qu, runner_result_qu2, split_config[idx])) - runner_threads.append(th) + runner_pool.submit(self.runner_proc, idx, content_qu, runner_result_qu2, split_config[idx]) + b_rerun = True idx += 1 - if len(runner_threads) > 0: - content_threads = [threading.Thread(target=self.content_proc, args=(proc_id, content_qu, content_result_qu)) for proc_id in range(16)] - else: - content_threads = [] + while not content_result_qu.empty(): + res = content_result_qu.get() + content_result_cnt += res['count'] - [th.start() for th in runner_threads] - [th.start() for th in content_threads] + if b_rerun: + content_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count) + [content_pool.submit(self.content_proc, proc_id, content_qu, content_result_qu) for proc_id in range(worker_count)] - [th.join() for th in runner_threads] - [th.join() for th in content_threads] + runner_pool.shutdown(wait=True) + content_pool.shutdown(wait=True) + self.db_helper.flush() while not runner_result_qu2.empty(): res = runner_result_qu2.get() diff --git a/package-clients.bat b/package-clients.bat index d8dd343..d166e3b 100644 --- a/package-clients.bat +++ b/package-clients.bat @@ -35,6 +35,7 @@ cd %CUR_PATH% %QT_PATH%\windeployqt.exe %DEPLOY_PATH%\FilterProcess.exe copy %MYSQL_PATH%\libmysql.dll %DEPLOY_PATH% +copy %CUR_PATH%\*.txt %DEPLOY_PATH%\ xcopy %PYTHONCRAWLER_PATH%\*.py %DEPLOY_PATH%\ /c /d /s /y xcopy %PYTHONCRAWLER_PATH%\*.txt %DEPLOY_PATH%\ /c /d /s /y