This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch confluent
in repository https://gitbox.apache.org/repos/asf/sedona.git

commit 55ec2c76855782ceb012bcb72ede8a5155216421
Author: pawelkocinski <[email protected]>
AuthorDate: Sun Mar 30 22:01:28 2025 +0200

    SEDONA-721 Apply requested changes.
---
 .../sedona/flink/confluent/Constructors.java       |  44 ++++++++
 .../apache/sedona/flink/confluent/Functions.java   |  33 ++++++
 .../sedona/flink/confluent/GeometrySerde.java      |  47 +++++++++
 .../constructors/ST_GeomCollFromText.java          |  39 +++++++
 .../confluent/constructors/ST_GeomFromEWKT.java    |  32 ++++++
 .../confluent/constructors/ST_GeomFromGML.java     |  39 +++++++
 .../confluent/constructors/ST_GeomFromGeoHash.java |  41 ++++++++
 .../confluent/constructors/ST_GeomFromGeoJSON.java |  35 +++++++
 .../confluent/constructors/ST_GeomFromKML.java     |  33 ++++++
 .../confluent/constructors/ST_GeomFromText.java    |  39 +++++++
 .../confluent/constructors/ST_GeomFromWKB.java     |  40 ++++++++
 .../confluent/constructors/ST_GeomFromWKT.java     |  39 +++++++
 .../constructors/ST_GeometryFromText.java          |  39 +++++++
 .../confluent/constructors/ST_LineFromText.java    |  44 ++++++++
 .../confluent/constructors/ST_LineFromWKB.java     |  61 +++++++++++
 .../constructors/ST_LineStringFromText.java        |  42 ++++++++
 .../confluent/constructors/ST_MLineFromText.java   |  38 +++++++
 .../confluent/constructors/ST_MPointFromText.java  |  38 +++++++
 .../confluent/constructors/ST_MPolyFromText.java   |  38 +++++++
 .../confluent/constructors/ST_MakeEnvelope.java    |  47 +++++++++
 .../confluent/constructors/ST_MakePointM.java      |  34 ++++++
 .../flink/confluent/constructors/ST_Point.java     |  31 ++++++
 .../constructors/ST_PointFromGeoHash.java          |  38 +++++++
 .../confluent/constructors/ST_PointFromText.java   |  42 ++++++++
 .../confluent/constructors/ST_PointFromWKB.java    |  61 +++++++++++
 .../flink/confluent/constructors/ST_PointM.java    |  43 ++++++++
 .../flink/confluent/constructors/ST_PointZ.java    |  43 ++++++++
 .../flink/confluent/constructors/ST_PointZM.java   |  45 ++++++++
 .../constructors/ST_PolygonFromEnvelope.java       |  44 ++++++++
 .../confluent/constructors/ST_PolygonFromText.java |  44 ++++++++
 pyflink/.python-version                            |   1 +
 pyflink/README.md                                  |   0
 pyflink/cli.py                                     |  23 +++++
 pyflink/pyproject.toml                             |   9 ++
 pyflink/sedonuts/cli/__init__.py                   |   0
 pyflink/sedonuts/cli/confluent/__init__.py         |   0
 pyflink/sedonuts/cli/confluent/functions.py        |  27 +++++
 pyflink/sedonuts/cli/confluent/generate_ddl.py     |  43 ++++++++
 .../sedonuts/cli/confluent/generate_terraform.py   |  20 ++++
 pyflink/sedonuts/cli/confluent/insert_with_cli.py  | 114 +++++++++++++++++++++
 pyflink/sedonuts/cli/confluent/template.py         |   3 +
 41 files changed, 1473 insertions(+)

diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/Constructors.java 
b/flink/src/main/java/org/apache/sedona/flink/confluent/Constructors.java
new file mode 100644
index 0000000000..57aada0c32
--- /dev/null
+++ b/flink/src/main/java/org/apache/sedona/flink/confluent/Constructors.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent;
+
+import org.apache.sedona.common.enums.FileDataSplitter;
+import org.apache.sedona.common.enums.GeometryType;
+import org.apache.sedona.common.utils.FormatUtils;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.io.ParseException;
+
+/** Static helpers that turn text records into JTS geometries through {@link FormatUtils}. */
+public class Constructors {
+
+  /**
+   * Reads a geometry of the given type from a delimited text record.
+   *
+   * @param geom the text record to read
+   * @param inputDelimiter delimiter name resolved through {@link
+   *     FileDataSplitter#getFileDataSplitter}; {@code null} selects {@link FileDataSplitter#CSV}
+   * @param geometryType geometry type handed to {@link FormatUtils}
+   * @return the geometry returned by {@code FormatUtils.readGeometry}
+   * @throws ParseException propagated from {@code FormatUtils.readGeometry}
+   */
+  public static Geometry getGeometryByType(
+      String geom, String inputDelimiter, GeometryType geometryType) throws ParseException {
+    FileDataSplitter splitter;
+    if (inputDelimiter != null) {
+      splitter = FileDataSplitter.getFileDataSplitter(inputDelimiter);
+    } else {
+      // No delimiter supplied: fall back to CSV.
+      splitter = FileDataSplitter.CSV;
+    }
+    return new FormatUtils<Geometry>(splitter, false, geometryType).readGeometry(geom);
+  }
+
+  /**
+   * Reads a geometry from text encoded in the format named by {@code dataSplitter}.
+   *
+   * @param wktString the encoded geometry text (callers in this package also pass WKB and GeoJSON
+   *     strings, despite the parameter name)
+   * @param dataSplitter the format used to construct the {@link FormatUtils} reader
+   * @return the geometry returned by {@code FormatUtils.readGeometry}
+   * @throws ParseException propagated from {@code FormatUtils.readGeometry}
+   */
+  public static Geometry getGeometryByFileData(String wktString, FileDataSplitter dataSplitter)
+      throws ParseException {
+    return new FormatUtils<Geometry>(dataSplitter, false).readGeometry(wktString);
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/Functions.java 
b/flink/src/main/java/org/apache/sedona/flink/confluent/Functions.java
new file mode 100644
index 0000000000..70ecdc2783
--- /dev/null
+++ b/flink/src/main/java/org/apache/sedona/flink/confluent/Functions.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.locationtech.jts.geom.Geometry;
+
+/** Container for Flink scalar functions that operate on byte-encoded geometries. */
+public class Functions {
+  /**
+   * Scalar function returning the area of a byte-encoded geometry.
+   *
+   * <p>Declared {@code public}: Flink only accepts user-defined function classes that are public
+   * and globally accessible, so a package-private nested class cannot be registered.
+   */
+  public static class ST_Area extends ScalarFunction {
+    /**
+     * Decodes the geometry and computes its area.
+     *
+     * @param o geometry bytes readable by {@link GeometrySerde#deserialize}
+     * @return the value of {@code org.apache.sedona.common.Functions.area} for the decoded
+     *     geometry
+     */
+    @DataTypeHint("Double")
+    public Double eval(byte[] o) {
+      Geometry geom = GeometrySerde.deserialize(o);
+      return org.apache.sedona.common.Functions.area(geom);
+    }
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/GeometrySerde.java 
b/flink/src/main/java/org/apache/sedona/flink/confluent/GeometrySerde.java
new file mode 100644
index 0000000000..87b1e61537
--- /dev/null
+++ b/flink/src/main/java/org/apache/sedona/flink/confluent/GeometrySerde.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent;
+
+import java.util.Arrays;
+import org.apache.sedona.common.Constructors;
+import org.apache.sedona.common.Functions;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.GeometryFactory;
+import org.locationtech.jts.io.ParseException;
+
+/** Converts JTS geometries to and from the byte representation used by the Confluent UDFs. */
+public class GeometrySerde {
+
+  // NOTE(review): public and non-final, so any caller can reassign it; consider making it final.
+  public static GeometryFactory GEOMETRY_FACTORY = new GeometryFactory();
+
+  /**
+   * Encodes a geometry as bytes via {@code Functions.asEWKB}.
+   *
+   * @param geom the geometry to encode
+   * @return the bytes returned by {@code Functions.asEWKB}
+   */
+  public static byte[] serialize(Geometry geom) {
+    return Functions.asEWKB(geom);
+  }
+
+  /**
+   * Decodes bytes into a geometry via {@code Constructors.geomFromWKB}.
+   *
+   * @param bytes the encoded geometry
+   * @return the decoded geometry
+   * @throws IllegalArgumentException if the bytes cannot be parsed; the underlying {@link
+   *     ParseException} is attached as the cause
+   */
+  public static Geometry deserialize(byte[] bytes) {
+    try {
+      return Constructors.geomFromWKB(bytes);
+    } catch (ParseException e) {
+      String msg =
+          String.format(
+              "Failed to parse WKB(printed through Arrays.toString(bytes)): %s, error: %s",
+              Arrays.toString(bytes), e.getMessage());
+      // Chain the ParseException so its stack trace is not lost.
+      throw new IllegalArgumentException(msg, e);
+    }
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomCollFromText.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomCollFromText.java
new file mode 100644
index 0000000000..413f01a899
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomCollFromText.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+import org.locationtech.jts.io.ParseException;
+
+public class ST_GeomCollFromText extends ScalarFunction {
+
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint(value = "String") String wkt, 
@DataTypeHint("Int") Integer srid)
+      throws ParseException {
+    return GeometrySerde.serialize(
+        org.apache.sedona.common.Constructors.geomCollFromText(wkt, srid));
+  }
+
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint(value = "String") String wkt) throws 
ParseException {
+    return 
GeometrySerde.serialize(org.apache.sedona.common.Constructors.geomCollFromText(wkt,
 0));
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromEWKT.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromEWKT.java
new file mode 100644
index 0000000000..9f481c1473
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromEWKT.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+import org.locationtech.jts.io.ParseException;
+
+public class ST_GeomFromEWKT extends ScalarFunction {
+
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String wktString) throws 
ParseException {
+    return 
GeometrySerde.serialize(org.apache.sedona.common.Constructors.geomFromEWKT(wktString));
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromGML.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromGML.java
new file mode 100644
index 0000000000..e4597457d9
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromGML.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+import org.locationtech.jts.geom.GeometryFactory;
+import org.locationtech.jts.io.ParseException;
+import org.locationtech.jts.io.gml2.GMLReader;
+
+/** Scalar function building a serialized geometry from a GML string. */
+public class ST_GeomFromGML extends ScalarFunction {
+
+  /**
+   * Reads the GML with a JTS {@link GMLReader} and serializes the resulting geometry.
+   *
+   * @param gml the GML text
+   * @return the serialized geometry
+   * @throws ParseException wrapping any exception raised while reading or serializing
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String gml) throws ParseException {
+    GMLReader gmlReader = new GMLReader();
+    try {
+      org.locationtech.jts.geom.Geometry parsed = gmlReader.read(gml, new GeometryFactory());
+      return GeometrySerde.serialize(parsed);
+    } catch (Exception failure) {
+      // NOTE(review): the broad catch also wraps runtime exceptions; kept as in the original.
+      throw new ParseException(failure);
+    }
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromGeoHash.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromGeoHash.java
new file mode 100644
index 0000000000..ff25a6e96c
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromGeoHash.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.common.utils.GeoHashDecoder;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+import org.locationtech.jts.io.ParseException;
+
+public class ST_GeomFromGeoHash extends ScalarFunction {
+
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String value, 
@DataTypeHint("Int") Integer precision)
+      throws ParseException, GeoHashDecoder.InvalidGeoHashException {
+    // The default precision is the geohash length. Otherwise, use the 
precision given by the user
+    return GeometrySerde.serialize(GeoHashDecoder.decode(value, precision));
+  }
+
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String value)
+      throws ParseException, GeoHashDecoder.InvalidGeoHashException {
+    return eval(value, null);
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromGeoJSON.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromGeoJSON.java
new file mode 100644
index 0000000000..9c52ab6d88
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromGeoJSON.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import static 
org.apache.sedona.flink.confluent.Constructors.getGeometryByFileData;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.common.enums.FileDataSplitter;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+import org.locationtech.jts.io.ParseException;
+
+public class ST_GeomFromGeoJSON extends ScalarFunction {
+
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String geoJson) throws 
ParseException {
+    return GeometrySerde.serialize(getGeometryByFileData(geoJson, 
FileDataSplitter.GEOJSON));
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromKML.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromKML.java
new file mode 100644
index 0000000000..25286f4664
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromKML.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+import org.locationtech.jts.io.ParseException;
+import org.locationtech.jts.io.kml.KMLReader;
+
+public class ST_GeomFromKML extends ScalarFunction {
+
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String kml) throws ParseException 
{
+    return GeometrySerde.serialize(new KMLReader().read(kml));
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromText.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromText.java
new file mode 100644
index 0000000000..9f81c5f595
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromText.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+import org.locationtech.jts.io.ParseException;
+
+public class ST_GeomFromText extends ScalarFunction {
+
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String wktString) throws 
ParseException {
+    return 
GeometrySerde.serialize(org.apache.sedona.common.Constructors.geomFromWKT(wktString,
 0));
+  }
+
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String wktString, 
@DataTypeHint("Int") Integer srid)
+      throws ParseException {
+    return GeometrySerde.serialize(
+        org.apache.sedona.common.Constructors.geomFromWKT(wktString, srid));
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromWKB.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromWKB.java
new file mode 100644
index 0000000000..d2dcefbb6f
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromWKB.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import static 
org.apache.sedona.flink.confluent.Constructors.getGeometryByFileData;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.common.enums.FileDataSplitter;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+import org.locationtech.jts.io.ParseException;
+
+public class ST_GeomFromWKB extends ScalarFunction {
+
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String wkbString) throws 
ParseException {
+    return GeometrySerde.serialize(getGeometryByFileData(wkbString, 
FileDataSplitter.WKB));
+  }
+
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("Bytes") byte[] wkb) throws ParseException {
+    return 
GeometrySerde.serialize(org.apache.sedona.common.Constructors.geomFromWKB(wkb));
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromWKT.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromWKT.java
new file mode 100644
index 0000000000..4144c1778d
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromWKT.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+import org.locationtech.jts.io.ParseException;
+
+public class ST_GeomFromWKT extends ScalarFunction {
+
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String wktString) throws 
ParseException {
+    return 
GeometrySerde.serialize(org.apache.sedona.common.Constructors.geomFromWKT(wktString,
 0));
+  }
+
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String wktString, 
@DataTypeHint("Int") Integer srid)
+      throws ParseException {
+    return GeometrySerde.serialize(
+        org.apache.sedona.common.Constructors.geomFromWKT(wktString, srid));
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeometryFromText.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeometryFromText.java
new file mode 100644
index 0000000000..9a184e21ea
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeometryFromText.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+import org.locationtech.jts.io.ParseException;
+
+public class ST_GeometryFromText extends ScalarFunction {
+
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String wktString) throws 
ParseException {
+    return 
GeometrySerde.serialize(org.apache.sedona.common.Constructors.geomFromWKT(wktString,
 0));
+  }
+
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String wktString, 
@DataTypeHint("Int") Integer srid)
+      throws ParseException {
+    return GeometrySerde.serialize(
+        org.apache.sedona.common.Constructors.geomFromWKT(wktString, srid));
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_LineFromText.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_LineFromText.java
new file mode 100644
index 0000000000..e1205aa136
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_LineFromText.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import static org.apache.sedona.flink.confluent.Constructors.getGeometryByType;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.common.enums.GeometryType;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+import org.locationtech.jts.io.ParseException;
+
+/** Scalar function building a serialized line string from delimited text. */
+public class ST_LineFromText extends ScalarFunction {
+
+  /**
+   * Parses the text without an explicit delimiter ({@code null} is passed on).
+   *
+   * @param lineString the text to parse
+   * @return the serialized geometry
+   * @throws ParseException propagated from {@code getGeometryByType}
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String lineString) throws ParseException {
+    return eval(lineString, null);
+  }
+
+  /**
+   * Parses the text as a {@link GeometryType#LINESTRING} using the given delimiter.
+   *
+   * @param lineString the text to parse
+   * @param inputDelimiter delimiter given by the user; the default delimiter is comma
+   * @return the serialized geometry
+   * @throws ParseException propagated from {@code getGeometryByType}
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(
+      @DataTypeHint("String") String lineString, @DataTypeHint("String") String inputDelimiter)
+      throws ParseException {
+    org.locationtech.jts.geom.Geometry line =
+        getGeometryByType(lineString, inputDelimiter, GeometryType.LINESTRING);
+    return GeometrySerde.serialize(line);
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_LineFromWKB.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_LineFromWKB.java
new file mode 100644
index 0000000000..792adff004
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_LineFromWKB.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import static 
org.apache.sedona.flink.confluent.Constructors.getGeometryByFileData;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.common.enums.FileDataSplitter;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.LineString;
+import org.locationtech.jts.io.ParseException;
+
+/**
+ * Flink scalar function that decodes WKB input and returns the resulting line string in the byte
+ * form produced by {@link GeometrySerde}.
+ *
+ * <p>The {@code String} overloads yield {@code null} when the decoded geometry is not a {@link
+ * LineString}. The {@code byte[]} overloads hand the work to {@code
+ * org.apache.sedona.common.Constructors.lineFromWKB}.
+ */
+public class ST_LineFromWKB extends ScalarFunction {
+
+  /**
+   * Decodes {@code wkbString} using the {@code FileDataSplitter.WKB} format.
+   *
+   * @param wkbString textual WKB input
+   * @return the serialized line string, or {@code null} if the geometry is not a LineString
+   * @throws ParseException propagated from the underlying geometry reader
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String wkbString) throws ParseException {
+    Geometry parsed = getGeometryByFileData(wkbString, FileDataSplitter.WKB);
+    if (!(parsed instanceof LineString)) {
+      return null; // Return null if geometry is not a Linestring
+    }
+    return GeometrySerde.serialize(parsed);
+  }
+
+  /**
+   * Decodes {@code wkbString} and applies {@code srid} to the result before serializing it.
+   *
+   * @param wkbString textual WKB input
+   * @param srid SRID passed to {@code Functions.setSRID}
+   * @return the serialized line string, or {@code null} if the geometry is not a LineString
+   * @throws ParseException propagated from the underlying geometry reader
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String wkbString, int srid) throws ParseException {
+    Geometry parsed = getGeometryByFileData(wkbString, FileDataSplitter.WKB);
+    if (!(parsed instanceof LineString)) {
+      return null; // Return null if geometry is not a Linestring
+    }
+    Geometry withSrid = org.apache.sedona.common.Functions.setSRID(parsed, srid);
+    return GeometrySerde.serialize(withSrid);
+  }
+
+  /**
+   * Decodes binary WKB through {@code Constructors.lineFromWKB(wkb)}.
+   *
+   * @param wkb binary WKB input
+   * @return the serialized result
+   * @throws ParseException propagated from the underlying constructor
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("Bytes") byte[] wkb) throws ParseException {
+    final byte[] serialized =
+        GeometrySerde.serialize(org.apache.sedona.common.Constructors.lineFromWKB(wkb));
+    return serialized;
+  }
+
+  /**
+   * Decodes binary WKB through {@code Constructors.lineFromWKB(wkb, srid)}.
+   *
+   * @param wkb binary WKB input
+   * @param srid SRID forwarded to the constructor
+   * @return the serialized result
+   * @throws ParseException propagated from the underlying constructor
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("Bytes") byte[] wkb, int srid) throws ParseException {
+    final byte[] serialized =
+        GeometrySerde.serialize(org.apache.sedona.common.Constructors.lineFromWKB(wkb, srid));
+    return serialized;
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_LineStringFromText.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_LineStringFromText.java
new file mode 100644
index 0000000000..8a5ebc6d61
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_LineStringFromText.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+import org.locationtech.jts.io.ParseException;
+
+/**
+ * Flink scalar function that reads a line string from delimited text and returns it in the byte
+ * form produced by {@link GeometrySerde}.
+ *
+ * <p>Performs the same call as the sibling {@code ST_LineFromText} in this package.
+ */
+public class ST_LineStringFromText extends ScalarFunction {
+
+  /**
+   * Reads {@code lineString} as a {@code GeometryType.LINESTRING} and serializes the result.
+   *
+   * @param lineString text describing the line string
+   * @param inputDelimiter delimiter supplied by the user, or {@code null} for the default
+   * @return the serialized geometry
+   * @throws ParseException propagated from the underlying geometry reader
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(
+      @DataTypeHint("String") String lineString, @DataTypeHint("String") String inputDelimiter)
+      throws ParseException {
+    // The default delimiter is comma. Otherwise, use the delimiter given by the user.
+    // Use the confluent helper directly (as ST_LineFromText does) instead of allocating a legacy
+    // expressions UDF instance on every evaluated row.
+    return GeometrySerde.serialize(
+        org.apache.sedona.flink.confluent.Constructors.getGeometryByType(
+            lineString, inputDelimiter, org.apache.sedona.common.enums.GeometryType.LINESTRING));
+  }
+
+  /**
+   * Reads {@code lineString} without an explicit delimiter by forwarding {@code null} to the
+   * two-argument overload.
+   *
+   * @param lineString text describing the line string
+   * @return the serialized geometry
+   * @throws ParseException propagated from the underlying geometry reader
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String lineString) throws ParseException {
+    return eval(lineString, null);
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MLineFromText.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MLineFromText.java
new file mode 100644
index 0000000000..449f110136
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MLineFromText.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+import org.locationtech.jts.io.ParseException;
+
+/**
+ * Flink scalar function that passes {@code wkt} to {@code
+ * org.apache.sedona.common.Constructors.mLineFromText} and returns the result in the byte form
+ * produced by {@link GeometrySerde}.
+ */
+public class ST_MLineFromText extends ScalarFunction {
+
+  /**
+   * Parses {@code wkt} with an SRID of 0.
+   *
+   * @param wkt text to parse
+   * @return the serialized geometry
+   * @throws ParseException propagated from the underlying constructor
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String wkt) throws ParseException {
+    return eval(wkt, 0);
+  }
+
+  /**
+   * Parses {@code wkt} with the given SRID.
+   *
+   * @param wkt text to parse
+   * @param srid SRID forwarded to the constructor
+   * @return the serialized geometry
+   * @throws ParseException propagated from the underlying constructor
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String wkt, @DataTypeHint("Int") Integer srid)
+      throws ParseException {
+    final byte[] serialized =
+        GeometrySerde.serialize(org.apache.sedona.common.Constructors.mLineFromText(wkt, srid));
+    return serialized;
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MPointFromText.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MPointFromText.java
new file mode 100644
index 0000000000..968325182b
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MPointFromText.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+import org.locationtech.jts.io.ParseException;
+
+/**
+ * Flink scalar function that passes {@code wkt} to {@code
+ * org.apache.sedona.common.Constructors.mPointFromText} and returns the result in the byte form
+ * produced by {@link GeometrySerde}.
+ */
+public class ST_MPointFromText extends ScalarFunction {
+
+  /**
+   * Parses {@code wkt} with an SRID of 0.
+   *
+   * @param wkt text to parse
+   * @return the serialized geometry
+   * @throws ParseException propagated from the underlying constructor
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String wkt) throws ParseException {
+    return eval(wkt, 0);
+  }
+
+  /**
+   * Parses {@code wkt} with the given SRID.
+   *
+   * @param wkt text to parse
+   * @param srid SRID forwarded to the constructor
+   * @return the serialized geometry
+   * @throws ParseException propagated from the underlying constructor
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String wkt, @DataTypeHint("Int") Integer srid)
+      throws ParseException {
+    final byte[] serialized =
+        GeometrySerde.serialize(org.apache.sedona.common.Constructors.mPointFromText(wkt, srid));
+    return serialized;
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MPolyFromText.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MPolyFromText.java
new file mode 100644
index 0000000000..bdbe27805f
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MPolyFromText.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+import org.locationtech.jts.io.ParseException;
+
+/**
+ * Flink scalar function that passes {@code wkt} to {@code
+ * org.apache.sedona.common.Constructors.mPolyFromText} and returns the result in the byte form
+ * produced by {@link GeometrySerde}.
+ */
+public class ST_MPolyFromText extends ScalarFunction {
+
+  /**
+   * Parses {@code wkt} with an SRID of 0.
+   *
+   * @param wkt text to parse
+   * @return the serialized geometry
+   * @throws ParseException propagated from the underlying constructor
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String wkt) throws ParseException {
+    return eval(wkt, 0);
+  }
+
+  /**
+   * Parses {@code wkt} with the given SRID.
+   *
+   * @param wkt text to parse
+   * @param srid SRID forwarded to the constructor
+   * @return the serialized geometry
+   * @throws ParseException propagated from the underlying constructor
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String wkt, @DataTypeHint("Int") Integer srid)
+      throws ParseException {
+    final byte[] serialized =
+        GeometrySerde.serialize(org.apache.sedona.common.Constructors.mPolyFromText(wkt, srid));
+    return serialized;
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MakeEnvelope.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MakeEnvelope.java
new file mode 100644
index 0000000000..2f276f55e1
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MakeEnvelope.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+
+/**
+ * Flink scalar function that forwards its bounds to {@code
+ * org.apache.sedona.common.Constructors.makeEnvelope} and returns the result in the byte form
+ * produced by {@link GeometrySerde}.
+ */
+public class ST_MakeEnvelope extends ScalarFunction {
+
+  /**
+   * Builds the envelope from its bounds only.
+   *
+   * @param minX lower x bound
+   * @param minY lower y bound
+   * @param maxX upper x bound
+   * @param maxY upper y bound
+   * @return the serialized geometry
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(
+      @DataTypeHint("Double") Double minX,
+      @DataTypeHint("Double") Double minY,
+      @DataTypeHint("Double") Double maxX,
+      @DataTypeHint("Double") Double maxY) {
+    final byte[] serialized =
+        GeometrySerde.serialize(
+            org.apache.sedona.common.Constructors.makeEnvelope(minX, minY, maxX, maxY));
+    return serialized;
+  }
+
+  /**
+   * Builds the envelope from its bounds and an SRID.
+   *
+   * @param minX lower x bound
+   * @param minY lower y bound
+   * @param maxX upper x bound
+   * @param maxY upper y bound
+   * @param srid SRID forwarded to the constructor
+   * @return the serialized geometry
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(
+      @DataTypeHint("Double") Double minX,
+      @DataTypeHint("Double") Double minY,
+      @DataTypeHint("Double") Double maxX,
+      @DataTypeHint("Double") Double maxY,
+      @DataTypeHint("Integer") Integer srid) {
+    final byte[] serialized =
+        GeometrySerde.serialize(
+            org.apache.sedona.common.Constructors.makeEnvelope(minX, minY, maxX, maxY, srid));
+    return serialized;
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MakePointM.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MakePointM.java
new file mode 100644
index 0000000000..53f4874232
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MakePointM.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+
+/**
+ * Flink scalar function that forwards {@code x}, {@code y} and {@code m} to {@code
+ * org.apache.sedona.common.Constructors.makePointM} and returns the result in the byte form
+ * produced by {@link GeometrySerde}.
+ */
+public class ST_MakePointM extends ScalarFunction {
+
+  /**
+   * Builds the point and serializes it.
+   *
+   * @param x x value forwarded to the constructor
+   * @param y y value forwarded to the constructor
+   * @param m m value forwarded to the constructor
+   * @return the serialized geometry
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(
+      @DataTypeHint("Double") Double x,
+      @DataTypeHint("Double") Double y,
+      @DataTypeHint("Double") Double m) {
+    final byte[] serialized =
+        GeometrySerde.serialize(org.apache.sedona.common.Constructors.makePointM(x, y, m));
+    return serialized;
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_Point.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_Point.java
new file mode 100644
index 0000000000..2a71472ef1
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_Point.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+
+/**
+ * Flink scalar function that forwards {@code x} and {@code y} to {@code
+ * org.apache.sedona.common.Constructors.point} and returns the result in the byte form produced by
+ * {@link GeometrySerde}.
+ */
+public class ST_Point extends ScalarFunction {
+
+  /**
+   * Builds the point and serializes it.
+   *
+   * @param x x value forwarded to the constructor
+   * @param y y value forwarded to the constructor
+   * @return the serialized geometry
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("Double") Double x, @DataTypeHint("Double") Double y) {
+    final byte[] serialized =
+        GeometrySerde.serialize(org.apache.sedona.common.Constructors.point(x, y));
+    return serialized;
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointFromGeoHash.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointFromGeoHash.java
new file mode 100644
index 0000000000..985aca4f82
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointFromGeoHash.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+
+/**
+ * Flink scalar function that forwards a geohash to {@code
+ * org.apache.sedona.common.Constructors.pointFromGeoHash} and returns the result in the byte form
+ * produced by {@link GeometrySerde}.
+ */
+public class ST_PointFromGeoHash extends ScalarFunction {
+
+  /**
+   * Evaluates {@code value} without an explicit precision by forwarding {@code null} to the
+   * two-argument overload.
+   *
+   * @param value geohash text
+   * @return the serialized geometry
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String value) {
+    return eval(value, null);
+  }
+
+  /**
+   * Evaluates {@code value} with the given precision.
+   *
+   * @param value geohash text
+   * @param precision precision supplied by the user, or {@code null} for the default
+   * @return the serialized geometry
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(
+      @DataTypeHint("String") String value, @DataTypeHint("Int") Integer precision) {
+    // The default precision is the geohash length. Otherwise, use the precision given by the user
+    final byte[] serialized =
+        GeometrySerde.serialize(
+            org.apache.sedona.common.Constructors.pointFromGeoHash(value, precision));
+    return serialized;
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointFromText.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointFromText.java
new file mode 100644
index 0000000000..a2e624bf69
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointFromText.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import static org.apache.sedona.flink.confluent.Constructors.getGeometryByType;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.common.enums.GeometryType;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+import org.locationtech.jts.io.ParseException;
+
+/**
+ * Flink scalar function that reads a point from delimited text and returns it in the byte form
+ * produced by {@link GeometrySerde}.
+ */
+public class ST_PointFromText extends ScalarFunction {
+
+  /**
+   * Reads {@code s} without an explicit delimiter by forwarding {@code null} to the two-argument
+   * overload.
+   *
+   * @param s text describing the point
+   * @return the serialized geometry
+   * @throws ParseException propagated from the underlying geometry reader
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String s) throws ParseException {
+    return eval(s, null);
+  }
+
+  /**
+   * Reads {@code s} as a {@code GeometryType.POINT} and serializes the result.
+   *
+   * @param s text describing the point
+   * @param inputDelimiter delimiter forwarded to {@code getGeometryByType}
+   * @return the serialized geometry
+   * @throws ParseException propagated from the underlying geometry reader
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(
+      @DataTypeHint("String") String s, @DataTypeHint("String") String inputDelimiter)
+      throws ParseException {
+    final byte[] serialized =
+        GeometrySerde.serialize(getGeometryByType(s, inputDelimiter, GeometryType.POINT));
+    return serialized;
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointFromWKB.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointFromWKB.java
new file mode 100644
index 0000000000..b51ae97c67
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointFromWKB.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import static 
org.apache.sedona.flink.confluent.Constructors.getGeometryByFileData;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.common.enums.FileDataSplitter;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.Point;
+import org.locationtech.jts.io.ParseException;
+
+/**
+ * Flink scalar function that decodes WKB input and returns the resulting point in the byte form
+ * produced by {@link GeometrySerde}.
+ *
+ * <p>The {@code String} overloads yield {@code null} when the decoded geometry is not a {@link
+ * Point}. The {@code byte[]} overloads hand the work to {@code
+ * org.apache.sedona.common.Constructors.pointFromWKB}.
+ */
+public class ST_PointFromWKB extends ScalarFunction {
+
+  /**
+   * Decodes {@code wkbString} using the {@code FileDataSplitter.WKB} format.
+   *
+   * @param wkbString textual WKB input
+   * @return the serialized point, or {@code null} if the geometry is not a Point
+   * @throws ParseException propagated from the underlying geometry reader
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String wkbString) throws ParseException {
+    Geometry parsed = getGeometryByFileData(wkbString, FileDataSplitter.WKB);
+    if (!(parsed instanceof Point)) {
+      return null; // Return null if geometry is not a Point
+    }
+    return GeometrySerde.serialize(parsed);
+  }
+
+  /**
+   * Decodes {@code wkbString} and applies {@code srid} to the result before serializing it.
+   *
+   * @param wkbString textual WKB input
+   * @param srid SRID passed to {@code Functions.setSRID}
+   * @return the serialized point, or {@code null} if the geometry is not a Point
+   * @throws ParseException propagated from the underlying geometry reader
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String wkbString, int srid) throws ParseException {
+    Geometry parsed = getGeometryByFileData(wkbString, FileDataSplitter.WKB);
+    if (!(parsed instanceof Point)) {
+      return null; // Return null if geometry is not a Point
+    }
+    Geometry withSrid = org.apache.sedona.common.Functions.setSRID(parsed, srid);
+    return GeometrySerde.serialize(withSrid);
+  }
+
+  /**
+   * Decodes binary WKB through {@code Constructors.pointFromWKB(wkb)}.
+   *
+   * @param wkb binary WKB input
+   * @return the serialized result
+   * @throws ParseException propagated from the underlying constructor
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("Bytes") byte[] wkb) throws ParseException {
+    final byte[] serialized =
+        GeometrySerde.serialize(org.apache.sedona.common.Constructors.pointFromWKB(wkb));
+    return serialized;
+  }
+
+  /**
+   * Decodes binary WKB through {@code Constructors.pointFromWKB(wkb, srid)}.
+   *
+   * @param wkb binary WKB input
+   * @param srid SRID forwarded to the constructor
+   * @return the serialized result
+   * @throws ParseException propagated from the underlying constructor
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("Bytes") byte[] wkb, int srid) throws ParseException {
+    final byte[] serialized =
+        GeometrySerde.serialize(org.apache.sedona.common.Constructors.pointFromWKB(wkb, srid));
+    return serialized;
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointM.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointM.java
new file mode 100644
index 0000000000..70c42066eb
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointM.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+
+/**
+ * Flink scalar function that forwards its arguments to {@code
+ * org.apache.sedona.common.Constructors.pointM} and returns the result in the byte form produced
+ * by {@link GeometrySerde}.
+ */
+public class ST_PointM extends ScalarFunction {
+
+  /**
+   * Builds the point with an explicit SRID and serializes it.
+   *
+   * @param x x value forwarded to the constructor
+   * @param y y value forwarded to the constructor
+   * @param m m value forwarded to the constructor
+   * @param srid SRID forwarded to the constructor
+   * @return the serialized geometry
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(
+      @DataTypeHint("Double") Double x,
+      @DataTypeHint("Double") Double y,
+      @DataTypeHint("Double") Double m,
+      @DataTypeHint("Integer") Integer srid) {
+    final byte[] serialized =
+        GeometrySerde.serialize(org.apache.sedona.common.Constructors.pointM(x, y, m, srid));
+    return serialized;
+  }
+
+  /**
+   * Builds the point with an SRID of 0 by delegating to the four-argument overload.
+   *
+   * @param x x value forwarded to the constructor
+   * @param y y value forwarded to the constructor
+   * @param m m value forwarded to the constructor
+   * @return the serialized geometry
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(
+      @DataTypeHint("Double") Double x,
+      @DataTypeHint("Double") Double y,
+      @DataTypeHint("Double") Double m) {
+    return eval(x, y, m, 0);
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointZ.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointZ.java
new file mode 100644
index 0000000000..17dfffd392
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointZ.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+
+/**
+ * Flink scalar function that forwards its arguments to {@code
+ * org.apache.sedona.common.Constructors.pointZ} and returns the result in the byte form produced
+ * by {@link GeometrySerde}.
+ */
+public class ST_PointZ extends ScalarFunction {
+
+  /**
+   * Builds the point with an explicit SRID and serializes it.
+   *
+   * @param x x value forwarded to the constructor
+   * @param y y value forwarded to the constructor
+   * @param z z value forwarded to the constructor
+   * @param srid SRID forwarded to the constructor
+   * @return the serialized geometry
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(
+      @DataTypeHint("Double") Double x,
+      @DataTypeHint("Double") Double y,
+      @DataTypeHint("Double") Double z,
+      @DataTypeHint("Integer") Integer srid) {
+    final byte[] serialized =
+        GeometrySerde.serialize(org.apache.sedona.common.Constructors.pointZ(x, y, z, srid));
+    return serialized;
+  }
+
+  /**
+   * Builds the point with an SRID of 0 by delegating to the four-argument overload.
+   *
+   * @param x x value forwarded to the constructor
+   * @param y y value forwarded to the constructor
+   * @param z z value forwarded to the constructor
+   * @return the serialized geometry
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(
+      @DataTypeHint("Double") Double x,
+      @DataTypeHint("Double") Double y,
+      @DataTypeHint("Double") Double z) {
+    return eval(x, y, z, 0);
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointZM.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointZM.java
new file mode 100644
index 0000000000..1836b60bf2
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointZM.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+
+/**
+ * Flink scalar function that forwards its arguments to {@code
+ * org.apache.sedona.common.Constructors.pointZM} and returns the result in the byte form produced
+ * by {@link GeometrySerde}.
+ */
+public class ST_PointZM extends ScalarFunction {
+
+  /**
+   * Builds the point with an explicit SRID and serializes it.
+   *
+   * @param x x value forwarded to the constructor
+   * @param y y value forwarded to the constructor
+   * @param z z value forwarded to the constructor
+   * @param m m value forwarded to the constructor
+   * @param srid SRID forwarded to the constructor
+   * @return the serialized geometry
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(
+      @DataTypeHint("Double") Double x,
+      @DataTypeHint("Double") Double y,
+      @DataTypeHint("Double") Double z,
+      @DataTypeHint("Double") Double m,
+      @DataTypeHint("Integer") Integer srid) {
+    final byte[] serialized =
+        GeometrySerde.serialize(org.apache.sedona.common.Constructors.pointZM(x, y, z, m, srid));
+    return serialized;
+  }
+
+  /**
+   * Builds the point with an SRID of 0 by delegating to the five-argument overload.
+   *
+   * @param x x value forwarded to the constructor
+   * @param y y value forwarded to the constructor
+   * @param z z value forwarded to the constructor
+   * @param m m value forwarded to the constructor
+   * @return the serialized geometry
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(
+      @DataTypeHint("Double") Double x,
+      @DataTypeHint("Double") Double y,
+      @DataTypeHint("Double") Double z,
+      @DataTypeHint("Double") Double m) {
+    return eval(x, y, z, m, 0);
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PolygonFromEnvelope.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PolygonFromEnvelope.java
new file mode 100644
index 0000000000..9595264495
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PolygonFromEnvelope.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.GeometryFactory;
+
+/**
+ * Flink scalar function that builds a rectangular polygon from its bounds and returns it in the
+ * byte form produced by {@link GeometrySerde}.
+ */
+public class ST_PolygonFromEnvelope extends ScalarFunction {
+
+  /** Shared factory, created once instead of on every evaluated row. */
+  private static final GeometryFactory GEOMETRY_FACTORY = new GeometryFactory();
+
+  /**
+   * Creates a closed five-coordinate ring (minX,minY) -> (minX,maxY) -> (maxX,maxY) ->
+   * (maxX,minY) -> (minX,minY) and serializes the polygon built from it.
+   *
+   * @param minX lower x bound
+   * @param minY lower y bound
+   * @param maxX upper x bound
+   * @param maxY upper y bound
+   * @return the serialized polygon
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(
+      @DataTypeHint("Double") Double minX,
+      @DataTypeHint("Double") Double minY,
+      @DataTypeHint("Double") Double maxX,
+      @DataTypeHint("Double") Double maxY) {
+    // The first coordinate is reused as the last one to close the ring.
+    Coordinate start = new Coordinate(minX, minY);
+    Coordinate[] ring = {
+      start,
+      new Coordinate(minX, maxY),
+      new Coordinate(maxX, maxY),
+      new Coordinate(maxX, minY),
+      start
+    };
+    return GeometrySerde.serialize(GEOMETRY_FACTORY.createPolygon(ring));
+  }
+}
diff --git 
a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PolygonFromText.java
 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PolygonFromText.java
new file mode 100644
index 0000000000..53a8409e00
--- /dev/null
+++ 
b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PolygonFromText.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.flink.confluent.constructors;
+
+import static org.apache.sedona.flink.confluent.Constructors.getGeometryByType;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.common.enums.GeometryType;
+import org.apache.sedona.flink.confluent.GeometrySerde;
+import org.locationtech.jts.io.ParseException;
+
+/**
+ * Flink scalar function that turns a delimited text description of a polygon into the byte form
+ * produced by {@link GeometrySerde#serialize}.
+ */
+public class ST_PolygonFromText extends ScalarFunction {
+
+  /**
+   * Builds a {@link GeometryType#POLYGON} geometry from {@code polygonString} and serializes it.
+   *
+   * @param polygonString text describing the polygon
+   * @param inputDelimiter delimiter used inside {@code polygonString}; the single-argument
+   *     overload passes {@code null} here
+   * @return the serialized polygon
+   * @throws ParseException propagated from the geometry construction
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(
+      @DataTypeHint("String") String polygonString, @DataTypeHint("String") String inputDelimiter)
+      throws ParseException {
+    // The default delimiter is comma. Otherwise, use the delimiter given by the user
+    return GeometrySerde.serialize(
+        getGeometryByType(polygonString, inputDelimiter, GeometryType.POLYGON));
+  }
+
+  /**
+   * Convenience overload that delegates to the two-argument form with a {@code null} delimiter.
+   *
+   * @param polygonString text describing the polygon
+   * @return the serialized polygon
+   * @throws ParseException propagated from the geometry construction
+   */
+  @DataTypeHint("Bytes")
+  public byte[] eval(@DataTypeHint("String") String polygonString) throws ParseException {
+    return eval(polygonString, null);
+  }
+}
diff --git a/pyflink/.python-version b/pyflink/.python-version
new file mode 100644
index 0000000000..2c0733315e
--- /dev/null
+++ b/pyflink/.python-version
@@ -0,0 +1 @@
+3.11
diff --git a/pyflink/README.md b/pyflink/README.md
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/pyflink/cli.py b/pyflink/cli.py
new file mode 100644
index 0000000000..2b0b5e09cf
--- /dev/null
+++ b/pyflink/cli.py
@@ -0,0 +1,23 @@
+import typer
+
+from sedonuts.cli.confluent.generate_ddl import create_ddl_command
+from sedonuts.cli.confluent.generate_terraform import create_terraform
+from sedonuts.cli.confluent.insert_with_cli import create_confluent_cli_command
+
+
+def main():
+    """Assemble the root Typer application from the three sub-applications and run it.
+
+    The sub-applications are mounted under the names ``terraform``, ``ddl`` and ``cli``.
+    """
+    # Built before the root app, in the same order in which they are mounted.
+    sub_apps = {
+        "terraform": create_terraform(),
+        "ddl": create_ddl_command(),
+        "cli": create_confluent_cli_command(),
+    }
+
+    root = typer.Typer()
+    for mount_name, sub_app in sub_apps.items():
+        root.add_typer(sub_app, name=mount_name)
+
+    root()
+
+
+# Run the CLI only when this file is executed as a script, not when it is imported.
+if __name__ == "__main__":
+    main()
diff --git a/pyflink/pyproject.toml b/pyflink/pyproject.toml
new file mode 100644
index 0000000000..befa4ff5b9
--- /dev/null
+++ b/pyflink/pyproject.toml
@@ -0,0 +1,9 @@
+[project]
+name = "pyflink"
+version = "0.1.0"
+description = "Command-line helpers for registering Apache Sedona Flink functions on Confluent Cloud"
+readme = "README.md"
+requires-python = ">=3.11"
+dependencies = [
+    "typer>=0.15.2",
+]
diff --git a/pyflink/sedonuts/cli/__init__.py b/pyflink/sedonuts/cli/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/pyflink/sedonuts/cli/confluent/__init__.py b/pyflink/sedonuts/cli/confluent/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/pyflink/sedonuts/cli/confluent/functions.py b/pyflink/sedonuts/cli/confluent/functions.py
new file mode 100644
index 0000000000..f104b5d201
--- /dev/null
+++ b/pyflink/sedonuts/cli/confluent/functions.py
@@ -0,0 +1,27 @@
+import os.path
+import zipfile
+
+
+def download_jar(sedona_version, scala_version) -> str:
+    """Placeholder for fetching the shaded Sedona Flink jar.
+
+    Args:
+        sedona_version: Sedona version of the jar to fetch.
+        scala_version: Scala version of the jar to fetch.
+
+    Returns:
+        Intended to return the directory that holds the downloaded jar.
+
+    Raises:
+        NotImplementedError: always, because downloading is not implemented yet.
+            Failing here replaces the silent ``None`` that later broke
+            ``os.path.join`` in the caller with an unrelated ``TypeError``.
+    """
+    raise NotImplementedError(
+        "download_jar is not implemented yet; supply a local jar directory instead"
+    )
+
+
+def list_classes_in_jar(jar_file_path):
+    """Return the archive entries of the jar that look like Sedona constructor classes.
+
+    An entry is kept when it ends with ``.class``, contains the path
+    ``org/apache/sedona/flink/confluent/constructors`` and contains ``ST_``.
+
+    Args:
+        jar_file_path: Path of the jar (zip) file to inspect.
+
+    Returns:
+        List of matching entry names, in archive order.
+    """
+    with zipfile.ZipFile(jar_file_path, 'r') as archive:
+        entries = archive.namelist()
+
+    matches = []
+    for entry in entries:
+        if not entry.endswith('.class'):
+            continue
+        if "org/apache/sedona/flink/confluent/constructors" in entry and "ST_" in entry:
+            matches.append(entry)
+
+    return matches
+
+
+def list_functions(sedona_version, scala_version, path: str | None = None):
+    """List the constructor class entries found in the shaded Sedona Flink jar.
+
+    Args:
+        sedona_version: Sedona version used in the jar file name.
+        scala_version: Scala version used in the jar file name.
+        path: Directory containing the jar; when ``None`` the directory is
+            obtained from ``download_jar``.
+
+    Returns:
+        Whatever ``list_classes_in_jar`` returns for the resolved jar path.
+    """
+    jar_dir = download_jar(sedona_version, scala_version) if path is None else path
+    # NOTE(review): the file name always carries the -SNAPSHOT suffix.
+    jar_name = f"sedona-flink-shaded_{scala_version}-{sedona_version}-SNAPSHOT.jar"
+
+    return list_classes_in_jar(os.path.join(jar_dir, jar_name))
diff --git a/pyflink/sedonuts/cli/confluent/generate_ddl.py b/pyflink/sedonuts/cli/confluent/generate_ddl.py
new file mode 100644
index 0000000000..f811cdbd89
--- /dev/null
+++ b/pyflink/sedonuts/cli/confluent/generate_ddl.py
@@ -0,0 +1,43 @@
+import typer
+
+from sedonuts.cli.confluent.functions import list_functions
+from sedonuts.cli.confluent.template import function_template
+
+
+def generate_ddl(
+        file: str = typer.Option(None, "--file", "-f", help="Path to the output file for the generated DDL (printed to stdout when omitted)"),
+        artifact_id: str = typer.Option(..., "--artifact-id", "-a", help="Artifact ID of the JAR file")
+):
+    """Generate one ``CREATE FUNCTION`` statement per constructor class in the jar.
+
+    Args:
+        file: Destination file for the statements; when ``None`` they are
+            printed to stdout, each followed by an empty line.
+        artifact_id: Artifact id substituted into the ``confluent-artifact://``
+            part of every statement.
+    """
+    # NOTE(review): versions and jar directory are hard-coded to a developer
+    # machine; they should become options before this is used elsewhere.
+    files = list_functions(
+        "1.8.0",
+        "2.12",
+        "/Users/pawelkocinski/Desktop/projects/sed/sedona/flink-shaded/target"
+    )
+
+    templates = []
+
+    for class_file in files:
+        tail = class_file.split("/")[-1].replace(".class", "")
+        # Take the last "$" segment: index 1 raised IndexError for names
+        # without a "$" in them.
+        class_name = tail.split("$")[-1]
+        location = tail.replace("$", ".")
+        # The template has three placeholders: function name, class location
+        # and artifact id. Passing only the artifact id raised IndexError.
+        templates.append(function_template.format(class_name, location, artifact_id))
+
+    if file is None:
+        for template in templates:
+            print(template)
+            print("")
+
+        return
+
+    with open(file, "w") as out:
+        for template in templates:
+            out.write(template)
+            out.write("\n\n")
+
+
+def create_ddl_command():
+    """Build the ``ddl`` Typer sub-application with ``generate_ddl`` as its ``generate`` command."""
+    app = typer.Typer(name="ddl")
+    register_generate = app.command(name="generate")
+    register_generate(generate_ddl)
+
+    return app
diff --git a/pyflink/sedonuts/cli/confluent/generate_terraform.py b/pyflink/sedonuts/cli/confluent/generate_terraform.py
new file mode 100644
index 0000000000..5d131997f3
--- /dev/null
+++ b/pyflink/sedonuts/cli/confluent/generate_terraform.py
@@ -0,0 +1,20 @@
+import typer
+
+
+def generate(
+        file: str = typer.Option(None, "--file", "-f", help="Path to the Terraform configuration file"),
+):
+    """Placeholder for the Terraform generation command.
+
+    Args:
+        file: Path to the Terraform configuration file; not used yet.
+
+    Raises:
+        typer.Exit: always, with exit code 1. Nothing is generated yet, so the
+            command reports that instead of finishing silently with success.
+    """
+    typer.echo("Terraform generation is not implemented yet.", err=True)
+    raise typer.Exit(code=1)
+
+
+
+
+def create_terraform():
+    """Build the ``terraform`` Typer sub-application with ``generate`` as its ``generate`` command."""
+    app = typer.Typer(name="terraform")
+    register_generate = app.command(name="generate")
+    register_generate(generate)
+
+    return app
diff --git a/pyflink/sedonuts/cli/confluent/insert_with_cli.py b/pyflink/sedonuts/cli/confluent/insert_with_cli.py
new file mode 100644
index 0000000000..2a061a55ac
--- /dev/null
+++ b/pyflink/sedonuts/cli/confluent/insert_with_cli.py
@@ -0,0 +1,114 @@
+import json
+import time
+from dataclasses import dataclass
+
+import typer
+
+from sedonuts.cli.confluent.functions import list_functions
+from sedonuts.cli.confluent.template import function_template
+import concurrent.futures
+
+import subprocess
+
+
+@dataclass
+class FlinkSQLStatement:
+    """Name and status of a Flink SQL statement, as read from the ``confluent`` CLI JSON output."""
+    # Value of the "name" field in the CLI JSON output.
+    name: str
+    # Value of the "status" field; this module checks for "RUNNING", "PENDING",
+    # "FAILED" and "COMPLETED".
+    status: str
+
+
+def describe_flink_sql_statement(name: str, environment: str, cloud: str = "aws", region: str = "us-east-1"):
+    """Look up a Flink SQL statement with ``confluent flink statement describe``.
+
+    Args:
+        name: Name of the statement to describe.
+        environment: Value passed to ``--environment``.
+        cloud: Value passed to ``--cloud``; defaults to the previously
+            hard-coded ``"aws"``.
+        region: Value passed to ``--region``; defaults to the previously
+            hard-coded ``"us-east-1"``.
+
+    Returns:
+        A ``FlinkSQLStatement`` built from the ``name`` and ``status`` fields of
+        the JSON output, or ``None`` when the CLI exits with a non-zero code
+        (its stderr is printed in that case).
+    """
+    command = [
+        'confluent', 'flink', 'statement', 'describe',
+        name, "--environment", environment, "--output", "json",
+        "--cloud", cloud, "--region", region
+    ]
+
+    completed = subprocess.run(command, capture_output=True, text=True)
+
+    if completed.returncode != 0:
+        print("Command failed:")
+        print(completed.stderr)
+        # Callers must be prepared for the missing result.
+        return None
+
+    metadata = json.loads(completed.stdout)
+
+    return FlinkSQLStatement(
+        name=metadata["name"],
+        status=metadata["status"],
+    )
+
+
+def run_flink_sql_statement(sql: str, compute_pool: str, database: str, environment: str, function_name: str):
+    """Submit one SQL statement through the ``confluent`` CLI and wait for it to finish.
+
+    The statement is created with ``confluent flink statement create`` and then
+    polled every two seconds while its status is ``RUNNING`` or ``PENDING``.
+    The outcome is reported on stdout; nothing is returned.
+
+    Args:
+        sql: SQL text passed to ``--sql``.
+        compute_pool: Value passed to ``--compute-pool``.
+        database: Value passed to ``--database``.
+        environment: Value passed to ``--environment``.
+        function_name: Name used only in the progress messages.
+    """
+    print(f"Creating function for {function_name}")
+    command = [
+        'confluent', 'flink', 'statement', 'create', '--sql',
+        sql, '--compute-pool', compute_pool, '--database', database,
+        "--environment", environment, "--output", "json"
+    ]
+
+    completed = subprocess.run(command, capture_output=True, text=True)
+
+    if completed.returncode != 0:
+        print("Command failed:")
+        print(completed.stderr.strip())  # The standard error output
+        print("")
+        return
+
+    name = json.loads(completed.stdout)["name"]
+
+    statement = describe_flink_sql_statement(name, environment)
+
+    # describe returns None when the CLI call fails; stop polling in that case
+    # instead of failing on ``None.status``.
+    while statement is not None and statement.status in ("RUNNING", "PENDING"):
+        time.sleep(2)
+        statement = describe_flink_sql_statement(name, environment)
+
+    if statement is None:
+        print(f"Could not read the status of statement {name} for {function_name}")
+        return
+
+    if statement.status == "FAILED":
+        print(f"Command failed for {function_name}")
+    elif statement.status == "COMPLETED":
+        print(f"Command succeeded for {function_name}")
+    else:
+        # Any other terminal status used to end without a message.
+        print(f"Command ended with status {statement.status} for {function_name}")
+
+
+def apply(
+        file: str = typer.Option(None, "--file", "-f", help="Path to the Terraform configuration file"),
+        artifact_id: str = typer.Option(..., "--artifact-id", "-a", help="Artifact ID of the JAR file"),
+        database: str = typer.Option(..., "--database", "-d", help="Database name"),
+        compute_pool: str = typer.Option(..., "--compute-pool", "-c", help="Compute pool name"),
+        environment: str = typer.Option(..., "--environment", "-e", help="Environment name"),
+):
+    """Create one Flink function per constructor class by running the statements via the CLI.
+
+    A ``CREATE FUNCTION`` statement is built for every class entry found in
+    the jar, and the statements are executed by up to three worker threads.
+
+    Args:
+        file: Accepted as an option but not read by this command.
+        artifact_id: Artifact id substituted into every statement.
+        database: Database passed on to ``run_flink_sql_statement``.
+        compute_pool: Compute pool passed on to ``run_flink_sql_statement``.
+        environment: Environment passed on to ``run_flink_sql_statement``.
+    """
+    # NOTE(review): versions and jar directory are hard-coded to a developer machine.
+    class_files = list_functions(
+        "1.8.0",
+        "2.12",
+        "/Users/pawelkocinski/Desktop/projects/sed/sedona/flink-shaded/target"
+    )
+
+    class_names = [entry.split("/")[-1].replace(".class", "") for entry in class_files]
+    statements = [
+        (function_template.format(class_name, class_name, artifact_id), class_name)
+        for class_name in class_names
+    ]
+
+    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
+        pending = []
+        for sql, class_name in statements:
+            pending.append(
+                pool.submit(run_flink_sql_statement, sql, compute_pool, database, environment, class_name)
+            )
+
+        # result() re-raises any exception raised inside the worker.
+        for finished in concurrent.futures.as_completed(pending):
+            finished.result()
+
+
+def create_confluent_cli_command():
+    """Build the ``cli`` Typer sub-application with ``apply`` as its ``apply`` command."""
+    app = typer.Typer(name="cli")
+    register_apply = app.command(name="apply")
+    register_apply(apply)
+
+    return app
diff --git a/pyflink/sedonuts/cli/confluent/template.py b/pyflink/sedonuts/cli/confluent/template.py
new file mode 100644
index 0000000000..985d7f9a9e
--- /dev/null
+++ b/pyflink/sedonuts/cli/confluent/template.py
@@ -0,0 +1,3 @@
+# CREATE FUNCTION statement for one class. Positional placeholders, in order:
+# function name, class name inside org.apache.sedona.flink.confluent.constructors,
+# and the Confluent artifact id.
+function_template = """
+CREATE FUNCTION {} AS 'org.apache.sedona.flink.confluent.constructors.{}' USING JAR 'confluent-artifact://{}';
+""".strip()

Reply via email to