/**
 * One named step of a pipeline: a closure plus a flag saying whether a
 * failure of that closure should propagate.
 */
class PipelineStage {
    /** Label for this stage. */
    String name

    /** Work performed by the stage; invoked with the context as its only argument. */
    Closure action

    /** When true (the default), exceptions thrown by {@code action} propagate to the caller. */
    boolean required = true

    /**
     * Invokes {@code action} with the given context.
     *
     * @param context value handed to the closure
     * @return whatever the closure returns; if the closure throws an
     *         {@link Exception} and this stage is not required, the
     *         {@code context} argument itself is returned instead
     * @throws Exception the closure's own exception, rethrown unchanged,
     *         when the stage is required
     */
    def execute(context) {
        def outcome
        try {
            outcome = action.call(context)
        } catch (Exception failure) {
            // Optional stage: drop the failure and hand the context back as received.
            if (!required) {
                return context
            }
            throw failure
        }
        outcome
    }
}
15
/**
 * Fluent builder and runner for an ordered list of {@link PipelineStage}s.
 *
 * Stages are added with {@code stage(...)}, seed data with {@code with(...)},
 * and {@code execute()} threads a copy of the seed context through every
 * stage in insertion order.
 */
class Pipeline {
    /** Stages in the order they will run. */
    List<PipelineStage> stages = []

    /** Seed data copied into the working context at the start of each run. */
    Map<String, Object> context = [:]

    /**
     * Appends a stage using positional arguments.
     *
     * @param name   stage label; also the key of this stage's entry in the run results
     * @param opts   options map; only {@code required} is read (defaults to true).
     *               {@code null} is treated as "no options".
     * @param action closure invoked with the current context
     * @return this pipeline, for chaining
     */
    Pipeline stage(String name, Map opts = [:], Closure action) {
        // Deliberately not opts.get('required', true): the two-argument GDK
        // get() stores the default in the map it is called on, which would
        // modify a map owned by the caller.
        boolean required = (opts != null && opts.containsKey('required')) ?
                (opts['required'] as boolean) : true
        stages << new PipelineStage(name: name, action: action, required: required)
        return this
    }

    /**
     * Named-argument form of {@code stage}. Groovy gathers named arguments
     * into a Map and passes it as the first argument, so a call such as
     * {@code stage('validate', required: false) { ... }} dispatches here.
     *
     * @param opts   options collected from the named arguments
     * @param name   stage label
     * @param action closure invoked with the current context
     * @return this pipeline, for chaining
     */
    Pipeline stage(Map opts, String name, Closure action) {
        return stage(name, opts, action)
    }

    /**
     * Merges entries into the seed context.
     *
     * @param data entries to add; {@code null} or empty is a no-op
     * @return this pipeline, for chaining
     */
    Pipeline with(Map<String, Object> data) {
        if (data) {
            context.putAll(data)
        }
        return this
    }

    /**
     * Runs every stage in order, feeding each stage the value returned by
     * the previous one.
     *
     * @return a map with {@code context} (value returned by the last stage,
     *         or the copied seed context when there are no stages),
     *         {@code results} (stage name to {@code [status, duration]},
     *         duration in milliseconds) and {@code stageCount}
     */
    Map<String, Object> execute() {
        Map<String, Object> results = [:]
        // Shallow copy: stages may add or replace top-level keys without
        // touching the seed map, but nested values are shared with it.
        def current = new LinkedHashMap<String, Object>(context)

        for (PipelineStage stage : stages) {
            long start = System.currentTimeMillis()
            current = stage.execute(current)
            long elapsed = System.currentTimeMillis() - start
            // NOTE: stage.execute() gives no signal when an optional stage's
            // failure was swallowed, so such a stage is also recorded as
            // 'success'. Stages sharing a name overwrite each other's entry.
            results[stage.name] = [
                status  : 'success',
                duration: elapsed
            ]
        }

        return [
            context   : current,
            results   : results,
            stageCount: stages.size()
        ]
    }
}
56
/**
 * Factory for a pre-wired extract / transform / validate pipeline.
 */
class DataTransformer {
    /**
     * Builds a three-stage pipeline:
     * <ol>
     *   <li><b>extract</b> - reads {@code ctx.source} and stores a copy of each
     *       row in {@code ctx.records} with its keys lower-cased;</li>
     *   <li><b>transform</b> - keeps only records with no null values and
     *       stores the remaining count in {@code ctx.recordCount};</li>
     *   <li><b>validate</b> (optional) - sets {@code ctx.valid} to whether every
     *       record has an {@code id} key whose string form passes
     *       {@code isNumber()}.</li>
     * </ol>
     * Rows are presumably Maps with String keys (they are iterated as
     * key/value pairs and the keys are lower-cased) - confirm against callers.
     *
     * @return the configured, not yet executed, pipeline
     */
    static Pipeline createEtl() {
        new Pipeline()
            .stage('extract') { ctx ->
                ctx.records = ctx.source.collect { row ->
                    row.collectEntries { k, v ->
                        // Locale.ROOT keeps the mapping stable regardless of the
                        // JVM default locale (under Turkish rules "ID" would
                        // lower-case to a dotless-i form and miss the 'id' lookup).
                        [(k.toLowerCase(Locale.ROOT)): v]
                    }
                }
                ctx
            }
            .stage('transform') { ctx ->
                ctx.records = ctx.records.findAll { rec ->
                    rec.values().every { it != null }
                }
                ctx.recordCount = ctx.records.size()
                ctx
            }
            // Options go in as an explicit map literal in the second position.
            // Named-argument syntax (required: false) would be passed by Groovy
            // as a leading Map argument instead.
            .stage('validate', [required: false]) { ctx ->
                ctx.valid = ctx.records.every { rec ->
                    rec.containsKey('id') &&
                        rec.id?.toString()?.isNumber()
                }
                ctx
            }
    }
}