Cloud
GCP, Dataproc
Here is a demo of running a learning algorithm on Google Cloud Platform’s Dataproc.
gcloud dataproc batches submit pyspark \
    pyspark-demo.py \
    --region=us-central1 \
    --version=2.0 \
    --deps-bucket=my-dataproc-deps-bucket \
    --py-files=pybbn-3.2.3-py3.9.egg,pysparkbbn-0.0.3-py3.9.egg \
    -- --input gs://my-gcs-folder/data/data-binary.csv
Everything after the bare -- separator is passed through as arguments to the driver script. The driver code pyspark-demo.py is as follows.
import argparse
import json
import sys
from typing import List

from pybbn.pptc.inferencecontroller import InferenceController
from pyspark.sql import SparkSession

from pyspark_bbn.discrete.bbn import get_bbn, get_darkstar_data, get_pybbn_data
from pyspark_bbn.discrete.data import DiscreteData
from pyspark_bbn.discrete.plearn import ParamLearner
from pyspark_bbn.discrete.scblearn import Naive


def parse_pargs(args: List[str]) -> argparse.Namespace:
    """Parse command-line arguments.

    :param args: command-line arguments.
    :type args: List[str]
    :return: parsed arguments.
    :rtype: argparse.Namespace
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", type=str, required=True, help="Input CSV file")

    return parser.parse_args(args)


def start(input_path: str) -> None:
    """Start the PySpark demo.

    :param input_path: path to the input CSV file.
    :type input_path: str
    :return: None
    :rtype: None
    """
    spark = SparkSession.builder.appName("learn-naive").getOrCreate()

    sdf = spark.read.option("header", True).option("inferSchema", True).csv(input_path)

    print("data schema")
    sdf.printSchema()

    print("")
    print("data sample")
    sdf.show(10)

    # wrap the Spark DataFrame and learn a naive Bayes structure with e as the class variable
    data = DiscreteData(sdf)
    naive = Naive(data, "e")
    g = naive.get_network()

    print("")
    print("nodes")
    print("-" * 10)
    for n in g.nodes():
        print(f"{n}")

    print("")
    print("edges")
    print("-" * 10)
    for pa, ch in g.edges():
        print(f"{pa} -> {ch}")

    # learn the parameters (conditional probability tables) for the learned structure
    param_learner = ParamLearner(data, g)
    p = param_learner.get_params()

    print("")
    print("params")
    print("-" * 10)
    print(json.dumps(p, indent=2))

    print("")
    print("py-bbn, posteriors")
    print("-" * 10)
    # convert to a py-bbn network and compute marginal posteriors with exact inference
    bbn = get_bbn(g, p, data.get_profile())
    join_tree = InferenceController.apply(bbn)

    for node, posteriors in join_tree.get_posteriors().items():
        p_str = ", ".join([f"{val}={prob:.5f}" for val, prob in posteriors.items()])
        print(f"{node} : {p_str}")

    print("")
    print("py-bbn, data")
    print("-" * 10)
    pybbn_data = get_pybbn_data(g, p, data.get_profile())
    print(json.dumps(pybbn_data, indent=2))

    print("")
    print("darkstar, data")
    print("-" * 10)
    darkstar_data = get_darkstar_data(g, p, data.get_profile())
    print(json.dumps(darkstar_data, indent=2))


if __name__ == "__main__":
    args = parse_pargs(sys.argv[1:])

    input_path = args.input
    start(input_path)
The output should look something like the following.
Batch [7732907e5b8843f98c5f6c2ccffbd85d] submitted.
Using the default container image
Waiting for container log creation
PYSPARK_PYTHON=/opt/dataproc/conda/bin/python
JAVA_HOME=/usr/lib/jvm/temurin-17-jdk-amd64
SPARK_EXTRA_CLASSPATH=
:: loading settings :: file = /etc/spark/conf/ivysettings.xml
data schema
root
 |-- a: integer (nullable = true)
 |-- b: integer (nullable = true)
 |-- c: integer (nullable = true)
 |-- d: integer (nullable = true)
 |-- e: integer (nullable = true)


data sample
+---+---+---+---+---+
|  a|  b|  c|  d|  e|
+---+---+---+---+---+
|  1|  0|  0|  0|  0|
|  1|  0|  0|  0|  0|
|  1|  0|  0|  1|  1|
|  0|  0|  0|  0|  1|
|  0|  0|  0|  0|  0|
|  1|  0|  0|  0|  1|
|  1|  0|  0|  0|  0|
|  1|  0|  0|  1|  1|
|  0|  0|  0|  0|  1|
|  1|  0|  0|  0|  0|
+---+---+---+---+---+
only showing top 10 rows


nodes
----------
e
a
b
c
d

edges
----------
e -> a
e -> b
e -> c
e -> d

params
----------
{
  "e": [
    {
      "e": "0",
      "__p__": 0.7416
    },
    {
      "e": "1",
      "__p__": 0.2584
    }
  ],
  "a": [
    {
      "a": "0",
      "e": "0",
      "__p__": 0.18743257820927725
    },
    {
      "a": "1",
      "e": "0",
      "__p__": 0.8125674217907227
    },
    {
      "a": "0",
      "e": "1",
      "__p__": 0.1946594427244582
    },
    {
      "a": "1",
      "e": "1",
      "__p__": 0.8053405572755418
    }
  ],
  "b": [
    {
      "b": "0",
      "e": "0",
      "__p__": 0.8015102481121898
    },
    {
      "b": "1",
      "e": "0",
      "__p__": 0.19848975188781015
    },
    {
      "b": "0",
      "e": "1",
      "__p__": 0.8068885448916409
    },
    {
      "b": "1",
      "e": "1",
      "__p__": 0.19311145510835914
    }
  ],
  "c": [
    {
      "c": "0",
      "e": "0",
      "__p__": 0.6863538295577131
    },
    {
      "c": "1",
      "e": "0",
      "__p__": 0.31364617044228693
    },
    {
      "c": "0",
      "e": "1",
      "__p__": 0.6884674922600619
    },
    {
      "c": "1",
      "e": "1",
      "__p__": 0.31153250773993807
    }
  ],
  "d": [
    {
      "d": "0",
      "e": "0",
      "__p__": 0.9704692556634305
    },
    {
      "d": "1",
      "e": "0",
      "__p__": 0.02953074433656958
    },
    {
      "d": "0",
      "e": "1",
      "__p__": 0.2921826625386997
    },
    {
      "d": "1",
      "e": "1",
      "__p__": 0.7078173374613003
    }
  ]
}

py-bbn, posteriors
----------
e : 0=0.74160, 1=0.25840
a : 0=0.18930, 1=0.81070
b : 0=0.80290, 1=0.19710
c : 0=0.68690, 1=0.31310
d : 0=0.79520, 1=0.20480

py-bbn, data
----------
{
  "nodes": {
    "0": {
      "probs": [
        0.7416,
        0.2584
      ],
      "variable": {
        "id": 0,
        "name": "e",
        "values": [
          "0",
          "1"
        ]
      }
    },
    "1": {
      "probs": [
        0.18743257820927725,
        0.8125674217907227,
        0.1946594427244582,
        0.8053405572755418
      ],
      "variable": {
        "id": 1,
        "name": "a",
        "values": [
          "0",
          "1"
        ]
      }
    },
    "2": {
      "probs": [
        0.8015102481121898,
        0.19848975188781015,
        0.8068885448916409,
        0.19311145510835914
      ],
      "variable": {
        "id": 2,
        "name": "b",
        "values": [
          "0",
          "1"
        ]
      }
    },
    "3": {
      "probs": [
        0.6863538295577131,
        0.31364617044228693,
        0.6884674922600619,
        0.31153250773993807
      ],
      "variable": {
        "id": 3,
        "name": "c",
        "values": [
          "0",
          "1"
        ]
      }
    },
    "4": {
      "probs": [
        0.9704692556634305,
        0.02953074433656958,
        0.2921826625386997,
        0.7078173374613003
      ],
      "variable": {
        "id": 4,
        "name": "d",
        "values": [
          "0",
          "1"
        ]
      }
    }
  },
  "edges": [
    {
      "pa": 0,
      "ch": 1
    },
    {
      "pa": 0,
      "ch": 2
    },
    {
      "pa": 0,
      "ch": 3
    },
    {
      "pa": 0,
      "ch": 4
    }
  ]
}

darkstar, data
----------
{
  "nodes": {
    "e": {
      "id": 0,
      "values": {
        "0": 0,
        "1": 1
      }
    },
    "a": {
      "id": 1,
      "values": {
        "0": 0,
        "1": 1
      }
    },
    "b": {
      "id": 2,
      "values": {
        "0": 0,
        "1": 1
      }
    },
    "c": {
      "id": 3,
      "values": {
        "0": 0,
        "1": 1
      }
    },
    "d": {
      "id": 4,
      "values": {
        "0": 0,
        "1": 1
      }
    }
  },
  "edges": [
    {
      "parent": "e",
      "child": "a"
    },
    {
      "parent": "e",
      "child": "b"
    },
    {
      "parent": "e",
      "child": "c"
    },
    {
      "parent": "e",
      "child": "d"
    }
  ],
  "parameters": {
    "e": [
      {
        "e": "0",
        "__p__": 0.7416
      },
      {
        "e": "1",
        "__p__": 0.2584
      }
    ],
    "a": [
      {
        "a": "0",
        "e": "0",
        "__p__": 0.18743257820927725
      },
      {
        "a": "1",
        "e": "0",
        "__p__": 0.8125674217907227
      },
      {
        "a": "0",
        "e": "1",
        "__p__": 0.1946594427244582
      },
      {
        "a": "1",
        "e": "1",
        "__p__": 0.8053405572755418
      }
    ],
    "b": [
      {
        "b": "0",
        "e": "0",
        "__p__": 0.8015102481121898
      },
      {
        "b": "1",
        "e": "0",
        "__p__": 0.19848975188781015
      },
      {
        "b": "0",
        "e": "1",
        "__p__": 0.8068885448916409
      },
      {
        "b": "1",
        "e": "1",
        "__p__": 0.19311145510835914
      }
    ],
    "c": [
      {
        "c": "0",
        "e": "0",
        "__p__": 0.6863538295577131
      },
      {
        "c": "1",
        "e": "0",
        "__p__": 0.31364617044228693
      },
      {
        "c": "0",
        "e": "1",
        "__p__": 0.6884674922600619
      },
      {
        "c": "1",
        "e": "1",
        "__p__": 0.31153250773993807
      }
    ],
    "d": [
      {
        "d": "0",
        "e": "0",
        "__p__": 0.9704692556634305
      },
      {
        "d": "1",
        "e": "0",
        "__p__": 0.02953074433656958
      },
      {
        "d": "0",
        "e": "1",
        "__p__": 0.2921826625386997
      },
      {
        "d": "1",
        "e": "1",
        "__p__": 0.7078173374613003
      }
    ]
  }
}
Batch [7732907e5b8843f98c5f6c2ccffbd85d] finished.
metadata:
  '@type': type.googleapis.com/google.cloud.dataproc.v1.BatchOperationMetadata
  batch: projects/rocketvector/locations/us-central1/batches/7732907e5b8843f98c5f6c2ccffbd85d
  batchUuid: 96fc6be5-bb7b-45cf-9123-669ff6fa1a05
  createTime: '2023-06-08T08:17:28.049693Z'
  description: Batch
  operationType: BATCH
name: projects/rocketvector/regions/us-central1/operations/61f78fed-c3c5-38b6-a0d8-1b492d3d210d
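The Dataproc driver above only prints the learned structure, parameters, and posteriors to the batch logs. If you want to persist the learned network instead, one option is to serialize it with py-bbn and write it to Cloud Storage. The following is a minimal sketch, assuming the google-cloud-storage client library is available in the batch runtime; the bucket and object names are placeholders.

import json

from google.cloud import storage
from pybbn.graph.dag import Bbn


def save_bbn_to_gcs(bbn, bucket_name: str, blob_name: str) -> None:
    """Serialize a py-bbn network to JSON and upload it to Cloud Storage."""
    # Bbn.to_dict produces a JSON-serializable dictionary of the network
    j_data = json.dumps(Bbn.to_dict(bbn), indent=2)

    # upload the JSON string as an object in the (hypothetical) bucket
    client = storage.Client()
    blob = client.bucket(bucket_name).blob(blob_name)
    blob.upload_from_string(j_data, content_type="application/json")


# for example, at the end of start() in pyspark-demo.py
# save_bbn_to_gcs(bbn, "my-gcs-folder", "output/bbn-naive.json")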
AWS, Spark Processing Job
Here is a demo of running a learning algorithm on AWS SageMaker using a Spark Processing Job. We can kick off the job by running a driver program from the command line.
python driver.py
The driver program driver.py looks like the following. Note that you should build a Docker image with all of the required libraries installed and push it to Amazon ECR; that image's URI is what you supply as image_uri.
from sagemaker.spark.processing import PySparkProcessor

job = PySparkProcessor(
    **{
        "role": "your_aws_role",
        "instance_type": "ml.c5.xlarge",
        "instance_count": 1,
        "base_job_name": "pyspark-bbn",
        "image_uri": "your_docker_image_uri",
    }
)

job.run(
    submit_app="learn.py",
    arguments=[
        "--input_bucket",
        "your_input_bucket",
        "--input_key",
        "temp/data-from-structure.csv",
        "--output_bucket",
        "your_output_bucket",
        "--output_key",
        "temp/output/data-from-structure/bbn-naive.json",
        "--clazz_var",
        "your_clazz_variable",
    ],
)
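The call above assumes the pyspark-bbn and py-bbn libraries are baked into the Docker image. If you would rather ship them with the job, PySparkProcessor.run also accepts submit_py_files; the sketch below assumes the egg files sit next to driver.py.

# alternative: ship the Python dependencies with the job instead of
# installing them in the Docker image (file locations are assumptions)
job.run(
    submit_app="learn.py",
    submit_py_files=[
        "pybbn-3.2.3-py3.9.egg",
        "pysparkbbn-0.0.3-py3.9.egg",
    ],
    arguments=[
        "--input_bucket",
        "your_input_bucket",
        "--input_key",
        "temp/data-from-structure.csv",
        "--output_bucket",
        "your_output_bucket",
        "--output_key",
        "temp/output/data-from-structure/bbn-naive.json",
        "--clazz_var",
        "your_clazz_variable",
    ],
)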
The learning program learn.py looks like the following. It simply learns a naive Bayes model.
import argparse
import json
import logging
import sys
from typing import List

import boto3
from pybbn.graph.dag import Bbn
from pyspark.sql import SparkSession

from pyspark_bbn.discrete.bbn import get_bbn
from pyspark_bbn.discrete.data import DiscreteData
from pyspark_bbn.discrete.plearn import ParamLearner
from pyspark_bbn.discrete.scblearn import Naive

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
spark = SparkSession.builder.appName("learn-naive").getOrCreate()


def parse_pargs(args: List[str]) -> argparse.Namespace:
    """Parse command-line arguments.

    :param args: command-line arguments.
    :type args: List[str]
    :return: parsed arguments.
    :rtype: argparse.Namespace
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_bucket", type=str, required=True)
    parser.add_argument("--input_key", type=str, required=True)
    parser.add_argument("--output_bucket", type=str, required=True)
    parser.add_argument("--output_key", type=str, required=True)
    parser.add_argument("--clazz_var", type=str, default=None)

    return parser.parse_args(args)


def upload(src: str, bucket: str, key: str) -> None:
    """Upload a file to Amazon S3.

    :param src: local file path.
    :type src: str
    :param bucket: destination S3 bucket.
    :type bucket: str
    :param key: object key within the bucket.
    :type key: str
    :return: None
    :rtype: None
    """
    s3 = boto3.client("s3")
    response = s3.upload_file(src, bucket, key)
    logging.info(f"uploaded {src} to {bucket}/{key}")
    logging.info(f"response={response}")


if __name__ == "__main__":
    args = parse_pargs(sys.argv[1:])

    logging.info("Job Starting")

    logging.info("Parsed Arguments")
    logging.info(f"args={args}")

    data_path = f"s3://{args.input_bucket}/{args.input_key}"
    logging.info(f"data_path={data_path}")

    sdf = spark.read.option("header", "true").csv(data_path)

    n_rows, n_cols = sdf.count(), len(sdf.columns)
    logging.info("Read Data")
    logging.info(f"data dimensions: rows={n_rows:,}, cols={n_cols:,}")

    data = DiscreteData(sdf)

    structure_learner = Naive(data, args.clazz_var)
    logging.info("Learned Structure")
    logging.info(f"structure learn type: {type(structure_learner)}")

    g = structure_learner.get_network()
    logging.info(f"learned structure: nodes={len(g.nodes())}, edges={len(g.edges())}")

    parameter_learner = ParamLearner(data, g)
    p = parameter_learner.get_params()
    logging.info("Learned Parameters")
    logging.info(f"learned parameters: {len(p)}")

    bbn = get_bbn(g, p, data.get_profile())
    logging.info("Constructed BBN")
    logging.info(f"bbn: nodes={len(bbn.nodes)}, edges={len(bbn.edges)}")

    j_data = json.dumps(Bbn.to_dict(bbn), indent=2)
    j_path = "/tmp/bbn.json"
    with open(j_path, "w") as f:
        f.write(j_data)

    logging.info("Serialized BBN")
    logging.info(f"saved bbn to {j_path}")
    upload(j_path, args.output_bucket, args.output_key)

    spark.stop()

    print("Finished")
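Once the processing job finishes, the serialized network can be pulled back down from S3 and loaded into py-bbn for inference. A minimal sketch follows; the bucket, key, and local file name are placeholders matching the driver arguments above.

import json

import boto3
from pybbn.graph.dag import Bbn
from pybbn.pptc.inferencecontroller import InferenceController

# download the serialized BBN produced by learn.py (placeholder names)
s3 = boto3.client("s3")
s3.download_file(
    "your_output_bucket",
    "temp/output/data-from-structure/bbn-naive.json",
    "bbn-naive.json",
)

# rebuild the network and run exact inference over it
with open("bbn-naive.json", "r") as f:
    bbn = Bbn.from_dict(json.load(f))

join_tree = InferenceController.apply(bbn)
for node, posteriors in join_tree.get_posteriors().items():
    print(node, posteriors)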
Azure, Machine Learning
Here is a demo of running a standalone Spark job on serverless Spark compute in Azure Machine Learning. We can submit the job via the Azure CLI as follows.
az ml job create \
    -f learn-naive.yaml \
    -g your_resource_group \
    -w your_aml_workspace \
    --subscription your_subscription_id
The YAML file learn-naive.yaml looks like the following.
$schema: http://azureml/sdk-2-0/SparkJob.json
type: spark

code: ./src
entry:
  file: learn-naive.py

py_files:
  - pybbn-3.2.3-py3.9.egg
  - pysparkbbn-0.0.3-py3.9.egg

conf:
  spark.driver.cores: 1
  spark.driver.memory: 2g
  spark.executor.cores: 2
  spark.executor.memory: 2g
  spark.executor.instances: 2

inputs:
  input_data:
    type: uri_file
    path: abfss://your_container@your_storage_account.dfs.core.windows.net/input/data-binary.csv
    mode: direct
  clazz: "e"

args: >-
  --input_data ${{inputs.input_data}} --clazz ${{inputs.clazz}}

identity:
  type: user_identity

resources:
  instance_type: standard_e4s_v3
  runtime_version: "3.2"
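If you prefer Python over the CLI, roughly the same job can be described with the azure-ai-ml SDK (v2) spark job builder. The sketch below is an approximate translation of the YAML above, not a verified recipe; parameter names and supported options vary across SDK versions, so check the SDK documentation before relying on it.

from azure.ai.ml import Input, MLClient, spark
from azure.ai.ml.entities import UserIdentityConfiguration
from azure.identity import DefaultAzureCredential

# connect to the workspace (placeholder identifiers)
ml_client = MLClient(
    DefaultAzureCredential(),
    subscription_id="your_subscription_id",
    resource_group_name="your_resource_group",
    workspace_name="your_aml_workspace",
)

# approximate SDK equivalent of learn-naive.yaml
job = spark(
    code="./src",
    entry={"file": "learn-naive.py"},
    py_files=["pybbn-3.2.3-py3.9.egg", "pysparkbbn-0.0.3-py3.9.egg"],
    driver_cores=1,
    driver_memory="2g",
    executor_cores=2,
    executor_memory="2g",
    executor_instances=2,
    inputs={
        "input_data": Input(
            type="uri_file",
            path="abfss://your_container@your_storage_account.dfs.core.windows.net/input/data-binary.csv",
            mode="direct",
        ),
        "clazz": "e",
    },
    args="--input_data ${{inputs.input_data}} --clazz ${{inputs.clazz}}",
    identity=UserIdentityConfiguration(),
    resources={"instance_type": "standard_e4s_v3", "runtime_version": "3.2"},
)

ml_client.jobs.create_or_update(job)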
The Python program learn-naive.py looks like the following.
import argparse
import json
import sys
from typing import List

from pybbn.pptc.inferencecontroller import InferenceController
from pyspark.sql import SparkSession

from pyspark_bbn.discrete.bbn import get_bbn, get_darkstar_data, get_pybbn_data
from pyspark_bbn.discrete.data import DiscreteData
from pyspark_bbn.discrete.plearn import ParamLearner
from pyspark_bbn.discrete.scblearn import Naive


def parse_pargs(args: List[str]) -> argparse.Namespace:
    """Parse command-line arguments.

    :param args: command-line arguments.
    :type args: List[str]
    :return: parsed arguments.
    :rtype: argparse.Namespace
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_data", type=str, required=True, help="Input CSV file")
    parser.add_argument("--clazz", type=str, required=False, help="Clazz variable")

    return parser.parse_args(args)


def start(input_data: str, clazz: str) -> None:
    """Start the naive learning demo.

    :param input_data: path to input CSV file.
    :type input_data: str
    :param clazz: class variable name.
    :type clazz: str
    :return: None
    :rtype: None
    """
    spark = SparkSession.builder.appName("learn-naive").getOrCreate()

    sdf = spark.read.option("header", True).option("inferSchema", True).csv(input_data)

    print("data schema")
    sdf.printSchema()

    print("")
    print("data sample")
    sdf.show(10)

    data = DiscreteData(sdf)
    naive = Naive(data, clazz)
    g = naive.get_network()

    print("")
    print("nodes")
    print("-" * 10)
    for n in g.nodes():
        print(f"{n}")

    print("")
    print("edges")
    print("-" * 10)
    for pa, ch in g.edges():
        print(f"{pa} -> {ch}")

    param_learner = ParamLearner(data, g)
    p = param_learner.get_params()

    print("")
    print("params")
    print("-" * 10)
    print(json.dumps(p, indent=2))

    print("")
    print("py-bbn, posteriors")
    print("-" * 10)
    bbn = get_bbn(g, p, data.get_profile())
    join_tree = InferenceController.apply(bbn)

    for node, posteriors in join_tree.get_posteriors().items():
        p_str = ", ".join([f"{val}={prob:.5f}" for val, prob in posteriors.items()])
        print(f"{node} : {p_str}")

    print("")
    print("py-bbn, data")
    print("-" * 10)
    pybbn_data = get_pybbn_data(g, p, data.get_profile())
    print(json.dumps(pybbn_data, indent=2))

    print("")
    print("darkstar, data")
    print("-" * 10)
    darkstar_data = get_darkstar_data(g, p, data.get_profile())
    print(json.dumps(darkstar_data, indent=2))


if __name__ == "__main__":
    args = parse_pargs(sys.argv[1:])

    input_data = args.input_data
    clazz = args.clazz
    start(input_data, clazz)
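The GCP and Azure drivers print only marginal posteriors. Once the join tree has been built, py-bbn can also condition on evidence. The small sketch below, meant to be appended to the end of start(), observes e = 1 (e is the class variable used in these demos) and re-reads the posteriors.

from pybbn.graph.jointree import EvidenceBuilder

# observe e = 1 on the join tree built by InferenceController.apply(bbn)
ev = (
    EvidenceBuilder()
    .with_node(join_tree.get_bbn_node_by_name("e"))
    .with_evidence("1", 1.0)
    .build()
)
join_tree.set_observation(ev)

# posteriors are now conditioned on e = 1
for node, posteriors in join_tree.get_posteriors().items():
    p_str = ", ".join([f"{val}={prob:.5f}" for val, prob in posteriors.items()])
    print(f"{node} : {p_str}")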