Hi Chet,
I'll be a user of this, so thank you.
It seems reasonable, although did you consider letting folks name the document
ID field explicitly? It would avoid an unnecessary transformation and might
be simpler:
// instruct the writer to use a provided document ID
ElasticsearchIO.write()
    .withConnectionConfiguration(conn)
    .withMaxBatchSize(BATCH_SIZE)
    .withDocumentIdField("myID");
On Wed, Nov 15, 2017 at 8:08 PM, Chet Aldrich <[email protected]> wrote:
Given that this seems like a change that should probably happen, and I’d
like to help contribute if possible, a few questions and my current opinion:
So I’m leaning towards approach B here, which is:
b. (a bit less user friendly) PCollection<KV> with K as an id. But this forces
the user to do a ParDo before writing to ES to output KV pairs of <id, json>.
I think that the reduction in user-friendliness may be outweighed by the
fact that this obviates some of the issues surrounding a failure when
finishing a bundle. Additionally, this /forces/ the user to provide a
document id, which is probably better practice. It should also lead to
fewer frustrations around “magic” code that silently picks up an id if one
happens to be there and doesn’t if not; with approach (a) we’d be relying
on the user discovering this functionality in the docs or the code itself
in order to take advantage of it.
IMHO it’d be generally better to enforce this at compile time, because it
has a direct effect on whether the pipeline produces duplicates on
failure. Additionally, we get the benefit of relatively intuitive behavior:
if the user passes in the same key value, it’ll update the existing record
in ES, and if the key is different it will create a new record. A rough
sketch of what this could look like follows.
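To make that concrete, here’s what approach (b) might look like from the
user’s side. The KV-accepting write() is hypothetical (today’s transform
takes a PCollection<String>), and extractId() stands in for whatever
id-extraction logic the user supplies:

// Hypothetical usage under approach (b): pair each json document with an
// explicit id via a ParDo before handing it to the writer.
PCollection<KV<String, String>> keyed = jsonDocs.apply(
    ParDo.of(new DoFn<String, KV<String, String>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        String json = c.element();
        // extractId() is user-supplied, e.g. pulling a "myID" field out of the json
        c.output(KV.of(extractId(json), json));
      }
    }));
keyed.apply(ElasticsearchIO.write().withConnectionConfiguration(conn));

The same key on a retry means the same document gets re-indexed rather than
duplicated, which is where the deduplication benefit comes from.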
Curious to hear thoughts on this. If this seems reasonable I’ll go ahead
and create a JIRA for tracking and start working on a PR. Also, if it’d be
good to loop in the dev mailing list before starting, let me know; I’m
pretty new to this.
Chet
On Nov 15, 2017, at 12:53 AM, Etienne Chauchot <[email protected]> wrote:
Hi Chet,
What you say is totally true: docs written using ElasticsearchIO will
always have an ES-generated id. But that might change in the future;
indeed, it might be a good thing to allow the user to pass an id. Off the
top of my head, I see three possible designs for that.
a. (simplest) use a special json field for the id: if it is provided by
the user in the input json then it is used, otherwise the id is
auto-generated (see the illustration after this list).
b. (a bit less user friendly) PCollection<KV> with K as an id. But this forces
the user to do a ParDo before writing to ES to output KV pairs of <id, json>.
c. (a lot more complex) Allow the IO to serialize/deserialize java beans
that have a String id field. Matching java types to ES types is quite
tricky, so for now we just rely on the user to serialize their beans
into json and let ES match the types automatically.
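To illustrate option (a): the input json could carry a reserved field (the
name "_id" below is just an example, not a settled choice) that the IO
strips out and uses as the document id, while documents without it keep
today's auto-generated ids:

{ "_id" : "user-123", "name" : "john", "age" : 32 }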
Related to the problems you raise below:
1. Well, the bundle is the commit unit of Beam. Consider the case of
ESIO.batchSize being smaller than the bundle size: while processing
records, when the number of elements reaches batchSize an ES bulk insert
is issued, but no finishBundle. If there is a problem later in the bundle
processing, before the finishBundle, the checkpoint will still be at the
beginning of the bundle, so the whole bundle will be retried, leading to
duplicate documents. Thanks for raising that! I'm CCing the dev list so
that someone can correct me on the checkpointing mechanism if I'm missing
something. Besides, I'm thinking about forcing the user to provide an id
in all cases to work around this issue; see the bulk-request sketch after
point 2.
2. Correct.
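To make point 1 concrete at the bulk-API level (the documents below are
made up): today every action is sent with empty metadata, so each retry of
a bundle makes ES mint fresh ids:

{ "index" : {} }
{ "user" : "chet", "message" : "hello" }

If the user provided an id and the IO put it in the action metadata, a
retried bundle would re-index the same documents under the same ids,
overwriting instead of duplicating:

{ "index" : { "_id" : "doc-1" } }
{ "user" : "chet", "message" : "hello" }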
Best,
Etienne
On 15/11/2017 at 02:16, Chet Aldrich wrote:
Hello all!
So I’ve been using the ElasticsearchIO sink for a project (unfortunately
it’s Elasticsearch 5.x, so I’ve been messing around with the latest RC)
and I’m finding that it doesn’t allow for setting the document ID; it only
lets you pass in a record, and the document ID is auto-generated. See this
line for what specifically is happening:
https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L838
Essentially the data part of the document is being written, but other
properties, such as the document ID, cannot be set.
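For reference, paraphrasing the write path around the linked line (not a
verbatim copy), each element is appended to the batch with empty
bulk-action metadata, which is why the id is always auto-generated:

// paraphrased sketch of the current WriteFn behavior: the bulk action
// metadata is empty, so Elasticsearch assigns the document _id itself
String document = context.element();
batch.add(String.format("{ \"index\" : {} }%n%s%n", document));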
This leads to two problems:
1. Beam doesn’t necessarily guarantee exactly-once execution for a given
item in a PCollection, as I understand it. This means that you may get
more than one record in Elastic for a given item in a PCollection that
you pass in.
2. You can’t do partial updates to an index. If you run a batch job
once, and then run the batch job again on the same index without
clearing it, you just double everything in there.
Is there any good way around this?
I’d be happy to try writing up a PR for this in theory, but I’m not sure
how best to approach it. I’d also like to figure out a way to get around
this in the meantime, if anyone has any ideas.
Best,
Chet
P.S. CCed [email protected] because it seems like he’s been doing work
related to the elastic sink.