esProc Parallel Computing: Data Redundancy


esProc Parallel Computing: Cluster Computing explains how to use cluster computing to process massive data or handle tasks involving a large amount of computation.

1. The problem with parallel computing

Parallel computing requires that the dfx file to be invoked is stored on all nodes, and that the data files each subtask needs are copied to the corresponding nodes. Since the execution environment of each node cannot be known in advance, callx@a() is used to distribute the subtasks to the designated nodes in order.

Below is an example cited from esProc Parallel Computing: Cluster Computing. The cellset file CalcStock2.dfx obtains the transaction data of six stocks in a specified year, as shown below:

  A B
1 >output@t("calc begin, year: "+string(arg1)+". ") =file("StockRecord"+string(arg1)+".txt")
2 if !(B1.exists()) end "File of year "+string(arg1)+" is not found!"
3 =B1.cursor@t() [124051,128857,131893,136760,139951,145380]
4 =A3.select(B3.pos(SID)>0).fetch() =A4.groups(SID;count(~):Count,max(Closing):Maximum,min(Closing):Minimum,sum(Closing):Sum)
5 >output@t("calc finish, year: "+string(arg1)+". ") result B4

Transaction data of different years is stored in different files; for example, the data of 2010 is stored in StockRecord2010.txt. In this cellset file, arg1 is a parameter used to pass in the year.

The 5 files the computation needs are stored on 3 servers, whose addresses and port numbers are Server A (192.168.0.86:4001), Server B (192.168.0.66:4001) and Server C (192.168.0.86:4004); refer to esProc Parallel Computing: Servers to learn more about configuring and launching servers. Server A has the data of 2010 and 2014, Server B the data of 2011 and 2013, and Server C the data of 2012. Each subtask obtains the transaction data of a specified year. The output@t function outputs the time and a prompt message at the beginning of the computation and before the result is returned.

Since each data file is stored only on one particular server, it must be handled by that server. To guard against inappropriate task distribution, A2 in CalcStock2.dfx checks whether the required data file is in place; if the file is not there, B2 raises an error with end so that the task can be redistributed.
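The A2/B2 guard can be sketched in Python (a hypothetical stand-in for the dfx logic, not esProc's implementation; only the StockRecord file-naming convention is taken from the example above):

```python
import os

def calc_stock(year, data_dir="."):
    """Node-side guard mirroring A2/B2 of CalcStock2.dfx: if this node
    does not hold the year's data file, raise an error so the caller
    can redistribute the subtask to another node."""
    path = os.path.join(data_dir, "StockRecord%d.txt" % year)
    if not os.path.exists(path):
        # Corresponds to: end "File of year ... is not found!"
        raise FileNotFoundError("File of year %d is not found!" % year)
    # ...the real subtask would build a cursor and aggregate here...
    return "stats for %d" % year
```

The cheap existence check runs before any data is read, which is why a misdirected subtask costs almost nothing.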

Usually the invocation operation distributes the task among servers in order. The main program performs the cluster computation:

  A
1 [192.168.0.86:4001,192.168.0.66:4001,192.168.0.86:4004,192.168.0.66:4001,192.168.0.86:4001]
2 [2010,2011,2012,2013,2014]
3 =callx@a("CalcStock2.dfx",A2;A1)
4 =A3.merge(SID).groups(SID;sum(Count):Count,max(Maximum):Maximum,min(Minimum):Minimum,sum(Sum):Sum)
5 =A4.new(SID,Count,Maximum,Minimum,round(Sum/Count,2):Average)

A3 performs the cluster computation with callx@a, which assigns each subtask to the server at the corresponding position in A1. Each subtask computes the transaction data of the six stocks for its year, and the main program then summarizes the results to get the final result:

[Figure: esProc_parallel_data_redundancy_3]

One thing to note is that this mode of parallel computing has a potential problem: trouble on any single node affects the whole task. For example, a node with poor performance and slow computation will slow down the overall computation considerably; and if a node goes down, the network fails, or an error such as a damaged data file occurs, the computation will fail.

If Server B stops due to malfunction in the above computation:

[Figure: esProc_parallel_data_redundancy_4]

The main program performing the cluster computation cannot find Server B, and errors occur:

[Figure: esProc_parallel_data_redundancy_5]

Another way of performing cluster computation is to distribute the task without binding subtasks to specific nodes, so that a subtask is redistributed when a node malfunctions.

  A
1 [192.168.0.86:4001,192.168.0.66:4001,192.168.0.86:4004]
2 [2010,2011,2012,2013,2014]
3 =callx("CalcStock2.dfx",A2;A1)
4 =A3.merge(SID).groups(SID;sum(Count):Count,max(Maximum):Maximum,min(Minimum):Minimum,sum(Sum):Sum)
5 =A4.new(SID,Count,Maximum,Minimum,round(Sum/Count,2):Average)

In this way, the task won’t be interrupted because of the absence of a server. However, another problem arises with the computation in this example:

[Figure: esProc_parallel_data_redundancy_7]

The problem is that the data file of 2012 is stored only on Server C, so if Server C malfunctions, the whole task fails. This can be avoided by keeping copies of the data file StockRecord2012.txt on Server A and Server B as well, enabling the computation to be handled properly without specifying servers for subtasks. This practice of storing one data file on multiple servers to increase the stability of the computation is called data redundancy.

2. Data redundancy solution

Usually, in a parallel-processing solution, each data file is stored on one machine, which is then the only machine that can handle that part of the data; if the machine malfunctions or the data file is damaged, the computation fails. With data redundancy, each data file is held on two or more separate nodes, so the same subtask can be handled by any of them, and the whole task is not affected by a problem with one server or one data file. This makes the parallel computation more fault-tolerant and more stable.

If every node holds all the data files, any node can handle any subtask, and a single node is enough to finish the computation; this is the safest arrangement. But since cluster computation usually involves many data files, such a solution consumes a great deal of storage. Conversely, the fewer the copies of each data file, the less stable the computation. An actual data redundancy solution therefore needs flexible control over the degree of redundancy.
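The trade-off can be put in rough numbers. Below is a back-of-envelope sketch (a hypothetical helper; the file count and size are made-up, not from the example):

```python
def storage_and_tolerance(n_files, file_size_gb, copies):
    """Total storage grows linearly with the redundancy degree,
    while each file survives the loss of (copies - 1) of the nodes
    holding it."""
    return {
        "total_gb": n_files * file_size_gb * copies,
        "node_failures_survived_per_file": copies - 1,
    }

# e.g. 5 yearly files of 10 GB each, held in 2 copies:
tradeoff = storage_and_tolerance(5, 10, 2)
```

Doubling the copies doubles the storage bill but lets any single holder of a file fail, which is the balance the next example strikes.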

For example, you can change the way the stock files are stored in the above computation: store the data of 2010, 2011, 2012 and 2013 on Server A, the data of 2012, 2013 and 2014 on Server B, and the data of 2014, 2010 and 2011 on Server C. After the modification, each data file is held on two servers. Note that this does not mean the required data can be found on any node, so each subtask still needs to check whether the required file exists.
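To see why this layout survives a single failure, here is a small Python sketch (illustrative only, not esProc's actual scheduler): each subtask is simply given to the first live server that holds a copy of its file, so a dead or incapable server just means the subtask moves on to the next one.

```python
def distribute(years, holdings, down=()):
    """Assign each subtask (a year) to the first live node holding a
    copy of that year's file; a rejected subtask is tried on the next
    node, i.e. redistributed."""
    assignment = {}
    for year in years:
        for node, files in holdings.items():
            if node not in down and year in files:
                assignment[year] = node
                break
        else:
            raise RuntimeError("no live node holds data of %d" % year)
    return assignment

# The redundant layout described above: every year is on two servers.
holdings = {
    "A": {2010, 2011, 2012, 2013},
    "B": {2012, 2013, 2014},
    "C": {2014, 2010, 2011},
}
# Even with Server B down, every subtask still finds a capable node.
plan = distribute([2010, 2011, 2012, 2013, 2014], holdings, down={"B"})
```

With Server B down, 2010 through 2013 land on Server A and 2014 on Server C, so the task still completes.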

With data redundancy, the callx() function in the main program should not use the @a option. The invocation is the same as performing the cluster computation without specifying servers for subtasks:

  A
1 [192.168.0.86:4001,192.168.0.66:4001,192.168.0.86:4004]
2 [2010,2011,2012,2013,2014]
3 =callx("CalcStock2.dfx",A2;A1)
4 =A3.merge(SID).groups(SID;sum(Count):Count,max(Maximum):Maximum,min(Minimum):Minimum,sum(Sum):Sum)
5 =A4.new(SID,Count,Maximum,Minimum,round(Sum/Count,2):Average)

When all servers operate properly, this invocation behaves like the normal cluster computation explained in esProc Parallel Computing: Cluster Computing. The more data files each server stores, the more computational work it can take on, and the more likely the distribution or redistribution of the task is to succeed.

Data redundancy ensures that the computation is accomplished even if a server stops working. For example, try performing the computation with Server B stopped. From each server's system information output window, you can see how the task is distributed and executed:

[Figure: esProc_parallel_data_redundancy_9]

[Figure: esProc_parallel_data_redundancy_10]

[Figure: esProc_parallel_data_redundancy_11]

As you can see, the task is distributed to the three nodes. Since Server B hasn't been launched, it can't do the job. Server A, which doesn't hold the data of 2014, reports and returns an error message for that subtask. The same happens to Server C, which doesn't have the data of 2012 and 2013 but is given the subtask of 2013; that subtask is then redistributed to Server A. In this way, the redundant data is made full use of during parallel computing, even when not all servers work well or some data files are damaged.

With the data redundancy solution, if a node can't find the data file it needs, the end command can be used to return the error message. This produces little computational loss and has no noticeable impact on the performance of the parallel processing.

3. Data redundancy for external storage partitioning

During cluster computation you can create partitions on a node for storing data files, which can then be located by partition name across the server cluster. Multiple node servers are allowed to have partitions with the same name, and one partition can contain multiple data files. Generally, partitions with the same name contain the same data files. As the preceding section shows, storing copies of one file on multiple node servers increases computing stability, so the partition scheme usually adopts the data redundancy solution by placing copies of one file in same-named partitions on separate node servers.

esProc Parallel Computing: Servers discusses how to configure and use the parallel servers in general. To store data files through partitioning, you need to set the partitions in each parallel server's configuration file accordingly. Here we still use Server A (192.168.0.86:4001), Server B (192.168.0.66:4001) and Server C (192.168.0.86:4004) as the illustration. This time the stock data is stored in external partitions named z2010, z2011, z2012, z2013 and z2014. Server A has four partitions – z2010, z2011, z2012 and z2013; Server B has three partitions – z2012, z2013 and z2014; and Server C also has three partitions – z2014, z2010 and z2011.

First, define the configuration files for the three servers respectively:

Server A:

<?xml version="1.0" encoding="UTF-8"?>
<Server>
    <TempTimeOut>600</TempTimeOut>
    <Interval>6</Interval>
    <ProxyTimeOut>30</ProxyTimeOut>
    <UnitList loggerProperties="raqsoftLog.properties">
        <Unit host="192.168.0.86" port="4001" nodeTask="4" callxTask="2"/>
    </UnitList>
    <Partitions>
        <Partition name="z2010" path="D:/files/data/unit01/2010" />
        <Partition name="z2011" path="D:/files/data/unit01/2011" />
        <Partition name="z2012" path="D:/files/data/unit01/2012" />
        <Partition name="z2013" path="D:/files/data/unit01/2013" />
    </Partitions>
</Server>

Server B:

<?xml version="1.0" encoding="UTF-8"?>
<Server>
    <TempTimeOut>600</TempTimeOut>
    <Interval>6</Interval>
    <ProxyTimeOut>30</ProxyTimeOut>
    <UnitList loggerProperties="raqsoftLog.properties">
        <Unit host="192.168.0.66" port="4001" nodeTask="4" callxTask="1"/>
    </UnitList>
    <Partitions>
        <Partition name="z2012" path="G:/files/unit2/2012" />
        <Partition name="z2013" path="G:/files/unit2/2013" />
        <Partition name="z2014" path="G:/files/unit2/2014" />
    </Partitions>
</Server>

Server C:

<?xml version="1.0" encoding="UTF-8"?>
<Server>
    <TempTimeOut>600</TempTimeOut>
    <Interval>6</Interval>
    <ProxyTimeOut>30</ProxyTimeOut>
    <UnitList loggerProperties="raqsoftLog.properties">
        <Unit host="192.168.0.86" port="4004" nodeTask="4" callxTask="4"/>
    </UnitList>
    <Partitions>
        <Partition name="z2014" path="D:/files/data/unit02/2014" />
        <Partition name="z2010" path="D:/files/data/unit02/2010" />
        <Partition name="z2011" path="D:/files/data/unit02/2011" />
    </Partitions>
</Server>
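As a quick sanity check of such a configuration, the partition list can be read back with any standard XML parser. The sketch below uses Python's xml.etree with Server C's partition section inlined (the XML declaration is omitted because ElementTree rejects str input that carries an encoding declaration):

```python
import xml.etree.ElementTree as ET

# Server C's configuration fragment, inlined for illustration.
config = """<Server>
    <Partitions>
        <Partition name="z2014" path="D:/files/data/unit02/2014" />
        <Partition name="z2010" path="D:/files/data/unit02/2010" />
        <Partition name="z2011" path="D:/files/data/unit02/2011" />
    </Partitions>
</Server>"""

root = ET.fromstring(config)
# Map each partition name to the local folder backing it.
partitions = {p.get("name"): p.get("path") for p in root.iter("Partition")}
```

Listing the keys of `partitions` per server makes it easy to verify that every z-named partition appears on exactly two servers, matching the redundancy plan above.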

Then, after the servers are started, execute datastore.exe (or datastore.sh under Linux) to enable the data store service, as shown in the following window:

[Figure: esProc_parallel_data_redundancy_12]

The window lists the partitions set aside for storing data files on each server. The newly-created partitions are still empty. Right-click on each partition to upload files:

[Figure: esProc_parallel_data_redundancy_13]

Apart from uploading files through the data store service, you can also copy files directly into the corresponding folders; files already existing in a partition appear in the list when the data store service is launched. After the data files are uploaded to the corresponding partitions, you can see this:

[Figure: esProc_parallel_data_redundancy_14]

Now synchronize the files between partitions. First synchronize the files of Server A to the other servers:

[Figure: esProc_parallel_data_redundancy_15]

Next synchronize the files of Server B to the other servers. Now all the data files are in place in the partitions:

[Figure: esProc_parallel_data_redundancy_16]

To access a file in a partition in the program, use file(fn,z,h), where fn is the file name, z the partition name and h the server list. For example:

  A B C
1 [192.168.0.86:4001,192.168.0.66:4001,192.168.0.86:4004] [2010,2011,2012,2013,2014]
2 [124051,128857,131893,136760,139951,145380] []
3 for B1 ="StockRecord"+string(A3)+".txt" ="z"+string(A3)
4   =file(B3,C3,A1) =B4.cursor@t()
5   =C4.select(A2.pos(SID)>0) =B5.fetch()
6   =C5.groups(SID;count(~):Count,max(Closing):Maximum,min(Closing):Minimum,sum(Closing):Sum) >B2=B2|B6
7 =B2.groups(SID;sum(Count):Count,max(Maximum):Maximum,min(Minimum):Minimum,sum(Sum):Sum) =A7.new(SID,Count,Maximum,Minimum,round(Sum/Count,2):Average)

In the above code, A3 loops through each year. B3 and C3 get the file name and the partition name for the current year respectively. B4 accesses the file from the partition; handling a file stored in a partition is no different from handling an ordinary file, so C4 generates a cursor with it. B5 selects the stock records with the specified codes, which C5 fetches. B6 computes the statistics for the current year and C6 concatenates the result into B2.

A7 calculates the necessary data based on each year’s stock records combined. B7 gets the final stock statistics required:

[Figure: esProc_parallel_data_redundancy_18]

During the computation, the program first searches local data files by the specified file name and partition name. If the target file isn't found locally, it then searches the server list; during this remote search, the program picks an available node according to each service's execution status. The data redundancy solution thus ensures that the computation is accomplished smoothly even if a node fails.
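The local-first lookup order can be sketched as follows (illustrative Python, not esProc's implementation; has_file is a hypothetical stand-in for the remote availability check a real implementation would make against each node):

```python
import os

def locate(fn, z, hosts, has_file, local_root="."):
    """Mimic the file(fn, z, h) search order described above: look in
    the local partition folder first; only when the file is absent
    locally, walk the server list for a node that holds it."""
    local_path = os.path.join(local_root, z, fn)
    if os.path.exists(local_path):
        return ("local", local_path)
    for host in hosts:  # remote search over the server list h
        if has_file(host, z, fn):
            return ("remote", host)
    raise FileNotFoundError("%s not found in partition %s" % (fn, z))
```

Because same-named partitions on different servers carry copies of the same files, the remote walk succeeds as long as at least one holder of the partition is still reachable.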
