The sql: component allows you to work with databases using JDBC queries. The difference between this component and JDBC component is that in case of SQL the query is a property of the endpoint and it uses message payload as parameters passed to the query.
This component uses
spring-jdbc behind the scenes for the actual
The SQL component also supports:
a JDBC based repository for the Idempotent Consumer EIP pattern. See further below.
a JDBC based repository for the Aggregator EIP pattern. See further below.
The SQL component can only be used to define producer endpoints. In other words,
you cannot define an SQL endpoint in a
The SQL component uses the following endpoint URI notation:
sql:select * from table where id=# order by name[?options]
Notice that the standard
? symbol that denotes the parameters to an
SQL query is substituted with the
# symbol, because the
? symbol is used to specify options for the endpoint. The
? symbol replacement can be configured on endpoint basis.
You can append query options to the URI in the following format,
Table 71. URI options
Apache Camel 2..7.5/2.8.4/2.9: Specifies whether to execute SQL batch update statements.
When set to
||Apache Camel 1.5.1/2.0: Reference to a
||Camel 2.4: Specifies a character
that will be replaced to |
|| Sets additional options on the Spring
The SQL component tries to convert the message body to an object of
java.util.Iterator type and then uses this iterator to fill the
query parameters (where each query parameter is represented by a
symbol, or other configured placeholder, in the endpoint URI). If the message body is
not an array or collection, the conversion results in an iterator that iterates over
only one object, which is the body itself.
For example, if the message body is an instance of
the first item in the list is substituted into the first occurrence of
# in the SQL query, the second item in the list is substituted
into the second occurrence of
#, and so on.
select operations, the result is an instance of
List<Map<String, Object>> type, as returned by the JdbcTemplate.queryForList() method. For
operations, the result is the number of updated rows, returned as an
update operations, the SQL Component stores the
update count in the following message headers:
Table 72. Message headers
|| Apache Camel 1.x: The number of rows updated for
|| Apache Camel 2.0: The number of rows updated for
|| Apache Camel 2.0: The number of rows returned for
The SQL component must be configured before it can be used. In Spring, you can configure it as follows:
<bean id="sql" class="org.apache.camel.component.sql.SqlComponent"> <property name="dataSource" ref="myDS"/> </bean> <bean id="myDS" class="org.springframework.jdbc.datasource.DriverManagerDataSource"> <property name="driverClassName" value="com.mysql.jdbc.Driver" /> <property name="url" value="jdbc:mysql://localhost:3306/ds" /> <property name="username" value="username" /> <property name="password" value="password" /> </bean>
You can now set a reference to a
DataSource in the URI
sql:select * from table where id=# order by name?dataSourceRef=myDS
In the sample below we execute a query and retrieve the result as a
List of rows, where each row is a
Object and the key is the column name.
First, we set up a table to use for our sample. As this is based on an unit test, we do it java code:
// this is the database we create with some initial data for our unit test jdbcTemplate.execute("create table projects (id integer primary key," + "project varchar(10), license varchar(5))"); jdbcTemplate.execute("insert into projects values (1, 'Camel', 'ASF')"); jdbcTemplate.execute("insert into projects values (2, 'AMQ', 'ASF')"); jdbcTemplate.execute("insert into projects values (3, 'Linux', 'XXX')");
Then we configure our route and our
sql component. Notice that we
direct endpoint in front of the
endpoint. This allows us to send an exchange to the
with the URI,
direct:simple, which is much easier for the client to
use than the long
sql: URI. Note that the
DataSource is looked up up in the registry, so we can use
standard Spring XML to configure our
from("direct:simple") .to("sql:select * from projects where license = # order by id?dataSourceRef=jdbc/myDataSource") .to("mock:result");
And then we fire the message into the
direct endpoint that will
route it to our
sql component that queries the database.
MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(1); // send the query to direct that will route it to the sql where we will execute the query // and bind the parameters with the data from the body. The body only contains one value // in this case (XXX) but if we should use multi values then the body will be iterated // so we could supply a List<String> instead containing each binding value. template.sendBody("direct:simple", "XXX"); mock.assertIsSatisfied(); // the result is a List List received = assertIsInstanceOf(List.class, mock.getReceivedExchanges().get(0).getIn().getBody()); // and each row in the list is a Map Map row = assertIsInstanceOf(Map.class, received.get(0)); // and we should be able the get the project from the map that should be Linux assertEquals("Linux", row.get("PROJECT"));
We could configure the
DataSource in Spring XML as follows:
<jee:jndi-lookup id="myDS" jndi-name="jdbc/myDataSource"/>
Available as of Camel 2.7: In this section we will use the JDBC based idempotent repository.
First we need to setup a
javax.sql.DataSource in the Spring XML
<bean id="dataSource" class="org.springframework.jdbc.datasource.SingleConnectionDataSource"> <property name="driverClassName" value="org.hsqldb.jdbcDriver"/> <property name="url" value="jdbc:hsqldb:mem:camel_jdbc"/> <property name="username" value="sa"/> <property name="password" value=""/> </bean>
And finally we can create our JDBC idempotent repository in the Spring XML file as well:
<bean id="messageIdRepository" class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository"> <constructor-arg ref="dataSource" /> <constructor-arg value="myProcessorName" /> </bean> <camel:camelContext> <camel:errorHandler id="deadLetterChannel" type="DeadLetterChannel" deadLetterUri="mock:error"> <camel:redeliveryPolicy maximumRedeliveries="0" maximumRedeliveryDelay="0" logStackTrace="false" /> </camel:errorHandler> <camel:route id="JdbcMessageIdRepositoryTest" errorHandlerRef="deadLetterChannel"> <camel:from uri="direct:start" /> <camel:idempotentConsumer messageIdRepositoryRef="messageIdRepository"> <camel:header>messageId</camel:header> <camel:to uri="mock:result" /> </camel:idempotentConsumer> </camel:route> </camel:camelContext>
Available as of Camel 2.6
|Using JdbcAggregationRepository in Camel 2.6|
In Camel 2.6, the JdbcAggregationRepository is provided in the
JdbcAggregationRepository is an
AggregationRepository which on the fly persists the aggregated
messages. This ensures that you will not loose messages, as the default aggregator will
use an in memory only
JdbcAggregationRepository allows together with Camel to provide
persistent support for the Aggregator.
It has the following options:
Table 73. jdbcAggregateRepository options
||Mandatory: The name of the repository.|
|| A |
||boolean|| Whether the get operation should return the old existing Exchange if
any existed. By default this option is |
||boolean|| Whether or not recovery is enabled. This option is by default
||long||If recovery is enabled then a background task is run every x'th time to scan for failed exchanges to recover and resubmit. By default this interval is 5000 millis.|
||int|| Allows you to limit the maximum number of redelivery attempts for a
recovered exchange. If enabled then the Exchange will be moved to the
dead letter channel if all redelivery attempts failed. By default this
option is disabled. If this option is used then the
||String|| An endpoint uri for a Dead Letter Channel where exhausted recovered
Exchanges will be moved. If this option is used then the
JdbcAggregationRepository will only preserve any
Serializable compatible data types. If a data type is not such a
type its dropped and a
WARN is logged. And it only persists the
Message body and the
Message headers. The
Exchange properties are not
JdbcAggregationRepository will by default recover any failed
exchange. It does this by having a background tasks that scans for failed Exchanges in the persistent store. You can use the
checkInterval option to set how often this task runs. The
recovery works as transactional which ensures that Camel will try to recover and
redeliver the failed Exchange. Any Exchange which was found to be recovered will be restored
from the persistent store and resubmitted and send out again.
The following headers are set when an exchange is being recovered/redelivered:
Table 74. Recovery/redelivery headers
||Boolean||Is set to true to indicate the Exchange is being redelivered.|
||Integer||The redelivery attempt, starting from 1.|
Only when an Exchange has been successfully processed
it will be marked as complete which happens when the
is invoked on the
AggregationRepository. This means if the same Exchange fails again it will be kept retried until it
You can use option
maximumRedeliveries to limit the maximum number
of redelivery attempts for a given recovered Exchange.
You must also set the
deadLetterUri option so Camel knows where to
send the Exchange when the
maximumRedeliveries was hit.
You can see some examples in the unit tests of camel-sql, for example this test.
To be operational, each aggregator uses two table: the aggregation and completed one.
By convention the completed has the same name as the aggregation one suffixed with
"_COMPLETED". The name must be configured in the Spring bean with
RepositoryName property. In the following example aggregation
will be used.
The table structure definition of both table are identical: in both case a String value is used as key (id) whereas a Blob contains the exchange serialized in byte array. However one difference should be remembered: the id field does not have the same content depending on the table. In the aggregation table id holds the correlation Id used by the component to aggregate the messages. In the completed table, id holds the id of the exchange stored in corresponding the blob field.
Here is the SQL query used to create the tables, just replace
"aggregation" with your aggregator repository name.
CREATE TABLE aggregation ( id varchar(255) NOT NULL, exchange blob NOT NULL, constraint aggregation_pk PRIMARY KEY (id) ); CREATE TABLE aggregation_completed ( id varchar(255) NOT NULL, exchange blob NOT NULL, constraint aggregation_completed_pk PRIMARY KEY (id) );
Since they can contain any type of payload, Exchanges are not serializable by design.
It is converted into a byte array to be stored in a database BLOB field. All those
conversions are handled by the
JdbcCodec class. One detail of the
code requires your attention: the
ClassLoadingAwareObjectInputStream has been reused from the
Apache ActiveMQ project. It wraps
ObjectInputStream and use it with the
ContextClassLoader rather than the
currentThread one. The benefit is to be able to load classes
exposed by other bundles. This allows the exchange body and headers to have custom types
start method verify the connection of the database and the
presence of the required tables. If anything is wrong it will fail during
Depending on the targeted environment, the aggregator might need some configuration.
As you already know, each aggregator should have its own repository (with the
corresponding pair of table created in the database) and a data source. If the default
lobHandler is not adapted to your database system, it can be injected with the
Here is the declaration for Oracle:
<bean id="lobHandler" class="org.springframework.jdbc.support.lob.OracleLobHandler"> <property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/> </bean> <bean id="nativeJdbcExtractor" class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/> <bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="transactionManager" ref="transactionManager"/> <property name="repositoryName" value="aggregation"/> <property name="dataSource" ref="dataSource"/> <!-- Only with Oracle, else use default --> <property name="lobHandler" ref="lobHandler"/> </bean>