Skip to content

Commit

Permalink
Fixes:
Browse files Browse the repository at this point in the history
- DateTime -> Instant
- Append -> append()
- aj uses >= instead of = now
  • Loading branch information
jjbrosnan committed Aug 2, 2023
1 parent deaf859 commit e77b993
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 38 deletions.
36 changes: 18 additions & 18 deletions data/app.d/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,33 @@
('DayVolume', dht.int_),
('DayTurnover', dht.int_),
('Direction', dht.int_),
('Timestamp', dht.DateTime),
('Timestamp', dht.Instant),
('RawFlags', dht.int_),
('IsETH', dht.int_),
('Scope', dht.int_)
]),table_type = TableType.Append).sort_descending(["KafkaOffset"])
]),table_type = TableType.append()).sort_descending(["KafkaOffset"])

quotes = ck.consume({'bootstrap.servers': 'redpanda:29092'} , 'Quote', key_spec=KeyValueSpec.IGNORE, value_spec=ck.json_spec([
('Symbol', dht.string),
('Sequence', dht.int_),
('Timestamp', dht.DateTime),
('BidTime', dht.DateTime),
('Timestamp', dht.Instant),
('BidTime', dht.Instant),
('BidExchangeCode', dht.string),
('BidPrice', dht.double),
('BidSize', dht.int_),
('AskTime', dht.DateTime),
('AskTime', dht.Instant),
('AskExchangeCode', dht.string),
('AskPrice', dht.double),
('AskSize', dht.int_),
('Scope', dht.int_)
]),table_type = TableType.Append).sort_descending(["KafkaOffset"])
]),table_type = TableType.append()).sort_descending(["KafkaOffset"])


candle = ck.consume({'bootstrap.servers': 'redpanda:29092'} , 'Candle', key_spec=KeyValueSpec.IGNORE, value_spec=ck.json_spec([
('Symbol', dht.string),
('EventFlags', dht.int_),
('Index', dht.int64),
('Timestamp', dht.DateTime),
('Timestamp', dht.Instant),
('Sequence', dht.int_),
('Count', dht.int_),
('Open', dht.double),
Expand Down Expand Up @@ -69,14 +69,14 @@
('FreeFloat', dht.string),
('HighLimitPrice', dht.double),
('LowLimitPrice', dht.double),
('HaltStartTime', dht.DateTime),
('HaltEndTime', dht.DateTime),
('HaltStartTime', dht.Instant),
('HaltEndTime', dht.Instant),
('Description', dht.string),
('RawFlags', dht.int_),
('StatusReason', dht.string),
('TradingStatus', dht.int_),
('ShortSaleRestriction', dht.int_)
]),table_type = TableType.Append).sort_descending(["KafkaOffset"])
]),table_type = TableType.append()).sort_descending(["KafkaOffset"])


summary = ck.consume({'bootstrap.servers': 'redpanda:29092'} , 'Summary', key_spec=KeyValueSpec.IGNORE, value_spec=ck.json_spec([
Expand All @@ -95,13 +95,13 @@
('DayClosePriceType', dht.int_),
('PrevDayClosePriceType', dht.int_),
('Scope', dht.int_)
]),table_type = TableType.Append).sort_descending(["KafkaOffset"])
]),table_type = TableType.append()).sort_descending(["KafkaOffset"])

order = ck.consume({'bootstrap.servers': 'redpanda:29092'} , 'Order', key_spec=KeyValueSpec.IGNORE, value_spec=ck.json_spec([
('Symbol', dht.string),
('EventFlags', dht.int_),
('Index', dht.int64),
('Timestamp', dht.DateTime),
('Timestamp', dht.Instant),
('Sequence', dht.int_),
('Price', dht.double),
('Size', dht.int_),
Expand All @@ -112,21 +112,21 @@
('Source', dht.string),
('MarketMaker', dht.string),
('SpreadSymbol', dht.string)
]),table_type = TableType.Append).sort_descending(["KafkaOffset"])
]),table_type = TableType.append()).sort_descending(["KafkaOffset"])

underlying = ck.consume({'bootstrap.servers': 'redpanda:29092'} , 'Underlying', key_spec=KeyValueSpec.IGNORE, value_spec=ck.json_spec([
('Symbol', dht.string),
('Volatility', dht.double),
('FrontVolatility', dht.double),
('BackVolatility', dht.double),
('PutCallRatio', dht.double)
]),table_type = TableType.Append).sort_descending(["KafkaOffset"])
]),table_type = TableType.append()).sort_descending(["KafkaOffset"])

timeAndSale = ck.consume({'bootstrap.servers': 'redpanda:29092'} , 'TimeAndSale', key_spec=KeyValueSpec.IGNORE, value_spec=ck.json_spec([
('Symbol', dht.string),
('EventFlags', dht.int_),
('Index', dht.int64),
('Timestamp', dht.DateTime),
('Timestamp', dht.Instant),
('ExchangeCode', dht.string),
('Price', dht.double),
('Size', dht.double),
Expand All @@ -143,22 +143,22 @@
('TradeThroughExempt', dht.int_),
('IsSpreadLeg', dht.int_),
('Scope', dht.int_)
]),table_type = TableType.Append).sort_descending(["KafkaOffset"])
]),table_type = TableType.append()).sort_descending(["KafkaOffset"])


series = ck.consume({'bootstrap.servers': 'redpanda:29092'} , 'Series', key_spec=KeyValueSpec.IGNORE, value_spec=ck.json_spec([
('Symbol', dht.string),
('EventFlags', dht.int_),
('Index', dht.int64),
('Timestamp', dht.DateTime),
('Timestamp', dht.Instant),
('Sequence', dht.int_),
('Expiration', dht.int_),
('Volatility', dht.double),
('PutCallRatio', dht.double),
('ForwardPrice', dht.double),
('Dividend', dht.double),
('Interest', dht.double)
]),table_type = TableType.Append).sort_descending(["KafkaOffset"])
]),table_type = TableType.append()).sort_descending(["KafkaOffset"])

from deephaven import new_table
from deephaven.column import string_col
Expand Down
2 changes: 1 addition & 1 deletion data/storage/notebooks/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@
aggs = quotes.agg_by(agg_list,\
["Symbol", "BidExchangeCode", "BidTime", "AskTime"])

relatedQuotes = trades.aj(quotes, ["Symbol, Timestamp = BidTime"], ["BidTime, BidPrice, BidSize"])
related_quotes = trades.aj(quotes, ["Symbol, Timestamp = BidTime"], ["BidTime, BidPrice, BidSize"])
38 changes: 19 additions & 19 deletions data/storage/notebooks/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,33 @@
('DayVolume', dht.int_),
('DayTurnover', dht.int_),
('Direction', dht.int_),
('Timestamp', dht.DateTime),
('Timestamp', dht.Instant),
('RawFlags', dht.int_),
('IsETH', dht.int_),
('Scope', dht.int_)
]),table_type = TableType.Append).sort_descending(["KafkaOffset"])
]),table_type = TableType.append()).sort_descending(["KafkaOffset"])

quotes = ck.consume({'bootstrap.servers': 'redpanda:29092'} , 'Quote', key_spec=KeyValueSpec.IGNORE, value_spec=ck.json_spec([
('Symbol', dht.string),
('Sequence', dht.int_),
('Timestamp', dht.DateTime),
('BidTime', dht.DateTime),
('Timestamp', dht.Instant),
('BidTime', dht.Instant),
('BidExchangeCode', dht.string),
('BidPrice', dht.double),
('BidSize', dht.int_),
('AskTime', dht.DateTime),
('AskTime', dht.Instant),
('AskExchangeCode', dht.string),
('AskPrice', dht.double),
('AskSize', dht.int_),
('Scope', dht.int_)
]),table_type = TableType.Append).sort_descending(["KafkaOffset"])
]),table_type = TableType.append()).sort_descending(["KafkaOffset"])


candle = ck.consume({'bootstrap.servers': 'redpanda:29092'} , 'Candle', key_spec=KeyValueSpec.IGNORE, value_spec=ck.json_spec([
('Symbol', dht.string),
('EventFlags', dht.int_),
('Index', dht.int64),
('Timestamp', dht.DateTime),
('Timestamp', dht.Instant),
('Sequence', dht.int_),
('Count', dht.int_),
('Open', dht.double),
Expand All @@ -52,7 +52,7 @@
('AskVolume', dht.int_),
('OpenInterest', dht.string),
('ImpVolatility', dht.string)
]),table_type = TableType.Append).sort_descending(["KafkaOffset"])
]),table_type = TableType.append()).sort_descending(["KafkaOffset"])



Expand All @@ -69,14 +69,14 @@
('FreeFloat', dht.string),
('HighLimitPrice', dht.double),
('LowLimitPrice', dht.double),
('HaltStartTime', dht.DateTime),
('HaltEndTime', dht.DateTime),
('HaltStartTime', dht.Instant),
('HaltEndTime', dht.Instant),
('Description', dht.string),
('RawFlags', dht.int_),
('StatusReason', dht.string),
('TradingStatus', dht.int_),
('ShortSaleRestriction', dht.int_)
]),table_type = TableType.Append).sort_descending(["KafkaOffset"])
]),table_type = TableType.append()).sort_descending(["KafkaOffset"])


summary = ck.consume({'bootstrap.servers': 'redpanda:29092'} , 'Summary', key_spec=KeyValueSpec.IGNORE, value_spec=ck.json_spec([
Expand All @@ -95,13 +95,13 @@
('DayClosePriceType', dht.int_),
('PrevDayClosePriceType', dht.int_),
('Scope', dht.int_)
]),table_type = TableType.Append).sort_descending(["KafkaOffset"])
]),table_type = TableType.append()).sort_descending(["KafkaOffset"])

order = ck.consume({'bootstrap.servers': 'redpanda:29092'} , 'Order', key_spec=KeyValueSpec.IGNORE, value_spec=ck.json_spec([
('Symbol', dht.string),
('EventFlags', dht.int_),
('Index', dht.int64),
('Timestamp', dht.DateTime),
('Timestamp', dht.Instant),
('Sequence', dht.int_),
('Price', dht.double),
('Size', dht.int_),
Expand All @@ -112,21 +112,21 @@
('Source', dht.string),
('MarketMaker', dht.string),
('SpreadSymbol', dht.string)
]),table_type = TableType.Append).sort_descending(["KafkaOffset"])
]),table_type = TableType.append()).sort_descending(["KafkaOffset"])

underlying = ck.consume({'bootstrap.servers': 'redpanda:29092'} , 'Underlying', key_spec=KeyValueSpec.IGNORE, value_spec=ck.json_spec([
('Symbol', dht.string),
('Volatility', dht.double),
('FrontVolatility', dht.double),
('BackVolatility', dht.double),
('PutCallRatio', dht.double)
]),table_type = TableType.Append).sort_descending(["KafkaOffset"])
]),table_type = TableType.append()).sort_descending(["KafkaOffset"])

timeAndSale = ck.consume({'bootstrap.servers': 'redpanda:29092'} , 'TimeAndSale', key_spec=KeyValueSpec.IGNORE, value_spec=ck.json_spec([
('Symbol', dht.string),
('EventFlags', dht.int_),
('Index', dht.int64),
('Timestamp', dht.DateTime),
('Timestamp', dht.Instant),
('ExchangeCode', dht.string),
('Price', dht.double),
('Size', dht.double),
Expand All @@ -143,22 +143,22 @@
('TradeThroughExempt', dht.int_),
('IsSpreadLeg', dht.int_),
('Scope', dht.int_)
]),table_type = TableType.Append).sort_descending(["KafkaOffset"])
]),table_type = TableType.append()).sort_descending(["KafkaOffset"])


series = ck.consume({'bootstrap.servers': 'redpanda:29092'} , 'Series', key_spec=KeyValueSpec.IGNORE, value_spec=ck.json_spec([
('Symbol', dht.string),
('EventFlags', dht.int_),
('Index', dht.int64),
('Timestamp', dht.DateTime),
('Timestamp', dht.Instant),
('Sequence', dht.int_),
('Expiration', dht.int_),
('Volatility', dht.double),
('PutCallRatio', dht.double),
('ForwardPrice', dht.double),
('Dividend', dht.double),
('Interest', dht.double)
]),table_type = TableType.Append).sort_descending(["KafkaOffset"])
]),table_type = TableType.append()).sort_descending(["KafkaOffset"])

from deephaven import new_table
from deephaven.column import string_col
Expand Down

0 comments on commit e77b993

Please sign in to comment.