java – How to parallelize nested loops with Stream

Question:

While learning Java 8, I discovered streams. I decided to try parallelizing some fairly long-running nested loops, but I ran into a problem: I can't figure out how to do it.

This is what the sequential code looks like:

long total = 0;
BitVector vector = getVector(); // from the Colt library: import cern.colt.bitvector.*
for (int index = 0; index < vector.size(); index++){
    if (vector.get(index)){
        for (TrafficData trafficData : trafficDataList){

            int visitorTrafficDataIndex = getVisitorTrafficDataIndex(index, trafficData);

            if (visitorTrafficDataIndex >= 0){
                total += (long)(trafficData.getVolume()[visitorTrafficDataIndex]);
            }
        }
    }
}

I thought of doing something like this:

long total = IntStream.range(0, vector.size()).parallel()
        .filter(index -> vector.get(index))
        .mapToLong(index -> 
            trafficDataList.stream()
                .map(trafficData -> getVisitorTrafficDataIndex(index, trafficData))
                .filter(visitorTrafficDataIndex -> visitorTrafficDataIndex >= 0)
                .mapToLong(visitorTrafficDataIndex -> (long)(trafficData.getVolume()[visitorTrafficDataIndex]) // the trafficData object is no longer available here
                .sum())
        ).sum();

The problem is that it doesn't compile.

Answer:

If I understand your problem correctly, then you need to do something like this:

long total = IntStream.range( 0, vector.size() ).parallel().filter( index -> vector.get( index ) )
        // mapToLong (not map) is needed on the inner stream so that sum() is available
        .mapToLong( index -> trafficDataList.stream().parallel().mapToLong( trafficData -> {
            int visitorTrafficDataIndex = getVisitorTrafficDataIndex( index, trafficData );
            if (visitorTrafficDataIndex >= 0) {
                // same cast to long as in the sequential version
                return (long) trafficData.getVolume()[visitorTrafficDataIndex];
            }
            return 0L;
        } ).sum() ).sum();
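For anyone who wants to try this out, here is a minimal self-contained sketch of the same idea. It rests on several assumptions that are not in the question: java.util.BitSet stands in for the Colt BitVector, and both the TrafficData class (with an offset field and a double[] volume) and the body of getVisitorTrafficDataIndex are hypothetical placeholders, since the real ones were not shown.

```java
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.stream.IntStream;

public class NestedLoopSum {

    // Hypothetical stand-in for the question's TrafficData class.
    static final class TrafficData {
        private final int offset;
        private final double[] volume;

        TrafficData(int offset, double... volume) {
            this.offset = offset;
            this.volume = volume;
        }

        double[] getVolume() {
            return volume;
        }
    }

    // Hypothetical lookup: position of the visitor in the volume array, or -1 if absent.
    static int getVisitorTrafficDataIndex(int index, TrafficData trafficData) {
        int position = index - trafficData.offset;
        return (position >= 0 && position < trafficData.getVolume().length) ? position : -1;
    }

    // The original nested loops, kept for comparison.
    static long sequentialTotal(BitSet vector, List<TrafficData> trafficDataList) {
        long total = 0;
        for (int index = 0; index < vector.length(); index++) {
            if (vector.get(index)) {
                for (TrafficData trafficData : trafficDataList) {
                    int i = getVisitorTrafficDataIndex(index, trafficData);
                    if (i >= 0) {
                        total += (long) trafficData.getVolume()[i];
                    }
                }
            }
        }
        return total;
    }

    // Outer loop as a parallel IntStream; the inner loop stays a plain sequential stream.
    static long parallelTotal(BitSet vector, List<TrafficData> trafficDataList) {
        return IntStream.range(0, vector.length())
                .parallel()
                .filter(index -> vector.get(index))
                .mapToLong(index -> trafficDataList.stream()
                        .mapToLong(trafficData -> {
                            int i = getVisitorTrafficDataIndex(index, trafficData);
                            return i >= 0 ? (long) trafficData.getVolume()[i] : 0L;
                        })
                        .sum())
                .sum();
    }

    public static void main(String[] args) {
        BitSet vector = new BitSet();
        vector.set(0);
        vector.set(2);
        vector.set(3);
        List<TrafficData> trafficDataList = Arrays.asList(
                new TrafficData(0, 1.5, 2.0, 3.9),
                new TrafficData(2, 10.0, 20.0));
        System.out.println(sequentialTotal(vector, trafficDataList)); // prints 34
        System.out.println(parallelTotal(vector, trafficDataList));   // prints 34
    }
}
```

In this sketch only the outer stream is parallel. Marking the inner stream parallel as well is allowed, but both levels would share the same common fork/join pool, so it is worth measuring before assuming it helps.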

It won't work the way you wrote it (as a single chain of stream operations), because after map(trafficData -> getVisitorTrafficDataIndex(index, trafficData)) the stream no longer contains the trafficData objects. It is just a stream of Integer index values, so trafficData is out of scope in the lambdas that follow.
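If a single chain is still preferred for the inner loop, one possible workaround is to carry the trafficData object along with the computed index, so that both are available in later steps. The sketch below does this with AbstractMap.SimpleImmutableEntry from the JDK. The TrafficData class and the body of getVisitorTrafficDataIndex are hypothetical stand-ins, and sumForIndex is a name invented for this illustration.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;

public class CarryElementSketch {

    // Hypothetical stand-in for TrafficData; only getVolume() appears in the question.
    static final class TrafficData {
        private final double[] volume;

        TrafficData(double... volume) {
            this.volume = volume;
        }

        double[] getVolume() {
            return volume;
        }
    }

    // Hypothetical lookup: the index is valid only if it falls inside the volume array.
    static int getVisitorTrafficDataIndex(int index, TrafficData trafficData) {
        return (index >= 0 && index < trafficData.getVolume().length) ? index : -1;
    }

    // Inner loop for one index, written as a chain that keeps trafficData reachable.
    static long sumForIndex(int index, List<TrafficData> trafficDataList) {
        return trafficDataList.stream()
                // pair each element with its computed index instead of replacing it
                .map(trafficData -> new SimpleImmutableEntry<TrafficData, Integer>(
                        trafficData, getVisitorTrafficDataIndex(index, trafficData)))
                .filter(entry -> entry.getValue() >= 0)
                .mapToLong(entry -> (long) entry.getKey().getVolume()[entry.getValue()])
                .sum();
    }

    public static void main(String[] args) {
        List<TrafficData> trafficDataList = Arrays.asList(
                new TrafficData(1.5, 2.0, 3.9),
                new TrafficData(10.0, 20.0));
        System.out.println(sumForIndex(0, trafficDataList)); // prints 11
        System.out.println(sumForIndex(2, trafficDataList)); // prints 3
    }
}
```

This allocates one entry object per element, so the block lambda with mapToLong is probably the cheaper choice when the loops are hot.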
