Concept/Terminology
Summation (plus)
Paradigm (kiss | MPI | OpenMP | MapReduce | Spark)
Why a distributed computational framework?
Scalability Challenge
One machine with multiple threads | SSD | GPU
Parallel versions of ML algorithms (implemented with MPI/pthreads/BSP...)
Distributed computing architectures (MapReduce/GraphLab/Spark/Pregel...)
Scalability (both data size and model size)
Simple communication model
Learning problems (iterative)
Optimized synchronization (the straggler-worker / last-reducer problem)
Minimized model transformation cost
Parameter server
Asynchronous model
Learning problems
$$ \theta \leftarrow \theta + \delta $$
read
write
update
read_all
...
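A toy, single-process sketch of the operation semantics listed above (an illustration of the model only, not the framework's server): read, write, an additive update matching the theta update rule, and read_all. The class name is made up.

import numpy as np

class toy_param_table(object):
    """Single-process stand-in for the shared k-v parameter table."""
    def __init__(self):
        self.tbl = {}
    def write(self, key, value):        # overwrite a parameter
        self.tbl[key] = value
    def read(self, key):                # fetch the current value
        return self.tbl[key]
    def update(self, key, delta):       # theta <- theta + delta
        self.tbl[key] = self.tbl[key] + delta
    def read_all(self):                 # snapshot of every parameter
        return dict(self.tbl)

tbl = toy_param_table()
tbl.write('theta', np.zeros(3))
tbl.update('theta', np.array([0.1, 0.0, -0.2]))
snapshot = tbl.read_all()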
Predictable stragglers
Unpredictable stragglers
phase/clock
limit_s
may read 'stale' parameters
but not too 'stale' (bounded by limit_s; see the sketch below)
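A minimal sketch of the bounded-staleness rule implied by limit_s (illustrative only; may_proceed and the clock list are made-up names): a worker at clock c may keep running on cached parameters only while the slowest worker's clock is at least c - limit_s; otherwise it must wait for fresh values.

def may_proceed(my_clock, worker_clocks, limit_s):
    # SSP condition: no worker may run ahead of the slowest one by more than limit_s phases
    return min(worker_clocks) >= my_clock - limit_s

clocks = [7, 6, 5, 9]                       # current phase of each worker
print(may_proceed(7, clocks, limit_s=3))    # True:  slowest (5) >= 7 - 3
print(may_proceed(9, clocks, limit_s=3))    # False: slowest (5) <  9 - 3, must wait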
text files | user-defined partitioning | restricted formats
one socket per req/rep
k-v tables
replicas | Paxos-like strategy
pattern: linesplit(default) | fmap | smap | fsmap | fvec
mix flag
# parasol loader usage
parser = lambda line: line.strip('\n').split()   # user-defined line parser
ld = loader('./*.csv', 'fsmap', parser, False)   # files, pattern, parser, mix flag
lines = ld.load()
graph = ld.create_graph(lines)                   # build a graph view of the input
matrix = ld.create_matrix(lines)                 # or a matrix view
Client
Server
Multi2multi
estimation | assume a per-process failure probability of about 1/5000

| procs | probability all survive |
|---|---|
| 100-500 | 0.90-0.99 |
| 1000-3000 | 0.54-0.81 |
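These ranges are presumably the probability that no process fails, assuming each of the n processes fails independently with probability 1/5000; for example:

$$ (1 - 1/5000)^{500} \approx 0.905, \qquad (1 - 1/5000)^{3000} \approx 0.549 $$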
fault-tolerance API (TODO)
parser + pattern + mix_flag
one k-v table
user-defined update | update vs inc (sketched below)
no replicas | the algorithm's responsibility
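A hypothetical server-side sketch of the "update vs inc" distinction (the real registration mechanism is not shown here; inc, make_update and table are made-up names): inc always adds the delta into the table, while a user-defined update applies an arbitrary merge function supplied by the algorithm.

import numpy as np

def inc(table, key, delta):
    # built-in additive merge: theta <- theta + delta
    table[key] = table[key] + delta

def make_update(merge_fn):
    # wrap a user-supplied merge function into an update operation
    def update(table, key, value):
        table[key] = merge_fn(table[key], value)
    return update

max_update = make_update(np.maximum)   # e.g. keep the element-wise maximum instead of summing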
Python
mpi4py
ZeroMQ
msgpack (see the req/rep sketch below)
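A minimal sketch, using the stack above, of how a single k-v read could travel over one ZeroMQ REQ/REP socket pair with msgpack framing (illustrative only, not the framework's actual wire protocol; serve_once and client_read are made-up names).

import zmq
import msgpack

def serve_once(table, endpoint='tcp://*:5555'):
    # answer exactly one request: receive a key, reply with its value
    ctx = zmq.Context()
    rep = ctx.socket(zmq.REP)
    rep.bind(endpoint)
    key = msgpack.unpackb(rep.recv())
    rep.send(msgpack.packb(table.get(key)))
    rep.close()
    ctx.term()

def client_read(key, endpoint='tcp://localhost:5555'):
    ctx = zmq.Context()
    req = ctx.socket(zmq.REQ)
    req.connect(endpoint)
    req.send(msgpack.packb(key))
    value = msgpack.unpackb(req.recv())
    req.close()
    ctx.term()
    return value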
import numpy as np

def h(x, theta):
    # logistic (sigmoid) hypothesis
    return np.exp(np.dot(x, theta)) / (1. + np.exp(np.dot(x, theta)))

def log_reg_sgd(x, y, alpha, iter):
    m, n = x.shape
    theta = np.random.rand(n)
    z = np.arange(m)
    for it in xrange(iter):
        z = np.random.permutation(z)
        for i in z:
            # stochastic gradient step on one sample
            theta += alpha * (y[i] - h(x[i], theta)) * x[i]
    return theta
class sgd(paralg):
    def __init__(self): ...
    def sgd_kernel(self):
        # x, y, alpha, iter are assumed to be set up in __init__ (omitted)
        m, n = self.sample.shape
        self.theta = np.random.rand(n)
        paralg.paralg_write(self, 'theta', self.theta)         # publish the initial model
        z = np.arange(m)
        for it in xrange(iter):
            z = np.random.permutation(z)
            for i in z:
                self.theta = paralg.paralg_read(self, 'theta')  # pull the latest model
                delta = alpha * (y[i] - h(x[i], self.theta)) * x[i]
                self.theta += delta
                paralg.paralg_inc(self, 'theta', delta)         # push the local update
        self.theta = paralg.paralg_read(self, 'theta')
        return self.theta
class sgd(paralg):
    def __init__(self): ...
    def sgd_kernel(self):
        # x, y, alpha, iter are assumed to be set up in __init__ (omitted)
        self.splits = paralg.get_decomp(self)                   # number of data partitions
        m, n = self.sample.shape
        self.theta = np.random.rand(n)
        paralg.paralg_write(self, 'theta', self.theta)
        z = np.arange(m)
        for it in xrange(iter):
            z = np.random.permutation(z)
            for i in z:
                self.theta = paralg.paralg_read(self, 'theta')
                delta = alpha * (y[i] - h(x[i], self.theta)) * x[i]
                self.theta += delta
                paralg.paralg_inc(self, 'theta', delta / self.splits)  # average the update across splits
        self.theta = paralg.paralg_read(self, 'theta')
        return self.theta
limit_s = 3
class sgd(paralg):
    def __init__(self): ...
    def sgd_kernel(self):
        # x, y, alpha, iter are assumed to be set up in __init__ (omitted)
        self.splits = paralg.get_decomp(self)
        m, n = self.sample.shape
        self.theta = np.random.rand(n)
        paralg.paralg_write(self, 'theta', self.theta)
        z = np.arange(m)
        for it in xrange(iter):
            z = np.random.permutation(z)
            for i in z:
                self.theta = paralg.paralg_read(self, 'theta')  # may be up to limit_s clocks stale
                delta = alpha * (y[i] - h(x[i], self.theta)) * x[i]
                self.theta += delta
                paralg.paralg_inc(self, 'theta', delta / self.splits)
            paralg.phase_done(self)                             # advance this worker's clock
        self.theta = paralg.paralg_read(self, 'theta')
        return self.theta
class paralg:
    def get_nodeid():
    def get_nodesize():
    def set_phases():
    def sync():
    def loadinput():
    def getlines():
    def get_graph():
    def paralg_read():
    def paralg_write():
    def paralg_inc():
    def phase_done():
    ...
batch_read/write/update
jump read/write/update
server 1 | k 50 | netflix_small | 0.005,0.02 | rounds 5 | limit_s 0 | beater7
| workers | rmse | pull time/round | learning time/round | inc time/round | total time |
|---|---|---|---|---|---|
| 1 | 0.8820 | 9s | 470-514s | 1-2s | 3109s |
| 2 | 0.8435 | 6-16s | 275-310s | 6-9s | 1585s |
| 4 | 0.8796 | 7-15s | 130-140s | 3-5s | 780s |
| 8 | 0.8966 | 7-25s | 49-75s | 3-4s | 455s |
| 16 | 0.9163 | 14-26s | 40-69s | 2-4s | 440s |
server 1 | k 50 | netflix_small | 0.005,0.02 | rounds 5 | worker 16 | beater7
| limit_s | rmse | total time |
|---|---|---|
| 0 | 0.9163 | 440s |
| 1 | 0.9170 | 422s |
| 3 | 0.9171 | 454s |
| 5 | 0.9147 | 407s |
server scalability
mf-v2 to test limit_s
demo
pi-like(word count) | y
pagerank | graph algorithms | bsp | y
other | cos?
bug fix
C++ version
optimize hash function (load balance & key balance)
fault tolerance
A distributed framework for learning algorithms
Distributes both the dataset and the parameter space
SSP to solve the last-reducer problem
Zero pain going from serial to parallel
Talk from Jeff Dean, Stanford, Jan 2013
Large Scale Distributed Deep Networks, NIPS 2012
Solving the straggler problem with bounded staleness, HotOS 2013
MapReduce/Bigtable for distributed optimization, NIPS 2010 LCCC Workshop
contact info