Saturday, 31 August 2013

Thread safety issue while timing out the thread for each bundle

I am working on a project that has several Bundles/Models. As an example,
suppose I have 4 bundles, and each of those bundles has a method named
process.
This is what I need to do:
I need to call the process method of all 4 bundles in parallel, using
multiple threads. The process method of each bundle returns a map, which I
then need to write to the database, either in that same thread or in
whatever way is the best approach (I am not sure which is the right way to
go).
I also want a timeout at the thread level: if any bundle takes too long to
execute, its thread should be timed out, and an error should be logged
stating that this particular bundle timed out because it was taking too
long.
My attempt below is most probably flawed, and its error handling is by no
means complete. Can anybody tell me what I should do in the error handling
cases as well?
Below is the method that calls the process method of all the bundles in a
multithreaded way.
// Needs imports from java.util and java.util.concurrent.
public void processEvents(final Map<String, Object> eventData) {
    ExecutorService pool = Executors.newFixedThreadPool(5);
    List<ProcessBundleHolderEntry> entries = new ArrayList<ProcessBundleHolderEntry>();
    @SuppressWarnings("unchecked")
    Map<String, String> outputs =
            (Map<String, String>) eventData.get(BConstants.EVENT_HOLDER);
    for (BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) {
        entries.add(new ProcessBundleHolderEntry(entry, outputs));
    }
    try {
        // invokeAll blocks until every task finishes or 30 seconds pass,
        // then cancels whatever is still running.
        List<Future<Object>> futures = pool.invokeAll(entries, 30, TimeUnit.SECONDS);
        for (int i = 0; i < futures.size(); i++) {
            // The futures come back in the same order as the entries.
            Future<Object> future = futures.get(i);
            ProcessBundleHolderEntry entry = entries.get(i);
            // Every future returned by invokeAll reports isDone() == true,
            // so a timeout has to be detected with isCancelled().
            if (future.isCancelled()) {
                // log a timeout error for this entry
            }
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can see the interruption.
        Thread.currentThread().interrupt();
    } finally {
        // Release the worker threads; otherwise every call leaks a pool.
        pool.shutdownNow();
    }
}
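For reference, here is a minimal, self-contained sketch of how a timeout from invokeAll can be told apart from a normal result or a failure. When the deadline passes, invokeAll cancels the tasks that are still running, so a timed-out task shows up as isCancelled(), while get() on the others either returns the result or throws ExecutionException. The class name, the task helper, and the sleep-based tasks are hypothetical stand-ins for the bundles, not part of the project code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class InvokeAllTimeoutSketch {

    // Hypothetical stand-in for a bundle's process() call: sleeps, then returns a label.
    static Callable<String> task(final String name, final long millis) {
        return new Callable<String>() {
            public String call() throws Exception {
                Thread.sleep(millis);
                return name + " finished";
            }
        };
    }

    // Runs all tasks under one shared deadline and returns one status line per task.
    static List<String> runAll(List<Callable<String>> tasks, long timeoutMillis)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(tasks.size());
        List<String> report = new ArrayList<String>();
        try {
            List<Future<String>> futures =
                    pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
            for (int i = 0; i < futures.size(); i++) {
                Future<String> future = futures.get(i);
                if (future.isCancelled()) {
                    // Still running at the deadline, so invokeAll cancelled it.
                    report.add("task " + i + ": timed out");
                } else {
                    try {
                        report.add("task " + i + ": " + future.get());
                    } catch (ExecutionException e) {
                        // The task itself threw; the cause is the original exception.
                        report.add("task " + i + ": failed with " + e.getCause());
                    }
                }
            }
        } finally {
            pool.shutdownNow(); // always release the worker threads
        }
        return report;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Callable<String>> tasks = new ArrayList<Callable<String>>();
        tasks.add(task("fast", 10));
        tasks.add(task("slow", 5000));
        for (String line : runAll(tasks, 500)) {
            System.out.println(line);
        }
    }
}
```

Note that cancellation only interrupts the worker thread. A bundle whose process method ignores interrupts would keep running in the background even after it has been reported as timed out.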
And this is the Callable implementation that wraps each bundle:
import java.util.Map;
import java.util.concurrent.Callable;

public class ProcessBundleHolderEntry implements Callable<Object> {
    private final BundleRegistration.BundlesHolderEntry entry;
    private final Map<String, String> outputs;

    public ProcessBundleHolderEntry(BundleRegistration.BundlesHolderEntry entry,
            Map<String, String> outputs) {
        this.entry = entry;
        this.outputs = outputs;
    }

    @Override
    public Object call() throws Exception {
        final Map<String, String> response = entry.getPlugin().process(outputs);
        // write to the database.
        System.out.println(response);
        return response;
    }
}
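One possible thread-safety concern here is that every ProcessBundleHolderEntry receives the same outputs map. If any bundle modified that map inside process, the other threads could see inconsistent data. Whether the bundles actually do that is not known, so the following is only a sketch of one defensive option: hand each task a read-only snapshot. The class and method names are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SharedInputSketch {

    // Returns a read-only copy, so no task can change what the other tasks see.
    static Map<String, String> snapshot(Map<String, String> source) {
        return Collections.unmodifiableMap(new HashMap<String, String>(source));
    }

    public static void main(String[] args) {
        Map<String, String> original = new HashMap<String, String>();
        original.put("event", "click");

        Map<String, String> safe = snapshot(original);
        original.put("event", "changed"); // later changes do not leak into the snapshot
        System.out.println(safe.get("event")); // prints "click"

        try {
            safe.put("x", "y");
        } catch (UnsupportedOperationException e) {
            // Any attempt to write to the snapshot fails fast.
            System.out.println("snapshot is read-only");
        }
    }
}
```

Applied to the code above, this would mean passing something like snapshot(outputs) to each ProcessBundleHolderEntry instead of the shared map. A single unmodifiable snapshot shared by all entries would also work, since read-only access from several threads is safe once the map is no longer being changed.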
Can anyone tell me whether there is a problem with the above approach, or
whether there is a better and more efficient way of doing the same thing? I
am also not sure whether it has any thread safety issues.
Any help on this will be appreciated.
