

Using the InterSystems Spark Connector
Spark Connector Internals


This chapter covers datatype mapping, predicate pushdown operators, logging configuration, and known issues.
SQL/Spark Datatype Mapping
Internally, the Connector uses the InterSystems JDBC driver to read and write values to and from servers. This constrains the data types that can be serialized in and out of database tables via Spark. The JDBC driver exposes the JDBC data types in the following tables as available projections for InterSystems IRIS™ data types, and converts them to and from the listed Spark Catalyst types (members of the org.apache.spark.sql.types package).
This mapping between Spark Catalyst and JDBC data types differs subtly from that used by the standard Spark jdbc data source, as noted in the following sections.
JDBC / Spark Value Type Conversions
The following JDBC value types are exposed, and are converted directly to and from the listed Spark Catalyst types:
JDBC Value Type    Spark Type
BIT                BooleanType
TINYINT            ByteType
BIGINT             LongType
INTEGER            IntegerType
SMALLINT           ShortType
DOUBLE             DoubleType
JDBC / Spark Object Type Conversions
The following JDBC object types are exposed, and are represented by the listed Spark Catalyst types. Bidirectional conversion is not supported for LONGVARCHAR, GUID, LONGVARBINARY, and TIME because these JDBC types do not correspond to unique Spark Catalyst types:
JDBC to Spark Catalyst projections:

JDBC Object Type                  Spark Type
VARCHAR, GUID, and LONGVARCHAR    StringType
VARBINARY and LONGVARBINARY       BinaryType
NUMERIC(p,s)                      DecimalType(p,s)
DATE                              DateType
TIMESTAMP and TIME                TimestampType

Spark Catalyst to JDBC projections:

Spark Type          JDBC Object Type
StringType          VARCHAR
BinaryType          VARBINARY
DecimalType(p,s)    NUMERIC(p,s)
DateType            DATE
TimestampType       TIMESTAMP
See InterSystems SQL Data Types for details on how values are represented within InterSystems IRIS, and DDL Data Types Exposed by InterSystems ODBC / JDBC for specific information on how they are projected to JDBC.
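For example, the following sketch reads three columns from a hypothetical table SQLUSER.People (the table and column names are assumptions used only for illustration) and inspects the Spark types they receive:
scala> // ID is INTEGER, Name is VARCHAR, and DOB is TIMESTAMP on the server.
scala> val people = spark.read.iris("select ID, Name, DOB from SQLUSER.People")
scala> // Per the tables above, these columns surface in the dataframe's schema
scala> // as IntegerType, StringType, and TimestampType respectively:
scala> people.printSchema()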
Predicate Pushdown Operators
The Spark Connector currently recognizes the following Spark operators as having direct counterparts within the underlying database (see Overview of Predicates in the SQL Reference):
EqualTo, LessThan, GreaterThan, LessThanOrEqual, GreaterThanOrEqual, StringStartsWith, StringEndsWith, StringContains, In, IsNull, IsNotNull, And, Or, Not
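When a dataframe obtained through the connector is filtered using these operators, the filter can be evaluated on the server rather than inside the Spark cluster. The following sketch assumes a hypothetical table SQLUSER.Orders with columns Amount and Status:
scala> val orders = spark.read.iris("select * from SQLUSER.Orders")
scala> // GreaterThan, IsNotNull, and And all have direct SQL counterparts, so
scala> // the connector can push this filter down to the server as a WHERE clause
scala> // instead of fetching every row into the Spark cluster:
scala> orders.filter(orders("Amount") > 100 && orders("Status").isNotNull).count()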
Logging
The connector logs various events of interest using the same infrastructure that Spark itself uses, namely Log4j.
The content, format, and destination of logging for the system as a whole are configured by the file ${SPARK_HOME}/conf/log4j-defaults. The connector is implemented in classes that reside in the package com.intersystems.spark, so its logging can easily be configured by specifying keys of the form:
    log4j.logger.com.intersystems.spark                      = INFO
    log4j.logger.com.intersystems.spark.core                 = DEBUG
    log4j.logger.com.intersystems.spark.core.IRISDataSource = ALL
    ...
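Log levels can also be adjusted programmatically from within a Spark session. The following is a minimal sketch, assuming the Log4j 1.x API that Spark 2.x bundles:
scala> import org.apache.log4j.{Level, Logger}
scala> // Raise the connector's log level for the current session only:
scala> Logger.getLogger("com.intersystems.spark").setLevel(Level.DEBUG)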
Known Issues
We hope to address the following issues in a subsequent release:
Pruning Columns with Synthetic Names
Consider the following query in a Spark session:
scala> spark.read.iris("select a, min(a) from A")
where A is a table with a column named a.
Notice that no alias is provided for the selection expression min(a). The server synthesizes names for such columns, and in this case might describe the schema for the resulting dataframe as having two columns, named 'a' and 'Aggregate_2' respectively.
However, no actual field named 'Aggregate_2' exists in the table, so an attempt to reference it in an enclosing selection would fail:
scala> val df = spark.read.iris("select Aggregate_2 from (select a, min(a) from A)")
> SQL ERROR: No such field SQLUSER.A.Aggregate_2 ....
This is to be expected in a standard SQL implementation.
However, the connector uses just such enclosing projection expressions when attempting to prune the columns of a dataframe to those that are actually referenced in subsequent code:
scala> spark.read.iris("select a, min(a) from A").select("a")...
internally generates the query select a from (select a, min(a) from A) to be executed on the server in order to minimize the motion of data into the Spark cluster.
As a result, the connector cannot efficiently prune columns with synthetic names and instead resorts to fetching the entire result set:
scala> spark.read.iris("select a, min(a) from A").select("Aggregate_2")
internally generates the query select * from (select a, min(a) from A).
For this reason, consider modifying the original query by attaching aliases to columns that would otherwise receive server-synthesized names, as shown below.
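For example, giving the aggregate an explicit alias allows the connector to prune columns as usual:
scala> // min_a is an explicit alias, so the server does not synthesize a name
scala> // and an enclosing projection can reference the column directly:
scala> spark.read.iris("select a, min(a) as min_a from A").select("min_a")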
We hope to address this issue in a subsequent release.
Java 9 Compatibility
Java 9, and the version 9 JVM on which it runs, became generally available in September 2017. Neither Apache Spark nor the InterSystems Spark Connector currently runs on this version of the JVM. We hope to address this issue in a subsequent release.
Handling of TINYINT
The mapping between Spark Catalyst and JDBC data types (see SQL/Spark Datatype Mapping earlier in this chapter) differs subtly from that used by the Spark jdbc data source. The Connector achieves this mapping by automatically installing its own subclass of org.apache.spark.sql.jdbc.JdbcDialect, but this also has the side effect of changing the mapping used by Spark JDBC itself.
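The following is a minimal sketch of this mechanism, not the Connector's actual implementation; the object name and the JDBC URL prefix are assumptions made purely for illustration:
    import java.sql.Types
    import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
    import org.apache.spark.sql.types._

    // An illustrative dialect only, not the Connector's own subclass.
    object ExampleIrisDialect extends JdbcDialect {

      // Claim JDBC URLs that target an InterSystems IRIS server
      // (the URL prefix here is an assumption).
      override def canHandle(url: String): Boolean =
        url.startsWith("jdbc:IRIS:")

      // JDBC to Catalyst: read TINYINT columns as ByteType.
      override def getCatalystType(sqlType: Int, typeName: String, size: Int,
                                   md: MetadataBuilder): Option[DataType] =
        if (sqlType == Types.TINYINT) Some(ByteType) else None

      // Catalyst to JDBC: write ByteType columns as TINYINT.
      override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
        case ByteType => Some(JdbcType("TINYINT", Types.TINYINT))
        case _        => None
      }
    }

    // Registering a dialect affects every JDBC read and write in the session,
    // which is why loading the Connector also changes the behavior of the
    // Spark jdbc data source itself.
    JdbcDialects.registerDialect(ExampleIrisDialect)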
By and large the changed mapping is a good thing, but one recently identified problem stems from a bug in Spark 2.1.1, which neglects to implement a low-level reader function for ByteType: once the Connector has been loaded, attempting to read an InterSystems IRIS table with a column of type TINYINT using the Spark jdbc data source will fail.
For now, it is probably best to avoid reading and writing DataFrames using the Spark jdbc data source directly once the Connector has been loaded. We hope to address this issue in a subsequent release.
JDBC Isolation Levels
The InterSystems IRIS server does not currently support writing a dataset to a SQL table using JDBC isolation levels other than NONE and READ_UNCOMMITTED. We hope to address this issue in a subsequent release.
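Until then, restrict writes to the supported levels. The sketch below assumes an isolationLevel write option together with a hypothetical dataframe df and table SQLUSER.Orders; the option name, format name, dataframe, and table are assumptions made for illustration:
scala> // Only NONE and READ_UNCOMMITTED are currently accepted by the server;
scala> // "isolationLevel", df, and SQLUSER.Orders are illustrative assumptions.
scala> df.write.format("iris").option("dbtable", "SQLUSER.Orders").option("isolationLevel", "READ_UNCOMMITTED").save()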