Project author: dkubek

Project description:
Final task for NSWI106 - Linux Administration course
Language: Python
Repository: git://github.com/dkubek/NSWI106-screencast.git
Created: 2021-02-05T16:59:06Z
Community: https://github.com/dkubek/NSWI106-screencast

License:



NSWI106-screencast

Introduction

Final task for the NSWI106 - Linux Administration course: setting up an Apache Spark
cluster using Vagrant and Ansible.

What is Vagrant?

Vagrant is a tool for building and managing virtual machine environments in a
single workflow. With an easy-to-use workflow and focus on automation,
Vagrant lowers development environment setup time, increases production
parity, and makes the “works on my machine” excuse a relic of the past.

Vagrant provides easy to configure, reproducible, and portable work
environments built on top of industry-standard technology and controlled by a
single consistent workflow to help maximize the productivity and flexibility
of you and your team.

Source: Vagrant intro

What is Ansible?

Ansible is an open-source provisioning, configuration management, and
application-deployment tool enabling infrastructure as code.

Source: Ansible wiki

What is Spark?

Apache Spark is a unified analytics engine for large-scale data processing.
It provides high-level APIs in Java, Scala, Python and R, and an optimized
engine that supports general execution graphs. It also supports a rich set of
higher-level tools including Spark SQL for SQL and structured data
processing, MLlib for machine learning, GraphX for graph processing, and
Structured Streaming for incremental computation and stream processing.

Source: Spark overview

Installation

In this screencast we will be using a Linux environment. However, the tools we
use are relatively OS agnostic, so there shouldn’t be many problems applying
these steps to a different operating system.

Firstly, we will have to install the required tools, i.e. vagrant and
ansible. Both tools are quite popular and should be available in the
repositories of every major distribution's package manager. Otherwise, please
refer to their respective installation pages.

Vagrant uses VirtualBox as its default hypervisor; we, however, want to use the
native Linux KVM through libvirt. Vagrant unfortunately doesn’t support it
out of the box, but fortunately the vagrant-libvirt plugin is available.
Installing it can be as simple as:

  > vagrant plugin install vagrant-libvirt

Note that this is a third-party solution and the above step may not work
correctly in every environment. Please consult the vagrant-libvirt
installation guide.

If you wish to use a different hypervisor, you are free to do so. As you will
see, switching a hypervisor in Vagrant is trivial.

Architecture

We will set up a simple Spark cluster with one master (control) node and
several worker nodes, all managed by the master and all connected to the same
network.

We will assign IP addresses as follows:

  node        IP
  master      192.168.50.10
  worker-01   192.168.50.11
  worker-02   192.168.50.12
  worker-XX   192.168.50.(10 + XX)

For cluster management we will use Spark in the simple standalone mode. To
read more about cluster management options, please refer to the Spark Cluster
Mode Overview.

Vagrant setup

We create a new directory named spark_example where we will store our
setup.

For our purposes, we will set up our virtual machines using Vagrant. We
will use the debian/buster64 box (virtual machine image) provided by the
community. More boxes can be found on the official Vagrant site.

To create the initial configuration file we can simply run

  > vagrant init debian/buster64
  A `Vagrantfile` has been placed in this directory. You are now
  ready to `vagrant up` your first virtual environment! Please read
  the comments in the Vagrantfile as well as documentation on
  `vagrantup.com` for more information on using Vagrant.

This will create a new file named Vagrantfile in the current working
directory. This file contains a lot of comments with tips and example
configurations.

Vagrant configuration files are written in Ruby, but for basic setups no
knowledge of Ruby is required.

You can now run vagrant up and vagrant will download the requested box
for you and start up the VM. You can access the VM through SSH with the
vagrant ssh command. If you are done using your machine, you can turn it
off with vagrant halt or save its state with vagrant suspend. If you
wish to permanently delete your machines, use vagrant destroy.

spark_example/Vagrantfile

We will be starting with the following:

  # frozen_string_literal: true
  # -*- mode: ruby -*-
  # vi: set ft=ruby :

  VAGRANTFILE_API_VERSION = '2'
  BOX_IMAGE = 'debian/buster64'

  Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
    config.vm.box = BOX_IMAGE
  end

As you can see, the amount of configuration required to setup a VM with Vagrant
is minimal.

Multi-machine setup

Next, we will set up a multi-machine environment connected using a private
network. With Vagrant we are able to do that with only a few lines of code.

Every time you plan to make a bigger change to your configuration be sure to
run vagrant destroy -f to delete your machines first. Sometimes vagrant can
start from a changed configuration with only vagrant reload, but in the
early testing stages it is much easier to create your VMs from scratch.

We will create a multi-machine environment with the following configuration:

  # frozen_string_literal: true
  # -*- mode: ruby -*-
  # vi: set ft=ruby :

  VAGRANTFILE_API_VERSION = '2'

  # How many slave nodes we want to create
  WORKER_COUNT = 4

  # Default box to use
  BOX_IMAGE = 'debian/buster64'

  # Default starting address
  IP_START = 10

  Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
    # Create the master node
    config.vm.define 'master', primary: true do |subconfig|
      subconfig.vm.box = BOX_IMAGE

      # Connect master to a private network
      subconfig.vm.network :private_network, ip: "192.168.50.#{IP_START}"

      # Set hostname for master
      subconfig.vm.hostname = 'master'
    end

    # Create the worker nodes
    (1..WORKER_COUNT).each do |machine_id|
      # Name of the node in format "worker-01", "worker-32", etc.
      box_name = "worker-" + machine_id.to_s.rjust(2, "0")

      # IP address of the worker
      box_ip_address = "192.168.50.#{machine_id + IP_START}"

      # Setup one slave node
      config.vm.define box_name do |subconfig|
        subconfig.vm.box = BOX_IMAGE

        # Connect a worker to a private network
        subconfig.vm.network :private_network, ip: box_ip_address

        # Set worker hostname
        subconfig.vm.hostname = box_name
      end
    end
  end

Here, we did a number of things.

  1. Because we wanted to extend the configuration of the master node, we defined
    a new node 'master' and started a new configuration block:

     # Create the master node
     config.vm.define 'master', primary: true do |subconfig|
       # ... rest of the configuration
     end

    Here we can use the subconfig variable and all the options will be tied
    only to the master node.

    Also note that we defined this node with primary: true. We will get to that
    shortly.

  2. We connected the node to a private_network and assigned it the
    192.168.50.10 IP address with the line:

     # Connect master to a private network
     subconfig.vm.network :private_network, ip: "192.168.50.#{IP_START}"

    We used the Ruby string interpolation syntax #{}.

    We also assigned the node a unique hostname:

     # Set hostname for master
     subconfig.vm.hostname = 'master'
  3. Next, we added the worker nodes with the following loop

     (1..WORKER_COUNT).each do |machine_id|
       # ... worker configuration
     end

    Note that the standard Ruby for loop does not work here. See looping over VM definitions.

    The rest of the configuration is similar to the configuration of the master
    node.

You can now vagrant up to create your machines. Note that there is a small
change in some Vagrant commands in multi-machine setups. For example, to
vagrant ssh into a machine you have to explicitly specify the machine name
as defined in the Vagrant configuration. So vagrant ssh worker-01 would
connect you to the first node. However, you can define one of your machines to
be primary which will be the default option when running vagrant ssh.
We did exactly that with the master node.

You should now be able to connect to the master node by running vagrant ssh
(without additional arguments) and from there ping other VMs on the network.

  vagrant > ping 192.168.50.11
  PING 192.168.50.11 (192.168.50.11) 56(84) bytes of data.
  64 bytes from 192.168.50.11: icmp_seq=1 ttl=64 time=0.406 ms
  64 bytes from 192.168.50.11: icmp_seq=2 ttl=64 time=0.400 ms
  64 bytes from 192.168.50.11: icmp_seq=3 ttl=64 time=0.453 ms
  ...

Configuring the provider

We also want to tell Vagrant about the system resources we want to assign to our
machines. We will do that globally with the config.vm.provider option.

  # ... other variables
  VAGRANTFILE_API_VERSION = '2'

  # Parameters of the created VMs
  VM_MEMORY = 6144
  VM_CPUS = 8

  Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
    # ... VM configuration

    # Tell vagrant about system resources we want to use
    config.vm.provider :libvirt do |libvirt|
      # Customize the amount of memory on the VM:
      libvirt.memory = VM_MEMORY

      # Customize the number of cpus on the VM:
      libvirt.cpus = VM_CPUS
    end
  end

Here we specified that we wish to use the libvirt provider and assigned the
desired amount of memory and CPUs.

Disabling default NFS sharing

By default Vagrant also mounts the current working directory as an NFS share in
the VM. We disable this behaviour by adding the following line at the end of the
configuration:

  # Disable mounting the default vagrant folder
  config.vm.synced_folder '.', '/vagrant', disabled: true

Provisioning

For provisioning we will be using Ansible. Ansible
configuration is done via configuration files written in YAML called
playbooks. Playbooks contain a list of so-called plays which are
essentially groups of tasks you want to execute. Tasks are composed of
modules which are usually the most atomic parts of your playbooks.

A simple playbook might look something like this:

  --- # Three dashes signify a YAML file, Ansible doesn't care if you forget them though
  - name: My first play.
    hosts: all
    tasks:
      - name: "My first task!"
        debug: msg="Hello from VM - {{ inventory_hostname }}"

  - name: My second play!
    hosts: all
    become: True # get superuser privileges
    tasks:
      - name: Notify user about what I want to do
        debug:
          msg: I'm going to install packages

      # ... tasks for installing a package

      # The `name` field is optional (but improves readability).
      - debug: msg="packages are installed."

We will get to the structure of an Ansible playbook shortly. For now you can go
ahead and add the contents of this file to setup-spark.yml. We will use it
as a base for provisioning.

We will modify the Vagrantfile by adding the following lines to the
configuration.

  # ... other variables

  # Playbook to run for provisioning
  PLAYBOOK = 'setup-spark.yml'

  Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
    # ... rest of the configuration

    config.vm.provision :ansible do |ansible|
      ansible.playbook = PLAYBOOK
    end
  end

You can now create your VMs and you will see Vagrant run Ansible and
provision them for you. You will also notice that the VMs are being provisioned
one after another (you can check that by running vagrant provision). This
is actually something we don’t want, because Ansible is capable of (and expected
to be) running the individual plays on all hosts in parallel.

To our knowledge there isn’t currently a “clean” way to configure Vagrant to run
the provisioning only after booting all the machines. There is, however, a small
“hack” that can achieve this: you run the provisioning from the last created
machine only and turn off the default limit for provisioning (which is the current
machine only). This approach is also endorsed by the Vagrant documentation.

We modify the Vagrantfile as follows:

  (1..WORKER_COUNT).each do |machine_id|
    # ... worker variables

    # Setup one slave node
    config.vm.define box_name do |subconfig|
      # Run ansible provisioning in parallel by running the provisioning from
      # the last created node only
      if machine_id == WORKER_COUNT
        subconfig.vm.provision :ansible do |ansible|
          # Disable default limit to connect to all the machines
          ansible.limit = 'all'
          ansible.playbook = PLAYBOOK
        end
      end
    end
  end

If you run vagrant provision now, you should see all the machines
provisioned in parallel.

There are now only a few last things we need to add to our Vagrant
configuration.

Firstly, we want to create a group within ansible and assign all the workers
to it. This will allow us to configure all the workers simultaneously.

We can do that by adding the following lines into the provisioning section:

  ansible.groups = {
    'workers' => (1..WORKER_COUNT).map { |i| 'worker-' + i.to_s.rjust(2, '0') }
  }

Finally, we want to notify Ansible about the IP address it should use for
configuration. This is needed in case the VMs have multiple network interfaces.
We will use this variable ourselves when configuring Ansible later. We can
achieve this by defining the ansible.host_vars field in the configuration and
supplying it with a hash (Ruby dictionary) whose keys are hostnames. The value
for each hostname is another hash containing the variables we want to supply to
Ansible.

We start by defining a new hash called spark_ip_addresses and filling it
with the IP addresses for individual nodes like this:

  # Define hash for master node
  spark_ip_addresses = {
    'master' => {
      'spark_ip_address' => '192.168.50.10'
    }
  }

  # ... add new entry
  spark_ip_addresses[box_name] = {
    'spark_ip_address' => box_ip_address
  }

In the provisioning section we then provide the hash to the ansible.host_vars:

  ansible.host_vars = spark_ip_addresses

Final Vagrantfile

The final Vagrantfile (with some minor improvements) looks like this:

  # frozen_string_literal: true
  # -*- mode: ruby -*-
  # vi: set ft=ruby :

  VAGRANTFILE_API_VERSION = '2'

  # How many slave nodes we want to create
  WORKER_COUNT = 4

  # System resources for nodes
  VM_MEMORY = 6144
  VM_CPUS = 8

  # Default starting host address
  IP_START = 10

  # Default box to use
  BOX_IMAGE = 'debian/buster64'

  # Playbook to run for provisioning
  PLAYBOOK = 'setup-spark.yml'

  # Generate a name for the worker based on its ID number
  def worker_name(id_number)
    # Name of the node in format "worker-01", "worker-32", etc.
    'worker-' + id_number.to_s.rjust(2, '0')
  end

  Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
    # Create the master node
    config.vm.define 'master', primary: true do |subconfig|
      subconfig.vm.box = BOX_IMAGE

      # Connect master to a private network
      subconfig.vm.network :private_network, ip: "192.168.50.#{IP_START}"

      # Set hostname for master
      subconfig.vm.hostname = 'master'
    end

    spark_ip_addresses = {
      'master' => {
        'spark_ip_address' => "192.168.50.#{IP_START}"
      }
    }

    # Create the worker nodes
    (1..WORKER_COUNT).each do |machine_id|
      box_name = worker_name(machine_id)

      # IP address of the worker
      box_ip_address = "192.168.50.#{machine_id + IP_START}"

      # Setup one slave node
      config.vm.define box_name do |subconfig|
        subconfig.vm.box = BOX_IMAGE

        # Connect a worker to a private network
        subconfig.vm.network :private_network, ip: box_ip_address

        # Set worker hostname
        subconfig.vm.hostname = box_name

        spark_ip_addresses[box_name] = {
          'spark_ip_address' => box_ip_address
        }

        # Run ansible provisioning in parallel by running the provisioning
        # from the last created node only
        if machine_id == WORKER_COUNT
          subconfig.vm.provision :ansible do |ansible|
            # Disable default limit to connect to all the machines
            ansible.limit = 'all'
            ansible.playbook = PLAYBOOK
            ansible.host_vars = spark_ip_addresses
            ansible.groups = {
              'workers' => (1..WORKER_COUNT).map { |i| worker_name(i) }
            }
          end
        end
      end
    end

    # Tell vagrant about system resources we want to use
    config.vm.provider :libvirt do |libvirt|
      # Customize the amount of memory on the VM:
      libvirt.memory = VM_MEMORY

      # Customize the number of cpus on the VM:
      libvirt.cpus = VM_CPUS
    end

    # Disable mounting the default vagrant folder
    config.vm.synced_folder '.', '/vagrant', disabled: true
  end

Ansible setup

Setting up an inventory

Firstly, we need to inform Ansible about the machines we wish to provision.
This is done through what Ansible calls an inventory. In an inventory you
specify the hosts Ansible should configure. Additionally, Ansible can group
hosts into groups, so you can run related tasks for multiple nodes at once,
which simplifies the configuration even further. The default inventory can be
found in /etc/ansible/hosts.
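
To get a feel for what an inventory looks like, here is a minimal hand-written sketch in Ansible's YAML inventory format (the hostnames and addresses are just placeholders matching the cluster we plan to build):

  all:
    hosts:
      master:
        ansible_host: 192.168.50.10
    children:
      workers:
        hosts:
          worker-01:
            ansible_host: 192.168.50.11
          worker-02:
            ansible_host: 192.168.50.12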

We will be able to skip writing an inventory file entirely, because Vagrant is
able to auto-generate the inventory file for us. After running vagrant up
with Ansible provisioning set up, you can find the auto-generated file in
.vagrant/provisioners/ansible/inventory/vagrant_ansible_inventory in the
root of your project directory.

Within it, you should be able to see the hosts we defined in Vagrant, with
information on how to connect to them, as well as the spark_ip_address variable
for each host. The workers group should also be defined, starting with
[workers] and listing all the worker hostnames.

To test that the inventory file is working correctly we can run a one-off
module using the ansible command. We will tell each machine to print its
inventory hostname.

  > ansible all \
      -i .vagrant/provisioners/ansible/inventory/vagrant_ansible_inventory \
      -m debug -a var=inventory_hostname

Here we tell Ansible to run the debug module, printing the
inventory_hostname variable for all hosts (all is an automatically
created group containing all hosts in the inventory) from the specified
inventory file. More on modules in a minute.

If you wish to know more about creating inventories, please refer to How to
build an inventory.

Setting up ansible.cfg

By default ansible will look for inventory in /etc/ansible/hosts first.
To change this behaviour and make our lives easier we will configure ansible
to use the Vagrant generated inventory.

Create a file named ansible.cfg in the root of your project and fill it
with:

  [defaults]
  inventory = .vagrant/provisioners/ansible/inventory/vagrant_ansible_inventory
  host_key_checking = false

You can now run the command from the previous section as:

  > ansible all -m debug -a var=inventory_hostname

If you wish to know more about configuring ansible, please refer to Ansible
Configuration Settings.

Exploring basic playbook structure

Configuration in Ansible is done through YAML configuration files. We will explore
the YAML syntax largely as we go. However, if you wish to know more, please refer
to the YAML syntax.

Let’s dissect the example playbook from the Vagrant configuration.

  --- # Three dashes signify a YAML file, Ansible doesn't care if you forget them though
  - name: My first play.
    hosts: all
    tasks:
      - name: "My first task!"
        debug: msg="Hello from VM - {{ inventory_hostname }}"

  - name: My second play!
    hosts: all
    become: True # get superuser privileges
    tasks:
      - name: Notify user about what I want to do
        debug:
          msg: I'm going to install packages

      # ... tasks for installing a package

      # The `name` field is optional (but improves readability).
      - debug: msg="packages are installed."

Here we can see a playbook with two plays defined:

  • My first play
  • My second play!

The hosts option specifies on which hosts we want to run the play. Here we
specify the all group.

The core of a play is the tasks section. In the tasks section we
specify a list of tasks we want to execute. Tasks are realized through
modules. Here we used the debug module. Modules can take options.
We can see the msg option in use here. There are two ways to supply
arguments as seen in the example. It doesn’t matter which way you choose, just
try to be consistent.
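
As a small side-by-side illustration (the tasks below are our own throwaway examples), here is the same debug task written in the inline key=value form and in the YAML dictionary form:

  # Inline (key=value) form
  - name: Greet the host (inline form)
    debug: msg="Hello from {{ inventory_hostname }}"

  # YAML dictionary form
  - name: Greet the host (YAML form)
    debug:
      msg: "Hello from {{ inventory_hostname }}"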

The become option allows us to elevate the play to superuser privileges.

If you want to list the options of a module, you can use the ansible-doc command
provided with Ansible:

  > ansible-doc debug

To run a playbook, use the ansible-playbook command. For example:

  > ansible-playbook setup-spark.yml

Installing packages with ansible

Let’s start with a clean setup-spark.yml file.

To install a package we will use the apt module. For other
distributions there are also modules like dnf or yum, and the partly
distribution-agnostic package module (but you still have to account for
different package names between distributions).
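
For illustration only, installing a package (here htop, chosen purely as an example) through the distribution-agnostic package module would look roughly like this:

  - name: Install a package (distribution-agnostic sketch)
    become: True
    package:
      name: htop
      state: present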

We will install the ntp daemon for time synchronization.

We add the following to setup-spark.yml.

  ---
  - name: Setup common
    hosts: all
    tasks:
      - name: Install the NTP daemon.
        become: True
        apt:
          name: ntp

Here we can see that we can acquire superuser privileges for a single task only.

After running ansible-playbook setup-spark.yml you should see Ansible
install ntp on all hosts.

Enabling a service

To enable a service we use the service module.

To enable and start the ntp service add the following task to the play:

  - name: Start ntp service
    service: name=ntp state=started enabled=yes

Note that we could have alternatively used the YAML syntax:

  - name: Start ntp service
    service:
      name: ntp
      state: started
      enabled: yes

Installing Apache Spark

Apache Spark is unfortunately not available through the official repositories
so we will have to install it manually.

We will assume that ansible has configured one host called master and a
group of hosts called workers.

Firstly, we need to install some dependencies, namely Java. We also ensure
that we have python3 installed. We define a new play with this task:

  - name: Install spark
    hosts: all
    become: True
    tasks:
      - name: Installing dependencies
        apt:
          name: [ "default-jdk", "python3" ]
          update_cache: True

The update_cache option tells apt to update the local
repository cache.

Next, we define a new group and user under which we install the package. We
will want to run Spark as a service later. For this task, Ansible provides two
modules, group and user. Creating them is as straightforward as
adding the following tasks:

  - name: Create spark group
    group:
      name: spark
      state: present

  - name: Create spark non-system user
    user:
      name: spark
      group: spark
      shell: /usr/sbin/nologin

Next, we are going to download the Apache Spark binary. They can be found here:
https://downloads.apache.org/spark/. We will
be installing spark-3.0.2 with hadoop3.2.

The full link to the corresponding tarball then looks like this:

  https://downloads.apache.org/spark/spark-3.0.2/spark-3.0.2-bin-hadoop3.2.tgz

To make the playbook more general we will separate this information into
variables. Ansible allows us to declare variables in quite a few places (a bit
too many for my taste).

We’ve already defined some variables (host variables in ansible speak) when
we configured Vagrant provisioning (spark_ip_address). They live in the inventory.

On a play level we can declare them using the vars
option. For the sake of brevity we declared the following variables (which we will use
later):

  • spark_mirror: Mirror from which to download Spark (i.e. https://downloads.apache.org/spark)
  • spark_version: The version of Spark we wish to use (i.e. "3.0.2")
  • hadoop_version: Version of Hadoop (i.e. "hadoop3.2")
  • spark_parent_dir: Location where we wish to put Spark (i.e. "/usr/local")
  • spark_link_dir: Directory where Spark will reside (i.e. "/usr/local/spark")

And some variables for convenience:

  • spark_name: Just the version with a spark- prefix ("spark-3.0.2")
  • spark_fullname: Full name of the Spark package ("{{ spark_name }}-bin-{{ hadoop_version }}")
  • spark_target_dir: Location where we wish to put a specific installation
    of Spark (i.e. "/usr/local/spark-3.0.2-bin-hadoop3.2")
  • spark_tarball: Name of the downloaded tarball ("{{ spark_fullname }}.tgz")
  • spark_tarball_url: Full link to the package ("{{ spark_mirror }}/{{ spark_name }}/{{ spark_tarball }}")

The playbook with the variables defined will then look like this:

  ---
  - name: Install Apache Spark
    hosts: all
    become: True
    vars:
      - spark_mirror: "https://downloads.apache.org/spark"
      - spark_version: "3.0.2"
      - hadoop_version: "hadoop3.2"
      - spark_parent_dir: "/usr/local"
      - spark_link_dir: "/usr/local/spark"
      - spark_name: "spark-{{ spark_version }}"
      - spark_fullname: "{{ spark_name }}-bin-{{ hadoop_version }}"
      - spark_target_dir: "{{ spark_parent_dir }}/{{ spark_fullname }}"
      - spark_tarball: "{{ spark_fullname }}.tgz"
      - spark_tarball_url: "{{ spark_mirror }}/{{ spark_name }}/{{ spark_tarball }}"
    tasks:
      # ... tasks

To download the file, Ansible provides a module called get_url. We
add the following task:

  - name: Downloading Apache Spark sources
    get_url:
      url: "{{ spark_tarball_url }}"
      dest: "/tmp/{{ spark_tarball }}"
      owner: spark
      group: spark
      mode: 0644

Again, url, dest, owner, group and mode are relevant options
of the module (ansible-doc get_url).

Next, we unpack the archive. We use the unarchive module.

  - name: Unpacking tarball
    unarchive:
      remote_src: yes
      dest: "{{ spark_parent_dir }}"
      src: "/tmp/{{ spark_tarball }}"
      owner: spark
      group: spark
      creates: "{{ spark_target_dir }}"

Here we see two notable options. The remote_src option informs Ansible that the
source file is located on the remote machine. Without this option it would
try to find the archive on your local host (the machine from which you
are running ansible).

The creates option informs Ansible about the state of the environment after
the task is done. The next time the task is run and the tarball is already
unpacked, the task will simply be skipped. Used here it is actually redundant,
because the unarchive module is smart enough to infer this without the
option. However, it can be very useful if you, for example, create a file
manually through the shell or command module.
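
As a sketch of that use case (the script path and marker file below are made up purely for illustration), creates keeps a command task from running twice:

  - name: Run a one-off setup script only once
    command: /opt/scripts/initial-setup.sh
    args:
      creates: /var/lib/myapp/.setup-done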

Next, we link the installation directory to the desired destination. Note that
we could have just renamed the directory, but linking makes it easier when we
(potentially) want to manage multiple versions.

  - name: Link spark directory
    file:
      src: "{{ spark_parent_dir }}/{{ spark_fullname }}"
      dest: "{{ spark_link_dir }}"
      state: link

Next we have to set up the Spark environment. We want to do two things:

  • Add Spark installation to PATH
  • Setup environment variables for Spark

To add Spark to PATH, we will simply add a new script to /etc/profile.d.
These scripts are sourced by login shells on startup. To generate the new file
we will use a template. Ansible supports the Jinja2 templating engine.

We create a new directory templates/ and add a new file spark.sh.j2
where we export the variables the usual way:

  export SPARK_HOME={{ spark_link_dir }}
  export PATH="$PATH:{{ spark_target_dir }}/bin:{{ spark_target_dir }}/sbin"

We add the following task to the playbook:

  - name: Add spark to environment
    template:
      src: "spark.sh.j2"
      dest: "/etc/profile.d/spark.sh"
      owner: root
      group: root
      mode: 0644

Ansible will automatically substitute the template placeholders using the
variables defined in the playbook.

To configure the Spark environment, we will want to define the following
environment variables:

  • SPARK_MASTER_HOST - IP address of the master
  • SPARK_MASTER_PORT - port on which to connect to master
  • SPARK_LOCAL_IP - IP address on which to start the local Spark process
  • SPARK_HOME - home of the current spark installation
  • PYSPARK_PYTHON - python interpreter to use for PySpark

We will group these into a YAML dictionary, which we put into the vars
section.

  vars:
    # ... other variables
    - spark_master_ip: "{{ hostvars['master'].spark_ip_address }}"
    - spark_master_port: 7077
    - spark_local_ip: "{{ hostvars[inventory_hostname].spark_ip_address }}"
    - spark_worker_port: 7078
    - spark_env_vars:
        SPARK_MASTER_HOST: "{{ spark_master_ip }}"
        SPARK_MASTER_PORT: "{{ spark_master_port }}"
        SPARK_LOCAL_IP: "{{ spark_local_ip }}"
        SPARK_HOME: "{{ spark_link_dir }}"
        PYSPARK_PYTHON: "/usr/bin/python3"

We also defined spark_worker_port which we will use later.

Now we create a template (spark-env.sh.j2) to populate the spark-env.sh file
found in the conf/ directory of the Spark installation.

  {% if spark_env_vars is defined %}{% for key, value in spark_env_vars.items() %}
  export {{key}}={{value}}
  {% endfor %}{% endif %}

Here, we use Jinja in a bit more advanced way, but the result should still be
evident.

We add the following task:

  - name: Configure spark environment
    template:
      src: "spark-env.sh.j2"
      dest: "{{ spark_target_dir }}/conf/spark-env.sh"
      owner: spark
      group: spark
      mode: 0755

Finally, we cleanup the downloaded files:

  - name: Cleanup
    file:
      path: "/tmp/{{ spark_tarball }}"
      state: absent

Adding systemd units

We won’t be covering how to define systemd units. If you wish to know more,
Understanding Systemd Units and Unit Files might be a good start.

We define the following templates:

spark-master.service.j2

  [Unit]
  Description=Spark Master Service
  Wants=network.target network-online.target
  After=network.target network-online.target

  [Service]
  Type=forking
  User=spark
  WorkingDirectory={{ spark_link_dir }}
  ExecStart={{ spark_link_dir }}/sbin/start-master.sh
  ExecStop={{ spark_link_dir }}/sbin/stop-master.sh
  Restart=on-abort

  [Install]
  WantedBy=multi-user.target

spark-worker.service.j2

  [Unit]
  Description=Spark Worker Service
  Wants=network.target network-online.target
  After=network.target network-online.target

  [Service]
  Type=forking
  User=spark
  WorkingDirectory={{ spark_link_dir }}
  ExecStart={{ spark_link_dir }}/sbin/start-slave.sh "spark://{{ spark_master_ip }}:{{ spark_master_port }}" --port {{ spark_worker_port }}
  ExecStop={{ spark_link_dir }}/sbin/stop-slave.sh
  Restart=on-abort

  [Install]
  WantedBy=multi-user.target

However, we will want to copy these files to the machines only if systemd
is present in the system. That requires some form of flow control. We will use
the Ansible when clause.

First, we have to capture the output of a module. In Ansible this is called
registering a variable and can be done like this:

  - name: Check if systemd exists
    stat: path=/lib/systemd/system/
    register: systemd_check

Here we use the stat module to check the /lib/systemd/system path and
register the output as a new variable systemd_check. The contents of this
variable depend on the module used, so we recommend using the debug
module to inspect them.
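
For example, a quick throwaway task (which you can delete afterwards) to see what the registered variable contains:

  - name: Show what the stat module registered
    debug:
      var: systemd_check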

We will introduce one more Ansible construct: block. It groups multiple
tasks together so that common options can be applied to all of them at once.

The new tasks together then look like this:

  - name: Check if systemd exists
    stat: path=/lib/systemd/system/
    register: systemd_check

  - block:
      - name: Create master systemd start script
        template:
          src: "spark-master.service.j2"
          dest: "/etc/systemd/system/spark-master.service"
          owner: root
          group: root
          mode: 0644

      - name: Create worker systemd start script
        template:
          src: "spark-worker.service.j2"
          dest: "/etc/systemd/system/spark-worker.service"
          owner: root
          group: root
          mode: 0644
    when: systemd_check.stat.exists == True
    become: True

Creating roles

We will look at two ways of simplifying playbooks, namely roles and
including other YAML files using include.

Roles are usually defined in a roles/ directory and follow a specific
directory structure.

To simplify our playbook, we might want to define two roles:

  • common: role that sets up the common environment on all hosts
  • spark: role that installs Spark

We therefore create the following directory structure:

  roles
  ├── common
  │   └── tasks
  └── spark
      ├── defaults
      ├── tasks
      ├── templates
      └── vars

The individual directories correspond to different parts of playbooks. For a
complete overview, please refer to Role directory structure.

Basic overview of the directories:

  • defaults: Contains default variables, i.e. variables we expect to be overridden
  • vars: variables we use within the role
  • tasks: tasks of the role
  • templates: template files

We distribute the configuration from the main playbook into the roles in the
following way:

roles/common/tasks/main.yml:

  ---
  - block:
      - name: Install the NTP daemon.
        apt:
          name: ntp

      - name: Start ntp service
        service: name=ntp state=started enabled=yes
    become: True

roles/spark/defaults/main.yml:

  ---
  spark_mirror: "https://downloads.apache.org/spark"
  spark_version: "3.0.2"
  hadoop_version: "hadoop3.2"
  spark_parent_dir: "/usr/local"
  spark_link_dir: "/usr/local/spark"

  # Custom master configuration
  spark_master_ip: "{{ hostvars['master'].ansible_default_ipv4.address
                       | default(hostvars['master'].ansible_all_ipv4_addresses[0]) }}"
  spark_master_port: 7077

  # IP address Spark binds to on this node
  spark_local_ip: "{{ ansible_default_ipv4.address
                      | default(ansible_all_ipv4_addresses[0]) }}"
  spark_worker_port: 7078

  # Custom spark runtime configuration options
  spark_extra_vars: {}

  spark_default_vars:
    SPARK_MASTER_HOST: "{{ spark_master_ip }}"
    SPARK_MASTER_PORT: "{{ spark_master_port }}"
    SPARK_LOCAL_IP: "{{ spark_local_ip }}"
    SPARK_HOME: "{{ spark_link_dir }}"
    # Python binary executable to use for PySpark in both driver and workers
    # (default is python2.7 if available, otherwise python). Property
    # spark.pyspark.python takes precedence if it is set
    PYSPARK_PYTHON: "/usr/bin/python3"

roles/spark/vars/main.yml:

  ---
  spark_env_vars: "{{ spark_extra_vars | default({}) | combine(spark_default_vars) }}"
  spark_name: "spark-{{ spark_version }}"
  spark_fullname: "{{ spark_name }}-bin-{{ hadoop_version }}"
  spark_tarball: "{{ spark_fullname }}.tgz"
  spark_tarball_url: "{{ spark_mirror }}/{{ spark_name }}/{{ spark_tarball }}"
  spark_target_dir: "{{ spark_parent_dir }}/{{ spark_fullname }}"

roles/spark/tasks/systemd.yml:

  ---
  - name: Check if systemd exists
    stat: path=/lib/systemd/system/
    register: systemd_check

  - block:
      - name: Create master systemd start script
        template:
          src: "spark-master.service.j2"
          dest: "/etc/systemd/system/spark-master.service"
          owner: root
          group: root
          mode: 0644

      - name: Create worker systemd start script
        template:
          src: "spark-worker.service.j2"
          dest: "/etc/systemd/system/spark-worker.service"
          owner: root
          group: root
          mode: 0644
    when: systemd_check.stat.exists == True
    become: True

roles/spark/tasks/main.yml:

  - name: Check for existing Spark installation
    stat:
      path: '{{ spark_target_dir }}'
    register: spark_installation
    tags:
      - spark

  - name: Download, install and setup Spark environment
    when: not spark_installation.stat.exists
    block:
      - name: Installing dependencies
        apt:
          name: [ "default-jdk", "python3" ]

      - name: Create spark group
        group:
          name: spark
          state: present

      - name: Create spark non-system user
        user:
          name: spark
          group: spark
          shell: /usr/sbin/nologin

      - name: Downloading Apache Spark sources
        get_url:
          url: "{{ spark_tarball_url }}"
          dest: "/tmp/{{ spark_tarball }}"
          owner: spark
          group: spark
          mode: 0644

      - name: Unpacking tarball
        unarchive:
          remote_src: yes
          dest: "{{ spark_parent_dir }}"
          src: "/tmp/{{ spark_tarball }}"
          owner: spark
          group: spark
          creates: "{{ spark_target_dir }}"

      - name: Add spark to environment
        template:
          src: "spark.sh.j2"
          dest: "/etc/profile.d/spark.sh"
          owner: root
          group: root
          mode: 0644

      - name: Configure spark environment
        template:
          src: "spark-env.sh.j2"
          dest: "{{ spark_target_dir }}/conf/spark-env.sh"
          owner: spark
          group: spark
          mode: 0755
    become: True

  - name: Link spark directory
    become: True
    file:
      src: "{{ spark_parent_dir }}/{{ spark_fullname }}"
      dest: "{{ spark_link_dir }}"
      state: link

  - name: Cleanup
    become: True
    file:
      path: "/tmp/{{ spark_tarball }}"
      state: absent

  - name: Include systemd services
    include: systemd.yml

The final directory structure looks like this:

  roles
  ├── common
  │   └── tasks
  │       └── main.yml
  └── spark
      ├── defaults
      │   └── main.yml
      ├── tasks
      │   ├── main.yml
      │   └── systemd.yml
      ├── templates
      │   ├── spark-env.sh.j2
      │   ├── spark-master.service.j2
      │   ├── spark.sh.j2
      │   └── spark-worker.service.j2
      └── vars
          └── main.yml

A few notable changes happened:

  1. default values for spark_master_ip and spark_local_ip have been
    defined using the Jinja default filter.
  2. the definition of the Spark environment variables has been moved into
    spark_default_vars, a new dictionary spark_extra_vars has been
    defined, and they are combined into spark_env_vars using the Jinja
    combine filter.
  3. the tasks for systemd are defined in their own file systemd.yml and
    included from main.yml.
  4. the Spark installation now proceeds only when it detects that Spark is not
    already installed.

Finally, we edit the setup-spark.yml:

  - name: Install spark
    hosts: all
    roles:
      - common
      - role: spark
        vars:
          spark_local_ip: "{{ hostvars[inventory_hostname].spark_ip_address }}"
          spark_master_ip: "{{ hostvars['master'].spark_ip_address }}"
    tasks:
      - name: Add vagrant to spark group
        become: True
        user:
          name: vagrant
          groups: spark
          append: yes

  - name: Start master node
    hosts: master
    tasks:
      - name: Start master service
        become: True
        service:
          name: spark-master
          state: started
          enabled: yes

      - name: Copy example files to master
        become: True
        copy:
          src: example/
          dest: /example
          owner: spark
          group: spark
          mode: 0755

  - name: Start worker nodes
    hosts: all
    tasks:
      - name: Start worker service
        become: True
        service:
          name: spark-worker
          state: started

Some notes on Apache Spark ML

Download the data

Do not forget to distribute the files to the workers!

Boston Housing dataset:

wget https://raw.githubusercontent.com/selva86/datasets/master/BostonHousing.csv

MNIST train and test:

wget https://pjreddie.com/media/files/mnist_train.csv
wget https://pjreddie.com/media/files/mnist_test.csv

I have also created a script for downloading the data in pyspark/download_data.sh:

  #!/usr/bin/env bash

  wget https://raw.githubusercontent.com/selva86/datasets/master/BostonHousing.csv
  wget https://pjreddie.com/media/files/mnist_train.csv
  wget https://pjreddie.com/media/files/mnist_test.csv

Spark Shell

Find the directory where Spark is installed using echo $SPARK_HOME.
In this directory run ./bin/pyspark to enter the Python Spark Shell. This connects
you to the cluster automatically by creating a SparkSession (more on that later).

Spark Shell is interactive so you can try whatever you want. It’s good for learning
how Spark ML works.

SparkSession

This is the entry point to the cluster. Every program you want to run on the cluster
needs an object of this type. The Spark Shell creates it without you even noticing.

You can create a SparkSession like this:

  spark = SparkSession.builder.master("spark://192.168.50.10:7077").appName("Linear regression with pipeline - Boston").getOrCreate()

where spark://192.168.50.10:7077 is the address of the master node; you then choose a name for
the app and create the SparkSession (or get an already existing one). Note that older versions
used SparkContext as the entry point; SparkSession has since replaced it, so you rarely need to
create a SparkContext directly anymore.

How to store data?

There are multiple objects that can store data:

  1. RDD - Resilient Distributed Dataset
  2. DataFrame
  3. Dataset

We will talk only about RDDs and DataFrames, since these are the only two types we use in our project.

RDD

RDD is the oldest object that represents data. It is

  • immutable - once the object is created it cannot be modified (a modification returns a new RDD)
  • fault tolerant - lost partitions can be recomputed from their lineage, so a failed worker doesn’t crash the app
  • distributed - an RDD is divided into logical partitions and distributed between the cluster worker nodes, which makes parallelization possible
  • lazily evaluated - transformations are not computed until an action needs their result. Imagine
    a person who is really lazy but does everything (you can rely on him). He just postpones all work
    and when you start to yell that you really need it, he finally does it.

We can perform transformations and actions on an RDD:

  • transformations return a new RDD (since RDDs are immutable). Possible transformations are .map(), .filter(), .randomSplit(), …
  • actions return a non-RDD object (for example just an int) - .first(), .count(), …

You can create an RDD from existing collections (lists, for example) or from files on disk.

DataFrames

This is the newer object representing data that we will use in our applications.

If you have experience with Pandas DataFrames, this is virtually the same: you can think
of Spark DataFrames as Pandas DataFrames with some extra optimizations under the hood.

DataFrame is just a 2D array with labelled columns. It is also:

  • immutable
  • fault tolerant
  • distributed
  • lazy evaluated

You can create one from an RDD, from existing collections (like lists), or from files on disk (.csv, .txt, .xml, HDFS, S3, HBase, …).
We create a DataFrame in lr_pipeline.py from a .csv file like this:

data = spark.read.format("csv").option('header', 'true').option('inferSchema', 'true').load(filename)

The video explains what all of these options mean.

You can also create a DataFrame from a list like this:

  our_list = [('Martin', 180), ('David', 182), ('Dennis', 190), ('Tobias', 183)]
  columns = ["name", "height"]
  our_DataFrame = spark.createDataFrame(data=our_list, schema=columns)

Simple operations on DataFrames I didn’t have time to show on video

  • df.show(10) … shows header and first 10 lines of the DataFrame
  • df.printSchema() … prints schema - the header with column type
  • df.select("col_name").show() … shows only col_name column
  • df.filter(df["col_name"] > 10).show() … filters only rows which have a value > 10 in column col_name

And other very useful commands:

  • train, test = df.randomSplit([0.8, 0.2]) … splits the DataFrame into train and test sets
  • train.schema.names … returns the list of column names

VectorAssembler

A transformer that creates a new column by combining several columns into a single vector. We use it in lr_pipeline.py:

  v_assembler = VectorAssembler(inputCols=features_list, outputCol='features')
  new_dataframe = v_assembler.transform(old_dataframe)

Pipeline

This is a concept from ML that makes the whole data workflow easier. You put data into the pipeline
and the pipeline transforms it into the output. A pipeline can have many stages - many transformers
and estimators. In the file lr_pipeline.py we demonstrate a very simple pipeline with just one
transformer (polynomial expansion) and one estimator (linear regression):

  poly_exp = PolynomialExpansion(degree=3, inputCol="features", outputCol="poly_features")
  lr = LinearRegression(regParam=0.1, featuresCol="poly_features")
  pipeline = Pipeline(stages=[poly_exp, lr])

The pipeline then behaves like a simple estimator - you feed it input data and fit a
model on that data:

  model = pipeline.fit(train)

A model is a transformer. It transforms input data into predictions:

  prediction = model.transform(train).select("prediction")

To evaluate our model we can use Evaluators - for linear regression:

  evaluator = RegressionEvaluator()
  prediction_and_labels = model.transform(train).select("prediction", "label")
  print("Precision train: " + str(evaluator.evaluate(prediction_and_labels)))
  prediction_and_labels = model.transform(test).select("prediction", "label")
  print("Precision test: " + str(evaluator.evaluate(prediction_and_labels)))

The whole file lr_pipeline.py with a prepare_data() function that prepares data for the model looks like this:

  from pyspark.ml.linalg import Vectors
  from pyspark.ml.feature import VectorAssembler
  from pyspark.ml import Pipeline
  from pyspark.ml.evaluation import RegressionEvaluator
  from pyspark.ml.regression import LinearRegression
  from pyspark.ml.feature import PolynomialExpansion
  from pyspark.sql import SparkSession


  def prepare_data(filename):
      """
      Transform data for ML learning algorithm

      Input: Data to be transformed (DataFrame)
      Returns: Transformed data (DataFrame)
      """
      data = spark.read.format("csv").option('header', 'true').option('inferSchema', 'true').load(filename)
      data = data.withColumnRenamed('medv', 'label')

      # get columns that represent features
      features_list = data.columns

      # pop last column since it is our target
      features_list.remove('label')

      # make a new column with a vector of features
      v_assembler = VectorAssembler(inputCols=features_list, outputCol='features')
      return v_assembler.transform(data)


  if __name__ == "__main__":
      train_ratio = 0.8
      test_ratio = 1 - train_ratio

      # create SparkSession - the entry to the cluster
      spark = SparkSession.builder.master("spark://192.168.50.10:7077").appName("Linear regression with pipeline - Boston").getOrCreate()

      data = prepare_data("BostonHousing.csv")

      # split data into train and test DataFrames
      train, test = data.randomSplit([train_ratio, test_ratio])

      poly_exp = PolynomialExpansion(degree=3, inputCol="features", outputCol="poly_features")
      lr = LinearRegression(regParam=0.1, featuresCol="poly_features")
      pipeline = Pipeline(stages=[poly_exp, lr])

      # fit the model
      model = pipeline.fit(train)

      evaluator = RegressionEvaluator()
      prediction_and_labels = model.transform(train).select("prediction", "label")
      print("Precision train: " + str(evaluator.evaluate(prediction_and_labels)))
      prediction_and_labels = model.transform(test).select("prediction", "label")
      print("Precision test: " + str(evaluator.evaluate(prediction_and_labels)))

Running the application on the cluster

To run the application on the cluster we use the bin/spark-submit script in the $SPARK_HOME directory.

Note: every node needs access to the files used in our models. We use Boston Housing and MNIST datasets.
I have created a script to download all the data. This script has to be run on all worker nodes.

To run the application on our cluster, run:
/usr/local/spark/bin/spark-submit --master "spark://192.168.50.10:7077" linear_regression.py 2> logfile

We redirected logs to logfile. If you want to see what Spark is doing, erase 2> logfile.

Note: As you can see in logfile, Spark does many things. It has to initialize the whole cluster,
start a web server where you can watch your jobs (on localhost, port 4040), and much more.
Therefore the overhead for our application is large. When we use as little data
as the BostonHousing dataset contains, this overhead makes the application run for a
longer time than plain Python would. Spark ML is therefore not suitable for really tiny apps,
as the overhead dominates their runtime. But Spark ML is excellent when we have large
models and large amounts of data - there the overhead is tiny compared to the computation, and
the parallelization is really beneficial.

Multilayer Perceptron