Java : Spark iterate through custom objects

Raj

I have the below code as part of my program:

finalJoined is of type Dataset<Row>.

RuleParams and RuleOutputParams are Java POJO classes with setters and getters.

I am calling the Drools rule engine in the below code.

List<Row> finalList = finalJoined.collectAsList();
List<RuleOutputParams> ruleOutputParamsList = new ArrayList<RuleOutputParams>();
Dataset<RuleOutputParams> rulesParamDS = null;
Iterator<Row> iterator = finalList.iterator();
while (iterator.hasNext()) {
    Row row = iterator.next();
    RuleParams ruleParams = new RuleParams();
    String outputString = (String) row.get(1);
    // setting up parameters
    System.out.println("Value of TXN DTTM is : " + row.getString(0));
    ruleParams.setTxnDttm(row.getString(0));
    ruleParams.setCisDivision(row.getString(1));
    System.out.println("Division is : " + ruleParams.getCisDivision());
    ruleParams.setTxnVol(row.getInt(2));
    System.out.println("TXN Volume is : " + ruleParams.getTxnVol());
    ruleParams.setTxnAmt(row.getInt(3));
    System.out.println("TXN Amount is : " + ruleParams.getTxnAmt());
    ruleParams.setCurrencyCode(row.getString(4));
    ruleParams.setAcctNumberTypeCode(row.getString(5));
    ruleParams.setAccountNumber(row.getLong(6));
    ruleParams.setUdfChar1(row.getString(7));
    System.out.println("UDF Char1 is : " + ruleParams.getUdfChar1());
    ruleParams.setUdfChar2(row.getString(8));
    ruleParams.setUdfChar3(row.getString(9));
    ruleParams.setAccountId(row.getLong(10));
    kSession.insert(ruleParams);
    int output = kSession.fireAllRules();

    System.out.println("fireAllRules output : " + output);
    System.out.println("After firing rules");
    System.out.println(ruleParams.getPriceItemParam1());
    System.out.println(ruleParams.getCisDivision());
    // generating output objects depending on the number of price items derived
    System.out.println("No. of priceitems derived : " + ruleParams.getPriceItemCd().size());
    for (int index = 0; index < ruleParams.getPriceItemCd().size(); index++) {

        System.out.println("Inside the for loop");

        RuleOutputParams ruleOutputParams = new RuleOutputParams();

        ruleOutputParams.setTxnDttm(ruleParams.getTxnDttm());
        ruleOutputParams.setCisDivision(ruleParams.getCisDivision());
        ruleOutputParams.setTxnVol(ruleParams.getTxnVol());
        ruleOutputParams.setTxnAmt(ruleParams.getTxnAmt());
        ruleOutputParams.setCurrencyCode(ruleParams.getCurrencyCode());
        ruleOutputParams.setAcctNumberTypeCode(ruleParams.getAcctNumberTypeCode());
        ruleOutputParams.setAccountNumber(ruleParams.getAccountNumber());
        ruleOutputParams.setAccountId(ruleParams.getAccountId());
        ruleOutputParams.setPriceItemCd(ruleParams.getPriceItemCd().get(index));
        System.out.println(ruleOutputParams.getPriceItemCd());
        ruleOutputParams.setPriceItemParam(ruleParams.getPriceItemParams().get(index));
        System.out.println(ruleOutputParams.getPriceItemParam());
        ruleOutputParams.setPriceItemParamCode(ruleParams.getPriceItemParamCodes().get(index));
        ruleOutputParams.setProcessingDate(new SimpleDateFormat("yyyy-MM-dd").format(new Date()));
        ruleOutputParams.setUdfChar1(ruleParams.getUdfChar1());
        ruleOutputParams.setUdfChar2(ruleParams.getUdfChar2());
        ruleOutputParams.setUdfChar3(ruleParams.getUdfChar3());

        ruleOutputParamsList.add(ruleOutputParams);
    }
}
System.out.println("Size of ruleOutputParamsList is : " + ruleOutputParamsList.size());
Encoder<RuleOutputParams> rulesOutputParamEncoder = Encoders.bean(RuleOutputParams.class);
rulesParamDS = sparkSession.createDataset(Collections.unmodifiableList(ruleOutputParamsList),
        rulesOutputParamEncoder);
rulesParamDS.show();

I have used while and for loops in the code.

Can this code be rewritten using Spark's map, flatMap, or foreach functions? How can that be done?

The issue here is that the Drools rule engine is being called sequentially. I want to execute it in parallel.

EDIT - As shown in the above code, I am first converting a DataFrame to a List and then iterating over it. Can I directly use the DataFrame or RDD for my purpose?

Tags: java, apache-spark, drools

Answers

answered 3 months ago Hearen #1

A very simple demo showing parallelStream and CompletableFuture, from my tests.

For parallelStream

int parallelGet() {
    return IntStream.rangeClosed(0, TOP).parallel().map(i -> getIoBoundNumber(i)).sum();
}

For CompletableFuture

int concurrencyGetBasic() {
    List<CompletableFuture<Integer>> futureList = IntStream.rangeClosed(0, TOP).boxed()
            .map(i -> CompletableFuture.supplyAsync(() -> getIoBoundNumber(i)))
            .collect(Collectors.toList());
    return futureList.stream().map(CompletableFuture::join).reduce(0, Integer::sum);
}

For more tutorials you can check Java 8 Tutorial and Java 8 in Action.
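The two snippets above are not self-contained (neither TOP nor getIoBoundNumber is defined in the answer), so here is a minimal runnable sketch of the same idea, with those two names filled in as assumptions: the "IO-bound" call is simulated with a short sleep, and both variants should produce the same sum.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelDemo {
    // Assumption: the original post does not define TOP, so pick a small bound.
    static final int TOP = 9;

    // Assumption: simulate an IO-bound call with a short sleep, then echo the input.
    static int getIoBoundNumber(int i) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return i;
    }

    // parallelStream variant: fans the calls out over the common ForkJoinPool.
    static int parallelGet() {
        return IntStream.rangeClosed(0, TOP).parallel()
                .map(ParallelDemo::getIoBoundNumber)
                .sum();
    }

    // CompletableFuture variant: submit all tasks first, then join the results.
    static int concurrencyGetBasic() {
        List<CompletableFuture<Integer>> futureList = IntStream.rangeClosed(0, TOP).boxed()
                .map(i -> CompletableFuture.supplyAsync(() -> getIoBoundNumber(i)))
                .collect(Collectors.toList());
        return futureList.stream().map(CompletableFuture::join).reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(parallelGet());          // 0 + 1 + ... + 9 = 45
        System.out.println(concurrencyGetBasic());  // 45
    }
}
```

Note that both of these parallelize on the driver JVM only; they do not distribute the work across Spark workers the way foreach/map on a Dataset would.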

answered 3 months ago AbhishekN #2

As mentioned, finalJoined is already a Dataset, so there is no need to collect it onto the driver. You can write code similar to the below:

This is your base dataset with the data:

Dataset<Row> finalJoined;

Make a new function and pass each row into it. Since your data is already in a partitioned Dataset, the function will run in parallel on the workers. If it is not partitioned, repartition it first, e.g. finalJoined.repartition(numPartitions);

finalJoined.foreach((ForeachFunction<Row>) row -> droolprocess(row));

public void droolprocess(Row row) {

    // put the entire code that is within the while loop (except the iterator) here;
    // it will execute in parallel on the workers

    // pass connection parameters in as required, or get a new connection within here

}

If you want to get return values from execution of each row,

  • use map instead of foreach and have another Dataset created for further use. map on a Dataset results in another Dataset.
  • change the return type of the droolprocess function accordingly.
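The core refactor behind that advice can be shown with plain Java streams: the body of the question's while/for loop becomes a pure function that takes one input row and returns a list of output rows, and flatMap then flattens those lists. This is structurally the same shape as Dataset.flatMap in Spark. RowIn, RowOut, and the simplified droolprocess below are hypothetical stand-ins for Row, RuleOutputParams, and the real Drools call, which are not reproducible here.

```java
import java.util.List;
import java.util.stream.Collectors;

public class FlatMapSketch {
    // Hypothetical stand-ins for Row and RuleOutputParams.
    record RowIn(String division, List<String> priceItemCds) {}
    record RowOut(String division, String priceItemCd) {}

    // The former loop body as a pure function: one input row produces
    // several output rows, one per derived price item.
    static List<RowOut> droolprocess(RowIn row) {
        // a real version would insert into the KieSession and fireAllRules here
        return row.priceItemCds().stream()
                .map(cd -> new RowOut(row.division(), cd))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<RowIn> rows = List.of(
                new RowIn("DIV1", List.of("P1", "P2")),
                new RowIn("DIV2", List.of("P3")));

        // parallel + flatMap: same shape as finalJoined.flatMap(...) in Spark,
        // where the results would land in a new Dataset<RuleOutputParams>
        List<RowOut> out = rows.parallelStream()
                .flatMap(r -> droolprocess(r).stream())
                .collect(Collectors.toList());

        System.out.println(out.size()); // 3
    }
}
```

In Spark itself, the equivalent call would take a FlatMapFunction plus an Encoders.bean encoder for the output class; one caveat is that the function and everything it captures (such as the Drools session, which is typically created per partition rather than shared) must be serializable to ship to the workers.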
