Re: [DISCUSS] Releasing Flink 1.4.1

2018-01-15 Thread vishal
Hello folks,
 
Regarding https://issues.apache.org/jira/browse/FLINK-8226: we would like a
schedule for the 1.4.1 release. Without checkpointing and state, CEP is in
effect not production-ready, IMHO. Is there any timeline for 1.4.1? The
earlier the better.

Thank you and Regards, 

Vishal



--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/


When is the scheduled release for 1.4.1? We have been waiting for a fix, and https://issues.apache.org/jira/browse/FLINK-8226 looks like a potential fix

2018-01-15 Thread Vishal Santoshi
This issue has a possible fix in 1.4.1.

We have been waiting for a fix, and
https://issues.apache.org/jira/browse/FLINK-8226 looks like a potential fix.



A simple CEP loop pattern

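// Type parameters reconstructed (assumption) from the (1, SimpleBinaryEvent{...})
// tuple in the trace below; checkStatusOn is a condition defined elsewhere.
private Pattern<Tuple2<Integer, SimpleBinaryEvent>, ?> alertPattern =
        Pattern.<Tuple2<Integer, SimpleBinaryEvent>>begin("start").where(checkStatusOn)
                .followedBy("middle").where(checkStatusOn).times(2)
                .next("end").where(checkStatusOn).within(Time.minutes(5));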

I see failures.

SimpleBinaryEvent is

import java.io.Serializable;

public class SimpleBinaryEvent implements Serializable {

    private int id;
    private int sequence;
    private boolean status;
    private long time;

    public SimpleBinaryEvent(int id, int sequence, boolean status, long time) {
        this.id = id;
        this.sequence = sequence;
        this.status = status;
        this.time = time;
    }

    public int getId() {
        return id;
    }

    public int getSequence() {
        return sequence;
    }

    public boolean isStatus() {
        return status;
    }

    public long getTime() {
        return time;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        SimpleBinaryEvent that = (SimpleBinaryEvent) o;

        if (getId() != that.getId()) return false;
        if (isStatus() != that.isStatus()) return false;
        if (getSequence() != that.getSequence()) return false;
        return getTime() == that.getTime();
    }

    @Override
    public int hashCode() {
        // return Objects.hash(getId(), isStatus(), getSequence(), getTime());
        int result = getId();
        result = 31 * result + (isStatus() ? 1 : 0);
        result = 31 * result + getSequence();
        result = 31 * result + (int) (getTime() ^ (getTime() >>> 32));
        return result;
    }

    @Override
    public String toString() {
        return "SimpleBinaryEvent{" +
                "id='" + id + '\'' +
                ", status=" + status +
                ", sequence=" + sequence +
                ", time=" + time +
                '}';
    }
}

failure cause:

Caused by: java.lang.Exception: Could not materialize checkpoint 2 for
operator KeyedCEPPatternOperator -> Map (1/1).
... 6 more
Caused by: java.util.concurrent.ExecutionException:
java.lang.IllegalStateException: Could not find id for entry:
SharedBufferEntry(ValueTimeWrapper((1,SimpleBinaryEvent{id='1',
status=true, sequence=95, time=150550338}), 150550338, 0),

I am sure I have implemented equals() and hashCode() the way they should be.
I have tried Objects.hash too. In other instances I have had a circular
reference (and thus a StackOverflowError) in SharedBuffer.toString(), which
again points to issues with references (equality and whatnot). Without
checkpointing turned on, it works as expected. I am running on a local
cluster. Is CEP production-ready?

I am using Flink 1.3.2.
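One way to back up the claim that equals() and hashCode() are implemented correctly is a small standalone check: an equal copy must hash identically, and an event must survive a serialization round trip with equality intact, which loosely mirrors what happens to state on checkpoint and restore. This is a sketch under the assumption that plain java.io serialization is representative; Flink's own state serializers (Kryo or POJO) may behave differently, and passing this check does not rule out a problem inside the CEP SharedBuffer itself. EventContractCheck and its methods are hypothetical names; Event is a trimmed copy of SimpleBinaryEvent with the same equals()/hashCode() logic.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical sanity check; Event is a trimmed copy of SimpleBinaryEvent with the
// same fields and the same equals()/hashCode() logic as in the post above.
class EventContractCheck {

    static final class Event implements Serializable {
        private final int id;
        private final int sequence;
        private final boolean status;
        private final long time;

        Event(int id, int sequence, boolean status, long time) {
            this.id = id;
            this.sequence = sequence;
            this.status = status;
            this.time = time;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            Event that = (Event) o;
            return id == that.id && status == that.status
                    && sequence == that.sequence && time == that.time;
        }

        @Override
        public int hashCode() {
            int result = id;
            result = 31 * result + (status ? 1 : 0);
            result = 31 * result + sequence;
            result = 31 * result + (int) (time ^ (time >>> 32));
            return result;
        }
    }

    // Writes and reads back an object with plain java.io serialization.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    // True if an equal copy hashes identically and equality survives a round trip
    // to a distinct instance.
    static boolean contractHolds(Event original) {
        Event copy = new Event(original.id, original.sequence, original.status, original.time);
        Event restored = roundTrip(original);
        return restored != original
                && original.equals(copy) && original.hashCode() == copy.hashCode()
                && original.equals(restored) && original.hashCode() == restored.hashCode();
    }

    public static void main(String[] args) {
        // Values taken from the SharedBufferEntry in the stack trace above.
        Event e = new Event(1, 95, true, 150550338L);
        System.out.println("contract holds: " + contractHolds(e)); // contract holds: true
    }
}
```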


[jira] [Created] (FLINK-33769) ExternalSorter hits "java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: java.io.EOFExceptio

2023-12-07 Thread Vishal Palla (Jira)
Vishal Palla created FLINK-33769:


 Summary: ExternalSorter hits "java.lang.RuntimeException: Error 
obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due 
to an exception: java.io.EOFException: Can't collect further: memorySource 
depleted" when using custom serializer
 Key: FLINK-33769
 URL: https://issues.apache.org/jira/browse/FLINK-33769
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.17.2
    Reporter: Vishal Palla


The [NormalizedKeySorter library|https://github.com/twitter-forks/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java]
is used to sort records in memory. Internally, it uses a
[SimpleCollectingOutputView|https://github.com/twitter-forks/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SimpleCollectingOutputView.java],
instantiated with a fixed chunk of managed memory, to store the records. When
the SimpleCollectingOutputView runs out of memory segments, it
[throws an EOFException|https://github.com/twitter-forks/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/SimpleCollectingOutputView.java#L76],
which [should be caught by the sorter in the write method|https://github.com/twitter-forks/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorter.java#L298],
which then returns {{false}} to indicate that the sort buffer is full (per the
javadoc). The issue is that the EOFException thrown by the
SimpleCollectingOutputView is first caught by the record serializer, which
offers no guarantee that the exception is propagated upwards unchanged. Kryo
and Thrift serializers, for example, wrap the caught exception in their own
exception classes and rethrow it; the sorter does not catch these, so the job
crashes.
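The failure mode can be illustrated with a simplified, self-contained model. Everything in it is hypothetical: FixedBuffer stands in for SimpleCollectingOutputView, the WRAPPING serializer for a Kryo/Thrift-style serializer, and naiveWrite for the sorter's write method; none of these are Flink APIs. It shows that a write path catching only a bare EOFException misses the wrapped one, and sketches one possible mitigation (checking the cause chain), which is not necessarily the fix adopted upstream.

```java
import java.io.EOFException;
import java.io.IOException;

// Hypothetical model of FLINK-33769; none of these types are Flink APIs.
class SorterEofModel {

    // Stands in for SimpleCollectingOutputView: fixed capacity, EOFException when full.
    static final class FixedBuffer {
        private final int capacity;
        private int used;

        FixedBuffer(int capacity) {
            this.capacity = capacity;
        }

        void write(int bytes) throws EOFException {
            if (used + bytes > capacity) {
                throw new EOFException("Can't collect further: memorySource depleted");
            }
            used += bytes;
        }
    }

    // Stands in for a serializer's own exception type (e.g. what Kryo or Thrift throw).
    static final class SerializerException extends RuntimeException {
        SerializerException(Throwable cause) {
            super(cause);
        }
    }

    // Like a record serializer, serialize() is allowed to throw IOException.
    interface RecordSerializer {
        void serialize(FixedBuffer buffer, int recordSize) throws IOException;
    }

    // Lets the EOFException propagate unchanged.
    static final RecordSerializer PASS_THROUGH = (buffer, size) -> buffer.write(size);

    // Wraps the EOFException, hiding it from a sorter that only catches EOFException.
    static final RecordSerializer WRAPPING = (buffer, size) -> {
        try {
            buffer.write(size);
        } catch (IOException e) {
            throw new SerializerException(e);
        }
    };

    // Naive write: reports "buffer full" (false) only for a bare EOFException.
    static boolean naiveWrite(RecordSerializer s, FixedBuffer buffer, int size) throws IOException {
        try {
            s.serialize(buffer, size);
            return true;
        } catch (EOFException e) {
            return false;
        }
    }

    // Mitigation sketch: an EOFException anywhere in the cause chain means "buffer full".
    static boolean causeAwareWrite(RecordSerializer s, FixedBuffer buffer, int size)
            throws IOException {
        try {
            s.serialize(buffer, size);
            return true;
        } catch (IOException | RuntimeException e) {
            for (Throwable t = e; t != null; t = t.getCause()) {
                if (t instanceof EOFException) {
                    return false;
                }
            }
            throw e;
        }
    }

    public static void main(String[] args) throws IOException {
        FixedBuffer a = new FixedBuffer(10);
        System.out.println(naiveWrite(PASS_THROUGH, a, 8)); // true
        System.out.println(naiveWrite(PASS_THROUGH, a, 8)); // false (buffer full detected)

        FixedBuffer b = new FixedBuffer(10);
        System.out.println(causeAwareWrite(WRAPPING, b, 8)); // true
        System.out.println(causeAwareWrite(WRAPPING, b, 8)); // false (EOF found in cause chain)

        try {
            naiveWrite(WRAPPING, new FixedBuffer(4), 8);
        } catch (SerializerException e) {
            // Mirrors the reported crash: the wrapped EOFException escapes the sorter.
            System.out.println("crashed: " + e.getCause().getMessage());
        }
    }
}
```

Walking the cause chain is only one option; others would be for serializers to rethrow EOFException unwrapped, or for the sorter to avoid relying on the exception at all.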

Example stack trace:
{{java.lang.RuntimeException: Error obtaining the sorted input: Thread 
'SortMerger Reading Thread' terminated due to an exception: 
java.io.EOFException: Can't collect further: memorySource depleted
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:487)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:357)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.WrappingRuntimeException: 
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error 
obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due 
to an exception: java.io.EOFException: Can't collect further: memorySource 
depleted
at 
org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:262)
at 
org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1222)
at 
org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:105)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479)
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: 
Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated 
due to an exception: java.io.EOFException: Can't collect further: memorySource 
depleted
at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at 
org.apache.flink.runtime.operators.sort.ExternalSorter.getIterator(ExternalSorter.java:259)
... 9 more
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 
'SortMerger Reading Thread' terminated due to an exception: 
java.io.EOFException: Can't collect further: memorySource depleted
at 
org.apache.flink.runtime.operators.sort.ExternalSorter.lambda$getIterator$1(ExternalSorter.java:256)
at 
java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)
at 
java.base/java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970)
at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at 
java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at 
org.apache.flink.runtime.operators.sort.ExternalSorterBuilder.lambda$doBuild$1(ExternalSorterBuilder.java:397)
at 
org.apache.flink.runtime.operators.sort.ThreadBase.internalHandleExc

[jira] [Created] (FLINK-9349) KafkaConnector Exception while fetching from multiple kafka topics

2018-05-13 Thread Vishal Santoshi (JIRA)
Vishal Santoshi created FLINK-9349:
--

 Summary: KafkaConnector Exception  while fetching from multiple 
kafka topics
 Key: FLINK-9349
 URL: https://issues.apache.org/jira/browse/FLINK-9349
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.4.0
Reporter: Vishal Santoshi


./flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
 
It seems that the List subscribedPartitionStates is being modified while
runFetchLoop iterates over it.
This can happen if, for example, FlinkKafkaConsumer concurrently runs the
following code:
                kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
 
{code:java}
 java.util.ConcurrentModificationException
at 
java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:966)
at java.util.LinkedList$ListItr.next(LinkedList.java:888)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:134)
{code}
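The failure mode described above can be reproduced outside Flink with a plain LinkedList: structurally modifying the list while an iterator walks it makes the iterator's next() fail fast with ConcurrentModificationException. The sketch below does this deterministically on one thread (adding inside the loop stands in for addDiscoveredPartitions running concurrently) and then shows one common remedy, a CopyOnWriteArrayList, whose iterators work on a snapshot. The list contents and method names are hypothetical, and this is not necessarily the fix applied to Kafka09Fetcher.

```java
import java.util.ConcurrentModificationException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical illustration; "partitions" stands in for subscribedPartitionStates.
class ComodificationDemo {

    // Iterates the list and appends a "discovered" partition during the first pass,
    // mimicking addDiscoveredPartitions() racing with runFetchLoop().
    static int iterateWhileAdding(List<String> partitions) {
        int visited = 0;
        boolean added = false;
        for (String partition : partitions) {
            visited++;
            if (!added) {
                partitions.add("topic-b-0"); // structural modification mid-iteration
                added = true;
            }
        }
        return visited;
    }

    static boolean throwsCme(List<String> partitions) {
        try {
            iterateWhileAdding(partitions);
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> linked = new LinkedList<>(List.of("topic-a-0", "topic-a-1"));
        System.out.println("LinkedList throws CME: " + throwsCme(linked)); // true

        // CopyOnWriteArrayList iterators see a snapshot, so the loop visits only the
        // two original elements and the new one is visible afterwards.
        List<String> cow = new CopyOnWriteArrayList<>(List.of("topic-a-0", "topic-a-1"));
        System.out.println("visited: " + iterateWhileAdding(cow) + ", size now: " + cow.size());
        // visited: 2, size now: 3
    }
}
```

CopyOnWriteArrayList copies the backing array on every write, which suits a list that is read constantly and modified rarely; synchronizing both the iterating and the adding side, or handing new partitions to the fetch thread through a concurrent queue, are alternatives.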



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12333) Add documentation around save point with cancel lifecycle through REST

2019-04-25 Thread Vishal Santoshi (JIRA)
Vishal Santoshi created FLINK-12333:
---

 Summary: Add documentation around save point with cancel lifecycle 
through REST 
 Key: FLINK-12333
 URL: https://issues.apache.org/jira/browse/FLINK-12333
 Project: Flink
  Issue Type: Bug
  Components: Runtime / REST
Affects Versions: 1.8.0
Reporter: Vishal Santoshi


Exit code 2 is seen (after 5 minutes) when running
{code:java}
curl  --header "Content-Type: application/json" --request POST --data 
'{"target-directory":"***","cancel-job":true}'    
https://***/jobs//savepoints{code}
It seems that this is the behavior when cancel-with-savepoint is triggered through a REST call:

"Triggered the cancel with savepoint command via the REST call. This 
command is an asynchronous operation which produces a result (the savepoint 
path). In order to deliver asynchronous results to the caller, Flink waits 
before shutting down until they are delivered or until it times out after 5 
minutes."

 

That implies that one has to execute 
{code:java}
curl  --request GET   
https://**/jobs//savepoints/[Request_id]
{code}
using the request_id returned by the first call (within 5 minutes) to get a
clean exit (code 0).
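The two-step lifecycle (trigger, then fetch the result with the returned request id before the 5-minute timeout) can be sketched with the JDK's java.net.http.HttpClient (Java 11+). Assumptions: the base URL and job id are placeholders; the trigger response is assumed to be JSON containing "request-id", and the status response is assumed to contain "COMPLETED" once the operation finishes, as described for the Flink REST API, but worth checking against the documentation for your version. The regex-based JSON handling is a shortcut to avoid a library dependency, and SavepointLifecycle and its methods are hypothetical names.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical client sketch for cancel-with-savepoint over Flink's REST API.
class SavepointLifecycle {
    private static final Pattern REQUEST_ID =
            Pattern.compile("\"request-id\"\\s*:\\s*\"([^\"]+)\"");

    static URI triggerUri(String baseUrl, String jobId) {
        return URI.create(baseUrl + "/jobs/" + jobId + "/savepoints");
    }

    static URI statusUri(String baseUrl, String jobId, String requestId) {
        return URI.create(baseUrl + "/jobs/" + jobId + "/savepoints/" + requestId);
    }

    // Same body as the curl call above; targetDirectory is not JSON-escaped here.
    static String triggerBody(String targetDirectory) {
        return "{\"target-directory\":\"" + targetDirectory + "\",\"cancel-job\":true}";
    }

    // Extracts the request id from the trigger response (naive regex, no JSON library).
    static String parseRequestId(String json) {
        Matcher m = REQUEST_ID.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("no request-id in: " + json);
        }
        return m.group(1);
    }

    // Step 1: trigger; step 2: poll the status endpoint until COMPLETED or 5 minutes pass.
    static String cancelWithSavepoint(String baseUrl, String jobId, String targetDirectory)
            throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest trigger = HttpRequest.newBuilder(triggerUri(baseUrl, jobId))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(triggerBody(targetDirectory)))
                .build();
        String requestId = parseRequestId(
                client.send(trigger, HttpResponse.BodyHandlers.ofString()).body());

        long deadline = System.nanoTime() + Duration.ofMinutes(5).toNanos();
        HttpRequest status = HttpRequest.newBuilder(statusUri(baseUrl, jobId, requestId))
                .GET()
                .build();
        while (System.nanoTime() < deadline) {
            String body = client.send(status, HttpResponse.BodyHandlers.ofString()).body();
            if (body.contains("\"COMPLETED\"")) {
                return body; // final result: savepoint location or failure cause
            }
            Thread.sleep(1_000);
        }
        throw new IllegalStateException("savepoint result not fetched within 5 minutes");
    }
}
```

For example, cancelWithSavepoint("https://jobmanager:8081", jobId, "s3://bucket/savepoints") would trigger the command and return the final status JSON; polling rather than a single GET simply tolerates a savepoint that is still in progress.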

 

Please add this lifecycle to the Flink documentation, most likely here:
[https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-savepoints]

 

 


