Prashant, thanks a lot, that solved my issue!!
On Tue, Jun 4, 2013 at 1:07 AM, Prashant Kommireddi <[email protected]> wrote:

> Can you try loading the input files without the schema?
>
>     raw = LOAD '$log_path' using PigStorage('\t', '-noschema');
>
> PigStorage looks for schema files by default, and that *may* be slowing
> things down (based on your assessment that the slowness scales with the
> number of input directories).
>
> On Mon, Jun 3, 2013 at 12:59 PM, Eugene Morozov <[email protected]> wrote:
>
> > Hello!
> >
> > Question #1
> > I noticed a couple of days ago that my scripts started running slower
> > than usual. I experimented a bit, and it turns out that "compilation"
> > time depends on how many input files I give the script. By compilation
> > I mean everything Pig does after it is launched and before I see the
> > new job in the JobTracker web UI.
> >
> > I have 3600 input files that live in 24 different folders, named 00 to
> > 23. Depending on how many input files the script should process, Pig
> > spends a varying amount of time between `pig -p input_path=...
> > my-script.pig` and the jar-generation step. When I give it just one
> > directory, like 00/*, it takes only 10-20 seconds before the job
> > starts. When I use a bunch of directories as a param, 0?/*, it takes
> > about 120-240 seconds. And it takes a tremendous 15 minutes when I use
> > all my data.
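[A back-of-the-envelope sketch, in Python and purely illustrative, of why `-noschema` can matter here: PigStorage's schema discovery probes input locations for schema side files, and each probe is a blocking metadata round trip to the NameNode, so front-end startup cost grows with the number of probed locations. The latency figure and probes-per-location count below are assumptions, not measurements.]

```python
# Toy model, not Pig internals: estimate time spent probing for schema
# side files (e.g. .pig_schema) during Pig's front-end "compilation".
# RPC_SECONDS and PROBES_PER_LOCATION are illustrative assumptions.

RPC_SECONDS = 0.25          # assumed latency of one NameNode round trip
PROBES_PER_LOCATION = 2     # assumed existence checks per input path

def schema_probe_overhead(num_locations: int) -> float:
    """Estimated seconds spent on serial schema-file existence checks."""
    return num_locations * PROBES_PER_LOCATION * RPC_SECONDS

# The thread's numbers: 1 directory vs. all 24 directories of input.
print(f"1 location:   ~{schema_probe_overhead(1):.1f}s of probing")
print(f"24 locations: ~{schema_probe_overhead(24):.1f}s of probing")
```

[The only point is the shape: overhead is linear in the number of probed locations, which is consistent with the reported 10-20 s vs. 120-240 s progression; if the probes happen per file rather than per directory, the 3600-file case grows accordingly. `-noschema` removes the probes entirely.]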
> > During that hanging (and seemingly idle) period of time I used
> > java/bin/jstack and strace, and I see that there are only two active
> > threads:
> >
> > * FIRST
> > epoll_wait(291, {}, 1024, 0) = 0
> > read(287, "\6\10\327\205\25\20\0\0\0\0;\n9\10\2\22\0\30\254\264\264'\"\3\10\244\3*\7per"..., 8192) = 70
> > futex(0x4907b534, FUTEX_WAKE_OP_PRIVATE, 1, 1, 0x4907b530, {FUTEX_OP_SET, 0, FUTEX_OP_CMP_GT, 1}) = 1
> > futex(0x47d48a28, FUTEX_WAKE_PRIVATE, 1) = 1
> > clock_gettime(CLOCK_REALTIME, {1370272461, 649119000}) = 0
> > futex(0x4907b340, FUTEX_WAKE_PRIVATE, 1) = 1
> > futex(0x4907b344, FUTEX_WAIT_PRIVATE, 689631, {9, 998984000}) = -1 EAGAIN (Resource temporarily unavailable)
> > futex(0x48c25928, FUTEX_WAKE_PRIVATE, 1) = 0
> > read(287, 0x2aaab1111000, 8192) = -1 EAGAIN (Resource temporarily unavailable)
> > (#287 is just a socket)
> >
> > Its Java stack is:
> >
> > "IPC Client (2138196637) connection to hbase01.303net.pvt/10.0.240.16:8020 from emorozov" daemon prio=10 tid=0x00002aaab108c000 nid=0x711 runnable [0x0000000042ed9000]
> >    java.lang.Thread.State: RUNNABLE
> >         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> >         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
> >         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
> >         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
> >         - locked <0x00000000c1aab558> (a sun.nio.ch.Util$2)
> >         - locked <0x00000000c1aab548> (a java.util.Collections$UnmodifiableSet)
> >         - locked <0x00000000c1aa4578> (a sun.nio.ch.EPollSelectorImpl)
> >         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
> >         at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:336)
> >         at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:158)
> >         at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:154)
> >         at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:127)
> >         at java.io.FilterInputStream.read(FilterInputStream.java:116)
> >         at java.io.FilterInputStream.read(FilterInputStream.java:116)
> >         at org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:386)
> >         at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
> >         at java.io.BufferedInputStream.read(BufferedInputStream.java:237)
> >         - locked <0x00000000c1800600> (a java.io.BufferedInputStream)
> >         at java.io.FilterInputStream.read(FilterInputStream.java:66)
> >         at com.google.protobuf.AbstractMessageLite$Builder.mergeDelimitedFrom(AbstractMessageLite.java:276)
> >         at com.google.protobuf.AbstractMessage$Builder.mergeDelimitedFrom(AbstractMessage.java:760)
> >         at com.google.protobuf.AbstractMessageLite$Builder.mergeDelimitedFrom(AbstractMessageLite.java:288)
> >         at com.google.protobuf.AbstractMessage$Builder.mergeDelimitedFrom(AbstractMessage.java:752)
> >         at org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos$RpcResponseHeaderProto.parseDelimitedFrom(RpcPayloadHeaderProtos.java:985)
> >         at org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:882)
> >         at org.apache.hadoop.ipc.Client$Connection.run(Client.java:813)
> >
> > * SECOND
> > futex(0x4dd23a28, FUTEX_WAKE_PRIVATE, 1) = 0
> > futex(0x4e0e9f94, FUTEX_WAKE_OP_PRIVATE, 1, 1, 0x4e0e9f90, {FUTEX_OP_SET, 0, FUTEX_OP_CMP_GT, 1}) = 1
> > futex(0x2aaab105cf28, FUTEX_WAKE_PRIVATE, 1) = 1
> > write(287, "\0\0\0\306\10\10\2\20\0\30\256\265j\273\1\n\vgetFileInfo\22z\nx"..., 202) = 202
> > (#287 is the same socket)
> >
> > "main" prio=10 tid=0x000000004dd22800 nid=0x6fd in Object.wait() [0x0000000041fc9000]
> >    java.lang.Thread.State: WAITING (on object monitor)
> >         at java.lang.Object.wait(Native Method)
> >         at java.lang.Object.wait(Object.java:485)
> >         at org.apache.hadoop.ipc.Client.call(Client.java:1146)
> >         - locked <0x00000000eda48e00> (a org.apache.hadoop.ipc.Client$Call)
> >         at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
> >         at $Proxy9.getFileInfo(Unknown Source)
> >         at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
> >         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> >         at java.lang.reflect.Method.invoke(Method.java:597)
> >         at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
> >         at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
> >         at $Proxy9.getFileInfo(Unknown Source)
> >         at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:628)
> >         at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1507)
> >         at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:783)
> >         at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1257)
> >         at org.apache.pig.backend.hadoop.datastorage.HDataStorage.isContainer(HDataStorage.java:203)
> >         at org.apache.pig.backend.hadoop.datastorage.HDataStorage.asElement(HDataStorage.java:131)
> >         at org.apache.pig.backend.hadoop.datastorage.HDataStorage.asElement(HDataStorage.java:147)
> >         at org.apache.pig.backend.hadoop.datastorage.HDataStorage.asElement(HDataStorage.java:153)
> >         at org.apache.pig.builtin.JsonMetadata.findMetaFile(JsonMetadata.java:131)
> >         at org.apache.pig.builtin.JsonMetadata.getSchema(JsonMetadata.java:188)
> >         at org.apache.pig.builtin.PigStorage.getSchema(PigStorage.java:465)
> >         at org.apache.pig.newplan.logical.relational.LOLoad.getSchemaFromMetaData(LOLoad.java:151)
> >         ...
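[The two traces fit together as one blocking-RPC pattern: the main thread waits on a Client$Call monitor inside a getFileInfo call, while the IPC reader thread epoll-waits on socket 287 to deliver each response. A minimal Python model of that structure, a sketch rather than Hadoop's actual implementation, shows why N metadata lookups cost N full round trips.]

```python
# Toy model of a serial request/response IPC client: the caller blocks
# until a reader thread posts the response, mirroring the Object.wait()
# in Client.call() and the epoll loop in the IPC reader thread above.
# Latencies are invented for illustration.

import threading
import time

class Call:
    def __init__(self):
        self.done = threading.Event()
        self.result = None

class ToyIpcClient:
    """One outstanding call at a time; each call is a full round trip."""

    def __init__(self, server_latency: float):
        self.latency = server_latency

    def call(self, method: str, path: str):
        c = Call()
        def reader():                         # stands in for the IPC reader thread
            time.sleep(self.latency)          # network + NameNode service time
            c.result = (method, path, "ok")
            c.done.set()                      # wakes the waiting caller
        threading.Thread(target=reader).start()
        c.done.wait()                         # "main" parks here, as in the trace
        return c.result

client = ToyIpcClient(server_latency=0.01)
start = time.monotonic()
for i in range(5):                            # 5 serial getFileInfo-style lookups
    client.call("getFileInfo", f"/logs/{i:02d}/.pig_schema")
elapsed = time.monotonic() - start
print(f"5 serial calls took ~{elapsed:.2f}s (at least 5 x latency)")
```

[With one such round trip per probed path, thousands of input paths turn into minutes of front-end time even when each individual RPC is fast.]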
> > My complete script is the following:
> >
> > raw = LOAD '$log_path';
> > raw = FOREACH raw GENERATE $40 AS userId, (long)$14 AS advEntityId,
> >     (long)$17 AS adNetworkId, (chararray)$34 AS eventType,
> >     ((chararray)$0 == 'ERROR' OR (chararray)$1 == 'ERROR' OR
> >      (chararray)$3 == 'ERROR' OR (chararray)$5 == 'ERROR' OR
> >      (chararray)$6 == 'ERROR' OR (chararray)$8 == 'ERROR' OR
> >      (chararray)$16 == 'ERROR' OR (chararray)$33 == 'ERROR' OR
> >      (chararray)$34 == 'ERROR' OR (chararray)$39 == 'ERROR' OR
> >      (chararray)$40 == 'ERROR' OR (chararray)$41 == 'ERROR' OR
> >      (chararray)$48 == 'ERROR' OR (chararray)$49 == 'ERROR' ? 1 : 0) AS skip;
> > raw2 = FOREACH raw GENERATE (advEntityId is null ? -1 : advEntityId) AS advEntityId,
> >     (adNetworkId is null ? -1 : adNetworkId) AS adNetworkId,
> >     count, eventType, skip;
> > raw3 = FILTER raw2 BY NOT (advEntityId == -1 AND adNetworkId == -1 OR
> >     eventType == 'api' OR skip == 1);
> > raw4 = FOREACH raw3 GENERATE userId;
> > d_raw = DISTINCT raw4;
> > s_raw = FOREACH (GROUP d_raw all PARALLEL 1) GENERATE COUNT(d_raw.userId);
> >
> > Here is what we did a couple of days ago. We had just one box with one
> > ZooKeeper, the NameNode and the JobTracker, and I must say everything
> > worked perfectly. So, a couple of days ago we:
> > 1. Moved the JobTracker to a separate box.
> > 2. Moved ZooKeeper off this box and made it highly available (three
> >    instances instead of one).
> >
> > The NameNode lives on the same box, and I tried to run my script on
> > that same box.
> >
> > That's it. I would really appreciate it if anybody could give me a clue
> > where to go next.
> >
> > Question #2
> > The previous version of the script didn't contain the ERROR
> > comparisons. Such a boolean expression slows my script down even more.
> > I would be glad if anyone could explain why this happens, because the
> > expression is quite a simple one.
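[On Question #2, one plausible, unverified contributor is simply that the bincond performs up to 14 cast-and-compare operations on every record, so its per-record cost depends on where, if anywhere, the first 'ERROR' value appears. A small Python sketch of the difference between short-circuiting and eager evaluation of such an OR chain:]

```python
# Illustrative only: count how many field comparisons an OR chain of 14
# checks performs under short-circuit vs. eager evaluation. is_error()
# stands in for one (chararray)$n == 'ERROR' cast-and-compare.

checks = 0

def is_error(field) -> bool:
    global checks
    checks += 1
    return str(field) == 'ERROR'

record = ['ERROR'] + ['ok'] * 13          # 14 fields; first one matches

checks = 0
skip_short = 1 if any(is_error(f) for f in record) else 0
short_circuit_checks = checks             # any() stops at the first match

checks = 0
skip_eager = 1 if sum(is_error(f) for f in record) > 0 else 0
eager_checks = checks                     # sum() always scans all 14 fields

print(short_circuit_checks, eager_checks)  # prints: 1 14
```

[If the engine short-circuits the OR, as `any()` does here, records that match early are cheap and clean records pay all 14 checks; profiling the job would be needed to confirm that this per-record work, rather than something else, is the actual cost.]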
> > --
> > Evgeny Morozov
> > Developer Grid Dynamics
> > Skype: morozov.evgeny
> > www.griddynamics.com
> > [email protected]

--
Evgeny Morozov
Developer Grid Dynamics
Skype: morozov.evgeny
www.griddynamics.com
[email protected]
