Java Persister Examples
This section provides full listings of several working Persister programs:
- Hello Persister — a quick demonstration of the basic Persister workflow.
- Continents — load data from a local array.
- DivvyTrip — load a Chicago bicycle sharing CSV file (small: 1.5 million records).
- ThreadLoader — load a large generated data set (24 million records).
The Persister, like all InterSystems Java drivers, connects to the database with a standard InterSystems JDBC IRISConnection object (see Using IRISDataSource to Connect in Using Java with InterSystems Software). IRISDataSource is the recommended way to create the connection because a DataSource is required to use the Persister’s multi-threaded loader.
Hello Persister
This very short application demonstrates all of the basic Persister mechanisms, from schema creation to data serialization, using the three main Persister classes: SchemaBuilder creates a schema, SchemaManager synchronizes the schema to the Schema Registry on the server, and Persister serializes and stores records in the extent specified by the Registry.
package com.intersystems.demo;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.intersystems.jdbc.*;
import com.intersystems.persister.Persister;
import com.intersystems.persister.SchemaBuilder;
import com.intersystems.persister.SchemaManager;
import com.intersystems.persister.ArrayRecord;
import com.intersystems.persister.schemas.RecordSchema;
import java.util.Arrays;
import java.util.stream.*;
import java.sql.*;
public class Hello {
public static void main(String[] args) throws SQLException, JsonProcessingException {
IRISDataSource dataSource = new IRISDataSource();
dataSource.setURL("jdbc:IRIS://127.0.0.1:1972/USER");
IRISConnection irisConn = (IRISConnection) dataSource.getConnection("_SYSTEM","SYS");
// Create a data stream and use SchemaBuilder to infer a schema
String[][] data = new String[][]{{"Hello"},{"Bonjour"},{"Guten Tag"}};
Stream<Object[]> stream = Arrays.stream(data);
String schemaJson = SchemaBuilder.infer(data[0],"Demo.Hello",new String[]{"greeting"});
// Create a SchemaManager and synchronize the schema to the database
SchemaManager mgr = new SchemaManager(irisConn);
RecordSchema schemaRec = mgr.synchronizeSchema(schemaJson);
// Create a persister, passing it the manager and the schema record
Persister persister = Persister.createPersister(mgr, schemaRec, Persister.INDEX_MODE_DEFERRED);
persister.deleteExtent(); // delete old test data
// Pass the stream to the persister and insert each item into the database
stream.map(d -> new ArrayRecord(d, schemaRec)).forEach(persister::insert);
// Use standard SQL calls to display the persisted data
ResultSet rs = irisConn.createStatement().executeQuery("SELECT %ID, * FROM Demo.Hello");
while (rs.next()) { System.out.printf("\n Greeting: %s", rs.getString("greeting"));}
}
}
The call to SchemaBuilder.infer() produces the following JSON schema string:
{"type":"record","name":"Hello","namespace":"Demo","category":"persistent",
"fields":[{"name":"greeting","type":"string"}]}
The query loop at the end produces the following output:
Greeting: Hello
Greeting: Bonjour
Greeting: Guten Tag
Continents - Load Data from a Local Array
This example loads a local array of delimited strings, using CsvRecord to model the source data.
package com.intersystems.demo;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.intersystems.jdbc.*;
import com.intersystems.persister.*;
import com.intersystems.persister.schemas.*;
import java.util.Arrays;
import java.util.stream.*;
import java.sql.*;
public class Continent {
public static void main(String[] args) throws SQLException, JsonProcessingException {
IRISDataSource dataSource = new IRISDataSource();
dataSource.setURL("jdbc:IRIS://127.0.0.1:1972/USER");
IRISConnection irisConn = (IRISConnection) dataSource.getConnection("_SYSTEM","SYS");
// create a set of delimited strings (instead of reading a CSV file)
String[] continents = new String[]{
"NA,North America", "SA,South America", "AF,Africa", "AS,Asia",
"EU,Europe", "OC,Oceana", "AT,Atlantis", "AN,Antarctica"
};
Stream<String> dataStream = Arrays.stream(continents);
// Create a schema
String schemaSource = SchemaBuilder.record()
.withName("Demo.Continent")
.addField("code", "string")
.addField("name", "string")
.complete();
System.out.println("\nschemaSource:\n" + schemaSource + "\n");
// Parse and synchronize the schema
SchemaManager schemaManager = new SchemaManager(irisConn);
RecordSchema schemaRecord = (RecordSchema) schemaManager.parseSchema(schemaSource);
RecordSchema continentsSchema = schemaManager.synchronizeSchema(schemaRecord);
System.out.println("\ncontinentsSchema:\n" + continentsSchema.toJson() + "\n");
// Prepare the User.Continent extent and parse the data stream to a buffer
Persister persister = Persister.createPersister(schemaManager, continentsSchema, Persister.INDEX_MODE_DEFERRED);
persister.deleteExtent();
dataStream.map(CsvRecord.getParser(",", schemaRecord)).forEach(record -> persister.add(record));
// Flush buffer to the database and print buffer statistics
persister.flush();
System.out.println("REPORT STATISTICS");
persister.reportStatistics();
// Query the new data
Statement query = irisConn.createStatement();
java.sql.ResultSet rs = query.executeQuery("select code, name from Demo.Continent order by name");
int colnum = rs.getMetaData().getColumnCount();
while (rs.next()) {
for (int i=1; i<=colnum; i++) {
System.out.print(rs.getString(i) + " ");
}
System.out.println();
}
}
}
DivvyTrip - CSV bicycle sharing dataset (small - 1.5 million records)
The City of Chicago’s Divvy bicycle sharing data sets are publicly available. This example shows how a local file containing data in CSV format can be loaded using streams and Persister. The schema used here is generated from an existing InterSystems IRIS class (RowDB.DivvyTrip) created by a series of DDL statements (see listing in TABLE DivvyTrip). The data set used in this example concatenates several months of Divvy data. To simplify the example, the data was preprocessed to eliminate rows that contain null entries.
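The null-row preprocessing mentioned above is not part of the Persister API. A minimal plain-Java sketch of one way to do it (assuming unquoted, comma-delimited fields; the class and method names are illustrative) might look like this:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CsvPreprocess {
    /** Returns true when every field in the comma-delimited line is non-empty. */
    static boolean isComplete(String line) {
        // split with limit -1 so trailing empty fields are kept, not discarded
        String[] fields = line.split(",", -1);
        for (String f : fields) {
            if (f.isBlank()) return false;
        }
        return true;
    }

    /** Keeps only the rows in which every field has a value. */
    static List<String> dropIncompleteRows(Stream<String> lines) {
        return lines.filter(CsvPreprocess::isComplete).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Stream<String> rows = Stream.of(
                "A1,docked_bike,2021-01-01 10:00:00",
                "A2,,2021-01-01 10:05:00",   // empty second field: dropped
                "A3,electric_bike,");        // empty trailing field: dropped
        System.out.println(dropIncompleteRows(rows));
        // prints [A1,docked_bike,2021-01-01 10:00:00]
    }
}
```

The filtered stream could then be fed to the same `skip(1).map(parser)` pipeline shown in the listing below, instead of preprocessing the file ahead of time.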
package com.intersystems.demo;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.intersystems.jdbc.*;
import com.intersystems.persister.*;
import com.intersystems.persister.schemas.*;
import java.util.Arrays;
import java.util.stream.*;
import java.util.function.Function;
import java.sql.*;
import java.io.FileReader;
import java.io.BufferedReader;
public class DivvyTrip {
public static void main(String[] args) throws SQLException, JsonProcessingException {
IRISDataSource dataSource = new IRISDataSource();
dataSource.setURL("jdbc:IRIS://127.0.0.1:1972/USER");
IRISConnection irisConn = (IRISConnection) dataSource.getConnection("_SYSTEM","SYS");
SchemaManager divvyManager = new SchemaManager(irisConn);
// Generate a schema from an existing ObjectScript class
RecordSchema divvySchema = divvyManager.getSchemaForClass("RowDB.DivvyTrip");
System.out.println("\ndivvySchema:\n" + divvySchema + "\n");
System.out.println();
// Create a Persister and delete any old test data
Persister persister = Persister.createPersister(divvyManager, divvySchema, Persister.INDEX_MODE_DEFERRED);
persister.deleteExtent();
// Get a data stream from a CSV file
Stream<String> dataStream = Stream.empty();
try {
dataStream = new BufferedReader(new FileReader("../divvy-tripdata.csv")).lines();
} catch (Exception e) {
e.printStackTrace(); // if the file is missing, the stream stays empty
}
System.out.println("\n STARTING TO PERSIST STREAM\n");
// Create a CSV parser, then parse and add each record to the database.
Function<String, CsvRecord> parser = CsvRecord.getParser(",", divvySchema);
persister.resetStatistics();
try {
dataStream.skip(1)
.map(parser)
.forEach(record -> persister.add(record));
} catch (Exception e) {
e.printStackTrace();
}
// Flush the buffer to the database, report buffer statistics after indexing has finished
persister.flush();
persister.waitForIndexing();
persister.reportStatistics();
// Query the new data and print first 10 records
Statement query = irisConn.createStatement();
java.sql.ResultSet rs = query.executeQuery(
"select top 10 * from RowDB.DivvyTrip order by started_at");
int colnum = rs.getMetaData().getColumnCount();
while (rs.next()) {
for (int i=1; i<=colnum; i++) {
System.out.print(rs.getString(i) + " ");
}
System.out.println();
}
}
}
The RowDB.DivvyTrip class is created by a series of DDL statements. To create a fresh table, open the Terminal and paste the following lines exactly as shown. (The first command may be changed if you want to use a namespace other than USER. The single empty line after DROP TABLE is required to enter SQL.Shell() multi-line mode):
zn "USER"
DO $SYSTEM.SQL.Shell()
DROP TABLE RowDB.DivvyTrip
CREATE TABLE RowDB.DivvyTrip (
ride_id VARCHAR(16),
rideable_type VARCHAR(11),
started_at TIMESTAMP,
ended_at TIMESTAMP,
start_station_name VARCHAR(50),
start_station_id VARCHAR(4),
end_station_name VARCHAR(50),
end_station_id VARCHAR(4),
start_lat DOUBLE,
start_lng DOUBLE,
end_lat DOUBLE,
end_lng DOUBLE,
member_casual VARCHAR(10) )
GO
CREATE BITMAP INDEX StartTimeIndex ON RowDB.DivvyTrip (started_at)
CREATE BITMAP INDEX EndTimeIndex ON RowDB.DivvyTrip (ended_at)
CREATE BITMAP INDEX StartStationIndex ON RowDB.DivvyTrip (start_station_id)
CREATE BITMAP INDEX EndStationIndex ON RowDB.DivvyTrip (end_station_id)
CREATE BITMAP INDEX RideIDIndex ON RowDB.DivvyTrip (ride_id)
TUNE TABLE RowDB.DivvyTrip
q
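If you prefer to script the table creation from Java rather than the Terminal, the same DDL can be submitted over JDBC one statement at a time (the GO separators are only needed in the SQL Shell). This is a sketch: the class name `CreateDivvyTable` is illustrative, and plain `DriverManager` is used here for brevity where a real program would use the IRISDataSource connection shown in the listings above.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.List;

public class CreateDivvyTable {
    // Same DDL as the Terminal session above, one statement per entry
    static final List<String> DDL = List.of(
        "DROP TABLE RowDB.DivvyTrip",
        "CREATE TABLE RowDB.DivvyTrip (" +
        " ride_id VARCHAR(16), rideable_type VARCHAR(11)," +
        " started_at TIMESTAMP, ended_at TIMESTAMP," +
        " start_station_name VARCHAR(50), start_station_id VARCHAR(4)," +
        " end_station_name VARCHAR(50), end_station_id VARCHAR(4)," +
        " start_lat DOUBLE, start_lng DOUBLE, end_lat DOUBLE, end_lng DOUBLE," +
        " member_casual VARCHAR(10) )",
        "CREATE BITMAP INDEX StartTimeIndex ON RowDB.DivvyTrip (started_at)",
        "CREATE BITMAP INDEX EndTimeIndex ON RowDB.DivvyTrip (ended_at)",
        "CREATE BITMAP INDEX StartStationIndex ON RowDB.DivvyTrip (start_station_id)",
        "CREATE BITMAP INDEX EndStationIndex ON RowDB.DivvyTrip (end_station_id)",
        "CREATE BITMAP INDEX RideIDIndex ON RowDB.DivvyTrip (ride_id)",
        "TUNE TABLE RowDB.DivvyTrip");

    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:IRIS://127.0.0.1:1972/USER", "_SYSTEM", "SYS");
             Statement stmt = conn.createStatement()) {
            for (String sql : DDL) {
                try {
                    stmt.execute(sql);
                } catch (Exception e) {
                    // DROP TABLE fails harmlessly when the table does not exist yet
                    System.out.println("Skipped: " + e.getMessage());
                }
            }
        }
    }
}
```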
ThreadLoader (large - 24 million records)
This example uses generated data stored in an Object array, multiple threads, local buffers and a BufferWriter. See Serializing Data with Persister for more information on threading, buffering, and buffer statistics.
package com.intersystems.demo;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.intersystems.jdbc.*;
import com.intersystems.persister.*;
import com.intersystems.persister.schemas.*;
import java.sql.*;
import java.util.concurrent.LinkedBlockingQueue;
public class ThreadLoader {
public static void main(String[] args) throws SQLException, JsonProcessingException {
try {
//========================================================================
// Initialize Persister, SchemaManager, statistics, and buffer queue
//========================================================================
System.out.println("ThreadLoader - write arrays of generated data " +
"to localBuffer, BufferWriter writes to server");
IRISDataSource dataSource = new IRISDataSource();
dataSource.setURL("jdbc:IRIS://127.0.0.1:1972/USER");
IRISConnection irisConn = (IRISConnection) dataSource.getConnection("_SYSTEM","SYS");
SchemaManager schemaManager = new SchemaManager(irisConn);
try {
schemaManager.deleteIrisSchema("Demo.ThreadLoader");
} catch (Exception ignore) {
}
RecordSchema sourceType = schemaManager.synchronizeSchema(SchemaBuilder.record()
.withName("Demo.ThreadLoader")
.addField("ID", "int")
.addField("FirstName", "string")
.addField("LastName", "string")
.addField("aBool", "boolean")
.addField("along", "long")
.addField("afloat", "float")
.addField("adouble", "double")
.addField("abytes", "bytes")
.complete());
Persister persister = Persister.createPersister(schemaManager, sourceType,
Persister.INDEX_MODE_DEFERRED, 32_000);
persister.deleteExtent();
persister.flush();
System.out.println("\nStarting load, resetting statistics");
persister.resetStatistics();
PersisterStatistics statistics = new PersisterStatistics();
statistics.setStartTime(System.currentTimeMillis());
// Create queue to hold local buffers
LinkedBlockingQueue<BufferWrite> bufferQueue =
new LinkedBlockingQueue<BufferWrite>(200_000);
BufferWriter bufferWriter = BufferWriter.startBufferWriter(bufferQueue, dataSource);
// Set loader constants
int producerThreadCount = 2;
int objectCount = 24_000_000;
int chunkSize = 10_000;
int chunks = objectCount / chunkSize / producerThreadCount;
System.out.format("Loading %d objects using %d producer threads, %d chunks of " +
"%d objects.%n", objectCount, producerThreadCount, chunks, chunkSize);
//========================================================================
// Generate Runnable producer (provides function to be passed to threads)
//========================================================================
Runnable producer = new Runnable() {
@Override
public void run() { // create separate local buffer for each thread
PersisterBuffer localBuffer = persister.createLocalBuffer(bufferQueue);
for (int i = 0; i < chunks; i++) { // generate some test data
Object[][] data = new Object[chunkSize][8];
for (int j = 0; j < chunkSize; j++) {
data[j][0] = i * chunkSize + j;
data[j][1] =
"1234567890123456789012345678901234567890123456789012345678901234567890";
data[j][2] = "Smith-Jones";
data[j][3] = true;
data[j][4] = 922285477L;
data[j][5] = 767876231.123F;
data[j][6] = 230.134;
data[j][7] = 233;
}
persister.insert(data, localBuffer);
}
persister.flush(localBuffer);
statistics.addThreadStat(persister.getStatistics(localBuffer));
}
};
//========================================================================
// Call producer.run() from each thread and buffer results
//========================================================================
Thread[] producerThreads = new Thread[producerThreadCount];
for (int tp = 0; tp < producerThreadCount; tp++) {
producerThreads[tp] = new Thread(producer); // pass producer.run() to thread
producerThreads[tp].start();
}
for (int tp = 0; tp < producerThreadCount; tp++) {
producerThreads[tp].join();
}
System.out.println("Stopping the BufferWriter");
bufferWriter.stopBufferWriter();
statistics.setStopTime(System.currentTimeMillis());
statistics.reportStatistics();
} catch (Exception e) {
e.printStackTrace();
}
}
}
This code produces output similar to the following:
ThreadLoader - write arrays of generated data to localBuffer, BufferWriter writes to server
Starting load, resetting statistics
Loading 24000000 objects using 2 producer threads, 1200 chunks of 10000 objects.
Stopping the BufferWriter
loaded 24,000,000 objects, using 2 threads in 37.51 seconds, 639,863 obj/sec, 76.956749 MB/sec
load() executed on 2 threads...
Elapsed time (seconds) = 37.51
Number of objects stored = 24,000,000
Store rate (obj/sec) = 639,863
Total bytes written = 3,026,714,094
Total buffers written = 94,856
MB per second = 76.96
Avg object size = 126
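The derived figures in the report follow directly from the raw totals. A quick sanity check of that arithmetic (the rounded values here differ slightly from the report, which uses the exact millisecond elapsed time):

```java
public class StatsCheck {
    static double objPerSec(long objects, double seconds) {
        return objects / seconds;
    }

    static double mbPerSec(long bytes, double seconds) {
        return bytes / seconds / (1024.0 * 1024.0);
    }

    static long avgObjectSize(long bytes, long objects) {
        return bytes / objects;
    }

    public static void main(String[] args) {
        long objects = 24_000_000L;      // "Number of objects stored"
        long bytes = 3_026_714_094L;     // "Total bytes written"
        double seconds = 37.51;          // "Elapsed time (seconds)"
        System.out.printf("%.0f obj/sec, %.2f MB/sec, avg %d bytes%n",
                objPerSec(objects, seconds),
                mbPerSec(bytes, seconds),
                avgObjectSize(bytes, objects));
        // prints 639829 obj/sec, 76.95 MB/sec, avg 126 bytes
    }
}
```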