Luning Wong <gfen...@gmail.com> wrote on Monday, February 21, 2022 at 19:38:

> import logging
> import sys
>
> from pyflink.common import SimpleStringSchema, WatermarkStrategy
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors import PulsarSource, \
>     PulsarDeserializationSchema, SubscriptionType
> from pyflink.common.typeinfo import Types
>
>
> def foo():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>     env.add_jars(
>         'file:///Users/a/src/me/flink/flink-connectors/flink-sql-connector-pulsar/target/flink-sql-connector-pulsar-1.15-SNAPSHOT.jar'
>     )
>     deserialization_schema = PulsarDeserializationSchema.flink_schema(
>         SimpleStringSchema())
>     # deserialization_schema = PulsarDeserializationSchema.flink_type_info(
>     #     Types.STRING(), None)
>
>     ps = PulsarSource.builder() \
>         .set_deserialization_schema(deserialization_schema) \
>         .set_service_url('pulsar://localhost:6650') \
>         .set_admin_url('http://localhost:8080') \
>         .set_topics('ada') \
>         .set_subscription_name('axcsdas') \
>         .set_subscription_type(SubscriptionType.Exclusive) \
>         .build()
>
>     kafka_source = env.from_source(
>         source=ps,
>         watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
>         source_name="pulsar_source"
>     )
>     kafka_source.print()
>     env.execute('pulsar_source')
>
>
> if __name__ == '__main__':
>     logging.basicConfig(stream=sys.stdout, level=logging.INFO,
>                         format="%(message)s")
>     foo()
>
> The above is my test script. It submits a job to a standalone cluster
> successfully.
>
> Best,
> Wong
>
> Yufei Zhang <affei...@gmail.com> wrote on Monday, February 21, 2022 at 18:33:
>
>> Hi Ananth,
>>
>>
>> From the steps you described, you are using
>> `flink-sql-connector-pulsar-1.15-SNAPSHOT.jar`; however, to my knowledge the
>> Pulsar connector does not support the Table API yet, so would you mind trying
>> `flink-connector-pulsar-1.14.jar` instead (without "sql" in the name, though
>> the classes should be the same; 1.14 is also the stable version)? Since the
>> submission failed, my rough guess is that a class-not-found issue prevented
>> serialization before the job was submitted.
>>
>> Also, you mentioned getting a “transactions not enabled” error in spite of
>> enabling transactions on the 2.8.0 broker; this is interesting. To use
>> transactions, they must be enabled not only in the broker but also in the
>> Pulsar source connector. Please refer to
>> PulsarOptions.PULSAR_ENABLE_TRANSACTION for more details (generally, a call
>> to PulsarSourceBuilder#setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION,
>> true) would suffice).
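>>
>> As a minimal sketch (in the connector's Java DataStream API, since this
>> thread uses PyFlink and the Python builder may expose this option
>> differently), the builder call could look like the following. The
>> service/admin URLs, topic, and subscription name are placeholders, and the
>> import paths are as I recall them from the 1.14 connector, so please
>> double-check them against your version:
>>
>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>> import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
>> import org.apache.flink.connector.pulsar.source.PulsarSource;
>> import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
>> import org.apache.pulsar.client.api.SubscriptionType;
>>
>> // Hypothetical example: everything except PULSAR_ENABLE_TRANSACTION is illustrative.
>> PulsarSource<String> source = PulsarSource.builder()
>>         .setServiceUrl("pulsar://localhost:6650")   // placeholder broker URL
>>         .setAdminUrl("http://localhost:8080")       // placeholder admin URL
>>         .setTopics("my-topic")                      // placeholder topic
>>         .setSubscriptionName("my-subscription")     // placeholder subscription
>>         .setSubscriptionType(SubscriptionType.Shared)
>>         .setDeserializationSchema(
>>                 PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
>>         // Enable transactions in the connector as well, not only in the broker.
>>         .setConfig(PulsarOptions.PULSAR_ENABLE_TRANSACTION, true)
>>         .build();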
>>
>>
>> Thank you for the report. Since you have these detailed steps to reproduce,
>> I'd recommend filing a JIRA ticket; we'll try to reproduce the issue you
>> described in the coming days to find the exact cause. Thanks again for the
>> precise reproduction steps.
>>
>> Cheers,
>> Yufei.
>>
>> On Mon, Feb 21, 2022 at 5:47 PM Ananth Gundabattula <
>> agundabatt...@darwinium.com> wrote:
>>
>>> Thanks Guowei.
>>>
>>>
>>>
>>> A small correction to the telnet result below: I had a typo in the earlier
>>> telnet command (I did not separate the port from the host name). Issuing the
>>> proper telnet command resolves the jobmanager's host properly.
>>>
>>>
>>>
>>> Regards,
>>>
>>> Ananth
>>>
>>>
>>>
>>> *From: *Guowei Ma <guowei....@gmail.com>
>>> *Date: *Monday, 21 February 2022 at 8:42 pm
>>> *To: *Ananth Gundabattula <agundabatt...@darwinium.com>
>>> *Cc: *user@flink.apache.org <user@flink.apache.org>, affei...@gmail.com
>>> <affei...@gmail.com>
>>> *Subject: *Re: Pulsar connector 2.9.1 failing job submission in
>>> standalone.
>>>
>>> Thanks, Ananth, for your clarification. But I am not an expert on Pulsar.
>>>
>>> I am cc'ing the author of the connector to have a look. Yufei, would you
>>> like to give some insight?
>>>
>>>
>>> Best,
>>>
>>> Guowei
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Feb 21, 2022 at 2:10 PM Ananth Gundabattula <
>>> agundabatt...@darwinium.com> wrote:
>>>
>>> Thanks for the response Guowei.
>>>
>>>
>>>
>>>    - I tried a telnet to the jobmanager host:port and got “*127.0.0.1:8086:
>>>    nodename nor servname provided, or not known*”, which suggests that the
>>>    network access is fine?
>>>    - I resubmitted the word count example and it ran fine to completion.
>>>
>>>
>>>
>>> For the pulsar script, I have also tried localhost and the local LAN IPs as
>>> the jobmanager host configuration in conf/flink-conf.yaml, and all of them
>>> end with the same result. I have also tried this with Pulsar 2.8.0, which did
>>> have issues with the “Shared” subscription type (I get a “transactions not
>>> enabled” error in spite of enabling transactions on the 2.8.0 broker). When I
>>> change the subscription type to “Exclusive”, it exhibits the same behavior as
>>> with Pulsar 2.9.1, i.e. the job manager submission fails (on both Pulsar
>>> 2.8.0 and 2.9.1).
>>>
>>>
>>>
>>> Regards,
>>>
>>> Ananth
>>>
>>>
>>>
>>> *From: *Guowei Ma <guowei....@gmail.com>
>>> *Date: *Monday, 21 February 2022 at 4:57 pm
>>> *To: *Ananth Gundabattula <agundabatt...@darwinium.com>
>>> *Cc: *user@flink.apache.org <user@flink.apache.org>
>>> *Subject: *Re: Pulsar connector 2.9.1 failing job submission in
>>> standalone.
>>>
>>> Hi Ananth,
>>>
>>>
>>>
>>> I don't see any error logs on the server side, so it's hard to tell what
>>> the specific problem is. From the current log, there are two things to try
>>> first:
>>>
>>>
>>> 1. From the client's log, it is a 5-minute timeout, so you can telnet to
>>> 127.0.0.1:8086 (note that telnet takes the host and the port as separate
>>> arguments) to see whether there is a problem with the local network.
>>> 2. From the log on the server side, there is no job submission at all. You
>>> can try to submit the wordcount example again when submitting the pulsar
>>> example fails, so as to rule out whether the session cluster itself is
>>> problematic.
>>>
>>>
>>> Best,
>>>
>>> Guowei
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Feb 21, 2022 at 9:48 AM Ananth Gundabattula <
>>> agundabatt...@darwinium.com> wrote:
>>>
>>> Hello All,
>>>
>>>
>>>
>>> I have a Pyflink script that needs to read from Pulsar and process the
>>> data.
>>>
>>>
>>>
>>> I have done the following to implement a prototype.
>>>
>>>
>>>
>>>    1. Since I need a PyFlink way to connect to Pulsar, I checked out the
>>>    code from the master branch as advised in a different thread (the PyFlink
>>>    Pulsar connector seems to be slated for the 1.15 release).
>>>    2. I built the Flink source.
>>>    3. I am using the following location as FLINK_HOME under the source tree:
>>>    flink-dist/target/flink-1.15-SNAPSHOT-bin/flink-1.15-SNAPSHOT
>>>    4. The PyFlink wheels have been installed in the right Python conda
>>>    environment.
>>>    5. I copied flink-sql-connector-pulsar-1.15-SNAPSHOT.jar into the
>>>    $FLINK_HOME/lib folder.
>>>    6. I started the standalone cluster by running bin/start-cluster.sh.
>>>    7. I submit my test script using bin/flink run --python …
>>>    8. If I launch the word_count example from the Flink documentation,
>>>    everything runs fine and completes successfully.
>>>    9. However, if the script involves the Pulsar connector, the logs show
>>>    that the Flink client is not able to submit the job to the Jobmanager.
>>>    10. It ultimately dies with a Channel Idle exception (seen in DEBUG mode
>>>    of the logs). I am attaching the logs for reference.
>>>
>>>
>>>
>>> I am trying this on macOS. Please note that the classic word_count script
>>> works fine without any issues; I see the job submission failures on the
>>> client only when the Pulsar source connector is in the script. I have also
>>> added the logs for the standalone session job manager, and I am attaching
>>> the script for reference.
>>>
>>>
>>>
>>> Could you please advise what I can do to resolve the issue? (I will raise a
>>> JIRA issue if someone thinks it is a bug.)
>>>
>>>
>>>
>>> Regards,
>>>
>>> Ananth
>>>
>>>
>>>
>>>
