Cloud

GCP, Dataproc

Here is a demo of running a learning algorithm on Google Cloud Platform's Dataproc, submitted as a serverless batch job.

gcloud dataproc batches submit pyspark \
    pyspark-demo.py \
    --region=us-central1 \
    --version=2.0 \
    --deps-bucket=my-dataproc-deps-bucket \
    --py-files=pybbn-3.2.3-py3.9.egg,pysparkbbn-0.0.3-py3.9.egg \
    -- --input gs://my-gcs-folder/data/data-binary.csv
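The command above assumes the two dependency eggs exist locally and that the input CSV is already in Cloud Storage. Here is a minimal sketch of that preparation, assuming you build the eggs from the pybbn and pysparkbbn source trees and reuse the bucket and folder names from the command (all of which are placeholders):

# build a dependency egg from a project's source tree (run in each project root);
# the setup.py layout is an assumption about how these libraries are packaged
python setup.py bdist_egg

# create the staging bucket used by --deps-bucket, if it does not already exist
gsutil mb gs://my-dataproc-deps-bucket

# upload the input data to the location passed via --input
gsutil cp data-binary.csv gs://my-gcs-folder/data/data-binary.csv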

The driver code pyspark-demo.py is as follows.

import argparse
import json
import sys
from typing import List

from pybbn.pptc.inferencecontroller import InferenceController
from pyspark.sql import SparkSession

from pyspark_bbn.discrete.bbn import get_bbn, get_darkstar_data, get_pybbn_data
from pyspark_bbn.discrete.data import DiscreteData
from pyspark_bbn.discrete.plearn import ParamLearner
from pyspark_bbn.discrete.scblearn import Naive


def parse_pargs(args: List[str]) -> argparse.Namespace:
    """Parse command-line arguments.

    :param args: command-line arguments.
    :type args: List[str]
    :return: parsed arguments.
    :rtype: argparse.Namespace
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", type=str, required=True, help="Input CSV file")

    return parser.parse_args(args)


def start(input_path: str) -> None:
    """Start the PySpark demo.

    :param input_path: path to the input CSV file.
    :type input_path: str
    :return: None
    :rtype: None
    """
    spark = SparkSession.builder.appName("learn-naive").getOrCreate()

    sdf = spark.read.option("header", True).option("inferSchema", True).csv(input_path)

    print("data schema")
    sdf.printSchema()

    print("")
    print("data sample")
    sdf.show(10)

    data = DiscreteData(sdf)
    naive = Naive(data, "e")
    g = naive.get_network()

    print("")
    print("nodes")
    print("-" * 10)
    for n in g.nodes():
        print(f"{n}")

    print("")
    print("edges")
    print("-" * 10)
    for pa, ch in g.edges():
        print(f"{pa} -> {ch}")

    param_learner = ParamLearner(data, g)
    p = param_learner.get_params()

    print("")
    print("params")
    print("-" * 10)
    print(json.dumps(p, indent=2))

    print("")
    print("py-bbn, posteriors")
    print("-" * 10)
    bbn = get_bbn(g, p, data.get_profile())
    join_tree = InferenceController.apply(bbn)

    for node, posteriors in join_tree.get_posteriors().items():
        p_str = ", ".join([f"{val}={prob:.5f}" for val, prob in posteriors.items()])
        print(f"{node} : {p_str}")

    print("")
    print("py-bbn, data")
    print("-" * 10)
    pybbn_data = get_pybbn_data(g, p, data.get_profile())
    print(json.dumps(pybbn_data, indent=2))

    print("")
    print("darkstar, data")
    print("-" * 10)
    darkstar_data = get_darkstar_data(g, p, data.get_profile())
    print(json.dumps(darkstar_data, indent=2))


if __name__ == "__main__":
    args = parse_pargs(sys.argv[1:])

    input_path = args.input
    start(input_path)
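Before submitting to Dataproc, you can smoke-test the same driver against a local Spark installation. This is a sketch, assuming spark-submit is on your PATH and a copy of the CSV sits in a local data/ folder:

spark-submit \
    --py-files pybbn-3.2.3-py3.9.egg,pysparkbbn-0.0.3-py3.9.egg \
    pyspark-demo.py \
    --input data/data-binary.csv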

The output of the Dataproc batch submission should look something like the following.

Batch [7732907e5b8843f98c5f6c2ccffbd85d] submitted.
Using the default container image
Waiting for container log creation
PYSPARK_PYTHON=/opt/dataproc/conda/bin/python
JAVA_HOME=/usr/lib/jvm/temurin-17-jdk-amd64
SPARK_EXTRA_CLASSPATH=
:: loading settings :: file = /etc/spark/conf/ivysettings.xml
data schema
root
 |-- a: integer (nullable = true)
 |-- b: integer (nullable = true)
 |-- c: integer (nullable = true)
 |-- d: integer (nullable = true)
 |-- e: integer (nullable = true)


data sample
+---+---+---+---+---+
|  a|  b|  c|  d|  e|
+---+---+---+---+---+
|  1|  0|  0|  0|  0|
|  1|  0|  0|  0|  0|
|  1|  0|  0|  1|  1|
|  0|  0|  0|  0|  1|
|  0|  0|  0|  0|  0|
|  1|  0|  0|  0|  1|
|  1|  0|  0|  0|  0|
|  1|  0|  0|  1|  1|
|  0|  0|  0|  0|  1|
|  1|  0|  0|  0|  0|
+---+---+---+---+---+
only showing top 10 rows


nodes
----------
e
a
b
c
d

edges
----------
e -> a
e -> b
e -> c
e -> d

params
----------
{
  "e": [
    {
      "e": "0",
      "__p__": 0.7416
    },
    {
      "e": "1",
      "__p__": 0.2584
    }
  ],
  "a": [
    {
      "a": "0",
      "e": "0",
      "__p__": 0.18743257820927725
    },
    {
      "a": "1",
      "e": "0",
      "__p__": 0.8125674217907227
    },
    {
      "a": "0",
      "e": "1",
      "__p__": 0.1946594427244582
    },
    {
      "a": "1",
      "e": "1",
      "__p__": 0.8053405572755418
    }
  ],
  "b": [
    {
      "b": "0",
      "e": "0",
      "__p__": 0.8015102481121898
    },
    {
      "b": "1",
      "e": "0",
      "__p__": 0.19848975188781015
    },
    {
      "b": "0",
      "e": "1",
      "__p__": 0.8068885448916409
    },
    {
      "b": "1",
      "e": "1",
      "__p__": 0.19311145510835914
    }
  ],
  "c": [
    {
      "c": "0",
      "e": "0",
      "__p__": 0.6863538295577131
    },
    {
      "c": "1",
      "e": "0",
      "__p__": 0.31364617044228693
    },
    {
      "c": "0",
      "e": "1",
      "__p__": 0.6884674922600619
    },
    {
      "c": "1",
      "e": "1",
      "__p__": 0.31153250773993807
    }
  ],
  "d": [
    {
      "d": "0",
      "e": "0",
      "__p__": 0.9704692556634305
    },
    {
      "d": "1",
      "e": "0",
      "__p__": 0.02953074433656958
    },
    {
      "d": "0",
      "e": "1",
      "__p__": 0.2921826625386997
    },
    {
      "d": "1",
      "e": "1",
      "__p__": 0.7078173374613003
    }
  ]
}

py-bbn, posteriors
----------
e : 0=0.74160, 1=0.25840
a : 0=0.18930, 1=0.81070
b : 0=0.80290, 1=0.19710
c : 0=0.68690, 1=0.31310
d : 0=0.79520, 1=0.20480

py-bbn, data
----------
{
  "nodes": {
    "0": {
      "probs": [
        0.7416,
        0.2584
      ],
      "variable": {
        "id": 0,
        "name": "e",
        "values": [
          "0",
          "1"
        ]
      }
    },
    "1": {
      "probs": [
        0.18743257820927725,
        0.8125674217907227,
        0.1946594427244582,
        0.8053405572755418
      ],
      "variable": {
        "id": 1,
        "name": "a",
        "values": [
          "0",
          "1"
        ]
      }
    },
    "2": {
      "probs": [
        0.8015102481121898,
        0.19848975188781015,
        0.8068885448916409,
        0.19311145510835914
      ],
      "variable": {
        "id": 2,
        "name": "b",
        "values": [
          "0",
          "1"
        ]
      }
    },
    "3": {
      "probs": [
        0.6863538295577131,
        0.31364617044228693,
        0.6884674922600619,
        0.31153250773993807
      ],
      "variable": {
        "id": 3,
        "name": "c",
        "values": [
          "0",
          "1"
        ]
      }
    },
    "4": {
      "probs": [
        0.9704692556634305,
        0.02953074433656958,
        0.2921826625386997,
        0.7078173374613003
      ],
      "variable": {
        "id": 4,
        "name": "d",
        "values": [
          "0",
          "1"
        ]
      }
    }
  },
  "edges": [
    {
      "pa": 0,
      "ch": 1
    },
    {
      "pa": 0,
      "ch": 2
    },
    {
      "pa": 0,
      "ch": 3
    },
    {
      "pa": 0,
      "ch": 4
    }
  ]
}

darkstar, data
----------
{
  "nodes": {
    "e": {
      "id": 0,
      "values": {
        "0": 0,
        "1": 1
      }
    },
    "a": {
      "id": 1,
      "values": {
        "0": 0,
        "1": 1
      }
    },
    "b": {
      "id": 2,
      "values": {
        "0": 0,
        "1": 1
      }
    },
    "c": {
      "id": 3,
      "values": {
        "0": 0,
        "1": 1
      }
    },
    "d": {
      "id": 4,
      "values": {
        "0": 0,
        "1": 1
      }
    }
  },
  "edges": [
    {
      "parent": "e",
      "child": "a"
    },
    {
      "parent": "e",
      "child": "b"
    },
    {
      "parent": "e",
      "child": "c"
    },
    {
      "parent": "e",
      "child": "d"
    }
  ],
  "parameters": {
    "e": [
      {
        "e": "0",
        "__p__": 0.7416
      },
      {
        "e": "1",
        "__p__": 0.2584
      }
    ],
    "a": [
      {
        "a": "0",
        "e": "0",
        "__p__": 0.18743257820927725
      },
      {
        "a": "1",
        "e": "0",
        "__p__": 0.8125674217907227
      },
      {
        "a": "0",
        "e": "1",
        "__p__": 0.1946594427244582
      },
      {
        "a": "1",
        "e": "1",
        "__p__": 0.8053405572755418
      }
    ],
    "b": [
      {
        "b": "0",
        "e": "0",
        "__p__": 0.8015102481121898
      },
      {
        "b": "1",
        "e": "0",
        "__p__": 0.19848975188781015
      },
      {
        "b": "0",
        "e": "1",
        "__p__": 0.8068885448916409
      },
      {
        "b": "1",
        "e": "1",
        "__p__": 0.19311145510835914
      }
    ],
    "c": [
      {
        "c": "0",
        "e": "0",
        "__p__": 0.6863538295577131
      },
      {
        "c": "1",
        "e": "0",
        "__p__": 0.31364617044228693
      },
      {
        "c": "0",
        "e": "1",
        "__p__": 0.6884674922600619
      },
      {
        "c": "1",
        "e": "1",
        "__p__": 0.31153250773993807
      }
    ],
    "d": [
      {
        "d": "0",
        "e": "0",
        "__p__": 0.9704692556634305
      },
      {
        "d": "1",
        "e": "0",
        "__p__": 0.02953074433656958
      },
      {
        "d": "0",
        "e": "1",
        "__p__": 0.2921826625386997
      },
      {
        "d": "1",
        "e": "1",
        "__p__": 0.7078173374613003
      }
    ]
  }
}
Batch [7732907e5b8843f98c5f6c2ccffbd85d] finished.
metadata:
  '@type': type.googleapis.com/google.cloud.dataproc.v1.BatchOperationMetadata
  batch: projects/rocketvector/locations/us-central1/batches/7732907e5b8843f98c5f6c2ccffbd85d
  batchUuid: 96fc6be5-bb7b-45cf-9123-669ff6fa1a05
  createTime: '2023-06-08T08:17:28.049693Z'
  description: Batch
  operationType: BATCH
name: projects/rocketvector/regions/us-central1/operations/61f78fed-c3c5-38b6-a0d8-1b492d3d210d
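The submit command streams the driver logs to your terminal, but you can also inspect the batch after the fact with the Dataproc CLI. For example, using the batch ID printed above:

# list recent batches in the region
gcloud dataproc batches list --region=us-central1

# show the state and metadata of this particular batch
gcloud dataproc batches describe 7732907e5b8843f98c5f6c2ccffbd85d --region=us-central1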

AWS, Spark Processing Job

Here is a demo of running a learning algorithm on AWS SageMaker using a Spark Processing Job. We can kick off the job by running a driver program from the command line.

python driver.py

The driver program driver.py looks like the following. Note that you must first build a Docker image with all the required libraries installed and push it to Amazon ECR; a sketch of that setup follows the listing.

from sagemaker.spark.processing import PySparkProcessor

job = PySparkProcessor(
    **{
        "role": "your_aws_role",
        "instance_type": "ml.c5.xlarge",
        "instance_count": 1,
        "base_job_name": "pyspark-bbn",
        "image_uri": "your_docker_image_uri",
    }
)

job.run(
    submit_app="learn.py",
    arguments=[
        "--input_bucket",
        "your_input_bucket",
        "--input_key",
        "temp/data-from-structure.csv",
        "--output_bucket",
        "your_output_bucket",
        "--output_key",
        "temp/output/data-from-structure/bbn-naive.json",
        "--clazz_var",
        "your_clazz_variable",
    ],
)
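Here is one way to prepare the image referenced by image_uri. Everything below is a placeholder sketch: the base image, account ID, region, and repository name are assumptions, and the base image must be a Spark image that SageMaker Processing can run.

# Dockerfile
# placeholder base image; substitute a Spark image compatible with SageMaker Processing
FROM your_base_spark_image
# assumes both packages are pip-installable; otherwise copy in the eggs and install them
RUN pip install pybbn pysparkbbn boto3

# build and push to ECR (placeholder account ID and region)
aws ecr get-login-password --region us-east-1 \
    | docker login --username AWS --password-stdin 123456789012.dkr.ecr.us-east-1.amazonaws.com
docker build -t pyspark-bbn .
docker tag pyspark-bbn:latest 123456789012.dkr.ecr.us-east-1.amazonaws.com/pyspark-bbn:latest
docker push 123456789012.dkr.ecr.us-east-1.amazonaws.com/pyspark-bbn:latest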

The learning program learn.py looks like the following. It simply learns a naive Bayes model and uploads the serialized result to S3.

import argparse
import json
import logging
import sys
from typing import List

import boto3
from pybbn.graph.dag import Bbn
from pyspark.sql import SparkSession

from pyspark_bbn.discrete.bbn import get_bbn
from pyspark_bbn.discrete.data import DiscreteData
from pyspark_bbn.discrete.plearn import ParamLearner
from pyspark_bbn.discrete.scblearn import Naive

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
spark = SparkSession.builder.appName("learn-naive").getOrCreate()


def parse_pargs(args: List[str]) -> argparse.Namespace:
    """Parse command-line arguments.

    :param args: command-line arguments.
    :type args: List[str]
    :return: parsed arguments.
    :rtype: argparse.Namespace
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_bucket", type=str, required=True)
    parser.add_argument("--input_key", type=str, required=True)
    parser.add_argument("--output_bucket", type=str, required=True)
    parser.add_argument("--output_key", type=str, required=True)
    parser.add_argument("--clazz_var", type=str, default=None)

    return parser.parse_args(args)


def upload(src: str, bucket: str, key: str) -> None:
    """Upload a file to Amazon S3.

    :param src: local file path.
    :type src: str
    :param bucket: destination S3 bucket.
    :type bucket: str
    :param key: object key within the bucket.
    :type key: str
    :return: None
    :rtype: None
    """
    s3 = boto3.client("s3")
    response = s3.upload_file(src, bucket, key)
    logging.info(f"uploaded {src} to {bucket}/{key}")
    logging.info(f"response={response}")


if __name__ == "__main__":
    args = parse_pargs(sys.argv[1:])

    logging.info("Job Starting")

    logging.info("Parsed Arguments")
    logging.info(f"args={args}")

    data_path = f"s3://{args.input_bucket}/{args.input_key}"
    logging.info(f"data_path={data_path}")

    sdf = spark.read.option("header", "true").csv(data_path)

    n_rows, n_cols = sdf.count(), len(sdf.columns)
    logging.info("Read Data")
    logging.info(f"data dimensions: rows={n_rows:,}, cols={n_cols:,}")

    data = DiscreteData(sdf)

    structure_learner = Naive(data, args.clazz_var)
    logging.info("Learned Structure")
    logging.info(f"structure learn type: {type(structure_learner)}")

    g = structure_learner.get_network()
    logging.info(f"learned structure: nodes={len(g.nodes())}, edges={len(g.edges())}")

    parameter_learner = ParamLearner(data, g)
    p = parameter_learner.get_params()
    logging.info("Learned Parameters")
    logging.info(f"learned parameters: {len(p)}")

    bbn = get_bbn(g, p, data.get_profile())
    logging.info("Constructed BBN")
    logging.info(f"bbn: nodes={len(bbn.nodes)}, edges={len(bbn.edges)}")

    j_data = json.dumps(Bbn.to_dict(bbn), indent=2)
    j_path = "/tmp/bbn.json"
    with open(j_path, "w") as f:
        f.write(j_data)

    logging.info("Serialized BBN")
    logging.info(f"saved bbn to {j_path}")
    upload(j_path, args.output_bucket, args.output_key)

    spark.stop()

    print("Finished")
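When the job finishes, the serialized network sits at the output bucket and key passed by driver.py. Here is a minimal sketch of pulling it back down and running inference with py-bbn, reusing the placeholder bucket and key from above:

import json

import boto3
from pybbn.graph.dag import Bbn
from pybbn.pptc.inferencecontroller import InferenceController

# download the serialized BBN that learn.py uploaded to S3
s3 = boto3.client("s3")
s3.download_file(
    "your_output_bucket",
    "temp/output/data-from-structure/bbn-naive.json",
    "bbn.json",
)

# rebuild the network and compute the marginal posteriors
with open("bbn.json") as f:
    bbn = Bbn.from_dict(json.load(f))

join_tree = InferenceController.apply(bbn)
for node, posteriors in join_tree.get_posteriors().items():
    print(f"{node} : {posteriors}")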

Azure, Machine Learning

Here is a demo of running a serverless, standalone Spark job in Azure Machine Learning. We can submit the job via the Azure CLI as follows.

az ml job create \
    -f learn-naive.yaml \
    -g your_resource_group \
    -w your_aml_workspace \
    --subscription your_subscription_id

The YAML file learn-naive.yaml looks like the following.

$schema: http://azureml/sdk-2-0/SparkJob.json
type: spark

code: ./src
entry:
  file: learn-naive.py

py_files:
  - pybbn-3.2.3-py3.9.egg
  - pysparkbbn-0.0.3-py3.9.egg

conf:
  spark.driver.cores: 1
  spark.driver.memory: 2g
  spark.executor.cores: 2
  spark.executor.memory: 2g
  spark.executor.instances: 2

inputs:
  input_data:
    type: uri_file
    path: abfss://your_container@your_storage_account.dfs.core.windows.net/input/data-binary.csv
    mode: direct
  clazz: "e"

args: >-
  --input_data ${{inputs.input_data}} --clazz ${{inputs.clazz}}

identity:
  type: user_identity

resources:
  instance_type: standard_e4s_v3
  runtime_version: "3.2"
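The create command prints the job's metadata, including its generated name. You can then follow the driver logs or poll the status from the CLI; <job_name> below stands for whatever name the create command returned:

# stream the job's logs to your terminal
az ml job stream -n <job_name> -g your_resource_group -w your_aml_workspace

# check the job's current status
az ml job show -n <job_name> -g your_resource_group -w your_aml_workspace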

The Python program learn-naive.py looks like the following.

import argparse
import json
import sys
from typing import List

from pybbn.pptc.inferencecontroller import InferenceController
from pyspark.sql import SparkSession

from pyspark_bbn.discrete.bbn import get_bbn, get_darkstar_data, get_pybbn_data
from pyspark_bbn.discrete.data import DiscreteData
from pyspark_bbn.discrete.plearn import ParamLearner
from pyspark_bbn.discrete.scblearn import Naive


def parse_pargs(args: List[str]) -> argparse.Namespace:
    """Parse command-line arguments.

    :param args: command-line arguments.
    :type args: List[str]
    :return: parsed arguments.
    :rtype: argparse.Namespace
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_data", type=str, required=True, help="Input CSV file")
    parser.add_argument("--clazz", type=str, required=False, help="Clazz variable")

    return parser.parse_args(args)


def start(input_data: str, clazz: str) -> None:
    """Start the naive learning demo.

    :param input_data: path to input CSV file.
    :type input_data: str
    :param clazz: class variable name.
    :type clazz: str
    :return: None
    :rtype: None
    """
    spark = SparkSession.builder.appName("learn-naive").getOrCreate()

    sdf = spark.read.option("header", True).option("inferSchema", True).csv(input_data)

    print("data schema")
    sdf.printSchema()

    print("")
    print("data sample")
    sdf.show(10)

    data = DiscreteData(sdf)
    naive = Naive(data, clazz)
    g = naive.get_network()

    print("")
    print("nodes")
    print("-" * 10)
    for n in g.nodes():
        print(f"{n}")

    print("")
    print("edges")
    print("-" * 10)
    for pa, ch in g.edges():
        print(f"{pa} -> {ch}")

    param_learner = ParamLearner(data, g)
    p = param_learner.get_params()

    print("")
    print("params")
    print("-" * 10)
    print(json.dumps(p, indent=2))

    print("")
    print("py-bbn, posteriors")
    print("-" * 10)
    bbn = get_bbn(g, p, data.get_profile())
    join_tree = InferenceController.apply(bbn)

    for node, posteriors in join_tree.get_posteriors().items():
        p_str = ", ".join([f"{val}={prob:.5f}" for val, prob in posteriors.items()])
        print(f"{node} : {p_str}")

    print("")
    print("py-bbn, data")
    print("-" * 10)
    pybbn_data = get_pybbn_data(g, p, data.get_profile())
    print(json.dumps(pybbn_data, indent=2))

    print("")
    print("darkstar, data")
    print("-" * 10)
    darkstar_data = get_darkstar_data(g, p, data.get_profile())
    print(json.dumps(darkstar_data, indent=2))


if __name__ == "__main__":
    args = parse_pargs(sys.argv[1:])

    input_data = args.input_data
    clazz = args.clazz
    start(input_data, clazz)