Hello!

Question #1
A couple of days ago I noticed that my scripts had started running slower than
usual. After some experimenting, it turns out that the "compilation" time
depends on how many input files I give the script. By compilation I mean
everything Pig does after it is launched and before I see the new job in the
JobTracker web UI.

I have 3600 input files that live in 24 different folders named 00 to 23.
The time Pig spends between starting "pig -p input_path=... my-script.pig"
and the jar-generation step depends on how many input files the script has
to process. When I give it just one directory, like 00/*, it takes only
10-20 seconds before the job starts. When I pass a group of directories such
as 0?/*, it takes about 120-240 seconds. And it takes a tremendous 15 minutes
when I use all my data.
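To see how the three measurements line up with the amount of input, here is a small Java sketch. It assumes, which the numbers above do not state, that the 3600 files are spread evenly over the 24 folders (150 per folder). It uses java.nio's glob PathMatcher to count which folder names 00..23 each pattern selects, then divides the reported times by the resulting file counts. The class and method names are made up for illustration. Under that assumption 00/*, 0?/* and the full data set are 150, 1500 and 3600 files, i.e. roughly 0.07-0.13, 0.08-0.16 and 0.25 seconds per file, so the delay grows at least in proportion to the number of files.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.Locale;

// Illustrative arithmetic only; InputScaling and its methods are hypothetical names.
class InputScaling {
    static final int TOTAL_FILES = 3600;
    static final int DIRS = 24;                          // folders 00..23
    static final int FILES_PER_DIR = TOTAL_FILES / DIRS; // 150, assuming an even spread

    // How many of the folder names 00..23 a directory glob such as "0?" selects.
    static int matchingDirs(String dirGlob) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + dirGlob);
        int count = 0;
        for (int hour = 0; hour < DIRS; hour++) {
            if (matcher.matches(Paths.get(String.format(Locale.ROOT, "%02d", hour)))) {
                count++;
            }
        }
        return count;
    }

    // Number of input files selected by "<dirGlob>/*" under the even-spread assumption.
    static int inputFiles(String dirGlob) {
        return matchingDirs(dirGlob) * FILES_PER_DIR;
    }

    public static void main(String[] args) {
        String[] globs = {"00", "0?", "*"};                      // 00/*, 0?/*, all data
        double[][] seconds = {{10, 20}, {120, 240}, {900, 900}}; // timings reported above
        for (int i = 0; i < globs.length; i++) {
            int files = inputFiles(globs[i]);
            System.out.printf(Locale.ROOT, "%-3s %4d files  %.3f-%.3f s per file%n",
                    globs[i], files, seconds[i][0] / files, seconds[i][1] / files);
        }
    }
}
```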

During that period, while Pig hangs and seems to be doing nothing, I used
java/bin/jstack and strace, and I see only two active threads:
* FIRST
        epoll_wait(291, {}, 1024, 0)            = 0
        read(287, "\6\10\327\205\25\20\0\0\0\0;\n9\10\2\22\0\30\254\264\264'\"\3\10\244\3*\7per"..., 8192) = 70
        futex(0x4907b534, FUTEX_WAKE_OP_PRIVATE, 1, 1, 0x4907b530, {FUTEX_OP_SET, 0, FUTEX_OP_CMP_GT, 1}) = 1
        futex(0x47d48a28, FUTEX_WAKE_PRIVATE, 1) = 1
        clock_gettime(CLOCK_REALTIME, {1370272461, 649119000}) = 0
        futex(0x4907b340, FUTEX_WAKE_PRIVATE, 1) = 1
        futex(0x4907b344, FUTEX_WAIT_PRIVATE, 689631, {9, 998984000}) = -1 EAGAIN (Resource temporarily unavailable)
        futex(0x48c25928, FUTEX_WAKE_PRIVATE, 1) = 0
        read(287, 0x2aaab1111000, 8192)         = -1 EAGAIN (Resource temporarily unavailable)
        (fd 287 is just a socket)

Its Java stack is:
"IPC Client (2138196637) connection to hbase01.303net.pvt/10.0.240.16:8020 from emorozov" daemon prio=10 tid=0x00002aaab108c000 nid=0x711 runnable [0x0000000042ed9000]
   java.lang.Thread.State: RUNNABLE
	at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
	at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:210)
	at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:65)
	at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
	- locked <0x00000000c1aab558> (a sun.nio.ch.Util$2)
	- locked <0x00000000c1aab548> (a java.util.Collections$UnmodifiableSet)
	- locked <0x00000000c1aa4578> (a sun.nio.ch.EPollSelectorImpl)
	at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
	at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:336)
	at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:158)
	at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:154)
	at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:127)
	at java.io.FilterInputStream.read(FilterInputStream.java:116)
	at java.io.FilterInputStream.read(FilterInputStream.java:116)
	at org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:386)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:237)
	- locked <0x00000000c1800600> (a java.io.BufferedInputStream)
	at java.io.FilterInputStream.read(FilterInputStream.java:66)
	at com.google.protobuf.AbstractMessageLite$Builder.mergeDelimitedFrom(AbstractMessageLite.java:276)
	at com.google.protobuf.AbstractMessage$Builder.mergeDelimitedFrom(AbstractMessage.java:760)
	at com.google.protobuf.AbstractMessageLite$Builder.mergeDelimitedFrom(AbstractMessageLite.java:288)
	at com.google.protobuf.AbstractMessage$Builder.mergeDelimitedFrom(AbstractMessage.java:752)
	at org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos$RpcResponseHeaderProto.parseDelimitedFrom(RpcPayloadHeaderProtos.java:985)
	at org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:882)
	at org.apache.hadoop.ipc.Client$Connection.run(Client.java:813)



* SECOND
        futex(0x4dd23a28, FUTEX_WAKE_PRIVATE, 1) = 0
        futex(0x4e0e9f94, FUTEX_WAKE_OP_PRIVATE, 1, 1, 0x4e0e9f90, {FUTEX_OP_SET, 0, FUTEX_OP_CMP_GT, 1}) = 1
        futex(0x2aaab105cf28, FUTEX_WAKE_PRIVATE, 1) = 1
        write(287, "\0\0\0\306\10\10\2\20\0\30\256\265j\273\1\n\vgetFileInfo\22z\nx"..., 202) = 202
        (fd 287 is the same socket)

Its Java stack is:
"main" prio=10 tid=0x000000004dd22800 nid=0x6fd in Object.wait() [0x0000000041fc9000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:485)
	at org.apache.hadoop.ipc.Client.call(Client.java:1146)
	- locked <0x00000000eda48e00> (a org.apache.hadoop.ipc.Client$Call)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:202)
	at $Proxy9.getFileInfo(Unknown Source)
	at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
	at java.lang.reflect.Method.invoke(Method.java:597)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:164)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:83)
	at $Proxy9.getFileInfo(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:628)
	at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1507)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:783)
	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1257)
	at org.apache.pig.backend.hadoop.datastorage.HDataStorage.isContainer(HDataStorage.java:203)
	at org.apache.pig.backend.hadoop.datastorage.HDataStorage.asElement(HDataStorage.java:131)
	at org.apache.pig.backend.hadoop.datastorage.HDataStorage.asElement(HDataStorage.java:147)
	at org.apache.pig.backend.hadoop.datastorage.HDataStorage.asElement(HDataStorage.java:153)
	at org.apache.pig.builtin.JsonMetadata.findMetaFile(JsonMetadata.java:131)
	at org.apache.pig.builtin.JsonMetadata.getSchema(JsonMetadata.java:188)
	at org.apache.pig.builtin.PigStorage.getSchema(PigStorage.java:465)
	at org.apache.pig.newplan.logical.relational.LOLoad.getSchemaFromMetaData(LOLoad.java:151)
	...
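The "main" stack shows where the wait comes from: PigStorage.getSchema asks JsonMetadata to look for a schema file, that lookup ends in FileSystem.exists, i.e. one blocking getFileInfo RPC to the NameNode, and the thread sleeps in Client.call until the IPC Client thread (the first thread above) reads the reply. The Java sketch below is a toy model, not Pig's code. It assumes the lookup issues a fixed number of exists() checks per expanded input file, strictly one after another, each costing one fixed round trip; FakeNameNode, inputPaths, simulate and the "/logs" prefix are hypothetical. Under that model the total wait is simply calls times round-trip time, so with one check per file, 3600 files would need about 250 ms per round trip to add up to 15 minutes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Toy model (not Pig's code): blocking exists() calls issued one after another,
// so the total wait is calls * round-trip time.
class SerialLookupModel {
    // Hypothetical stand-in for the NameNode client. Each exists() stands for one
    // getFileInfo round trip; the caller cannot continue until it returns.
    static final class FakeNameNode {
        private final double rttSeconds;
        int calls = 0;
        double waitedSeconds = 0.0;

        FakeNameNode(double rttSeconds) {
            this.rttSeconds = rttSeconds;
        }

        boolean exists(String path) {
            calls++;
            waitedSeconds += rttSeconds; // blocked for one full round trip
            return false;                // assume no schema file is ever found
        }
    }

    // Hypothetical expansion of folders 00..23 into file paths; "/logs" and the
    // part-NNNNN names are made up, they only need to be distinct.
    static List<String> inputPaths(int dirs, int filesPerDir) {
        List<String> paths = new ArrayList<>();
        for (int d = 0; d < dirs; d++) {
            for (int f = 0; f < filesPerDir; f++) {
                paths.add(String.format(Locale.ROOT, "/logs/%02d/part-%05d", d, f));
            }
        }
        return paths;
    }

    // checksPerPath is an assumption: how many exists() probes one input costs.
    static FakeNameNode simulate(List<String> paths, int checksPerPath, double rttSeconds) {
        FakeNameNode nn = new FakeNameNode(rttSeconds);
        for (String p : paths) {
            for (int c = 0; c < checksPerPath; c++) {
                nn.exists(p); // sequential: the next probe starts only after this one
            }
        }
        return nn;
    }

    public static void main(String[] args) {
        List<String> paths = inputPaths(24, 150); // 3600 files, evenly spread (assumed)
        double observedSeconds = 15 * 60;         // the "15 minutes" from the message
        for (int checks = 1; checks <= 3; checks++) {
            int calls = paths.size() * checks;
            System.out.printf(Locale.ROOT, "%d check(s)/file: %d RPCs, implies %.0f ms per RPC%n",
                    checks, calls, 1000 * observedSeconds / calls);
        }
        FakeNameNode nn = simulate(paths, 1, 0.25);
        System.out.printf(Locale.ROOT, "%d calls at 250 ms each = %.0f s%n", nn.calls, nn.waitedSeconds);
    }
}
```

Because the calls are serialized on one thread, any extra latency per NameNode round trip is multiplied by the number of probes, which is consistent with the delay scaling with the number of input files.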



My complete script is the following:
raw = LOAD '$log_path';
raw = FOREACH raw GENERATE $40 AS userId, (long)$14 as advEntityId,
      (long)$17 as adNetworkId, (chararray)$34 AS eventType,
      ((chararray)$0 == 'ERROR' OR (chararray)$1 == 'ERROR' OR (chararray)$3 == 'ERROR' OR
       (chararray)$5 == 'ERROR' OR (chararray)$6 == 'ERROR' OR (chararray)$8 == 'ERROR' OR
       (chararray)$16 == 'ERROR' OR (chararray)$33 == 'ERROR' OR (chararray)$34 == 'ERROR' OR
       (chararray)$39 == 'ERROR' OR (chararray)$40 == 'ERROR' OR (chararray)$41 == 'ERROR' OR
       (chararray)$48 == 'ERROR' OR (chararray)$49 == 'ERROR' ? 1 : 0) AS skip;
raw2 = FOREACH raw GENERATE (advEntityId is null ? -1 : advEntityId) as advEntityId,
       (adNetworkId is null ? -1 : adNetworkId) as adNetworkId,
       userId, eventType, skip;
raw3 = FILTER raw2 BY NOT ((advEntityId == -1 AND adNetworkId == -1) OR
       eventType == 'api' OR skip == 1);
raw4 = FOREACH raw3 GENERATE userId;
d_raw = DISTINCT raw4;
s_raw = FOREACH (GROUP d_raw all PARALLEL 1) GENERATE COUNT(d_raw.userId);
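For reference, here is a per-record Java sketch of what the script computes, mainly to pin down the FILTER condition (AND binds tighter than OR, so a record is dropped when both ids are missing, when it is an 'api' event, or when skip is set) and the 14 ERROR comparisons that Question #2 is about. It is a simplification, not how Pig executes the script: a record is a String[] of fields, a (long) cast that Long.parseLong rejects becomes null, a field past the end of the record is null, and Pig's three-valued handling of comparisons with null is ignored. The class and method names are hypothetical. distinctUsers mirrors raw4, d_raw and s_raw, and skips null userIds because COUNT ignores nulls.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified per-record model of the Pig script (not how Pig executes it).
class ScriptLogicSketch {
    // Columns compared against 'ERROR' in the skip expression.
    static final int[] ERROR_COLUMNS = {0, 1, 3, 5, 6, 8, 16, 33, 34, 39, 40, 41, 48, 49};

    // $i of a record; a field past the end of the record is treated as null.
    static String field(String[] row, int i) {
        return i < row.length ? row[i] : null;
    }

    // (long)$i, simplified: anything Long.parseLong rejects becomes null.
    static Long toLongOrNull(String s) {
        if (s == null) {
            return null;
        }
        try {
            return Long.parseLong(s);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    // skip: true if any listed column equals 'ERROR' (this Java loop stops at the first hit).
    static boolean skip(String[] row) {
        for (int c : ERROR_COLUMNS) {
            if ("ERROR".equals(field(row, c))) {
                return true;
            }
        }
        return false;
    }

    // raw -> raw2 -> raw3: null ids default to -1, then the FILTER condition is applied.
    static boolean keep(String[] row) {
        Long adv = toLongOrNull(field(row, 14));
        Long net = toLongOrNull(field(row, 17));
        long advEntityId = adv == null ? -1 : adv;
        long adNetworkId = net == null ? -1 : net;
        boolean bothMissing = advEntityId == -1 && adNetworkId == -1;
        return !(bothMissing || "api".equals(field(row, 34)) || skip(row));
    }

    // raw4 -> d_raw -> s_raw: number of distinct non-null userIds ($40) among kept records.
    static int distinctUsers(List<String[]> rows) {
        Set<String> users = new HashSet<>();
        for (String[] row : rows) {
            String userId = field(row, 40);
            if (userId != null && keep(row)) {
                users.add(userId);
            }
        }
        return users.size();
    }

    public static void main(String[] args) {
        String[] row = new String[50];
        row[14] = "42";     // advEntityId present
        row[34] = "click";  // hypothetical event type
        row[40] = "user-1"; // hypothetical userId
        System.out.println(keep(row));                                     // true
        System.out.println(distinctUsers(Collections.singletonList(row))); // 1
        row[5] = "ERROR";
        System.out.println(keep(row));                                     // false
    }
}
```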

Here is what we changed a couple of days ago. Before that we had a single box
running one ZooKeeper, the NameNode and the JobTracker, and everything worked
perfectly. Then we:
1. Moved the JobTracker to a separate box.
2. Moved ZooKeeper off this box and made it highly available (three ZooKeeper
nodes instead of one).

The NameNode still lives on the same box, and I ran my script on that box too.

That's it. I would really appreciate it if anybody could give me a clue
where to go next.


Question #2
The previous version of the script didn't contain the ERROR comparisons, and
this boolean expression slows my script down even more. I would be glad if
anyone could explain why this happens, since the expression is quite a
simple one.


-- 
Evgeny Morozov
Developer Grid Dynamics
Skype: morozov.evgeny
www.griddynamics.com
