Agenda


Background & Challenge


Motivation & Goal


System Design & Implementation


Paracel in Action



source: http://www.domo.com/learn/data-never-sleeps-2

One Minute at Douban

5+ follows


100+ note activities


200+ movie actions


500+ dongxi purchases


1000+ status activities


10000+ fm records


...





Before Moving On

Distributed - Parallel - Optimize


Scalability T(n, p)


Load Balance


Agenda


Background & Challenge


Motivation & Goal


System Design & Implementation


Paracel in Action


Motivation

Scalability Challenge

  • massive data size
  • massive model size

Performance Challenge

  • The Straggler Problem
  • High-level Communication Interface

Machine Learning Algorithms

  • wide data dependencies
  • dpark is not flexible enough

Goal

Scalability

  • split data
  • split model

High-level Communication Model

  • more flexible than MapReduce
  • higher-level than MPI

Adaptive to Machine Learning Problems

Easy Programming

  • painless transition from serial to parallel
  • minimize model transformation cost

Agenda


Background & Challenge


Motivation & Goal


System Design & Implementation


Paracel in Action


Abstraction

Machine Learning Problems

  • optimization problem - min f(x), max g(x)
  • iterative method to solve

    $$ \theta \leftarrow \theta + \delta $$

Parameter Server


Jeff Dean's talk @stanford, 2013



Split Model with a Global Distributed Key-Value Storage



Simple Communication Model


Asynchronous Distributed SGD

source: http://infolab.stanford.edu/infoseminar/archive/WinterY2013/dean.pdf


Straggler Problem

Predictable stragglers

  • Slow/old machine

  • Bad network card

  • Load imbalance

Unpredictable stragglers

  • Hardware: disk seeks | network | CPU interrupts

  • Software: garbage collection | virtualization

  • Algorithmic: Calculating objectives | stop conditions

Solution

Load imbalance (a little tricky)







Asynchronous control for iterative tasks

  • Bounded Staleness

SSP(Stale Synchronous Parallel)

spend less time waiting for each other


spend less time communicating with parameter server


help slow/straggling workers to catch up


currently only applies to:

  • convex function
  • average objective value
  • opportunity for more research

Architecture Diagram





Implementation - Overview


C++11

Eigen3

MPI

ZeroMQ

Msgpack-C: an extended version

Parasol (prototype) finished on 2013-10-25

First internal release on 2014-03-31


Implementation - Partition

paracel_load

  • split input by byte ranges
  • schedule_load/fix_load
  • exchange data
  • generate rdd/matrix/graph


Load Balance

  • hash keys to avoid skew from the original data distribution

Implementation - Workers & Servers

Client

  • serialize and compress using Msgpack

Server

  • tbl_store in each server
  • multiple threads to handle requests
  • consistent hashing of keys to route kv messages to servers

Rpc/Communication

  • all workers connect to all servers
  • ZeroMQ: req/rep, pull/push

Implementation - Bounded Staleness(Client)

bool ssp_switch = true;
int limit_s;  // staleness bound, must be > 0
int clock_server = 0;
int stale_cache, clock, total_iters;
paracel::dict_type cached_para;

bool paracel_write(const string & k, const V & v) {
  cached_para[k] = v;
  return remote_push(k, v);
}

void paracel_update(const string & k, const V & delta) {
  V v = remote_update(k, delta);
  cached_para[k] = v;
}

bool paracel_read(const string & k, V & v) {
  if(stale_cache + limit_s > clock) {
    v = cached_para[k];  // cached copy is fresh enough
  } else {
    // block until the slowest worker is within the staleness bound
    while(stale_cache + limit_s < clock) { stale_cache = pull_int("server_clock"); }
    cached_para[k] = remote_pull<V>(k);
    v = cached_para[k];
  }
  return true;
}

Implementation - Bounded Staleness(Client)

void set_total_iters(int n) {
  total_iters = n;
}
    
void iter_commit() {
  string clock_key = "client_clock_" + std::to_string(clock % limit_s);
  remote_incr_int(clock_key, 1);
  clock++;
}
    
...
    

Implementation - Bounded Staleness(Server)

clock server

...

Agenda


Background & Challenge


Motivation & Goal


System Design & Implementation


Paracel in Action


Installation

On Douban's Dev Machine



    xunzhang@balin:/mfs/user/xunzhang$ git clone https://github.com/douban/paracel.git
    xunzhang@balin:/mfs/user/xunzhang$ cd paracel; mkdir build
    xunzhang@balin:/mfs/user/xunzhang/paracel$ cd build
    xunzhang@balin:/mfs/user/xunzhang/paracel/build$ cmake ..; make -j 2; make install
    

Get Started

1. Your class must inherit the paralg base class

#include "ps.hpp"
class HOF : public paralg {
 public:
  HOF(paracel::Comm comm, string hosts_dct_str, string input, string output, int a)
      : paralg(hosts_dct_str, comm, output, 1) { this->a = a; }
 private:
  int a;
};
  

Get Started

2. Write an update function

#include "proxy.hpp"
#include "paracel_types.hpp"
int reduce(int a, int b) {
  return a + b;
}
// register your updater with the paracel framework
update_result updater = update_proxy(reduce);
  

Get Started

3. Implement the solve member function

Get Started

4. Create a driver

#include "utils.hpp"
#include "wc.hpp"
DEFINE_string(server_info, "", "parameter server info");
DEFINE_string(cfg_file, "", "config file in json format");
int main(int argc, char *argv[])
{
  // init worker environment
  paracel::main_env comm_main_env(argc, argv);
  paracel::Comm comm(MPI_COMM_WORLD);
  // init parameter server environment
  google::SetUsageMessage("[options]\n\t--server_info\n");
  google::ParseCommandLineFlags(&argc, &argv, true);
  
  string input = "/alg/data/*.csv"; string output = "result.txt";
  int a = 7;
  // create an instance
  HOF solver(comm, FLAGS_server_info, input, output, a);
  // solve problems
  solver.solve();
  return 0;
}
  

Get Started

specify LD_LIBRARY_PATH, then use prun.py

Options:
  -h, --help                        show this help message and exit
  -p PARASRV_NUM, --snum=PARASRV_NUM
                                    number of parameter servers
  --m_server=local | mesos | mpi
                                    running method for parameter servers. If not given,
                                    set with the same value of -m or --method
  --ppn_server=PPN_SERVER           mesos case: procs number per node of parameter
                                    servers. If not given, set with the same value of --ppn
  --mem_limit_server=MEM_LIMIT_SERVER 
                                    mesos case: memory size of each task in parameter
                                    servers. If not given, set with the same value of --mem_limit
  --hostfile_server=HOSTFILE_SERVER mpi case: hostfile for mpirun of parameter servers
  -w WORKER_NUM, --wnum=WORKER_NUM  number of workers for learning
  -m local | mesos | mpi, --method=local | mesos | mpi
                                    running method for workers
  --ppn=PPN                         mesos case: procs number per node for workers
  --mem_limit=MEM_LIMIT             mesos case: memory size of each task of workers
  --hostfile=HOSTFILE               mpi case: hostfile for mpirun for workers
  -c CONFIG, --cfg_file=CONFIG      config file in json fmt, for alg usage
  

Word Count

class WordCount : public paralg {
 public:
  virtual void solve() {
    auto lines = paracel_load(input);
    sync();
    learning(lines);
    sync();
  }
 private:
  void learning(const vector<string> & lines) {
    for(auto & line : lines) {
      auto word_lst = parser(line);
      for(auto & word : word_lst) {
        paracel_bupdate(word, 1,
                        "/local/lib/libwc_update.so",
                        "wc_updater");
      } // word_lst
    } // lines
    sync();
    paracel_read_topk(topk, result);
  }
  vector<pair<string, int> > result;
};

// update function, registered as wc_updater in libwc_update.so
int local_update(int a, int b) {
  return a + b;
}
paracel::update_result wc_updater = paracel::update_proxy(local_update);
  

Logistic Regression

void logistic_regression::solve() {
  auto lines = paracel_load(input); local_parser(lines); sync(); // init data
  set_total_iters(rounds); 
  dgd_learning();
}

void logistic_regression::dgd_learning() {
  theta = paracel::random_double_list(data_dim); 
  paracel_write("theta", theta); // init push
  for(int rd = 0; rd < rounds; ++rd) {
    for(int i = 0; i < data_dim; ++i) { delta[i] = 0.; }
    random_shuffle(idx.begin(), idx.end()); 
    theta = paracel_read<vector<double> >("theta"); // pull theta
    for(auto sample_id : idx) {
      for(int i = 0; i < data_dim; ++i) {
        delta[i] += coff1 * samples[sample_id][i] - coff2 * theta[i];
      }
    } // traverse
    paracel_bupdate("theta", delta, "update.so", "lg_theta_update"); // update with delta
    iter_commit();
  }
  theta = paracel_read<vector<double> >("theta"); // last pull
}
  

Logistic Regression

// filename: update.so
// funcname: lg_theta_update
vector<double> local_update(const vector<double> & a, 
                            const vector<double> & b) {
  vector<double> r;
  for(size_t i = 0; i < a.size(); ++i) {
    r.push_back(a[i] + b[i]);
  }
  return r;
}

paracel::update_result lg_theta_update = paracel::update_proxy(local_update);
  

Pagerank

$$ pagerank(A) = \frac{1-d}{N} + d \times \sum_{it \in \text{inlink}} \frac{pr(it)}{\text{out-degree}(it)} $$

Pagerank in Mapreduce

Pagerank in Pregel

class PageRankVertex : public Vertex<double, void, double> {
  virtual void compute(MessageIterator *msgs) {
    if(superstep() >= 1) {
      double sum = 0.;
      for(; !msgs->Done(); msgs->Next()) sum += msgs->Value();
      *MutableValue() = (1 - d) / NumVertices() + d * sum;
    } // if
    if(superstep() < 30) {
      const int64 n = GetOutEdgeIterator().size();
      SendMessageToAllNeighbors(GetValue() / n);
    } else {
      VoteToHalt();
    }
  } // compute
};
  

Pagerank in Paracel


Paracel So Far

10+ algorithms, 15+ users


User applications:


  • Douban FM: DSGD (逆磷)
  • Douban FM: Track Factor Similarities (Jason)
  • Search: Douban Dict (秦平)
  • Plato: Real-Time RecSys Based on Factor Model (xunzhang)
  • Decision Tree: for new FM users
  • ...

Reference & Related Work

Communication Efficient Distributed Machine Learning with the Parameter Server, NIPS14

Scaling Distributed Machine Learning with the Parameter Server, OSDI14

Parameter Server for Distributed Machine Learning, Big Learning Workshop of NIPS13

Talk from Jeff Dean, Jan 2013, Stanford

Solving the straggler problem with bounded staleness, HotOS13

More Effective Distributed ML via a Stale Synchronous Parallel Parameter Server, NIPS13

Large Scale Distributed Deep Networks, NIPS12

HOGWILD!: A Lock-Free Approach to Parallelizing Stochastic Gradient Descent, NIPS11

MapReduce/Bigtable for distributed optimization, NIPS10 LCCC Workshop

Piccolo: Building Fast, Distributed Programs with Partitioned Tables, OSDI10

Future Work

Fault Tolerance


Data Flow Interface


Streaming Paracel


Conclusion

A Distributed Optimization Framework


Distributed Both Dataset and Parameter Space


SSP to Solve the Straggler Problem


Easy Programming


Try it out at: https://github.com/douban/paracel

Thank You & Questions

About Me