Using the SQL Inbound Adapter
This chapter describes the default behavior of the Ensemble SQL inbound adapter (EnsLib.SQL.InboundAdapter) and describes how to use this adapter in your productions. It discusses the following topics:
- How to reset the adapter so that it “forgets” which rows it has already processed (a useful task during development)
Overall Behavior
First, it is useful to understand the details that you specify for the adapter. The EnsLib.SQL.InboundAdapter class provides runtime settings that you use to specify items like the following:
- A polling interval, which controls how frequently the adapter checks for new input
- The external data source to which the adapter connects
- The ID of the Ensemble credentials that provide the username and password for that data source, if needed
- An SQL query to execute
- Optional parameters to use in the query
In general, the inbound SQL adapter (EnsLib.SQL.InboundAdapter) periodically executes a query and then iterates through the rows of the result set, passing one row at a time to the associated business service. The business service, which you create and configure, uses this row and communicates with the rest of the production. More specifically:
- The adapter regularly executes its OnTask() method, which executes the given query. The polling interval is determined by the CallInterval setting.
- If the query returns any rows, the adapter iterates through the rows in the result set and does the following for each row:
  - If this row has already been processed and has not changed, the adapter ignores it.
    To determine if a given row has already been processed, the adapter uses the information in the KeyFieldName setting; see “Processing Only New Rows” later in this chapter.
  - If this row has already been processed (as identified by the KeyFieldName setting) and an error occurred, the adapter ignores it until the next restart.
  - Otherwise, the adapter builds an instance of the EnsLib.SQL.Snapshot class and puts the row data into it. This instance is the snapshot object. “Using the Default Snapshot Object” later in this chapter provides details on this object.
    The adapter then calls the internal ProcessInput() method of the associated business service class, passing the snapshot object as input.
- The internal ProcessInput() method of the business service class executes. This method performs basic Ensemble tasks such as setting up a monitor and error logging; these tasks are needed by all business services. You do not customize or override this method, which your business service class inherits.
- The ProcessInput() method then calls your custom OnProcessInput() method, passing the snapshot object as input. The requirements for this method are described later in “Implementing the OnProcessInput() Method.”
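The per-row decisions in the list above can be modeled in a few lines. The following Python sketch only illustrates the documented flow and is not the adapter's implementation: poll_once, processed, errored, and on_process_input are hypothetical names, the callback stands in for ProcessInput() calling OnProcessInput(), and detection of changed rows is left out for brevity.

```python
def poll_once(rows, key_field, processed, errored, on_process_input):
    """Model one polling cycle: hand each not-yet-seen row to the service callback."""
    for row in rows:
        key = row[key_field]
        if key in processed or key in errored:
            continue  # already handled, or failed earlier in this run
        try:
            on_process_input(row)  # stands in for ProcessInput() -> OnProcessInput()
            processed.add(key)
        except Exception:
            errored.add(key)  # ignored until a restart (modeled as clearing `errored`)


received = []
processed, errored = set(), set()
table = [{"ID": 1, "Name": "Ann"}, {"ID": 2, "Name": "Bob"}]

poll_once(table, "ID", processed, errored, received.append)  # first cycle: rows 1 and 2
table.append({"ID": 3, "Name": "Cy"})
poll_once(table, "ID", processed, errored, received.append)  # second cycle: only row 3 is new
```

In this model the second cycle still reads all three rows but passes only the new one to the callback, which mirrors the key-based skipping described above.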
The following figure shows the overall flow:
Creating a Business Service to Use the Adapter
To use this adapter in your production, create a new business service class as described here. Later, add it to your production and configure it. You must also create appropriate message classes, if none yet exist. See “Defining Ensemble Messages” in Developing Ensemble Productions.
The following list describes the basic requirements of the business service class:
- Your business service class should extend Ens.BusinessService.
- In your class, the ADAPTER parameter should equal EnsLib.SQL.InboundAdapter.
- Your class should implement the OnProcessInput() method, as described in “Implementing the OnProcessInput() Method.”
- Your class can optionally implement OnInit(); see “Initializing the Adapter.”
- For other options and general information, see “Defining a Business Service Class” in Developing Ensemble Productions.
The following example shows the general structure that you need:
Class ESQL.NewService1 Extends Ens.BusinessService
{
Parameter ADAPTER = "EnsLib.SQL.InboundAdapter";
Method OnProcessInput(pInput As EnsLib.SQL.Snapshot, pOutput As %RegisteredObject) As %Status
{
    Quit $$$ERROR($$$NotImplemented)
}
}
Studio provides a wizard that you can use to create a stub similar to the preceding. To access this wizard, click New on the File menu and then click the Production tab. Choose to create a business service and select EnsLib.SQL.InboundAdapter as the associated inbound adapter. The wizard then uses EnsLib.SQL.Snapshot as the specific input argument needed with this adapter.
Implementing the OnProcessInput() Method
Within your custom business service class, your OnProcessInput() method should have the following signature:
Method OnProcessInput(pInput As EnsLib.SQL.Snapshot,
pOutput As %RegisteredObject) As %Status
Here pInput is the snapshot object that the adapter sends to this business service; this is an instance of EnsLib.SQL.Snapshot. Also, pOutput is the generic output argument required in the method signature.
The OnProcessInput() method should do some or all of the following:
- Create an instance of the request message, which will be the message that your business service sends.
  For information on creating message classes, see “Defining Ensemble Messages” in Developing Ensemble Productions.
- For the request message, set its properties as appropriate, using values in the snapshot object. This object corresponds to a single row returned by your query; for more information, see “Using the Default Snapshot Object.”
- Call a suitable method of the business service to send the request to some destination within the production. Specifically, call SendRequestSync(), SendRequestAsync(), or (less common) SendDeferredResponse(). For details, see “Sending Request Messages” in Developing Ensemble Productions.
  Each of these methods returns a status (specifically, an instance of %Status).
- Optionally check the status of the previous action and act upon it.
- Optionally examine the response message that your business service has received and act upon it.
- Return an appropriate status.
The following shows a simple example:
Method OnProcessInput(pInput As EnsLib.SQL.Snapshot,
    pOutput As %RegisteredObject) As %Status
{
    set req=##class(ESQL.request).%New()
    set req.CustomerID=pInput.Get("CustomerID")
    set req.SSN=pInput.Get("SSN")
    set req.Name=pInput.Get("Name")
    set req.City=pInput.Get("City")
    set sc=..SendRequestSync("ESQL.operation",req,.pOutput)
    quit sc
}
Notice that this example uses the Get() method of EnsLib.SQL.Snapshot to get data for specific columns; see “Using the Default Snapshot Object.”
Using the Default Snapshot Object
When you create a business service class to work with EnsLib.SQL.InboundAdapter, the adapter passes a snapshot object (an instance of EnsLib.SQL.Snapshot) to your custom OnProcessInput() method. This instance contains data for one row of the data returned by your query. Your OnProcessInput() method typically uses the data available in this object.
The EnsLib.SQL.Snapshot class provides properties and methods to manage multiple rows. However, multiple-row snapshots are relevant only for operations that use the outbound adapter, described later in this book.
The following list describes the methods that you are most likely to use within your custom OnProcessInput() method:
method Get(pName As %String, pRow=..%CurrentRow) returns %String
Returns the value of the column that has the name pName, in the current row (which is the only row in this case).
method GetColumnId(pName As %String) returns %Integer
Returns the ordinal position of the column that has the name pName. This method is useful when you work with unfamiliar tables.
method GetData(pColumn As %Integer, pRow=..%CurrentRow) returns %String
Returns the value of the column whose position is specified by pColumn in the current row (which is the only row in this case). From left to right, the first column is 1, the second column is 2, and so on.
method GetColumnName(pColumn As %Integer = 0)
Returns the name of the column whose position is specified by pColumn.
method GetColumnSize(pColumn As %Integer = 0)
Returns the size (the width in number of characters) of the database field whose position is specified by pColumn.
method GetColumnType(pColumn As %Integer = 0)
Returns the SQL type of the column whose position is specified by pColumn, for example, VARCHAR, DATE, or INTEGER.
SQL type names vary between different database vendors.
The following shows how you might use the Get() method to extract data from the snapshot and use it to populate the request message:
set req=##class(ESQL.request).%New()
set req.CustomerID=pInput.Get("CustomerID")
set req.SSN=pInput.Get("SSN")
set req.Name=pInput.Get("Name")
set req.City=pInput.Get("City")
Initializing the Adapter
To initialize the inbound adapter, customize the OnInit() method of your custom business service class. This method is executed during startup of the business host; by default, this method does nothing.
Method OnInit() As %Status
The most common reason to initialize the adapter is to initialize values for use as parameters of the query, as described in “Specifying Inbound Queries.” The following subsections list the relevant methods and provide an example.
Initializing Persistent Values
EnsLib.SQL.InboundAdapter provides the following method to initialize persistent values that are saved between restarts of the business service:
ClassMethod InitializePersistentValue
(pConfigName As %String,
pPersistentValueName As %String = "%LastKey",
pNewValue As %String)
As %String
Use this to initialize a name-value pair associated with the adapter and then use the name for a parameter of the query. This method checks the current value of the given persistent name-value pair. If the value is currently null, this method sets it equal to pNewValue.
By default, if you omit the name, the method initializes the persistent name-value pair &%LastKey, which contains the IDKey value of the last row processed by the adapter.
In some cases, you might instead need the InitializeLastKeyValue() method, which initializes the transient adapter property %LastKey. This property is reset each time the business service is started. Also see the class documentation for EnsLib.SQL.InboundAdapter for information on the related methods SetPersistentValue() and GetPersistentValue().
Examples
To initialize the &%LastKey persistent value, you would customize the OnInit() method of your business service to include the following:
Method OnInit() As %Status
{
    #; initialize persistent last key value
    Do ..Adapter.InitializePersistentValue(..%ConfigName,,0)
    Quit $$$OK
}
To initialize the &TopSales persistent value, you would customize the OnInit() method of your business service to include the following:
Method OnInit() As %Status
{
    #; must initialize so the query can do a numeric comparison
    Do ..Adapter.InitializePersistentValue(..%ConfigName,"TopSales",0)
    Quit $$$OK
}
Adding and Configuring the Business Service
To add your business service to an Ensemble production, use the Management Portal to do the following:
- Add an instance of your custom business service class to the Ensemble production.
- Enable the business service.
- Set the PoolSize setting to 1.
  If PoolSize is larger than 1, the adapter will process many records twice.
- Configure the adapter to communicate with a specific external data source. Specifically:
  - Specify the data source name to which the adapter should connect
  - Specify the query that the adapter should execute
  - Specify other common runtime settings for your production
  These topics are discussed later in this chapter.
- Run the production.
Specifying the Data Source Name
EnsLib.SQL.InboundAdapter provides a runtime setting that you use to specify the data source to which you want to connect. When you configure the business service, you should set an appropriate initial value for this setting:
DSN
This data source name specifies the external data source to which to connect. It can take one of three forms, which Ensemble distinguishes automatically: a defined Caché SQL Gateway connection, a JDBC URL, or an ODBC DSN configured in your operating system.
If this name matches the name of a JDBC or ODBC connection configured from the System Administration > Configure > Connectivity > SQL Gateway Connections page of the Management Portal, Ensemble uses the parameters from that specification. If the entry is not the name of a configured connection and it contains a colon (:), Ensemble assumes that it is a JDBC URL; otherwise, it assumes an ODBC DSN.
The following example shows a DSN value that is a JDBC URL:
jdbc:Cache://localhost:9982/Samples
The following example shows the name of an ODBC DSN that refers to a Microsoft Access database:
accessplayground
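The three-way decision described above can be written as a small function. This is an illustrative Python sketch of the stated rule only; classify_dsn, the returned labels, and the connection name SamplesGateway are hypothetical and are not part of Ensemble.

```python
def classify_dsn(dsn, gateway_connections):
    """Return which of the three documented forms a data source name takes."""
    if dsn in gateway_connections:
        return "sql-gateway-connection"  # a configured connection name takes precedence
    if ":" in dsn:
        return "jdbc-url"                # not a configured name, but contains a colon
    return "odbc-dsn"                    # everything else is treated as an ODBC DSN


configured = {"SamplesGateway"}  # hypothetical SQL Gateway connection name

kinds = [
    classify_dsn("SamplesGateway", configured),
    classify_dsn("jdbc:Cache://localhost:9982/Samples", configured),
    classify_dsn("accessplayground", configured),
]
```

One consequence of the rule, visible in the sketch, is that a configured connection name is matched first, even if that name happens to contain a colon.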
If this data source is protected by a password, create Ensemble credentials to contain the username and password. Then set the Credentials setting equal to the ID of those credentials; see “Specifying Other Runtime Settings” for details.
If you are using a JDBC data source, the following settings also apply:
JGService
Configuration name of the Java Gateway service that controls the Java Gateway server this adapter uses.
This setting is required for all JDBC data sources, even if you are using a working SQL gateway connection with JDBC. For JDBC connections to work, a business service of type EnsLib.JavaGateway.Service must be present. The SQL adapter requires the name of this configuration item and uses its configured settings to connect to the JVM it manages.
JDBCDriver
JDBC driver class name.
If you use a named SQL Gateway Connection as DSN, this value is optional; but if present, it overrides the value specified in the named JDBC SQL Gateway Connection set of properties.
JDBCClasspath
Classpath for JDBC driver class name, if needed in addition to the ones configured in the Java Gateway Service.
ConnectionAttributes
An optional set of SQL connection attribute options. For ODBC, they have the form:
attr:val,attr:val
For example, AutoCommit:1.
For JDBC, they have the form:
attr=val;attr=val
For example, TransactionIsolationLevel=TRANSACTION_READ_COMMITTED.
If you use a named JDBC SQL Gateway Connection as DSN, this value is optional; but if present, it overrides the value specified in the named JDBC SQL Gateway Connection set of properties.
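The two attribute-string formats differ only in their separators, which the following Python sketch makes explicit. It is an illustration of the syntax, not of how the adapter parses the value; parse_connection_attributes and the driver argument are hypothetical.

```python
def parse_connection_attributes(text, driver):
    """Split an attribute string into a dict, using the separators for the given driver type."""
    # ODBC: attr:val,attr:val      JDBC: attr=val;attr=val
    pair_sep, kv_sep = (",", ":") if driver == "odbc" else (";", "=")
    attrs = {}
    for pair in text.split(pair_sep):
        if pair.strip():
            name, value = pair.split(kv_sep, 1)
            attrs[name.strip()] = value.strip()
    return attrs


odbc_attrs = parse_connection_attributes("AutoCommit:1", "odbc")
jdbc_attrs = parse_connection_attributes(
    "TransactionIsolationLevel=TRANSACTION_READ_COMMITTED", "jdbc"
)
```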
Specifying Inbound Queries
By default, EnsLib.SQL.InboundAdapter executes a query periodically (and sends the results, row by row, to the business service). The following subsections describe how to specify the query and its parameters, and how to control which rows are processed.
Specifying the Query
To specify the base query used by the inbound SQL adapter, use the Query setting, which contains the base query string. The query can include the standard SQL ? placeholder to represent replaceable parameters, which you specify in a separate setting (discussed in the next section). Consider the following examples:
SELECT * FROM Customer
SELECT p1,p2,p3 FROM Customer WHERE updatetimestamp > ?
SELECT * FROM Sample.Person WHERE ID > ?
SELECT * FROM Sample.Person WHERE Age > ? AND PostalCode = ?
Specifying Parameters
The Parameters setting specifies any replaceable parameters in the query string. This setting should equal a comma-separated list of parameter value specifiers, as follows:
value,value,value,...
For a given value, you can use a constant literal value such as 10 or Gotham City; or you can refer to any of the following:
- You can use a property of the adapter. Within the Parameters setting, to refer to a property of the adapter, simply refer to the property by its name.
- You can refer to a property of the associated business service. Use the syntax $property_name
  Within the Parameters setting, if a parameter name starts with a dollar sign ($), Ensemble assumes that it is a property of the service class.
  For example, you could add a property named LastTS to the business service class to contain a timestamp. Within the Parameters setting, you would refer to the value of that property as $LastTS
- You can refer to a special persistent value such as &%LastKey, which contains the IDKey value of the last row processed by the adapter.
  Within the Parameters setting, if a parameter name starts with an ampersand (&), Ensemble assumes that it is a special persistent value.
Note: For information on initializing these values, see “Initializing the Adapter.”
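The lookup rules above can be modeled as follows. This is a hypothetical Python sketch, not the adapter's code: resolve_parameters and its dictionary arguments are invented for illustration, and the sketch assumes that a bare token is treated as an adapter property when a property with that name exists and as a constant literal otherwise (the text above does not spell out that precedence).

```python
def resolve_parameters(spec, adapter_props, service_props, persistent):
    """Turn a comma-separated Parameters string into a list of query parameter values."""
    values = []
    for token in spec.split(","):
        token = token.strip()
        if token.startswith("&"):
            values.append(persistent.get(token[1:]))  # special persistent value
        elif token.startswith("$"):
            values.append(service_props[token[1:]])   # property of the business service
        elif token in adapter_props:
            values.append(adapter_props[token])       # property of the adapter (assumed precedence)
        else:
            values.append(token)                      # constant literal
    return values


resolved = resolve_parameters(
    "10,Gotham City,$LastTS,&%LastKey,%LastKey",
    adapter_props={"%LastKey": 7},
    service_props={"LastTS": "2010-01-01 00:00:00"},
    persistent={"%LastKey": 5},
)
```

The example deliberately gives the persistent value &%LastKey and the transient property %LastKey different values to show that the two names resolve to different stores.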
Processing Only New Rows
Because it is often undesirable to keep executing a query against the same data, EnsLib.SQL.InboundAdapter provides several tools that you can use to keep track of the rows that it has processed. This section discusses those tools and then describes several ways to use them in practice.
Be sure to set the PoolSize setting equal to 1. If it is larger than 1, the adapter will process many records twice.
Available Tools
EnsLib.SQL.InboundAdapter provides the following tools to keep track of the rows that it has processed:
- If you specify the KeyFieldName setting, the adapter adds data to a Caché global that indicates which rows it has processed. This setting should refer to a field that contains values that are not reused over time; this field must be in the result set returned by the query. The adapter uses the data in that field to evaluate whether a row has previously been processed.
- The adapter provides a persistent value, &%LastKey, that contains the value of the KeyFieldName field for the last row that was processed. This special persistent value is preserved when you restart the business service.
- The adapter provides a transient property, %LastKey, that contains the value of the KeyFieldName field for the last row that was processed. This adapter property is reset each time you restart the associated business service.
The latter two options are practical only if KeyFieldName refers to a field that increases monotonically for each new row. Also see “Initializing the Adapter.”
Practical Ways to Not Reprocess Rows
There are three practical ways that you can ensure that you do not reprocess the same data:
- Use the KeyFieldName setting of the adapter. If specified, this setting should refer to a field that contains values that are not reused over time; this field must be in the result set returned by the query. If you specify a KeyFieldName, the adapter uses the data in that field to evaluate whether a row has previously been processed. Note that if you specify the pseudo field %ID in the SELECT and want to specify this field as the KeyFieldName, you should specify the KeyFieldName as ID and should omit the % (percent sign).
  The default is ID.
  For example, you could specify Query as follows:
  SELECT ID,Name,Done from Sample.Person
  And then you could specify KeyFieldName as follows:
  ID
  This technique can be inefficient because Ensemble might select a large number of rows during each polling cycle, and only a small number of those rows might be new.
- Use a query that uses a query parameter that refers to the special persistent value &%LastKey (or the transient adapter property %LastKey). For example, you could specify Query as follows:
  SELECT ID,Name,Done from Sample.Person WHERE ID>?
  And then you could specify Parameters as follows:
  &%LastKey
  Also see “Initializing the Adapter.”
- After executing the query, delete the source data or update it so that the query will not return the same rows. To do this, you use the DeleteQuery setting. By default, after EnsLib.SQL.InboundAdapter executes the main query (the Query setting), it executes the DeleteQuery once for each row returned by the main query.
  This query must include exactly one replaceable parameter (a question mark), which the adapter replaces with the value of the field specified by KeyFieldName.
  This query can either delete the source data or can perform an update to ensure that the same rows will not be selected by the main query of the adapter.
  For example, you could specify Query as follows:
  SELECT ID,Name,Done from Sample.Person WHERE Done=0
  And then you could specify DeleteQuery as follows:
  UPDATE Sample.Person SET Done=1 WHERE ID=?
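To see the third technique in action outside Ensemble, the following Python sketch runs the same pair of statements against an in-memory SQLite database. It is only a model of the described behavior: the table is named Person rather than Sample.Person, the poll function is hypothetical, and the real adapter performs these steps internally.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Person (ID INTEGER PRIMARY KEY, Name TEXT, Done INTEGER DEFAULT 0)")
conn.executemany("INSERT INTO Person (Name) VALUES (?)", [("Ann",), ("Bob",)])

QUERY = "SELECT ID,Name,Done FROM Person WHERE Done=0"
DELETE_QUERY = "UPDATE Person SET Done=1 WHERE ID=?"


def poll(connection):
    """Run the main query, then run the delete query once per returned row."""
    rows = connection.execute(QUERY).fetchall()
    for row in rows:
        # The single ? is filled with the key field value (ID, the first column).
        connection.execute(DELETE_QUERY, (row[0],))
    return sorted(row[1] for row in rows)


first = poll(conn)   # both rows are selected and then marked Done=1
second = poll(conn)  # nothing is left to select
```

Because the update runs once per row, the cost of this approach grows with the number of rows returned in each cycle.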
Reprocessing Rows
In many cases, it is necessary to notice changes in a row that has previously been processed by the SQL inbound adapter. The easiest way to do this is as follows:
- Include a column in the relevant table to record if a given row has been changed.
- Install an update trigger in the data source to update that column when appropriate.
- Within the query used by the adapter, use the value of this column to determine whether to select a given row.
Be sure to set the PoolSize setting equal to 1. If it is larger than 1, the adapter will process many records twice.
Examples That Use Query Settings
This section provides examples that use the preceding settings.
Example 1: Using KeyFieldName
In the simplest example, we select rows from the Customer table. This table has a primary key, the CustomerID field, an integer that is automatically incremented for each new record. We are interested only in new rows, so we can use this field as the KeyFieldName. Within our production, we use the following settings for the adapter:
Setting | Value |
---|---|
Query | SELECT * FROM Customer |
DeleteQuery | none |
KeyFieldName | CustomerID |
Parameters | none |
When the production starts, the adapter will automatically select and process all rows in the Customer table. As it processes each row, it adds an entry to the Ensemble Event Log; this entry will have text like the following:
Processing row '216'
Here '216' refers to the CustomerID of the row being processed.
After the production startup, during each polling cycle, the adapter will select all rows but will process only the rows that have a new value in the CustomerID field.
Example 2: Using &%LastKey or %LastKey
This example is a variation of the preceding. In this case, the main query selects a subset of the rows, which is more efficient than selecting all the rows.
Setting | Value |
---|---|
Query | SELECT * FROM Customer WHERE CustomerID>? |
DeleteQuery | none |
KeyFieldName | CustomerID |
Parameters | &%LastKey |
In each polling cycle, the adapter determines the value of &%LastKey and passes that value to SQL.
Also see “Initializing the Adapter.”
When the adapter selects a set of rows, it may or may not process them in the order given by the KeyFieldName. For example, in a given polling cycle, it may select rows with CustomerID equal to 101, 102, 103, 104, and 105, but it may process customer 103 last (instead of customer 105). After this polling cycle, the value of &%LastKey equals 103. So in the next cycle, the adapter will select customers 104 and 105 again, although it will not reprocess them. (This is still more efficient than reselecting all the rows as in the previous example.) To force the adapter to process the rows in a specific order, include an ORDER BY clause within the query, for example:
SELECT * FROM Customer WHERE CustomerID>? ORDER BY CustomerID
In this case, the value of &%LastKey will always be set to the highest CustomerID and no rows will be selected more than once.
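The effect of processing order on the last key can be reproduced with a short simulation. The following Python sketch is a model under stated assumptions, not adapter code: poll, processed_last_103, and the list of keys are hypothetical, and the last key is simply taken to be the key of whichever row is processed last.

```python
def poll(table, last_key, processing_order):
    """Select keys greater than last_key, process them, and return (selected, new last key)."""
    selected = [key for key in table if key > last_key]
    for key in processing_order(selected):
        last_key = key  # the last row processed wins, not the highest key
    return selected, last_key


def processed_last_103(keys):
    """An arbitrary order in which customer 103 happens to be processed last."""
    return [k for k in keys if k != 103] + [k for k in keys if k == 103]


table = [101, 102, 103, 104, 105]

# Without ORDER BY: 103 is processed last, so 104 and 105 are selected again.
_, last_unordered = poll(table, 0, processed_last_103)
reselected, _ = poll(table, last_unordered, processed_last_103)

# With ORDER BY CustomerID: the highest key is processed last, so nothing is reselected.
_, last_ordered = poll(table, 0, sorted)
reselected_ordered, _ = poll(table, last_ordered, sorted)
```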
Example 3: Using DeleteQuery
In this example, the Customer table has a field called Done, in which we can record whether the adapter has previously selected a given row.
Setting | Value |
---|---|
Query | SELECT * FROM Customer WHERE Done=0 |
DeleteQuery | UPDATE Customer SET Done=1 WHERE CustomerID=? |
KeyFieldName | CustomerID |
Parameters | none |
In common with the preceding example, this example selects any given row only once.
Using the delete query can be slow, because this query is executed once for each row that is processed. It is more efficient to perform a batch delete (or batch update) at a regular interval.
Example 4: Working with a Composite Key
In many cases, you might want to treat multiple table fields collectively as the primary key. For example, you might have a table of statistics that includes the fields Month and Year, and your query might need to treat the month and year together as the unique key for the adapter. In such a case, you would use a query that concatenates the relevant fields and uses the AS clause to provide an alias for the composite field.
For example, with a database that supports the standard SQL concatenation operator (||), you could use a query that starts as follows:
SELECT Stats, Year||Month as ID ...
The result set available in the adapter will have a field named ID, which you can use for KeyFieldName.
The syntax for concatenation depends upon the database with which you are working.
Example 5: No KeyFieldName
In some cases, you might not want to use KeyFieldName. If KeyFieldName is null, the adapter does not distinguish rows from one another and does not skip rows that previously caused an error or were already processed successfully.
For example:
Setting | Value |
---|---|
Query | Select * from Cinema.Film Where TicketsSold>? |
DeleteQuery | none |
KeyFieldName | none |
Parameters | &TopSales (this refers to a special persistent value named TopSales that is initialized in the OnInit() method and updated in the OnProcessInput() method of the business service) |
The business service is as follows:
Class Test.SQL.TopSalesService Extends Ens.BusinessService
{
Parameter ADAPTER = "EnsLib.SQL.InboundAdapter";
Parameter REQUESTCLASSES As %String = "EnsLib.SQL.Snapshot";
Method OnInit() As %Status
{
    #; must initialize so the query can do a numeric comparison
    Do ..Adapter.InitializePersistentValue(..%ConfigName,"TopSales",0)
    Quit $$$OK
}
Method OnProcessInput(pInput As EnsLib.SQL.Snapshot,
    Output pOutput As Ens.Response) As %Status
{
    Kill pOutput Set pOutput=$$$NULLOREF
    #; empty placeholder loops over the columns, and over each column of each row
    for j=1:1:pInput.ColCount {
    }
    for i=1:1:pInput.RowCount {
        for j=1:1:pInput.ColCount {
        }
    }
    #; keep the highest TicketsSold value seen so far in the TopSales persistent value;
    #; the naked reference ^("TopSales") reuses the global node tested in the condition
    Set tSales=pInput.Get("TicketsSold")
    Set:tSales>$G($$$EnsStaticAppData(..%ConfigName,"adapter.sqlparam","TopSales")) ^("TopSales")=tSales
    Quit $$$OK
}
}
Specifying Other Runtime Settings
EnsLib.SQL.InboundAdapter provides the following additional runtime settings.
CallInterval
Specifies the polling interval, in seconds, for the adapter. This specifies how frequently this adapter checks for input.
Upon polling, if the adapter finds input, it creates an appropriate Ensemble object and passes the object to its associated business service. If several inputs are detected at once, the adapter processes all of them sequentially until no more are found. The adapter sends one request to the business service for each item of input it finds. The adapter then waits for the polling interval to elapse before checking for input again. This cycle continues whenever the production is running and the business service is enabled and scheduled to be active.
It is possible to set a property in the business service so that the adapter delays for the duration of the CallInterval in between processing each input. For details, see Developing Ensemble Productions.
The default CallInterval is 5 seconds. The minimum is 0.1 seconds.
Credentials
ID of the Ensemble credentials that can authorize a connection to the given DSN. For information on creating Ensemble credentials, see Configuring Ensemble Productions.
StayConnected
Specifies whether to keep the connection open between commands, such as issuing an SQL statement or changing an ODBC driver setting.
- If this setting is 0, the adapter will disconnect immediately after each SQL command.
- If this setting is -1, the adapter auto-connects on startup and then stays connected. Use this value, for example, if you are managing database transactions as described in “Managing Transactions,” later in this book.
This setting can also be positive (which specifies the idle time after each SQL command, in seconds), but such a value is not useful for the SQL inbound adapter, which works by polling. (If the idle time is longer than the polling interval [CallInterval], the adapter stays connected all the time. If the idle time is shorter than the polling interval, the adapter disconnects and reconnects at every polling interval, meaning that the idle time is essentially ignored.)
For any settings not listed here, see Configuring Ensemble Productions.
Resetting Rows Previously Processed by the Inbound Adapter
During development and testing, you might find it useful to reset the adapter for a given business service in order to repeat previous tests. To do so, use one of the methods described here; these are class methods inherited by EnsLib.SQL.InboundAdapter.
You do not normally use these methods within a live production.
ClassMethod ClearRuntimeAppData(pConfigName As %String)
Clears all runtime data for the business service that has the given configured name. Note that you can use the adapter property %ConfigName to access the name of the currently configured business service. This data is cleared automatically each time the business service starts.
ClassMethod ClearStaticAppData(pConfigName As %String)
Clears static data for the business service specified by the configured name. This data includes all persistent values associated with the adapter, such as the persistent last key value.
ClassMethod ClearAllAppData(pConfigName As %String)
This method just executes the ClearRuntimeAppData() and ClearStaticAppData() class methods.