[ https://issues.apache.org/jira/browse/FLINK-9126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443124#comment-16443124 ]

ASF GitHub Bot commented on FLINK-9126:
---------------------------------------

GitHub user Jicaar opened a pull request:

    https://github.com/apache/flink/pull/5876

    [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo

    ## What is the purpose of the change
    
    Committing the new CassandraPojoInputFormat class. It works similarly to the
existing CassandraInputFormat class, but allows the data read from Cassandra to be
output as a custom POJO that the user has created and annotated using the DataStax API.
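
    For context, a minimal sketch of what such a DataStax-annotated POJO might look
    like, assuming the object-mapper annotations from the driver's mapping module;
    the keyspace, table, and field names below are illustrative, not part of this PR:

        import com.datastax.driver.mapping.annotations.Column;
        import com.datastax.driver.mapping.annotations.PartitionKey;
        import com.datastax.driver.mapping.annotations.Table;

        // Hypothetical POJO mapped to a Cassandra table via the DataStax object mapper.
        @Table(keyspace = "example_keyspace", name = "example_table")
        public class CustomCassandraPojo {

            @PartitionKey
            @Column(name = "id")
            private String id;

            @Column(name = "counter")
            private int counter;

            // The mapper requires a no-arg constructor and getters/setters.
            public CustomCassandraPojo() {}

            public String getId() { return id; }
            public void setId(String id) { this.id = id; }

            public int getCounter() { return counter; }
            public void setCounter(int counter) { this.counter = counter; }
        }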
    
    ## Brief change log
    
    - Initial commit of the CassandraPojoInputFormat class and its validation test.
    
    ## Verifying this change
    
    - CassandraPojoInputFormat can be validated with the testCassandraBatchPojoFormat
test in the CassandraConnectorITCaseTest.java file (see the sketch below for exercising
the format's life cycle by hand).
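
    Outside the ITCase, the new format can also be exercised through the standard
    InputFormat life cycle. A rough sketch follows; `queryToRun`, `clusterBuilder`,
    and `CustomCassandraPojo` are placeholders, and the constructor and nextRecord
    signatures are assumed to mirror the existing CassandraInputFormat:

        // Sketch only: names and exact signatures are assumptions based on the
        // existing CassandraInputFormat, not guaranteed to match this PR verbatim.
        CassandraPojoInputFormat<CustomCassandraPojo> format =
            new CassandraPojoInputFormat<>(queryToRun, clusterBuilder, CustomCassandraPojo.class);

        format.configure(null); // no Flink Configuration values are needed here
        format.open(null);      // the Cassandra input formats are non-parallel, so no split is passed
        try {
            while (!format.reachedEnd()) {
                CustomCassandraPojo pojo = format.nextRecord(new CustomCassandraPojo());
                // ... assert on / further process the mapped POJO ...
            }
        } finally {
            format.close();
        }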
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): don't know
      - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
      - The S3 file system connector: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented? JavaDocs


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Jicaar/flink master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5876.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5876
    
----
commit 5a9f1795a61c7f27b2e5170c275a10e8ce269d77
Author: Jicaar <jpcarterara@...>
Date:   2018-04-09T19:25:56Z

    Also adding in code for CassandraPojoInputFormat class in connection with Jira task FLINK-9126
    Merge remote-tracking branch 'upstream/master'
    
    # Conflicts:
    #   README.md
    #   flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroup.java
    #   flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroups.java
    #   flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
    #   flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
    #   flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
    #   flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
    #   flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
    #   flink-docs/pom.xml
    #   flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
    #   flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocGeneratorTest.java
    #   flink-libraries/flink-sql-client/conf/sql-client-defaults.yaml
    #   flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
    #   flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
    #   flink-libraries/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
    #   flink-libraries/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
    #   flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableDescriptorValidator.scala
    #   flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/TableSourceDescriptor.scala
    #   flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TableSourceFactoryServiceTest.scala
    #   flink-libraries/flink-table/src/test/scala/org/apache/flink/table/sources/TestTableSourceFactory.scala
    #   flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/messages/MessageType.java
    #   flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
    #   flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
    #   flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestType.java
    #   flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
    #   flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
    #   flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
    #   flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
    #   flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
    #   flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
    #   flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
    #   flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java
    #   flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
    #   flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
    #   flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
    #   flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala

commit 8390239beb88315dff895372157dd87759e048f2
Author: Jicaar <jpcarterara@...>
Date:   2018-04-09T19:26:49Z

    Adding code for the class CassandraPojoInputFormat as well as code for setting up the test setup.

commit ad89310c9fa7d5188a68524d3de3f4c5e7b91a49
Author: Jicaar <jpcarterara@...>
Date:   2018-04-10T22:53:25Z

    Added test for CassandraPojoInputFormat class and ran it successfully.

commit d392c5faaf6479b5d7a0f1adb54d4d6b3303cefa
Author: Jicaar <jpcarterara@...>
Date:   2018-04-18T19:22:41Z

    Merge branch 'master' of git://github.com/apache/flink

commit 32c328c9668e8ed7a2efdf06a100f8b82e98b1c5
Author: Jicaar <jpcarterara@...>
Date:   2018-04-18T20:06:28Z

    Removing leftover system.print lines that were from testing.

----


> Creation of the CassandraPojoInputFormat class to output data into a Custom 
> Cassandra Annotated Pojo
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-9126
>                 URL: https://issues.apache.org/jira/browse/FLINK-9126
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataSet API
>    Affects Versions: 1.4.2
>            Reporter: Jeffrey Carter
>            Priority: Minor
>              Labels: easyfix, features, newbie
>             Fix For: 1.6.0
>
>         Attachments: CassandraPojoInputFormatText.rtf
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> *First time proposing new update so apologies if I missed anything*
> Currently the DataSet API only has the ability to output data received from
> Cassandra as a source as a Tuple. This change would allow the data to be output
> as a custom POJO that the user has created and annotated using the DataStax API.
> It would remove the need for very long Tuples to be created by the DataSet and
> then mapped to the custom POJO.
>  
> -The changes to the CassandraInputFormat object would be minimal, but would
> require importing the DataStax API into the class-. Another option is to make a
> similar, but slightly different, class called CassandraPojoInputFormat.
> I have already gotten this code working in my own project, but would like other
> thoughts on the best way to implement it.
>  
> // Example of its use in main
> CassandraPojoInputFormat<CustomCassandraPojo> cassandraInputFormat =
>     new CassandraPojoInputFormat<>(queryToRun, defaultClusterBuilder, CustomCassandraPojo.class);
> cassandraInputFormat.configure(null);
> cassandraInputFormat.open(null);
> DataSet<CustomCassandraPojo> outputTestSet =
>     exEnv.createInput(cassandraInputFormat, TypeInformation.of(new TypeHint<CustomCassandraPojo>(){}));
>  
> //The class that I currently have set up
> [^CassandraPojoInputFormatText.rtf]
>  
> Will make another Jira Issue for the Output version next if this is approved



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
