|
|
- #!/usr/bin/env python3
-
- import sqlite3
- import common
- import json
- import time
- import threading
-
- from datetime import datetime
- from kafka import KafkaConsumer
-
-
- def committer(connection):
- while True:
- connection.commit()
- time.sleep(10)
-
-
- def insert_update_store(connection, stores, value_json):
- cursor = connection.cursor()
-
- columns = '('
- values_sql = f'('
- values = []
- update_sql = ''
- update_values = []
-
- for dict_key in value_json['store']:
- columns = f'{columns}{dict_key}, '
- values.append(value_json['store'][dict_key])
- values_sql = f'{values_sql}?, '
- if dict_key != 'store':
- update_sql = f'{update_sql}{dict_key}=?, '
- update_values.append(value_json['store'][dict_key])
-
- columns = columns[:-2] + ')'
- values_sql = values_sql[:-2] + ')'
- update_sql = update_sql[:-2]
- update_values.append(value_json["store"]["store"])
-
- try:
- sql_statement = f'INSERT INTO store {columns} VALUES {values_sql}'
- cursor.execute(sql_statement, values)
- stores[value_json["store"]["store"]] = cursor.lastrowid
- except sqlite3.IntegrityError as err:
- try:
- sql_statement = f'UPDATE store SET {update_sql} WHERE store=?'
- cursor.execute(sql_statement, tuple(update_values))
- except Exception as err:
- print(err)
- except Exception as err:
- print(err)
-
-
- def insert_update_product(connection, stores, value_json):
- cursor = connection.cursor()
-
- columns = '(store, '
- values_sql = f'(?, '
- values = [stores[value_json["store"]]]
- update_sql = ''
- update_values = []
-
- for dict_key in value_json['product']:
- columns = f'{columns}{dict_key}, '
- values.append(value_json['product'][dict_key])
- values_sql = f'{values_sql}?, '
- if dict_key != 'sku_code':
- update_sql = f'{update_sql}{dict_key}=?, '
- update_values.append(value_json['product'][dict_key])
-
- columns = columns[:-2] + ')'
- values_sql = values_sql[:-2] + ')'
- update_sql = update_sql[:-2]
- update_values.append(stores[value_json["store"]])
- update_values.append(value_json["product"]["sku_code"])
-
- product_id = None
-
- try:
- sql_statement = f'INSERT INTO products {columns} VALUES {values_sql}'
- cursor.execute(sql_statement, values)
- product_id = cursor.lastrowid
- #print(f'inserted {product_id}')
- except sqlite3.IntegrityError as err:
- try:
- sql_statement = f'UPDATE products SET {update_sql} WHERE store=? and sku_code=?'
- cursor.execute(sql_statement, tuple(update_values))
- sql_statement = f'SELECT id FROM products WHERE store=? and sku_code=?'
- cursor.execute(sql_statement, (stores[value_json["store"]], value_json["product"]["sku_code"]))
- product_id = cursor.fetchone()[0]
- #print(f'updated {product_id}')
- except Exception as err:
- print(err)
- except Exception as err:
- print(err)
-
- insert_update_price(connection, stores, value_json, product_id)
-
- def insert_update_price(connection, stores, value_json, product_id):
- cursor = connection.cursor()
-
- columns = '(product_id, last_update, '
- values_sql = f'(?, ?, '
- values = [product_id, datetime.now().strftime('%d/%m/%Y')]
-
- for dict_key in value_json['price']:
- columns = f'{columns}{dict_key}, '
- values.append(value_json['price'][dict_key])
- values_sql = f'{values_sql}?, '
-
- columns = columns[:-2] + ')'
- values_sql = values_sql[:-2] + ')'
-
- price_exists = False
- try:
- sql_statement = f'SELECT id FROM price WHERE product_id=? and price=? and active=1'
- cursor.execute(sql_statement, (product_id, value_json["price"]["price"]))
- if cursor.fetchone():
- price_exists = True
- except Exception as err:
- print(err)
- try:
- if not price_exists:
- sql_statement = f'INSERT INTO price {columns} VALUES {values_sql}'
- cursor.execute(sql_statement, values)
- else:
- sql_statement = f'UPDATE price SET last_update=? WHERE product_id=? and price=? and active=1'
- cursor.execute(sql_statement, (datetime.now().strftime("%d/%m/%Y"), product_id, value_json["price"]["price"]))
- except sqlite3.IntegrityError as err:
- print(err)
- except Exception as err:
- print(err)
-
-
- def deactivate_old_price(connection, stores, value_json):
- cursor = connection.cursor()
- try:
- sql_statement=f'UPDATE price SET active=0 WHERE product_id IN (SELECT id FROM products WHERE store=?) AND last_update<?'
- cursor.execute(sql_statement, (stores[value_json["store"]], datetime.now().strftime("%d/%m/%Y")))
- #print(sql_statement)
- except Exception as err:
- print(err)
-
- def insert(connection, stores, value_json):
- if value_json['type'] == 'store':
- print('insert_update_store')
- insert_update_store(connection, stores, value_json)
-
- if value_json['type'] == 'store_update':
- print('deactivate_old_price')
- deactivate_old_price(connection, stores, value_json)
-
- if value_json['type'] == 'product':
- #print('insert_update_product')
- insert_update_product(connection, stores, value_json)
-
-
- def get_stores(connection):
- stores = {}
- cursor = con.cursor()
-
- sql_statement = 'SELECT store, id FROM store'
- cursor.execute(sql_statement)
- result = cursor.fetchall()
- for res in result:
- stores[res[0]] = res[1]
-
- return stores
-
-
- if __name__ == '__main__':
- config = common.get_config()
-
- con = sqlite3.connect(config['config']['sqlitedb'], check_same_thread=False)
- if config['config']['log_sql']:
- con.set_trace_callback(print)
-
- stores = get_stores(con)
-
- commit_thread = threading.Thread(target=committer, args=(con,), daemon=True)
- commit_thread.start()
-
- consumer = KafkaConsumer('shopper_db',bootstrap_servers=[config['config']['kafka_boostrap_servers']])
- for msg in consumer:
- value_json = json.loads(msg.value.decode("utf-8"))
- insert(con, stores, value_json)
|