Hello. I have a JSON key named viewLastModified with a value of
1652135219. Checking it manually with an epoch converter
(https://www.epochconverter.com/), I expect my Groovy script to convert
it to something in this ballpark:
GMT: Monday, May 9, 2022 10:26:59 PM
Your time zone: Monday, May 9, 2022 6:26:59 PM GMT-04:00 (DST)
Relative: 2 years ago
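For reference, the conversion I'm expecting is a one-liner with java.time. This minimal check (written as plain Java so it runs outside NiFi; the same calls work from Groovy, and `EpochCheck` is just a scratch name) agrees with the converter:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class EpochCheck {
    // Seconds since the epoch -> Instant (always UTC)
    static Instant toInstant(long epochSeconds) {
        return Instant.ofEpochSecond(epochSeconds);
    }

    public static void main(String[] args) {
        Instant utc = toInstant(1652135219L);
        System.out.println(utc); // 2022-05-09T22:26:59Z

        // The same instant viewed in a fixed eastern-US zone:
        ZonedDateTime eastern = utc.atZone(ZoneId.of("America/New_York"));
        System.out.println(eastern); // 2022-05-09T18:26:59-04:00[America/New_York]
    }
}
```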

But my code produces the wrong date, and I'm not sure why.
Running the code below against that value, I get this result:
"viewLastModified": [
    {
      "chronology": {
        "calendarType": "iso8601",
        "id": "ISO",
        "isoBased": true
      },
      "dayOfMonth": 11,
      "dayOfWeek": "SATURDAY",
      "dayOfYear": 192,
      "era": "CE",
      "leapYear": false,
      "month": "JULY",
      "monthValue": 7,
      "year": 1970
    }
  ]
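One thing I noticed while staring at that output: the first eight digits of the value, 16521352, interpreted as seconds since the epoch, land exactly on the Saturday, 1970-07-11, that my script is emitting, which suggests only part of the number is being picked up. A minimal check (plain Java; `PrefixCheck` and its method are just scratch names, and I pin the zone to UTC for determinism):

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

public class PrefixCheck {
    // Interpret the first prefixLen digits of the string as epoch seconds.
    static LocalDate prefixAsEpochDate(String digits, int prefixLen) {
        long secs = Long.parseLong(digits.substring(0, prefixLen));
        return Instant.ofEpochSecond(secs).atZone(ZoneOffset.UTC).toLocalDate();
    }

    public static void main(String[] args) {
        System.out.println(prefixAsEpochDate("1652135219", 8)); // 1970-07-11
    }
}
```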

Can anyone see where my handling of values that are seconds since the
epoch goes wrong?

My code:
import java.util.regex.Pattern
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.format.DateTimeParseException
import java.time.Instant
import java.time.ZoneId
import groovy.json.JsonSlurper
import groovy.json.JsonBuilder
import org.apache.nifi.processor.io.StreamCallback
import org.apache.nifi.flowfile.FlowFile

// Combined regex pattern to match various date formats, including Unix timestamps
def combinedPattern = Pattern.compile(/\b(\d{8})|\b(\d{4}[' ,-\\/]+\d{2}[' ,-\\/]+\d{2})|\b(\d{2}[' ,-\\/]+\d{2}[' ,-\\/]+\d{4})|\b(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[' ,-\\/]+\d{2}[' ,-\\/]+\d{4}|\b(?:January|February|March|April|May|June|July|August|September|October|November|December)[' ,-\\/]+\d{2}[' ,-\\/]+\d{4}\b|\b\d{10}\b/)

// Precompile date formats for faster reuse
def dateFormats = [
    DateTimeFormatter.ofPattern('yyyyMMdd'),
    DateTimeFormatter.ofPattern('dd MMM, yyyy'),
    DateTimeFormatter.ofPattern('MMM dd, yyyy'),
    DateTimeFormatter.ofPattern('yyyy MMM dd'),
    DateTimeFormatter.ofPattern('MMMM dd, yyyy')
]

// Helper function to parse a date string using predefined formats
def parseDate(String dateStr, List<DateTimeFormatter> dateFormats) {
    for (format in dateFormats) {
        try {
            return LocalDate.parse(dateStr, format)
        } catch (DateTimeParseException e) {
            // Continue trying other formats if the current one fails
        }
    }
    return null
}

// Helper function to parse a Unix timestamp
def parseUnixTimestamp(String timestampStr) {
    try {
        long timestamp = Long.parseLong(timestampStr)
        // Validate if the timestamp is in a reasonable range
        if (timestamp >= 0 && timestamp <= Instant.now().getEpochSecond()) {
            return Instant.ofEpochSecond(timestamp).atZone(ZoneId.systemDefault()).toLocalDateTime().toLocalDate()
        }
    } catch (NumberFormatException e) {
        // If parsing fails, return null
    }
    return null
}

// Helper function to validate date within a specific range
boolean validateDate(LocalDate date) {
    def currentYear = LocalDate.now().year
    def year = date.year
    return year >= currentYear - 120 && year <= currentYear + 40
}

// Function to process and normalize dates
def processDates(List<String> dates, List<DateTimeFormatter> dateFormats) {
    dates.collect { dateStr ->
        def parsedDate = parseDate(dateStr, dateFormats)
        if (parsedDate == null) {
            parsedDate = parseUnixTimestamp(dateStr)
        }
        log.info("Parsed date: ${parsedDate}")
        parsedDate
    }.findAll { it != null && validateDate(it) }
     .unique()
     .sort()
}

// Define the list of substrings to check in key names
def dateRelatedSubstrings = ['birth', 'death', 'dob', 'date', 'updated',
'modified', 'created', 'deleted', 'registered', 'times', 'datetime', 'day',
'month', 'year', 'week', 'epoch', 'period']

// Start of NiFi script execution
def ff = session.get()
if (!ff) return

try {
    log.info("Starting processing of FlowFile: ${ff.getId()}")

    // Extract JSON content for processing
    String jsonKeys = ff.getAttribute('payload.json.keys')
    log.info("JSON keys: ${jsonKeys}")
    def keysMap = new JsonSlurper().parseText(jsonKeys)
    def results = [:]

    // Process each key-value pair in the JSON map
    keysMap.each { key, value ->
        def datesForThisKey = []
        log.info("Processing key: ${key}")

        // Check if the key contains any of the specified substrings
        if (dateRelatedSubstrings.any { key.toLowerCase().contains(it) }) {
            // Read and process the content of the FlowFile
            ff = session.write(ff, { inputStream, outputStream ->
                def bufferedReader = new BufferedReader(new InputStreamReader(inputStream))
                def bufferedWriter = new BufferedWriter(new OutputStreamWriter(outputStream))
                String line

                // Read each line of the input stream
                while ((line = bufferedReader.readLine()) != null) {
                    // Check if the line contains the key
                    if (line.contains(key)) {
                        def matcher = combinedPattern.matcher(line)
                        // Find all matching date patterns in the line
                        while (matcher.find()) {
                            datesForThisKey << matcher.group(0)
                        }
                    }
                    bufferedWriter.write(line)
                    bufferedWriter.newLine()
                }

                bufferedReader.close()
                bufferedWriter.close()
            } as StreamCallback)

            // Process and store dates for the current key
            if (!datesForThisKey.isEmpty()) {
                log.info("Found dates for key ${key}: ${datesForThisKey}")
                results[key] = processDates(datesForThisKey, dateFormats)
                log.info("Processed dates for key ${key}: ${results[key]}")
            }
        } else {
            log.info("Key ${key} does not contain date-related substrings, skipping.")
            results[key] = []
        }
    }

    // Serialize results to JSON and store in FlowFile attribute
    def jsonBuilder = new JsonBuilder(results)
    ff = session.putAttribute(ff, 'payload.json.dates', jsonBuilder.toPrettyString())
    log.info("Successfully processed FlowFile: ${ff.getId()}")
    session.transfer(ff, REL_SUCCESS)
} catch (Exception e) {
    log.error("Failed processing FlowFile: ${ff.getId()}", e)
    session.transfer(ff, REL_FAILURE)
}
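To make the behavior reproducible outside NiFi, here is a stripped-down repro in plain Java (the java.time and java.util.regex calls are identical from Groovy). I have trimmed the regex to just the two all-digit alternatives from my combined pattern, in their original order, and pinned the zone to UTC; `EpochRepro` and its method names are made up for the repro:

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EpochRepro {
    // Only the two numeric alternatives from the combined pattern,
    // in the same order: the 8-digit branch first, the 10-digit branch last.
    static final Pattern NUMERIC = Pattern.compile("\\b(\\d{8})|\\b\\d{10}\\b");
    static final DateTimeFormatter YYYYMMDD = DateTimeFormatter.ofPattern("yyyyMMdd");

    // First regex match in the line, or null if none.
    static String firstMatch(String line) {
        Matcher m = NUMERIC.matcher(line);
        return m.find() ? m.group(0) : null;
    }

    // Mirrors the script's fallback: try yyyyMMdd first, then epoch seconds.
    static LocalDate parse(String digits) {
        try {
            return LocalDate.parse(digits, YYYYMMDD);
        } catch (DateTimeParseException e) {
            return Instant.ofEpochSecond(Long.parseLong(digits))
                          .atZone(ZoneOffset.UTC).toLocalDate();
        }
    }

    public static void main(String[] args) {
        String match = firstMatch("\"viewLastModified\": 1652135219,");
        System.out.println(match);        // prints 16521352, not 1652135219
        System.out.println(parse(match)); // prints 1970-07-11
    }
}
```

This reproduces the July 1970 result exactly.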

I'm producing something, but it isn't the correct something.
