@@ -273,7 +273,11 @@ def _handle_connect(self, conn_event):
273273 self .threads [thread .getName ()] = thread
274274 conn_event .conn_obj .assign_thread (thread .getName ())
275275 self .connections .update ({conn_event .conn_obj .id : conn_event .conn_obj })
276- thread .start ()
276+ try :
277+ thread .start ()
278+ except WebSocketConnectionClosedException :
279+ print (WebSocketBadStatusException )
280+ print ('Received in _handle_connect. Report to github.' )
277281
278282 def _init_connection (self , conn_obj ):
279283 """
@@ -439,8 +443,8 @@ def _handle_subscribe(self, sub_event):
439443 for cb in server_callback :
440444 for ticker in tickers :
441445 conn .corehub .server .invoke (cb , ticker )
442- if self .tickers .get_sub_state (ticker , SUB_TYPE_ORDERBOOK ) is True :
443- self ._get_snapshot ([ticker ])
446+ # if self.tickers.get_sub_state(ticker, SUB_TYPE_ORDERBOOK) is True:
447+ # self._get_snapshot([ticker])
444448 conn .increment_ticker ()
445449 if server_callback_no_payload is not None :
446450 if tickers == ALL_TICKERS :
@@ -498,20 +502,13 @@ def _handle_get_snapshot(self, snapshot_event):
498502 """
499503 conn , ticker = snapshot_event .conn_object , snapshot_event .ticker
500504 method = 'queryExchangeState'
501- # Wait for the connection to start successfully and record N nounces of data
502- while conn .state is False or self .tickers .get_nounces (ticker ) < 5 :
503- sleep (0.1 )
504- else :
505- try :
506- logger .info ('[Subscription][{}][{}]: Order book snapshot '
507- 'requested.' .format (SUB_TYPE_ORDERBOOK , ticker ))
508- conn .corehub .server .invoke (method , ticker )
509- self .tickers .set_snapshot_state (ticker , SNAPSHOT_SENT )
510- except Exception as e :
511- print (e )
512- print ('Failed to invoke snapshot query' )
513- while self .tickers .get_snapshot_state (ticker ) is not SNAPSHOT_ON :
514- sleep (0.5 )
505+ try :
506+ logger .info (NSG_INFO_ORDER_BOOK_REQUESTED .format (SUB_TYPE_ORDERBOOK , ticker ))
507+ conn .corehub .server .invoke (method , ticker )
508+ self .tickers .set_snapshot_state (ticker , SNAPSHOT_SENT )
509+ except Exception as e :
510+ print (e )
511+ print ('Failed to invoke snapshot query' )
515512
516513 def _handle_reconnect (self , reconnect_event ):
517514 ticker , sub_type , book_depth = reconnect_event .tickers , reconnect_event .sub_type , reconnect_event .book_depth
@@ -597,7 +594,18 @@ def _get_snapshot(self, tickers):
597594 conn_id = self .tickers .get_sub_type_conn_id (ticker_name , SUB_TYPE_ORDERBOOK )
598595 else :
599596 break
600- self .control_queue .put (SnapshotEvent (ticker_name , conn ))
597+ self .tickers .set_snapshot_state (ticker_name , SNAPSHOT_QUEUED )
598+ ### EXPERIMENTAL ###
599+ method = 'queryExchangeState'
600+ try :
601+ logger .info (NSG_INFO_ORDER_BOOK_REQUESTED .format (SUB_TYPE_ORDERBOOK , ticker_name ))
602+ conn .corehub .server .invoke (method , ticker_name )
603+ self .tickers .set_snapshot_state (ticker_name , SNAPSHOT_SENT )
604+ except Exception as e :
605+ print (e )
606+ print ('Failed to invoke snapshot query' )
607+ ###
608+ # self.control_queue.put(SnapshotEvent(ticker_name, conn))
601609
602610 def _is_order_queue (self ):
603611 if self .order_queue is None :
@@ -617,7 +625,7 @@ def _start_order_queue(self):
617625 if order_event is not None :
618626 ticker = order_event ['MarketName' ]
619627 snapshot_state = self .tickers .get_snapshot_state (ticker )
620- if snapshot_state in [SNAPSHOT_OFF , SNAPSHOT_SENT ]:
628+ if snapshot_state in [SNAPSHOT_OFF , SNAPSHOT_QUEUED , SNAPSHOT_SENT ]:
621629 self ._init_backorder_queue (ticker , order_event )
622630 elif snapshot_state == SNAPSHOT_RCVD :
623631 if self ._transfer_backorder_queue (ticker ):
@@ -930,16 +938,21 @@ def _is_orderbook_snapshot(self, msg):
930938 # Detect if the message contains order book snapshots and manipulate them.
931939 if 'R' in msg and type (msg ['R' ]) is not bool :
932940 if 'MarketName' in msg ['R' ] and msg ['R' ]['MarketName' ] is None :
933- for ticker in self .tickers .list .values ():
934- if ticker [SUB_TYPE_ORDERBOOK ]['SnapshotState' ] == SNAPSHOT_SENT :
935- msg ['R' ]['MarketName' ] = ticker ['Name' ]
936- del msg ['R' ]['Fills' ]
937- self .order_books [ticker ['Name' ]] = msg ['R' ]
938- self .tickers .set_snapshot_state (ticker ['Name' ], SNAPSHOT_RCVD )
939- break
940- logger .info (
941- '[Subscription][{}][{}]: Order book snapshot received.' .format (SUB_TYPE_ORDERBOOK ,
942- msg ['R' ]['MarketName' ]))
941+ thread_name = current_thread ().getName ()
942+ conn_id = self ._return_conn_by_thread_name (thread_name ).id
943+ subs = self .tickers .sort_by_conn_id (conn_id )['OrderBook' ]
944+ for ticker in subs .keys ():
945+ # for ticker in self.tickers.list.values():
946+ if self .tickers .get_snapshot_state (ticker ) is SNAPSHOT_SENT :
947+ # if ticker[SUB_TYPE_ORDERBOOK]['SnapshotState'] == SNAPSHOT_SENT:
948+ ### experimental - confirm
949+ if self ._transfer_backorder_queue2 (ticker , msg ['R' ]):
950+ msg ['R' ]['MarketName' ] = ticker
951+ del msg ['R' ]['Fills' ]
952+ self .order_books [ticker ] = msg ['R' ]
953+ self .tickers .set_snapshot_state (ticker , SNAPSHOT_RCVD )
954+ break
955+ logger .info (NSG_INFO_ORDER_BOOK_RECEIVED .format (SUB_TYPE_ORDERBOOK , msg ['R' ]['MarketName' ]))
943956
944957 def _init_backorder_queue (self , ticker , msg ):
945958 sub = self .tickers .list [ticker ][SUB_TYPE_ORDERBOOK ]
@@ -966,6 +979,27 @@ def _transfer_backorder_queue(self, ticker):
966979 self .tickers .set_snapshot_state (ticker , SNAPSHOT_ON )
967980 q .task_done ()
968981
982+ def _transfer_backorder_queue2 (self , ticker , snapshot ):
983+ confirmed = False
984+ sub = self .tickers .list [ticker ][SUB_TYPE_ORDERBOOK ]
985+ q = sub ['InternalQueue' ]
986+ q2 = queue .Queue ()
987+ while True :
988+ try :
989+ e = q .get (False )
990+ q2 .put (e )
991+ except queue .Empty :
992+ sub ['InternalQueue' ] = q2
993+ return confirmed
994+ except AttributeError :
995+ raise NotImplementedError ('Please report error to '
996+ 'https://github.com/slazarov/python-bittrex-websocket, '
997+ 'Error:_transfer_backorder_queue:AttributeError' )
998+ else :
999+ if self ._confirm_order_book (snapshot , e ):
1000+ confirmed = True
1001+ q .task_done ()
1002+
9691003 # ========================
9701004 # Private Channels Methods
9711005 # ========================
@@ -976,48 +1010,64 @@ def _on_debug(self, **kwargs):
9761010 Don't edit unless you know what you are doing.
9771011 Redirect full order book snapshots to on_message
9781012 """
979- if self ._is_close_me ():
980- return
981- self ._is_orderbook_snapshot (kwargs )
1013+ try :
1014+ if self ._is_close_me ():
1015+ return
1016+ self ._is_orderbook_snapshot (kwargs )
1017+ except Exception as e :
1018+ print (e )
1019+ print ('Got this exception from _on_debug. This is bug testing. Please report to '
1020+ 'https://github.com/slazarov/python-bittrex-websocket with this message' )
9821021
9831022 def _on_tick_update (self , msg ):
984- if self ._is_close_me ():
985- return
986- ticker = msg ['MarketName' ]
987- subs = self .tickers .get_ticker_subs (ticker )
988- if self .tickers .get_sub_state (ticker , SUB_TYPE_ORDERBOOK ) is SUB_STATE_ON :
989- self .order_queue .put (msg )
990- if self .tickers .get_sub_state (ticker , SUB_TYPE_ORDERBOOKUPDATE ) is SUB_STATE_ON :
991- d = dict (self ._create_base_layout (msg ),
992- ** {'bids' : msg ['Buys' ],
993- 'asks' : msg ['Sells' ]})
994- self .orderbook_update .on_change (d )
995- if self .tickers .get_sub_state (ticker , SUB_TYPE_TRADES ) is SUB_STATE_ON :
996- if msg ['Fills' ]:
1023+ try :
1024+ if self ._is_close_me ():
1025+ return
1026+ ticker = msg ['MarketName' ]
1027+ if self .tickers .get_sub_state (ticker , SUB_TYPE_ORDERBOOK ) is SUB_STATE_ON :
1028+ if self .tickers .get_snapshot_state (ticker ) is SNAPSHOT_OFF :
1029+ self ._get_snapshot ([ticker ])
1030+ self .order_queue .put (msg )
1031+ if self .tickers .get_sub_state (ticker , SUB_TYPE_ORDERBOOKUPDATE ) is SUB_STATE_ON :
9971032 d = dict (self ._create_base_layout (msg ),
998- ** {'trades' : msg ['Fills' ]})
999- self .trades .on_change (d )
1033+ ** {'bids' : msg ['Buys' ],
1034+ 'asks' : msg ['Sells' ]})
1035+ self .orderbook_update .on_change (d )
1036+ if self .tickers .get_sub_state (ticker , SUB_TYPE_TRADES ) is SUB_STATE_ON :
1037+ if msg ['Fills' ]:
1038+ d = dict (self ._create_base_layout (msg ),
1039+ ** {'trades' : msg ['Fills' ]})
1040+ self .trades .on_change (d )
1041+ except Exception as e :
1042+ print (e )
1043+ print ('Got this exception from _on_tick_update. This is bug testing. Please report to '
1044+ 'https://github.com/slazarov/python-bittrex-websocket with this message' )
10001045
10011046 def _on_ticker_update (self , msg ):
10021047 """
10031048 Invoking summary state updates for specific filter
10041049 doesn't work right now. So we will filter them manually.
10051050 """
1006- if self ._is_close_me ():
1007- return
1008- if 'Deltas' in msg :
1009- for update in msg ['Deltas' ]:
1010- if self .tickers .get_sub_state (ALL_TICKERS , SUB_TYPE_TICKERUPDATE ) is SUB_STATE_ON :
1011- self .updateSummaryState .on_change (msg ['Deltas' ])
1012- else :
1013- try :
1014- ticker = update ['MarketName' ]
1015- subs = self .tickers .get_ticker_subs (ticker )
1016- except KeyError : # not in the subscription list
1017- continue
1051+ try :
1052+ if self ._is_close_me ():
1053+ return
1054+ if 'Deltas' in msg :
1055+ for update in msg ['Deltas' ]:
1056+ if self .tickers .get_sub_state (ALL_TICKERS , SUB_TYPE_TICKERUPDATE ) is SUB_STATE_ON :
1057+ self .updateSummaryState .on_change (msg ['Deltas' ])
10181058 else :
1019- if subs ['TickerUpdate' ]['Active' ]:
1020- self .updateSummaryState .on_change (update )
1059+ try :
1060+ ticker = update ['MarketName' ]
1061+ subs = self .tickers .get_ticker_subs (ticker )
1062+ except KeyError : # not in the subscription list
1063+ continue
1064+ else :
1065+ if subs ['TickerUpdate' ]['Active' ]:
1066+ self .updateSummaryState .on_change (update )
1067+ except Exception as e :
1068+ print (e )
1069+ print ('Got this exception from _on_ticker_update. This is bug testing. Please report to '
1070+ 'https://github.com/slazarov/python-bittrex-websocket with this message' )
10211071
10221072 # -------------------------------------
10231073 # Private Channels Supplemental Methods
@@ -1101,6 +1151,43 @@ def _sync_order_book(self, ticker, order_data):
11011151 event = ReconnectEvent ([ticker ], SUB_TYPE_ORDERBOOK , book_depth )
11021152 self .control_queue .put (event )
11031153
1154+ def _confirm_order_book (self , snapshot , nounce_data ):
1155+ # Syncs the order book for the pair, given the most recent data from the socket
1156+ nounce_diff = nounce_data ['Nounce' ] - snapshot ['Nounce' ]
1157+ if nounce_diff == 1 :
1158+ # Start confirming
1159+ for side in [['Buys' , True ], ['Sells' , False ]]:
1160+ made_change = False
1161+ for item in nounce_data [side [0 ]]:
1162+ # TYPE 1: Cancelled / filled order entries at matching price
1163+ # -> DELETE from the order book
1164+ if item ['Type' ] == 1 :
1165+ for i , existing_order in enumerate (
1166+ self .order_books [snapshot ][side [0 ]]):
1167+ if existing_order ['Rate' ] == item ['Rate' ]:
1168+ del self .order_books [snapshot ][side [0 ]][i ]
1169+ made_change = True
1170+ break
1171+
1172+ # TYPE 2: Changed order entries at matching price (partial fills, cancellations)
1173+ # -> EDIT the order book
1174+ elif item ['Type' ] == 2 :
1175+ for existing_order in self .order_books [snapshot ][side [0 ]]:
1176+ if existing_order ['Rate' ] == item ['Rate' ]:
1177+ existing_order ['Quantity' ] = item ['Quantity' ]
1178+ made_change = True
1179+ break
1180+ if made_change :
1181+ return True
1182+ else :
1183+ return False
1184+ # The next nounce will trigger a sync.
1185+ elif nounce_diff == 0 :
1186+ return True
1187+ # The order book snapshot nounce is ahead. Discard this nounce.
1188+ elif nounce_diff < 0 :
1189+ return False
1190+
11041191 def _is_close_me (self ):
11051192 thread_name = current_thread ().getName ()
11061193 conn_object = self ._return_conn_by_thread_name (thread_name )
@@ -1134,8 +1221,13 @@ def on_close(self):
11341221
11351222 def on_error (self , error ):
11361223 # Error handler
1137- print (error )
1138- self .disconnect ()
1224+ try :
1225+ print (error )
1226+ self .disconnect ()
1227+ except Exception as e :
1228+ print (e )
1229+ print ('Got this exception from on_error. This is bug testing. Please report to '
1230+ 'https://github.com/slazarov/python-bittrex-websocket with this message' )
11391231
11401232 def on_orderbook (self , msg ):
11411233 # The main channel of subscribe_to_orderbook().
0 commit comments