
Spark Connector Best Practices

This chapter describes the preferred way to set up a Spark cluster on InterSystems IRIS® data platform. The following topics are discussed:
  • Configuration — describes and explains the preferred Spark cluster topology.
  • Queries — provides examples that demonstrate how to optimize query performance.
  • Loading Data — discusses how to optimize loading of large datasets into sharded clusters.
Pre-installed Spark in Containers
The InterSystems Cloud Manager (ICM) provides you with a simple, intuitive way to provision cloud infrastructure and deploy services on it. See the “Using ICM” chapter of the InterSystems Cloud Manager Guide for instructions on how to configure and deploy containers in a topology similar to the one described in this chapter. The “Deploy and Manage Services” section provides specific instructions for using images that include pre-installed instances of Spark.


Configuration

Intended Topology

  1. The InterSystems IRIS shard master I0 and Spark master S0 both run on host machine H0.
  2. A Spark slave s0 also runs on host machine H0.
  3. Furthermore, for each additional distinct host machine H1 ... Hn hosting an InterSystems IRIS shard server instance I1 ... In, exactly one Spark slave si also runs on Hi.
  4. Optionally, for each additional distinct host machine H1 ... Hn hosting an InterSystems IRIS shard server instance I1 ... In, exactly one HDFS data node di also runs on Hi.
  5. The Spark cluster runs in what the Spark documentation refers to as Standalone Mode.
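For example, a driver program submitted to this cluster connects to the standalone Spark master running on H0. The following is a minimal sketch only: the host name H0, port 7077, the URL IRIS://H0:51773/USER, and the spark.iris.* property names are illustrative assumptions that should be checked against your connector version's documentation (in the Spark shell, the spark session is created for you and these settings normally live in spark-defaults.conf instead).

import org.apache.spark.sql.SparkSession

// Connect the driver to the standalone master on H0 and supply default
// connection details for the InterSystems IRIS shard master I0.
// All host names, ports, and property names below are placeholders.
val spark = SparkSession.builder()
  .appName("iris-best-practices")
  .master("spark://H0:7077")                                // standalone Spark master on H0
  .config("spark.iris.master.url", "IRIS://H0:51773/USER")  // assumed default-connection property
  .config("spark.iris.master.user", "spark")
  .config("spark.iris.master.password", "****")
  .getOrCreate()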


While other topologies are certainly possible and will work (in the sense that Spark programs will save and restore data correctly and compute correct results), this specific topology will generally perform best because:
  1. Each Spark slave is collocated with exactly one InterSystems IRIS shard server, and the connector is able to exploit this fact by requesting that a dataset partition requiring data from shard server Ii be scheduled for execution on host machine Hi. In other words, the process is executed on the same machine as the data on which it depends. Thus movement of data across the network connecting the machines Hi is greatly reduced.
  2. Each Spark slave creates Spark worker processes that are multithreaded. The Spark master is aware of the number of processing cores available to each worker, and distributes tasks amongst them accordingly (subject to 1. above). Thus there is no advantage in running more than one Spark slave on each host machine.
  3. Not all tables are sharded. Un-sharded tables are stored on the InterSystems IRIS shard master I0. The partitions of datasets that read or write such tables are best scheduled for execution on a Spark slave collocated with this instance, hence a Spark slave s0 should also run on H0 to service these requests.
  4. When loading large data sets into a cluster, there are significant advantages to installing HDFS on the nodes of the cluster (see below).
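One quick way to observe the effect of points 1 and 2 above is to inspect how a read of a sharded table is partitioned. The sketch below is illustrative: it assumes a hypothetical sharded table A, that the connector's implicit imports (com.intersystems.spark._) are in scope, and that the connector's default behavior of creating roughly one partition per shard applies.

// Read a sharded table through the connector and inspect its partitioning.
// Each partition is expected to be scheduled on the Spark slave collocated
// with the shard server that holds its data.
val df = spark.read.iris("A")           // "A" is a hypothetical sharded table
println(df.rdd.getNumPartitions)        // roughly one partition per shard server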


While more cores, RAM, and disk storage are obviously better than fewer, one generally has a choice between a small number of powerful host machines and a larger number of less powerful ones. A sweet spot exists, however, when the largest tables in the database fit comfortably within the collective RAM of all the shard servers in the cluster, for then the cluster effectively behaves as a scalable cache. This is especially true for large-scale analytics of predominantly static or infrequently changing 'data lakes'.
Special consideration should be given to the machine H0 that hosts the InterSystems IRIS shard master. Complex queries, particularly those with aggregates, ORDER BY, TOP, or DISTINCT clauses and the like, generally require a non-trivial reduction of the intermediate results returned by the individual shard servers, and this computation is staged in temporary tables on the shard master. For a query that returns a large result set that must be sorted, for example, the reduction step on the shard master (the sort) can easily dominate the rest of the computation.
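One practical consequence is that, where a computation of this kind can be expressed as a single SQL statement, it is usually better to push the whole statement down to InterSystems IRIS than to fetch the rows into Spark and reduce them there. A minimal sketch, assuming a hypothetical sharded table A with columns k and v, and the connector's iris read method (described in the Queries section below) with its implicit imports in scope:

import org.apache.spark.sql.functions.desc

// Pushed down: InterSystems IRIS evaluates the TOP/ORDER BY, the reduction is
// staged on the shard master, and only the ten result rows reach Spark.
val top10  = spark.read.iris("SELECT TOP 10 k, v FROM A ORDER BY v DESC")

// Not pushed down: every row of A is fetched into the Spark slaves, which then
// sort and truncate the data themselves.
val top10b = spark.read.iris("A").orderBy(desc("v")).limit(10)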


Queries

When designing an application to run in this environment, one frequently has a choice as to where the aggregation will take place. Consider the following Spark queries:
Example 1
> val q1 = spark.read.iris("SELECT max(i) FROM A")
q1: org.apache.spark.sql.DataFrame = [max(i): int]
Example 2
> val q2 = spark.read.iris("A").selectExpr("MAX(i)")
q2: org.apache.spark.sql.DataFrame = [max(i): int]
Example 3
> val q3 = spark.sql("SELECT max(i) FROM B")
q3: org.apache.spark.sql.DataFrame = [max(i): int]
where A is a sharded table with a numeric column named i, and B is a temporary view of that same table registered with Spark's SQL catalog (see the sketch following Example 3 below). All three queries compute the same single value, namely the maximum value of column i, but they each accomplish this task in rather different ways:
Example 2 fetches the entire column i of table A (though, thanks to the connector's ability to automatically prune columns, only column i and not all of A) into the Spark slaves, each of which then computes the maximum value of its portion of the data. These local maxima are then sent to the Spark master, which picks the single greatest value amongst them and returns it to the driver program in a DataFrame.
To see what's going on, it can be helpful to take a look at the output of the DataFrame's explain operator:
> q2.explain
== Physical Plan ==
*HashAggregate(keys=[], functions=[max(i#106)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_max(i#106)])
      +- *Scan Relation(SELECT * FROM (A)) [i#106] ReadSchema: struct<i:int>
Reading the plan from the bottom up, notice:
  • the Scan, which corresponds to the parallelized fetch of the data in each of the slaves,
  • the inner HashAggregate, which corresponds to the work that each slave will perform in parallel in locating its local maximum, and
  • the Exchange and subsequent HashAggregate, which correspond to the transmission of the local maxima back to the master and the subsequent selection of the global maximum.
Example 3 demonstrates the use of Spark's own built-in SQL interpreter. For more complex queries, especially those federated across multiple databases or other disparate sources of data, this can provide a powerful and flexible way of treating the various sources as a single giant database.
In this case, however, exactly the same work is performed beneath the covers as in Example 2, as can be seen with the explain operator.
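For spark.sql to resolve the name B at all, it must first be registered with Spark's SQL catalog. A minimal sketch of one way to do this, assuming B is simply a temporary view over the same table A (the import and names are illustrative):

import com.intersystems.spark._   // assumed: brings the connector's iris methods into scope

// Register the InterSystems IRIS table A with Spark's catalog under the name B,
// so that spark.sql("SELECT max(i) FROM B") can resolve it.
spark.read.iris("A").createOrReplaceTempView("B")

val q3 = spark.sql("SELECT max(i) FROM B")
q3.explain   // produces the same physical plan as q2 in Example 2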
Example 1, on the other hand, demonstrates the power of using Spark with InterSystems IRIS: now the entire query is performed within the InterSystems IRIS database, and is thus amenable to InterSystems IRIS query optimization, the use of indexes, and so on:
> q1.explain
== Physical Plan ==
*Scan Relation(select max(i) from A) [Aggregate_1#215] ReadSchema: struct<Aggregate_1:int>

Loading Data

Spark is particularly useful for loading large datasets into sharded InterSystems IRIS clusters.
For best performance and maximum parallelism, you will want to install HDFS on the nodes of the cluster too. In this scenario, each host Hi also acts as an HDFS data node di.
Example: loading a large CSV file into a sharded table.
~> hadoop fs -put x.csv /x.csv
   # Copy the local file x.csv into HDFS as /x.csv
And now from within Spark:
scala> val df = spark.read.options( ... )
                          .csv("hdfs://host:port/x.csv")   // Read local partitions
scala> df.write.shard(true).options( ... )
                           .autobalance(false).iris("X")   // Write to local shards
In this example, each Spark slave reads its own locally available portion of the original file, which was split and distributed to the nodes of the cluster automatically by HDFS when the file was copied in with hadoop fs -put, and writes the parsed records to its own collocated shard: thus no data moves across the network once the source file has been placed in HDFS.
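As a quick sanity check, the newly loaded table can be read back through the connector. This is only an illustrative sketch; the values returned depend entirely on the contents of x.csv and on the connector's partitioning defaults:

scala> val loaded = spark.read.iris("X")
scala> loaded.count()                  // should match the number of records parsed from x.csv
scala> loaded.rdd.getNumPartitions     // typically about one partition per shard server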