InterSystems IRIS Data Platform 2025.1

Serializing Data with Persister

Persister handles all aspects of reading, serializing, buffering, and writing data. Each instance of Persister is bound to one specific schema and its associated database extent. Persisters are thread-safe and allow precise control and monitoring of buffers and buffer queues.

  • A multi-threaded loader can be used to ingest large data sets. The loader consumes a data stream, serializing each record and writing each serialized record to a pool of output buffers.

  • Each buffer maintains a separate connection to an InterSystems IRIS® server.

  • If the targeted extent is a sharded extent then at least one buffer per shard factor is allocated, resulting in parallel writes to the factors.

  • The size of the serialized object queues, the number of buffers allocated, and the size of the buffers can all be configured.

  • The loader maintains its own connection pool which, in the case of sharded extents, may include connections to multiple servers.

Schemas define the content and structure of data messages and are used by serializers (data message producers) and deserializers (data message consumers).

Persister Architecture

The Persister can be used by Java applications to rapidly ingest large data sets into an InterSystems IRIS® server.

Data is organized as a set of records and is described by a Schema. Schemas are managed by the SchemaManager. The SchemaManager interacts with the InterSystems IRIS Schema Manager to synchronize schemas between the local schema cache and the Schema Repository on the server. A Persister instance is created by passing a SchemaManager instance and a RecordSchema (an implementation of the Record interface).

Persister Threading and Buffering

A Persister instance accepts data from the caller, serializes that data, and writes it to a buffer. Buffers are automatically written to the InterSystems IRIS server when full, on demand, and on close. A buffer that is ready to be written can either be written to the server immediately or placed in a queue that is monitored by a separate thread. The Persister constructor automatically creates an output buffer, but it is also possible to create local buffers for use in threads.

Thread-local instances of PersisterBuffer are created by calling Persister.createLocalBuffer().

  • createLocalBuffer() creates an instance of PersisterBuffer for use exclusively within a single thread. If a bufferQueue is specified, filled buffers are placed in the buffer queue instead of being written directly to the server.

    PersisterBuffer  createLocalBuffer ()
    PersisterBuffer  createLocalBuffer (LinkedBlockingQueue< BufferWrite > bufferQueue)
    
  • flush() flushes the buffer to the server (or to the bufferQueue) if the buffer contains any objects. The optional localBuffer argument specifies a local buffer to be flushed. This method can be used to finish a sequence of calls to add().

    synchronized void  flush ()
    void  flush (PersisterBuffer localBuffer) 
    

Using Local Buffers with PersisterBuffer

PersisterBuffer holds data that is intended to be written to a server using the provided connection. The buffer is flushed whenever it is full, when flush() is called directly, and when it is closed.

This class does not use any concurrency protections, because it is primarily used by Persister, where protections are in place. The exception is direct use by an application that wants a local buffer confined to a single thread. Such a local buffer can be flushed to a buffer queue to be written to the database. The buffer queue is expected to be consumed by a separate thread running the BufferWriter runnable (see Threading with BufferWriter for details).

PersisterBuffer accumulates statistics, including the number of objects written, the total number of bytes written, and the number of buffer flushes (see Persister Buffer Statistics for details).
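To make the flush rules concrete, here is a minimal model built only from JDK classes. It is not the PersisterBuffer implementation: LocalBufferSketch, its byte-based capacity, and the Consumer sink (standing in for either the server connection or a buffer queue) are hypothetical. It shows the three flush triggers described above (buffer full, direct call, close) and keeps the three statistics in the same order that getCumulativeRaw() reports them.

```java
import java.io.ByteArrayOutputStream;
import java.util.function.Consumer;

/**
 * Hypothetical model of a single-threaded output buffer (NOT the real
 * PersisterBuffer). Serialized records accumulate in memory and are handed
 * to a sink when the next record would not fit, when flush() is called,
 * and when close() is called. No synchronization: one thread only.
 */
final class LocalBufferSketch implements AutoCloseable {
    private final int capacity;              // assumed: capacity measured in bytes
    private final Consumer<byte[]> sink;     // stands in for "server" or "buffer queue"
    private final ByteArrayOutputStream data = new ByteArrayOutputStream();
    private int pendingObjects;
    // Statistics in getCumulativeRaw() order: objects, bytes, flushes.
    private long objectCount, byteCount, flushCount;

    LocalBufferSketch(int capacity, Consumer<byte[]> sink) {
        this.capacity = capacity;
        this.sink = sink;
    }

    /** Adds one serialized record, flushing first if it would overflow the buffer. */
    void add(byte[] record) {
        if (data.size() > 0 && data.size() + record.length > capacity) {
            flush();                         // "flushed whenever it is full"
        }
        // A single record larger than capacity is still accepted into an empty buffer.
        data.write(record, 0, record.length);
        pendingObjects++;
    }

    /** Hands the buffered bytes to the sink, but only if the buffer holds objects. */
    void flush() {
        if (pendingObjects == 0) {
            return;
        }
        byte[] chunk = data.toByteArray();
        sink.accept(chunk);
        objectCount += pendingObjects;
        byteCount += chunk.length;
        flushCount++;
        data.reset();
        pendingObjects = 0;
    }

    long[] getStatistics() {
        return new long[] {objectCount, byteCount, flushCount};
    }

    @Override
    public void close() {
        flush();                             // "and when it is closed"
    }
}
```

With a 10-byte capacity, adding three 4-byte records produces one 8-byte flush when the third record arrives and a second 4-byte flush on close(), leaving the statistics at {3, 12, 2}. Calling flush() on an empty buffer does nothing, mirroring the "if there are objects in the buffer" condition on flush().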

class PersisterBuffer

The following PersisterBuffer methods are available:

  PersisterBuffer (long persisterFunction, int bufferSize, IRISConnection connection,
      ListWriter headerSuffix)
  PersisterBuffer (long persisterFunction, int bufferSize, IRISConnection connection,
      ListWriter headerSuffix, LinkedBlockingQueue bufferQueue)

           final void  add (ListWriter vList)
           final void  addAndWrite (ListWriter vList)
           final void  close () throws IOException
final PersisterBuffer  combine (PersisterBuffer otherBuffer)
        final long []  finish ()
                 void  flush ()
    static ListWriter  getListWriter ()
    static ListWriter  getListWriter (byte[] byteArray)
        final long []  getStatistics ()
          static void  recycleListWriter (ListWriter idList)
           final void  write () 

Threading with BufferWriter

As noted above, PersisterBuffer uses no concurrency protections of its own, because it is primarily used by Persister, where protections are in place. The exception is direct use by an application that wants a local buffer confined to a single thread.

Such a local buffer can be placed in a queue of buffers to be written. That queue is expected to be consumed by a separate thread running the BufferWriter runnable (BufferWriter.BufferWriterRun.run()).

BufferWriter

BufferWriter consumes a BlockingQueue of BufferWrite objects and writes each one to the server. The following methods are available:

  BufferWriter (BlockingQueue< BufferWrite > bufferQueue, IRISConnection connection)
               void  close () throws IOException
               void  stopBufferWriter ()
static BufferWriter  startBufferWriter (BlockingQueue< BufferWrite > bufferQueue, IRISDataSource dataSource)
static BufferWriter  startBufferWriter (BlockingQueue< BufferWrite > bufferQueue, IRISConnection connection) 

Using a buffer queue with BufferWriter

Persister instances are thread-safe. To improve performance, it is possible to create local buffers that are each used by a single thread. Local buffers are not thread-safe, so instead of being written directly to the database, each filled buffer is placed in a buffer queue. BufferWriter consumes that queue and writes the buffered data to the database.

Create bufferQueue to hold local buffers, then start bufferWriter.

    LinkedBlockingQueue<BufferWrite> bufferQueue =
      new LinkedBlockingQueue<BufferWrite>(200_000);
    BufferWriter bufferWriter = BufferWriter.startBufferWriter(bufferQueue, dataSource);

Create a Runnable named producer whose run() method will be executed by each producer thread. Each thread creates its own local buffer, inserts its data into that buffer, flushes the buffer to the queue, and records its statistics.

    Runnable producer = new Runnable() {
      @Override
      public void run() {  // create a separate local buffer for each thread
        PersisterBuffer localBuffer = persister.createLocalBuffer(bufferQueue);
        for (int i = 0; i < chunks; i++) {  // get data from somewhere and insert it
          Object[] data = new Object[]{"Smith-Jones", true, 922285477L};
          persister.insert(data, localBuffer);
        }
        persister.flush(localBuffer);  // send any partially filled buffer to the queue
        statistics.addThreadStat(persister.getStatistics(localBuffer));
      }
    };

Now start one thread per producer, wait for all of them to finish, and then stop the BufferWriter:

    Thread[] producerThreads = new Thread[producerThreadCount];
    for (int tp = 0; tp < producerThreadCount; tp++) {
      producerThreads[tp] = new Thread(producer);  // each thread executes producer.run()
      producerThreads[tp].start();
    }
    for (int tp = 0; tp < producerThreadCount; tp++) {
      producerThreads[tp].join();
    }
    bufferWriter.stopBufferWriter();

See ThreadLoader in Java Persister Examples for a complete listing of the source program that provides this example.
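The same producer/consumer shape can be modeled with JDK classes alone, which may help when reasoning about ordering and shutdown. In this sketch, each producer thread fills its own unshared list (standing in for a local PersisterBuffer) and puts full chunks on a shared LinkedBlockingQueue, while one writer thread drains the queue (standing in for BufferWriter). The class name QueueModel, the POISON sentinel used for shutdown, and the chunk size are hypothetical; this is not how BufferWriter is implemented internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical JDK-only model of thread-local buffers feeding one queue writer. */
final class QueueModel {
    // Sentinel that tells the writer thread to stop (hypothetical shutdown scheme).
    private static final List<Integer> POISON = new ArrayList<>();

    /** Runs the model and returns how many records the writer thread "wrote". */
    static long run(int producerCount, int recordsPerProducer, int chunkSize)
            throws InterruptedException {
        BlockingQueue<List<Integer>> queue = new LinkedBlockingQueue<>(1_000);
        AtomicLong written = new AtomicLong();

        // Writer thread: stands in for BufferWriter, taking one chunk at a time.
        Thread writer = new Thread(() -> {
            try {
                while (true) {
                    List<Integer> chunk = queue.take();
                    if (chunk == POISON) {
                        return;
                    }
                    written.addAndGet(chunk.size());  // a real writer would send bytes to the server
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        writer.start();

        // Producer: each thread owns its local buffer, so the buffer needs no locking.
        Runnable producer = () -> {
            List<Integer> local = new ArrayList<>(chunkSize);
            try {
                for (int i = 0; i < recordsPerProducer; i++) {
                    local.add(i);
                    if (local.size() == chunkSize) {  // buffer full: hand it to the queue
                        queue.put(local);
                        local = new ArrayList<>(chunkSize);
                    }
                }
                if (!local.isEmpty()) {               // final flush of a partial buffer
                    queue.put(local);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        Thread[] producers = new Thread[producerCount];
        for (int t = 0; t < producerCount; t++) {
            producers[t] = new Thread(producer);
            producers[t].start();
        }
        for (Thread p : producers) {
            p.join();
        }
        // Stop the writer only after every producer has queued its last chunk.
        queue.put(POISON);
        writer.join();
        return written.get();
    }
}
```

For example, QueueModel.run(4, 1_003, 100) starts four producers that each queue ten full chunks and one final 3-record chunk, and returns 4,012. Because the queue is FIFO and the sentinel is added only after every producer has been joined, all partial chunks are written before the writer stops; the ThreadLoader listing likewise joins its producer threads before calling stopBufferWriter().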

Persister Buffer Statistics

Persister statistics are collected by the Persister and can be reported. Statistics are stored in a PersisterStatistics object (described below). The following Persister statistics methods are available:

  • getStatistics(): returns a PersisterStatistics object containing the current statistics from the buffer or from a specified local buffer.

  • reportStatistics(): displays a console message listing server write statistics since this Persister instance was created or reset (see the output format at the end of this section).

  • resetStatistics(): resets the buffer startTime and refreshes baseStatistics, so that a later call to reportStatistics() reports activity starting from the time this method was called. If localBuffer is specified, resets the statistics for the local buffer.

These methods are typically called just before and after a set of Persister inserts. For example:

  persister.resetStatistics();
  PersisterStatistics statistics = new PersisterStatistics();
  statistics.setStartTime(System.currentTimeMillis());

  // start BufferWriter / run Persister / stop BufferWriter

  statistics.setStopTime(System.currentTimeMillis());
  statistics.reportStatistics();

class PersisterStatistics

This is a small class that tracks statistics for a specified buffer: start time, stop time, number of objects, total bytes written, and number of buffers written to the server.

  PersisterStatistics ()
  PersisterStatistics (long startTime, long stopTime, long[] rawStat)
  PersisterStatistics (long startTime, long stopTime, ConcurrentLinkedQueue< long[]> rawStats)

synchronized void  addThreadStat (PersisterStatistics persisterStatistics)
          long []  getCumulativeRaw ()
             long  getDuration ()
             long  getStartTime ()
             long  getStopTime ()
             void  reportStatistics ()
             void  setStartTime (long startTime)
             void  setStopTime (long stopTime)

  • addThreadStat(): adds a PersisterStatistics instance to this PersisterStatistics instance, accumulating its statistics into cumulativeStats and appending it to the threadStats list.

  • getCumulativeRaw(): returns a long[] containing the accumulated statistics. The first element is the total object count, the second is the total byte count, and the third is the total number of buffers written.

  • getDuration(): returns the duration (stopTime - startTime) in milliseconds.

  • getStartTime(): returns the start time in milliseconds.

  • getStopTime(): returns the stop time in milliseconds.

  • reportStatistics(): displays a console message listing the accumulated server write statistics, in the format shown below.

  • setStartTime(): sets the start time in milliseconds.

  • setStopTime(): sets the stop time in milliseconds.
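The accumulation that addThreadStat() and getCumulativeRaw() describe amounts to element-wise addition of three-element long[] arrays ({objects, bytes, buffers}). The sketch below uses a hypothetical StatsAccumulator class rather than PersisterStatistics itself; its methods are synchronized for the same reason addThreadStat() is declared synchronized, so that several producer threads can report their totals concurrently.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of per-thread statistics merged into cumulative totals. */
final class StatsAccumulator {
    // Layout matches getCumulativeRaw(): [0] objects, [1] bytes, [2] buffers written.
    private final long[] cumulative = new long[3];
    private final List<long[]> threadStats = new ArrayList<>();

    /** Called once per producer thread; synchronized so threads may call it concurrently. */
    synchronized void addThreadStat(long[] raw) {
        for (int i = 0; i < cumulative.length; i++) {
            cumulative[i] += raw[i];
        }
        threadStats.add(raw.clone());   // keep a per-thread copy, like the threadStats list
    }

    /** Returns a copy so callers cannot modify the running totals. */
    synchronized long[] getCumulativeRaw() {
        return cumulative.clone();
    }

    /** Number of threads that reported (printed as "[threadStats.size()] threads"). */
    synchronized int threadCount() {
        return threadStats.size();
    }
}
```

If eight threads each report {10, 1000, 2}, the cumulative array becomes {80, 8000, 16} and threadCount() returns 8.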

reportStatistics() writes the following statistics to the console:

    load() executed on [threadStats.size()] threads...
    Elapsed time (seconds)   = [getDuration()/1000.0f]
    Number of objects stored = [objectCount]
    Store rate (obj/sec)     = [objectCount * 1000.0/getDuration()]
    Total bytes written      = [byteCount]
    Total buffers written    = [bufferWrites]
    MB per second            = [(byteCount / getDuration() * 1000f) / 1048576f]
    Avg object size          = [byteCount / objectCount]

Where:

    objectCount  = getCumulativeRaw()[0]
    byteCount    = getCumulativeRaw()[1]
    bufferWrites = getCumulativeRaw()[2]
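The derived values above can be checked with a small calculation. StatsReport below is a hypothetical helper for illustration only, not part of the Persister API: it takes a getCumulativeRaw()-style array and a duration in milliseconds and evaluates the same formulas in double precision. The numeric types used inside reportStatistics() itself may differ (for example, byteCount / getDuration() on two longs would truncate), so small discrepancies against real output are possible.

```java
/** Hypothetical helper evaluating the reportStatistics() formulas with doubles. */
final class StatsReport {
    final double elapsedSeconds;   // getDuration() / 1000
    final double storeRate;        // objects per second
    final double mbPerSecond;      // megabytes (1 MB = 1,048,576 bytes) per second
    final double avgObjectSize;    // bytes per object

    StatsReport(long[] cumulativeRaw, long durationMs) {
        long objectCount = cumulativeRaw[0];
        long byteCount = cumulativeRaw[1];
        elapsedSeconds = durationMs / 1000.0;
        storeRate = objectCount * 1000.0 / durationMs;
        mbPerSecond = (byteCount * 1000.0 / durationMs) / 1_048_576.0;
        avgObjectSize = objectCount == 0 ? 0.0 : (double) byteCount / objectCount;
    }
}
```

With 1,000,000 objects and 104,857,600 bytes written over 2,000 ms, this gives an elapsed time of 2.0 seconds, a store rate of 500,000 objects per second, 50.0 MB per second, and an average object size of about 104.86 bytes.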