import re
import configparser


def _load_pymysql():
    """Return the top-level ``pymysql`` package with ``cursors`` loaded, or None.

    ``__import__('pymysql.cursors')`` returns the top-level package, which is
    what the class needs for both ``connect`` and ``cursors.DictCursor``.
    Returning None instead of raising keeps the rest of this module (notably
    ``get_settings``) importable when the driver is not installed.
    """
    try:
        return __import__('pymysql.cursors')
    except ImportError:
        return None


class ResultSender:
    """Small helper around a pymysql connection for upserting result rows."""

    # Driver module, or None when pymysql is not installed (see connect()).
    pymysql = _load_pymysql()
    # Characters in these ranges are replaced with a space before escaping.
    re_emoji = re.compile(u'[\U0001F300-\U0001F64F\U0001F680-\U0001F6FF]+', re.UNICODE)

    # NOTE(review): the default host and credentials are hard-coded here; they
    # are kept only for backward compatibility. Prefer passing values loaded
    # from configuration (e.g. via get_settings()).
    def __init__(self, host='182.162.171.147', user='admin', passwd='admin123', db='bigbird'):
        """Store the connection parameters; no connection is opened yet."""
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db = db
        self.conn = None

    def connect(self):
        """Open a new connection and store it on ``self.conn``.

        Raises:
            ImportError: if the pymysql driver is not installed.
        """
        if self.pymysql is None:
            raise ImportError('pymysql is required to connect to the database')
        self.conn = self.pymysql.connect(
            host=self.host,
            user=self.user,
            passwd=self.passwd,
            db=self.db,
            charset='utf8',
            # Rows come back as dicts; get_buzz() indexes the row by column name.
            cursorclass=self.pymysql.cursors.DictCursor,
        )

    def close(self):
        """Close the connection if one is currently open (safe to call twice)."""
        if self.conn is not None and self.conn.open:
            self.conn.close()

    def _ensure_connected(self):
        """Connect when there is no connection yet or the current one is closed."""
        if self.conn is None or not self.conn.open:
            self.connect()

    def _make_query(self, table_name, dictionary):
        """Build an ``insert ... on duplicate key update`` statement.

        Args:
            table_name: target table; inserted into the SQL text as-is.
            dictionary: mapping of column name -> value. Column names are
                inserted as-is; values are rendered as described below.

        Returns:
            The SQL statement as a string.

        Requires ``self.conn`` to be set, because non-int values are escaped
        with the connection's ``escape`` method.
        """
        columns = []
        values = []
        for key, val in dictionary.items():
            columns.append(key)
            # Exact type check on purpose: bool is a subclass of int, and the
            # original behavior sends booleans through the escaped-string path.
            if type(val) is int:
                values.append(str(val))
            else:
                # NOTE(review): None is stringified to 'None' here, not SQL
                # NULL -- confirm this is intended.
                values.append(self.conn.escape(self.re_emoji.sub(' ', str(val))))

        assignments = ','.join(col + '=' + val for col, val in zip(columns, values))
        return (
            "insert into " + str(table_name)
            + " (" + ",".join(columns)
            + ") values (" + ",".join(values) + ")"
            + " on duplicate key update " + assignments
        )

    def get_buzz(self, event_num):
        """Return ``replybuzz`` for ``event_num`` from ``stats_s1_effect``.

        Returns None when no row matches or when the query fails; a failure
        is printed rather than raised (best-effort lookup).
        """
        # Parameterized so event_num is escaped by the driver rather than
        # concatenated into the SQL text.
        query = 'select replybuzz from stats_s1_effect where event_num = %s'
        self._ensure_connected()
        buzz = None  # stays None if execute/fetch raises below
        with self.conn.cursor() as cursor:
            try:
                cursor.execute(query, (event_num,))
                buzz = cursor.fetchone()
            except Exception as e:
                print(e)
        return buzz['replybuzz'] if buzz is not None else None

    def send(self, table_name, dictionary):
        """Upsert ``dictionary`` as one row of ``table_name`` and commit."""
        # _make_query needs a connection for escaping, so connect first.
        self._ensure_connected()
        query = self._make_query(table_name, dictionary)
        self._exec_query(query)

    def _exec_query(self, query):
        """Execute ``query`` on the (re)opened connection and commit."""
        self._ensure_connected()
        with self.conn.cursor() as cursor:
            cursor.execute(query)
        self.conn.commit()


def get_settings(option='database', filename='effect.ini'):
    """Read a single section of an INI-style file.

    Only the lines belonging to section ``[option]`` are handed to
    ``configparser``; the rest of the file is never parsed.

    Args:
        option: section name to extract (without brackets).
        filename: path of the file to read (decoded as UTF-8).

    Returns:
        The ``configparser`` section proxy for ``option``, or None when the
        file cannot be read or the section is not present.
    """
    try:
        with open(filename, encoding='utf8') as f:
            lines = f.readlines()
    except (OSError, ValueError):
        # Unreadable file, invalid path, or undecodable content.
        return None

    # Locate the "[option]" header, ignoring comment lines.
    start = None
    for index, raw in enumerate(lines):
        stripped = raw.strip()
        if stripped.startswith('#'):
            continue
        if stripped.startswith('[') and stripped.endswith(']') and stripped[1:-1] == option:
            start = index
            break
    if start is None:
        return None

    # The section runs until the next header, a commented-out header
    # ("#[...]"), or the end of the file. The terminating line itself is
    # excluded so a following header is never parsed.
    end = len(lines)
    for index in range(start + 1, len(lines)):
        stripped = lines[index].strip()
        is_header = stripped.startswith('[')
        is_commented_header = stripped.startswith('#[') and stripped.endswith(']')
        if is_header or is_commented_header:
            end = index
            break

    cg = configparser.ConfigParser()
    cg.read_string(''.join(lines[start:end]))
    return cg[option]