Prashant,

Thanks a lot, that solved my issue!


On Tue, Jun 4, 2013 at 1:07 AM, Prashant Kommireddi <[email protected]> wrote:

> Can you try loading the input files without the schema?
>
> raw = LOAD '$log_path' using PigStorage('\t', '-noschema');
>
> PigStorage by default looks for schema files and that *may* be slowing down
> things (based on your assessment of slowness due to the # of input dirs).
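The effect described above can be sketched with a toy model (this is not Pig's actual code; the per-RPC latency and the number of probes per input are assumptions for illustration): each schema side-file lookup is a blocking NameNode round-trip, so the front-end "compilation" time grows linearly with the number of input locations, and skipping the lookups removes that term entirely.

```python
# Toy model (not Pig's real implementation): PigStorage's schema lookup
# probes each input location for schema side files, one blocking NameNode
# RPC at a time, so pre-submission time scales with the number of inputs.
RPC_LATENCY_S = 0.005  # assumed per-call round-trip; real value varies

def planning_overhead(num_inputs, probe_schema=True, probes_per_input=2):
    """Seconds spent on serial schema probes before the job is submitted."""
    if not probe_schema:  # the '-noschema' case: no side-file lookups at all
        return 0.0
    return num_inputs * probes_per_input * RPC_LATENCY_S

# 3600 input files probed serially vs. loading with '-noschema'
print(planning_overhead(3600))
print(planning_overhead(3600, probe_schema=False))
```

With thousands of inputs even a few milliseconds per round-trip adds up to minutes, which is consistent with the slowdown reported below.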
>
>
> On Mon, Jun 3, 2013 at 12:59 PM, Eugene Morozov <[email protected]> wrote:
>
> > Hello!
> >
> >
> > Question #1
> > I noticed a couple of days ago that my scripts started running slower than
> > usual. I experimented a bit, and it turns out that the "compilation" time
> > depends on how many input files I give to my script. By compilation I mean
> > everything Pig does after it is launched and before I see the new job in
> > the JobTracker web UI.
> >
> > I have 3600 input files that live in 24 different folders, named 00 to 23.
> > Pig consumes a different amount of time between starting "pig -p
> > input_path=... my-script.pig" and the jar-generation step, depending on
> > how many input files the script should process. When I give it just one
> > directory, like 00/*, it takes only 10-20 seconds before starting the job.
> > When I use a bunch of directories as the parameter, 0?/*, it takes about
> > 120-240 seconds. And it consumes a tremendous 15 minutes when I use all my
> > data.
> >
> > During that hanging (and seemingly idle) period of time I use
> > java/bin/jstack and strace, and I see that there are only two active
> > threads:
> > * FIRST
> >         epoll_wait(291, {}, 1024, 0)            = 0
> >         read(287, "\6\10\327\205\25\20\0\0\0\0;\n9\10\2\22\0\30\254\264\264'\"\3\10\244\3*\7per"..., 8192) = 70
> >         futex(0x4907b534, FUTEX_WAKE_OP_PRIVATE, 1, 1, 0x4907b530, {FUTEX_OP_SET, 0, FUTEX_OP_CMP_GT, 1}) = 1
> >         futex(0x47d48a28, FUTEX_WAKE_PRIVATE, 1) = 1
> >         clock_gettime(CLOCK_REALTIME, {1370272461, 649119000}) = 0
> >         futex(0x4907b340, FUTEX_WAKE_PRIVATE, 1) = 1
> >         futex(0x4907b344, FUTEX_WAIT_PRIVATE, 689631, {9, 998984000}) = -1 EAGAIN (Resource temporarily unavailable)
> >         futex(0x48c25928, FUTEX_WAKE_PRIVATE, 1) = 0
> >         read(287, 0x2aaab1111000, 8192)         = -1 EAGAIN (Resource temporarily unavailable)
> >         #287 is just a socket
> >
> > Its Java stack is:
> > "IPC Client (2138196637) connection to hbase01.303net.pvt/10.0.240.16:8020 from emorozov" daemon prio=10 tid=0x00002aaab108c000 nid=0x711 runnable [0x0000000042ed9000]
> >    java.lang.Thread.State: RUNNABLE
> >   at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> >   at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
> >   at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
> >   at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
> >   - locked <0x00000000c1aab558> (a sun.nio.ch.Util$2)
> >   - locked <0x00000000c1aab548> (a java.util.Collections$UnmodifiableSet)
> >   - locked <0x00000000c1aa4578> (a sun.nio.ch.EPollSelectorImpl)
> >   at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
> >   at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:336)
> >   at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:158)
> >   at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:154)
> >   at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:127)
> >   at java.io.FilterInputStream.read(FilterInputStream.java:116)
> >   at java.io.FilterInputStream.read(FilterInputStream.java:116)
> >   at org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:386)
> >   at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
> >   at java.io.BufferedInputStream.read(BufferedInputStream.java:237)
> >   - locked <0x00000000c1800600> (a java.io.BufferedInputStream)
> >   at java.io.FilterInputStream.read(FilterInputStream.java:66)
> >   at com.google.protobuf.AbstractMessageLite$Builder.mergeDelimitedFrom(AbstractMessageLite.java:276)
> >   at com.google.protobuf.AbstractMessage$Builder.mergeDelimitedFrom(AbstractMessage.java:760)
> >   at com.google.protobuf.AbstractMessageLite$Builder.mergeDelimitedFrom(AbstractMessageLite.java:288)
> >   at com.google.protobuf.AbstractMessage$Builder.mergeDelimitedFrom(AbstractMessage.java:752)
> >   at org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos$RpcResponseHeaderProto.parseDelimitedFrom(RpcPayloadHeaderProtos.java:985)
> >   at org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:882)
> >   at org.apache.hadoop.ipc.Client$Connection.run(Client.java:813)
> >
> >
> >
> > * SECOND
> >         futex(0x4dd23a28, FUTEX_WAKE_PRIVATE, 1) = 0
> >         futex(0x4e0e9f94, FUTEX_WAKE_OP_PRIVATE, 1, 1, 0x4e0e9f90, {FUTEX_OP_SET, 0, FUTEX_OP_CMP_GT, 1}) = 1
> >         futex(0x2aaab105cf28, FUTEX_WAKE_PRIVATE, 1) = 1
> >         write(287, "\0\0\0\306\10\10\2\20\0\30\256\265j\273\1\n\vgetFileInfo\22z\nx"..., 202) = 202
> >         #287 is the same socket
> >
> >
> > "main" prio=10 tid=0x000000004dd22800 nid=0x6fd in Object.wait() [0x0000000041fc9000]
> >    java.lang.Thread.State: WAITING (on object monitor)
> >   at java.lang.Object.wait(Native Method)
> >   at java.lang.Object.wait(Object.java:485)
> >   at org.apache.hadoop.ipc.Client.call(Client.java:1146)
> >   - locked <0x00000000eda48e00> (a org.apache.hadoop.ipc.Client$Call)
> >   at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
> >   at $Proxy9.getFileInfo(Unknown Source)
> >   at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
> >   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> >   at java.lang.reflect.Method.invoke(Method.java:597)
> >   at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
> >   at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
> >   at $Proxy9.getFileInfo(Unknown Source)
> >   at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:628)
> >   at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1507)
> >   at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:783)
> >   at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1257)
> >   at org.apache.pig.backend.hadoop.datastorage.HDataStorage.isContainer(HDataStorage.java:203)
> >   at org.apache.pig.backend.hadoop.datastorage.HDataStorage.asElement(HDataStorage.java:131)
> >   at org.apache.pig.backend.hadoop.datastorage.HDataStorage.asElement(HDataStorage.java:147)
> >   at org.apache.pig.backend.hadoop.datastorage.HDataStorage.asElement(HDataStorage.java:153)
> >   at org.apache.pig.builtin.JsonMetadata.findMetaFile(JsonMetadata.java:131)
> >   at org.apache.pig.builtin.JsonMetadata.getSchema(JsonMetadata.java:188)
> >   at org.apache.pig.builtin.PigStorage.getSchema(PigStorage.java:465)
> >   at org.apache.pig.newplan.logical.relational.LOLoad.getSchemaFromMetaData(LOLoad.java:151)
> >         ...
> >
> >
> >
> > My complete script is the following:
> >
> > raw = LOAD '$log_path';
> > raw = FOREACH raw GENERATE $40 AS userId, (long)$14 AS advEntityId,
> >        (long)$17 AS adNetworkId, (chararray)$34 AS eventType,
> >        ((chararray)$0 == 'ERROR' OR (chararray)$1 == 'ERROR' OR (chararray)$3 == 'ERROR' OR
> >         (chararray)$5 == 'ERROR' OR (chararray)$6 == 'ERROR' OR (chararray)$8 == 'ERROR' OR
> >         (chararray)$16 == 'ERROR' OR (chararray)$33 == 'ERROR' OR (chararray)$34 == 'ERROR' OR
> >         (chararray)$39 == 'ERROR' OR (chararray)$40 == 'ERROR' OR (chararray)$41 == 'ERROR' OR
> >         (chararray)$48 == 'ERROR' OR (chararray)$49 == 'ERROR' ? 1 : 0) AS skip;
> > raw2 = FOREACH raw GENERATE (advEntityId is null ? -1 : advEntityId) AS advEntityId,
> >        (adNetworkId is null ? -1 : adNetworkId) AS adNetworkId, count, eventType, skip;
> > raw3 = FILTER raw2 BY NOT (advEntityId == -1 AND adNetworkId == -1 OR eventType == 'api' OR skip == 1);
> > raw4 = FOREACH raw3 GENERATE userId;
> > d_raw = DISTINCT raw4;
> > s_raw = FOREACH (GROUP d_raw ALL PARALLEL 1) GENERATE COUNT(d_raw.userId);
> >
> > Here is what we did a couple of days ago.
> > We had just one box with one ZooKeeper, the NameNode, and the JobTracker.
> > I must say everything worked perfectly. So, a couple of days ago we:
> > 1. Moved the JobTracker to a separate box
> > 2. Moved the ZooKeepers off this box and made them HA (we now have three
> > instead of one)
> >
> > The NameNode lives on the same box, and I ran my script on that same box.
> >
> > That's it. I would really, really appreciate it if anybody could give me a
> > clue about where to go next.
> >
> >
> > Question #2.
> > The previous version of the script didn't contain the ERROR comparisons.
> > Such a boolean expression slows my script down even more. I would be glad
> > if anyone could explain why this is happening, because the expression is
> > quite a simple one.
> >
> >
> > --
> > Evgeny Morozov
> > Developer Grid Dynamics
> > Skype: morozov.evgeny
> > www.griddynamics.com
> > [email protected]
> >
>



-- 
Evgeny Morozov
Developer Grid Dynamics
Skype: morozov.evgeny
www.griddynamics.com
[email protected]
