Using the InterSystems Spark Connector
Using Spark Connector Extension Methods

This chapter discusses a set of InterSystems-specific Scala extension methods provided by the Spark Connector. These methods extend the generic Spark interface with implicit Scala classes and types that provide more convenience and better type safety.
The following topics are discussed:
- Using Query and Save Extension Methods
- Using Extension Methods to Set Data Source Options

All of the examples in this chapter assume that spark is an instance of SparkSession and df is an instance of DataFrame.
Note:
In this book, the terms jdbc and iris (lower case, in the same typography as other class names) are always data source provider class names, not references to Java JDBC or InterSystems IRIS™. See Data Source Provider Class Names for details.
Using Query and Save Extension Methods
The following topics are discussed:
- Using the iris() Save Method
- Using the iris() and dataset[T]() Query Methods
- Using the rdd[T]() Query Method
Using the iris() Save Method
The following example demonstrates a standard way to save an existing DataFrame df to a table named Owls:
df.write.format("iris").option("dbtable","Owls").save()
The Spark Connector provides the DataFrameWriter.iris() extension method, which performs the same task in a much simpler manner. The following code is functionally identical to the previous example:
df.write.iris("Owls")
In this example, the dbtable option is replaced by an argument, the format is automatically set to iris, and the call to save() is made internally.
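Because iris() is called on a standard DataFrameWriter, other writer settings can be applied before it in the usual way. For example (a sketch; SaveMode values such as append are standard Spark API, and are assumed to apply to the connector as they do to other data sources):
df.write.mode("append").iris("Owls")   // set the save mode first, then save to Owls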
Using the iris() and dataset[T]() Query Methods
The following example demonstrates a standard way to query the database, reading the Owls table (saved in the previous example) back into a new DataFrame df2:
val df2 = spark.read.format("iris").option("dbtable","SELECT * FROM Owls").load()
The Spark Connector provides the DataFrameReader.iris() extension method to perform the same task in a much simpler manner. The following code is functionally identical to the previous example:
val df2 = spark.read.iris("SELECT * FROM Owls")
In this example, the dbtable option is replaced by an argument, the format is automatically set to iris, and the call to load() is made internally.
Similarly, the SparkSession.dataset[T]() method executes a query or loads a table, returning the results as a Dataset of the specified type T.
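For example, the following sketch reads typed results from the Owls table. The Owl case class and its fields are hypothetical and must match the queried columns; spark.implicits._ is imported on the assumption that the method requires an implicit Encoder, which Spark derives automatically for case classes:
case class Owl(name: String, wingspan: Double)   // hypothetical row type
import spark.implicits._                         // case-class Encoders, if required
val owls = spark.dataset[Owl]("SELECT name, wingspan FROM Owls")
owls.filter(_.wingspan > 1.0).show()             // fields are accessed with full type safety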
Using the rdd[T]() Query Method
SparkContext.rdd[T]() executes a query or loads a table, formats each row of the result set with the provided function, and returns an RDD[T] containing the formatted rows.
Defining a Format[T] Function
The SparkContext.rdd[T]() method executes a query on the cluster and returns an RDD where each element has been formatted as an instance of the specified type T. Formatting is performed by a user-defined function of type Format[T]. For example, the following code specifies a function pair, which extracts a pair of strings from the first two columns of the current row of a result set:
val pair: Format[(String,String)] = r => (r.getString(1), r.getString(2))
When passed to rdd[T](), this function produces an RDD[(String,String)] from any query whose result set includes at least two string columns per record.
Using SparkContext.rdd[T]()
The following example defines a Format[T] function, as described in the previous section, and passes it to rdd[T]():
val asPair: Format[(Int,Double)] = r => (r.getInt(1), r.getDouble(2))
val newRDD = spark.sparkContext
                  .rdd("myTable", 1, asPair)   // table name, mfpi value, format function
Note:
Format functions are invoked once for every record the client requests, and should therefore normally call only pure (that is, non-side-effecting) member functions of the result set, such as getInt, getDouble, and getDate.
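A minimal illustration, assuming Format[T] is a function over a JDBC-style result set (as the getter calls above suggest):
val ok: Format[(Int,Double)] = r => (r.getInt(1), r.getDouble(2))   // pure: only reads column values
val bad: Format[Int] = r => { r.next(); r.getInt(1) }               // impure: next() advances the cursor and corrupts iteration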
Using Extension Methods to Set Data Source Options
The standard way to set data source options is with the generic option interface (see Using Spark Connector Generic Options), but the Spark Connector also provides extension methods that allow most of these options to be specified as method arguments. These methods simplify code and improve type safety.
Note:
This chapter does not discuss these options in detail. For complete information on data source option settings, see the descriptions in the previous chapter (Spark Connector Data Source Options).
Setting the dbtable Query and Save Options
The dbtable option is specified as the first argument of all query and save extension methods (see Using Query and Save Extension Methods earlier in this chapter), as demonstrated in the following examples.
Setting the dbtable query option with query methods
All of the query extension methods (DataFrameReader.iris(), SparkContext.rdd[T](), and SparkSession.dataset[T]()) accept the dbtable (alias query) option as the first argument. For example, the following code fragments specify dbtable as either a table name or an SQL query, and use the DataFrameReader.iris() method to set the format, set the dbtable option, and call load():
  var df = spark.read.iris("mytable")   // Load a table

  var df = spark.read.iris("SELECT * FROM mytable")   // Perform a query
rather than:
var df = spark.read.format("iris").option("dbtable","mytable").load()   // Load a table

var df = spark.read.format("iris").option("query","SELECT * FROM mytable").load()   // Perform a query
See Using the iris() and dataset[T]() Query Methods and Using the rdd[T]() Query Method earlier in this chapter for detailed information about the query methods. For more on the dbtable query option, see the dbtable (or query) entry under Query Options in the previous chapter.
Setting the dbtable save option with DataFrameWriter.iris()
When saving with the DataFrameWriter.iris() extension method, the dbtable option is specified as the method parameter. For example, the following code uses DataFrameWriter.iris() to set the format, set the dbtable option, and call save():
df.write.iris("mytable")
rather than:
df.write.format("iris")
           .option("dbtable","mytable")   // Save a table
           .save()
See Using the iris() Save Method earlier in this chapter for detailed information about this extension method. For more on the dbtable save option, see the dbtable entry under Standard Save Options in the previous chapter.
Extension Methods for InterSystems IRIS Save Options
The save options (description, publicrowid, shard, and autobalance) can also be set with DataFrameWriter extension methods. See InterSystems IRIS Save Options in the previous chapter for detailed information on these options.
description()
The DataFrameWriter.description() extension method is an alternate way to set the description option. For example, you can use the following code:
df.write.description("This is a table.")
rather than:
df.write.format("iris").option("description","This is a table.").save()
See description in the previous chapter for more information on this option.
publicRowID()
The DataFrameWriter.publicRowID() extension method is an alternate way to set the publicrowid option. For example, you can use the following code:
df.write.publicRowID(false)
rather than:
df.write.format("iris").option("publicrowid","false").save()
See publicrowid in the previous chapter for more information on this option.
shard()
The DataFrameWriter.shard() extension method is an alternate way to set the shard option. For example, you can use the following code:
df.write.shard("columnA","columnX")
rather than:
df.write.format("iris").option("shard","columnA,columnX").save()
Notice that the extension method accepts multiple column names as variadic arguments and interprets them as a Seq.
See shard in the previous chapter for more information on this option.
autobalance()
The DataFrameWriter.autobalance() extension method is an alternate way to set the autobalance option. For example, you can use the following code:
df.write.autobalance(false)
rather than:
df.write.format("iris").option("autobalance","false").save()
See autobalance in the previous chapter for more information on this option.
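These writer extension methods can also be combined in a single chain before the final save, on the assumption that each returns the writer for further configuration (as the address() examples below demonstrate for connection options). A sketch with hypothetical table and column names:
df.write.description("Demo table")
        .shard("columnA","columnX")
        .autobalance(false)
        .iris("mytable")   // iris() sets the format, sets dbtable, and calls save()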
Setting Connection Options with the address() Method
The url, user, and password options explicitly specify connection string values for a query or save (see Connection Options in the previous chapter for detailed information and examples). These connection options can also be set as parameters of the DataFrameReader.address() and DataFrameWriter.address() extension methods, as demonstrated in the following examples.
Querying with DataFrameReader.address()
The following code uses the DataFrameReader.address() method to set the url, user, and password options, and then uses the DataFrameReader.iris() method to set the format, set the dbtable option, and call load():
var df = spark.read.address("IRIS://localhost:51773/USER", "_system", "SYS").iris("mytable")
rather than:
var df = spark.read.format("iris")
                   .option("dbtable","mytable")
                   .option("url", "IRIS://localhost:51773/USER")
                   .option("user", "_system")
                   .option"password", "SYS")
                   .load()
Saving with DataFrameWriter.address()
The following code uses the DataFrameWriter.address() method to set the url, user, and password options, and then uses the DataFrameWriter.iris() method to set the format, set the dbtable option, and call save():
df.write.address("IRIS://localhost:51773/USER", "_system", "SYS").iris("mytable")
rather than:
df.write.format("iris")
        .option("dbtable","mytable")
        .option("url", "IRIS://localhost:51773/USER")
        .option("user", "_system")
        .option"password", "SYS")
        .save()
Setting Partitioning Options with Query Method Parameters
All of the query extension methods (DataFrameReader.iris(), SparkContext.rdd[T](), and SparkSession.dataset[T]()) accept optional arguments that specify how to partition a query. See Using Query and Save Extension Methods earlier in this chapter for a discussion of query methods. The following examples use the iris() method, but all three methods accept the same set of arguments.
Implicit partitioning with the mfpi parameter
The mfpi option can be specified as an argument when reading with any of the query extension methods. For example, the following call to DataFrameReader.iris() specifies the first argument as the value for dbtable and the second argument as the value for mfpi:
var df = spark.read.iris("SELECT * FROM mytable",2)
This call is exactly equivalent to the following example:
var df = spark.read.format("iris")
                   .option("dbtable","SELECT * FROM mytable")
                   .option("mfpi", 2)
                   .load()
See Implicit Partitioning in the previous chapter for more information on this option.
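Because all three query methods accept the same partitioning arguments, an mfpi value can also be passed to rdd[T]() and dataset[T](). A sketch reusing the asPair function and the hypothetical Owl case class from earlier examples; the dataset[T]() argument order is assumed to match iris():
val partRDD = spark.sparkContext.rdd("SELECT * FROM mytable", 2, asPair)   // mfpi = 2, as in the earlier rdd example
val partDS  = spark.dataset[Owl]("SELECT name, wingspan FROM Owls", 2)     // assumed: mfpi in the same position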
Explicit partitioning with the partitionColumn, lowerBound, upperBound, and numPartitions parameters
These options can be specified as arguments when reading with any of the query extension methods. For example, the following call to DataFrameReader.iris() specifies the first argument as the value for dbtable and the remaining arguments as the values for partitionColumn, lowerBound, upperBound, and numPartitions:
var df = spark.read.iris("SELECT * FROM mytable","column",0,10000,2)
This call is exactly equivalent to the following example:
var df = spark.read.format("iris")
                   .option("dbtable","SELECT * FROM mytable")
                   .option("partitionColumn", "columnA")
                   .option("lowerBound", 0)
                   .option("upperBound", 10000)
                   .option("numPartitions", 2)
                   .load()
See Explicit Partitioning in the previous chapter for more information on these options.