A bite of functional programming and distributed computing

The first function you learn on your way into the functional programming world is map. It applies a unary (monadic) function to each element of a list and returns the list of results.

>>> list(map(sum, [[1,5], [4,2,-2]]))
[6, 4]

In q/kdb+ this is an iterator called each.

q) sum each (1 5;4 2 -2)
6 4

Iterator each-left (denoted by \:) is similar and is handy when you have a dyadic function, like concatenation, and you would like to fix the second parameter and pass each element of a list as the first parameter.

q) ("Jack"; "Linda"; "Steve") ,\: ", how are you today?"
"Jack, how are you today?"
"Linda, how are you today?"
"Steve, how are you today?"

Combining each with dot (a.k.a. indexing at depth) allows you to iterate over a list of tuples, passing each element of the tuple as a separate parameter to a multivalent function.

// splits a sentence by a separator then takes the nth word
q) splitNtake: {[sep; s; n] vs[sep;s] n}
q) splitNtake[" "; "We are hiring!"; 2]
"hiring!"
q) l: ((","; "foo,bar,baz"; 2); (" "; "hello world"; 0))   // list of triples
q) (splitNtake .) each l
"baz"
"hello"

peach

If you start your q process with secondary threads (via the -s command-line parameter) on a multi-core computer, then you can use function peach instead of each. peach executes the monadic function in parallel. Furthermore, if you have multiple standalone q processes, then you can instruct peach to delegate the tasks to those processes: start the main process with a negative -s value and assign the list of connection handles to variable .z.pd. Very simple!
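
For example, a minimal sketch of thread-based peach (assuming the main process was started with -s 4; the \s system command shows the number of secondary threads):

q) \s
4i
q) sum peach (1 5; 4 2 -2; til 1000)
6 4 499500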

The q processes can live on different hosts, and these worker processes can start in multi-threaded mode to leverage the inherent parallelization of q. This is particularly useful in today’s cloud environments, where virtual machines (VMs) are easy to allocate and can access the same high-performance block storage (like persistent disks in Google Cloud and multi-attach EBS in AWS) or network storage. If you have an end-of-day job, you can start up a large pool of hosts with hundreds of q processes working in parallel. Once the work is done, you can get rid of your infrastructure resources.

Let us assume that you started the same number of q processes on the same port range (variable ports, a list of strings) on a few machines (variable hosts). You can use function cross to get the Cartesian product of hosts and ports:

.z.pd: `u#hopen each `$hosts cross ports;
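
A minimal sketch of what this produces (the host names are hypothetical; note that each host string carries the colons that hopen expects in a :host:port address):

q) hosts: (":hostA:"; ":hostB:")
q) ports: ("5000"; "5001"; "5002")
q) `$hosts cross ports
`:hostA:5000`:hostA:5001`:hostA:5002`:hostB:5000`:hostB:5001`:hostB:5002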

Function peach first assigns tasks to the processes sequentially, then maintains a queue and assigns each remaining task to the process that completes first. This algorithm is demonstrated by the following simple script:

# start five standalone worker q processes on ports ranging from 5000 to 5004
$ for i in {5000..5004}; do q -p $i </dev/null &> log-$i.log & done
$ q -s -5
q) .z.pd: `u#hopen each 5000 + til 5
q) // execute tasks that make the worker process
q) // sleep for a short random time, then return the worker's PID
q) group {system "sleep ", x; .z.i} peach string 20?.1
62643| 0 6 15
62644| 1 10 13 17
62645| 2 9 11 19
62646| 3 7 12 14 18
62647| 4 5 8 16

If the number of tasks is smaller than the number of processes, then the cross-based assignment of .z.pd might be inefficient: you may observe that some hosts are sweating while others are just twiddling their thumbs.

Function cross takes the first element of the first list and concatenates it with all elements of the second list. Next, it repeats this with the second element of the first list. So your result looks like host1:port1, host1:port2, host1:port3, …, host2:port1, host2:port2, host2:port3, …

You need to iterate the other way around: fix the port and iterate over the hosts, then take another port and iterate over the hosts again. To achieve this you just need to recall that function cross is semantically equivalent to modifying join with each-right, then each-left, and flattening the result, i.e.

{raze x,/:\:y}

If you change the order of each-left and each-right, i.e.

.z.pd: `u#hopen each `$raze hosts ,\:/: ports;

then you achieve a more balanced load distribution: tasks are distributed across the hosts fairly even when the input list is short.
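
A quick comparison of the two orderings, using the same hypothetical hosts and ports as above:

q) `$hosts cross ports
`:hostA:5000`:hostA:5001`:hostA:5002`:hostB:5000`:hostB:5001`:hostB:5002
q) `$raze hosts ,\:/: ports
`:hostA:5000`:hostB:5000`:hostA:5001`:hostB:5001`:hostA:5002`:hostB:5002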

Task delegation to processes assumes that the worker q processes are identical and any of them is able to execute the task. This is not always the case: there might be pools of q processes, each pool having its own responsibility. This is typical with horizontal partitioning of tables, when data is distributed into shards and each q process has visibility of only a subset of the data. q is famous for its database layer kdb+, which can execute SQL-like queries on on-disk or in-memory tables.

There are high-performance network storage options available in many public clouds; however, the best performance is still achieved with locally attached SSDs or with Intel Optane. Queries are often easy to rewrite by employing map-reduce to support horizontal partitioning of the data, as the sketch below illustrates. To send a task to a specific pool of q workers we can employ two techniques, called one-shot requests and socket sharding.
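
As an illustration of the map-reduce rewrite (a sketch with hypothetical per-shard values, not code from the article): an average cannot be assembled from per-shard averages, but it can be assembled from per-shard (sum; count) pairs.

q) partials: (100 10f; 250 20f; 40 5f)   / (sum; count) collected from each shard
q) (sum partials[;0]) % sum partials[;1] / reduce: global sum over global count
11.14286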

One-shot requests

The monadic function run by peach has certain limitations: it cannot send a message over an already open socket. One-shot requests come to our rescue. A one-shot request opens a connection, sends a synchronous request and closes the connection. In the example below, we send a one-shot request to a q process at myhost:myport where dyadic function fibonacci is defined.

q) `:myhost:myport (`fibonacci; 5; 1 1)
1 1 2 3 5 8 13
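
The article does not show the definition of fibonacci; one possible dyadic definition on the worker that reproduces the output above (an assumption, not the author's code):

q) fibonacci: {[n; seed] n {x, sum -2#x}/ seed}
q) fibonacci[5; 1 1]
1 1 2 3 5 8 13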

If you have a map (or a function) that returns a q address for a given task, then you can distribute tasks to specific q processes by starting the main q process with multiple threads (-s command-line parameter with a positive number). In the example below, our table t is horizontally partitioned by date and we would like to get all rows of t for given (date; stock) pairs. Variable m maps dates to q addresses.

({m[x] ({select from t where date=x, stock=y}; x; y)}. ) peach flip (2021.01.26 2020.02.24 2018.09.20; `GOOG`IBM`MSFT)
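
For instance, m could be a plain dictionary from dates to process addresses (hypothetical hosts and ports):

q) m: 2021.01.26 2020.02.24 2018.09.20 ! `:hostA:5000`:hostB:5000`:hostC:5000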

Now, let’s scale further and have a pool of processes instead of a solitary q process.

Socket sharding

Socket sharding on Linux boxes allows multiple q processes to listen on the same port: simply prepend the literal rp, to the port number. The Linux kernel takes care of distributing incoming requests among the processes. The kernel tries to distribute the requests evenly, but it does not do it as efficiently as q: it can easily assign a request to a busy process while other processes are free. This is demonstrated by the following code.

$ for i in {1..5}; do q -p rp,5000 </dev/null &> log-$i.log & done
$ q -s 5
q) group {`::5000 ({system "sleep ", x; .z.i}; x)} peach string 20#.1
64683| 0 2 6 7 11 15
64686| 1 3 10
64684| 4 5 12 14 16 18 19
64685| 8 13
64687| 9 17

We can see that the processes with PIDs 64683 and 64686 received the first two tasks, and the third task was assigned again to 64683 although three q processes were free and waiting for work.

To summarize, parallel one-shot requests with socket sharding fall behind the .z.pd-based approach in two respects. First, every request has the extra cost of opening and closing a connection. Second, q makes sure that it assigns a task to a free process if there is any; the Linux kernel does not guarantee this efficiency. On the other hand, the .z.pd-based approach has the limitation that all worker processes are handled uniformly.

To scale from a pool of q workers on the same host to a pool on multiple hosts, we can use the TCP load balancers offered by all cloud providers. You don’t need any development to scale your infrastructure. Furthermore, we can make use of the autoscaling feature of load balancers, which starts up new hosts with pools of q processes under heavy load. None of this requires writing a single line of q code.

Alexander Clouter

Director at coreMem Limited

4y

"To summary, ... Linux kernel does not guarantee this efficiency." I would call you out on 'efficiency' here :) When a process is 'hot' having it go to sleep and make the kernel wake up another process to service a pending request actually reduces throughput and increases latency; caching, context switches, etc What I think you are observing is q indicating idleness by blocking on the select() syscall which for your example is effectively asking the kernel "got anything for me to service?" Whilst busy, as q is single threaded, it is does not blocked on select() which gives the other processes meanwhile blocked select() the chance to service the listening socket. I suspect here the 'hot' active process is calling select() and immediately being informed there is more traffic ('strace -T -e select -p ...' shows some very fast returning) so making it easier to just leave the other processes sleeping. You should see a more even distribution if you can influence the select() timeout to be zero and spin the cpu on the select() as your idle loop; though context switches now probably will get in the way... A worthwhile read is https://idea.popcount.org/2017-01-06-select-is-fundamentally-broken/ and https://blog.cloudflare.com/the-sad-state-of-linux-socket-balancing/ as well as digging around processes using strace (and eBFP); especially when source code is not available.

András G. Békés

Senior Software Engineer at KX

4y

I was wondering why socket sharding appears so clumsy in this test, so I made some others that more closely resemble the real workload that Linux socket sharding was designed for. I started more servers (12 on 12 cores), more clients (48 secondary threads), and more and larger tasks. Ideally I should have made thousands of clients coming from different hosts, but this looks acceptably even:

q) count each group {`::5000 ({sum til x; .z.i}; x)} peach 12000#20000000
69956| 998
69955| 972
69958| 982
69957| 923
69963| 1056
69962| 1015
69966| 972
69965| 1037
69959| 1011
69964| 1024
69960| 1012
69961| 998

I was expecting an almost identical amount of work per process, but this isn't bad. Obviously the real difference between socket sharding and a real kdb+ load balancer shows when the requests differ significantly in runtime and when we're not interested in the total runtime but in the individual execution times instead. When all requests are from the same client, we're probably interested in the total time, and that's not very bad.

András G. Békés

Senior Software Engineer at KX

4y

"A one-shot request opens a connection, sends an asynchronous request" It's a synchronous request. BTW a very nice and useful article!
