Using the InterSystems Spark Connector
Spark Connector Best Practices
The following topics are discussed:
Configuration
Queries
Loading Data
Configuration
Intended topology
  1. The InterSystems IRIS™ shard master I0 and Spark master S0 both run on host machine H0.
  2. A Spark slave s0 also runs on host machine H0.
  3. Furthermore, for each additional distinct host machine H1 ... Hn hosting an InterSystems IRIS shard worker instance I1 ... In, exactly one Spark slave si also runs on Hi.
  4. Optionally, for each additional distinct host machine H1 ... Hn hosting an InterSystems IRIS shard worker instance I1 ... In, exactly one HDFS data node di also runs on Hi.
  5. The Spark cluster runs in what the Spark documentation refers to as Standalone Mode.
Host | IRIS | Spark | HDFS
-----|------|-------|-----
H0   | I0   | S0    | D0
     |      | s0    | d0
H1   | I1   | s1    | d1
...  | ...  | ...   | ...
Hn   | In   | sn    | dn
Rationale
While other topologies are certainly possible and will work (in the sense that Spark programs will save and restore data correctly and compute correct results), this specific topology will generally perform best because:
  1. Each Spark slave is collocated with exactly one InterSystems IRIS shard worker, and the connector is able to exploit this fact by requesting that a dataset partition requiring data from shard worker Ii be scheduled for execution on host machine Hi. In other words, the process is executed on the same machine as the data on which it depends. Thus movement of data across the network connecting the machines Hi is greatly reduced.
  2. Each Spark slave creates Spark worker processes that are multithreaded. The Spark master is aware of the number of processing cores available to each worker, and distributes tasks amongst them accordingly (subject to 1. above). Thus there is no advantage in running more than one Spark slave on each host machine.
  3. Not all tables are sharded. Un-sharded tables are stored on the InterSystems IRIS shard master I0. The partitions of datasets that read or write such tables are best scheduled for execution on a Spark slave that is collocated with this instance, hence there should also be a Spark slave s0 running on H0 to service these requests.
  4. When loading large data sets into a cluster, there are significant advantages to installing HDFS on the nodes of the cluster (see below).
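To make the first point above concrete, the sketch below shows a driver attaching to the standalone-mode master S0 on H0 and reading a sharded table through the connector. The application name, master URL, table name, and the com.intersystems.spark import are assumptions used for illustration; the connector's connection settings (server address, credentials) are presumed to be configured elsewhere.

import org.apache.spark.sql.SparkSession
import com.intersystems.spark._                  // assumed import that provides the iris() read/write methods

val spark = SparkSession.builder()
  .appName("ShardLocalityExample")               // illustrative name
  .master("spark://H0:7077")                     // standalone-mode master S0 running on H0 (placeholder URL)
  .getOrCreate()

// Reading a sharded table yields partitions that the connector asks Spark to
// schedule on the slave co-located with the shard worker holding the data.
val a = spark.read.iris("A")                     // "A" is a placeholder sharded table
println(a.rdd.getNumPartitions)                  // typically at least one partition per shard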
Hardware
While more cores, RAM, and disk storage are obviously better than fewer, one generally has a choice between a few powerful host machines and a larger number of less powerful ones. A sweet spot exists, however, when the largest tables in the database fit comfortably within the collective RAM of all the shard workers in the cluster, for then the cluster effectively behaves as a scalable cache. This is especially true for large-scale analytics of predominantly static or infrequently changing 'data lakes'.
Special consideration should be given to the machine H0 that hosts the InterSystems IRIS shard master. Complex queries, particularly those with aggregates, ORDER BY, TOP, or DISTINCT clauses and the like, generally require a non-trivial reduction of the intermediate results returned by the individual shard workers, and this computation is staged in temporary tables on the shard master. For a query that returns a large result set that must be sorted, for example, the reduction step on the shard master (the sort) can easily dominate the rest of the computation.
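For example, a query such as the following (the table and column names are hypothetical) asks IRIS to merge and sort the partial results returned by every shard worker on the shard master before anything is handed back to Spark, so H0 must be provisioned with that reduction step in mind:

val recent = spark.read.iris(
  "SELECT TOP 1000 * FROM Orders ORDER BY OrderDate DESC")  // global sort and TOP cut are staged on the shard master I0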
Also, be sure to take a look at the InterSystems IRIS Data Platform Scalability Guide.
Queries
When designing an application to run in this environment, one frequently has a choice as to where the aggregation will take place. Consider the following Spark queries:
Example 1
> val q1 = spark.read.iris("SELECT max(i) FROM A")
q1: org.apache.spark.sql.DataFrame = [Aggregate_1: int]
Example 2
> val q2 = spark.read.iris("A").selectExpr("MAX(i)")
q2: org.apache.spark.sql.DataFrame = [max(i): int]
Example 3
> spark.read.iris("A").createOrReplaceTempView("B")
> val q3 = spark.sql("SELECT max(i) FROM B")
q3: org.apache.spark.sql.DataFrame = [max(i): int]
where A is a sharded table with a numeric column named i, and B (in Example 3) is a temporary view registered over A. All three queries compute the same single value, namely the maximum value of column i, but they each accomplish this task in rather different ways:
Example 2 fetches the entire column i of table A (though, thanks to the connector's ability to automatically prune columns, only column i and not all of A) into the Spark slaves, each of which then computes the maximum value of its portion of the data. These local maxima are then sent to the Spark master, which picks the single greatest value amongst them and returns it to the driver program in a DataFrame.
To see what's going on, it can be helpful to take a look at the output of the DataFrame's explain operator:
> q2.explain
== Physical Plan ==
*HashAggregate(keys=[], functions=[max(i#106)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_max(i#106)])
      +- *Scan CacheRelation(SELECT * FROM (A)) [i#106] ReadSchema: struct<i:int>
Notice that:
  1. Only column i is read from the database (ReadSchema: struct<i:int>), thanks to the connector's column pruning.
  2. Each partition computes a partial maximum (partial_max) locally on its Spark worker.
  3. The Exchange SinglePartition then gathers these partial results so that the final maximum can be computed in one place.
Example 3 demonstrates the use of Spark's own built-in SQL interpreter. For more complex queries, especially those that are federated across multiple databases or other disparate sources of data, this can provide a powerful and flexible way of treating the various sources like a single giant database.
In this case, however, beneath the covers exactly the same work is performed as in Example 2, as can be seen with the explain operator.
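For instance, q3.explain prints essentially the same physical plan as q2 (only the expression identifiers, shown here as illustrative values, will differ):

> q3.explain
== Physical Plan ==
*HashAggregate(keys=[], functions=[max(i#115)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_max(i#115)])
      +- *Scan CacheRelation(SELECT * FROM (A)) [i#115] ReadSchema: struct<i:int>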
Example 1, on the other hand, demonstrates the power of using Spark with InterSystems IRIS: now the entire query is performed within the InterSystems IRIS database, and is thus amenable to InterSystems IRIS query optimization, the use of indexes, and so on:
> q1.explain
== Physical Plan ==
*Scan CacheRelation(select max(i) from A) [Aggregate_1#215] ReadSchema: struct<Aggregate_1:int>
Loading Data
Spark is particularly useful for loading large datasets into sharded InterSystems IRIS clusters.
For best performance and maximum parallelism, you will want to install HDFS on the nodes of the cluster too. In this scenario, each host Hi should also act as a data node di.
Example: loading a large CSV file into a sharded table.
~>  hadoop fs -put x.csv /x.csv
  # Copy the local file "x.csv" into HDFS as /x.csv
And now from within Spark:
scala> val df = spark.read.options( ... )
                          .csv("hdfs://host:port/x.csv")   // Read local partitions
scala> df.write.shard(true).options( ... )
                           .autobalance(false).iris("X")   // Write to local shards
In this example, each Spark slave reads its own locally available portion of the original file, which was split and distributed to the nodes of the cluster automatically by HDFS when the file was copied in with hadoop fs -put, and writes the parsed records to its own co-located shard: thus no data moves across the network once the source file has been moved into HDFS.
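Spelled out a little more fully, the load might look like the sketch below. The CSV options, the HDFS name node address, and the inferred column types are illustrative placeholders, and the com.intersystems.spark import is assumed to provide the shard, autobalance, and iris methods used above.

import com.intersystems.spark._                  // assumed import for the connector's reader/writer extensions

val df = spark.read
  .option("header", "true")                      // the CSV file has a header row (assumption)
  .option("inferSchema", "true")                 // let Spark infer the column types
  .csv("hdfs://H0:8020/x.csv")                   // name node address is a placeholder

df.write
  .shard(true)                                   // create X as a sharded table
  .autobalance(false)                            // keep each partition's rows on its co-located shard
  .iris("X")                                     // write the parsed records to table X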