Apache Pig - Performance improvement of a Pig script using a Python UDF
The following Pig (0.15) script is used to map an input file (alias `cdrs`) against another file (alias `mastergt`), and it calls a Python (2.7.11) UDF to perform that mapping. It is taking 40 minutes for 4.5k records. Can anyone please suggest an improvement?
Pig script:
-- Builds a per-(srcgt, destgt, aparty, bparty, smscgt, status, prepost) report
-- of successful SMS CDRs for the month given by the $mon parameter, using the
-- Python UDFs in smsiuc_udf.py to select records and to map the GTs.
--
-- Built-in function names (PigStorage, UPPER, SUBSTRING, COUNT) are
-- case-sensitive in Pig, so they are spelled in their documented case here.
REGISTER 'smsiuc_udf.py' USING streaming_python AS smsiuc_udfs;

-- '-tagFile' makes PigStorage prepend the input file name as field $0,
-- which is why the data fields below start at $1.
cdrs = LOAD '2016040111*' USING PigStorage('|', '-tagFile');
mastergtrec = LOAD 'master.txt' USING PigStorage(',', '-tagFile');

-- NOTE(review): mastergt is not referenced by any later statement visible
-- here; confirm whether it is still needed.
mastergt = FOREACH mastergtrec GENERATE
    (chararray) UPPER($1) AS opcdpc,
    (chararray) UPPER($2) AS gtoptname,
    (chararray) UPPER($3) AS gtoptcircle;

cdrrecord = FOREACH cdrs GENERATE
    (chararray) UPPER($1) AS aparty,
    (chararray) UPPER($2) AS bparty,
    $3 AS smssentdate,
    $4 AS smssenttime,
    ($29 == '6' ? 's' : 'f') AS status,
    (chararray) UPPER($26) AS srcgt,
    (chararray) UPPER($27) AS destgt,
    ($12 == '405899136999995' ? 'mtsdel-cdma'
        : ($12 == '919875089998' ? 'mtsraj-gsm'
            : ($12 == '405899150999995' ? 'mtschn-cdma' : $12))) AS smscgt,
    (chararray) $0 AS cdrfname,
    (chararray) $13 AS prepost;

-- Keep only records accepted by the UDF, with status 's', whose sent-date
-- characters 4..5 equal the $mon parameter.
filteredp2pcdrs = FILTER cdrrecord BY
    smsiuc_udfs.pullp2pcdrs(aparty, bparty, srcgt, destgt)
    AND status == 's'
    AND SUBSTRING(smssentdate, 4, 6) == '$mon';

groupp2pcdrs = GROUP filteredp2pcdrs BY
    (srcgt, destgt, aparty, bparty, smscgt, status, prepost);

-- NOTE(review): uniq projects exactly the grouping keys, so it looks like it
-- holds a single tuple per group and cnt is always 1 -- confirm whether
-- COUNT(filteredp2pcdrs) was intended.
distinctp2pcdrs = FOREACH groupp2pcdrs {
    uniq = DISTINCT filteredp2pcdrs.(srcgt, destgt, aparty, bparty, smscgt, status, prepost);
    GENERATE FLATTEN(group), COUNT(uniq) AS cnt;
};

p2preportmap = FOREACH distinctp2pcdrs GENERATE
    smsiuc_udfs.p2preport(srcgt, destgt, aparty, bparty),
    smscgt,
    status,
    prepost,
    cnt;
The Python UDF follows:
# GT lookup table, kept for the lifetime of the Python process so the master
# file is read once instead of once per record.
_MASTER_TABLE_CACHE = {}


def _is_readable_nonempty(path):
    """Return True when *path* is an existing, readable, non-empty file."""
    return (
        os.path.isfile(path)
        and os.access(path, os.R_OK)
        and os.stat(path).st_size > 0
    )


def _load_gt_table():
    """Return the master GT table, reading it from disk only on first use."""
    table = _MASTER_TABLE_CACHE.get("gt")
    if table is not None:
        return table
    # NOTE(review): the names holding the master file paths are defined
    # outside this block and their original spelling is not visible here;
    # they are assumed to be the module-level names `masterlrn` and
    # `mastergt` -- confirm before merging.
    # Both files must be usable before anything is loaded, as before.
    if _is_readable_nonempty(masterlrn) and _is_readable_nonempty(mastergt):
        # The second argument (1) is passed to readfileinbag unchanged.
        table = readfileinbag(mastergt, 1)
        if table is not None:
            _MASTER_TABLE_CACHE["gt"] = table
        return table
    # Nothing cached on failure, so a later call retries the files.
    return {}


def _lookup_by_prefix(table, key, default):
    """Return table[key], else the entry for its longest 9..4-char prefix."""
    if key in table:
        return table[key]
    for length in range(9, 3, -1):
        prefix = key[0:length]
        if prefix in table:
            return table[prefix]
    return default


def p2preport(srcgt, destgt, aparty, bparty):
    """Map the source and destination GT of a CDR to their master entries.

    Each GT is looked up in the master GT table by exact value first and
    then by its leading 9, 8, 7, 6, 5 and 4 characters, in that order.
    A GT with no match is reported as ``"<gt>,"``.

    The master table is loaded on the first call and reused afterwards;
    the previous version re-checked and re-read the master files for every
    record.  The LRN table and the second GT table were loaded but never
    used by this function, so they are no longer read here.

    Args:
        srcgt: source GT string.
        destgt: destination GT string.
        aparty: originating number; must be convertible with ``int()``.
        bparty: destination number; must be convertible with ``int()``.

    Returns:
        ``"<source entry>,<destination entry>"``, or ``None`` if an
        ``AttributeError`` is raised while building it.  The separator is
        now also emitted when the destination GT has no match; the old
        code omitted it in that one branch only.

    Raises:
        ValueError: if ``aparty`` or ``bparty`` is not numeric.
    """
    # The converted numbers were never used, but the conversion is kept so
    # that non-numeric parties still fail exactly as they did before.
    int(aparty)
    int(bparty)
    try:
        gt_table = _load_gt_table()
        src_entry = _lookup_by_prefix(gt_table, srcgt, srcgt + ",")
        dest_entry = _lookup_by_prefix(gt_table, destgt, destgt + ",")
        return src_entry + "," + dest_entry
    except AttributeError:
        # Best-effort behaviour kept from the original: no value for
        # this record rather than failing the whole task.
        return None
Comments
Post a Comment