The error we get is the following:
org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy
Yarn session cluster
at
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:423)
at
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:262)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1129)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1129)
Caused by: java.lang.RuntimeException: Hadoop security with Kerberos is enabled
but the login user does not have Kerberos credentials
at
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:490)
at
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:416)
... 9 more
On 1/10/20, 3:36 PM, "Aljoscha Krettek" <[email protected]> wrote:
Hi,
Interesting! What problem are you seeing when you don't unset that
environment variable? From reading UserGroupInformation.java our code
should almost work when that environment variable is set.
Best,
Aljoscha
On 10.01.20 15:23, Juan Gentile wrote:
> Hello Aljoscha!
>
>
>
> The way we send the DTs to spark is by setting an env variable
(HADOOP_TOKEN_FILE_LOCATION) which interestingly we have to unset when we run
Flink because even if we do kinit that variable affects somehow Flink and
doesn’t work.
>
> I’m not an expert but what you describe (We would need to modify the
SecurityUtils [2] to create a UGI from tokens, i.e.
UserGroupInformation.createRemoteUser() and then addCreds() or addToken())
makes sense.
>
> If you are able to do this it would be great and help us a lot!
>
>
>
> Thank you,
>
> Juan
>
>
>
> On 1/10/20, 3:13 PM, "Aljoscha Krettek" <[email protected]> wrote:
>
>
>
> Hi,
>
>
>
> it seems I hin send to early, my mail was missing a small part. This
is
>
> the full mail again:
>
>
>
> to summarize and clarify various emails: currently, you can only use
>
> Kerberos authentication via tickets (i.e. kinit) or keytab. The
relevant
>
> bit of code is in the Hadoop security module [1]. Here you can see
that
>
> we either use keytab or try to login as a user.
>
>
>
> I think we should be able to extend this to also work with delegation
>
> tokens (DTs). We would need to modify the SecurityUtils [2] to
create a
>
> UGI from tokens, i.e. UserGroupInformation.createRemoteUser() and
then
>
> addCreds() or addToken(). In Spark, how do you pass the DTs to the
>
> system as a user?
>
>
>
> Best,
>
> Aljoscha
>
>
>
> [1]
>
>
https://eur03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fflink%2Fblob%2F64e2f27640946bf3b1608d4d85585fe18891dcee%2Fflink-runtime%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fflink%2Fruntime%2Fsecurity%2Fmodules%2FHadoopModule.java%23L68&data=02%7C01%7Cj.gentile%40criteo.com%7C812d636410c8447bf75608d795da85b1%7C2a35d8fd574d48e3927c8c398e225a01%7C1%7C0%7C637142638087617028&sdata=3FKN2UWzWr1K14klFN1Su2hKvhi8%2BADj4GCwdTDnKQU%3D&reserved=0
>
> [2]
>
>
https://eur03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fflink%2Fblob%2F2bbf4e5002d7657018ba0b53b7a3ed8ee3124da8%2Fflink-runtime%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fflink%2Fruntime%2Fsecurity%2FSecurityUtils.java%23L89&data=02%7C01%7Cj.gentile%40criteo.com%7C812d636410c8447bf75608d795da85b1%7C2a35d8fd574d48e3927c8c398e225a01%7C1%7C0%7C637142638087626978&sdata=to%2FsDgs1JlTASVg76OmxRD4dVPuSuwZC5jBpPms8hFY%3D&reserved=0
>
>
>
> On 10.01.20 15:02, Aljoscha Krettek wrote:
>
> > Hi Juan,
>
> >
>
> > to summarize and clarify various emails: currently, you can only
use
>
> > Kerberos authentication via tickets (i.e. kinit) or keytab. The
relevant
>
> > bit of code is in the Hadoop security module: [1]. Here you can
see that
>
> > we either use keytab.
>
> >
>
> > I think we should be able to extend this to also work with
delegation
>
> > tokens (DTs). In Spark, how do you pass the DTs to the system as a
user?
>
> >
>
> > Best,
>
> > Aljoscha
>
> >
>
> > [1]
>
> >
https://eur03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fflink%2Fblob%2F64e2f27640946bf3b1608d4d85585fe18891dcee%2Fflink-runtime%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fflink%2Fruntime%2Fsecurity%2Fmodules%2FHadoopModule.java%23L68&data=02%7C01%7Cj.gentile%40criteo.com%7C812d636410c8447bf75608d795da85b1%7C2a35d8fd574d48e3927c8c398e225a01%7C1%7C0%7C637142638087626978&sdata=T9Oy6ZwcDyH2mraQqvYXusan271romFT2tEjQRTk%2FVc%3D&reserved=0
>
> >
>
> >
>
> > On 06.01.20 09:52, Yang Wang wrote:
>
> >> I guess you have set some kerberos related configuration in spark
>
> >> jobs. For
>
> >> Flink, you need
>
> >> to do this too by the following configs. And the keytab file
should
>
> >> existed
>
> >> on Flink client. In your
>
> >> environment, it means the scheduler(oozie) could access the
keytab file.
>
> >>
>
> >> security.kerberos.login.keytab
>
> >> security.kerberos.login.principal
>
> >> security.kerberos.login.contexts
>
> >>
>
> >>
>
> >>
>
> >> Best,
>
> >> Yang
>
> >>
>
> >> Juan Gentile <[email protected]> 于2020年1月6日周一 下午3:55写道:
>
> >>
>
> >>> Hello Rong, Chesnay,
>
> >>>
>
> >>>
>
> >>>
>
> >>> Thank you for your answer, the way we are trying to launch the
job is
>
> >>> through a scheduler (similar to oozie) where we have a keytab
for the
>
> >>> scheduler user and with that keytab we get delegation tokens
>
> >>> impersonating
>
> >>> the right user (owner of the job). But the only way I was able to
>
> >>> make this
>
> >>> work is by getting a ticket (through kinit).
>
> >>>
>
> >>> As a comparison, if I launch a spark job (without doing kinit)
just with
>
> >>> the delegation tokens, it works okay. So I guess Spark does
something
>
> >>> extra.
>
> >>>
>
> >>> This is as far as I could go but at this point I’m not sure if
this is
>
> >>> something just not supported by Flink or I’m doing something
wrong.
>
> >>>
>
> >>>
>
> >>>
>
> >>> Thank you,
>
> >>>
>
> >>> Juan
>
> >>>
>
> >>>
>
> >>>
>
> >>> *From: *Rong Rong <[email protected]>
>
> >>> *Date: *Saturday, January 4, 2020 at 6:06 PM
>
> >>> *To: *Chesnay Schepler <[email protected]>
>
> >>> *Cc: *Juan Gentile <[email protected]>,
"[email protected]" <
>
> >>> [email protected]>, Oleksandr Nitavskyi
<[email protected]>
>
> >>> *Subject: *Re: Yarn Kerberos issue
>
> >>>
>
> >>>
>
> >>>
>
> >>> Hi Juan,
>
> >>>
>
> >>>
>
> >>>
>
> >>> Chesnay was right. If you are using CLI to launch your session
cluster
>
> >>> based on the document [1], you following the instruction to use
kinit
>
> >>> [2]
>
> >>> first seems to be one of the right way to go.
>
> >>>
>
> >>> Another way of approaching it is to setup the kerberos settings
in the
>
> >>> flink-conf.yaml file [3]. FlinkYarnSessionCli will be able to
pick up
>
> >>> your
>
> >>> keytab files and run the CLI securely.
>
> >>>
>
> >>>
>
> >>>
>
> >>> As far as I know the option
`security.kerberos.login.use-ticket-cache`
>
> >>> doesn't actually change the behavior of the authentication
process,
>
> >>> it is
>
> >>> more of a hint whether to use the ticket cache instantiated by
>
> >>> `kinit`. If
>
> >>> you disable using the ticket cache, you will have to use the
>
> >>> "keytab/principle" approach - this doc [4] might be helpful to
explain
>
> >>> better.
>
> >>>
>
> >>>
>
> >>>
>
> >>> Thanks,
>
> >>>
>
> >>> Rong
>
> >>>
>
> >>>
>
> >>>
>
> >>>
>
> >>>
>
> >>> [1]
>
> >>>
https://eur03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fci.apache.org%2Fprojects%2Fflink%2Fflink-docs-stable%2Fops%2Fdeployment%2Fyarn_setup.html%23start-flink-session&data=02%7C01%7Cj.gentile%40criteo.com%7C812d636410c8447bf75608d795da85b1%7C2a35d8fd574d48e3927c8c398e225a01%7C1%7C0%7C637142638087626978&sdata=UiAsVViNxO9kFF93N0VXS704lMaCXCnI4%2F4tTsHglsk%3D&reserved=0
>
> >>>
>
> >>>
<https://eur03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fci.apache.org%2Fprojects%2Fflink%2Fflink-docs-stable%2Fops%2Fdeployment%2Fyarn_setup.html%23start-flink-session&data=02%7C01%7Cj.gentile%40criteo.com%7C812d636410c8447bf75608d795da85b1%7C2a35d8fd574d48e3927c8c398e225a01%7C1%7C0%7C637142638087626978&sdata=UiAsVViNxO9kFF93N0VXS704lMaCXCnI4%2F4tTsHglsk%3D&reserved=0>
>
> >>>
>
> >>>
>
> >>> [2]
>
> >>>
https://eur03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fci.apache.org%2Fprojects%2Fflink%2Fflink-docs-stable%2Fops%2Fsecurity-kerberos.html%23using-kinit-yarn-only&data=02%7C01%7Cj.gentile%40criteo.com%7C812d636410c8447bf75608d795da85b1%7C2a35d8fd574d48e3927c8c398e225a01%7C1%7C0%7C637142638087626978&sdata=fW03W5sRPF3SiTc4j7DG8YMzUrhh3sOp0nW5Fo1CxZc%3D&reserved=0
>
> >>>
>
> >>>
<https://eur03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fci.apache.org%2Fprojects%2Fflink%2Fflink-docs-stable%2Fops%2Fsecurity-kerberos.html%23using-kinit-yarn-only&data=02%7C01%7Cj.gentile%40criteo.com%7C812d636410c8447bf75608d795da85b1%7C2a35d8fd574d48e3927c8c398e225a01%7C1%7C0%7C637142638087626978&sdata=fW03W5sRPF3SiTc4j7DG8YMzUrhh3sOp0nW5Fo1CxZc%3D&reserved=0>
>
> >>>
>
> >>>
>
> >>> [3]
>
> >>>
https://eur03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fci.apache.org%2Fprojects%2Fflink%2Fflink-docs-stable%2Fops%2Fsecurity-kerberos.html%23yarnmesos-mode&data=02%7C01%7Cj.gentile%40criteo.com%7C812d636410c8447bf75608d795da85b1%7C2a35d8fd574d48e3927c8c398e225a01%7C1%7C0%7C637142638087626978&sdata=xoRo3%2BM1Lz8ka3NsLR1JnUmUhqU2QARj%2Bq8kf%2BiIbRY%3D&reserved=0
>
> >>>
>
> >>>
<https://eur03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fci.apache.org%2Fprojects%2Fflink%2Fflink-docs-stable%2Fops%2Fsecurity-kerberos.html%23yarnmesos-mode&data=02%7C01%7Cj.gentile%40criteo.com%7C812d636410c8447bf75608d795da85b1%7C2a35d8fd574d48e3927c8c398e225a01%7C1%7C0%7C637142638087626978&sdata=xoRo3%2BM1Lz8ka3NsLR1JnUmUhqU2QARj%2Bq8kf%2BiIbRY%3D&reserved=0>
>
> >>>
>
> >>>
>
> >>> [4]
>
> >>>
https://eur03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fci.apache.org%2Fprojects%2Fflink%2Fflink-docs-release-1.9%2Fdev%2Fconnectors%2Fkafka.html%23enabling-kerberos-authentication-for-versions-09-and-above-only&data=02%7C01%7Cj.gentile%40criteo.com%7C812d636410c8447bf75608d795da85b1%7C2a35d8fd574d48e3927c8c398e225a01%7C1%7C0%7C637142638087626978&sdata=3a%2B4nIUvW%2FTaJY%2FuNEcE2yWg%2Fd%2FsLsJt%2FlkTntAvcOc%3D&reserved=0
>
> >>>
>
> >>>
<https://eur03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fci.apache.org%2Fprojects%2Fflink%2Fflink-docs-release-1.9%2Fdev%2Fconnectors%2Fkafka.html%23enabling-kerberos-authentication-for-versions-09-and-above-only&data=02%7C01%7Cj.gentile%40criteo.com%7C812d636410c8447bf75608d795da85b1%7C2a35d8fd574d48e3927c8c398e225a01%7C1%7C0%7C637142638087626978&sdata=3a%2B4nIUvW%2FTaJY%2FuNEcE2yWg%2Fd%2FsLsJt%2FlkTntAvcOc%3D&reserved=0>
>
> >>>
>
> >>>
>
> >>>
>
> >>>
>
> >>> On Fri, Jan 3, 2020 at 7:20 AM Chesnay Schepler
<[email protected]>
>
> >>> wrote:
>
> >>>
>
> >>> From what I understand from the documentation, if you want to
use
>
> >>> delegation tokens you always first have to issue a ticket using
>
> >>> kinit; so
>
> >>> you did everything correctly?
>
> >>>
>
> >>>
>
> >>>
>
> >>> On 02/01/2020 13:00, Juan Gentile wrote:
>
> >>>
>
> >>> Hello,
>
> >>>
>
> >>>
>
> >>>
>
> >>> Im trying to submit a job (batch worcount) to a Yarn cluster.
I’m trying
>
> >>> to use delegation tokens and I’m getting the following error:
>
> >>>
>
> >>> *org.apache.flink.client.deployment.ClusterDeploymentException:
Couldn't
>
> >>> deploy Yarn session cluster*
>
> >>>
>
> >>> * at
>
> >>>
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:423)*
>
> >>>
>
> >>>
>
> >>> * at
>
> >>>
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:262)*
>
> >>>
>
> >>>
>
> >>> * at
>
> >>>
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)*
>
> >>>
>
> >>> * at
>
> >>>
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)*
>
> >>>
>
> >>>
>
> >>> * at
>
> >>>
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1129)*
>
> >>>
>
> >>>
>
> >>> * at
java.security.AccessController.doPrivileged(Native
>
> >>> Method)*
>
> >>>
>
> >>> * at
javax.security.auth.Subject.doAs(Subject.java:422)*
>
> >>>
>
> >>> * at
>
> >>>
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)*
>
> >>>
>
> >>>
>
> >>> * at
>
> >>>
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)*
>
> >>>
>
> >>>
>
> >>> * at
>
> >>>
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1129)*
>
> >>>
>
> >>> *Caused by:
org.apache.hadoop.ipc.RemoteException(java.io.IOException):
>
> >>> Delegation Token can be issued only with kerberos or web
authentication*
>
> >>>
>
> >>> * at
>
> >>>
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:7560)*
>
> >>>
>
> >>>
>
> >>> * at
>
> >>>
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:548)*
>
> >>>
>
> >>>
>
> >>> * at
>
> >>>
org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getDelegationToken(AuthorizationProviderProxyClientProtocol.java:663)*
>
> >>>
>
> >>>
>
> >>> * at
>
> >>>
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:981)*
>
> >>>
>
> >>>
>
> >>> * at
>
> >>>
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)*
>
> >>>
>
> >>>
>
> >>> * at
>
> >>>
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)*
>
> >>>
>
> >>>
>
> >>> * at
org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)*
>
> >>>
>
> >>> * at
>
> >>> org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2221)*
>
> >>>
>
> >>> * at
>
> >>> org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)*
>
> >>>
>
> >>> * at
java.security.AccessController.doPrivileged(Native
>
> >>> Method)*
>
> >>>
>
> >>> * at
javax.security.auth.Subject.doAs(Subject.java:422)*
>
> >>>
>
> >>> * at
>
> >>>
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)*
>
> >>>
>
> >>>
>
> >>> * at
>
> >>> org.apache.hadoop.ipc.Server$Handler.run(Server.java:2215) *
>
> >>>
>
> >>> * at
org.apache.hadoop.ipc.Client.call(Client.java:1472)*
>
> >>>
>
> >>> * at
org.apache.hadoop.ipc.Client.call(Client.java:1409)*
>
> >>>
>
> >>> * at
>
> >>>
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)*
>
> >>>
>
> >>>
>
> >>> * at
com.sun.proxy.$Proxy18.getDelegationToken(Unknown
>
> >>> Source)*
>
> >>>
>
> >>> * at
>
> >>>
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getDelegationToken(ClientNamenodeProtocolTranslatorPB.java:928)*
>
> >>>
>
> >>>
>
> >>> * at
sun.reflect.NativeMethodAccessorImpl.invoke0(Native
>
> >>> Method)*
>
> >>>
>
> >>> * at
>
> >>>
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)*
>
> >>>
>
> >>>
>
> >>> * at
>
> >>>
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)*
>
> >>>
>
> >>>
>
> >>> * at
java.lang.reflect.Method.invoke(Method.java:498)*
>
> >>>
>
> >>> * at
>
> >>>
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:256)*
>
> >>>
>
> >>>
>
> >>> * at
>
> >>>
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)*
>
> >>>
>
> >>>
>
> >>> * at
com.sun.proxy.$Proxy19.getDelegationToken(Unknown
>
> >>> Source)*
>
> >>>
>
> >>> * at
>
> >>>
org.apache.hadoop.hdfs.DFSClient.getDelegationToken(DFSClient.java:1082)*
>
> >>>
>
> >>>
>
> >>> * at
>
> >>>
org.apache.hadoop.hdfs.DistributedFileSystem.getDelegationToken(DistributedFileSystem.java:1499)*
>
> >>>
>
> >>>
>
> >>> * at
>
> >>>
org.apache.hadoop.fs.FileSystem.collectDelegationTokens(FileSystem.java:546)*
>
> >>>
>
> >>>
>
> >>> * at
>
> >>>
org.apache.hadoop.fs.FileSystem.collectDelegationTokens(FileSystem.java:557)*
>
> >>>
>
> >>>
>
> >>> * at
>
> >>>
org.apache.hadoop.fs.FileSystem.addDelegationTokens(FileSystem.java:524)*
>
> >>>
>
> >>>
>
> >>> * at
>
> >>>
org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:140)*
>
> >>>
>
> >>>
>
> >>> * at
>
> >>>
org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)*
>
> >>>
>
> >>>
>
> >>> * at
>
> >>>
org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80)*
>
> >>>
>
> >>>
>
> >>> * at
>
> >>> org.apache.flink.yarn.Utils.setTokensFor(Utils.java:235)*
>
> >>>
>
> >>> * at
>
> >>>
org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:972)*
>
> >>>
>
> >>>
>
> >>> * at
>
> >>>
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:545)*
>
> >>>
>
> >>>
>
> >>> * at
>
> >>>
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:416)*
>
> >>>
>
> >>>
>
> >>>
>
> >>>
>
> >>> The kerberos configuration in this case is the default one. Then
I tried
>
> >>> with this option set to false
‘security.kerberos.login.use-ticket-cache‘
>
> >>> but I get the same error.
>
> >>>
>
> >>> I was able to solve the problem by issuing a ticket (with kinit)
but I’d
>
> >>> like to know if it’s possible to make flink work with delegation
>
> >>> tokens and
>
> >>> if so what is the right config.
>
> >>>
>
> >>>
>
> >>>
>
> >>> Thank you,
>
> >>>
>
> >>> Juan
>
> >>>
>
> >>>
>
> >>>
>
> >>>
>
> >>
>
>