In esProc we can perform parallel processing on big text files conveniently. The following example will illustrate the method.
There is a text file, sales.txt, with ten million sales records. Its main fields include SellerId, OrderDate and Amount. The task is to compute each seller’s total amount of big orders over the past four years, where big orders are those whose amount is above 2,000.
To do parallel processing, we must first segment the file. esProc provides the cursor data object and related functions to segment and read big text files conveniently. For example, file("e:/sales.txt").cursor@tz(;, 3:24) means the file is divided by bytes into 24 roughly equal segments and the third one is fetched. Dividing a file simply by bytes produces incomplete rows (incomplete records), which would need extra code to handle. Splitting the file by rows instead forces a traversal of the rows while handling a single segment, which cannot reach the efficiency expected from segment-based parallel processing. esProc avoids both problems: it automatically rounds off each segment by skipping the incomplete beginning row and completing the ending one, which ensures the correctness of the data.
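The rounding rule described above can be sketched in a few lines. This is a Python analogue written for illustration, not esProc code: the function name `read_segment` and its parameters are hypothetical, and how esProc implements the rule internally is not shown in this article. The sketch assumes a UTF-8 text file with newline-terminated rows.

```python
import os

def read_segment(path, k, n):
    """Return the complete rows of segment k (1-based) out of n byte segments.

    A row belongs to the segment in which its first byte falls, so every
    row is read by exactly one segment.
    """
    size = os.path.getsize(path)
    start = size * (k - 1) // n   # first byte of this segment
    end = size * k // n           # first byte of the next segment
    rows = []
    with open(path, "rb") as f:
        if start > 0:
            # Skip the incomplete beginning row: step back one byte and
            # discard everything up to and including the next newline.
            # If the segment starts exactly on a row boundary, only the
            # newline of the previous row is discarded.
            f.seek(start - 1)
            f.readline()
        # Complete the ending row: a row that starts before `end` is read
        # in full, even if it runs past `end`.
        while f.tell() < end:
            line = f.readline()
            if not line:
                break
            rows.append(line.rstrip(b"\r\n").decode("utf-8"))
    return rows
```

Calling `read_segment(path, 3, 24)` plays the role of the `3:24` parameter in the expression above: it returns only whole rows, and reading all 24 segments in order yields every row of the file exactly once.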
Once the file can be segmented, the parallel processing itself is simple, as the following code shows:
Main Program

| | A | B |
|---|---|---|
| 1 | 24 | |
| 2 | fork to(A1) | =file("D:\\sales.txt").cursor@tz(;,A2:A1) |
| 3 | | =B2.select(Amount>2000 && year(OrderDate)>=2011) |
| 4 | | =B3.groups(SellerId;sum(Amount):sAmount) |
| 5 | | result B4 |
| 6 | =A2.merge() | |
| 7 | =A6.groups@o(SellerId;sum(sAmount)) | |

A1: Set the number of parallel tasks as 24, that is, divide the file into 24 segments.
A2: Perform multithreaded computation. The tasks are represented by to(A1), whose result is [1,2,3…24], the serial numbers of the segments, one assigned to each thread. When all the threads complete their computations, the results are collected in A2. B2-B5 is the code within the fork statement.
B2: Read the file with a cursor. Which segment the current task processes is determined by the parameter passed from the main thread.
B3: Select records whose order amount is above 2,000 and whose order date falls in 2011 or later.
B4: Group and summarize data for the current segment.
B5: Return the result of the current thread to A2, where the main thread resides.
A6: Merge the results of A2’s multithreading tasks. Below is a selection of the result table:
A7: Group and summarize the merged data to get each seller’s sales amount. The result is as follows:
Code explanation
For a CPU with N cores, it appears natural to set N tasks. In practice, some tasks always run faster than others, for example because the filter keeps different amounts of data in different segments. The common result is that some cores sit idle after finishing the faster tasks while a few others are still running the slower ones. If each core instead performs multiple tasks in turn, the speed differences between tasks average out and the overall operation becomes more stable. In the above example, therefore, the job is divided into 24 tasks that are handed to the CPU’s 8 cores (the number of threads allowed to run in parallel can be configured in the esProc environment). On the other hand, too many segmented tasks have drawbacks of their own. One is a decline in overall performance; the other is that the results produced by all the tasks add up to more data, which occupies more memory.
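The idea of giving a small pool of workers more tasks than there are cores can be sketched as follows. This is a Python analogue for illustration only, not esProc code: the names `summarize_segment` and `parallel_totals` are hypothetical, the records are assumed to be in-memory `(seller_id, year, amount)` tuples rather than file segments, and the defaults of 24 tasks and 8 workers simply mirror the figures in the example. In CPython, threads do not speed up CPU-bound work, so the sketch shows the scheduling pattern rather than a performance gain.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

def summarize_segment(records):
    """Filter one segment and return its partial totals per seller."""
    totals = Counter()
    for seller_id, year, amount in records:
        if amount > 2000 and year >= 2011:
            totals[seller_id] += amount
    return totals

def parallel_totals(records, n_tasks=24, n_workers=8):
    """Split the work into n_tasks pieces and run them on n_workers threads."""
    # Stand-in for file segmentation: n_tasks roughly equal slices.
    segments = [records[i::n_tasks] for i in range(n_tasks)]
    # A worker that finishes a fast task picks up the next pending one,
    # so no worker idles while tasks remain in the queue.
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        partials = list(pool.map(summarize_segment, segments))
    # Second-stage aggregation: add up the partial totals of all tasks.
    grand = Counter()
    for partial in partials:
        grand.update(partial)
    return dict(grand)
```

All 24 partial results are held in `partials` before they are combined, which illustrates the memory cost mentioned above: the more tasks there are, the more intermediate results have to be kept.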
fork simplifies development by encapsulating the complex multithreaded computation, which lets programmers focus on the business algorithm instead of being distracted by complicated semaphore control.
The results in A6 of the above main program are already sorted by SellerId, so there is no need to sort again for the grouping and summarizing in A7. With its @o option, the groups function can group and summarize the data efficiently without sorting.
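One way to see why sorted input makes a second sort unnecessary is the following sketch. It is a Python analogue, not esProc code, and it does not claim to show how merge or groups@o work internally. The function name `merge_and_total` is hypothetical, and each partial result is assumed to be a list of `(seller_id, sAmount)` pairs already sorted by seller.

```python
import heapq
from itertools import groupby
from operator import itemgetter

def merge_and_total(partials):
    """Merge sorted partial results and sum the amounts per seller."""
    # Merging sorted inputs yields one sorted stream without a full sort.
    merged = heapq.merge(*partials, key=itemgetter(0))
    # Rows with the same seller are now adjacent, so one pass that only
    # compares neighbouring keys is enough to group them.
    return [
        (seller, sum(amount for _, amount in rows))
        for seller, rows in groupby(merged, key=itemgetter(0))
    ]
```

For example, `merge_and_total([[(1, 100), (3, 50)], [(1, 20), (2, 70)]])` combines the two entries for seller 1 and returns the totals in seller order.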
Further illustration:
Sometimes the data size of a text file amounts to several TB, which requires multi-node parallel computation on a cluster. esProc can perform this kind of parallel computation easily, because its cursor and related functions support inexpensive scale-out file systems and distributed file systems (DFS). The code will look like this: =callx("sub.dfx", to(A1),A1; ["192.168.1.200:8281", "192.168.1.201:8281", "……"]). See related documents for a detailed usage description.