@@ -180,176 +180,6 @@ def path_repr(path: VersionDef) -> str:
180180     return f'{versions[0]} -> {versions[-1]}'
181181
182182
183- class StorageCompatibilityTest(NodeProvider, unittest.TestCase):
184-
185-     CLUSTER_SETTINGS = {
186-         'cluster.name': gen_id(),
187-         "transport.netty.worker_count": 16,
188-     }
189-
190-     def test_upgrade_paths(self):
191-         for path in get_test_paths():
192-             try:
193-                 self.setUp()
194-                 self._test_upgrade_path(path, nodes=3)
195-             finally:
196-                 self.tearDown()
197-
198-     def _test_upgrade_path(self, versions: Tuple[VersionDef, ...], nodes: int):
199- """ Test upgrade path across specified versions.
200-
201- Creates a blob and regular table in first version and inserts a record,
202- then goes through all subsequent versions - each time verifying that a
203- few simple selects work.
204- """
205-         version_def = versions[0]
206-         timestamp = datetime.utcnow().isoformat(timespec='seconds')
207-         print(f"\n{timestamp} Start version: {version_def.version}")
208-         env = prepare_env(version_def.java_home)
209-         cluster = self._new_cluster(
210-             version_def.version, nodes, settings=self.CLUSTER_SETTINGS, env=env)
211-         paths = [node._settings['path.data'] for node in cluster.nodes()]
212-         try:
213-             self._do_upgrade(cluster, nodes, paths, versions)
214-         except Exception as e:
216-             msg = "\nLogs\n"
217-             msg += "==============\n"
218-             for i, node in enumerate(cluster.nodes()):
219-                 msg += f"-------------- node: {i}\n"
220-                 logs_path = node.logs_path
221-                 cluster_name = node.cluster_name
222-                 logfile = os.path.join(logs_path, cluster_name + ".log")
223-                 with open(logfile, "r") as f:
224-                     logs = f.read()
225-                 msg += logs
226-                 msg += "\n"
227-             raise Exception(msg).with_traceback(e.__traceback__)
228-         finally:
229-             cluster_name = cluster.nodes()[0].cluster_name
230-             for node in cluster.nodes():
231-                 logs_path = node.logs_path
232-                 logfile = os.path.join(logs_path, cluster_name + ".log")
233-                 with open(logfile, "a") as f:
234-                     f.truncate()
236-
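Note: `assert_busy`, used throughout `_do_upgrade` below, is a retry helper the suite imports from elsewhere and is not part of this diff. A minimal sketch of such a helper, assuming a timeout-based loop that retries on `AssertionError` (names and defaults are illustrative):

import time

def assert_busy(assertion, timeout=60, interval=1):
    # Retry `assertion` until it stops raising AssertionError
    # or `timeout` seconds have elapsed.
    start = time.monotonic()
    while True:
        try:
            return assertion()
        except AssertionError:
            if time.monotonic() - start > timeout:
                raise
            time.sleep(interval)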
237-     @timeout(1800)
238-     def _do_upgrade(self,
239-                     cluster: CrateCluster,
240-                     nodes: int,
241-                     paths: Iterable[str],
242-                     versions: Tuple[VersionDef, ...]):
243-         cluster.start()
244-         with connect(cluster.node().http_url, error_trace=True) as conn:
245-             assert_busy(lambda: self.assert_nodes(conn, nodes))
246-             c = conn.cursor()
247-
248-             c.execute(CREATE_ANALYZER)
249-             c.execute(CREATE_DOC_TABLE)
250-             c.execute(CREATE_PARTED_TABLE)
251-             c.execute(CREATE_DYNAMIC_TABLE)
252-
253-             c.execute("DROP USER IF EXISTS trillian")
254-             c.execute("CREATE USER trillian")
255-             c.execute("GRANT DQL ON TABLE t1 TO trillian")
256-
257-             c.execute('''
258-                 INSERT INTO t1 (id, text) VALUES (0, 'Phase queue is foo!')
259-             ''')
260-             insert_data(conn, 'doc', 't1', 10)
261-             c.execute(CREATE_BLOB_TABLE)
262-             assert_busy(lambda: self.assert_green(conn, 'blob', 'b1'))
263-             run_selects(c, versions[0].version)
264-             container = conn.get_blob_container('b1')
265-             digest = container.put(BytesIO(b'sample data'))
266-
267-             assert_busy(lambda: self.assert_green(conn, 'blob', 'b1'))
268-             self.assertIsNotNone(container.get(digest))
269-
270-         accumulated_dynamic_column_names: list[str] = []
271-         self._process_on_stop()
272-         for version_def in versions[1:]:
273-             timestamp = datetime.utcnow().isoformat(timespec='seconds')
274-             print(f"{timestamp} Upgrade to: {version_def.version}")
275-             self.assert_data_persistence(version_def, nodes, digest, paths, accumulated_dynamic_column_names)
276-         # restart with latest version
277-         version_def = versions[-1]
278-         self.assert_data_persistence(version_def, nodes, digest, paths, accumulated_dynamic_column_names)
279-
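`_do_upgrade` above is guarded by `@timeout(1800)`, another helper defined outside this diff. A plausible sketch of such a decorator, assuming a Unix `SIGALRM`-based implementation (names are illustrative):

import functools
import signal

def timeout(seconds):
    # Fail the decorated callable if it runs longer than `seconds` (Unix only).
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            def handler(signum, frame):
                raise TimeoutError(f'{func.__name__} exceeded {seconds}s')
            old_handler = signal.signal(signal.SIGALRM, handler)
            signal.alarm(seconds)
            try:
                return func(*args, **kwargs)
            finally:
                signal.alarm(0)
                signal.signal(signal.SIGALRM, old_handler)
        return wrapper
    return decorator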
280-     def assert_data_persistence(self,
281-                                 version_def: VersionDef,
282-                                 nodes: int,
283-                                 digest: str,
284-                                 paths: Iterable[str],
285-                                 accumulated_dynamic_column_names: list[str]):
286-         env = prepare_env(version_def.java_home)
287-         version = version_def.version
288-         cluster = self._new_cluster(version, nodes, data_paths=paths, settings=self.CLUSTER_SETTINGS, env=env)
289-         cluster.start()
290-         with connect(cluster.node().http_url, error_trace=True) as conn:
291-             assert_busy(lambda: self.assert_nodes(conn, nodes))
292-             cursor = conn.cursor()
293-             wait_for_active_shards(cursor, 0)
294-             version = version_def.version.replace(".", "_")
295-             cursor.execute(CREATE_DOC_TABLE.replace(
296-                 "CREATE TABLE t1 (",
297-                 f'CREATE TABLE IF NOT EXISTS versioned."t{version}" ('
298-             ))
299-             cursor.execute('ALTER TABLE doc.t1 SET ("refresh_interval" = 4000)')
300-             run_selects(cursor, version_def.version)
301-             container = conn.get_blob_container('b1')
302-             container.get(digest)
303-             cursor.execute('ALTER TABLE doc.t1 SET ("refresh_interval" = 2000)')
304-
305-             cursor.execute("select name from sys.users order by 1")
306-             self.assertEqual(cursor.fetchall(), [["crate"], ["trillian"]])
307-
308-             cursor.execute("select * from sys.privileges")
309-             self.assertEqual(cursor.fetchall(), [["TABLE", "trillian", "crate", "doc.t1", "GRANT", "DQL"]])
310-
311-             cursor.execute("select table_name from information_schema.tables where table_schema = 'versioned'")
312-             tables = [row[0] for row in cursor.fetchall()]
313-             for table in tables:
314-                 cursor.execute(f'select * from versioned."{table}"')
315-                 cursor.execute(f'insert into versioned."{table}" (id, col_int) values (?, ?)', [str(uuid4()), 1])
316-
317-             # Trigger the ALTER statement bug (https://github.com/crate/crate/pull/17178) that
318-             # falsely updated the table's version-created setting, resulting in OIDs instead of
319-             # column names in result sets.
320-             cursor.execute('ALTER TABLE doc.dynamic SET ("refresh_interval" = 900)')
321-             cursor.execute('INSERT INTO doc.dynamic (o) values (?)', [{version: True}])
322-             cursor.execute('REFRESH TABLE doc.dynamic')
323-             accumulated_dynamic_column_names.append(version)
324-             cursor.execute('SELECT o FROM doc.dynamic')
325-             result = cursor.fetchall()
326-             for row in result:
327-                 for name in row[0].keys():
328-                     self.assertIn(name, accumulated_dynamic_column_names)
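            # Hypothetical illustration (values invented, not from the source):
            # with the regression present, object keys would come back as internal
            # OIDs, e.g. {'7': True} instead of {'5_10_x': True}, and the
            # assertIn above would fail.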
328-
329-             # older versions had a bug that caused this to fail
330-             if version in ('latest-nightly', '3.2'):
331-                 # Test that partition and dynamic columns can be created
332-                 obj = {"t_" + version.replace('.', '_'): True}
333-                 args = (str(uuid4()), version, obj)
334-                 cursor.execute(
335-                     'INSERT INTO doc.parted (id, version, cols) values (?, ?, ?)',
336-                     args
337-                 )
338-         self._process_on_stop()
339-
340-     def assert_green(self, conn: Connection, schema: str, table_name: str):
341-         c = conn.cursor()
342-         c.execute('select health from sys.health where table_name=? and table_schema=?', (table_name, schema))
343-         response = c.fetchone()
344-         self.assertIsNotNone(response)
345-         self.assertEqual(response[0], 'GREEN')
346-
347-     def assert_nodes(self, conn: Connection, num_nodes: int):
348-         c = conn.cursor()
349-         c.execute("select count(*) from sys.nodes")
350-         self.assertEqual(c.fetchone()[0], num_nodes)
351-
352-
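The storage test above populates tables via `insert_data`, a helper that lives outside this diff. A minimal sketch under the assumption that it bulk-inserts rows over the `(id, text)` columns used in the earlier INSERT (the real helper may generate different columns and values):

def insert_data(conn, schema, table, num_rows):
    # Bulk-insert `num_rows` sample rows and make them visible to reads.
    c = conn.cursor()
    c.executemany(
        f'INSERT INTO "{schema}"."{table}" (id, text) VALUES (?, ?)',
        [[i + 1, f'sample text {i + 1}'] for i in range(num_rows)])
    c.execute(f'REFRESH TABLE "{schema}"."{table}"')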
353183 class MetaDataCompatibilityTest(NodeProvider, unittest.TestCase):
354184
355185     CLUSTER_SETTINGS = {
@@ -361,7 +191,9 @@ class MetaDataCompatibilityTest(NodeProvider, unittest.TestCase):
361191     SUPPORTED_VERSIONS = (
362192         VersionDef('2.3.x', []),
363193         VersionDef('3.3.x', []),
364-         VersionDef('latest-nightly', [])
194+         VersionDef('5.10.x', []),
195+         VersionDef('6.0.x', []),
196+         VersionDef('6.1.x', []),
365197     )
366198
367199     def test_metadata_compatibility(self):
@@ -435,192 +267,4 @@ def assert_meta_data(self, version_def, nodes, data_paths=None):
435267              ['SCHEMA', 'user_a', 'doc', 'GRANT', 'DQL']],
436268             cursor.fetchall())
437269
438-         self._process_on_stop()
439-
440-
441- class DefaultTemplateMetaDataCompatibilityTest(NodeProvider, unittest.TestCase):
442-     CLUSTER_ID = gen_id()
443-
444-     CLUSTER_SETTINGS = {
445-         'cluster.name': CLUSTER_ID,
446-     }
447-
448-     SUPPORTED_VERSIONS = (
449-         VersionDef('3.0.x', []),
450-         VersionDef('latest-nightly', [])
451-     )
452-
453-     def test_metadata_compatibility(self):
454-         nodes = 3
455-
456-         cluster = self._new_cluster(self.SUPPORTED_VERSIONS[0].version,
457-                                     nodes,
458-                                     settings=self.CLUSTER_SETTINGS)
459-         cluster.start()
460-         with connect(cluster.node().http_url, error_trace=True) as conn:
461-             cursor = conn.cursor()
462-             cursor.execute("select 1")
463-         self._process_on_stop()
464-
465-         paths = [node._settings['path.data'] for node in cluster.nodes()]
466-         for version_def in self.SUPPORTED_VERSIONS[1:]:
467-             self.assert_dynamic_string_detection(version_def, nodes, paths)
468-
469-     def assert_dynamic_string_detection(self, version_def, nodes, data_paths):
470-         """Test that dynamic string column detection works as expected.
471-
472-         If the cluster was initially created/started with a lower CrateDB
473-         version, we must ensure that our default template is also upgraded,
474-         if needed, because it is persisted in the cluster state. That's why
475-         re-creating tables would not help.
476-         """
477-         self._move_nodes_folder_if_needed(data_paths)
478-         cluster = self._new_cluster(
479-             version_def.version,
480-             nodes,
481-             data_paths,
482-             self.CLUSTER_SETTINGS,
483-             prepare_env(version_def.java_home),
484-         )
485-         cluster.start()
486-         with connect(cluster.node().http_url, error_trace=True) as conn:
487-             cursor = conn.cursor()
488-             cursor.execute('CREATE TABLE t1 (o object)')
489-             cursor.execute('''INSERT INTO t1 (o) VALUES ({"name" = 'foo'})''')
490-             self.assertEqual(cursor.rowcount, 1)
491-             cursor.execute('REFRESH TABLE t1')
492-             cursor.execute("SELECT o['name'], count(*) FROM t1 GROUP BY 1")
493-             rs = cursor.fetchall()
494-             self.assertEqual(['foo', 1], rs[0])
495-             cursor.execute('DROP TABLE t1')
496-         self._process_on_stop()
497-
498-     def _move_nodes_folder_if_needed(self, data_paths):
499-         """Eliminates the cluster-id folder inside the data directory."""
500-         for path in data_paths:
501-             data_path_incl_cluster_id = os.path.join(path, self.CLUSTER_ID)
502-             if os.path.exists(data_path_incl_cluster_id):
503-                 src_path_nodes = os.path.join(data_path_incl_cluster_id, 'nodes')
504-                 target_path_nodes = os.path.join(self._path_data, 'nodes')
505-                 shutil.move(src_path_nodes, target_path_nodes)
506-                 shutil.rmtree(data_path_incl_cluster_id)
507-
508-
509- class SnapshotCompatibilityTest(NodeProvider, unittest.TestCase):
510-
511-     CREATE_REPOSITORY = '''
512-         CREATE REPOSITORY r1 TYPE S3
513-         WITH (access_key = 'minio',
514-               secret_key = 'miniostorage',
515-               bucket = 'backups',
516-               endpoint = '127.0.0.1:9000',
517-               protocol = 'http')
518-     '''
519-
520-     CREATE_SNAPSHOT_TPT = "CREATE SNAPSHOT r1.s{} ALL WITH (wait_for_completion = true)"
521-
522-     RESTORE_SNAPSHOT_TPT = "RESTORE SNAPSHOT r1.s{} ALL WITH (wait_for_completion = true)"
523-
524-     DROP_DOC_TABLE = 'DROP TABLE t1'
525-
526-     VERSION = ('5.0.x', 'latest-nightly')
527-
528-     def test_snapshot_compatibility(self):
529-         """Test snapshot compatibility when upgrading 5.0.x -> latest-nightly.
530-
531-         Using Minio as an S3 repository, the first cluster creates the
532-         repository and a table, and inserts/selects some data, which is
533-         then snapshotted and deleted. The next cluster restores the data
534-         from the last snapshot, performs further inserts/selects, then
535-         snapshots the data and deletes it.
536-         """
537-         with MinioServer() as minio:
538-             t = threading.Thread(target=minio.run)
539-             t.daemon = True
540-             t.start()
541-             wait_until(lambda: _is_up('127.0.0.1', 9000))
542-
543-             num_nodes = 3
544-             num_docs = 30
545-             prev_version = None
546-             num_snapshot = 1
547-
548-             cluster_settings = {
549-                 'cluster.name': gen_id(),
550-             }
551-
552-             paths = None
553-             for version in self.VERSION:
554-                 cluster = self._new_cluster(version, num_nodes, paths, settings=cluster_settings)
555-                 paths = [node._settings['path.data'] for node in cluster.nodes()]
556-                 cluster.start()
557-                 with connect(cluster.node().http_url, error_trace=True) as conn:
558-                     c = conn.cursor()
559-                     if not prev_version:
560-                         c.execute(self.CREATE_REPOSITORY)
561-                         c.execute(CREATE_ANALYZER)
562-                         c.execute(CREATE_DOC_TABLE)
563-                         insert_data(conn, 'doc', 't1', num_docs)
564-                     else:
565-                         c.execute(self.RESTORE_SNAPSHOT_TPT.format(num_snapshot - 1))
566-                     c.execute('SELECT COUNT(*) FROM t1')
567-                     rowcount = c.fetchone()[0]
568-                     self.assertEqual(rowcount, num_docs)
569-                     run_selects(c, version)
570-                     c.execute(self.CREATE_SNAPSHOT_TPT.format(num_snapshot))
571-                     c.execute(self.DROP_DOC_TABLE)
572-                 self._process_on_stop()
573-                 prev_version = version
574-                 num_snapshot += 1
575-
576-
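The snapshot test blocks on `wait_until(lambda: _is_up('127.0.0.1', 9000))` before talking to Minio; both helpers are defined outside this diff. A plausible sketch, assuming a TCP reachability probe plus a polling loop (signatures are illustrative):

import socket
import time

def _is_up(host: str, port: int) -> bool:
    # True once a TCP connection to host:port succeeds.
    try:
        with socket.create_connection((host, port), timeout=1):
            return True
    except OSError:
        return False

def wait_until(predicate, timeout=60, interval=0.5):
    # Poll `predicate` until it returns True or `timeout` seconds elapse.
    start = time.monotonic()
    while not predicate():
        if time.monotonic() - start > timeout:
            raise TimeoutError('condition was not met in time')
        time.sleep(interval)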
577- class PreOidsFetchValueTest(NodeProvider, unittest.TestCase):
578-
579-     def test_pre_oid_references(self):
580-         cluster = self._new_cluster('5.4.x', 3)
581-         cluster.start()
582-
583-         with connect(cluster.node().http_url, error_trace=True) as conn:
584-             c = conn.cursor()
585-             c.execute("create table tbl (a text, b text) partitioned by (a)")
586-             c.execute("insert into tbl (a, b) values ('foo1', 'bar1')")
587-
588-         for idx, node in enumerate(cluster):
589-             new_node = self.upgrade_node(node, '5.8.5')
590-             cluster[idx] = new_node
591-
592-         with connect(cluster.node().http_url, error_trace=True) as conn:
593-             c = conn.cursor()
594-             c.execute("alter table tbl add column c text")
595-             c.execute("insert into tbl (a, b, c) values ('foo1', 'bar2', 'baz2')")
596-             c.execute("insert into tbl (a, b, c) values ('foo2', 'bar1', 'baz1')")
597-
598-         for idx, node in enumerate(cluster):
599-             new_node = self.upgrade_node(node, '5.9.x')
600-             cluster[idx] = new_node
601-
602-         with connect(cluster.node().http_url, error_trace=True) as conn:
603-             c = conn.cursor()
604-             c.execute("insert into tbl (a, b, c) values ('foo1', 'bar3', 'baz3')")
605-             c.execute("insert into tbl (a, b, c) values ('foo2', 'bar2', 'baz2')")
606-             c.execute("insert into tbl (a, b, c) values ('foo3', 'bar1', 'baz1')")
607-
608-         for idx, node in enumerate(cluster):
609-             new_node = self.upgrade_node(node, '5.10')
610-             cluster[idx] = new_node
611-
612-         with connect(cluster.node().http_url, error_trace=True) as conn:
613-             c = conn.cursor()
614-             c.execute("insert into tbl (a, b, c) values ('foo1', 'bar4', 'baz4')")
615-             c.execute("insert into tbl (a, b, c) values ('foo2', 'bar3', 'baz3')")
616-             c.execute("insert into tbl (a, b, c) values ('foo3', 'bar2', 'baz2')")
617-             c.execute("insert into tbl (a, b, c) values ('foo4', 'bar1', 'baz1')")
618-
619-             c.execute("refresh table tbl")
620-
621-             # LIMIT 10 forces the engine to go via _doc, which triggers the bug
622-             # fixed by https://github.com/crate/crate/pull/17819
623-             c.execute("select b from tbl limit 10")
624-             result = c.fetchall()
625-             for row in result:
626-                 self.assertIsNotNone(row[0])
270+         self._process_on_stop()