
Spark Connector Best Practices

This chapter describes the preferred way to set up a Spark cluster on the InterSystems IRIS® data platform. The following topics are discussed:

  • Configuration — describes and explains the preferred Spark cluster topology.

  • Queries — provides examples that demonstrate how to optimize query performance.

  • Loading Data — discusses how to optimize loading of large datasets into sharded clusters.

Note:
Pre-installed Spark in Containers

The InterSystems Cloud Manager (ICM) provides you with a simple, intuitive way to provision cloud infrastructure and deploy services on it. See the “Using ICM” chapter of the InterSystems Cloud Manager Guide for instructions on how to configure and deploy containers in a topology similar to the one described in this chapter. The “Deploy and Manage Services” section provides specific instructions for using images that include pre-installed instances of Spark.

Configuration

An InterSystems IRIS database can be sharded, meaning that its tables are transparently partitioned across multiple servers running on different computers. The Connector allocates compute tasks so that each Spark executor is collocated with the server from which it draws its data. This reduces the movement of data across the network, and more importantly, allows the Spark program's performance to scale linearly with the number of shards (and therefore the size of the data set) on which it operates.

The optimum topology is as follows:

  • The Spark cluster runs in what the Spark documentation refers to as Standalone Mode.

  • The InterSystems IRIS shard master I0 and Spark master S0 both run on host machine H0.

  • A Spark worker w0 also runs on host machine H0.

  • Each additional host machine Hi hosting an InterSystems IRIS shard server instance Ii runs exactly one Spark worker wi.

  • Optionally, each host machine Hi hosting an InterSystems IRIS shard server instance Ii also runs exactly one HDFS data node di.

Host machines | IRIS shard master/servers | Spark master/workers                  | HDFS data nodes
H0            | I0 (shard master)         | S0 (Spark master), w0 (Spark worker)  | d0
H1...Hn       | I1...In (shard servers)   | w1...wn (Spark workers)               | d1...dn

While other topologies are certainly possible and will produce correct results, this specific topology will generally perform best for the following reasons:

  • Each Spark worker is collocated with exactly one InterSystems IRIS shard server, allowing the Connector to ensure that each process is executed on the same machine as the data on which it depends. This significantly reduces movement of data across the network, and allows performance to scale linearly with the number of shards.

  • Each Spark worker creates Spark executor processes running in parallel. The Spark master is aware of the number of processing cores available, and distributes tasks to executors accordingly. There is no advantage in running more than one Spark worker on each host machine.

  • Any tables that are not sharded will be stored on InterSystems IRIS shard master I0. A Spark worker w0 should be present on H0, allowing the processes that use these tables to be collocated with the data.

  • When loading large data sets into a cluster, there are significant advantages to installing HDFS on the nodes of the cluster, running one HDFS data node di on each host machine Hi.
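
The following sketch shows one way a driver program might attach to a cluster laid out this way. It is illustrative only: the master URL spark://H0:7077 (the standalone master's default port on host H0), the application name, and the table name MyTable are placeholders, and the package used for the Connector's implicit .iris syntax should be checked against your installed Connector's documentation. Connection details for the shard master (URL, credentials) are omitted and are normally supplied through the Connector's configuration properties.

import org.apache.spark.sql.SparkSession
import com.intersystems.spark._                 // Connector implicits adding .iris to readers/writers (verify the package for your install)

// Attach to the standalone Spark master S0 running on host H0.
val spark = SparkSession.builder()
  .appName("iris-connector-example")            // placeholder application name
  .master("spark://H0:7077")                    // assumed host name and default standalone port
  .getOrCreate()

// Read a sharded table through the Connector; each partition is scheduled on the
// Spark executor collocated with the shard server that holds that partition's data.
val df = spark.read.iris("MyTable")             // placeholder table name
println(df.rdd.getNumPartitions)                // rough view of the parallelism obtained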

Hardware

Hardware configuration generally involves a trade-off between a few powerful host machines and a larger number of less powerful hosts. Given this choice, the optimum configuration will ensure that the largest tables in the database fit comfortably within the collective RAM of all the shard servers in the cluster. In this configuration, the cluster effectively behaves as a scalable cache. This is especially true for large scale analytics of predominantly static or infrequently changing 'data lakes'.

Special consideration should be given to the machine H0 that hosts the InterSystems IRIS shard master. Complex queries, particularly those with aggregates, ORDER BY, TOP, or DISTINCT clauses and the like, generally require a non-trivial reduction of the intermediate results returned by the individual shard servers. This computation is staged in temporary tables on the shard master. For a query that returns a large result set that must be sorted, for example, the reduction step on the shard master (the sort) can easily dominate the rest of the computation.
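
To make this trade-off concrete, the sketch below (using the same sharded table A with numeric column i that the Queries section introduces) contrasts a sort that is pushed into InterSystems IRIS, and therefore reduced on the shard master, with one that Spark performs across the workers; the variable names are placeholders:

> val byIris  = spark.read.iris("SELECT * FROM A ORDER BY i")   // sort runs in IRIS, reduced on shard master I0
> val bySpark = spark.read.iris("A").orderBy("i")               // rows fetched per shard, sort distributed across the Spark workers

Which form performs better depends on the size of the result set and on how generously H0 is provisioned relative to the worker machines.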

Queries

When designing an application to run in this environment, there is frequently a choice as to where the aggregation will take place. Consider the following Spark queries, where A is a sharded table with a numeric column named i. All three queries compute the same single value, namely the maximum value of column i, but each one accomplishes this task in a different way:

Example 1 — parallelized fetch and aggregate
> val q1 = spark.read.iris("A").selectExpr("MAX(i)")
q1: org.apache.spark.sql.DataFrame = [max(i): int]
Example 2 — using Spark SQL interpreter
> spark.read.iris("A").createOrReplaceTempView("A")
> val q2 = spark.sql("SELECT max(i) FROM A")
q2: org.apache.spark.sql.DataFrame = [max(i): int]
Example 3 — using InterSystems IRIS query optimization
> val q3 = spark.read.iris("SELECT max(i) FROM A")
q3: org.apache.spark.sql.DataFrame = [max(i): int]

Example 1 fetches the entire column i of table A (and only column i, not all of A, thanks to the Connector's ability to automatically prune columns) into the Spark workers. Each worker then computes the maximum value for its portion of the data. These local maximum values are then sent to the Spark master, which picks the single greatest value amongst them and returns it to the driver program in a DataFrame.

To see what's going on, it can be helpful to take a look at the output of the DataFrame's explain operator:

> q1.explain
== Physical Plan ==
*HashAggregate(keys=[], functions=[max(i#106)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_max(i#106)])
      +- *Scan Relation(SELECT * FROM (A)) [i#106] ReadSchema: struct<i:int>

Notice:

  • the Scan, which corresponds to the parallelized fetch of the data in each of the workers,

  • the inner HashAggregate, which corresponds to the work that each worker will perform in parallel in locating its local maximum, and

  • the Exchange and subsequent HashAggregate, which correspond to the transmission of the local maximum values back to the master and the subsequent selection of the global maximum.

Example 2 demonstrates the use of Spark's own built-in SQL interpreter. For more complex queries, especially those that are federated across multiple databases or other disparate sources of data, this can provide a powerful and flexible way of treating the various sources as a single giant database.

However, beneath the covers exactly the same work is performed as in Example 1, as can be seen with the explain operator.
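
For instance, q2's plan should show the same shape as q1's: a parallel Scan of column i, a partial HashAggregate in each executor, and a single-partition Exchange followed by the final HashAggregate (only the expression IDs will differ between runs):

> q2.explain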

Example 3, on the other hand, demonstrates the power of using Spark with InterSystems IRIS: now the entire query is performed within the InterSystems IRIS database, and is thus amenable to InterSystems IRIS query optimization, the use of indexes, and so on:

> q3.explain
== Physical Plan ==
*Scan Relation(select max(i) from A) [Aggregate_1#215] ReadSchema: struct<Aggregate_1:int>

Loading Data

Spark is particularly useful for loading large datasets into sharded InterSystems IRIS clusters.

For best performance and maximum parallelism, you will want to install HDFS on the nodes of the cluster too. In this scenario, each host Hi should also act as a data node di.

Example: loading a large CSV file into a sharded table.

~>  hadoop fs -put x.csv /x.csv
  # Copy the local file "x.csv" into HDFS as /x.csv

And now from within Spark:

scala> val df = spark.read.options( ... )
                          .csv("hdfs://host:port/x.csv")   // Read local partitions
scala> df.write.shard(true).options( ... )
                           .autobalance(false).iris("X")   // Write to local shards

In this example, each Spark worker reads its own locally available portion of the original file (which HDFS automatically split and distributed across the nodes of the cluster when the file was copied in) and writes the parsed records to its own collocated shard. No data moves across the network once the source file has been placed in HDFS.
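
As a quick sanity check (a sketch reusing the df and table X from the example above), you can see how many partitions, and therefore how many parallel load tasks, Spark used, and then read the table back through the Connector to verify the row count:

scala> df.rdd.getNumPartitions        // one write task per partition during the load
scala> spark.read.iris("X").count()   // total rows now stored in the sharded table X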