Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
293 changes: 140 additions & 153 deletions README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion cmd/rabtap/cmd_subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func TestCmdSubIntegration(t *testing.T) {
}

// when
output := testcommon.CaptureOutput(rabtap_main)
output := testcommon.CaptureOutput(rabtapMain)

// then
assert.Regexp(t, "(?s).*message received.*\nroutingkey.....: sub-queue-test\n.*Hello", output)
Expand Down
2 changes: 1 addition & 1 deletion cmd/rabtap/cmd_tap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestCmdTapIntegration(t *testing.T) {
"--no-color",
}

output := testcommon.CaptureOutput(rabtap_main)
output := testcommon.CaptureOutput(rabtapMain)

assert.Regexp(t, "(?s).*message received.*\nroutingkey.....: tap-queue-test\n.*Hello", output)
}
256 changes: 137 additions & 119 deletions cmd/rabtap/command_line.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"os"
"regexp"
"strconv"
"strings"
"time"

docopt "github.com/docopt/docopt-go"
Expand All @@ -31,128 +32,118 @@ var (
)

const (
// note: usage is DSL interpreted by docopt - this is code. Change carefully.
// note: usage is DSL template interpreted by docopt - this is code. Change carefully.
// to unclutter the user-presented help, we internally use a template where
// some options common like [TLSOPTIONS] get expanded before getting parsed by docopt.
// This effectively adds a simple macro mechanism to docopt. See toDocoptDSL below.
usage = `rabtap - RabbitMQ wire tap. github.com/jandelgado/rabtap

Usage:
rabtap info [--api=APIURI] [--consumers] [--stats] [--filter=EXPR] [--omit-empty]
[--show-default] [--mode=MODE] [--format=FORMAT] [-kncv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap tap EXCHANGES [--uri=URI] [--saveto=DIR] [--format=FORMAT] [--limit=NUM]
[--idle-timeout=DURATION] [--filter=EXPR] [-jkncsv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap (tap --uri=URI EXCHANGES)... [--saveto=DIR] [--format=FORMAT] [--limit=NUM]
[--idle-timeout=DURATION] [--filter=EXPR] [-jkncsv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap sub QUEUE [--uri URI] [--saveto=DIR] [--format=FORMAT] [--limit=NUM]
[--offset=OFFSET] [--args=KV]... [(--reject [--requeue])] [-jkcsvn]
[--filter=EXPR] [--idle-timeout=DURATION]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--format=FORMAT]
[--routingkey=KEY | (--header=KV)...] [ (--property=KV)... ]
[--confirms] [--mandatory] [--delay=DELAY | --speed=FACTOR] [-jkv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap exchange create EXCHANGE [--uri=URI] [--type=TYPE] [--args=KV]... [-kv]
[--autodelete] [--durable]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap exchange bind EXCHANGE to DESTEXCHANGE [--uri=URI] [-kv]
(--bindingkey=KEY | (--header=KV)... (--all|--any))
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap exchange rm EXCHANGE [--uri=URI] [-kv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap queue create QUEUE [--uri=URI] [--queue-type=TYPE] [--args=KV]... [-kv]
[--autodelete] [--durable] [--lazy]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap queue bind QUEUE to EXCHANGE [--uri=URI] [-kv]
(--bindingkey=KEY | (--header=KV)... (--all|--any))
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap queue unbind QUEUE from EXCHANGE [--uri=URI] [-kv]
(--bindingkey=KEY | (--header=KV)... (--all|--any))
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap queue rm QUEUE [--uri=URI] [-kv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap queue purge QUEUE [--uri=URI] [-kv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap conn close CONNECTION [--api=APIURI] [--reason=REASON] [-kv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
[--show-default] [--mode=MODE] [--format=FORMAT] [TLSOPTIONS] [COMMON OPTIONS]
rabtap tap EXCHANGES [--uri=URI] [--saveto=DIR] [--format=FORMAT|--json] [--limit=NUM]
[--idle-timeout=DURATION] [--filter=EXPR] [--silent] [TLSOPTIONS] [COMMON OPTIONS]
rabtap (tap --uri=URI EXCHANGES)... [--saveto=DIR] [--format=FORMAT|--json] [--limit=NUM]
[--idle-timeout=DURATION] [--filter=EXPR] [--silent] [TLSOPTIONS] [COMMON OPTIONS]
rabtap sub QUEUE [--uri URI] [--saveto=DIR] [--format=FORMAT|--json] [--limit=NUM]
[--offset=OFFSET] [--args=KV]... [(--reject [--requeue])] [--silent]
[--filter=EXPR] [--idle-timeout=DURATION] [TLSOPTIONS] [COMMON OPTIONS]
rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--format=FORMAT|--json]
[--routingkey=KEY | (--header=KV)...] [ (--property=KV)... ] [--confirms]
[--mandatory] [--delay=DURATION | --speed=FACTOR] [TLSOPTIONS] [COMMON OPTIONS]
rabtap exchange create EXCHANGE [--uri=URI] [--type=TYPE] [--args=KV]...
[--autodelete] [--durable] [TLSOPTIONS] [COMMON OPTIONS]
rabtap exchange bind EXCHANGE to DESTEXCHANGE [--uri=URI]
(--bindingkey=KEY | (--header=KV)... (--all|--any)) [TLSOPTIONS]
rabtap exchange rm EXCHANGE [--uri=URI] [TLSOPTIONS] [COMMON OPTIONS]
rabtap queue create QUEUE [--uri=URI] [--queue-type=TYPE] [--args=KV]...
[--autodelete] [--durable] [--lazy] [TLSOPTIONS] [COMMON OPTIONS]
rabtap queue bind QUEUE to EXCHANGE [--uri=URI]
(--bindingkey=KEY | (--header=KV)... (--all|--any)) [TLSOPTIONS] [COMMON OPTIONS]
rabtap queue unbind QUEUE from EXCHANGE [--uri=URI]
(--bindingkey=KEY | (--header=KV)... (--all|--any)) [TLSOPTIONS] [COMMON OPTIONS]
rabtap queue rm QUEUE [--uri=URI] [TLSOPTIONS] [COMMON OPTIONS]
rabtap queue purge QUEUE [--uri=URI] [TLSOPTIONS] [COMMON OPTIONS]
rabtap conn close CONNECTION [--api=APIURI] [--reason=REASON] [TLSOPTIONS] [COMMON OPTIONS]
rabtap --version
rabtap (-h | --help | help) [properties]

`
options = `
Arguments and options:
EXCHANGES comma-separated list of exchanges and optional binding keys,
e.g. amq.topic:# or exchange1:key1,exchange2:key2.
EXCHANGE name of an exchange, e.g. amq.direct.
DESTEXCHANGE name of a a destination exchange in an exchange-to-exchange binding.
SOURCE file or directory to publish in pub mode. If omitted, stdin will be read.
QUEUE name of a queue.
CONNECTION name of a connection.
DIR directory to read messages from.
-a, --autodelete create auto delete exchange/queue.
--all set x-match=all option in header based routing.
--any set x-match=any option in header based routing.
--api=APIURI connect to given API server. If APIURL is omitted,
the environment variable RABTAP_APIURI will be used.
--args=KV A key value pair in the form of "key=value" passed as
additional arguments. e.g. '--args=x-queue-type=quorum'
-b, --bindingkey=KEY binding key to use in bind queue command.
--by-connection output of info command starts with connections.
-c, --color force colored output
--confirms enable publisher confirms and wait for confirmations.
--consumers include consumers and connections in output of info command.
--delay=DELAY Time to wait between sending messages during publish.
If not set then messages will be delayed as recorded.
The value must be suffixed with a time unit, e.g. ms, s etc.
-d, --durable create durable exchange/queue.
--exchange=EXCHANGE Optional exchange to publish to. If omitted, exchange will
be taken from message being published (see JSON message format).
e.g. 'amq.topic:#' or 'exchange1:key1,exchange2:key2'
EXCHANGE name of an exchange, e.g. 'amq.direct'
DESTEXCHANGE name of a a destination exchange in an exchange-to-exchange binding
SOURCE file or directory to publish in pub mode. If omitted, stdin will be read
QUEUE name of a queue
CONNECTION name of a connection
DIR directory to read messages from
DURATION a numerical duration with a unit suffix like "ms", "s", "m", "h"
-a, --autodelete create auto delete exchange/queue
--all set x-match=all option in header based routing
--any set x-match=any option in header based routing
--api=APIURI connect to given API server. If APIURL is omitted, the environment
variable RABTAP_APIURI will be used
--args=KV A key value pair in the form of "key=value" passed as additional
arguments. e.g. '--args=x-queue-type=quorum'
-b, --bindingkey=KEY binding key to use in bind queue command
--by-connection output of info command starts with connections
--confirms enable publisher confirms and wait for confirmations
--consumers include consumers and connections in output of info command
--delay=DURATION Time to wait between sending messages during publish. If not set,
then messages will be delayed as recorded.
-d, --durable create a durable exchange/queue
--exchange=EXCHANGE optional exchange to publish to. If omitted, exchange will be taken
from message being published (see JSON message format)
--filter=EXPR Predicate for sub, tap, info command to filter the output [default: true]
--format=FORMAT * for tap, pub, sub command: format to write/read messages to console
--format=FORMAT for tap, pub, sub command: format to write/read messages to console
and optionally to file (when --saveto DIR is given).
Valid options are: "raw", "json", "json-nopp". Default: raw
* for info command: controls generated output format. Valid
options are: "text", "dot". Default: text
-h, --help print this help
--header=KV A key value pair in the form of "key=value" used as a
routing- or binding-key. Can occur multiple times.
--idle-timeout=DURATION end reading messages when no new message was received
for the given duration. The value must be suffixed with
a time unit, e.g. ms, s etc.
-j, --json deprecated. Use "--format json" instead.
-k, --insecure allow insecure TLS connections (no certificate check).
--lazy create a lazy queue.
--limit=NUM Stop afer NUM messages were received. When set to 0, will
run until terminated [default: 0].
--mandatory enable mandatory publishing (messages must be delivered to queue).
--mode=MODE mode for info command. One of "byConnection", "byExchange".
[default: byExchange].
-n, --no-color don't colorize output (see also environment variable NO_COLOR).
--omit-empty don't show echanges without bindings in info command.
--offset=OFFSET Offset when reading from a stream. Can be 'first', 'last',
'next', a duration like '10m', a RFC3339-Timestamp or
an integer index value. Basically it is an alias for
'--args=x-stream-offset=OFFSET'.
--property=KV A key value pair in the form of "key=value" to specify
message properties like e.g. the content-type.
--queue-type=TYPE type of queue [default: classic].
--reason=REASON reason why the connection was closed [default: closed by rabtap].
--reject Reject messages. Default behaviour is to acknowledge messages.
Valid options are: 'raw', 'json', 'json-nopp'. Default: 'raw'
for info command: controls generated output format. Valid options
are: 'text', 'dot'. Default: 'text'
-h, --help prints this help
--header=KV A key value pair in the form of "key=value" used as a routing- or
binding-key. Can occur multiple times
--idle-timeout=DURATION end reading messages when no new message was received for the
given duration
-j, --json deprecated. Use "--format=json" instead
--lazy create a lazy queue
--limit=NUM Stop afer NUM messages were received. When set to 0, will run until
terminated [default: 0]
--mandatory enable mandatory publishing (messages must be delivered to queue)
--mode=MODE mode for info command. One of 'byConnection', 'byExchange' [default: byExchange]
--omit-empty don't show echanges without bindings in info command
--offset=OFFSET Offset when reading from a stream. Can be 'first', 'last', 'next',
a DURATION like '10m', a RFC3339-Timestamp or an integer index value.
Basically it is an alias for '--args=x-stream-offset=OFFSET'
--property=KV A key value pair in the form of "key=value" to specify message properties
like e.g. the content-type.
--queue-type=TYPE type of queue [default: classic]
--reason=REASON reason why the connection was closed [default: closed by rabtap]
--reject Reject messages. Default behaviour is to acknowledge messages
--requeue Instruct broker to requeue rejected message
-r, --routingkey=KEY routing key to use in publish mode. If omitted, routing key
will be taken from message being published (see JSON
message format).
--saveto=DIR also save messages and metadata to DIR.
--show-default include default exchange in output info command.
-s, --silent suppress message output to stdout.
--speed=FACTOR Speed factor to use during publish [default: 1.0].
--stats include statistics in output of info command.
-t, --type=TYPE type of exchange [default: fanout].
--tls-cert-file=CERTFILE A Cert file to use for client authentication.
--tls-key-file=KEYFILE A Key file to use for client authentication.
--tls-ca-file=CAFILE A CA Cert file to use with TLS.
--uri=URI connect to given AQMP broker. If omitted, the
environment variable RABTAP_AMQPURI will be used.
-v, --verbose enable verbose mode.
--version show version information and exit.
will be taken from message being published (see JSON message format)
--saveto=DIR also save messages and metadata to DIR
--show-default include default exchange in output info command
-s, --silent suppress message output to stdout
--speed=FACTOR Speed factor to use during publish [default: 1.0]
--stats include statistics in output of info command
-t, --type=TYPE type of exchange [default: fanout]
--uri=URI connect to given AQMP broker. If omitted, the environment variable
RABTAP_AMQPURI will be used
--version show version information and exit

Common options:
-c, --color force colored output
-n, --no-color don't colorize output (see also environment variable NO_COLOR)
-v, --verbose enable verbose mode

TLS options:
--tls-cert-file=CERTFILE A Cert file to use for client authentication
--tls-key-file=KEYFILE A Key file to use for client authentication
--tls-ca-file=CAFILE A CA Cert file to use with TLS
-k, --insecure allow insecure TLS connections (disables certificate check)

Examples:
rabtap tap --uri amqp://guest:guest@localhost/ amq.fanout:
Expand All @@ -168,7 +159,7 @@ Examples:
rabtap sub JDQ

# print only messages that have ".Name == 'JAN'" in their JSON payload
rabtap sub JDQ --filter="let b=fromJSON(r.toStr(r.body(r.msg))); b.Name == 'JAN'"
rabtap sub JDQ --filter="let b=fromJSON(r.toStr(r.body(r.msg))); b.Name == 'JAN'"
rabtap queue rm JDQ

# use RABTAP_APIURI environment variable to specify mgmt api uri instead of --api
Expand All @@ -186,7 +177,7 @@ The following message properties are used with the '--property Key=Value'
option of the pub command. All keys are case-insensitve. Use multiple
'--property' options to set multiple properties at once.

DeliveryMode - delivery mode: 'transient' or 'persistent'
DeliveryMode - delivery mode: 'transient' or 'persistent'
Priority - message priority for priority queues
Expiration - message TTL (ms)
ContentType - application use - MIME content type
Expand All @@ -199,6 +190,8 @@ Type - application use - message type name
AppId - application use - creating application id
UserId - user id, validated if set
`
tlsOptions = "[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] [--insecure]"
commonOptions = "[--verbose] [--no-color|--color]"
)

// ProgramCmd represents the mode of operation
Expand Down Expand Up @@ -458,6 +451,7 @@ func parsePubSubFormatArg(args map[string]interface{}) (string, error) {

// deprecated --json option equals "--format=json"
if args["--json"] != nil && args["--json"].(bool) {
fmt.Fprintln(os.Stderr, "warning: using deprecated --json option, will be removed. Use --format=json instead.")
format = "json"
}

Expand Down Expand Up @@ -763,7 +757,7 @@ func formatVersion() string {
}

func parseCommandLineArgsWithSpec(spec string, cliArgs []string) (CommandLineArgs, error) {
parser := docopt.Parser{SkipHelpFlags: true}
parser := docopt.Parser{SkipHelpFlags: true, HelpHandler: helpHandler}
args, err := parser.ParseArgs(spec, cliArgs, formatVersion())
if err != nil {
return CommandLineArgs{}, err
Expand Down Expand Up @@ -791,19 +785,43 @@ func parseCommandLineArgsWithSpec(spec string, cliArgs []string) (CommandLineArg
return CommandLineArgs{}, fmt.Errorf("command missing")
}

// PrintHelp is explicitly called when the "help" command is given. On -h or
// --help docopt internally prints help
// PrintHelp is explicitly called when the -h, --help or help command is given.
func PrintHelp(topic HelpTopic) {
switch topic {
case GeneralHelp:
docopt.PrintHelpOnly(nil, usage)
docopt.PrintHelpOnly(nil, fmt.Sprintf("%s\n%s", usage, options))
case PropertiesHelp:
fmt.Print(propertiesHelp)
}
}

// since we use templating in the usage text, we override the helpHandler to show
// our unexpanded usage text and not the version presented to docopt
func helpHandler(err error, _ string) {
if err != nil {
// docopt may pass a docopt.UserError with an empty text.
if err.Error() != "" {
fmt.Printf("%s\n%s\n", err, usage)
} else {
fmt.Printf("%s\n", usage)
}
} else {
// no error passed -> full help requested
PrintHelp(GeneralHelp)
}
}

// ParseCommandLineArgs parses command line arguments into an object of
// type CommandLineArgs.
func ParseCommandLineArgs(cliArgs []string) (CommandLineArgs, error) {
return parseCommandLineArgsWithSpec(usage, cliArgs)
}
func ParseCommandLineArgs(args []string) (CommandLineArgs, error) {
spec := toDocoptDSL(fmt.Sprintf("%s\n%s", usage, options))
return parseCommandLineArgsWithSpec(spec, args)
}

func toDocoptDSL(usage string) string {
replacer := strings.NewReplacer(
"[TLSOPTIONS]", tlsOptions,
"[COMMON OPTIONS]", commonOptions,
)
return replacer.Replace(usage)
}
Loading