#!/usr/bin/env python3 import sqlite3 import common import json import time import threading from datetime import datetime from kafka import KafkaConsumer def committer(connection): while True: connection.commit() time.sleep(10) def insert_update_store(connection, stores, value_json): cursor = connection.cursor() columns = '(' values_sql = f'(' values = [] update_sql = '' update_values = [] for dict_key in value_json['store']: columns = f'{columns}{dict_key}, ' values.append(value_json['store'][dict_key]) values_sql = f'{values_sql}?, ' if dict_key != 'store': update_sql = f'{update_sql}{dict_key}=?, ' update_values.append(value_json['store'][dict_key]) columns = columns[:-2] + ')' values_sql = values_sql[:-2] + ')' update_sql = update_sql[:-2] update_values.append(value_json["store"]["store"]) try: sql_statement = f'INSERT INTO store {columns} VALUES {values_sql}' cursor.execute(sql_statement, values) stores[value_json["store"]["store"]] = cursor.lastrowid except sqlite3.IntegrityError as err: try: sql_statement = f'UPDATE store SET {update_sql} WHERE store=?' cursor.execute(sql_statement, tuple(update_values)) except Exception as err: print(err) except Exception as err: print(err) def insert_update_product(connection, stores, value_json): cursor = connection.cursor() columns = '(store, ' values_sql = f'(?, ' values = [stores[value_json["store"]]] update_sql = '' update_values = [] for dict_key in value_json['product']: columns = f'{columns}{dict_key}, ' values.append(value_json['product'][dict_key]) values_sql = f'{values_sql}?, ' if dict_key != 'sku_code': update_sql = f'{update_sql}{dict_key}=?, ' update_values.append(value_json['product'][dict_key]) columns = columns[:-2] + ')' values_sql = values_sql[:-2] + ')' update_sql = update_sql[:-2] update_values.append(stores[value_json["store"]]) update_values.append(value_json["product"]["sku_code"]) product_id = None try: sql_statement = f'INSERT INTO products {columns} VALUES {values_sql}' cursor.execute(sql_statement, values) product_id = cursor.lastrowid #print(f'inserted {product_id}') except sqlite3.IntegrityError as err: try: sql_statement = f'UPDATE products SET {update_sql} WHERE store=? and sku_code=?' cursor.execute(sql_statement, tuple(update_values)) sql_statement = f'SELECT id FROM products WHERE store=? and sku_code=?' cursor.execute(sql_statement, (stores[value_json["store"]], value_json["product"]["sku_code"])) product_id = cursor.fetchone()[0] #print(f'updated {product_id}') except Exception as err: print(err) except Exception as err: print(err) insert_update_price(connection, stores, value_json, product_id) def insert_update_price(connection, stores, value_json, product_id): cursor = connection.cursor() columns = '(product_id, last_update, ' values_sql = f'(?, ?, ' values = [product_id, datetime.now().strftime('%d/%m/%Y')] for dict_key in value_json['price']: columns = f'{columns}{dict_key}, ' values.append(value_json['price'][dict_key]) values_sql = f'{values_sql}?, ' columns = columns[:-2] + ')' values_sql = values_sql[:-2] + ')' price_exists = False try: sql_statement = f'SELECT id FROM price WHERE product_id=? and price=? and active=1' cursor.execute(sql_statement, (product_id, value_json["price"]["price"])) if cursor.fetchone(): price_exists = True except Exception as err: print(err) try: if not price_exists: sql_statement = f'INSERT INTO price {columns} VALUES {values_sql}' cursor.execute(sql_statement, values) else: sql_statement = f'UPDATE price SET last_update=? WHERE product_id=? and price=? and active=1' cursor.execute(sql_statement, (datetime.now().strftime("%d/%m/%Y"), product_id, value_json["price"]["price"])) except sqlite3.IntegrityError as err: print(err) except Exception as err: print(err) def deactivate_old_price(connection, stores, value_json): cursor = connection.cursor() try: sql_statement=f'UPDATE price SET active=0 WHERE product_id IN (SELECT id FROM products WHERE store=?) AND last_update