Spark Connector Internals

This chapter covers datatype mapping, predicate pushdown operators, and other useful information not otherwise available.

  • SQL/Spark Datatype Mapping — a table of SQL datatypes and their corresponding Spark datatypes.

  • Predicate Pushdown Operators — a list of Spark operators that the Spark Connector recognizes as having direct counterparts within the underlying database.

  • Logging — the Spark connector, like the Spark system itself, uses Log4J to log events of interest.

  • Known Issues — problems to be aware of.

SQL/Spark Datatype Mapping

Internally, the Connector uses the InterSystems JDBC driver to read and write values to and from servers. This constrains the data types that can be serialized in and out of database tables via Spark. The JDBC driver exposes the JDBC data types in the following tables as available projections for InterSystems IRIS® data types, and converts them to and from the listed Spark Catalyst types (members of the org.apache.spark.sql.types package).

This mapping between Spark Catalyst and JDBC data types differs subtly from that used by the standard Spark jdbc data source, as noted in the following sections.
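
One quick way to see this mapping in practice is to read a table through the Connector and inspect the schema of the resulting DataFrame. The sketch below assumes a hypothetical table mySchema.MyTable; the iris read method is the same one used in the examples later in this chapter:

scala> val df = spark.read.iris("SELECT * FROM mySchema.MyTable")
scala> df.printSchema()   // each Catalyst type reflects the JDBC mappings described below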

JDBC / Spark Value Type Conversions

The following JDBC value types are exposed, and are converted directly to and from the listed Spark Catalyst types:

Note:
  • Value types are all qualified as being NOT NULL when saved to a table.

  • Spark Catalyst distinguishes between the different sizes of integer (TINYINT, SMALLINT, INTEGER, and BIGINT), whereas the Spark jdbc data source does not (see the sketch following these notes).

  • Spark Catalyst FloatType is mapped to DOUBLE (and the deprecated InterSystems class %Library.Float is projected as type DOUBLE).
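
To make the last two notes concrete, the following sketch (the column names are hypothetical) builds a Catalyst schema whose fields would, per this mapping, be projected as TINYINT, SMALLINT, INTEGER, BIGINT, and DOUBLE respectively if a DataFrame with this schema were saved to a table:

    import org.apache.spark.sql.types._

    // Hedged sketch with hypothetical column names: each sized Catalyst integer
    // type keeps its own SQL counterpart, and FloatType is widened to DOUBLE.
    val schema = StructType(Seq(
      StructField("tiny",   ByteType),     // projected as TINYINT
      StructField("small",  ShortType),    // projected as SMALLINT
      StructField("medium", IntegerType),  // projected as INTEGER
      StructField("big",    LongType),     // projected as BIGINT
      StructField("ratio",  FloatType)     // projected as DOUBLE
    ))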

JDBC / Spark Object Type Conversions

The following JDBC object types are exposed, and are represented by the listed Spark Catalyst types. Bidirectional conversion is not supported for LONGVARCHAR, GUID, LONGVARBINARY, and TIME because these JDBC types do not correspond to unique Spark Catalyst types:

JDBC to Spark Catalyst projections:

    JDBC Object Type                  Spark Type
    VARCHAR, GUID, and LONGVARCHAR    StringType
    VARBINARY and LONGVARBINARY       BinaryType
    NUMERIC(p,s)                      DecimalType(p,s)
    DATE                              DateType
    TIMESTAMP and TIME                TimestampType

Spark Catalyst to JDBC projections:

    Spark Type          JDBC Object Type
    StringType          VARCHAR
    BinaryType          VARBINARY
    DecimalType(p,s)    NUMERIC(p,s)
    DateType            DATE
    TimestampType       TIMESTAMP

Note:
  • In NUMERIC and DecimalType the positive integers p and s respectively denote the precision and scale of the numeric representation.

  • There is no Spark SQL encoder currently available for type java.sql.Time, so JDBC TIME is represented as Spark TimestampType.

  • The Connector (unlike the Spark jdbc data source) recognizes the UNIQUEIDENTIFIER data type, which is not supported by all JDBC vendors.

  • JDBC type GUID is the JDBC representation of the InterSystems SQL data type UNIQUEIDENTIFIER.

See “InterSystems SQL Data Types” for details on how values are represented within InterSystems IRIS, and “DDL Data Types Exposed by InterSystems ODBC / JDBC” for specific information on how they are projected to JDBC.

Predicate Pushdown Operators

The Spark Connector currently recognizes the following Spark operators as having direct counterparts within the underlying database (see “Overview of Predicates” in the SQL Reference):

EqualTo       LessThanOrEqual      StringStartsWith   IsNull      And
LessThan      GreaterThanOrEqual   StringEndsWith     IsNotNull   Not
GreaterThan   StringContains       In                 Or
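
For example, a DataFrame filter built from these operators can be evaluated by the server rather than by the Spark cluster. In the hedged sketch below, the table People and its columns name and age are hypothetical; the intent is that the GreaterThan, StringStartsWith, and IsNotNull predicates are pushed down into the query the Connector sends to the server:

scala> import org.apache.spark.sql.functions.col
scala> val people = spark.read.iris("SELECT name, age FROM People")
scala> people.filter(col("age") > 21 && col("name").startsWith("A") && col("age").isNotNull).show()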

Logging

The connector logs various events of interest using the same infrastructure as the Spark system itself uses, namely Log4J.

The content, format, and destination of log output for the system as a whole are configured by the file ${SPARK_HOME}/conf/log4j-defaults. The connector is implemented in classes that reside in the package com.intersystems.spark, so its logging can easily be configured by specifying keys of the form:

    log4j.logger.com.intersystems.spark                      = INFO
    log4j.logger.com.intersystems.spark.core                 = DEBUG
    log4j.logger.com.intersystems.spark.core.IRISDataSource = ALL
    ...
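Log levels can also be adjusted at runtime, rather than in the configuration file, by calling the Log4J API directly. A minimal sketch, assuming the Log4J 1.x API bundled with Spark 2.x:

scala> import org.apache.log4j.{Level, Logger}
scala> Logger.getLogger("com.intersystems.spark").setLevel(Level.DEBUG)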

Known Issues

We hope to address the following issues in a subsequent release:

Pruning Columns with Synthetic Names

Consider the following query, issued in a Spark session:

scala> spark.read.iris("select a, min(a) from A")

where A is some table that presumably has a column named a.

Notice that no alias is provided for the selection expression min(a). The server synthesizes names for such columns, and in this case might describe the schema for the resulting dataframe as having two columns, named 'a' and 'Aggregate_2' respectively.

However, no actual field named 'Aggregate_2' exists in the table, so an attempt to reference it in an enclosing selection would fail:

scala> val df = spark.read.iris("select Aggregate_2 from (select a, min(a) from A)")
> SQL ERROR: No such field SQLUSER.A.Aggregate_2 ....

This is to be expected in a standard SQL implementation.

However, the connector uses just such enclosing projection expressions when attempting to prune the columns of a dataframe to those that are actually referenced in subsequent code:

scala> spark.read.iris("select a, min(a) from A").select("a")...

internally generates the query select a from (select a, min(a) from A) to be executed on the server, in order to minimize the motion of data into the Spark cluster.

As a result, the connector cannot efficiently prune columns with synthetic names and instead resorts to fetching the entire result set:

scala> spark.read.iris("select a, min(a) from A").select("Aggregate_2")

internally generates the query select * from (select a, min(a) from A).

For this reason, you should consider modifying the original query by attaching aliases to columns that would otherwise receive server-synthesized names.
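
For example, with a hypothetical alias min_a attached to the aggregate expression, the Connector has a real column name to prune against, and the enclosing projection it generates (something like select min_a from (select a, min(a) as min_a from A)) is valid SQL:

scala> spark.read.iris("select a, min(a) as min_a from A").select("min_a")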

We hope to address this issue in a subsequent release.

Java 9 Compatibility

Java 9, and the 1.9 JVM on which it runs, became generally available in September 2017. Neither Apache Spark nor the InterSystems Spark Connector currently runs on this version of the JVM. We hope to address this issue in a subsequent release.

Handling of TINYINT

The mapping between Spark Catalyst and JDBC datatypes (see “SQL/Spark Datatype Mapping” earlier in this chapter) differs subtly from that used by the Spark jdbc data source. The Connector achieves this mapping by automatically installing its own subclass of org.apache.spark.sql.jdbc.JdbcDialect, but this also has the side effect of changing the mapping used by the Spark jdbc data source itself.

By and large this is a good thing, but one recently identified problem stems from a bug in Spark 2.1.1, which neglects to implement a low-level reader function for ByteType: attempting to read an InterSystems IRIS table with a column of type TINYINT using the Spark jdbc data source will fail once the Connector has been loaded.
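
As a hypothetical illustration (the table name, connection URL, and driver options below are invented for the example), the first read goes through the Spark jdbc data source and can fail on the TINYINT column under Spark 2.1.1 once the Connector's dialect is installed, while the second read goes through the Connector itself and is unaffected:

    // Hypothetical table T containing a column of type TINYINT.

    // May fail under Spark 2.1.1 once the Connector's JdbcDialect is installed,
    // because Spark's jdbc source lacks a low-level reader for ByteType:
    val viaJdbc = spark.read.format("jdbc")
      .option("url", "jdbc:IRIS://localhost:51773/USER")      // hypothetical connection URL
      .option("driver", "com.intersystems.jdbc.IRISDriver")
      .option("dbtable", "T")
      .load()

    // Reading the same table through the Connector is unaffected:
    val viaConnector = spark.read.iris("SELECT * FROM T")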

For now, it is probably best to avoid reading and writing DataFrames using the Spark jdbc data source directly once the Connector has been loaded. We hope to address this issue in a subsequent release.

JDBC Isolation Levels

The InterSystems IRIS server does not currently support the writing of a dataset to a SQL table using JDBC isolation levels other than NONE and READ_UNCOMMITTED. We hope to address this issue in a subsequent release.
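
In the meantime, if you do write a DataFrame through the standard Spark jdbc data source, leave its isolationLevel option at one of the supported values. A hedged sketch, with the connection URL and target table invented for the example:

    // df is an existing DataFrame; connection details are hypothetical.
    df.write.format("jdbc")
      .option("url", "jdbc:IRIS://localhost:51773/USER")
      .option("dbtable", "Owner.Target")
      .option("isolationLevel", "READ_UNCOMMITTED")   // or "NONE"; other levels are not supported
      .save()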
