@@ -29,7 +29,7 @@ use futures::StreamExt;
2929use futures:: future:: try_join_all;
3030use iceberg:: arrow:: arrow_schema_to_schema_auto_assign_ids;
3131use iceberg:: inspect:: MetadataTableType ;
32- use iceberg:: { Catalog , Error , ErrorKind , NamespaceIdent , Result , TableCreation } ;
32+ use iceberg:: { Catalog , Error , ErrorKind , NamespaceIdent , Result , TableCreation , TableIdent } ;
3333
3434use crate :: table:: IcebergTableProvider ;
3535use crate :: to_datafusion_error;
@@ -211,6 +211,42 @@ impl SchemaProvider for IcebergSchemaProvider {
211211 DataFusionError :: Execution ( format ! ( "Failed to create Iceberg table: {e}" ) )
212212 } ) ?
213213 }
214+
215+ fn deregister_table ( & self , name : & str ) -> DFResult < Option < Arc < dyn TableProvider > > > {
216+ // Check if table exists
217+ if !self . table_exist ( name) {
218+ return Ok ( None ) ;
219+ }
220+
221+ let catalog = self . catalog . clone ( ) ;
222+ let namespace = self . namespace . clone ( ) ;
223+ let tables = self . tables . clone ( ) ;
224+ let table_name = name. to_string ( ) ;
225+
226+ // Use tokio's spawn_blocking to handle the async work on a blocking thread pool
227+ let result = tokio:: task:: spawn_blocking ( move || {
228+ let rt = tokio:: runtime:: Handle :: current ( ) ;
229+ rt. block_on ( async move {
230+ let table_ident = TableIdent :: new ( namespace, table_name. clone ( ) ) ;
231+
232+ // Drop the table from the Iceberg catalog
233+ catalog
234+ . drop_table ( & table_ident)
235+ . await
236+ . map_err ( to_datafusion_error) ?;
237+
238+ // Remove from local cache and return the removed provider
239+ let removed = tables
240+ . remove ( & table_name)
241+ . map ( |( _, table) | table as Arc < dyn TableProvider > ) ;
242+
243+ Ok ( removed)
244+ } )
245+ } ) ;
246+
247+ futures:: executor:: block_on ( result)
248+ . map_err ( |e| DataFusionError :: Execution ( format ! ( "Failed to drop Iceberg table: {e}" ) ) ) ?
249+ }
214250}
215251
216252/// Verifies that a table provider contains no data by scanning with LIMIT 1.
@@ -368,4 +404,42 @@ mod tests {
368404 "Expected error about table already existing, got: {err}" ,
369405 ) ;
370406 }
407+
408+ #[ tokio:: test]
409+ async fn test_deregister_table_succeeds ( ) {
410+ let ( schema_provider, _temp_dir) = create_test_schema_provider ( ) . await ;
411+
412+ // Create and register an empty table
413+ let arrow_schema = Arc :: new ( ArrowSchema :: new ( vec ! [ Field :: new(
414+ "id" ,
415+ DataType :: Int32 ,
416+ false ,
417+ ) ] ) ) ;
418+
419+ let empty_batch = RecordBatch :: new_empty ( arrow_schema. clone ( ) ) ;
420+ let mem_table = MemTable :: try_new ( arrow_schema, vec ! [ vec![ empty_batch] ] ) . unwrap ( ) ;
421+
422+ // Register the table
423+ let result = schema_provider. register_table ( "drop_me" . to_string ( ) , Arc :: new ( mem_table) ) ;
424+ assert ! ( result. is_ok( ) ) ;
425+ assert ! ( schema_provider. table_exist( "drop_me" ) ) ;
426+
427+ // Deregister the table
428+ let result = schema_provider. deregister_table ( "drop_me" ) ;
429+ assert ! ( result. is_ok( ) ) ;
430+ assert ! ( result. unwrap( ) . is_some( ) ) ;
431+
432+ // Verify the table no longer exists
433+ assert ! ( !schema_provider. table_exist( "drop_me" ) ) ;
434+ }
435+
436+ #[ tokio:: test]
437+ async fn test_deregister_nonexistent_table_returns_none ( ) {
438+ let ( schema_provider, _temp_dir) = create_test_schema_provider ( ) . await ;
439+
440+ // Attempt to deregister a table that doesn't exist
441+ let result = schema_provider. deregister_table ( "nonexistent" ) ;
442+ assert ! ( result. is_ok( ) ) ;
443+ assert ! ( result. unwrap( ) . is_none( ) ) ;
444+ }
371445}
0 commit comments