Skip to content

Commit d989d1e

Browse files
committed
Error Handling
1 parent e5b2c83 commit d989d1e

18 files changed

+1627
-32
lines changed

pkg/bpmn_engine/command.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@ import "github.com/nitram509/lib-bpmn-engine/pkg/spec/BPMN20"
55
type commandType string
66

77
const (
8-
flowTransitionType commandType = "flowTransition"
9-
activityType commandType = "activity"
10-
continueActivityType commandType = "continueActivity"
8+
flowTransitionType commandType = "flowTransition"
9+
activityType commandType = "activity"
10+
continueActivityType commandType = "continueActivity"
11+
// A command that there is a technical error and the engine should fail the process instance
1112
errorType commandType = "error"
13+
eventSubProcessCompletedType commandType = "eventSubProcessCompletedType"
1214
checkExclusiveGatewayDoneType commandType = "checkExclusiveGatewayDone"
1315
)
1416

@@ -72,3 +74,12 @@ type checkExclusiveGatewayDoneCommand struct {
7274
func (t checkExclusiveGatewayDoneCommand) Type() commandType {
7375
return checkExclusiveGatewayDoneType
7476
}
77+
78+
type eventSubProcessCompletedCommand struct {
79+
// activity reference to the event sub-process, which has completed
80+
activity activity
81+
}
82+
83+
func (t eventSubProcessCompletedCommand) Type() commandType {
84+
return eventSubProcessCompletedType
85+
}

pkg/bpmn_engine/engine.go

Lines changed: 180 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -203,11 +203,16 @@ func (state *BpmnEngineState) run(process BPMN20.ProcessElement, instance *proce
203203
instance.ActivityState = Failed
204204
// *activityState = Failed // TODO: check if meaningful
205205
break
206+
case eventSubProcessCompletedType:
207+
subProcessActivity := cmd.(eventSubProcessCompletedCommand).activity
208+
instance.SetState(subProcessActivity.State())
209+
state.exportElementEvent(process, *instance, process, exporter.ElementCompleted)
210+
break
206211
case checkExclusiveGatewayDoneType:
207212
activity := cmd.(checkExclusiveGatewayDoneCommand).gatewayActivity
208213
state.checkExclusiveGatewayDone(activity)
209214
default:
210-
panic("[invariant check] command type check not fully implemented")
215+
return newEngineErrorf("[invariant check] command type check not fully implemented")
211216
}
212217
}
213218

@@ -239,12 +244,41 @@ func (state *BpmnEngineState) handleElement(process BPMN20.ProcessElement, act a
239244
state.exportElementEvent(process, *instance, *element, exporter.ElementCompleted) // special case here, to end the instance
240245
case BPMN20.ServiceTask:
241246
taskElement := (*element).(BPMN20.TaskElement)
242-
_, activity = state.handleServiceTask(process, instance, &taskElement)
243-
createFlowTransitions = activity.State() == Completed
247+
_, job, jobErr := state.handleServiceTask(process, instance, &taskElement)
248+
err = jobErr
249+
activity = job
250+
if err != nil {
251+
nextCommands = append(nextCommands, errorCommand{
252+
err: err,
253+
elementId: (*element).GetId(),
254+
elementName: (*element).GetName(),
255+
})
256+
} else if job.ErrorCode != "" {
257+
// The current process will remain ACTIVE until the event sub-processes have completed.
258+
nextCommands = handleErrorEvent(process, instance, element, job.ErrorCode)
259+
createFlowTransitions = false // TODO confirm
260+
} else {
261+
// Only follow sequence flow if there are no Technical or Business Errors
262+
createFlowTransitions = activity.State() == Completed
263+
}
244264
case BPMN20.UserTask:
245265
taskElement := (*element).(BPMN20.TaskElement)
246-
activity = state.handleUserTask(process, instance, &taskElement)
247-
createFlowTransitions = activity.State() == Completed
266+
job, jobErr := state.handleUserTask(process, instance, &taskElement)
267+
err = jobErr
268+
activity = job
269+
if err != nil {
270+
nextCommands = append(nextCommands, errorCommand{
271+
err: err,
272+
elementId: (*element).GetId(),
273+
elementName: (*element).GetName(),
274+
})
275+
} else if job.ErrorCode != "" {
276+
nextCommands = handleErrorEvent(process, instance, element, job.ErrorCode)
277+
createFlowTransitions = false
278+
} else {
279+
// Only follow sequence flow if there are no Technical or Business Errors
280+
createFlowTransitions = activity.State() == Completed
281+
}
248282
case BPMN20.IntermediateCatchEvent:
249283
ice := (*element).(BPMN20.TIntermediateCatchEvent)
250284
createFlowTransitions, activity, err = state.handleIntermediateCatchEvent(process, instance, ice, originActivity)
@@ -292,24 +326,147 @@ func (state *BpmnEngineState) handleElement(process BPMN20.ProcessElement, act a
292326
createFlowTransitions = true
293327
case BPMN20.SubProcess:
294328
subProcessElement := (*element).(BPMN20.TSubProcess)
295-
activity, err = state.handleSubProcess(instance, &subProcessElement)
329+
subProcess, subProcessErr := state.handleSubProcess(instance, &subProcessElement)
330+
activity = subProcess
331+
err = subProcessErr
296332
if err != nil {
297333
nextCommands = append(nextCommands, errorCommand{
298334
err: err,
299335
elementId: (*element).GetId(),
300336
elementName: (*element).GetName(),
301337
})
338+
} else if subProcessElement.TriggeredByEvent {
339+
// We need to complete the parent process when an event sub-process has completed. but we cant do it here
340+
nextCommands = append(nextCommands, eventSubProcessCompletedCommand{
341+
activity: subProcess,
342+
})
302343
}
303344
createFlowTransitions = activity.State() == Completed
345+
case BPMN20.BoundaryEvent:
346+
boundary := (*element).(BPMN20.TBoundaryEvent)
347+
activity, err = state.handleBoundaryEvent(&boundary, instance)
304348
default:
305-
panic(fmt.Sprintf("[invariant check] unsupported element: id=%s, type=%s", (*element).GetId(), (*element).GetType()))
349+
nextCommands = append(nextCommands, errorCommand{
350+
err: newEngineErrorf("[invariant check] unsupported element: id=%s, type=%s", (*element).GetId(), (*element).GetType()),
351+
elementId: (*element).GetId(),
352+
elementName: (*element).GetName(),
353+
})
306354
}
307355
if createFlowTransitions && err == nil {
308356
nextCommands = append(nextCommands, createNextCommands(process, instance, element, activity)...)
309357
}
310358
return nextCommands
311359
}
312360

361+
func handleErrorEvent(process BPMN20.ProcessElement, instance *processInstanceInfo, element *BPMN20.BaseElement, errorCode string) []command {
362+
// Find the error by code on the process
363+
if errT, found := findErrorDefinition(instance.ProcessInfo.definitions, errorCode); found {
364+
365+
// Find the boundary events for the task
366+
boundaryEvents := findBoundaryEventsForTypeAndReference(instance.ProcessInfo.definitions, BPMN20.ErrorBoundary, (*element).GetId())
367+
if boundaryEvent, foundBoundary := findBoundaryEventForError(boundaryEvents, errT.Id); foundBoundary {
368+
return []command{
369+
activityCommand{element: BPMN20.Ptr[BPMN20.BaseElement](boundaryEvent)},
370+
}
371+
}
372+
373+
// If we still haven't found a command then we should look to see if there is an event sub process we can follow
374+
if subProcess, subFound := findEventSubprocessForError(process, errT.Id); subFound {
375+
return []command{
376+
activityCommand{element: BPMN20.Ptr[BPMN20.BaseElement](subProcess)},
377+
}
378+
}
379+
380+
// If not see if there is a catch-all boundary event
381+
if boundaryEvent, foundBoundary := findBoundaryEventForError(boundaryEvents, ""); foundBoundary {
382+
return []command{
383+
activityCommand{element: BPMN20.Ptr[BPMN20.BaseElement](boundaryEvent)},
384+
}
385+
}
386+
387+
// If not find an event sub process matching catchall
388+
if subProcess, subFound := findEventSubprocessForError(process, ""); subFound {
389+
return []command{
390+
activityCommand{element: BPMN20.Ptr[BPMN20.BaseElement](subProcess)},
391+
}
392+
}
393+
394+
// TODO continue lookup up to the parent process if this is a sub process
395+
396+
return []command{
397+
errorCommand{
398+
err: newEngineErrorf("Could not find suitable handler for ErrorCode event id=%s, code=%s", errT.Id, errT.ErrorCode),
399+
elementId: (*element).GetId(),
400+
elementName: (*element).GetName(),
401+
},
402+
}
403+
} else {
404+
return []command{
405+
errorCommand{
406+
err: newEngineErrorf("Could not find error definition \"%s\"", errorCode),
407+
elementId: (*element).GetId(),
408+
elementName: (*element).GetName(),
409+
},
410+
}
411+
}
412+
}
413+
414+
func findEventSubprocessForError(process BPMN20.ProcessElement, errorReferenceID string) (BPMN20.TSubProcess, bool) {
415+
// Look for event sub-processes in the process
416+
for _, subProcess := range process.GetSubProcess() {
417+
// Check if this is an event sub-process (triggered by event)
418+
if subProcess.TriggeredByEvent {
419+
// Look for start events in the sub-process
420+
for _, startEvent := range subProcess.StartEvents {
421+
// Check if this start event has an error event definition
422+
if startEvent.ErrorEventDefinition.ErrorRef == errorReferenceID {
423+
// We found an event sub-process with an error start event
424+
return subProcess, true
425+
}
426+
}
427+
}
428+
}
429+
430+
// No matching event sub-process found
431+
return BPMN20.TSubProcess{}, false
432+
}
433+
434+
// findBoundaryEventsForReference finds all boundary events attached to the provided element
435+
func findBoundaryEventsForTypeAndReference(definitions BPMN20.TDefinitions, boundaryType BPMN20.BoundaryType, referenceID string) []BPMN20.TBoundaryEvent {
436+
boundaryEvents := make([]BPMN20.TBoundaryEvent, 0)
437+
for _, boundary := range definitions.Process.BoundaryEvent {
438+
if boundary.AttachedToRef == referenceID && boundary.GetBoundaryType() == boundaryType {
439+
boundaryEvents = append(boundaryEvents, boundary)
440+
}
441+
}
442+
return boundaryEvents
443+
}
444+
445+
func findBoundaryEventForError(boundaryEvents []BPMN20.TBoundaryEvent, errorID string) (BPMN20.TBoundaryEvent, bool) {
446+
for _, boundaryEvent := range boundaryEvents {
447+
// Check if this boundary event has an error event definition
448+
if boundaryEvent.ErrorEventDefinition.ErrorRef == errorID {
449+
return boundaryEvent, true
450+
}
451+
}
452+
return BPMN20.TBoundaryEvent{}, false
453+
}
454+
455+
func findErrorDefinition(definitions BPMN20.TDefinitions, errorCode string) (BPMN20.TError, bool) {
456+
457+
// Iterate through all errors in the definitions
458+
for _, err := range definitions.Errors {
459+
// Check if the error code matches the requested code
460+
if err.ErrorCode == errorCode {
461+
return err, true
462+
}
463+
}
464+
465+
// Return empty error if not found
466+
return BPMN20.TError{}, false
467+
468+
}
469+
313470
func createCheckExclusiveGatewayDoneCommand(originActivity activity) (cmds []command) {
314471
if (*originActivity.Element()).GetType() == BPMN20.EventBasedGateway {
315472
evtBasedGatewayActivity := originActivity.(*eventBasedGatewayActivity)
@@ -473,3 +630,19 @@ func (state *BpmnEngineState) findCreatedTimers(instance *processInstanceInfo) (
473630
}
474631
return result
475632
}
633+
634+
func (state *BpmnEngineState) handleBoundaryEvent(element *BPMN20.TBoundaryEvent, instance *processInstanceInfo) (activity, error) {
635+
var be BPMN20.BaseElement = element
636+
activity := &elementActivity{
637+
key: state.generateKey(),
638+
state: Completed,
639+
element: &be,
640+
}
641+
variableHolder := NewVarHolder(&instance.VariableHolder, nil)
642+
err := propagateProcessInstanceVariables(&variableHolder, element.GetOutputMapping())
643+
if err != nil {
644+
instance.ActivityState = Failed
645+
}
646+
647+
return activity, err
648+
}

pkg/bpmn_engine/engine_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,8 @@ func Test_CreateInstanceById_uses_latest_process_version(t *testing.T) {
215215
func Test_CreateAndRunInstanceById_uses_latest_process_version(t *testing.T) {
216216
// setup
217217
engine := New()
218+
engine.NewTaskHandler().Id("id").Handler(jobCompleteHandler)
219+
engine.NewTaskHandler().Id("test-2").Handler(jobCompleteHandler)
218220

219221
// when
220222
v1, err := engine.LoadFromFile("../../test-cases/simple_task.bpmn")
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package exporter
2+
3+
import "fmt"
4+
5+
// LoggingEventExported writes all events to a log file
6+
type LoggingEventExported struct {
7+
}
8+
9+
// NewEventLogExporter creates a new instance of a LoggingEventExported
10+
func NewEventLogExporter() *LoggingEventExported {
11+
return &LoggingEventExported{}
12+
}
13+
14+
func (*LoggingEventExported) NewProcessEvent(event *ProcessEvent) {
15+
fmt.Printf("New Process event version: %d, processKey: %d, processID: %s\n", event.Version, event.ProcessKey, event.ProcessId)
16+
}
17+
18+
func (*LoggingEventExported) EndProcessEvent(event *ProcessInstanceEvent) {
19+
fmt.Printf("End Process event version: %d, processKey: %d, processID: %s, processInstanceKey: %d\n", event.Version, event.ProcessKey, event.ProcessId, event.ProcessInstanceKey)
20+
}
21+
22+
func (*LoggingEventExported) NewProcessInstanceEvent(event *ProcessInstanceEvent) {
23+
fmt.Printf("New Process Instance version: %d, processKey: %d, processID: %s, processInstanceKey: %d\n", event.Version, event.ProcessKey, event.ProcessId, event.ProcessInstanceKey)
24+
}
25+
26+
func (*LoggingEventExported) NewElementEvent(event *ProcessInstanceEvent, elementInfo *ElementInfo) {
27+
fmt.Printf("New Element event version: %d, processKey: %d, processID: %s, processInstanceKey: %d, elementType: %s, elementId: %s, intent: %s\n",
28+
event.Version, event.ProcessKey, event.ProcessId, event.ProcessInstanceKey,
29+
elementInfo.BpmnElementType, elementInfo.ElementId, elementInfo.Intent)
30+
}

pkg/bpmn_engine/jobs.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ type job struct {
1414
JobState ActivityState `json:"s"`
1515
CreatedAt time.Time `json:"c"`
1616
baseElement *BPMN20.BaseElement
17+
// Failure returned by a handler with job.Fail(string)
18+
Failure string `json:"f,omitempty"`
19+
// ErrorCode event thrown by a handler with job.ThrowError(string)
20+
ErrorCode string `json:"ec,omitempty"`
1721
}
1822

1923
func (j job) Key() int64 {

pkg/bpmn_engine/jobs_activated.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ type activatedJob struct {
99
processInstanceInfo *processInstanceInfo
1010
completeHandler func()
1111
failHandler func(reason string)
12+
errorHandler func(errorCode string)
1213
key int64
1314
processInstanceKey int64
1415
bpmnProcessId string
@@ -53,11 +54,15 @@ type ActivatedJob interface {
5354
CreatedAt() time.Time
5455

5556
// Fail does set the State the worker missed completing the job
56-
// Fail and Complete mutual exclude each other
57+
// ThrowError, Fail and Complete mutual exclude each other
5758
Fail(reason string)
5859

60+
// ThrowError throws an error event
61+
// ThrowError, Fail and Complete mutual exclude each other
62+
ThrowError(errorCode string)
63+
5964
// Complete does set the State the worker successfully completing the job
60-
// Fail and Complete mutual exclude each other
65+
// ThrowError, Fail and Complete mutual exclude each other
6166
Complete()
6267
}
6368

@@ -116,6 +121,11 @@ func (aj *activatedJob) Fail(reason string) {
116121
aj.failHandler(reason)
117122
}
118123

124+
// ThrowError implements ActivatedJob
125+
func (aj *activatedJob) ThrowError(errorCode string) {
126+
aj.errorHandler(errorCode)
127+
}
128+
119129
// Complete implements ActivatedJob
120130
func (aj *activatedJob) Complete() {
121131
aj.completeHandler()

0 commit comments

Comments
 (0)