프록시에 lock 적용, exception 해결
This commit is contained in:
@@ -103,32 +103,27 @@ class Proxy2Handler:
|
||||
|
||||
def lock_enter(self):
    """Acquire the handler's lock, blocking until it is available."""
    self.lock.acquire()
|
||||
|
||||
def lock_leave(self):
    """Release the handler's lock."""
    self.lock.release()
|
||||
|
||||
def commit(self):
    """Pass through the handler lock without committing anything.

    The session commit call is disabled (commented out) in the source, so
    this method only enters and then leaves the lock.
    """
    self.lock_enter()
    # self.session.commit() is disabled in the source; nothing happens here.
    self.lock_leave()
|
||||
|
||||
def get_oldest(self, platform):
    """Return the first proxy row ordered by the platform's block column.

    Rows are ordered by ``self.block_field_map[platform]`` in descending
    order and the result of ``first()`` on that query is returned.

    The lock is released in a ``finally`` clause so that a failing lookup
    or query cannot leave it held and stall every other caller.

    :param platform: key into ``self.block_field_map``.
    :return: whatever ``Query.first()`` yields for the ordered query.
    """
    # NOTE(review): descending order for a method named "oldest" looks
    # inverted - confirm the intended sort direction against callers.
    self.lock_enter()
    try:
        block_column = self.block_field_map[platform]
        ordered = self.session.query(Proxy2Model).order_by(block_column.desc())
        return ordered.first()
    finally:
        self.lock_leave()
|
||||
|
||||
def get_query(self, ip, port):
    """Build, without executing, the query for proxies matching ip and port."""
    matching_ip = self.session.query(Proxy2Model).filter_by(ip=ip)
    return matching_ip.filter_by(port=port)
|
||||
|
||||
def get_instance(self, ip, port):
    """Return the first proxy row matching ``ip`` and ``port``.

    The lookup runs while the handler lock is held. The lock is released in
    a ``finally`` clause so an exception from the query cannot leave it
    held.

    :param ip: value matched against the ``ip`` column.
    :param port: value matched against the ``port`` column.
    :return: the result of ``first()`` on ``self.get_query(ip, port)``.
    """
    self.lock_enter()
    try:
        return self.get_query(ip, port).first()
    finally:
        self.lock_leave()
|
||||
|
||||
def check_all_proxies(self, platform):
|
||||
@@ -171,82 +166,40 @@ class Proxy2Handler:
|
||||
def get(self, platform, proc_id=-1):
|
||||
self.lock_enter()
|
||||
|
||||
try:
|
||||
block_column = self.block_field_map[platform]
|
||||
instances = self.session.query(Proxy2Model).filter(block_column == None).limit(32).all()
|
||||
instance = instances[random.randint(0, len(instances)-1)] if len(instances) > 0 else None
|
||||
proxy = None
|
||||
if instance:
|
||||
proxy = instance.get_instance_for_http()
|
||||
else:
|
||||
cnt = self.check_all_proxies(platform)
|
||||
if cnt <= 0:
|
||||
proxies = proxy_crawler.crawl_proxies()
|
||||
self.insert_all(proxies)
|
||||
|
||||
block_column = self.block_field_map[platform]
|
||||
try:
|
||||
instances = self.session.query(Proxy2Model).filter(block_column == None).limit(32).all()
|
||||
except Exception as e:
|
||||
dbg.print_exception()
|
||||
assert True
|
||||
|
||||
self.lock_leave()
|
||||
|
||||
# try:
|
||||
# session_factory = sqlalchemy.orm.sessionmaker(bind=self.engine)
|
||||
# self.session = sqlalchemy.orm.scoped_session(session_factory)
|
||||
# logger.log('{} session recreate'.format(proc_id))
|
||||
#
|
||||
# except Exception as e2:
|
||||
# dbg.print_exception(e2)
|
||||
|
||||
return None
|
||||
|
||||
instance = None
|
||||
if len(instances) > 0:
|
||||
instance = instances[random.randint(0, len(instances)-1)] if len(instances) > 0 else None
|
||||
|
||||
if instance:
|
||||
self.lock_leave()
|
||||
return instance.get_instance_for_http()
|
||||
else:
|
||||
cnt = self.check_all_proxies(platform)
|
||||
if cnt <= 0:
|
||||
proxies = proxy_crawler.crawl_proxies()
|
||||
self.insert_all(proxies)
|
||||
|
||||
self.lock_leave()
|
||||
return self.get(platform, proc_id)
|
||||
|
||||
except Exception as e:
|
||||
dbg.print_exception(e)
|
||||
self.lock_leave()
|
||||
return proxy
|
||||
|
||||
def insert(self, ip, port):
    """Add a proxy row for ``ip``/``port`` unless one already exists.

    Existence is checked through ``self.get_instance``. When nothing is
    found, a new ``Proxy2Model`` is added to the session under the handler
    lock and ``self.commit()`` is called afterwards.

    The lock is released in a ``finally`` clause so a failing
    ``session.add`` cannot leave it held.

    :param ip: proxy address.
    :param port: proxy port.
    """
    # NOTE(review): the existence check and the add take the lock
    # separately, so two threads can both pass the check - confirm whether
    # a uniqueness constraint covers this.
    if self.get_instance(ip, port):
        return

    proxy = Proxy2Model(ip, port)
    self.lock_enter()
    try:
        self.session.add(proxy)
    finally:
        self.lock_leave()
    # NOTE(review): commit() is assumed to belong to the "not found" branch;
    # confirm against the original indentation.
    self.commit()
|
||||
|
||||
def insert_all(self, proxies):
    """Insert every proxy in ``proxies`` that is not already in ``proxy2``.

    One statement is executed per proxy, of the form::

        INSERT INTO proxy2(ip, PORT)
        SELECT <ip>, <port> FROM DUAL
        WHERE NOT EXISTS (SELECT * FROM proxy2 WHERE ip=<ip> AND PORT=<port>)

    The whole batch runs while ``self.lock`` is held. The lock is released
    in a ``finally`` clause so a failing statement cannot leave it held.

    :param proxies: sized iterable of mappings with ``'ip'`` and ``'port'``
        keys.
    """
    print('{} proxy insert start'.format(len(proxies)))

    # NOTE(review): the statement is assembled by string formatting. If the
    # proxy values come from crawled pages this is an SQL-injection risk;
    # switch to bound parameters once the driver's parameter style is
    # confirmed.
    template = (
        r"INSERT INTO proxy2(ip, PORT) "
        r"SELECT '{}', {} FROM DUAL "
        r"WHERE NOT EXISTS (SELECT * FROM proxy2 WHERE ip='{}' AND PORT={})"
    )

    self.lock.acquire()
    try:
        for proxy in proxies:
            query = template.format(
                proxy['ip'], proxy['port'], proxy['ip'], proxy['port'])
            # Original author's note: does not work - duplicates get
            # inserted and the thread terminates.
            self.engine.execute(query)
    finally:
        self.lock.release()

    print('{} proxy insert end'.format(len(proxies)))
|
||||
|
||||
def set_proxy_blocked(self, ip, port, platform):
|
||||
try:
|
||||
|
||||
@@ -86,13 +86,14 @@ def check_proxy(qu, proxy, url):
|
||||
|
||||
|
||||
def crawl_proxies(check_url=None):
|
||||
# print('proxy crawling start')
|
||||
proxies = get_proxies_free_proxy()
|
||||
print('proxy crawling start')
|
||||
proxies = []
|
||||
proxies += get_proxies_free_proxy()
|
||||
proxies += get_proxies_proxy_searcher()
|
||||
# proxies += get_proxies_nntime()
|
||||
# proxies = list(set(proxies))
|
||||
# print('proxy crawled {}'.format(len(proxies)))
|
||||
|
||||
proxies_alive = []
|
||||
if check_url:
|
||||
qu = queue.Queue()
|
||||
threads = []
|
||||
@@ -103,7 +104,6 @@ def crawl_proxies(check_url=None):
|
||||
[th.start() for th in threads]
|
||||
[th.join() for th in threads]
|
||||
|
||||
proxies_alive = []
|
||||
while not qu.empty():
|
||||
proxy = qu.get()
|
||||
proxies_alive.append(proxy)
|
||||
@@ -111,21 +111,9 @@ def crawl_proxies(check_url=None):
|
||||
else:
|
||||
proxies_alive = proxies
|
||||
|
||||
# print('proxy crawling end')
|
||||
print('proxy crawled {}'.format(len(proxies_alive)))
|
||||
return proxies_alive
|
||||
|
||||
# proxies_alive.sort()
|
||||
# print('proxy crawler got {} proxies'.format(len(proxies_alive)))
|
||||
#
|
||||
# with open('proxy.txt', 'w') as f:
|
||||
# print('proxy crawler dump start')
|
||||
# for proxy in proxies_alive:
|
||||
# # print(proxy)
|
||||
# f.write(proxy + '\n')
|
||||
# print('proxy crawler dump end')
|
||||
#
|
||||
# print('proxy crawling end')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
|
||||
Reference in New Issue
Block a user