Hi Matthew, thanks for answering. I've also tried with a simple case
class and it works fine.
I'm using the following case class structure, which fails:
import java.text.SimpleDateFormat
import java.util.Calendar
import scala.annotation.tailrec
/**
 * Mix-in for case classes that renders an instance as a flat, comma-separated
 * list of its fields. Despite the name, the separator is `,`, not a tab.
 *
 * After the fields are joined, every `None` substring becomes `null` and every
 * `Some(x)` wrapper is unwrapped to `x`, so optional values print like plain ones.
 * The rewriting is purely textual: a field value that itself contains the text
 * `None` or `Some(` is rewritten as well.
 */
trait TabbedToString {
  _: Product =>

  override def toString: String =
    excludeSome(productIterator.mkString(",").replace("None", "null"))

  /**
   * Repeatedly replaces the first `Some(...)` in `s` with its contents.
   * Parentheses are matched, so nested values such as `Some((1,2))` or
   * `Some(Some(x))` unwrap correctly.
   *
   * @param s text to rewrite
   * @return `s` with every balanced `Some(...)` wrapper removed; if a `Some(`
   *         has no matching `)`, the text from that point on is left as-is
   */
  @tailrec
  private def excludeSome(s: String): String = {
    val someIdx = s.indexOf("Some(")
    if (someIdx < 0) {
      s
    } else {
      val openIdx = someIdx + "Some".length
      val closeIdx = closingParenIndex(s, openIdx)
      if (closeIdx < 0) {
        s
      } else {
        // Drop "Some(" and its matching ")" but keep the wrapped value. Each
        // step shortens the string, so the recursion always terminates.
        excludeSome(
          s.substring(0, someIdx) + s.substring(openIdx + 1, closeIdx) + s.substring(closeIdx + 1))
      }
    }
  }

  /**
   * Finds the `)` that balances the `(` at `openIdx`.
   *
   * @return index of the matching `)`, or -1 if the parentheses are unbalanced
   */
  private def closingParenIndex(s: String, openIdx: Int): Int = {
    @tailrec
    def loop(i: Int, depth: Int): Int =
      if (i >= s.length) -1
      else s.charAt(i) match {
        case '('               => loop(i + 1, depth + 1)
        case ')' if depth == 1 => i
        case ')'               => loop(i + 1, depth - 1)
        case _                 => loop(i + 1, depth)
      }
    loop(openIdx, 0)
  }
}
/**
 * Module referenced by `CDR.partialCallEndModule`.
 *
 * @param failoverCorrelationId defaults to `null` when absent
 */
case class PartialCallEndModule(
    failoverCorrelationId: String = null
) extends TabbedToString
/**
 * Module referenced by `CDR.partialCallBeginModule`.
 *
 * @param failoverCorrelationId defaults to `null` when absent
 */
case class PartialCallBeginModule(
    failoverCorrelationId: String = null
) extends TabbedToString
/** Element type of `TgppModule.callingPartyAddressList`. The single field has no default. */
case class CallingPartyAddress(
    callingPartyAddress: String
) extends TabbedToString
/** Element type of `TgppModule.earlyMediaList`. Every field defaults to `null`. */
case class EarlyMedia(
    sdpOfferTimestamp: String = null,
    sdpAnswerTimestamp: String = null,
    earlyMediaSdp: String = null,
    earlyMediaInitiatorFlag: String = null
) extends TabbedToString
/** Element type of `TgppModule.messageBodyList`. Every field defaults to `null`. */
case class MessageBody(
    bodyContentType: String = null,
    bodyContentLength: String = null,
    bodyContentDisposition: String = null,
    bodyOriginator: String = null
) extends TabbedToString
/**
 * Module referenced by `CDR.tgppModule`.
 *
 * String fields default to `null`. The three `Array` fields have no default and
 * must always be supplied.
 *
 * NOTE(review): the case-class-generated `equals`/`hashCode`/`toString` treat
 * `Array` fields by reference, so two modules with equal array contents are not
 * `==`.
 */
case class TgppModule(
    primaryDeviceLinePort: String = null,
    calledAssertedIdentity: String = null,
    calledAssertedPresentationIndicator: String = null,
    sdp: String = null,
    mediaInitiatorFlag: String = null,
    earlyMediaList: Array[EarlyMedia],
    messageBodyList: Array[MessageBody],
    sipErrorCode: String = null,
    callingPartyAddressList: Array[CallingPartyAddress]
) extends TabbedToString
/** Nested in `IpModule.correlationInfo`. Every field defaults to `null`. */
case class CorrelationInfo(
    key: String = null,
    creator: String = null,
    originatorNetwork: String = null,
    terminatorNetwork: String = null,
    otherInfoInPCV: String = null
) extends TabbedToString
/** Module referenced by `CDR.ipModule`. Every field, including the nested `CorrelationInfo`, defaults to `null`. */
case class IpModule(
    route: String = null,
    networkCallID: String = null,
    codec: String = null,
    accessDeviceAddress: String = null,
    accessCallID: String = null,
    accessNetworkInfo: String = null,
    correlationInfo: CorrelationInfo = null,
    chargingFunctionAddresses: String = null,
    codecUsage: String = null,
    routingNumber: String = null,
    pCamelLocInfo: String = null,
    pCamelMscAddress: String = null,
    pCamelCellIDorLAI: String = null,
    userAgent: String = null,
    gets: String = null
) extends TabbedToString
/** Element type of `CenterxModule.locationList`. Both fields are required (no defaults). */
case class Location(
    location: String,
    locationType: String
) extends TabbedToString
/**
 * Pairs a `Location` with a location type. Both fields default to `null`.
 *
 * NOTE(review): unlike its sibling classes, this does not mix in
 * `TabbedToString`, so it uses the default case-class `toString`. Confirm
 * whether that is intentional.
 */
case class LocationInformation(
    location: Location = null,
    locationType: String = null
)
/** Nested in `ServiceExtension.host`. Every field defaults to `null`. */
case class Host(
    group: String = null,
    userId: String = null,
    userNumber: String = null,
    groupNumber: String = null
) extends TabbedToString
/**
 * Nested in `ServiceExtension.flexibleSeatingHost`. Every field defaults to `null`.
 *
 * NOTE(review): unlike `Host`, this does not mix in `TabbedToString`. When a
 * `ServiceExtension` is printed, this value renders as
 * `FlexibleSeatingHost(a,b,c,d)` rather than as flat comma-separated fields.
 * Confirm whether that is intentional.
 */
case class FlexibleSeatingHost(
    hostGroup: String = null,
    hostUserId: String = null,
    hostUserNumber: String = null,
    hostGroupNumber: String = null
)
/**
 * Element type of `CenterxModule.serviceExtensionList`.
 *
 * Every field defaults to `null`, including the nested `Host` and
 * `FlexibleSeatingHost`. The backticked `type` field must be referenced with
 * backticks because `type` is a Scala keyword.
 */
case class ServiceExtension(
    serviceName: String = null,
    invocationTime: String = null,
    facResult: String = null,
    host: Host = null,
    pushToTalk: String = null,
    relatedCallId: String = null,
    mediaSelection: String = null,
    action: String = null,
    result: String = null,
    `type`: String = null,
    startTime: String = null,
    stopTime: String = null,
    confId: String = null,
    locationActivationResult: String = null,
    locationDeactivationResult: String = null,
    callRetrieveResult: String = null,
    charge: String = null,
    currency: String = null,
    time: String = null,
    sum: String = null,
    callBridgeResult: String = null,
    nightServiceActivationMOResult: String = null,
    nightServiceDeactivationMOResult: String = null,
    forcedForwardingActivationResult: String = null,
    forcedForwardingDeactivationResult: String = null,
    outgoingCallCenterCallFACResult: String = null,
    outgoingPersonalCallFACResult: String = null,
    outgoingCallCenterPhoneNumber: String = null,
    outgoingCallCenterUserId: String = null,
    outgoingCallCenterGroupNumber: String = null,
    routingNumber: String = null,
    preAlertingDuration: String = null,
    conferenceId: String = null,
    role: String = null,
    bridge: String = null,
    owner: String = null,
    ownerDN: String = null,
    title: String = null,
    projectCode: String = null,
    recordingDuration: String = null,
    transactionId: String = null,
    mobilityNumber: String = null,
    mobilityRoutingNumber: String = null,
    recordingTrigger: String = null,
    recordingDestination: String = null,
    recordingResult: String = null,
    sccCallId: String = null,
    sccNumber: String = null,
    sccCause: String = null,
    targetHungGroupId: String = null,
    flexibleSeatingHost: FlexibleSeatingHost = null
) extends TabbedToString
/**
 * Module referenced by `CDR.centerxModule`.
 *
 * String fields default to `null`. `serviceExtensionList` and `locationList`
 * have no default and must always be supplied.
 *
 * NOTE(review): the generated `equals`/`hashCode`/`toString` treat the `Array`
 * fields by reference, not by content.
 */
case class CenterxModule(
    group: String = null,
    department: String = null,
    accountCode: String = null,
    authorizationCode: String = null,
    cbfAuthorizationCode: String = null,
    callingPartyCategory: String = null,
    outsideAccessCode: String = null,
    originalCalledNumber: String = null,
    originalCalledNumberContext: String = null,
    originalCalledPresentationIndicator: String = null,
    originalCalledReason: String = null,
    redirectingNumber: String = null,
    redirectingNumberContext: String = null,
    redirectingPresentationIndicator: String = null,
    redirectingReason: String = null,
    trunkGroupName: String = null,
    trunkGroupInfo: String = null,
    chargeNumber: String = null,
    relatedCallId: String = null,
    relatedCallIdReason: String = null,
    faxMessaging: String = null,
    twoStageDiallingDigits: String = null,
    recallType: String = null,
    originationMethod: String = null,
    serviceExtensionList: Array[ServiceExtension],
    prepaidStatus: String = null,
    configurableCLID: String = null,
    virtualOnNetType: String = null,
    officeZone: String = null,
    primaryZone: String = null,
    roamingMscAddress: String = null,
    customSchemaVersion: String = null,
    locationList: Array[Location],
    locationUsage: String = null,
    cicInsertedAsCac: String = null,
    extTrackingId: String = null
) extends TabbedToString
/** Nested in `HeaderModule.recordId`. All four fields are required (no defaults). */
case class RecordId(
    eventCounter: String,
    systemId: String,
    date: String,
    systemTimeZone: String
) extends TabbedToString
/**
 * Module referenced by `CDR.headerModule`. This is the only module `CDR`
 * requires.
 *
 * Only `serviceProvider` has a default (`null`). Because the required `type`
 * parameter follows it, callers that want that default must pass `type` by
 * name, e.g. `` HeaderModule(rid, `type` = "x") ``.
 */
case class HeaderModule(
    recordId: RecordId,
    serviceProvider: String = null,
    `type`: String
) extends TabbedToString
/** Module referenced by `CDR.basicModule`. Every field defaults to `null`. */
case class BasicModule(
    userNumber: String = null,
    groupNumber: String = null,
    direction: String = null,
    asCallType: String = null,
    callingNumber: String = null,
    callingNumberContext: String = null,
    callingPresentationNumber: String = null,
    callingPresentationNumberContext: String = null,
    callingAssertedNumber: String = null,
    callingAssertedNumberContext: String = null,
    dialableCallingNumber: String = null,
    callingPresentationIndicator: String = null,
    dialedDigits: String = null,
    dialedDigitsContext: String = null,
    calledNumber: String = null,
    calledNumberContext: String = null,
    networkTranslatedNumber: String = null,
    networkTranslatedNumberContext: String = null,
    networkTranslatedGroup: String = null,
    startTime: String = null,
    userTimeZone: String = null,
    localCallId: String = null,
    remoteCallId: String = null,
    answerIndicator: String = null,
    answerTime: String = null,
    releaseTime: String = null,
    terminationCause: String = null,
    q850Cause: String = null,
    carrierIdentificationCode: String = null,
    callCategory: String = null,
    networkCallType: String = null,
    chargeIndicator: String = null,
    typeOfNetwork: String = null,
    releasingParty: String = null,
    userId: String = null,
    otherPartyName: String = null,
    otherPartyNamePresentationIndicator: String = null,
    clidPermitted: String = null,
    receivedCallingNumber: String = null,
    namePermitted: String = null
) extends TabbedToString
// enriched classes

/**
 * Groups `completed` records (as `RichCDR`, defined elsewhere) with the
 * `incompleted` raw `CDR`s.
 */
case class CompletedCDRs(
    completed: List[RichCDR],
    incompleted: List[CDR]
)
/**
 * A numeric id paired with a name. Identifiers are Portuguese: presumably
 * "telephone unit owner" and `nome` = "name" (TODO confirm).
 */
case class DonoUnidadeTelefonica(
    id: Long,
    nome: String
)
/**
 * Top-level record made up of the module case classes.
 *
 * `headerModule` is required; every other module defaults to `null` when
 * absent.
 */
case class CDR(
    headerModule: HeaderModule,
    basicModule: BasicModule = null,
    centerxModule: CenterxModule = null,
    ipModule: IpModule = null,
    tgppModule: TgppModule = null,
    partialCallBeginModule: PartialCallBeginModule = null,
    partialCallEndModule: PartialCallEndModule = null
) extends TabbedToString
2017-05-09 4:54 GMT-03:00 Matthew cao <[email protected]>:
> Hi,
> I have tried simple test like this:
> case class A(id: Long)
> val sample = spark.range(0,10).as[A]
> sample.createOrReplaceTempView("sample")
> val df = spark.emptyDataset[A]
> val df1 = spark.sql("select * from sample").as[A]
> df.union(df1)
>
> It runs OK. As for nullability, I thought that issue had been fixed:
> https://issues.apache.org/jira/browse/SPARK-18058
> Could you check your Spark version and the schema of the dataset again? Hope
> this helps.
>
> Best,
>
> On 2017年5月9日, at 04:56, Dirceu Semighini Filho <[email protected]>
> wrote:
>
> Ok, great,
> Well, I haven't provided a good example of what I'm doing. Let's assume that
> my case class is
> case class A(tons of fields, with sub classes)
>
> val df = sqlContext.sql("select * from a").as[A]
>
> val df2 = spark.emptyDataset[A]
>
> df.union(df2)
>
> This code will throw the exception.
> Is this expected? I assumed that when I call as[A] it converts the schema
> to the case class schema, so it shouldn't throw the exception. Or is this
> done lazily, when the union is being processed?
>
>
>
> 2017-05-08 17:50 GMT-03:00 Burak Yavuz <[email protected]>:
>
>> Yes, unfortunately. This should actually be fixed: the union's schema
>> should use the less restrictive schema of the two DataFrames.
>>
>> On Mon, May 8, 2017 at 12:46 PM, Dirceu Semighini Filho <
>> [email protected]> wrote:
>>
>>> HI Burak,
>>> By nullability, do you mean that if I have exactly the same schema, but
>>> one side supports null and the other doesn't, this exception (in Dataset
>>> union) will be thrown?
>>>
>>>
>>>
>>> 2017-05-08 16:41 GMT-03:00 Burak Yavuz <[email protected]>:
>>>
>>>> I also want to add that generally these may be caused by the
>>>> `nullability` field in the schema.
>>>>
>>>> On Mon, May 8, 2017 at 12:25 PM, Shixiong(Ryan) Zhu <
>>>> [email protected]> wrote:
>>>>
>>>>> This is because RDD.union doesn't check the schema, so you won't see
>>>>> the problem unless you run the RDD and hit the incompatible column.
>>>>> For RDDs, you may not see any error at all if you don't use the
>>>>> incompatible column.
>>>>>
>>>>> Dataset.union requires compatible schemas. You can print ds.schema and
>>>>> ds1.schema and check whether they are the same.
>>>>>
>>>>> On Mon, May 8, 2017 at 11:07 AM, Dirceu Semighini Filho <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hello,
>>>>>> I have a very complex case class structure, with a lot of fields.
>>>>>> When I try to union two datasets of this class, it fails with
>>>>>> the following error:
>>>>>> ds.union(ds1)
>>>>>> Exception in thread "main" org.apache.spark.sql.AnalysisException:
>>>>>> Union can only be performed on tables with the compatible column types
>>>>>>
>>>>>> But when I use their RDDs, the union works:
>>>>>> ds.rdd.union(ds1.rdd)
>>>>>> res8: org.apache.spark.rdd.RDD[
>>>>>>
>>>>>> Is there any reason for this to happen (besides a bug ;) )
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
>