You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

187 lines
6.1 KiB

3 years ago
  1. #!/usr/bin/env python3
  2. import sqlite3
  3. import common
  4. import json
  5. import time
  6. import threading
  7. from datetime import datetime
  8. from kafka import KafkaConsumer
  9. def committer(connection):
  10. while True:
  11. connection.commit()
  12. time.sleep(10)
  13. def insert_update_store(connection, stores, value_json):
  14. cursor = connection.cursor()
  15. columns = '('
  16. values_sql = f'('
  17. values = []
  18. update_sql = ''
  19. update_values = []
  20. for dict_key in value_json['store']:
  21. columns = f'{columns}{dict_key}, '
  22. values.append(value_json['store'][dict_key])
  23. values_sql = f'{values_sql}?, '
  24. if dict_key != 'store':
  25. update_sql = f'{update_sql}{dict_key}=?, '
  26. update_values.append(value_json['store'][dict_key])
  27. columns = columns[:-2] + ')'
  28. values_sql = values_sql[:-2] + ')'
  29. update_sql = update_sql[:-2]
  30. update_values.append(value_json["store"]["store"])
  31. try:
  32. sql_statement = f'INSERT INTO store {columns} VALUES {values_sql}'
  33. cursor.execute(sql_statement, values)
  34. stores[value_json["store"]["store"]] = cursor.lastrowid
  35. except sqlite3.IntegrityError as err:
  36. try:
  37. sql_statement = f'UPDATE store SET {update_sql} WHERE store=?'
  38. cursor.execute(sql_statement, tuple(update_values))
  39. except Exception as err:
  40. print(err)
  41. except Exception as err:
  42. print(err)
  43. def insert_update_product(connection, stores, value_json):
  44. cursor = connection.cursor()
  45. columns = '(store, '
  46. values_sql = f'(?, '
  47. values = [stores[value_json["store"]]]
  48. update_sql = ''
  49. update_values = []
  50. for dict_key in value_json['product']:
  51. columns = f'{columns}{dict_key}, '
  52. values.append(value_json['product'][dict_key])
  53. values_sql = f'{values_sql}?, '
  54. if dict_key != 'sku_code':
  55. update_sql = f'{update_sql}{dict_key}=?, '
  56. update_values.append(value_json['product'][dict_key])
  57. columns = columns[:-2] + ')'
  58. values_sql = values_sql[:-2] + ')'
  59. update_sql = update_sql[:-2]
  60. update_values.append(stores[value_json["store"]])
  61. update_values.append(value_json["product"]["sku_code"])
  62. product_id = None
  63. try:
  64. sql_statement = f'INSERT INTO products {columns} VALUES {values_sql}'
  65. cursor.execute(sql_statement, values)
  66. product_id = cursor.lastrowid
  67. #print(f'inserted {product_id}')
  68. except sqlite3.IntegrityError as err:
  69. try:
  70. sql_statement = f'UPDATE products SET {update_sql} WHERE store=? and sku_code=?'
  71. cursor.execute(sql_statement, tuple(update_values))
  72. sql_statement = f'SELECT id FROM products WHERE store=? and sku_code=?'
  73. cursor.execute(sql_statement, (stores[value_json["store"]], value_json["product"]["sku_code"]))
  74. product_id = cursor.fetchone()[0]
  75. #print(f'updated {product_id}')
  76. except Exception as err:
  77. print(err)
  78. except Exception as err:
  79. print(err)
  80. insert_update_price(connection, stores, value_json, product_id)
  81. def insert_update_price(connection, stores, value_json, product_id):
  82. cursor = connection.cursor()
  83. columns = '(product_id, last_update, '
  84. values_sql = f'(?, ?, '
  85. values = [product_id, datetime.now().strftime('%d/%m/%Y')]
  86. for dict_key in value_json['price']:
  87. columns = f'{columns}{dict_key}, '
  88. values.append(value_json['price'][dict_key])
  89. values_sql = f'{values_sql}?, '
  90. columns = columns[:-2] + ')'
  91. values_sql = values_sql[:-2] + ')'
  92. price_exists = False
  93. try:
  94. sql_statement = f'SELECT id FROM price WHERE product_id=? and price=? and active=1'
  95. cursor.execute(sql_statement, (product_id, value_json["price"]["price"]))
  96. if cursor.fetchone():
  97. price_exists = True
  98. except Exception as err:
  99. print(err)
  100. try:
  101. if not price_exists:
  102. sql_statement = f'INSERT INTO price {columns} VALUES {values_sql}'
  103. cursor.execute(sql_statement, values)
  104. else:
  105. sql_statement = f'UPDATE price SET last_update=? WHERE product_id=? and price=? and active=1'
  106. cursor.execute(sql_statement, (datetime.now().strftime("%d/%m/%Y"), product_id, value_json["price"]["price"]))
  107. except sqlite3.IntegrityError as err:
  108. print(err)
  109. except Exception as err:
  110. print(err)
  111. def deactivate_old_price(connection, stores, value_json):
  112. cursor = connection.cursor()
  113. try:
  114. sql_statement=f'UPDATE price SET active=0 WHERE product_id IN (SELECT id FROM products WHERE store=?) AND last_update<?'
  115. cursor.execute(sql_statement, (stores[value_json["store"]], datetime.now().strftime("%d/%m/%Y")))
  116. #print(sql_statement)
  117. except Exception as err:
  118. print(err)
  119. def insert(connection, stores, value_json):
  120. if value_json['type'] == 'store':
  121. print('insert_update_store')
  122. insert_update_store(connection, stores, value_json)
  123. if value_json['type'] == 'store_update':
  124. print('deactivate_old_price')
  125. deactivate_old_price(connection, stores, value_json)
  126. if value_json['type'] == 'product':
  127. #print('insert_update_product')
  128. insert_update_product(connection, stores, value_json)
  129. def get_stores(connection):
  130. stores = {}
  131. cursor = con.cursor()
  132. sql_statement = 'SELECT store, id FROM store'
  133. cursor.execute(sql_statement)
  134. result = cursor.fetchall()
  135. for res in result:
  136. stores[res[0]] = res[1]
  137. return stores
  138. if __name__ == '__main__':
  139. config = common.get_config()
  140. con = sqlite3.connect(config['config']['sqlitedb'], check_same_thread=False)
  141. if config['config']['log_sql']:
  142. con.set_trace_callback(print)
  143. stores = get_stores(con)
  144. commit_thread = threading.Thread(target=committer, args=(con,), daemon=True)
  145. commit_thread.start()
  146. consumer = KafkaConsumer('shopper_db',bootstrap_servers=[config['config']['kafka_boostrap_servers']])
  147. for msg in consumer:
  148. value_json = json.loads(msg.value.decode("utf-8"))
  149. insert(con, stores, value_json)