Month: November 2014

Spring Batch Partitioner – Case Study with SourceCode – Best Practices

I’m writing this post because i report a bug at Spring Community Jira, this is the link:

https://jira.spring.io/browse/BATCH-2309

I started a sample project which could reproduce the problem to show the community what I was experiencing, but to my surprise I was using the partitioner feature incorrectly. I am writing this post to share what I learned throughout this experience to help those who are going through the same questions.

My Goal: I wanted to use the resource partitioner for parallel processing but was worried to use the primary key of the table (column ID) because my table has gaps (id column is not incremental) and for this reason the partitioner would distribute number of different records for each thread, thus being inefficient in their distribution.

For example:

This is the good example partitioner:

https://github.com/spring-projects/spring-batch/blob/master/spring-batch-samples/src/main/java/org/springframework/batch/sample/common/ColumnRangePartitioner.java

Suppose that my table has the following records: Ids 1, 8, 9,10 11, 12, 13, 14, 15.

min: 1

max: 15

gridSize = number of threads = 2 in this example

target size calculation: int targetSize = (max min) / gridSize + 1;

(15 – 1) / 2 + 1 = 8

In this example:

Thread number 1 will receive to process: 1 to 8

Thread number 2 will receive to process: 9 to 16

The Problem: Thread 1 receives only two records to process (The Id’s 1 and 8) and the thread 2 will receive 7 records to process. At this case the partitioner to split incorrectly number of records between threads.

My Goal: I want to split the number of records equally between all threads.

Where I was going wrong: To achieve my goal I tried to use a query that makes use of rownum and / or ntile oracle feature, the goal was to use the split an id that is sequential, with no gaps in the id column table, so the load would be uniform among the threads. The JdbcPagingItemReader class can not be used with multithreaded characteristics using Oracle ROWNUM because the query is partially executed multiple times in the database and there is no guarantee that all records are processed because a confusion of Ids between threads occurs.

 The correct way: You can use JdbcPagingItemReader using the Primary Key column (may be single or multiple columns) or JdbcCursorItemReader can use both the PK column or  Rownum / NTILE to do division.

Why use JdbcCursorItemReader not cause problems of mistaken IDs or lost records ?

This class executes the query once the database and will use chunk mode to fetch the records as needed. If you use a rownum column in this case will not cause data loss because every query is processed only once in the database.

To illustrate and facilitate understanding, I created a design example set with various possible configurations available here:

springbatchpoc

GitHub Example Project:

https://github.com/victorjabur/PartitionSpringBatch_DataLose_Poc_BATCH-2309

Here are the sql scripts to create the database tables used in this poc:

https://github.com/victorjabur/PartitionSpringBatch_DataLose_Poc_BATCH-2309/tree/master/src/main/resources/sql

  1. JdbcCursorItemReader-OracleNtile – It works
  2. JdbcCursorItemReader-OracleRownum – It works
  3. JdbcPagingItemReader-OracleNtile – It not works, don’t use this. PagingReader does not work with NTile
  4. JdbcPagingItemReader-OracleRownum – It not works, don’t use this. PagingReader does not work with Rownum
  5. JdbcPagingItemReader-TablePrimaryKey – It works, but the records aren’t distributed in an uniform way (same quantity for each thread)

What is Oracle NTile ?
This feature of Oracle Database can create a desired number of containers so that each thread can consume one. For example: I have 1000 records in the database to be divided among 10 threads:

SELECT ID, DESCRIPTION, FLAG_PROCESSED, NTILE(10) OVER (ORDER BY ID)
AS CONTAINER_COLUMN FROM TABLE_SOURCE
WHERE FLAG_PROCESSED = 'N';

With this query, you can use the column “CONTAINER_COLUMN”, values are already pre split into buckets ready to be divided among the various threads.

This is the documentation with more clarified explanation:

https://docs.oracle.com/cd/B19306_01/server.102/b14200/functions101.htm

That’s it.

Any question or suggestion is very welcome.

Credits to this post:

http://alexandreesl.wordpress.com/2014/09/21/spring-batch-construindo-processamento-massivo-de-dados-em-java/
http://www.mkyong.com/spring-batch/spring-batch-partitioning-example/
https://github.com/spring-projects/spring-batch/tree/master/spring-batch-samples

Cheers,
Victor Jabur

How to consume a WebService that uses Ws-Security Authentication (UsernameToken) – OWSM – Oracle Service Bus (OSB)

Hi,

The Oracle Service Bus (OSB) allows to enable OWSM authentication, there is many policies that can be applied to the Proxy Service to turn on security authentication. The most basic of this policies is:

oracle / wss_username_token_service_policy

Requiring only a username and password. Once enabled this security, following a tip on how to make a request using a Java Client.

File: MainPost.java – This is a main class to make a request

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.GregorianCalendar;
import java.util.List;

import javax.xml.datatype.DatatypeConstants;
import javax.xml.datatype.DatatypeFactory;
import javax.xml.datatype.XMLGregorianCalendar;
import javax.xml.ws.BindingProvider;
import javax.xml.ws.handler.Handler;

public class MainPost {

	public static void main(String[] args) {

		try {

			MyService service = new MyService();
			MyServicePort myServicePort = service.getMySoapPort();

			// This is the block that apply the Ws Security to the request
			BindingProvider bindingProvider = (BindingProvider) myServicePort;
			@SuppressWarnings("rawtypes")
			List<Handler> handlerChain = new ArrayList<Handler>();
			handlerChain.add(new WSSecurityHeaderSOAPHandler("myUsername", "myPassword"));
			bindingProvider.getBinding().setHandlerChain(handlerChain);

			RequestType myRequest = new RequestType();
			myRequest.setId(25);

			ResponseType response = myServicePort.searchSomething(myRequest);

		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

File: WSSecurityHeaderSOAPHandler.java – This is a handler responsible for creating the header authentication.

import java.util.Set;

import javax.xml.namespace.QName;
import javax.xml.soap.SOAPElement;
import javax.xml.soap.SOAPEnvelope;
import javax.xml.soap.SOAPHeader;
import javax.xml.ws.handler.MessageContext;
import javax.xml.ws.handler.soap.SOAPHandler;
import javax.xml.ws.handler.soap.SOAPMessageContext;

public class WSSecurityHeaderSOAPHandler implements SOAPHandler<SOAPMessageContext> {

 private static final String SOAP_ELEMENT_PASSWORD = "Password";
 private static final String SOAP_ELEMENT_USERNAME = "Username";
 private static final String SOAP_ELEMENT_USERNAME_TOKEN = "UsernameToken";
 private static final String SOAP_ELEMENT_SECURITY = "Security";
 private static final String NAMESPACE_SECURITY = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd";
 private static final String PREFIX_SECURITY = "wsse";

 private String usernameText;
 private String passwordText;

 public WSSecurityHeaderSOAPHandler(String usernameText, String passwordText) {
 this.usernameText = usernameText;
 this.passwordText = passwordText;
 }

 public boolean handleMessage(SOAPMessageContext soapMessageContext) {

 Boolean outboundProperty = (Boolean) soapMessageContext.get(MessageContext.MESSAGE_OUTBOUND_PROPERTY);

 if (outboundProperty.booleanValue()) {

 try {
 SOAPEnvelope soapEnvelope = soapMessageContext.getMessage().getSOAPPart().getEnvelope();

 SOAPHeader header = soapEnvelope.getHeader();
 if (header == null) {
 header = soapEnvelope.addHeader();
 }

 SOAPElement soapElementSecurityHeader = header.addChildElement(SOAP_ELEMENT_SECURITY, PREFIX_SECURITY,
 NAMESPACE_SECURITY);

 SOAPElement soapElementUsernameToken = soapElementSecurityHeader.addChildElement(SOAP_ELEMENT_USERNAME_TOKEN, PREFIX_SECURITY);
 SOAPElement soapElementUsername = soapElementUsernameToken.addChildElement(SOAP_ELEMENT_USERNAME, PREFIX_SECURITY);
 soapElementUsername.addTextNode(this.usernameText);

 SOAPElement soapElementPassword = soapElementUsernameToken.addChildElement(SOAP_ELEMENT_PASSWORD, PREFIX_SECURITY);
 soapElementPassword.addTextNode(this.passwordText);

 } catch (Exception e) {
 throw new RuntimeException("Error on wsSecurityHandler: " + e.getMessage());
 }

 }

 return true;
 }

 @Override
 public void close(MessageContext context) {
 // TODO Auto-generated method stub
 }

 @Override
 public boolean handleFault(SOAPMessageContext context) {
 // TODO Auto-generated method stub
 return true;
 }

 @Override
 public Set<QName> getHeaders() {
 // TODO Auto-generated method stub
 return null;
 }
}

Xml Request: This is the payload request that Java Client request to the server.

<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/">
 <soapenv:Header>
 <wsse:Security xmlns:wsse="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd">
 <wsse:UsernameToken>
 <wsse:Username>myUsername</wsse:Username>
 <wsse:Password Type="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-username-token-profile-1.0#PasswordText">myPassword</wsse:Password>
 </wsse:UsernameToken>
 </wsse:Security>
 </soapenv:Header>
 <soapenv:Body>
 <searchSomething>
 <Id>25</Id>
 </searchSomething>
 </soapenv:Body>
</soapenv:Envelope>