koaning committed
Commit 34548a0 · verified · 1 Parent(s): f85e02e

Update app.py

Files changed (1): app.py +39 -3
app.py CHANGED
@@ -66,12 +66,47 @@ def _(df_taxi):
     return
 
 
+@app.cell(hide_code=True)
+def _(mo):
+    mo.md(r"""Let's now take this pyarrow dataframe and prepare it for insertion. We want to extract the right schema and also add a partition.""")
+    return
+
+
+@app.cell
+def _(df_taxi):
+    import pyarrow as pa
+    from pyiceberg.schema import Schema
+    from pyiceberg.types import (
+        NestedField, IntegerType, StringType, DoubleType, TimestampType
+    )
+    from pyiceberg.table.name_mapping import NameMapping, MappedField
+    from pyiceberg.io.pyarrow import pyarrow_to_schema
+
+    # Create a mapping from column names to field IDs
+    name_mapping_fields = []
+    for idx, field_name in enumerate(df_taxi.column_names, start=1):
+        name_mapping_fields.append(MappedField(field_id=idx, names=[field_name]))
+
+    # Create a name mapping
+    name_mapping = NameMapping(name_mapping_fields)
+
+    # Convert PyArrow schema to Iceberg schema
+    iceberg_schema = pyarrow_to_schema(df_taxi.schema, name_mapping)
+
+    # Now find the field ID for 'passenger_count'
+    passenger_count_field = iceberg_schema.find_field("passenger_count")
+    source_id = passenger_count_field.field_id
+
+    print(f"The source_id for 'passenger_count' is: {source_id}")
+    return
+
+
 @app.cell
 def _(IdentityTransform, PartitionField, PartitionSpec):
     spec = PartitionSpec(
         PartitionField(source_id=3, field_id=1000, name="passenger_count", transform=IdentityTransform())
     )
-    return
+    return (spec,)
 
 
 @app.cell
@@ -81,12 +116,13 @@ def _(df_taxi):
 
 
 @app.cell
-def _(catalog, df_taxi):
+def _(catalog, df_taxi, spec):
     catalog.create_namespace_if_not_exists("default")
 
     table = catalog.create_table_if_not_exists(
         "default.taxi",
         schema=df_taxi.schema,
+        partition_spec=spec
     )
     return (table,)
 
@@ -142,7 +178,7 @@ def _(mo):
         r"""
     That's a bunch slower!
 
-    A part of the reason is that iceberg had partitions in it, which is great, but the comparison with `read_csv` is a bit unfair. Let's convert the `.csv` file to `.parquet` and also add a partition in polars with statistics. You will now see that we get a similar performance.
+    Part of the reason is that the Iceberg table has partitions in it, which is great, but the comparison with `read_csv` is a bit unfair. Let's convert the `.csv` file to `.parquet` and also add a partition in Polars with statistics. You will then see that we get similar performance.
     """
     )
     return
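A side note for readers trying this outside marimo: the new cell derives the field ID for `passenger_count` via a name mapping, yet the `PartitionSpec` cell still hardcodes `source_id=3`. The sketch below is a minimal, self-contained variant that feeds the looked-up ID into the spec and into table creation; the toy `df_taxi` table and the SQLite-backed `SqlCatalog` are assumptions for illustration, not part of this commit.

    import tempfile

    import pyarrow as pa
    from pyiceberg.catalog.sql import SqlCatalog
    from pyiceberg.io.pyarrow import pyarrow_to_schema
    from pyiceberg.partitioning import PartitionField, PartitionSpec
    from pyiceberg.table.name_mapping import MappedField, NameMapping
    from pyiceberg.transforms import IdentityTransform

    # Toy stand-in for the notebook's df_taxi PyArrow table (an assumption).
    df_taxi = pa.table({
        "vendor_id": [1, 2, 1],
        "trip_distance": [1.2, 3.4, 0.5],
        "passenger_count": [1, 3, 1],
    })

    # Assign field IDs by position, exactly as the new cell above does.
    name_mapping = NameMapping([
        MappedField(field_id=idx, names=[name])
        for idx, name in enumerate(df_taxi.column_names, start=1)
    ])
    iceberg_schema = pyarrow_to_schema(df_taxi.schema, name_mapping)

    # Look the source_id up instead of hardcoding it.
    source_id = iceberg_schema.find_field("passenger_count").field_id
    spec = PartitionSpec(
        PartitionField(
            source_id=source_id,
            field_id=1000,
            name="passenger_count",
            transform=IdentityTransform(),
        )
    )

    # A throwaway SQLite-backed catalog (an assumption; the notebook's
    # catalog is built in a cell that is not part of this diff).
    catalog = SqlCatalog(
        "default",
        uri="sqlite:///:memory:",
        warehouse=f"file://{tempfile.mkdtemp()}",
    )
    catalog.create_namespace_if_not_exists("default")
    table = catalog.create_table_if_not_exists(
        "default.taxi", schema=iceberg_schema, partition_spec=spec
    )
    print(table.spec())

Passing the derived `iceberg_schema` (rather than the raw PyArrow schema) keeps the spec's `source_id` guaranteed to match a field ID in the schema being registered.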
 
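On the closing remark about `read_csv` being an unfair comparison, here is a minimal Polars sketch of the conversion it describes. The file names are placeholders, not from the notebook, and `partition_by` assumes a recent Polars release.

    import polars as pl

    # One-time conversion; Polars writes Parquet column statistics by
    # default, which lets lazy scans skip row groups that cannot match
    # a filter.
    pl.scan_csv("taxi.csv").sink_parquet("taxi.parquet", statistics=True)

    # Optionally mirror the Iceberg layout with a hive-partitioned dataset.
    pl.read_csv("taxi.csv").write_parquet(
        "taxi_partitioned", partition_by="passenger_count"
    )

    # A filtered scan can now prune via statistics (or partitions),
    # much like the partitioned Iceberg read in the notebook.
    print(
        pl.scan_parquet("taxi.parquet")
        .filter(pl.col("passenger_count") == 3)
        .collect()
        .height
    )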