-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathproccess_data.py
More file actions
86 lines (72 loc) · 3.04 KB
/
Copy pathproccess_data.py
File metadata and controls
86 lines (72 loc) · 3.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
from copy import error
from types import TracebackType
import pandas as pd
import re
import urllib.request as ur
import gzip
import json
from urllib.parse import urlparse
import os
import ast
import logger_handler
from database_class import load_ids
from database_class import bulk_insert
# Module-level parameters shared by the functions below.
# Number of collected rows that triggers a bulk insert in processing().
loop_size = 200
# Labels stored in the computed column; processing() selects them by index:
# [0] price < 10, [1] 10 <= price < 60, [2] price >= 60, [3] record has no 'price' key.
# NOTE(review): "cheep" looks like a typo for "cheap"; it is a runtime value, so it is left unchanged here.
price_category = ["cheep","normal","expensive","null price"]
# Key under which processing() stores the price label on each record.
computed_column = "dim_price"
# Logger obtained from logger_handler under the name "processing".
my_logger = logger_handler.get_logger("processing")
def dim_product():
    """Run processing() on the Musical Instruments metadata file, keyed by 'asin'."""
    processing(
        'http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/meta_Musical_Instruments.json.gz',
        'asin',
    )
def fact_review():
    """Run processing() on the Musical Instruments reviews file, keyed by 'reviewerID,asin'."""
    processing(
        'http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Musical_Instruments_5.json.gz',
        'reviewerID,asin',
    )
# The processing function takes parameters that can be set in Airflow, then reads the gzip file as a stream:
# - derive the table name from the url
# - unzip the file
# - check for duplicate ids
# - collect 200 rows in a list, then call the bulk insert function
def processing(url,pk_id):
    """Stream a gzipped, line-per-record file from ``url`` and bulk insert its new rows.

    The table name is the file name in ``url`` up to its first dot. Each line
    is parsed into a dict, tagged with a price label under ``computed_column``
    and, unless its key is already known, queued for insertion. Queued rows
    are handed to ``bulk_insert`` in batches of ``loop_size``; whatever is
    left when the file ends is inserted as a final, smaller batch.

    Args:
        url: location of the ``.gz`` file to download.
        pk_id: comma-separated names of the key columns, e.g. ``'reviewerID,asin'``.
    """
    pending_rows = []
    my_logger.debug("get table name from url")
    tableName = os.path.basename(urlparse(url).path).split('.')[0]
    my_logger.debug("fill list of inserted ids to avoid duplicate insert")
    IDs = load_ids(tableName, pk_id)
    my_logger.debug("unzip file and read")
    # Both the HTTP response and the gzip reader are closed on exit, even on error.
    with ur.urlopen(url) as handle, gzip.GzipFile(fileobj=handle) as f:
        for line in f:
            str_line = line.decode('utf-8')
            # NOTE(review): literal_eval is kept instead of json.loads; presumably the
            # lines are Python-literal dicts rather than strict JSON - confirm.
            json_line = ast.literal_eval(str_line)
            my_logger.debug("check price column and add dim_price as a price category")
            if 'price' in json_line:
                price = json_line['price']
                if price < 10:
                    json_line[computed_column] = price_category[0]
                elif price >= 60:
                    json_line[computed_column] = price_category[2]
                else:
                    json_line[computed_column] = price_category[1]
            else:
                json_line[computed_column] = price_category[3]
            my_logger.debug("fetch data from database to check duplicate IDs")
            row_id = get_id_from_json_line(json_line, pk_id)
            if row_id not in IDs:
                # Remember the key so a repeat later in the same file is skipped too.
                IDs.append(row_id)
                pending_rows.append(json_line)
            my_logger.debug("call bulk insert after reading " + str(loop_size) +" rows")
            if len(pending_rows) >= loop_size:
                bulk_insert(pending_rows, tableName)
                my_logger.debug("clear list for next batch")
                pending_rows = []
    # Flush the tail: without this, rows after the last full batch are never inserted.
    if pending_rows:
        bulk_insert(pending_rows, tableName)
def get_id_from_json_line(json_line,pk_id):
    """Return the key values of ``json_line`` as a list.

    Args:
        json_line: mapping holding one parsed record.
        pk_id: comma-separated key names, e.g. ``'asin'`` or ``'reviewerID,asin'``.

    Returns:
        A list with one value per name in ``pk_id``, in the order given.

    Raises:
        KeyError: if a named key is missing from ``json_line``.
    """
    # Strip each name so 'reviewerID, asin' behaves like 'reviewerID,asin'.
    return [json_line[key.strip()] for key in pk_id.split(',')]
# dim_product()