Luning Wong <gfen...@gmail.com> wrote on Mon, 21 Feb 2022 at 19:38:
> import logging
> import sys
>
> from pyflink.common import SimpleStringSchema, WatermarkStrategy
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors import PulsarSource, PulsarDeserializationSchema, SubscriptionType
> from pyflink.common.typeinfo import Types
>
>
> def foo():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>     env.add_jars(
>         'file:///Users/a/src/me/flink/flink-connectors/flink-sql-connector-pulsar/target/flink-sql-connector-pulsar-1.15-SNAPSHOT.jar'
>     )
>     deserialization_schema = PulsarDeserializationSchema.flink_schema(
>         SimpleStringSchema())
>     # deserialization_schema = PulsarDeserializationSchema.flink_type_info(Types.STRING(), None)
>
>     ps = PulsarSource.builder()\
>         .set_deserialization_schema(deserialization_schema)\
>         .set_service_url('pulsar://localhost:6650')\
>         .set_admin_url('http://localhost:8080')\
>         .set_topics('ada')\
>         .set_subscription_name('axcsdas')\
>         .set_subscription_type(SubscriptionType.Exclusive)\
>         .build()
>
>     kafka_source = env.from_source(
>         source=ps,
>         watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
>         source_name="pulsar_source"
>     )
>     kafka_source.print()
>     env.execute('pulsar_source')
>
>
> if __name__ == '__main__':
>     logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
>     foo()
>
> The above is my test script. It submits successfully to a standalone cluster.
>
> Best,
> Wong
>
> Yufei Zhang <affei...@gmail.com> wrote on Mon, 21 Feb 2022 at 18:33:
>
>> Hi Ananth,
>>
>> From the steps you described, you are using `flink-sql-connector-pulsar-1.15-SNAPSHOT.jar`. However, to my knowledge the Pulsar connector does not support the Table API yet, so would you mind trying `flink-connector-pulsar-1.14.jar` instead (without "sql", though the classes should be the same; 1.14 is also the stable version)? Since the job failed to submit, my wild guess is that a class-not-found issue prevented the serialization before submitting.
>>
>> Also, you mentioned getting a “transactions not enabled” error in spite of enabling transactions in the 2.8.0 broker; this is interesting. To use transactions, it is not enough to enable them in the broker: they must also be enabled in the Pulsar source connector. Please refer to PulsarOptions.PULSAR_ENABLE_TRANSACTION for more details. (Generally, a call to PulsarSourceBuilder#setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true) would suffice.)
>>
>> Thank you for your report. Since you have these detailed steps to reproduce, I'd recommend submitting a JIRA ticket, and we'll try to reproduce the issue you described in the coming days to find the exact cause. Thank you so much for your precise steps to reproduce.
>>
>> Cheers,
>> Yufei.
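
For readers following the thread in PyFlink rather than Java, here is a rough sketch of what Yufei's suggestion could look like on the Python side. It assumes the PyFlink PulsarSourceBuilder exposes a set_config method mirroring the Java PulsarSourceBuilder#setConfig, and that the option behind PulsarOptions.PULSAR_ENABLE_TRANSACTION is the string key 'pulsar.client.enableTransaction'; both names are assumptions and should be checked against the connector version in use. The service URL, admin URL, and topic are taken from the test script above, and transactions still need to be enabled on the broker side, as Ananth already did.

from pyflink.common import SimpleStringSchema, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import (PulsarSource,
                                           PulsarDeserializationSchema,
                                           SubscriptionType)


def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # A Shared subscription is the case where the "transactions not enabled"
    # error was reported, so the source is built with transactions turned on.
    source = (
        PulsarSource.builder()
        .set_service_url('pulsar://localhost:6650')
        .set_admin_url('http://localhost:8080')
        .set_topics('ada')
        .set_subscription_name('shared-sub')
        .set_subscription_type(SubscriptionType.Shared)
        .set_deserialization_schema(
            PulsarDeserializationSchema.flink_schema(SimpleStringSchema()))
        # ASSUMPTION: set_config and the 'pulsar.client.enableTransaction'
        # key are inferred from the Java-side PulsarSourceBuilder#setConfig
        # and PulsarOptions.PULSAR_ENABLE_TRANSACTION; verify both names
        # against the PyFlink 1.15 connector before relying on this.
        .set_config('pulsar.client.enableTransaction', True)
        .build()
    )

    env.from_source(source,
                    WatermarkStrategy.for_monotonous_timestamps(),
                    'pulsar_source').print()
    env.execute('pulsar_transactional_source')


if __name__ == '__main__':
    main()
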
>> On Mon, Feb 21, 2022 at 5:47 PM Ananth Gundabattula <agundabatt...@darwinium.com> wrote:
>>
>>> Thanks Guowei.
>>>
>>> A small correction to the telnet result reported below: I had a typo in the earlier telnet command (I did not separate the port from the host name). Issuing the proper telnet command resolved the JobManager's host correctly.
>>>
>>> Regards,
>>> Ananth
>>>
>>> From: Guowei Ma <guowei....@gmail.com>
>>> Date: Monday, 21 February 2022 at 8:42 pm
>>> To: Ananth Gundabattula <agundabatt...@darwinium.com>
>>> Cc: user@flink.apache.org <user@flink.apache.org>, affei...@gmail.com <affei...@gmail.com>
>>> Subject: Re: Pulsar connector 2.9.1 failing job submission in standalone.
>>>
>>> Thanks Ananth for your clarification, but I am not an expert on Pulsar.
>>> I would cc the author of the connector to have a look. Would Yufei like to give some insight?
>>>
>>> Best,
>>> Guowei
>>>
>>> On Mon, Feb 21, 2022 at 2:10 PM Ananth Gundabattula <agundabatt...@darwinium.com> wrote:
>>>
>>> Thanks for the response Guowei.
>>>
>>> - I tried a telnet to the jobmanager host:port and I get “127.0.0.1:8086: nodename nor servname provided, or not known”, which suggests that network access is fine?
>>> - I resubmitted the word count example and it ran fine to completion.
>>>
>>> For the Pulsar script, I have also tried localhost and the local LAN IPs as the jobmanager host configuration in conf/flink-conf.yaml, and all of them end with the same result. I have also tried this with Pulsar 2.8.0, and it did have issues with the “Shared” subscription type (I get a “transactions not enabled” error in spite of enabling transactions in the 2.8.0 broker). When I change the subscription type to “Exclusive”, it exhibits the same behavior as with Pulsar 2.9.1, i.e. the job manager submission fails (with both Pulsar 2.8.0 and 2.9.1).
>>>
>>> Regards,
>>> Ananth
>>>
>>> From: Guowei Ma <guowei....@gmail.com>
>>> Date: Monday, 21 February 2022 at 4:57 pm
>>> To: Ananth Gundabattula <agundabatt...@darwinium.com>
>>> Cc: user@flink.apache.org <user@flink.apache.org>
>>> Subject: Re: Pulsar connector 2.9.1 failing job submission in standalone.
>>>
>>> Hi Ananth,
>>>
>>> I don't see any error logs on the server side, so it's hard to tell what the specific problem is. From the current log, there are two things to try first:
>>>
>>> 1. From the client's log, it is a 5-minute timeout, so you can telnet 127.0.0.1:8086 to see if there is a problem with the local network.
>>> 2. From the log on the server side, there is no job submission at all. You can try to submit the wordcount example again when submitting the Pulsar example fails, so as to rule out whether the session cluster is inherently problematic.
>>>
>>> Best,
>>> Guowei
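
Guowei's first suggestion above is essentially a plain TCP reachability check against the JobManager REST endpoint (127.0.0.1:8086 in this thread, the address the client log pointed at). For anyone who would rather script that check than run telnet by hand, a small sketch using only the Python standard library is below; the host and port are taken from the thread and need to match the rest.address / rest.port values in your conf/flink-conf.yaml.

import socket
import sys

# JobManager REST host/port as used in this thread; adjust to your
# rest.address / rest.port settings in conf/flink-conf.yaml.
HOST, PORT = '127.0.0.1', 8086

try:
    # Opening and closing a TCP connection covers the same ground as telnet:
    # it proves name resolution works and that the port accepts connections.
    with socket.create_connection((HOST, PORT), timeout=5):
        print(f'{HOST}:{PORT} is reachable')
except OSError as err:
    print(f'cannot reach {HOST}:{PORT}: {err}')
    sys.exit(1)

For what it is worth, the macOS message quoted earlier, “nodename nor servname provided, or not known”, is a name-resolution failure rather than a successful connection, which is consistent with Ananth's later correction that the original telnet command had a typo.
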
>>> On Mon, Feb 21, 2022 at 9:48 AM Ananth Gundabattula <agundabatt...@darwinium.com> wrote:
>>>
>>> Hello All,
>>>
>>> I have a PyFlink script that needs to read from Pulsar and process the data.
>>>
>>> I have done the following to implement a prototype:
>>>
>>> 1. Since I need a PyFlink way to connect to Pulsar, I checked out the code from the master branch as advised in a different thread. (The PyFlink Pulsar connector seems to be slated for the 1.15 release.)
>>> 2. I built the Flink source.
>>> 3. I am using the following location under the source tree as FLINK_HOME: flink-dist/target/flink-1.15-SNAPSHOT-bin/flink-1.15-SNAPSHOT
>>> 4. The Python pyflink wheels have been installed in the right Python conda environment.
>>> 5. I copied the flink-sql-connector-pulsar-1.15-SNAPSHOT.jar into the $FLINK_HOME/lib folder.
>>> 6. I started the standalone cluster by running bin/start-cluster.sh.
>>> 7. I submit my test script using bin/flink run --python …
>>> 8. If I launch the word_count example from the Flink documentation, everything runs fine and completes successfully.
>>> 9. However, if the script involves the Pulsar connector, the logs show that the Flink client is not able to submit the job to the JobManager.
>>> 10. It ultimately dies with a Channel Idle exception (I see this in DEBUG mode of the logs). I am attaching the logs for reference.
>>>
>>> I am trying this on macOS. Please note that the classic word_count script works fine without any issues, and I see the job submission failures on the client only when the Pulsar source connector is in the script. I have also added the logs for the standalone session JobManager, and I am attaching the script for reference.
>>>
>>> Could you please advise what I can do to resolve the issue? (I will raise a JIRA issue if someone thinks it is a bug.)
>>>
>>> Regards,
>>> Ananth
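
One last illustration, tying back to Yufei's guess earlier in the thread that a class-not-found problem on the client blocked serialization of the job: the test script Luning posted at the top (which does submit successfully) registers the connector jar explicitly with env.add_jars, in addition to whatever sits in $FLINK_HOME/lib. A minimal sketch of that pattern follows; the jar path is a placeholder and would need to point at the flink-connector-pulsar (or flink-sql-connector-pulsar) jar that was actually built.

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Register the Pulsar connector jar with the job itself so the classes are
# available when the job graph is built and shipped. The path below is a
# placeholder; point it at the connector jar you actually built.
env.add_jars('file:///path/to/flink-connector-pulsar-1.14.jar')

# ...then build the PulsarSource and the rest of the pipeline as in the test
# script at the top of the thread, and call env.execute().
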