Cloud
GCP, Dataproc
Here is a demo of running a learning algorithm on Google Cloud Platform's Dataproc. We can submit a Dataproc Serverless batch job with the gcloud CLI as follows.
gcloud dataproc batches submit pyspark \
    pyspark-demo.py \
    --region=us-central1 \
    --version=2.0 \
    --deps-bucket=my-dataproc-deps-bucket \
    --py-files=pybbn-3.2.3-py3.9.egg,pysparkbbn-0.0.3-py3.9.egg \
    -- --input gs://my-gcs-folder/data/data-binary.csv
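The input CSV is read from Google Cloud Storage, so it needs to be staged there before the batch is submitted. Here is a minimal sketch using gsutil, assuming the bucket names from the command above (create the buckets first if they do not already exist).

# buckets referenced by the submit command above
gsutil mb gs://my-gcs-folder
gsutil mb gs://my-dataproc-deps-bucket

# stage the training data at the path passed via --input
gsutil cp data-binary.csv gs://my-gcs-folder/data/data-binary.csv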
The driver code, pyspark-demo.py, is as follows.
from typing import List
from pyspark.sql import SparkSession
import argparse
import sys
import json

from pybbn.pptc.inferencecontroller import InferenceController
from pysparkbbn.discrete.bbn import get_bbn, get_darkstar_data, get_pybbn_data
from pysparkbbn.discrete.data import DiscreteData
from pysparkbbn.discrete.scblearn import Naive
from pysparkbbn.discrete.plearn import ParamLearner


def parse_pargs(args: List[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', type=str, required=True,
                        help='Input CSV file')

    return parser.parse_args(args)


def start(input_path: str):
    spark = SparkSession \
        .builder \
        .appName('learn-naive') \
        .getOrCreate()

    # read the CSV passed via --input (a GCS path when run on Dataproc)
    sdf = spark.read \
        .option('header', True) \
        .option('inferSchema', True) \
        .csv(input_path)

    print('data schema')
    sdf.printSchema()

    print('')
    print('data sample')
    sdf.show(10)

    # learn a naive Bayes structure with 'e' as the class variable
    data = DiscreteData(sdf)
    naive = Naive(data, 'e')
    g = naive.get_network()

    print('')
    print('nodes')
    print('-' * 10)
    for n in g.nodes():
        print(f'{n}')

    print('')
    print('edges')
    print('-' * 10)
    for pa, ch in g.edges():
        print(f'{pa} -> {ch}')

    # learn the parameters (conditional probability tables) for the structure
    param_learner = ParamLearner(data, g)
    p = param_learner.get_params()

    print('')
    print('params')
    print('-' * 10)
    print(json.dumps(p, indent=2))

    # build a py-bbn network and run exact inference to get the marginal posteriors
    print('')
    print('py-bbn, posteriors')
    print('-' * 10)
    bbn = get_bbn(g, p, data.get_profile())
    join_tree = InferenceController.apply(bbn)

    for node, posteriors in join_tree.get_posteriors().items():
        p_str = ', '.join([f'{val}={prob:.5f}' for val, prob in posteriors.items()])
        print(f'{node} : {p_str}')

    # export the learned model in py-bbn and darkstar JSON formats
    print('')
    print('py-bbn, data')
    print('-' * 10)
    pybbn_data = get_pybbn_data(g, p, data.get_profile())
    print(json.dumps(pybbn_data, indent=2))

    print('')
    print('darkstar, data')
    print('-' * 10)
    darkstar_data = get_darkstar_data(g, p, data.get_profile())
    print(json.dumps(darkstar_data, indent=2))


if __name__ == '__main__':
    args = parse_pargs(sys.argv[1:])

    input_path = args.input
    start(input_path)
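Before submitting the batch, you may want to sanity check the driver locally. Here is a sketch using spark-submit, assuming PySpark is installed and the egg files and CSV are in the working directory.

spark-submit \
    --py-files pybbn-3.2.3-py3.9.egg,pysparkbbn-0.0.3-py3.9.egg \
    pyspark-demo.py \
    --input data-binary.csv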
When the batch runs on Dataproc, the output should look something like the following.
Batch [7732907e5b8843f98c5f6c2ccffbd85d] submitted.
Using the default container image
Waiting for container log creation
PYSPARK_PYTHON=/opt/dataproc/conda/bin/python
JAVA_HOME=/usr/lib/jvm/temurin-17-jdk-amd64
SPARK_EXTRA_CLASSPATH=
:: loading settings :: file = /etc/spark/conf/ivysettings.xml
data schema
root
 |-- a: integer (nullable = true)
 |-- b: integer (nullable = true)
 |-- c: integer (nullable = true)
 |-- d: integer (nullable = true)
 |-- e: integer (nullable = true)


data sample
+---+---+---+---+---+
|  a|  b|  c|  d|  e|
+---+---+---+---+---+
|  1|  0|  0|  0|  0|
|  1|  0|  0|  0|  0|
|  1|  0|  0|  1|  1|
|  0|  0|  0|  0|  1|
|  0|  0|  0|  0|  0|
|  1|  0|  0|  0|  1|
|  1|  0|  0|  0|  0|
|  1|  0|  0|  1|  1|
|  0|  0|  0|  0|  1|
|  1|  0|  0|  0|  0|
+---+---+---+---+---+
only showing top 10 rows


nodes
----------
e
a
b
c
d

edges
----------
e -> a
e -> b
e -> c
e -> d

params
----------
{
  "e": [
    {
      "e": "0",
      "__p__": 0.7416
    },
    {
      "e": "1",
      "__p__": 0.2584
    }
  ],
  "a": [
    {
      "a": "0",
      "e": "0",
      "__p__": 0.18743257820927725
    },
    {
      "a": "1",
      "e": "0",
      "__p__": 0.8125674217907227
    },
    {
      "a": "0",
      "e": "1",
      "__p__": 0.1946594427244582
    },
    {
      "a": "1",
      "e": "1",
      "__p__": 0.8053405572755418
    }
  ],
  "b": [
    {
      "b": "0",
      "e": "0",
      "__p__": 0.8015102481121898
    },
    {
      "b": "1",
      "e": "0",
      "__p__": 0.19848975188781015
    },
    {
      "b": "0",
      "e": "1",
      "__p__": 0.8068885448916409
    },
    {
      "b": "1",
      "e": "1",
      "__p__": 0.19311145510835914
    }
  ],
  "c": [
    {
      "c": "0",
      "e": "0",
      "__p__": 0.6863538295577131
    },
    {
      "c": "1",
      "e": "0",
      "__p__": 0.31364617044228693
    },
    {
      "c": "0",
      "e": "1",
      "__p__": 0.6884674922600619
    },
    {
      "c": "1",
      "e": "1",
      "__p__": 0.31153250773993807
    }
  ],
  "d": [
    {
      "d": "0",
      "e": "0",
      "__p__": 0.9704692556634305
    },
    {
      "d": "1",
      "e": "0",
      "__p__": 0.02953074433656958
    },
    {
      "d": "0",
      "e": "1",
      "__p__": 0.2921826625386997
    },
    {
      "d": "1",
      "e": "1",
      "__p__": 0.7078173374613003
    }
  ]
}

py-bbn, posteriors
----------
e : 0=0.74160, 1=0.25840
a : 0=0.18930, 1=0.81070
b : 0=0.80290, 1=0.19710
c : 0=0.68690, 1=0.31310
d : 0=0.79520, 1=0.20480

py-bbn, data
----------
{
  "nodes": {
    "0": {
      "probs": [
        0.7416,
        0.2584
      ],
      "variable": {
        "id": 0,
        "name": "e",
        "values": [
          "0",
          "1"
        ]
      }
    },
    "1": {
      "probs": [
        0.18743257820927725,
        0.8125674217907227,
        0.1946594427244582,
        0.8053405572755418
      ],
      "variable": {
        "id": 1,
        "name": "a",
        "values": [
          "0",
          "1"
        ]
      }
    },
    "2": {
      "probs": [
        0.8015102481121898,
        0.19848975188781015,
        0.8068885448916409,
        0.19311145510835914
      ],
      "variable": {
        "id": 2,
        "name": "b",
        "values": [
          "0",
          "1"
        ]
      }
    },
    "3": {
      "probs": [
        0.6863538295577131,
        0.31364617044228693,
        0.6884674922600619,
        0.31153250773993807
      ],
      "variable": {
        "id": 3,
        "name": "c",
        "values": [
          "0",
          "1"
        ]
      }
    },
    "4": {
      "probs": [
        0.9704692556634305,
        0.02953074433656958,
        0.2921826625386997,
        0.7078173374613003
      ],
      "variable": {
        "id": 4,
        "name": "d",
        "values": [
          "0",
          "1"
        ]
      }
    }
  },
  "edges": [
    {
      "pa": 0,
      "ch": 1
    },
    {
      "pa": 0,
      "ch": 2
    },
    {
      "pa": 0,
      "ch": 3
    },
    {
      "pa": 0,
      "ch": 4
    }
  ]
}

darkstar, data
----------
{
  "nodes": {
    "e": {
      "id": 0,
      "values": {
        "0": 0,
        "1": 1
      }
    },
    "a": {
      "id": 1,
      "values": {
        "0": 0,
        "1": 1
      }
    },
    "b": {
      "id": 2,
      "values": {
        "0": 0,
        "1": 1
      }
    },
    "c": {
      "id": 3,
      "values": {
        "0": 0,
        "1": 1
      }
    },
    "d": {
      "id": 4,
      "values": {
        "0": 0,
        "1": 1
      }
    }
  },
  "edges": [
    {
      "parent": "e",
      "child": "a"
    },
    {
      "parent": "e",
      "child": "b"
    },
    {
      "parent": "e",
      "child": "c"
    },
    {
      "parent": "e",
      "child": "d"
    }
  ],
  "parameters": {
    "e": [
      {
        "e": "0",
        "__p__": 0.7416
      },
      {
        "e": "1",
        "__p__": 0.2584
      }
    ],
    "a": [
      {
        "a": "0",
        "e": "0",
        "__p__": 0.18743257820927725
      },
      {
        "a": "1",
        "e": "0",
        "__p__": 0.8125674217907227
      },
      {
        "a": "0",
        "e": "1",
        "__p__": 0.1946594427244582
      },
      {
        "a": "1",
        "e": "1",
        "__p__": 0.8053405572755418
      }
    ],
    "b": [
      {
        "b": "0",
        "e": "0",
        "__p__": 0.8015102481121898
      },
      {
        "b": "1",
        "e": "0",
        "__p__": 0.19848975188781015
      },
      {
        "b": "0",
        "e": "1",
        "__p__": 0.8068885448916409
      },
      {
        "b": "1",
        "e": "1",
        "__p__": 0.19311145510835914
      }
    ],
    "c": [
      {
        "c": "0",
        "e": "0",
        "__p__": 0.6863538295577131
      },
      {
        "c": "1",
        "e": "0",
        "__p__": 0.31364617044228693
      },
      {
        "c": "0",
        "e": "1",
        "__p__": 0.6884674922600619
      },
      {
        "c": "1",
        "e": "1",
        "__p__": 0.31153250773993807
      }
    ],
    "d": [
      {
        "d": "0",
        "e": "0",
        "__p__": 0.9704692556634305
      },
      {
        "d": "1",
        "e": "0",
        "__p__": 0.02953074433656958
      },
      {
        "d": "0",
        "e": "1",
        "__p__": 0.2921826625386997
      },
      {
        "d": "1",
        "e": "1",
        "__p__": 0.7078173374613003
      }
    ]
  }
}
Batch [7732907e5b8843f98c5f6c2ccffbd85d] finished.
metadata:
  '@type': type.googleapis.com/google.cloud.dataproc.v1.BatchOperationMetadata
  batch: projects/rocketvector/locations/us-central1/batches/7732907e5b8843f98c5f6c2ccffbd85d
  batchUuid: 96fc6be5-bb7b-45cf-9123-669ff6fa1a05
  createTime: '2023-06-08T08:17:28.049693Z'
  description: Batch
  operationType: BATCH
name: projects/rocketvector/regions/us-central1/operations/61f78fed-c3c5-38b6-a0d8-1b492d3d210d
AWS, Spark Processing Job
Here is a demo of running a learning algorithm on AWS SageMaker using a Spark Processing Job. We can kick off the job by running a driver program on the command line.
python driver.py
Note that you should first build a Docker image with all of the required APIs installed and push it to Amazon ECR; a sketch of that workflow is shown after the driver program. The driver program, driver.py, looks like the following.
from sagemaker.spark.processing import PySparkProcessor

job = PySparkProcessor(**{
    'role': 'your_aws_role',
    'instance_type': 'ml.c5.xlarge',
    'instance_count': 1,
    'base_job_name': 'pyspark-bbn',
    'image_uri': 'your_docker_image_uri'
})

job.run(
    submit_app='learn.py',
    arguments=[
        '--input_bucket', 'your_input_bucket',
        '--input_key', 'temp/data-from-structure.csv',
        '--output_bucket', 'your_output_bucket',
        '--output_key', 'temp/output/data-from-structure/bbn-naive.json',
        '--clazz_var', 'your_clazz_variable'
    ]
)
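Here is a minimal sketch of building that image and pushing it to ECR. The account ID, region, and repository name are hypothetical placeholders, and the Dockerfile is assumed to install PySpark, py-bbn, and pyspark-bbn.

# hypothetical account ID (123456789012), region, and repository name
aws ecr create-repository --repository-name pyspark-bbn --region us-east-1
aws ecr get-login-password --region us-east-1 | \
    docker login --username AWS --password-stdin 123456789012.dkr.ecr.us-east-1.amazonaws.com
docker build -t pyspark-bbn .
docker tag pyspark-bbn:latest 123456789012.dkr.ecr.us-east-1.amazonaws.com/pyspark-bbn:latest
docker push 123456789012.dkr.ecr.us-east-1.amazonaws.com/pyspark-bbn:latest

The pushed image URI is what you supply as image_uri in driver.py.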
The learning program, learn.py, looks like the following. It simply learns a naive Bayes model.
import argparse
import json
import logging
import sys
from typing import List

import boto3
from pybbn.graph.dag import Bbn
from pyspark.sql import SparkSession

from pysparkbbn.discrete.bbn import get_bbn
from pysparkbbn.discrete.data import DiscreteData
from pysparkbbn.discrete.plearn import ParamLearner
from pysparkbbn.discrete.scblearn import Naive

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
spark = SparkSession.builder.appName('learn-naive').getOrCreate()


def parse_pargs(args: List[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_bucket', type=str, required=True)
    parser.add_argument('--input_key', type=str, required=True)
    parser.add_argument('--output_bucket', type=str, required=True)
    parser.add_argument('--output_key', type=str, required=True)
    parser.add_argument('--clazz_var', type=str, default=None)

    return parser.parse_args(args)


def upload(src: str, bucket: str, key: str):
    s3 = boto3.client('s3')
    response = s3.upload_file(src, bucket, key)
    logging.info(f'uploaded {src} to {bucket}/{key}')
    logging.info(f'response={response}')


if __name__ == '__main__':
    args = parse_pargs(sys.argv[1:])

    logging.info('Job Starting')

    logging.info('Parsed Arguments')
    logging.info(f'args={args}')

    data_path = f's3://{args.input_bucket}/{args.input_key}'
    logging.info(f'data_path={data_path}')

    sdf = spark \
        .read \
        .option('header', 'true') \
        .csv(data_path)

    n_rows, n_cols = sdf.count(), len(sdf.columns)
    logging.info('Read Data')
    logging.info(f'data dimensions: rows={n_rows:,}, cols={n_cols:,}')

    data = DiscreteData(sdf)

    structure_learner = Naive(data, args.clazz_var)
    logging.info('Learned Structure')
    logging.info(f'structure learn type: {type(structure_learner)}')

    g = structure_learner.get_network()
    logging.info(f'learned structure: nodes={len(g.nodes())}, edges={len(g.edges())}')

    parameter_learner = ParamLearner(data, g)
    p = parameter_learner.get_params()
    logging.info('Learned Parameters')
    logging.info(f'learned parameters: {len(p)}')

    bbn = get_bbn(g, p, data.get_profile())
    logging.info('Constructed BBN')
    logging.info(f'bbn: nodes={len(bbn.nodes)}, edges={len(bbn.edges)}')

    j_data = json.dumps(Bbn.to_dict(bbn), indent=2)
    j_path = '/tmp/bbn.json'
    with open(j_path, 'w') as f:
        f.write(j_data)

    logging.info('Serialized BBN')
    logging.info(f'saved bbn to {j_path}')
    upload(j_path, args.output_bucket, args.output_key)

    spark.stop()

    print('Finished')
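Once the processing job completes, the serialized BBN can be pulled down from S3. Here is a sketch using the AWS CLI with the bucket and key passed to the driver above.

aws s3 cp s3://your_output_bucket/temp/output/data-from-structure/bbn-naive.json bbn-naive.json

The downloaded JSON can then be loaded back with py-bbn for inference.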
Azure, Machine Learning
Here is a demo of running a standalone Spark job on serverless Spark compute in Azure Machine Learning. We can submit the job via the Azure CLI as follows.
az ml job create \
    -f learn-naive.yaml \
    -g your_resource_group \
    -w your_aml_workspace \
    --subscription your_subscription_id
The YAML file, learn-naive.yaml, looks like the following.
$schema: http://azureml/sdk-2-0/SparkJob.json
type: spark

code: ./src
entry:
  file: learn-naive.py

py_files:
  - pybbn-3.2.3-py3.9.egg
  - pysparkbbn-0.0.3-py3.9.egg

conf:
  spark.driver.cores: 1
  spark.driver.memory: 2g
  spark.executor.cores: 2
  spark.executor.memory: 2g
  spark.executor.instances: 2

inputs:
  input_data:
    type: uri_file
    path: abfss://your_container@your_storage_account.dfs.core.windows.net/input/data-binary.csv
    mode: direct
  clazz: "e"

args: >-
  --input_data ${{inputs.input_data}} --clazz ${{inputs.clazz}}

identity:
  type: user_identity

resources:
  instance_type: standard_e4s_v3
  runtime_version: "3.2"
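The input_data path points to an Azure Data Lake Storage Gen2 location, so the CSV must be uploaded there first. Here is a minimal sketch using the Azure CLI, assuming the storage account and container names from the YAML above and that you have data-plane permissions on the account.

az storage fs file upload \
    --account-name your_storage_account \
    --file-system your_container \
    --path input/data-binary.csv \
    --source data-binary.csv \
    --auth-mode login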
The Python program, learn-naive.py, looks like the following.
import argparse
import json
import sys
from typing import List

from pybbn.pptc.inferencecontroller import InferenceController
from pyspark.sql import SparkSession

from pysparkbbn.discrete.bbn import get_bbn, get_darkstar_data, get_pybbn_data
from pysparkbbn.discrete.data import DiscreteData
from pysparkbbn.discrete.plearn import ParamLearner
from pysparkbbn.discrete.scblearn import Naive


def parse_pargs(args: List[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_data', type=str, required=True, help='Input CSV file')
    parser.add_argument('--clazz', type=str, required=False, help='Clazz variable')

    return parser.parse_args(args)


def start(input_data: str, clazz: str):
    spark = SparkSession \
        .builder \
        .appName('learn-naive') \
        .getOrCreate()

    sdf = spark.read \
        .option('header', True) \
        .option('inferSchema', True) \
        .csv(input_data)

    print('data schema')
    sdf.printSchema()

    print('')
    print('data sample')
    sdf.show(10)

    data = DiscreteData(sdf)
    naive = Naive(data, clazz)
    g = naive.get_network()

    print('')
    print('nodes')
    print('-' * 10)
    for n in g.nodes():
        print(f'{n}')

    print('')
    print('edges')
    print('-' * 10)
    for pa, ch in g.edges():
        print(f'{pa} -> {ch}')

    param_learner = ParamLearner(data, g)
    p = param_learner.get_params()

    print('')
    print('params')
    print('-' * 10)
    print(json.dumps(p, indent=2))

    print('')
    print('py-bbn, posteriors')
    print('-' * 10)
    bbn = get_bbn(g, p, data.get_profile())
    join_tree = InferenceController.apply(bbn)

    for node, posteriors in join_tree.get_posteriors().items():
        p_str = ', '.join([f'{val}={prob:.5f}' for val, prob in posteriors.items()])
        print(f'{node} : {p_str}')

    print('')
    print('py-bbn, data')
    print('-' * 10)
    pybbn_data = get_pybbn_data(g, p, data.get_profile())
    print(json.dumps(pybbn_data, indent=2))

    print('')
    print('darkstar, data')
    print('-' * 10)
    darkstar_data = get_darkstar_data(g, p, data.get_profile())
    print(json.dumps(darkstar_data, indent=2))


if __name__ == '__main__':
    args = parse_pargs(sys.argv[1:])

    input_data = args.input_data
    clazz = args.clazz
    start(input_data, clazz)