diff --git a/WebBasedCrawler/base/proxy2.py b/WebBasedCrawler/base/proxy2.py index 4cf2adc..f9d3b72 100644 --- a/WebBasedCrawler/base/proxy2.py +++ b/WebBasedCrawler/base/proxy2.py @@ -1,4 +1,4 @@ -import base.proxy_crawler as proxy_crawler +import base.proxy_crawler as proxy_crawler import base.logger as logger import sqlalchemy @@ -11,10 +11,22 @@ import datetime import threading import random +import requests + Base = sqlalchemy.ext.declarative.declarative_base() +class Platform(enum.Enum): + NAVER = 'naver' + DAUM = 'daum' + FACEBOOK = 'facebook' + KAKAO = 'kakao' + INSTA = 'insta' + TWITTER = 'twitter' + YOUTUBE = 'youtube' + + class Proxy2Model(Base): __tablename__ = 'proxy2' id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True, nullable=False, autoincrement=True) @@ -28,13 +40,23 @@ class Proxy2Model(Base): insta_block_at = sqlalchemy.Column(sqlalchemy.DateTime) twitter_block_at = sqlalchemy.Column(sqlalchemy.DateTime) youtube_block_at = sqlalchemy.Column(sqlalchemy.DateTime) + dead = sqlalchemy.Column(sqlalchemy.Boolean, default=False) def __init__(self, ip, port): self.ip = ip self.port = port + self.block_map = { + Platform.NAVER: self.naver_block_at, + Platform.DAUM: self.daum_block_at, + Platform.FACEBOOK: self.facebook_block_at, + Platform.KAKAO: self.kakao_block_at, + Platform.INSTA: self.insta_block_at, + Platform.TWITTER: self.twitter_block_at, + Platform.YOUTUBE: self.youtube_block_at, + } def __repr__(self): - return '{}:{}'.format(self.ip, self.port) + return '{}:{} (twitter:{})'.format(self.ip, self.port, self.twitter_block_at) def get_instance_for_http(self): return { @@ -44,15 +66,21 @@ class Proxy2Model(Base): 'port': self.port, } - -class Platform(enum.Enum): - NAVER = 'naver' - DAUM = 'daum' - FACEBOOK = 'facebook' - KAKAO = 'kakao' - INSTA = 'insta' - TWITTER = 'twitter' - YOUTUBE = 'youtube' + def set_block_at(self, platform, value): + if platform == Platform.NAVER: + self.naver_block_at = value + elif platform == Platform.DAUM: + 
self.daum_block_at = value + elif platform == Platform.FACEBOOK: + self.facebook_block_at = value + elif platform == Platform.KAKAO: + self.kakao_block_at = value + elif platform == Platform.INSTA: + self.insta_block_at = value + elif platform == Platform.TWITTER: + self.twitter_block_at = value + elif platform == Platform.YOUTUBE: + self.youtube_block_at = value class Proxy2Handler: @@ -69,8 +97,9 @@ class Proxy2Handler: def __init__(self): self.lock = threading.Lock() self.engine = sqlalchemy.create_engine('mysql+pymysql://admin:admin123@bigbird.iptime.org/concepters?charset=utf8') - session_factory = sqlalchemy.orm.sessionmaker(bind=self.engine) + session_factory = sqlalchemy.orm.sessionmaker(bind=self.engine, autocommit=True, autoflush=True) self.session = sqlalchemy.orm.scoped_session(session_factory) + pass def lock_enter(self): # logger.log('lock {}'.format(threading.current_thread().ident)) @@ -93,15 +122,6 @@ class Proxy2Handler: self.lock_leave() return instance - # def get(self, platform): - # proxy = self.session.query(Proxy2Model).filter(self.block_field_map[platform] == None).first() - # if not proxy: - # proxy_crawler.crawl_proxies() - # - # proxy = self.get_oldest(platform) - # - # return proxy - def get_query(self, ip, port): return self.session.query(Proxy2Model).filter_by(ip=ip).filter_by(port=port) @@ -111,6 +131,43 @@ class Proxy2Handler: self.lock_leave() return instance + def check_all_proxies(self, platform): + print('check all start') + + url_map = { + Platform.NAVER: 'https://www.naver.com', + Platform.DAUM: 'https://www.daum.net', + Platform.FACEBOOK: 'https://www.facebook.com', + Platform.KAKAO: 'https://story.kakao.com', + Platform.INSTA: 'https://www.instagram.com', + Platform.TWITTER: 'https://twitter.com', + Platform.YOUTUBE: 'https://www.youtube.com', + } + + block_column = self.block_field_map[platform] + instances = self.session.query(Proxy2Model).filter(block_column != 
None).filter_by(dead=False).order_by(block_column).limit(8).all() + alive_cnt = 0 + for instance in instances: + proxy = instance.get_instance_for_http() + try: + resp = requests.get(url_map[platform], proxies=proxy, timeout=1) + except requests.exceptions.ProxyError as e: + instance.dead = True + except (requests.exceptions.ConnectTimeout, requests.exceptions.SSLError, requests.exceptions.ReadTimeout, requests.exceptions.ConnectionError) as e: + instance.set_block_at(platform, datetime.datetime.now()) + except Exception as e: + instance.set_block_at(platform, datetime.datetime.now()) + else: + if resp.ok: + instance.set_block_at(platform, None) + alive_cnt += 1 + print('proxy {}:{} alive'.format(instance.ip, instance.port)) + else: + instance.set_block_at(platform, datetime.datetime.now()) + + print('check all end') + return alive_cnt + def get(self, platform, proc_id=-1): self.lock_enter() @@ -133,8 +190,11 @@ class Proxy2Handler: self.lock_leave() return instance.get_instance_for_http() else: - proxies = proxy_crawler.crawl_proxies() - self.insert_all(proxies) + cnt = self.check_all_proxies(platform) + if cnt <= 0: + proxies = proxy_crawler.crawl_proxies() + self.insert_all(proxies) + self.lock_leave() return self.get(platform, proc_id) @@ -147,22 +207,37 @@ class Proxy2Handler: self.lock_leave() def insert_all(self, proxies): - # INSERT INTO proxy2(ip, PORT) # SELECT , FROM DUAL # WHERE NOT EXISTS (SELECT * FROM proxy2 WHERE ip= AND PORT=) - - instances_add = [] + self.lock.acquire() for proxy in proxies: - instance = self.get_instance(proxy['ip'], proxy['port']) - if not instance: - instances_add.append(Proxy2Model(proxy['ip'], proxy['port'])) - self.session.bulk_save_objects(instances_add) - self.commit() + query = r"INSERT INTO proxy2(ip, PORT) " \ + r"SELECT '{}', {} FROM DUAL " \ + r"WHERE NOT EXISTS (SELECT * FROM proxy2 WHERE ip='{}' AND PORT={})"\ + .format(proxy['ip'], proxy['port'], proxy['ip'], proxy['port']) + # 안됨 - 중복으로 들어감, 쓰레드 종료됨 + 
self.engine.execute(query) + self.lock.release() + + # self.query(Proxy2Model).insert() + # + # self.query(Proxy2Model).filter(Proxy2Model.ip == proxy['ip']).filter(Proxy2Model.port == proxy['port']).\ + # filter( + # ~sqlalchemy.exists().where( + # sqlalchemy.and_( + # Proxy2Model.kw_id == Proxy2Model.kw_id, + # Proxy2Model.checkpoint_id == Proxy2Model.id + # ) + # ) + # ) + # + # if self.session.query(Proxy2Model).filter_by(ip=proxy['ip']).filter_by(port=proxy['port']).count() == 0: + # self.session.add(Proxy2Model(proxy['ip'], proxy['port'])) def set_proxy_blocked(self, ip, port, platform): block_column = self.block_field_map[platform] - query = self.get_query(ip, port).filter(block_column == None) + query = self.get_query(ip, port) query.update({block_column: datetime.datetime.now()}) self.commit() diff --git a/WebBasedCrawler/twitter/twittercrawl.py b/WebBasedCrawler/twitter/twittercrawl.py index f074c07..2dd0a6d 100644 --- a/WebBasedCrawler/twitter/twittercrawl.py +++ b/WebBasedCrawler/twitter/twittercrawl.py @@ -61,7 +61,8 @@ class TwitterCrawler: proxy = None while not proxy: proxy = self.proxy_handler.get(proxy2.Platform.TWITTER, proxy_key) - time.sleep(1) + if not proxy: + time.sleep(1) return proxy @@ -257,7 +258,7 @@ class TwitterCrawler: test_tw.user_id = 'Awesome_vely' test_tw.tweet_id = 888704413111435264 - test_tw.text = '시작' + test_tw.text = '시작' self.insert_content_pool(0, content_qu, test_tw, test_tw) content_threads = [threading.Thread(target=self.content_proc, args=(proc_id, content_qu, content_result_qu)) for proc_id in range(16)] @@ -300,7 +301,7 @@ class TwitterCrawler: start_time = time.time() # run - worker_count = 4 + worker_count = 16 split_config = self.default_config.split() content_qu = queue.Queue()