Cloud

GCP, Dataproc

Here is a demo of running a learning algorithm on Google Cloud Platform’s Dataproc as a serverless batch job. We can submit the job with the gcloud CLI as follows.

gcloud dataproc batches submit pyspark \
    pyspark-demo.py \
    --region=us-central1 \
    --version=2.0 \
    --deps-bucket=my-dataproc-deps-bucket \
    --py-files=pybbn-3.2.3-py3.9.egg,pysparkbbn-0.0.3-py3.9.egg \
    -- --input gs://my-gcs-folder/data/data-binary.csv
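
The input file data-binary.csv lives in a GCS bucket. If you need a stand-in dataset to try the demo, here is a minimal sketch (not part of the original demo) that generates a toy binary dataset with the same columns (a, b, c, d, e) from a naive Bayes structure with e as the class variable; the sampling probabilities are rough guesses loosely based on the parameters shown in the output further below. Upload the resulting file to your bucket, e.g. gs://my-gcs-folder/data/data-binary.csv.

import numpy as np
import pandas as pd

np.random.seed(37)
n = 10_000

# class variable e; a, b, c, d each depend only on e (naive Bayes structure)
e = np.random.binomial(1, 0.25, n)


def child(p_given_e0: float, p_given_e1: float) -> np.ndarray:
    # sample a binary child whose probability of being 1 depends on e
    return np.random.binomial(1, np.where(e == 1, p_given_e1, p_given_e0))


df = pd.DataFrame({
    'a': child(0.80, 0.80),
    'b': child(0.20, 0.20),
    'c': child(0.30, 0.30),
    'd': child(0.03, 0.70),
    'e': e
})

df.to_csv('data-binary.csv', index=False)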

The driver code pyspark-demo.py is as follows.

from typing import List
from pyspark.sql import SparkSession
import argparse
import sys
import json

from pybbn.pptc.inferencecontroller import InferenceController
from pysparkbbn.discrete.bbn import get_bbn, get_darkstar_data, get_pybbn_data
from pysparkbbn.discrete.data import DiscreteData
from pysparkbbn.discrete.scblearn import Naive
from pysparkbbn.discrete.plearn import ParamLearner


def parse_pargs(args: List[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', type=str, required=True,
                        help='Input CSV file')

    return parser.parse_args(args)


def start(input_path: str):
    spark = SparkSession \
        .builder \
        .appName('learn-naive') \
        .getOrCreate()

    sdf = spark.read \
        .option('header', True) \
        .option('inferSchema', True) \
        .csv(input_path)

    print('data schema')
    sdf.printSchema()

    print('')
    print('data sample')
    sdf.show(10)

    data = DiscreteData(sdf)
    naive = Naive(data, 'e')
    g = naive.get_network()

    print('')
    print('nodes')
    print('-' * 10)
    for n in g.nodes():
        print(f'{n}')

    print('')
    print('edges')
    print('-' * 10)
    for pa, ch in g.edges():
        print(f'{pa} -> {ch}')

    param_learner = ParamLearner(data, g)
    p = param_learner.get_params()

    print('')
    print('params')
    print('-' * 10)
    print(json.dumps(p, indent=2))

    print('')
    print('py-bbn, posteriors')
    print('-' * 10)
    bbn = get_bbn(g, p, data.get_profile())
    join_tree = InferenceController.apply(bbn)

    for node, posteriors in join_tree.get_posteriors().items():
        p_str = ', '.join([f'{val}={prob:.5f}' for val, prob in posteriors.items()])
        print(f'{node} : {p_str}')

    print('')
    print('py-bbn, data')
    print('-' * 10)
    pybbn_data = get_pybbn_data(g, p, data.get_profile())
    print(json.dumps(pybbn_data, indent=2))

    print('')
    print('darkstar, data')
    print('-' * 10)
    darkstar_data = get_darkstar_data(g, p, data.get_profile())
    print(json.dumps(darkstar_data, indent=2))


if __name__ == '__main__':
    args = parse_pargs(sys.argv[1:])

    input_path = args.input
    start(input_path)

The output should look something like the following.

Batch [7732907e5b8843f98c5f6c2ccffbd85d] submitted.
Using the default container image
Waiting for container log creation
PYSPARK_PYTHON=/opt/dataproc/conda/bin/python
JAVA_HOME=/usr/lib/jvm/temurin-17-jdk-amd64
SPARK_EXTRA_CLASSPATH=
:: loading settings :: file = /etc/spark/conf/ivysettings.xml
data schema
root
 |-- a: integer (nullable = true)
 |-- b: integer (nullable = true)
 |-- c: integer (nullable = true)
 |-- d: integer (nullable = true)
 |-- e: integer (nullable = true)


data sample
+---+---+---+---+---+
|  a|  b|  c|  d|  e|
+---+---+---+---+---+
|  1|  0|  0|  0|  0|
|  1|  0|  0|  0|  0|
|  1|  0|  0|  1|  1|
|  0|  0|  0|  0|  1|
|  0|  0|  0|  0|  0|
|  1|  0|  0|  0|  1|
|  1|  0|  0|  0|  0|
|  1|  0|  0|  1|  1|
|  0|  0|  0|  0|  1|
|  1|  0|  0|  0|  0|
+---+---+---+---+---+
only showing top 10 rows


nodes
----------
e
a
b
c
d

edges
----------
e -> a
e -> b
e -> c
e -> d

params
----------
{
  "e": [
    {
      "e": "0",
      "__p__": 0.7416
    },
    {
      "e": "1",
      "__p__": 0.2584
    }
  ],
  "a": [
    {
      "a": "0",
      "e": "0",
      "__p__": 0.18743257820927725
    },
    {
      "a": "1",
      "e": "0",
      "__p__": 0.8125674217907227
    },
    {
      "a": "0",
      "e": "1",
      "__p__": 0.1946594427244582
    },
    {
      "a": "1",
      "e": "1",
      "__p__": 0.8053405572755418
    }
  ],
  "b": [
    {
      "b": "0",
      "e": "0",
      "__p__": 0.8015102481121898
    },
    {
      "b": "1",
      "e": "0",
      "__p__": 0.19848975188781015
    },
    {
      "b": "0",
      "e": "1",
      "__p__": 0.8068885448916409
    },
    {
      "b": "1",
      "e": "1",
      "__p__": 0.19311145510835914
    }
  ],
  "c": [
    {
      "c": "0",
      "e": "0",
      "__p__": 0.6863538295577131
    },
    {
      "c": "1",
      "e": "0",
      "__p__": 0.31364617044228693
    },
    {
      "c": "0",
      "e": "1",
      "__p__": 0.6884674922600619
    },
    {
      "c": "1",
      "e": "1",
      "__p__": 0.31153250773993807
    }
  ],
  "d": [
    {
      "d": "0",
      "e": "0",
      "__p__": 0.9704692556634305
    },
    {
      "d": "1",
      "e": "0",
      "__p__": 0.02953074433656958
    },
    {
      "d": "0",
      "e": "1",
      "__p__": 0.2921826625386997
    },
    {
      "d": "1",
      "e": "1",
      "__p__": 0.7078173374613003
    }
  ]
}

py-bbn, posteriors
----------
e : 0=0.74160, 1=0.25840
a : 0=0.18930, 1=0.81070
b : 0=0.80290, 1=0.19710
c : 0=0.68690, 1=0.31310
d : 0=0.79520, 1=0.20480

py-bbn, data
----------
{
  "nodes": {
    "0": {
      "probs": [
        0.7416,
        0.2584
      ],
      "variable": {
        "id": 0,
        "name": "e",
        "values": [
          "0",
          "1"
        ]
      }
    },
    "1": {
      "probs": [
        0.18743257820927725,
        0.8125674217907227,
        0.1946594427244582,
        0.8053405572755418
      ],
      "variable": {
        "id": 1,
        "name": "a",
        "values": [
          "0",
          "1"
        ]
      }
    },
    "2": {
      "probs": [
        0.8015102481121898,
        0.19848975188781015,
        0.8068885448916409,
        0.19311145510835914
      ],
      "variable": {
        "id": 2,
        "name": "b",
        "values": [
          "0",
          "1"
        ]
      }
    },
    "3": {
      "probs": [
        0.6863538295577131,
        0.31364617044228693,
        0.6884674922600619,
        0.31153250773993807
      ],
      "variable": {
        "id": 3,
        "name": "c",
        "values": [
          "0",
          "1"
        ]
      }
    },
    "4": {
      "probs": [
        0.9704692556634305,
        0.02953074433656958,
        0.2921826625386997,
        0.7078173374613003
      ],
      "variable": {
        "id": 4,
        "name": "d",
        "values": [
          "0",
          "1"
        ]
      }
    }
  },
  "edges": [
    {
      "pa": 0,
      "ch": 1
    },
    {
      "pa": 0,
      "ch": 2
    },
    {
      "pa": 0,
      "ch": 3
    },
    {
      "pa": 0,
      "ch": 4
    }
  ]
}

darkstar, data
----------
{
  "nodes": {
    "e": {
      "id": 0,
      "values": {
        "0": 0,
        "1": 1
      }
    },
    "a": {
      "id": 1,
      "values": {
        "0": 0,
        "1": 1
      }
    },
    "b": {
      "id": 2,
      "values": {
        "0": 0,
        "1": 1
      }
    },
    "c": {
      "id": 3,
      "values": {
        "0": 0,
        "1": 1
      }
    },
    "d": {
      "id": 4,
      "values": {
        "0": 0,
        "1": 1
      }
    }
  },
  "edges": [
    {
      "parent": "e",
      "child": "a"
    },
    {
      "parent": "e",
      "child": "b"
    },
    {
      "parent": "e",
      "child": "c"
    },
    {
      "parent": "e",
      "child": "d"
    }
  ],
  "parameters": {
    "e": [
      {
        "e": "0",
        "__p__": 0.7416
      },
      {
        "e": "1",
        "__p__": 0.2584
      }
    ],
    "a": [
      {
        "a": "0",
        "e": "0",
        "__p__": 0.18743257820927725
      },
      {
        "a": "1",
        "e": "0",
        "__p__": 0.8125674217907227
      },
      {
        "a": "0",
        "e": "1",
        "__p__": 0.1946594427244582
      },
      {
        "a": "1",
        "e": "1",
        "__p__": 0.8053405572755418
      }
    ],
    "b": [
      {
        "b": "0",
        "e": "0",
        "__p__": 0.8015102481121898
      },
      {
        "b": "1",
        "e": "0",
        "__p__": 0.19848975188781015
      },
      {
        "b": "0",
        "e": "1",
        "__p__": 0.8068885448916409
      },
      {
        "b": "1",
        "e": "1",
        "__p__": 0.19311145510835914
      }
    ],
    "c": [
      {
        "c": "0",
        "e": "0",
        "__p__": 0.6863538295577131
      },
      {
        "c": "1",
        "e": "0",
        "__p__": 0.31364617044228693
      },
      {
        "c": "0",
        "e": "1",
        "__p__": 0.6884674922600619
      },
      {
        "c": "1",
        "e": "1",
        "__p__": 0.31153250773993807
      }
    ],
    "d": [
      {
        "d": "0",
        "e": "0",
        "__p__": 0.9704692556634305
      },
      {
        "d": "1",
        "e": "0",
        "__p__": 0.02953074433656958
      },
      {
        "d": "0",
        "e": "1",
        "__p__": 0.2921826625386997
      },
      {
        "d": "1",
        "e": "1",
        "__p__": 0.7078173374613003
      }
    ]
  }
}
Batch [7732907e5b8843f98c5f6c2ccffbd85d] finished.
metadata:
  '@type': type.googleapis.com/google.cloud.dataproc.v1.BatchOperationMetadata
  batch: projects/rocketvector/locations/us-central1/batches/7732907e5b8843f98c5f6c2ccffbd85d
  batchUuid: 96fc6be5-bb7b-45cf-9123-669ff6fa1a05
  createTime: '2023-06-08T08:17:28.049693Z'
  description: Batch
  operationType: BATCH
name: projects/rocketvector/regions/us-central1/operations/61f78fed-c3c5-38b6-a0d8-1b492d3d210d
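
Note that pyspark-demo.py only prints the learned structure, parameters, and serializations to the batch driver log. If you want to persist the results, one option is to write the JSON back to Cloud Storage. Here is a minimal sketch that could be appended to start() in pyspark-demo.py; it assumes the google-cloud-storage client library is available on the batch, and the bucket name and blob path are placeholders.

import json

from google.cloud import storage


def save_to_gcs(data: dict, bucket_name: str, blob_path: str):
    # serialize the dictionary to JSON and upload it to a GCS bucket
    client = storage.Client()
    blob = client.bucket(bucket_name).blob(blob_path)
    blob.upload_from_string(json.dumps(data, indent=2), content_type='application/json')


# e.g., persist the py-bbn serialization produced by get_pybbn_data(...)
save_to_gcs(pybbn_data, 'my-gcs-folder', 'output/bbn-naive.json')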

AWS, Spark Processing Job

Here is a demo of running a learning algorithm on AWS SageMaker using a Spark processing job. We can kick off the job by running a driver program from the command line.

python driver.py

The driver program driver.py looks like the following. Note that you should build a Docker image and push it to ECR; that image should have all of the Python packages the job needs installed (learn.py, for example, imports pybbn, pysparkbbn, and boto3).

from sagemaker.spark.processing import PySparkProcessor

job = PySparkProcessor(**{
    'role': 'your_aws_role',
    'instance_type': 'ml.c5.xlarge',
    'instance_count': 1,
    'base_job_name': 'pyspark-bbn',
    'image_uri': 'your_docker_image_uri'
})

job.run(
    submit_app='learn.py',
    arguments=[
        '--input_bucket', 'your_input_bucket',
        '--input_key', 'temp/data-from-structure.csv',
        '--output_bucket', 'your_output_bucket',
        '--output_key', 'temp/output/data-from-structure/bbn-naive.json',
        '--clazz_var', 'your_clazz_variable'
    ]
)

The learning program learn.py looks like the following. This program simply learns a naive Bayes model and uploads the serialized result to S3.

import argparse
import json
import logging
import sys
from typing import List

import boto3
from pybbn.graph.dag import Bbn
from pyspark.sql import SparkSession

from pysparkbbn.discrete.bbn import get_bbn
from pysparkbbn.discrete.data import DiscreteData
from pysparkbbn.discrete.plearn import ParamLearner
from pysparkbbn.discrete.scblearn import Naive

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
spark = SparkSession.builder.appName('learn-naive').getOrCreate()


def parse_pargs(args: List[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_bucket', type=str, required=True)
    parser.add_argument('--input_key', type=str, required=True)
    parser.add_argument('--output_bucket', type=str, required=True)
    parser.add_argument('--output_key', type=str, required=True)
    parser.add_argument('--clazz_var', type=str, default=None)

    return parser.parse_args(args)


def upload(src: str, bucket: str, key: str):
    s3 = boto3.client('s3')
    response = s3.upload_file(src, bucket, key)
    logging.info(f'uploaded {src} to {bucket}/{key}')
    logging.info(f'response={response}')


if __name__ == '__main__':
    args = parse_pargs(sys.argv[1:])

    logging.info('Job Starting')

    logging.info('Parsed Arguments')
    logging.info(f'args={args}')

    data_path = f's3://{args.input_bucket}/{args.input_key}'
    logging.info(f'data_path={data_path}')

    sdf = spark \
        .read \
        .option('header', 'true') \
        .csv(data_path)

    n_rows, n_cols = sdf.count(), len(sdf.columns)
    logging.info('Read Data')
    logging.info(f'data dimensions: rows={n_rows:,}, cols={n_cols:,}')

    data = DiscreteData(sdf)

    structure_learner = Naive(data, args.clazz_var)
    logging.info('Learned Structure')
    logging.info(f'structure learn type: {type(structure_learner)}')

    g = structure_learner.get_network()
    logging.info(f'learned structure: nodes={len(g.nodes())}, edges={len(g.edges())}')

    parameter_learner = ParamLearner(data, g)
    p = parameter_learner.get_params()
    logging.info('Learned Parameters')
    logging.info(f'learned parameters: {len(p)}')

    bbn = get_bbn(g, p, data.get_profile())
    logging.info('Constructed BBN')
    logging.info(f'bbn: nodes={len(bbn.nodes)}, edges={len(bbn.edges)}')

    j_data = json.dumps(Bbn.to_dict(bbn), indent=2)
    j_path = '/tmp/bbn.json'
    with open(j_path, 'w') as f:
        f.write(j_data)

    logging.info('Serialized BBN')
    logging.info(f'saved bbn to {j_path}')
    upload(j_path, args.output_bucket, args.output_key)

    spark.stop()

    print('Finished')
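
Once the job finishes, the serialized BBN sits in S3 at the output bucket and key passed to driver.py. Here is a minimal sketch (reusing those placeholder names) that downloads the JSON file with boto3 and reloads it with py-bbn's Bbn.from_dict, the counterpart of the Bbn.to_dict call in learn.py, so you can run inference locally.

import json

import boto3
from pybbn.graph.dag import Bbn
from pybbn.pptc.inferencecontroller import InferenceController

# download the serialized BBN that learn.py uploaded to S3
s3 = boto3.client('s3')
s3.download_file('your_output_bucket', 'temp/output/data-from-structure/bbn-naive.json', 'bbn-naive.json')

# deserialize the BBN and build a join tree for inference
with open('bbn-naive.json', 'r') as f:
    bbn = Bbn.from_dict(json.load(f))

join_tree = InferenceController.apply(bbn)

# print the marginal posteriors of every node
for node, posteriors in join_tree.get_posteriors().items():
    print(f'{node} : {posteriors}')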

Azure, Machine Learning

Here is a demo of running a standalone Spark job on serverless Spark compute in Azure Machine Learning. We can submit the job via the Azure CLI as follows.

az ml job create \
    -f learn-naive.yaml \
    -g your_resource_group \
    -w your_aml_workspace \
    --subscription your_subscription_id

The YAML file learn-naive.yaml looks like the following.

$schema: http://azureml/sdk-2-0/SparkJob.json
type: spark

code: ./src
entry:
  file: learn-naive.py

py_files:
  - pybbn-3.2.3-py3.9.egg
  - pysparkbbn-0.0.3-py3.9.egg

conf:
  spark.driver.cores: 1
  spark.driver.memory: 2g
  spark.executor.cores: 2
  spark.executor.memory: 2g
  spark.executor.instances: 2

inputs:
  input_data:
    type: uri_file
    path: abfss://your_container@your_storage_account.dfs.core.windows.net/input/data-binary.csv
    mode: direct
  clazz: "e"

args: >-
  --input_data ${{inputs.input_data}} --clazz ${{inputs.clazz}}

identity:
  type: user_identity

resources:
  instance_type: standard_e4s_v3
  runtime_version: "3.2"

The Python program learn-naive.py looks like the following.

import argparse
import json
import sys
from typing import List

from pybbn.pptc.inferencecontroller import InferenceController
from pyspark.sql import SparkSession

from pysparkbbn.discrete.bbn import get_bbn, get_darkstar_data, get_pybbn_data
from pysparkbbn.discrete.data import DiscreteData
from pysparkbbn.discrete.plearn import ParamLearner
from pysparkbbn.discrete.scblearn import Naive


def parse_pargs(args: List[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_data', type=str, required=True, help='Input CSV file')
    parser.add_argument('--clazz', type=str, required=False, help='Clazz variable')

    return parser.parse_args(args)


def start(input_data: str, clazz: str):
    spark = SparkSession \
        .builder \
        .appName('learn-naive') \
        .getOrCreate()

    sdf = spark.read \
        .option('header', True) \
        .option('inferSchema', True) \
        .csv(input_data)

    print('data schema')
    sdf.printSchema()

    print('')
    print('data sample')
    sdf.show(10)

    data = DiscreteData(sdf)
    naive = Naive(data, clazz)
    g = naive.get_network()

    print('')
    print('nodes')
    print('-' * 10)
    for n in g.nodes():
        print(f'{n}')

    print('')
    print('edges')
    print('-' * 10)
    for pa, ch in g.edges():
        print(f'{pa} -> {ch}')

    param_learner = ParamLearner(data, g)
    p = param_learner.get_params()

    print('')
    print('params')
    print('-' * 10)
    print(json.dumps(p, indent=2))

    print('')
    print('py-bbn, posteriors')
    print('-' * 10)
    bbn = get_bbn(g, p, data.get_profile())
    join_tree = InferenceController.apply(bbn)

    for node, posteriors in join_tree.get_posteriors().items():
        p_str = ', '.join([f'{val}={prob:.5f}' for val, prob in posteriors.items()])
        print(f'{node} : {p_str}')

    print('')
    print('py-bbn, data')
    print('-' * 10)
    pybbn_data = get_pybbn_data(g, p, data.get_profile())
    print(json.dumps(pybbn_data, indent=2))

    print('')
    print('darkstar, data')
    print('-' * 10)
    darkstar_data = get_darkstar_data(g, p, data.get_profile())
    print(json.dumps(darkstar_data, indent=2))


if __name__ == '__main__':
    args = parse_pargs(sys.argv[1:])

    input_data = args.input_data
    clazz = args.clazz
    start(input_data, clazz)
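
The demos above only print the marginal posteriors of each node. If you want to condition on evidence with the learned network, py-bbn supports that through its EvidenceBuilder. Here is a minimal sketch that continues from the join_tree built by InferenceController.apply in the code above; the observed node name 'd' and value '1' are placeholders.

from pybbn.graph.jointree import EvidenceBuilder

# observe d = 1 and propagate the evidence through the join tree
ev = EvidenceBuilder() \
    .with_node(join_tree.get_bbn_node_by_name('d')) \
    .with_evidence('1', 1.0) \
    .build()
join_tree.set_observation(ev)

# the posteriors are now conditioned on d = 1
for node, posteriors in join_tree.get_posteriors().items():
    p_str = ', '.join([f'{val}={prob:.5f}' for val, prob in posteriors.items()])
    print(f'{node} : {p_str}')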