Using the InterSystems Spark Connector
Spark Connector Data Source Options
The Spark Connector iris data source and the standard Spark jdbc data source both use the generic option interface, which sets options by passing option keys and values to the option() or options() methods. The iris data source supports the same set of options as the Spark jdbc data source, and also includes several options implemented specifically for the InterSystems IRIS™ SQL dialect.
The following sections provide detailed information on all supported iris data source options.
In all of the following examples, spark is an instance of SparkSession, and df is an instance of DataFrame.
Note:
In this book, the terms jdbc and iris (lower case, in the same typography as other class names) are always data source provider class names, not references to Java JDBC or InterSystems IRIS. See Data Source Provider Class Names for details.
Using Spark Connector Generic Options
Both the iris data source and the jdbc data source provide a generic option interface to set data source options. For example, the following code uses the generic Spark jdbc data source with the InterSystems JDBC driver to load a table from the InterSystems IRIS database:
val df = spark.read.format("jdbc")
                   .option("driver","com.intersystems.jdbc.IRISDriver")
                   .option("dbtable","tablename")
                   .option("url","IRIS://localhost:51773/USER")
                   .option("user","_system")
                   .option("password","SYS")
                   .load()
Most options supported by the jdbc data source can also be used in exactly the same way with the iris data source. However, the equivalent table loading code can be much simpler when using the Spark Connector iris data source because most of the corresponding iris options have defaults (see Default Configuration Settings for details). For example:
import com.intersystems.spark._

val df = spark.read.format("iris")
                   .option("dbtable","tablename")
                   .load()
The most important difference in this example is that the format() call specifies the Spark Connector’s iris data source provider class, rather than the generic jdbc provider class. We omit the driver option because the Spark Connector always uses the built-in InterSystems JDBC driver. The url, user, and password options can also be omitted because iris can use default values obtained from Spark configuration settings (see Connection Options).
Query Options
The Spark jdbc format and the iris format can both use dbtable and fetchsize when loading a table or performing a query.
dbtable (or query)
The Spark jdbc format and the iris format both use dbtable to specify a table name or SQL query. Any string that would be valid in a FROM clause can be used as a dbtable value. The dbtable key is also a valid write option (see Standard Save Options).
The iris query key is just a synonym for dbtable. This optional key can improve code readability by distinguishing a SQL query from a table name. For example:
var df = spark.read.format("iris")
                   .option("dbtable","mytable")   // Load a table
                   .load()

var df = spark.read.format("iris")
                   .option("query","SELECT * FROM mytable")   // Perform a query
                   .load()
The Spark Connector also offers several alternative query methods that allow this option to be set as a method parameter (see Using Query and Save Extension Methods).
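For instance, here is a minimal sketch using the DataFrameReader.iris() extension method, assuming the single-argument form that accepts a table name or query text in place of the dbtable or query option:
import com.intersystems.spark._

var df = spark.read.iris("SELECT * FROM mytable")   // query text passed as a method parameter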
fetchsize
The Spark jdbc format and the iris format both use fetchsize to specify the number of rows to fetch per server round trip. Defaults to 1000. This option is applied when loading a table or performing a query, and will be ignored by write operations.
var df = spark.read.format("iris").option("dbtable","mytable")
                   .option("fetchsize",500)
                   .load()
Standard Save Options
The Spark jdbc format and the iris format can both use dbtable, mode, batchsize, and isolationlevel when saving a table.
dbtable
The Spark jdbc format and the iris format both use dbtable to specify a table name when saving. This key is also a valid read option (see Query Options).
spark.write.format("iris")
           .option("dbtable","mytable")   // Save a table
           .save()
This option can also be specified as a parameter of the DataFrameWriter.iris() extension method (see Using Extension Methods to Set Data Source Options).
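For example, a minimal sketch assuming the single-argument form of the write extension method, which takes the target table name:
import com.intersystems.spark._

df.write.iris("mytable")   // table name passed as a method parameter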
mode
Describes how to behave if the target table already exists. Can be one of OVERWRITE, APPEND, IGNORE, or ERROR, and defaults to ERROR. These settings correspond to the values defined in the SaveMode enum (see org.apache.spark.sql.SaveMode for more information). This option is applied when saving a table, and will be ignored by read operations.
df.write.format("iris")
        .option("mode","OVERWRITE")   //overwrite any existing table
        .save()
This option can also be set by the standard DataFrameWriter.mode() method.
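For example, the same save mode can be expressed with the standard Spark API instead of the option key:
df.write.format("iris")
        .option("dbtable","mytable")
        .mode("overwrite")   // equivalent to .option("mode","OVERWRITE")
        .save()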
batchsize
Specifies an integer number of rows to insert per server round trip. Defaults to 1000. This option is applied when saving a table, and will be ignored by read operations.
df.write.format("iris")
        .option("batchsize",500)
        .save()
isolationlevel
Sets the JDBC transaction isolation level. Can be READ_UNCOMMITTED (the default) or NONE. These values correspond to the transaction isolation levels defined in java.sql.Connection. This option is applied when saving a table, and will be ignored by read operations.
df.write.format("iris")
        .option("isolationlevel","READ_COMMITTED")
        .save()
InterSystems IRIS Save Options
The description, publicrowid, shard, and autobalance options are supported only by the iris data source, and are used to set certain options unique to InterSystems IRIS. These options are applied when saving a table, and will be ignored by read operations.
description
Specifies an optional description to document the newly created table. Will be ignored when appending to a table that already exists. Defaults to "".
df.write.format("iris")
        .option("description","This is a table of no importance whatsoever.")
        .save()
See CREATE TABLE in the InterSystems SQL Reference for more information on this option.
This option can also be set with the DataFrameWriter.description() extension method (see Setting Save Options with Extension Methods).
publicrowid
Specifies whether the master row ID column for the newly created table is to be made publicly visible. Will be ignored when appending to a table that already exists. Defaults to "false".
df.write.format("iris")
        .option("publicrowid","false")
        .save()
See CREATE TABLE in the InterSystems SQL Reference for more information on this option.
This option can also be set with the DataFrameWriter.publicRowID() extension method (see Setting Save Options with Extension Methods).
shard
Determines sharding for the newly created table. Will be ignored when appending to a table that already exists. The value can be either a boolean indicating whether the newly created table is to be sharded, or a sequence of field name strings (possibly empty) to be used as the user defined shard key.
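For example, here is a minimal sketch that creates the new table as sharded on a system assigned shard key, passing the boolean value as a string as in the other examples in this section:
df.write.format("iris")
        .option("shard","true")   // create the new table as a sharded table
        .save()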
This option can also be set with the DataFrameWriter.shard() extension method (see Setting Save Options with Extension Methods).
autobalance
When writing a dataset to a table that is sharded on a system assigned shard key, an autobalance value of "true" (the default) specifies that the inserted records are to be evenly distributed among the available shards. A value of "false" specifies that each dataset partition should attempt to write its records directly into the shard on which its Spark executor also runs.
df.write.format("iris")
        .option("autobalance","false") // disable autobalancing)
        .save()
In a properly configured cluster, a Spark slave runs on each shard server. Writing a dataset with the autobalance option disabled can be faster, since records no longer need to travel across the network to reach their destination shard. However, it now becomes the Spark application's responsibility to ensure that roughly the same number of records is written to each shard.
Will be ignored when writing to a table that is not sharded, or when writing to a table sharded on a user defined shard key.
This option can also be set with the DataFrameWriter.autobalance() extension method (see Setting Save Options with Extension Methods).
Connection Options
The url, user, and password options explicitly specify connection string values for a read or write. If these options are not set, they will be automatically defined using information from the default InterSystems IRIS master instance specified in the SparkConf configuration (see Default Configuration Settings). When set, these options override the default for the duration of the current read or write operation.
url, user, and password
The value of url is a string of the form "IRIS://[host]:[port]/[namespace]" that names the master InterSystems IRIS instance from which data is to be loaded or to which it is to be saved. The values of user and password are strings containing the account name and password required to connect to that instance.
For example, the following call specifies the connection required to read a table named mytable in the USER namespace:
var df = spark.read.format("iris").option("dbtable","mytable")
              .options(Map("url" -> "IRIS://localhost:51773/USER",
                           "user" -> "_system",
                           "password" -> "SYS"))
              .load()
For more information on constructing a URL string, see Defining a JDBC Connection URL in Using InterSystems IRIS with JDBC.
These options can also be set with the DataFrameReader.address() extension method (see Setting Connection Options with the address() Method).
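For instance, here is a minimal sketch assuming the three-argument form of address(), which takes the connection URL, user name, and password:
import com.intersystems.spark._

var df = spark.read.format("iris")
              .address("IRIS://localhost:51773/USER","_system","SYS")   // overrides the configured defaults
              .option("dbtable","mytable")
              .load()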
Note:
The standard Spark jdbc format also offers the driver option, which specifies the class name of the JDBC driver to use. The Spark Connector iris format ignores this option (if specified) because it always uses the InterSystems JDBC driver, which is embedded within the Connector itself.
Partition Tuning Options
These options determine how a read operation will be partitioned. When reading data, the Connector attempts to negotiate with the server as to how best to partition the resulting Dataset. Depending on how the cluster is configured, each partition can potentially run in parallel within its own Spark executor and establish its own independent connection into the shard from which it draws its data.
Partitioning requests can be either implicit or explicit, as described in the following sections.
The Spark Connector also offers several extension methods that allow these options to be set as method parameters (see Setting Partitioning Options with Query Method Parameters).
Implicit Partitioning
The mfpi option (maximum number of partitions per instance) specifies an upper limit to the number of partitions that the server will create per queried instance in any implicit query factorization. The default is "1".
This option is applied when reading, and will be ignored by write operations. It is also ignored if a value has been specified for numPartitions, which takes precedence over mfpi (see Explicit partitioning).
Implicit partitioning with "mfpi"
Using the mfpi option, you could execute a query with the following statement:
var df = spark.read.format("iris").option("dbtable","SELECT * FROM mytable")
                   .option("mfpi", 2)
                   .load()
Explicit Partitioning
The partitionColumn, lowerBound, upperBound, and numPartitions options provide an explicit description of how to partition the queries sent to each distinct instance.
These options correspond exactly to the similarly named arguments for the Apache Spark DataFrameReader.jdbc() method. See the documentation on that method for more information.
Explicit partitioning options take precedence over the mfpi option (described in the previous section). They are applied when reading, and will be ignored by write operations.
Explicit partitioning with partitionColumn, lowerBound, upperBound, and numPartitions
Using these options, you could execute a query by calling:
var df = spark.read.format("iris").option("dbtable","SELECT * FROM mytable")
                   .option("partitionColumn", "columnA")
                   .option("lowerBound", 0)
                   .option("upperBound", 10000)
                   .option("numPartitions", 2)
                   .load()