트위터 크롤러 수정

- proxy.txt에 있는 ip 모두 소진하면 새롭게 가져와서 채우는 기능
- db에 넣을 때 128개 모아서 한번에 넣는 기능
- concurrent.futures.ThreadPoolExecutor 사용
- qt에서 로그 라인별로 읽어서 출력
- 로그 256개씩 한번에 삭제
This commit is contained in:
mjjo
2017-08-01 10:39:03 +09:00
parent 3d5e2d0c98
commit de8f2f4c23
7 changed files with 246 additions and 72 deletions

View File

@@ -56,19 +56,41 @@ void STwitterTagManage::processFinished(QProcess *pPro, QString _strOut)
// Reads pending stdout from the crawler QProcess line by line and forwards
// each whitespace-normalized line to the main widget's log list.
// NOTE(review): this span is a diff rendering — pre-change lines
// (readAllStandardOutput + split) are interleaved with the new canReadLine()
// loop; only one of the two versions exists in the real post-commit file.
void STwitterTagManage::readStandardOutput()
{
if(!m_pMain)
return;
// Re-entrancy guard: the readyRead signal can fire again while we are
// still draining output from this same invocation.
static bool bRunning = false;
if(bRunning)
return;
bRunning = true;
QProcess *pPro = (QProcess*)sender();
// --- pre-change lines (removed by this commit) ---
QThread::msleep(100);
QString str = pPro->readAllStandardOutput();
QStringList list = str.split("\n", QString::SkipEmptyParts);
foreach(QString log,list)
// --- new implementation: consume only complete lines ---
QStringList lines;
while(pPro->canReadLine())
{
// --- pre-change loop body (removed by this commit) ---
if (m_pMain)
{
m_pMain->InsertLog(log);
}
else
exit(0);
// simplified() trims leading/trailing and collapses internal whitespace.
QString line = pPro->readLine();
lines.append(line.simplified());
}
foreach(QString line, lines)
m_pMain->InsertLog(line);
bRunning = false;
// QThread::msleep(100);
// QString str = pPro->readAllStandardOutput();
// QStringList list = str.split("\n", QString::SkipEmptyParts);
// foreach(QString log,list)
// {
// if (m_pMain)
// {
// m_pMain->InsertLog(log);
// }
// else
// exit(0);
// }
}
void STwitterTagManage::readStandardError()

View File

@@ -185,11 +185,14 @@ void Widget::InsertLog(QString str)
file.close();
if (m_pResultList->count() > 1024)
{
for(int i=0; i<256; i++)
{
m_pResultList->removeItemWidget(m_pResultList->item(0));
QListWidgetItem* item = m_pResultList->takeItem(0);
delete item;
}
}
m_pResultList->setCurrentRow( m_pResultList->count() - 1 );
m_pResultList->repaint();
}

View File

@@ -4,6 +4,7 @@ import pymysql
import os
from selenium import webdriver
import sys
import base.proxy_crawler
proxy_filename = 'proxy.txt'
re_ip = re.compile('([\d]{1,3}\.[\d]{1,3}\.[\d]{1,3}\.[\d]{1,3})[^\d]([\d]{2,5})')
@@ -126,14 +127,20 @@ def set_proxy_expired(proxy):
f.writelines(lines)
def get_proxy_from_file(filename):
def get_proxy_from_file(filename, check_url=None):
"""
:param filename:
:param check_url: valid check url
:return (ip, port): string, string
if ip, port or filename is invalid, return (None, None)
"""
proxy_lists = [line.replace('\n', '') for line in open(filename) if not line.strip().startswith('#') and re_ip.search(line)]
if proxy_lists:
if len(proxy_lists) <= 0:
base.proxy_crawler.crawl_proxies(check_url)
proxy_lists = [line.replace('\n', '') for line in open(filename) if not line.strip().startswith('#') and re_ip.search(line)]
if len(proxy_lists) > 0:
m = re_ip.search(proxy_lists[random.randint(0, len(proxy_lists) - 1)])
if m:
return m.group(1), m.group(2)
@@ -156,9 +163,9 @@ def get_proxy_from_db():
return None, None
def get_proxy():
def get_proxy(check_url=None):
if os.path.exists(proxy_filename) and os.path.isfile(proxy_filename):
ip, port = get_proxy_from_file(proxy_filename)
ip, port = get_proxy_from_file(proxy_filename, check_url)
if not ip or not port:
return get_proxy_from_db()
else:
@@ -174,6 +181,6 @@ def get_requests_proxy(proxies):
}
# NOTE(review): diff rendering — both the old and the new signature appear
# below; in the post-commit file only the check_url variant exists, which
# threads the validity-check URL through to get_proxy().
def get_proxy_for_requests():
ip, port = get_proxy()
def get_proxy_for_requests(check_url=None):
ip, port = get_proxy(check_url)
# Assumes ip/port are non-None here; get_proxy_from_file documents a
# (None, None) return on failure — TODO confirm callers handle that.
return get_requests_proxy(ip + ":" + port)

View File

@@ -0,0 +1,133 @@
import requests
import bs4
import subprocess as sp
import sys
import queue
import threading
def get_page(url):
    """Perform a plain GET on *url* and hand back the requests Response."""
    return requests.get(url)
def get_proxies_free_proxy():
    """Scrape https://free-proxy-list.net and return proxies as 'ip:port' strings.

    :return: list of 'ip:port' strings, one per table row.

    Fix: rows are only used when they carry at least two cells. The original
    guard ``len(tds) > 0`` still read ``tds[1]`` and would raise IndexError
    on a single-cell (header/spacer) row.
    """
    proxies = []
    resp = get_page('https://free-proxy-list.net')
    soup = bs4.BeautifulSoup(resp.text, 'lxml')
    table = soup.select('table.table')
    trs = table[0].select('tr')
    for tr in trs:
        tds = tr.select('td')
        # Need both the ip cell (tds[0]) and the port cell (tds[1]).
        if len(tds) >= 2:
            ip = tds[0].text
            port = tds[1].text
            proxies.append('{}:{}'.format(ip, port))
    return proxies
def get_proxies_proxy_searcher():
    """Scrape proxysearcher.sourceforge.net and return 'ip:port' proxy strings.

    :return: list of proxy strings taken from the second column of the table.

    Fix: rows are only used when they carry at least two cells. The original
    guard ``len(tds) > 0`` still read ``tds[1]`` and would raise IndexError
    on a single-cell row.
    """
    proxies = []
    resp = get_page('http://proxysearcher.sourceforge.net/Proxy List.php')
    soup = bs4.BeautifulSoup(resp.text, 'lxml')
    table = soup.select('table.tablesorter')
    trs = table[0].select('tr')
    for tr in trs:
        tds = tr.select('td')
        # The proxy string lives in the second column.
        if len(tds) >= 2:
            proxy = tds[1].text
            proxies.append(proxy)
    return proxies
# def get_proxies_nntime():
# proxies = []
#
# resp = get_page('http://nntime.com/')
# soup = bs4.BeautifulSoup(resp.text, 'lxml')
# table = soup.select('table.data')
# trs = table[0].select('tr')
#
# for tr in trs[1:]:
# tds = tr.select('td')
# if len(tds) > 0:
# proxy = tds[1].text
# proxies.append(proxy)
#
# return proxies
def check_proxy(qu, proxy, url, timeout=2):
    """Probe *url* through *proxy* and enqueue the proxy into *qu* if it works.

    :param qu: queue.Queue collecting live proxies (thread-safe sink).
    :param proxy: 'ip:port' string, used for both http and https.
    :param url: URL fetched through the proxy as the liveness check.
    :param timeout: request timeout in seconds (was hard-coded to 2;
        now parameterized, default preserves the old behavior).
    :return: True when the proxy answered with HTTP 200, False otherwise.
    """
    proxy_dict = {
        'http': proxy,
        'https': proxy,
    }
    try:
        resp = requests.get(url, proxies=proxy_dict, timeout=timeout)
    except Exception:
        # Deliberate best-effort: any network/proxy failure just means
        # "this proxy is dead" — never propagate.
        return False
    if resp.status_code != 200:
        return False
    qu.put(proxy)
    return True
def crawl_proxies(check_url=None, out_filename='proxy.txt'):
    """Collect proxies from all crawl sources and dump them to *out_filename*.

    :param check_url: when given, every candidate proxy is verified by
        fetching this URL through it and only live proxies are kept;
        when None, all scraped proxies are dumped unverified.
    :param out_filename: output file, one 'ip:port' per line. Defaults to
        'proxy.txt' (the file the rest of the crawler reads), so existing
        callers are unaffected.
    """
    print('proxy crawling start')

    proxies = get_proxies_free_proxy()
    proxies += get_proxies_proxy_searcher()
    # proxies += get_proxies_nntime()  # source disabled upstream
    proxies = list(set(proxies))  # de-duplicate across sources

    if check_url:
        qu = queue.Queue()
        # NOTE: one thread per candidate — can spawn hundreds of threads,
        # matching the original behavior.
        threads = [threading.Thread(target=check_proxy, args=(qu, proxy, check_url))
                   for proxy in proxies]
        for th in threads:
            th.start()
        for th in threads:
            th.join()
        proxies_alive = []
        while not qu.empty():
            proxies_alive.append(qu.get())
    else:
        proxies_alive = proxies

    proxies_alive.sort()
    print('proxy crawler got {} proxies'.format(len(proxies_alive)))
    with open(out_filename, 'w') as f:
        print('proxy crawler dump start')
        for proxy in proxies_alive:
            print(proxy)
            f.write(proxy + '\n')
        print('proxy crawler dump end')
    print('proxy crawling end')
if __name__ == '__main__':
    # CLI: proxy_crawler.py [check_url] [viewer_program]
    argv = sys.argv
    check_url = argv[1] if len(argv) > 1 else None
    viewer = argv[2] if len(argv) > 2 else None
    crawl_proxies(check_url)
    if viewer:
        # Hand the dumped proxy list to the requested viewer program.
        sp.Popen([viewer, 'proxy.txt'])

View File

@@ -1,17 +1,19 @@
from twitter.tweet import Tweet
import multiprocessing as mp
import queue
class TwitterDBHelper:
pymysql = __import__('pymysql.cursors')
DB_DUMP_SIZE = 128
# NOTE(review): diff rendering — self.tweets / self.buffer / self.lock are
# pre-change attributes removed by this commit; the new implementation keeps
# only self.queue.
def __init__(self):
self.tweets = []
self.buffer = []
self.lock = mp.Lock()
# Tweets are staged here and flushed to the DB in DB_DUMP_SIZE batches.
self.queue = queue.Queue()
pass
def __del__(self):
    """Flush any tweets still queued when the helper is garbage-collected."""
    self.flush()
def get_param(self, keyword_id):
@@ -36,20 +38,14 @@ class TwitterDBHelper:
return params
def insert_tweet(self, tweet: Tweet = None, db_num: int = -1, flush=False):
def flush(self):
local_buffer = []
while not self.queue.empty():
local_buffer.append(self.queue.get())
# self.lock.acquire()
# if tweet is not None:
# self.buffer.append((tweet, db_num, ))
#
# local_buffer = None
# if len(self.buffer) >= 100 or flush:
# local_buffer = copy.deepcopy(self.buffer)
# self.buffer.clear()
# self.lock.release()
print('### db queue dump {}'.format(len(local_buffer)))
local_buffer = [(tweet, db_num, )]
if local_buffer:
if len(local_buffer) > 0:
while True:
try:
conn = self.pymysql.connect(host='bigbird.iptime.org',
@@ -80,3 +76,8 @@ class TwitterDBHelper:
finally:
conn.close()
def insert_tweet(self, tweet: Tweet = None, db_num: int = -1, flush=False):
    """Stage (tweet, db_num) for insertion; flush once DB_DUMP_SIZE is queued.

    The ``flush`` parameter is kept for interface compatibility; batching is
    driven purely by queue size here.
    """
    entry = (tweet, db_num)
    self.queue.put(entry)
    if self.queue.qsize() >= self.DB_DUMP_SIZE:
        self.flush()

View File

@@ -10,6 +10,7 @@ import requests
import bs4
import json
import urllib
import concurrent.futures
import threading
import queue
import time
@@ -20,7 +21,7 @@ class TwitterCrawler():
def __init__(self):
self.default_config = TwitterConfig()
self.db_helper = TwitterDBHelper()
# NOTE(review): diff rendering — proxy changed from a single shared value
# (None) to a dict keyed per worker; only the dict line survives the commit.
self.proxy = None
self.proxy = {}
self.before_day = None
def set_arguments(self, browser, keyword_id, db_num, before_day, until_page):
@@ -52,33 +53,34 @@ class TwitterCrawler():
url_tupple = (TwitterConfig.protocol, TwitterConfig.top_url, sub_url, '', urllib.parse.urlencode(params), '')
return urllib.parse.urlunparse(url_tupple)
# Fetch *url* through a per-worker proxy, rotating to a fresh proxy whenever
# the current one fails; returns the requests Response, or None on exhaustion.
# NOTE(review): diff rendering — old-version lines (single shared self.proxy)
# are interleaved with the new per-key self.proxy dict lines below; only the
# proxy_key variant exists in the post-commit file.
def get_page(self, url, proc_id):
def get_page(self, url, is_runner, proc_id):
headers = {
'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.86 Safari/537.36',
'Accept-Language': 'ko-KR,ko;q=0.8,en-US;q=0.6,en;q=0.4',
}
# old: one proxy shared by every worker
if not self.proxy:
self.proxy = base.proxy.get_proxy_for_requests()
# new: one proxy per ('runner'|'content', proc_id) worker key
proxy_key = '{}-{}'.format('runner' if is_runner else 'content', proc_id)
if proxy_key not in self.proxy:
self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
resp = None
while True:
try:
# small throttle between attempts
time.sleep(0.1)
resp = requests.get(url, headers=headers, proxies=self.proxy, timeout=3)
# time.sleep(random.random())
resp = requests.get(url, headers=headers, proxies=self.proxy[proxy_key], timeout=3)
except Exception as e:
# (None, None) means the proxy source is exhausted — give up.
if self.proxy == (None, None):
if self.proxy[proxy_key] == (None, None):
break
print('[{}] proxy {} is expired. ({})'.format(proc_id, self.proxy, e))
base.proxy.set_proxy_expired(self.proxy)
self.proxy = base.proxy.get_proxy_for_requests()
print('[{}] proxy {} is expired. ({})'.format(proc_id, self.proxy[proxy_key], e))
base.proxy.set_proxy_expired(self.proxy[proxy_key])
self.proxy[proxy_key] = base.proxy.get_proxy_for_requests()
else:
break
return resp
def runner_proc(self, proc_id, content_queue, result_queue, config):
print('{} to {} runner thread start'.format(config.start_str, config.end_str))
print('[{}] {} to {} runner thread start'.format(proc_id, config.start_str, config.end_str))
b_continue = True
min_tweet_id = None
@@ -90,7 +92,7 @@ class TwitterCrawler():
if min_tweet_id is not None:
max_position = 'TWEET-{}-{}'.format(max_tweet_id, min_tweet_id)
url = self.get_timeline_url(config.keywords[0], config.start_str, config.end_str, max_position)
resp = self.get_page(url, proc_id)
resp = self.get_page(url, True, proc_id)
if resp is None:
break
@@ -111,18 +113,18 @@ class TwitterCrawler():
self.db_helper.insert_tweet(tweet, config.db_num)
# print('{} {}: {}...'.format(tweet.created_at, tweet.user_name, tweet.text[:20]))
print('body {} [{}]'.format(tweet.top_link, 'ok'))
print('[{}] body {} [{}]'.format(proc_id, tweet.top_link, 'ok'))
count = len(tweet_tags)
if count == 0:
break
tweet_count += count
b_continue = j['has_more_items']
if b_continue:
if min_tweet_id is None:
min_tweet_id = tweet_tags[0].attrs['data-item-id']
max_tweet_id = tweet_tags[-1].attrs['data-item-id']
tweet_count += count
print('{} to {} runner thread finished {}'.format(config.start_str, config.end_str, tweet_count))
print('[{}] {} to {} runner thread finished {}'.format(proc_id, config.start_str, config.end_str, tweet_count))
result_queue.put({
'proc_id': proc_id,
'count': tweet_count,
@@ -167,7 +169,7 @@ class TwitterCrawler():
b_continue = True
while b_continue:
url = self.get_content_url(parent_tw.user_id, parent_tw.tweet_id, max_position)
resp = self.get_page(url, proc_id)
resp = self.get_page(url, False, proc_id)
if resp is None or resp.status_code == 404:
break
elif resp.status_code != 200:
@@ -184,7 +186,7 @@ class TwitterCrawler():
if len(tweet_tags) > 0:
tweet = TweetParser.parse(tweet_tags[0], self.default_config.keyword_id, parent_tw.depth+1, top_tw)
# print('[{}]>>> {} {}: {} ({}) ({})'.format(proc_id, tweet.created_at, tweet.user_name, tweet.text[:20], tweet.depth, tweet.tweet_link))
print('reply {} [{}]'.format(tweet.top_link, 'ok'))
# print('[{}] reply {} [{}]'.format(proc_id, tweet.top_link, 'ok'))
self.insert_content_pool(proc_id, content_queue, tweet, top_tw)
self.db_helper.insert_tweet(tweet, self.default_config.db_num)
tweet_count += 1
@@ -258,6 +260,7 @@ class TwitterCrawler():
start_time = time.time()
# run
worker_count = 64
split_config = self.default_config.split()
content_qu = queue.Queue()
@@ -267,38 +270,42 @@ class TwitterCrawler():
runner_result_cnt = 0
content_result_cnt = 0
runner_threads = [threading.Thread(target=self.runner_proc, args=(proc_id, content_qu, runner_result_qu, config)) for proc_id, config in enumerate(split_config)]
content_threads = [threading.Thread(target=self.content_proc, args=(proc_id, content_qu, content_result_qu)) for proc_id in range(16)]
content_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
[content_pool.submit(self.content_proc, proc_id, content_qu, content_result_qu) for proc_id in range(worker_count)]
[th.start() for th in runner_threads]
[th.start() for th in content_threads]
runner_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
[runner_pool.submit(self.runner_proc, proc_id, content_qu, runner_result_qu, config) for proc_id, config in enumerate(split_config)]
[th.join() for th in runner_threads]
[th.join() for th in content_threads]
runner_pool.shutdown(wait=True)
content_pool.shutdown(wait=True)
self.db_helper.flush()
# rerun zero runners
runner_threads = []
print('restart failed runner')
runner_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
runner_result_qu2 = queue.Queue()
idx = 0
b_rerun = False
while not runner_result_qu.empty():
res = runner_result_qu.get()
runner_result_cnt += res['count']
if res['count'] == 0:
th = threading.Thread(target=self.runner_proc, args=(idx, content_qu, runner_result_qu2, split_config[idx]))
runner_threads.append(th)
runner_pool.submit(self.runner_proc, idx, content_qu, runner_result_qu2, split_config[idx])
b_rerun = True
idx += 1
if len(runner_threads) > 0:
content_threads = [threading.Thread(target=self.content_proc, args=(proc_id, content_qu, content_result_qu)) for proc_id in range(16)]
else:
content_threads = []
while not content_result_qu.empty():
res = content_result_qu.get()
content_result_cnt += res['count']
[th.start() for th in runner_threads]
[th.start() for th in content_threads]
if b_rerun:
content_pool = concurrent.futures.ThreadPoolExecutor(max_workers=worker_count)
[content_pool.submit(self.content_proc, proc_id, content_qu, content_result_qu) for proc_id in range(worker_count)]
[th.join() for th in runner_threads]
[th.join() for th in content_threads]
runner_pool.shutdown(wait=True)
content_pool.shutdown(wait=True)
self.db_helper.flush()
while not runner_result_qu2.empty():
res = runner_result_qu2.get()

View File

@@ -35,6 +35,7 @@ cd %CUR_PATH%
%QT_PATH%\windeployqt.exe %DEPLOY_PATH%\FilterProcess.exe
copy %MYSQL_PATH%\libmysql.dll %DEPLOY_PATH%
copy %CUR_PATH%\*.txt %DEPLOY_PATH%\
xcopy %PYTHONCRAWLER_PATH%\*.py %DEPLOY_PATH%\ /c /d /s /y
xcopy %PYTHONCRAWLER_PATH%\*.txt %DEPLOY_PATH%\ /c /d /s /y