yangxk1 commented on code in PR #782: URL: https://github.com/apache/incubator-graphar/pull/782#discussion_r2469321173
########## maven-projects/spark/snb-graphar-bridge/src/main/scala/org/apache/graphar/datasources/ldbc/bridge/LdbcStreamingBridge.scala: ########## @@ -0,0 +1,479 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.graphar.datasources.ldbc.bridge + +import ldbc.snb.datagen.util.GeneratorConfiguration +import org.apache.graphar.datasources.ldbc.model.{ + ConversionResult, + StreamingConversionResult, + ValidationResult +} +import org.apache.graphar.datasources.ldbc.stream.core.{LdbcStreamingIntegrator} +import org.apache.graphar.datasources.ldbc.stream.model.IntegrationStatistics +import org.apache.spark.sql.SparkSession +import org.slf4j.{Logger, LoggerFactory} + +import scala.util.Try + +/** + * LDBC streaming bridge (dedicated to streaming processing) + * + * Provides complete LDBC streaming processing functionality: + * 1. Supports streaming processing mode for complete LDBC dataset 2. + * Standardized configuration and validation 3. 
GraphAr format output + */ +class LdbcStreamingBridge extends StreamingBridgeInterface with Serializable { + + private val logger: Logger = + LoggerFactory.getLogger(classOf[LdbcStreamingBridge]) + + /** + * Unified write method (following GraphAr GraphWriter interface) + */ + override def write( + path: String, + spark: SparkSession, + name: String, + vertex_chunk_size: Long, + edge_chunk_size: Long, + file_type: String + ): Try[ConversionResult] = { + logger.info( + s"LdbcStreamingBridge.write() called with path=$path, name=$name" + ) + logger.info("Using streaming mode for all write operations") + + // Convert to streaming configuration and call streaming processing + val streamingConfig = StreamingConfiguration( + ldbc_config_path = "ldbc_config.properties", + output_path = path, + scale_factor = "0.1", // Default scale factor + graph_name = name, + vertex_chunk_size = vertex_chunk_size, + edge_chunk_size = edge_chunk_size, + file_type = file_type + ) + + writeStreaming(streamingConfig)(spark).map(_.asInstanceOf[ConversionResult]) + } + + /** + * Streaming processing dedicated write method (using new streaming + * architecture) + */ + override def writeStreaming( + config: StreamingConfiguration + )(implicit spark: SparkSession): Try[StreamingConversionResult] = Try { + + logger.info("=== STREAMING MODE (FULL IMPLEMENTATION) ===") + logger.info( + "Using GraphArActivityOutputStream for true streaming processing" + ) + logger.info( + "Supporting complete LDBC entities: Person, Forum, Post, Comment, and all relationships" + ) + + val startTime = System.currentTimeMillis() + + // Create streaming integrator + val streamingIntegrator = new LdbcStreamingIntegrator( + output_path = config.output_path, + graph_name = config.graph_name, + vertex_chunk_size = config.vertex_chunk_size, + edge_chunk_size = config.edge_chunk_size, + file_type = config.file_type + ) + + try { + // Create LDBC configuration + val ldbcConfig = createLdbcConfiguration(config) + + // Execute 
streaming conversion + val conversionResult = + streamingIntegrator.executeStreamingConversion(ldbcConfig) + + conversionResult match { + case scala.util.Success(result) => + logger.info( + s"✓ Streaming conversion completed: ${result.processingDurationMs}ms" + ) + logger.info( + s"✓ Supported entity types: ${streamingIntegrator.getSupportedEntityTypes().mkString(", ")}" + ) + logger.info( + s"✓ Processed entities: ${result.integrationStatistics.processedEntities.mkString(", ")}" + ) + logger.info( + s"✓ Total records: ${result.integrationStatistics.totalRecords}" + ) + logger.info( + "✓ Streaming conversion completed, real LDBC data generation successful" + ) + result + + case scala.util.Failure(exception) => + logger.error("✗ Streaming conversion failed", exception) + throw exception // Directly throw exception, no fallback + } + + } finally { + streamingIntegrator.cleanup() + } + } + + /** + * Create LDBC configuration + */ + private def createLdbcConfiguration( + config: StreamingConfiguration + ): GeneratorConfiguration = { + val ldbcConfig = new GeneratorConfiguration( + new java.util.HashMap[String, String]() + ) + + // *** Complete LDBC configuration based on params_default.ini *** + + // Basic probability parameters + ldbcConfig.map.put("generator.baseProbCorrelated", "0.95") Review Comment: giving users control is better ########## maven-projects/spark/snb-graphar-bridge/README.md: ########## @@ -0,0 +1,182 @@ +# LDBC SNB to GraphAr Bridge + +This module provides a direct memory pipeline for converting LDBC Social Network Benchmark (SNB) data to Apache GraphAr format. 
+ +## Design + +**Dual-Track Architecture**: +- **Static entities** (Person, Place, Organisation, Tag, TagClass): RDD-based batch processing from LDBC dictionaries +- **Dynamic entities** (Forum, Post, Comment): Streaming architecture processing LDBC activity serializer output + +**Key Features**: +- Direct memory pipeline eliminates intermediate CSV file I/O +- Supports all 31 LDBC SNB entity types (8 vertices + 23 edges) +- Fully compliant with GraphAr v1.0 standard +- Batch processing with configurable chunk sizes + +## Building + +### Prerequisites + +- Java 8 or Java 11 +- Maven 3.6+ +- SBT 1.x (for LDBC dependency) +- Apache Spark 3.5.1 + +### Build LDBC SNB Datagen Dependency + +```bash +cd ../ldbc-snb-datagen +sbt assembly +``` + +### Build This Module + +**IMPORTANT**: Build from project root, not from this directory. + +```bash +cd /path/to/incubator-graphar/maven-projects + +# Build with dependencies +mvn clean install -DskipTests \ + -pl spark/graphar,spark/datasources-35,spark/snb-graphar-bridge -am +``` + +## Usage + +### Quick Start + +```bash +spark-submit \ + --class org.apache.graphar.datasources.ldbc.examples.LdbcEnhancedBridgeExample \ + --master "local[*]" \ + --jars ../ldbc-snb-datagen/target/ldbc_snb_datagen_2.12_spark3.2-0.5.1+23-1d60a657-jar-with-dependencies.jar \ + target/snb-graphar-bridge-0.13.0-SNAPSHOT.jar \ + 0.1 /tmp/graphar_output ldbc_test 256 256 parquet +``` + +### Parameters + +| Parameter | Description | Example | +|-----------|-------------|---------| +| scaleFactor | LDBC scale factor | `0.1` | +| outputPath | Output directory | `/tmp/graphar_output` | +| graphName | Graph identifier | `ldbc_test` | +| vertexChunkSize | Vertex chunk size | `256` | +| edgeChunkSize | Edge chunk size | `256` | +| fileType | File format | `parquet` | + +**Scale Factor Guidelines**: Use SF≥0.1 for full testing (SF0.003 has limited dynamic entities). 
+ +### Output Structure + +``` +/tmp/graphar_output/ +├── ldbc_test.graph.yml Review Comment: Should `xxx.vertex.yml` and `xxx.edge.yml` also be generated? ########## maven-projects/spark/snb-graphar-bridge/src/main/scala/org/apache/graphar/datasources/ldbc/bridge/LdbcGraphArBridge.scala: ########## @@ -0,0 +1,1100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.graphar.datasources.ldbc.bridge + +import org.apache.graphar.datasources.ldbc.model.{ + ConversionResult, + ValidationResult, + ValidationSuccess, + ValidationFailure +} +import org.apache.graphar.datasources.ldbc.stream.core.GraphArDataCollector +import org.apache.graphar.datasources.ldbc.stream.processor.{ + UnifiedIdManager, + PersonRDDProcessor, + StaticEntityProcessor +} +import org.apache.graphar.datasources.ldbc.stream.strategy.{ + EnhancedOutputStrategySelector, + OutputStrategy, + SystemResourceInfo, + StrategyDecision +} +import org.apache.graphar.datasources.ldbc.stream.writer.StreamingGraphArWriter +import org.apache.graphar.datasources.ldbc.stream.output.GraphArActivityOutputStream +import org.apache.graphar.graph.GraphWriter +import org.apache.spark.sql.SparkSession +import org.slf4j.{Logger, LoggerFactory} + +import scala.util.{Try, Success, Failure} +import java.io.File +import java.nio.file.{Files, Paths} + +/** + * Main controller for LDBC to GraphAr conversion + * + * Implements a dual-track processing architecture: + * 1. Static data (Person, etc.) processed via RDD-based batch processing 2. + * Dynamic data processed via streaming architecture 3. Intelligently + * selects output strategy and generates standard GraphAr format + */ +class LdbcGraphArBridge extends LdbcBridgeInterface { + + private val logger: Logger = + LoggerFactory.getLogger(classOf[LdbcGraphArBridge]) + + /** + * Unified write method (following GraphAr GraphWriter pattern) + */ + override def write( + path: String, + spark: SparkSession, + name: String, + vertex_chunk_size: Long, + edge_chunk_size: Long, + file_type: String + ): Try[ConversionResult] = { + + logger.info( + s"Starting LDBC to GraphAr conversion: path=$path, name=$name, vertex_chunk_size=$vertex_chunk_size, edge_chunk_size=$edge_chunk_size, file_type=$file_type" + ) + + Try { + implicit val sparkSession: SparkSession = spark + + // 1. 
Parameter validation + val validation = validateConfiguration( + "dual_track", + path, + vertex_chunk_size, + edge_chunk_size, + file_type + ) + if (!validation.isSuccess) { + throw new IllegalArgumentException( + s"Configuration validation failed: ${validation.getErrors.mkString(", ")}" + ) + } + + // 2. Initialize enhanced components + val systemResources = SystemResourceInfo.current() + val scaleFactor = getScaleFactor(spark) + + logger.info(s"Initializing enhanced components: scaleFactor=$scaleFactor") + + val idManager = new UnifiedIdManager(scaleFactor) + idManager.initialize() + + val dataCollector = new GraphArDataCollector(path, name, idManager) + val strategySelector = new EnhancedOutputStrategySelector() + + // 3. Create temporary directories + val tempDir = createTempDirectories(path) + + try { + // 4. First track: Process static entities + val staticResult = + processStaticEntities(dataCollector, tempDir.staticPath) + logger.info(s"Static data processing completed: $staticResult") + + // 5. Second track: Process dynamic streaming data + val streamingResult = processStreamingEntities( + dataCollector, + tempDir.streamingPath, + name, + vertex_chunk_size, + edge_chunk_size, + file_type + ) + logger.info(s"Streaming data processing completed: $streamingResult") + + // 6. Intelligent strategy selection + val strategyDecision = strategySelector.selectStrategyFromCollector( + dataCollector, + systemResources + ) + logger.info(s"Selected strategy: ${strategyDecision}") + + // 7. Unified output based on selected strategy + val finalResult = executeSelectedStrategy( + path, + tempDir, + dataCollector, + strategyDecision, + name, + vertex_chunk_size, + edge_chunk_size, + file_type + ) + + // 8. 
Generate processing report + val processingReport = + generateProcessingReport(dataCollector, strategyDecision, finalResult) + logger.info(s"Processing report: $processingReport") + + ConversionResult( + personCount = dataCollector.getTotalEntityCount().toLong, + knowsCount = 0L, + interestCount = 0L, + workAtCount = 0L, + studyAtCount = 0L, + locationCount = 0L, + outputPath = path, + conversionTime = System.currentTimeMillis(), + warnings = List( + s"Strategy=${strategyDecision.strategy}, Confidence=${(strategyDecision.confidence * 100).toInt}%" + ) + ) + + } finally { + // Cleanup temporary directories + cleanupTempDirectories(tempDir) + + // Cleanup data collector + dataCollector.cleanup() + } + + }.recoverWith { case e: Exception => + logger.error("LDBC to GraphAr conversion failed", e) + Try( + ConversionResult( + personCount = 0L, + knowsCount = 0L, + outputPath = path, + warnings = List(s"Conversion failed: ${e.getMessage}") + ) + ) + } + } + + /** + * Validate configuration parameters + */ + override def validateConfiguration( + mode: String, + output_path: String, + vertex_chunk_size: Long, + edge_chunk_size: Long, + file_type: String + ): ValidationResult = { + + val errors = scala.collection.mutable.ListBuffer[String]() + + if (output_path.trim.isEmpty) { + errors += "Output path cannot be empty" + } + + if (vertex_chunk_size <= 0) { + errors += s"Vertex chunk size must be positive: $vertex_chunk_size" + } + + if (edge_chunk_size <= 0) { + errors += s"Edge chunk size must be positive: $edge_chunk_size" + } + + val supportedFileTypes = Set("csv", "parquet", "orc") + if (!supportedFileTypes.contains(file_type.toLowerCase)) { + errors += s"Unsupported file type: $file_type. 
Supported types: ${supportedFileTypes.mkString(", ")}" + } + + if (!Set("dual_track", "static", "streaming").contains(mode)) { + errors += s"Unsupported processing mode: $mode" + } + + if (errors.isEmpty) { + ValidationSuccess + } else { + ValidationFailure(errors.toList) + } + } + + /** + * Get supported processing modes + */ + override def getSupportedModes(): List[String] = { + List("dual_track", "static", "streaming") + } + + /** + * Get bridge type identifier + */ + override def getBridgeType(): String = "ldbc_enhanced_dual_track" + + /** + * Process static entity data + */ + private def processStaticEntities( + dataCollector: GraphArDataCollector, + tempStaticPath: String + )(implicit spark: SparkSession): StaticProcessingResult = { + + logger.info("Starting to process static entity data") + + try { + // Use PersonRDDProcessor to process Person and its relationships + val personProcessor = new PersonRDDProcessor(dataCollector.idManager) + val personResult = personProcessor.processAndCollect(dataCollector).get + + // Process other static entities (Organisation, Place, Tag, etc.) 
+ processOtherStaticEntities(dataCollector, dataCollector.idManager) + + StaticProcessingResult( + success = true, + processedEntities = + List("Person", "Organisation", "Place", "Tag", "TagClass"), + personResult = Some(personResult), + totalVertices = personResult.personCount, + totalEdges = + personResult.knowsCount + personResult.hasInterestCount + personResult.studyAtCount + personResult.workAtCount + personResult.isLocatedInCount + ) + + } catch { + case e: Exception => + logger.error("Static entity processing failed", e) + StaticProcessingResult( + success = false, + processedEntities = List.empty, + personResult = None, + totalVertices = 0, + totalEdges = 0, + error = Some(e.getMessage) + ) + } + } + + /** + * Process streaming entity data + */ + private def processStreamingEntities( + dataCollector: GraphArDataCollector, + tempStreamingPath: String, + graph_name: String, + vertex_chunk_size: Long, + edge_chunk_size: Long, + file_type: String + )(implicit spark: SparkSession): StreamingProcessingResult = { + + logger.info("Starting to process streaming entity data") + + try { + // Use existing streaming components + val streamingWriter = new StreamingGraphArWriter( + output_path = tempStreamingPath, + graph_name = graph_name, + vertex_chunk_size = vertex_chunk_size, + edge_chunk_size = edge_chunk_size, + file_type = file_type + ) + + val activityOutputStream = new GraphArActivityOutputStream( + output_path = tempStreamingPath, + graph_name = graph_name, + file_type = file_type + ) + + // Call real LDBC dynamic data generation + simulateStreamingDataProcessing(activityOutputStream) + + activityOutputStream.close() + + // Scan temporary directory to collect dynamic entity information + logger.info( + "Scanning temporary streaming directory to collect dynamic entity information" + ) + val streamingEntities = + discoverStreamingEntities(tempStreamingPath, file_type) + + logger.info( + s"Found ${streamingEntities.size} dynamic entity types: 
${streamingEntities.keys.mkString(", ")}" + ) + + streamingEntities.foreach { case (entityType, (entityPath, isVertex)) => + try { + // Read entity data + val entityDF = file_type.toLowerCase match { + case "parquet" => spark.read.parquet(entityPath) + case "csv" => spark.read.option("header", "true").csv(entityPath) + case "orc" => spark.read.orc(entityPath) + case _ => spark.read.parquet(entityPath) + } + + val rowCount = entityDF.count() + logger.info( + s"Reading dynamic entity $entityType: $rowCount records, path: $entityPath" + ) + + // Add to data collector + if (isVertex) { + dataCollector.addStaticVertexData(entityType, entityDF) + } else { + // Parse edge relationship + val relation = parseEdgeRelationFromPath(entityType) + dataCollector.addStaticEdgeData(relation, entityDF) + } + } catch { + case e: Exception => + logger.error(s"Failed to read dynamic entity $entityType", e) + } + } + + StreamingProcessingResult( + success = true, + processedEntities = streamingEntities.keys.toList, + totalChunks = 0, + totalRows = streamingEntities.size + ) + + } catch { + case e: Exception => + logger.error("Streaming entity processing failed", e) + StreamingProcessingResult( + success = false, + processedEntities = List.empty, + totalChunks = 0, + totalRows = 0, + error = Some(e.getMessage) + ) + } + } + + /** + * Execute selected output strategy + */ + private def executeSelectedStrategy( + output_path: String, + tempDir: TempDirectoryInfo, + dataCollector: GraphArDataCollector, + strategyDecision: StrategyDecision, + graph_name: String, + vertex_chunk_size: Long, + edge_chunk_size: Long, + file_type: String + )(implicit spark: SparkSession): OutputExecutionResult = { + + logger.info(s"Executing output strategy: ${strategyDecision.strategy}") + + strategyDecision.strategy match { + case OutputStrategy.COMPLETE_STANDARD => + executeCompleteStandardStrategy( + output_path, + tempDir, + dataCollector, + graph_name, + vertex_chunk_size, + edge_chunk_size, + file_type + ) 
+ + case OutputStrategy.HYBRID_DOCUMENTED => + executeHybridDocumentedStrategy( + output_path, + tempDir, + dataCollector, + graph_name, + vertex_chunk_size, + edge_chunk_size, + file_type + ) + } + } + + /** + * Execute complete standard strategy + */ + private def executeCompleteStandardStrategy( + output_path: String, + tempDir: TempDirectoryInfo, + dataCollector: GraphArDataCollector, + graph_name: String, + vertex_chunk_size: Long, + edge_chunk_size: Long, + file_type: String + )(implicit spark: SparkSession): OutputExecutionResult = { + + logger.info("Executing complete standard strategy") + + try { + val graphWriter = new GraphWriter() + + // 1. Add all static data to GraphWriter + val staticDataFrames = dataCollector.getStaticDataFrames() + logger.info(s"Retrieved static DataFrame count: ${staticDataFrames.size}") + + staticDataFrames.foreach { case (entityType, df) => + val rowCount = df.count() + logger.info( + s"Adding static vertex data to GraphWriter: $entityType, record count: $rowCount" + ) + + // Remove internal columns added by UnifiedIdManager, let GraphWriter manage vertex indices + val cleanDF = + df.drop("_graphArVertexIndex", "_entityType", "_idSpaceCategory") + graphWriter.PutVertexData(entityType, cleanDF) + } + + val staticEdgeFrames = dataCollector.getStaticEdgeFrames() + logger.info( + s"Retrieved static edge DataFrame count: ${staticEdgeFrames.size}" + ) + + staticEdgeFrames.foreach { case (relation, df) => + val rowCount = df.count() + logger.info( + s"Adding static edge data to GraphWriter: ${relation._1}_${relation._2}_${relation._3}, record count: $rowCount" + ) + graphWriter.PutEdgeData(relation, df) + } + + // 2. 
Convert streaming chunk data to DataFrame and add to GraphWriter + val streamingEntityInfo = dataCollector.getStreamingEntityInfo() + logger.info( + s"Retrieved streaming entity count: ${streamingEntityInfo.size}" + ) + + streamingEntityInfo.foreach { case (entityType, info) => + logger.info( + s"Processing streaming entity: $entityType, chunks: ${info.chunkCount}, rows: ${info.totalRows}" + ) + val mergedDF = convertChunksToDataFrame( + tempDir.streamingPath, + entityType, + info, + file_type + ) + + if (isVertexEntity(entityType)) { + graphWriter.PutVertexData(entityType, mergedDF) + logger.info( + s"Adding streaming vertex data to GraphWriter: $entityType" + ) + } else { + val relation = parseEdgeRelation(entityType) + graphWriter.PutEdgeData(relation, mergedDF) + logger.info( + s"Adding streaming edge data to GraphWriter: ${relation._1}_${relation._2}_${relation._3}" + ) + } + } + + // 3. Generate complete standard GraphAr output in one shot + logger.info( + s"Starting GraphWriter write: output_path=$output_path, graph_name=$graph_name" + ) + graphWriter.write( + output_path, + spark, + graph_name, + vertex_chunk_size, + edge_chunk_size, + file_type + ) + logger.info("GraphWriter write completed") + + OutputExecutionResult( + success = true, + strategy = OutputStrategy.COMPLETE_STANDARD, + outputFormat = "complete_graphar_standard", + totalEntities = dataCollector.getTotalEntityCount() + ) + + } catch { + case e: Exception => + logger.error("Complete standard strategy execution failed", e) + OutputExecutionResult( + success = false, + strategy = OutputStrategy.COMPLETE_STANDARD, + outputFormat = "failed", + totalEntities = 0, + error = Some(e.getMessage) + ) + } + } + + /** + * Execute hybrid documented strategy + */ + private def executeHybridDocumentedStrategy( + output_path: String, + tempDir: TempDirectoryInfo, + dataCollector: GraphArDataCollector, + graph_name: String, + vertex_chunk_size: Long, + edge_chunk_size: Long, + file_type: String + )(implicit 
spark: SparkSession): OutputExecutionResult = { + + logger.info("Executing hybrid documented strategy") + + try { + // 1. Process static data part + val staticGraphWriter = new GraphWriter() + + dataCollector.getStaticDataFrames().foreach { case (entityType, df) => + staticGraphWriter.PutVertexData(entityType, df) + } + + dataCollector.getStaticEdgeFrames().foreach { case (relation, df) => + staticGraphWriter.PutEdgeData(relation, df) + } + + // Write static part to temporary directory + val tempStaticOutputPath = s"$output_path/.temp_static_final" + staticGraphWriter.write( + tempStaticOutputPath, + spark, + s"${graph_name}_static", + vertex_chunk_size, + edge_chunk_size, + file_type + ) + + // 2. Merge directory structures + mergeDirectoryStructures( + tempStaticOutputPath, + tempDir.streamingPath, + output_path + ) + + // 3. Generate unified standard YAML metadata + generateHybridStandardYamls( + output_path, + dataCollector, + graph_name, + vertex_chunk_size, + edge_chunk_size, + file_type + ) + + OutputExecutionResult( + success = true, + strategy = OutputStrategy.HYBRID_DOCUMENTED, + outputFormat = "hybrid_with_standard_metadata", + totalEntities = dataCollector.getTotalEntityCount() + ) + + } catch { + case e: Exception => + logger.error("Hybrid documented strategy execution failed", e) + OutputExecutionResult( + success = false, + strategy = OutputStrategy.HYBRID_DOCUMENTED, + outputFormat = "failed", + totalEntities = 0, + error = Some(e.getMessage) + ) + } + } + + // Helper method implementations + private def getScaleFactor(spark: SparkSession): Double = { + // Get scale factor from Spark configuration or system properties + spark.conf + .getOption("ldbc.scale.factor") + .map(_.toDouble) + .getOrElse(0.003) // Default scale factor + } + + private def createTempDirectories(basePath: String): TempDirectoryInfo = { + val tempDir = TempDirectoryInfo( + basePath = basePath, + staticPath = s"$basePath/.temp_static", + streamingPath = 
s"$basePath/.temp_streaming" + ) + + Files.createDirectories(Paths.get(tempDir.staticPath)) + Files.createDirectories(Paths.get(tempDir.streamingPath)) + + tempDir + } + + private def cleanupTempDirectories(tempDir: TempDirectoryInfo): Unit = { + try { + if (Files.exists(Paths.get(tempDir.staticPath))) { + deleteDirectory(new File(tempDir.staticPath)) + } + if (Files.exists(Paths.get(tempDir.streamingPath))) { + deleteDirectory(new File(tempDir.streamingPath)) + } + } catch { + case e: Exception => + logger.warn("Failed to cleanup temporary directories", e) + } + } + + private def deleteDirectory(directory: File): Boolean = { + if (directory.exists()) { + directory.listFiles().foreach { file => + if (file.isDirectory) { + deleteDirectory(file) + } else { + file.delete() + } + } + directory.delete() + } else { + true + } + } + + /** + * Scan temporary streaming directory to discover generated dynamic entities + * Returns Map[EntityType, (Path, IsVertex)] + */ + private def discoverStreamingEntities( + tempStreamingPath: String, + file_type: String + ): Map[String, (String, Boolean)] = { + import java.nio.file.{Files, Paths} + import scala.collection.JavaConverters._ + + val result = scala.collection.mutable.Map[String, (String, Boolean)]() + + try { + // Check vertex directory + val vertexDir = Paths.get(tempStreamingPath, "vertex") + if (Files.exists(vertexDir) && Files.isDirectory(vertexDir)) { + Files.list(vertexDir).iterator().asScala.foreach { vertexPath => + if (Files.isDirectory(vertexPath)) { + val entityType = vertexPath.getFileName.toString + result(entityType) = + (vertexPath.toString, true) // true indicates vertex + } + } + } + + // Check edge directory + val edgeDir = Paths.get(tempStreamingPath, "edge") + if (Files.exists(edgeDir) && Files.isDirectory(edgeDir)) { + Files.list(edgeDir).iterator().asScala.foreach { edgePath => + if (Files.isDirectory(edgePath)) { + val entityType = edgePath.getFileName.toString + result(entityType) = + 
(edgePath.toString, false) // false indicates edge + } + } + } + + // Check entities in root directory (like Photo) + val rootDir = Paths.get(tempStreamingPath) + if (Files.exists(rootDir) && Files.isDirectory(rootDir)) { + Files.list(rootDir).iterator().asScala.foreach { entityPath => + val fileName = entityPath.getFileName.toString + if ( + Files.isDirectory(entityPath) && !fileName + .startsWith(".") && fileName != "vertex" && fileName != "edge" + ) { + // Determine whether it's a vertex or edge (by name pattern) + val isVertex = !fileName.contains("_") + result(fileName) = (entityPath.toString, isVertex) + } + } + } + } catch { + case e: Exception => + logger.error("Failed to scan streaming directory", e) + } + + result.toMap + } + + /** + * Parse edge relationship from path name + */ + private def parseEdgeRelationFromPath( + pathName: String + ): (String, String, String) = { + val parts = pathName.split("_") + if (parts.length >= 3) { + val src = parts(0) + val dst = parts.last + val edge = parts.slice(1, parts.length - 1).mkString("_") + (src, edge, dst) + } else { + logger.warn( + s"Failed to parse edge relation: $pathName, using default value" + ) + ("Unknown", pathName, "Unknown") + } + } + + private def processOtherStaticEntities( + dataCollector: GraphArDataCollector, + idManager: UnifiedIdManager + )(implicit spark: SparkSession): Unit = { + logger.info("Processing other static entities") + + try { + // Create static entity processor + val staticProcessor = new StaticEntityProcessor(idManager)(spark) + + // Generate all static entities + val staticEntities = staticProcessor.generateAllStaticEntities() + + if (staticEntities.isEmpty) { + logger.warn( + "Failed to generate any static entities, LDBC dictionaries may not be initialized" + ) + return + } + + // Add vertices to data collector + staticEntities.foreach { case (entityType, df) => + val recordCount = df.count() + if (recordCount > 0) { + logger.info(s"Adding static entity $entityType: 
$recordCount records") + dataCollector.addStaticVertexData(entityType, df) + } else { + logger.warn(s"Static entity $entityType is empty, skipping") + } + } + + // Generate and add static edge data + logger.info("Starting to generate static edge data") + val staticEdges = staticProcessor.generateAllStaticEdges() + + staticEdges.foreach { case (edgeKey, df) => + val recordCount = df.count() + if (recordCount > 0) { + // Parse edge relation tuple: Organisation_isLocatedIn_Place -> (Organisation, isLocatedIn, Place) + val parts = edgeKey.split("_") + val relation = if (parts.length >= 3) { + val src = parts(0) + val dst = parts.last + val edge = parts.slice(1, parts.length - 1).mkString("_") + (src, edge, dst) + } else { + logger.error(s"Failed to parse edge relation: $edgeKey") + ("Unknown", "Unknown", "Unknown") + } + + logger.info( + s"Adding static edge data ${relation._1}_${relation._2}_${relation._3}: $recordCount records" + ) + dataCollector.addStaticEdgeData(relation, df) + } else { + logger.warn(s"Static edge $edgeKey is empty, skipping") + } + } + + logger.info("Static entity and edge processing completed") + + } catch { + case e: Exception => + logger.error("Error occurred while processing static entities", e) + // Continue processing, don't interrupt the entire workflow + logger.warn( + "Static entity processing failed, but continuing with subsequent steps" + ) + } + } + + private def simulateStreamingDataProcessing( + outputStream: GraphArActivityOutputStream + )(implicit spark: SparkSession): Unit = { + logger.info( + "Starting to process dynamic data stream (real LDBC Activity generation)" + ) + + try { + // Import LDBC required classes + import ldbc.snb.datagen.generator.DatagenParams + import ldbc.snb.datagen.generator.generators.{ + SparkPersonGenerator, + PersonActivityGenerator + } + import scala.collection.JavaConverters._ + + // Create LDBC configuration + val ldbcConfig = createLdbcConfiguration() + DatagenParams.readConf(ldbcConfig) + + 
logger.info( + s"Starting to generate LDBC dynamic data, scaleFactor=${getScaleFactor(spark)}" + ) + + // 1. Generate Person data (foundation of dynamic data) + val personRDD = SparkPersonGenerator(ldbcConfig)(spark) + val personCount = personRDD.count() + logger.info(s"Person RDD generation completed: $personCount Persons") + + // 2. Group Persons by block (following LDBC PersonSorter logic) + val blockSize = DatagenParams.blockSize + val personsByBlock = personRDD + .map { person => (person.getAccountId / blockSize, person) } + .groupByKey() + .collect() Review Comment: Will there be enough memory for a large SF (scale factor)? `.collect()` brings every grouped Person to the driver. ########## .github/workflows/pyspark.yml: ########## @@ -53,10 +53,13 @@ jobs: python-version: '3.10' - name: Install Poetry - working-directory: pyspark run: | - yes | sudo python3 -m pip install poetry --quiet - poetry env use python3 + curl -sSL https://install.python-poetry.org | python3 - + echo "$HOME/.local/bin" >> $GITHUB_PATH Review Comment: Please check if this is necessary -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
